Skip to content

Commit

Permalink
Add <1.6.10 provider in Dagster conftest. (#2518)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>
  • Loading branch information
JDarDagran authored and mobuchowski committed Mar 29, 2024
1 parent 2cd1459 commit 2466dca
Showing 1 changed file with 36 additions and 2 deletions.
38 changes: 36 additions & 2 deletions integration/dagster/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
EventLogRecord,
)
from dagster.core.execution.plan.objects import StepFailureData, StepSuccessData
from dagster.core.host_representation import ExternalRepositoryOrigin
from dagster.core.types.loadable_target_origin import LoadableTargetOrigin
from dagster.version import __version__ as DAGSTER_VERSION

Expand All @@ -44,6 +43,7 @@ class DagsterRunLE1_2_2Provider(DagsterRunProvider):
def get_instance(self, repository_name: str) -> DagsterRun:
from dagster.core.host_representation import ( # type: ignore
ExternalPipelineOrigin,
ExternalRepositoryOrigin,
InProcessRepositoryLocationOrigin,
)

Expand Down Expand Up @@ -74,6 +74,7 @@ class DagsterRunLE1_3_2Provider(DagsterRunProvider):
def get_instance(self, repository_name: str) -> DagsterRun:
from dagster.core.host_representation import (
ExternalPipelineOrigin,
ExternalRepositoryOrigin,
InProcessCodeLocationOrigin,
)

Expand All @@ -95,6 +96,36 @@ def get_instance(self, repository_name: str) -> DagsterRun:
return dagster_run


class DagsterRunLE1_6_9Provider(DagsterRunProvider):
"""Class for provisioning `dagster.DagsterRun` instance for Dagster
versioin >= `1.3.3`.
"""

def get_instance(self, repository_name: str) -> DagsterRun:
from dagster.core.host_representation import (
ExternalJobOrigin,
ExternalRepositoryOrigin,
InProcessCodeLocationOrigin,
)

dagster_run = DagsterRun(
job_name="test",
execution_plan_snapshot_id="123",
external_job_origin=ExternalJobOrigin(
external_repository_origin=ExternalRepositoryOrigin(
code_location_origin=InProcessCodeLocationOrigin(
loadable_target_origin=LoadableTargetOrigin(
python_file="/openlineage/dagster/tests/test_pipelines/repo.py",
)
),
repository_name=repository_name,
),
job_name="test",
),
)
return dagster_run


class DagsterRunLatestProvider(DagsterRunProvider):
"""Class for provisioning `dagster.DagsterRun` instance for Dagster
versioin >= `1.3.3`.
Expand All @@ -106,8 +137,9 @@ class to `DagsterRunLEx_y_yProvider`. And then implement a brand new `DagsterRun
"""

def get_instance(self, repository_name: str) -> DagsterRun:
from dagster.core.host_representation import (
from dagster.core.remote_representation.origin import (
ExternalJobOrigin,
ExternalRepositoryOrigin,
InProcessCodeLocationOrigin,
)

Expand Down Expand Up @@ -174,6 +206,8 @@ def make_pipeline_run_with_external_pipeline_origin(
dagster_run_provider = DagsterRunLE1_2_2Provider()
if not dagster_run_provider and parsed_dagster_version <= parse_version("1.3.2"):
dagster_run_provider = DagsterRunLE1_3_2Provider()
if not dagster_run_provider and parsed_dagster_version <= parse_version("1.6.9"):
dagster_run_provider = DagsterRunLE1_6_9Provider()
if not dagster_run_provider:
dagster_run_provider = DagsterRunLatestProvider()
return dagster_run_provider.get_instance(repository_name)

0 comments on commit 2466dca

Please sign in to comment.