Skip to content

Commit

Permalink
Merge pull request #1632 from SFDO-Tooling/feature/load_data_enhancements
Browse files Browse the repository at this point in the history

Load data enhancements: 1. Add a `bulk_mode` option ("Serial"/"Parallel") to mapping steps. 2. Rename `_load_mapping` to `_execute_step`. 3. Add type checking (type annotations, plus a `typing-extensions` dependency for `Literal`).
  • Loading branch information
David Glick authored Mar 24, 2020
2 parents b1aa8bc + 5c41996 commit 3ed1357
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 49 deletions.
14 changes: 10 additions & 4 deletions cumulusci/tasks/bulkdata/load.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from collections import defaultdict
import datetime
from unittest.mock import MagicMock
from typing import Union

from sqlalchemy import Column, MetaData, Table, Unicode, create_engine, text
from sqlalchemy.orm import aliased, Session
from sqlalchemy.ext.automap import automap_base
Expand All @@ -15,11 +18,12 @@
BulkApiDmlOperation,
DataOperationStatus,
DataOperationType,
DataOperationJobResult,
)
from cumulusci.tasks.salesforce import BaseSalesforceApiTask
from cumulusci.utils import os_friendly_path

from cumulusci.tasks.bulkdata.mapping_parser import parse_from_yaml
from cumulusci.tasks.bulkdata.mapping_parser import parse_from_yaml, MappingStep


class LoadData(BaseSalesforceApiTask, SqlAlchemyMixin):
Expand Down Expand Up @@ -92,7 +96,7 @@ def _run_task(self):
started = True

self.logger.info(f"Running step: {name}")
result = self._load_mapping(mapping)
result = self._execute_step(mapping)
if result.status is DataOperationStatus.JOB_FAILURE:
raise BulkDataException(
f"Step {name} did not complete successfully: {','.join(result.job_errors)}"
Expand All @@ -101,13 +105,15 @@ def _run_task(self):
if name in self.after_steps:
for after_name, after_step in self.after_steps[name].items():
self.logger.info(f"Running post-load step: {after_name}")
result = self._load_mapping(after_step)
result = self._execute_step(after_step)
if result.status is DataOperationStatus.JOB_FAILURE:
raise BulkDataException(
f"Step {after_name} did not complete successfully: {','.join(result.job_errors)}"
)

def _load_mapping(self, mapping):
def _execute_step(
self, mapping: MappingStep
) -> Union[DataOperationJobResult, MagicMock]:
"""Load data for a single step."""

if mapping.get("fields", {}).get("RecordTypeId"):
Expand Down
2 changes: 2 additions & 0 deletions cumulusci/tasks/bulkdata/mapping_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pydantic import Field, validator, ValidationError

from cumulusci.utils.yaml.model_parser import CCIDictModel
from typing_extensions import Literal

LOGGER_NAME = "MAPPING_LOADER"
logger = getLogger(LOGGER_NAME)
Expand All @@ -31,6 +32,7 @@ class MappingStep(CCIDictModel):
action: str = "insert"
oid_as_pk: bool = False # this one should be discussed and probably deprecated
record_type: str = None # should be discussed and probably deprecated
bulk_mode: Literal["Serial", "Parallel"] = "Parallel"

@validator("record_type")
def record_type_is_deprecated(cls, v):
Expand Down
76 changes: 31 additions & 45 deletions cumulusci/tasks/bulkdata/tests/test_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
)
from cumulusci.tasks.bulkdata.tests.utils import _make_task
from cumulusci.utils import temporary_dir
from cumulusci.tasks.bulkdata.mapping_parser import MappingLookup
from cumulusci.tasks.bulkdata.mapping_parser import MappingLookup, MappingStep


MAPPING_FILE = """Insert Households:
Expand Down Expand Up @@ -51,7 +51,9 @@


class MockBulkApiDmlOperation(BaseDmlOperation):
def __init__(self, *, sobject, operation, api_options, context, fields):
def __init__(
self, *, context, sobject=None, operation=None, api_options=None, fields=None,
):
super().__init__(
sobject=sobject,
operation=operation,
Expand Down Expand Up @@ -166,14 +168,16 @@ def test_run_task__start_step(self):
task._init_db = mock.Mock()
task._init_mapping = mock.Mock()
task.mapping = {}
task.mapping["Insert Households"] = {"one": 1}
task.mapping["Insert Contacts"] = {"two": 2}
task.mapping["Insert Households"] = MappingStep(sf_object="one", fields={})
task.mapping["Insert Contacts"] = MappingStep(sf_object="two", fields={})
task.after_steps = {}
task._load_mapping = mock.Mock(
task._execute_step = mock.Mock(
return_value=DataOperationJobResult(DataOperationStatus.SUCCESS, [], 0, 0)
)
task()
task._load_mapping.assert_called_once_with({"two": 2, "action": "insert"})
task._execute_step.assert_called_once_with(
MappingStep(sf_object="two", fields={})
)

def test_run_task__after_steps(self):
task = _make_task(
Expand All @@ -193,11 +197,11 @@ def test_run_task__after_steps(self):
"Insert Contacts": {"three": 3},
"Insert Households": households_steps,
}
task._load_mapping = mock.Mock(
task._execute_step = mock.Mock(
return_value=DataOperationJobResult(DataOperationStatus.SUCCESS, [], 0, 0)
)
task()
task._load_mapping.assert_has_calls(
task._execute_step.assert_has_calls(
[mock.call(1), mock.call(4), mock.call(5), mock.call(2), mock.call(3)]
)

Expand All @@ -219,7 +223,7 @@ def test_run_task__after_steps_failure(self):
"Insert Contacts": {"three": 3},
"Insert Households": households_steps,
}
task._load_mapping = mock.Mock(
task._execute_step = mock.Mock(
side_effect=[
DataOperationJobResult(DataOperationStatus.SUCCESS, [], 0, 0),
DataOperationJobResult(DataOperationStatus.JOB_FAILURE, [], 0, 0),
Expand Down Expand Up @@ -302,32 +306,6 @@ def test_init_options__bulk_mode_wrong(self):
with self.assertRaises(TaskOptionsError):
_make_task(LoadData, {"options": {"bulk_mode": "Test"}})

@mock.patch("cumulusci.tasks.bulkdata.load.BulkApiDmlOperation")
def test_bulk_mode_override(self, stepmock):
task = _make_task(
LoadData,
{
"options": {
"database_url": "file:///test.db",
"mapping": "mapping.yml",
"bulk_mode": "Serial",
}
},
)
task.session = mock.Mock()
task._load_record_types = mock.Mock()
task._process_job_results = mock.Mock()

task._load_mapping(
{
"sf_object": "Account",
"action": "insert",
"fields": {"Name": "Name"},
"bulk_mode": "XYZZY",
}
)
assert stepmock.mock_calls[0][2]["api_options"]["bulk_mode"] == "XYZZY"

def test_init_options__database_url(self):
t = _make_task(
LoadData,
Expand Down Expand Up @@ -607,7 +585,7 @@ def test_run_task__exception_failure(self):
)
task._init_db = mock.Mock()
task._init_mapping = mock.Mock()
task._load_mapping = mock.Mock(
task._execute_step = mock.Mock(
return_value=DataOperationJobResult(
DataOperationStatus.JOB_FAILURE, [], 0, 0
)
Expand Down Expand Up @@ -854,7 +832,7 @@ def test_generate_results_id_map__respects_silent_error_flag(self):
]

@mock.patch("cumulusci.tasks.bulkdata.load.BulkApiDmlOperation")
def test_load_mapping__record_type_mapping(self, step_mock):
def test_execute_step__record_type_mapping(self, step_mock):
task = _make_task(
LoadData,
{"options": {"database_url": "sqlite://", "mapping": "mapping.yml"}},
Expand All @@ -864,18 +842,26 @@ def test_load_mapping__record_type_mapping(self, step_mock):
task._load_record_types = mock.Mock()
task._process_job_results = mock.Mock()

task._load_mapping(
{"sf_object": "Account", "action": "insert", "fields": {"Name": "Name"}}
task._execute_step(
MappingStep(
**{
"sf_object": "Account",
"action": "insert",
"fields": {"Name": "Name"},
}
)
)

task._load_record_types.assert_not_called()

task._load_mapping(
{
"sf_object": "Account",
"action": "insert",
"fields": {"Name": "Name", "RecordTypeId": "RecordTypeId"},
}
task._execute_step(
MappingStep(
**{
"sf_object": "Account",
"action": "insert",
"fields": {"Name": "Name", "RecordTypeId": "RecordTypeId"},
}
)
)
task._load_record_types.assert_called_once_with(
["Account"], task.session.connection.return_value
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ simple-salesforce==0.75.3
six==1.14.0
SQLAlchemy==1.3.15
terminaltables==3.1.0
typing-extensions==3.7.4.1
unicodecsv==0.14.1
uritemplate==3.0.1
urllib3==1.25.8
Expand Down

0 comments on commit 3ed1357

Please sign in to comment.