Skip to content

Commit

Permalink
fix: resolve correct legacy arguments for emr pyspark step launcher (#…
Browse files Browse the repository at this point in the history
…12419)

### Summary & Motivation
Compare `deploy_local_job_package` with `deploy_local_pipeline_package`,
as intended.

### How I Tested These Changes
pytest
  • Loading branch information
rexledesma committed Feb 22, 2023
1 parent 2fc07dc commit 00a5d89
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def emr_pyspark_step_launcher(context):
) or context.resource_config.get("local_pipeline_package_path")

if context.resource_config.get("deploy_local_job_package") and context.resource_config.get(
"deploy_local_job_package"
"deploy_local_pipeline_package"
):
raise DagsterInvariantViolationError(
"Provided both ``deploy_local_job_package`` and legacy version "
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import os
from unittest import mock

from dagster import DagsterEvent, EventLogEntry
import pytest
from dagster import DagsterEvent, EventLogEntry, build_init_resource_context
from dagster._core.execution.plan.objects import StepSuccessData
from dagster_aws.emr.pyspark_step_launcher import EmrPySparkStepLauncher
from dagster_aws.emr.pyspark_step_launcher import EmrPySparkStepLauncher, emr_pyspark_step_launcher

EVENTS = [
EventLogEntry(
Expand Down Expand Up @@ -59,3 +61,53 @@ def test_wait_for_completion(_mock_is_emr_step_complete, _mock_read_events):
launcher.wait_for_completion(mock.MagicMock(), None, None, None, None, check_interval=0)
)
assert yielded_events == [event.dagster_event for event in EVENTS if event.is_dagster_event]


def test_emr_pyspark_step_launcher_legacy_arguments():
mock_config = {
"local_job_package_path": os.path.abspath(os.path.dirname(__file__)),
"cluster_id": "123",
"staging_bucket": "bucket",
"region_name": "us-west-1",
}

with pytest.raises(Exception):
emr_pyspark_step_launcher(
build_init_resource_context(
config={
**mock_config,
"local_pipeline_package_path": "path",
}
)
)

with pytest.raises(Exception):
emr_pyspark_step_launcher(
build_init_resource_context(
config={
**mock_config,
"deploy_local_job_package": True,
"deploy_local_pipeline_package": True,
}
)
)

with pytest.raises(Exception):
emr_pyspark_step_launcher(
build_init_resource_context(
config={
**mock_config,
"s3_job_package_path": "path",
"s3_pipeline_package_path": "path",
}
)
)

assert emr_pyspark_step_launcher(
build_init_resource_context(
config={
**mock_config,
"deploy_local_job_package": True,
}
)
)

0 comments on commit 00a5d89

Please sign in to comment.