Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dagster: add new latest version provider in conftest #2518

Merged
merged 1 commit into from
Mar 15, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
38 changes: 36 additions & 2 deletions integration/dagster/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
EventLogRecord,
)
from dagster.core.execution.plan.objects import StepFailureData, StepSuccessData
from dagster.core.host_representation import ExternalRepositoryOrigin
from dagster.core.types.loadable_target_origin import LoadableTargetOrigin
from dagster.version import __version__ as DAGSTER_VERSION

Expand All @@ -44,6 +43,7 @@ class DagsterRunLE1_2_2Provider(DagsterRunProvider):
def get_instance(self, repository_name: str) -> DagsterRun:
from dagster.core.host_representation import ( # type: ignore
ExternalPipelineOrigin,
ExternalRepositoryOrigin,
InProcessRepositoryLocationOrigin,
)

Expand Down Expand Up @@ -74,6 +74,7 @@ class DagsterRunLE1_3_2Provider(DagsterRunProvider):
def get_instance(self, repository_name: str) -> DagsterRun:
from dagster.core.host_representation import (
ExternalPipelineOrigin,
ExternalRepositoryOrigin,
InProcessCodeLocationOrigin,
)

Expand All @@ -95,6 +96,36 @@ def get_instance(self, repository_name: str) -> DagsterRun:
return dagster_run


class DagsterRunLE1_6_9Provider(DagsterRunProvider):
"""Class for provisioning `dagster.DagsterRun` instance for Dagster
versioin >= `1.3.3`.
"""

def get_instance(self, repository_name: str) -> DagsterRun:
from dagster.core.host_representation import (
ExternalJobOrigin,
ExternalRepositoryOrigin,
InProcessCodeLocationOrigin,
)

dagster_run = DagsterRun(
job_name="test",
execution_plan_snapshot_id="123",
external_job_origin=ExternalJobOrigin(
external_repository_origin=ExternalRepositoryOrigin(
code_location_origin=InProcessCodeLocationOrigin(
loadable_target_origin=LoadableTargetOrigin(
python_file="/openlineage/dagster/tests/test_pipelines/repo.py",
)
),
repository_name=repository_name,
),
job_name="test",
),
)
return dagster_run


class DagsterRunLatestProvider(DagsterRunProvider):
"""Class for provisioning `dagster.DagsterRun` instance for Dagster
versioin >= `1.3.3`.
Expand All @@ -106,8 +137,9 @@ class to `DagsterRunLEx_y_yProvider`. And then implement a brand new `DagsterRun
"""

def get_instance(self, repository_name: str) -> DagsterRun:
from dagster.core.host_representation import (
from dagster.core.remote_representation.origin import (
ExternalJobOrigin,
ExternalRepositoryOrigin,
InProcessCodeLocationOrigin,
)

Expand Down Expand Up @@ -174,6 +206,8 @@ def make_pipeline_run_with_external_pipeline_origin(
dagster_run_provider = DagsterRunLE1_2_2Provider()
if not dagster_run_provider and parsed_dagster_version <= parse_version("1.3.2"):
dagster_run_provider = DagsterRunLE1_3_2Provider()
if not dagster_run_provider and parsed_dagster_version <= parse_version("1.6.9"):
dagster_run_provider = DagsterRunLE1_6_9Provider()
if not dagster_run_provider:
dagster_run_provider = DagsterRunLatestProvider()
return dagster_run_provider.get_instance(repository_name)