Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1. bulk_mode. 2. _execute_step. 3. Type checking. #1632

Merged
merged 4 commits into from
Mar 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions cumulusci/tasks/bulkdata/load.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from collections import defaultdict
import datetime
from unittest.mock import MagicMock
from typing import Union

from sqlalchemy import Column, MetaData, Table, Unicode, create_engine, text
from sqlalchemy.orm import aliased, Session
from sqlalchemy.ext.automap import automap_base
Expand All @@ -15,11 +18,12 @@
BulkApiDmlOperation,
DataOperationStatus,
DataOperationType,
DataOperationJobResult,
)
from cumulusci.tasks.salesforce import BaseSalesforceApiTask
from cumulusci.utils import os_friendly_path

from cumulusci.tasks.bulkdata.mapping_parser import parse_from_yaml
from cumulusci.tasks.bulkdata.mapping_parser import parse_from_yaml, MappingStep


class LoadData(BaseSalesforceApiTask, SqlAlchemyMixin):
Expand Down Expand Up @@ -92,7 +96,7 @@ def _run_task(self):
started = True

self.logger.info(f"Running step: {name}")
result = self._load_mapping(mapping)
result = self._execute_step(mapping)
if result.status is DataOperationStatus.JOB_FAILURE:
raise BulkDataException(
f"Step {name} did not complete successfully: {','.join(result.job_errors)}"
Expand All @@ -101,13 +105,15 @@ def _run_task(self):
if name in self.after_steps:
for after_name, after_step in self.after_steps[name].items():
self.logger.info(f"Running post-load step: {after_name}")
result = self._load_mapping(after_step)
result = self._execute_step(after_step)
if result.status is DataOperationStatus.JOB_FAILURE:
raise BulkDataException(
f"Step {after_name} did not complete successfully: {','.join(result.job_errors)}"
)

def _load_mapping(self, mapping):
def _execute_step(
self, mapping: MappingStep
) -> Union[DataOperationJobResult, MagicMock]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like having to include mocks in the type signature. Is there no way to tell pydantic to always allow them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you mean typeguard.

And yes, there is a plan to fix it at the Typeguard level but not merged yet.

agronholm/typeguard#105

I'll bug the maintainer again, but if he doesn't respond we might have to figure out whether to fork, abandon or continue to work around small bugs in typeguard. I found another one on the weekend. Easy to fix, but only if he accepts my PRs.

"""Load data for a single step."""

if mapping.get("fields", {}).get("RecordTypeId"):
Expand Down
2 changes: 2 additions & 0 deletions cumulusci/tasks/bulkdata/mapping_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pydantic import Field, validator, ValidationError

from cumulusci.utils.yaml.model_parser import CCIDictModel
from typing_extensions import Literal

LOGGER_NAME = "MAPPING_LOADER"
logger = getLogger(LOGGER_NAME)
Expand All @@ -31,6 +32,7 @@ class MappingStep(CCIDictModel):
action: str = "insert"
oid_as_pk: bool = False # this one should be discussed and probably deprecated
record_type: str = None # should be discussed and probably deprecated
bulk_mode: Literal["Serial", "Parallel"] = "Parallel"

@validator("record_type")
def record_type_is_deprecated(cls, v):
Expand Down
76 changes: 31 additions & 45 deletions cumulusci/tasks/bulkdata/tests/test_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
)
from cumulusci.tasks.bulkdata.tests.utils import _make_task
from cumulusci.utils import temporary_dir
from cumulusci.tasks.bulkdata.mapping_parser import MappingLookup
from cumulusci.tasks.bulkdata.mapping_parser import MappingLookup, MappingStep


MAPPING_FILE = """Insert Households:
Expand Down Expand Up @@ -51,7 +51,9 @@


class MockBulkApiDmlOperation(BaseDmlOperation):
def __init__(self, *, sobject, operation, api_options, context, fields):
def __init__(
self, *, context, sobject=None, operation=None, api_options=None, fields=None,
):
super().__init__(
sobject=sobject,
operation=operation,
Expand Down Expand Up @@ -166,14 +168,16 @@ def test_run_task__start_step(self):
task._init_db = mock.Mock()
task._init_mapping = mock.Mock()
task.mapping = {}
task.mapping["Insert Households"] = {"one": 1}
task.mapping["Insert Contacts"] = {"two": 2}
task.mapping["Insert Households"] = MappingStep(sf_object="one", fields={})
task.mapping["Insert Contacts"] = MappingStep(sf_object="two", fields={})
task.after_steps = {}
task._load_mapping = mock.Mock(
task._execute_step = mock.Mock(
return_value=DataOperationJobResult(DataOperationStatus.SUCCESS, [], 0, 0)
)
task()
task._load_mapping.assert_called_once_with({"two": 2, "action": "insert"})
task._execute_step.assert_called_once_with(
MappingStep(sf_object="two", fields={})
)

def test_run_task__after_steps(self):
task = _make_task(
Expand All @@ -193,11 +197,11 @@ def test_run_task__after_steps(self):
"Insert Contacts": {"three": 3},
"Insert Households": households_steps,
}
task._load_mapping = mock.Mock(
task._execute_step = mock.Mock(
return_value=DataOperationJobResult(DataOperationStatus.SUCCESS, [], 0, 0)
)
task()
task._load_mapping.assert_has_calls(
task._execute_step.assert_has_calls(
[mock.call(1), mock.call(4), mock.call(5), mock.call(2), mock.call(3)]
)

Expand All @@ -219,7 +223,7 @@ def test_run_task__after_steps_failure(self):
"Insert Contacts": {"three": 3},
"Insert Households": households_steps,
}
task._load_mapping = mock.Mock(
task._execute_step = mock.Mock(
side_effect=[
DataOperationJobResult(DataOperationStatus.SUCCESS, [], 0, 0),
DataOperationJobResult(DataOperationStatus.JOB_FAILURE, [], 0, 0),
Expand Down Expand Up @@ -302,32 +306,6 @@ def test_init_options__bulk_mode_wrong(self):
with self.assertRaises(TaskOptionsError):
_make_task(LoadData, {"options": {"bulk_mode": "Test"}})

@mock.patch("cumulusci.tasks.bulkdata.load.BulkApiDmlOperation")
def test_bulk_mode_override(self, stepmock):
task = _make_task(
LoadData,
{
"options": {
"database_url": "file:///test.db",
"mapping": "mapping.yml",
"bulk_mode": "Serial",
}
},
)
task.session = mock.Mock()
task._load_record_types = mock.Mock()
task._process_job_results = mock.Mock()

task._load_mapping(
{
"sf_object": "Account",
"action": "insert",
"fields": {"Name": "Name"},
"bulk_mode": "XYZZY",
}
)
assert stepmock.mock_calls[0][2]["api_options"]["bulk_mode"] == "XYZZY"

def test_init_options__database_url(self):
t = _make_task(
LoadData,
Expand Down Expand Up @@ -607,7 +585,7 @@ def test_run_task__exception_failure(self):
)
task._init_db = mock.Mock()
task._init_mapping = mock.Mock()
task._load_mapping = mock.Mock(
task._execute_step = mock.Mock(
return_value=DataOperationJobResult(
DataOperationStatus.JOB_FAILURE, [], 0, 0
)
Expand Down Expand Up @@ -854,7 +832,7 @@ def test_generate_results_id_map__respects_silent_error_flag(self):
]

@mock.patch("cumulusci.tasks.bulkdata.load.BulkApiDmlOperation")
def test_load_mapping__record_type_mapping(self, step_mock):
def test_execute_step__record_type_mapping(self, step_mock):
task = _make_task(
LoadData,
{"options": {"database_url": "sqlite://", "mapping": "mapping.yml"}},
Expand All @@ -864,18 +842,26 @@ def test_load_mapping__record_type_mapping(self, step_mock):
task._load_record_types = mock.Mock()
task._process_job_results = mock.Mock()

task._load_mapping(
{"sf_object": "Account", "action": "insert", "fields": {"Name": "Name"}}
task._execute_step(
MappingStep(
**{
"sf_object": "Account",
"action": "insert",
"fields": {"Name": "Name"},
}
)
)

task._load_record_types.assert_not_called()

task._load_mapping(
{
"sf_object": "Account",
"action": "insert",
"fields": {"Name": "Name", "RecordTypeId": "RecordTypeId"},
}
task._execute_step(
MappingStep(
**{
"sf_object": "Account",
"action": "insert",
"fields": {"Name": "Name", "RecordTypeId": "RecordTypeId"},
}
)
)
task._load_record_types.assert_called_once_with(
["Account"], task.session.connection.return_value
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ simple-salesforce==0.75.3
six==1.14.0
SQLAlchemy==1.3.15
terminaltables==3.1.0
typing-extensions==3.7.4.1
unicodecsv==0.14.1
uritemplate==3.0.1
urllib3==1.25.8
Expand Down