rename PipelineRunsFilter => RunsFilter (#6811)
* runs filter

* rename callsites for PipelineRunsFilter => RunsFilter

* remove pipeline_name property access

* fix import

* isort

* deprecation comment
prha committed Feb 28, 2022
1 parent 9dd063a commit 63556df
Showing 37 changed files with 227 additions and 225 deletions.
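
A note on mechanics: the "deprecation comment" bullet above implies the old name stays importable while callsites migrate, but the definition itself is outside this diff. A minimal sketch of the rename-with-alias pattern, with the constructor reduced to the keyword arguments visible in the hunks below (illustrative only, not the actual dagster source):

    # rename_sketch.py -- illustrative, not the real dagster.core.storage.pipeline_run
    class RunsFilter:
        """Filter criteria for querying runs from run storage."""

        def __init__(self, run_ids=None, pipeline_name=None, statuses=None, tags=None):
            self.run_ids = run_ids or []
            self.pipeline_name = pipeline_name
            self.statuses = statuses or []
            self.tags = tags or {}


    # Deprecated: use RunsFilter. Keeping the old name bound as an alias
    # (an assumption -- this commit only shows the callsite renames) lets
    # unmigrated code keep importing PipelineRunsFilter.
    PipelineRunsFilter = RunsFilter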
----------------------------------------
@@ -6,7 +6,7 @@
 )
 from marks import mark_daemon

-from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunsFilter
+from dagster.core.storage.pipeline_run import PipelineRun, RunsFilter
 from dagster.core.test_utils import poll_for_finished_run


@@ -22,7 +22,7 @@ def test_execute_schedule_on_celery_k8s( # pylint: disable=redefined-outer-name
     dagster_instance_for_daemon.start_schedule(reoriginated_schedule)

     scheduler_runs = dagster_instance_for_daemon.get_runs(
-        PipelineRunsFilter(tags=PipelineRun.tags_for_schedule(reoriginated_schedule))
+        RunsFilter(tags=PipelineRun.tags_for_schedule(reoriginated_schedule))
     )

     assert len(scheduler_runs) == 0
@@ -33,7 +33,7 @@ def test_execute_schedule_on_celery_k8s( # pylint: disable=redefined-outer-name

     while True:
         schedule_runs = dagster_instance_for_daemon.get_runs(
-            PipelineRunsFilter(tags=PipelineRun.tags_for_schedule(reoriginated_schedule))
+            RunsFilter(tags=PipelineRun.tags_for_schedule(reoriginated_schedule))
         )

         if len(schedule_runs) > 0:
----------------------------------------
@@ -3,7 +3,7 @@

 from dagster import check
 from dagster.core.storage.compute_log_manager import ComputeIOType
-from dagster.core.storage.pipeline_run import PipelineRunStatus, PipelineRunsFilter
+from dagster.core.storage.pipeline_run import PipelineRunStatus, RunsFilter
 from dagster.serdes import serialize_dagster_namedtuple
 from dagster.utils.error import serializable_error_info_from_exc_info

@@ -25,15 +25,15 @@ def _force_mark_as_canceled(graphene_info, run_id):

     instance = graphene_info.context.instance

-    reloaded_record = instance.get_run_records(PipelineRunsFilter(run_ids=[run_id]))[0]
+    reloaded_record = instance.get_run_records(RunsFilter(run_ids=[run_id]))[0]

     if not reloaded_record.pipeline_run.is_finished:
         message = (
             "This pipeline was forcibly marked as canceled from outside the execution context. The "
             "computational resources created by the run may not have been fully cleaned up."
         )
         instance.report_run_canceled(reloaded_record.pipeline_run, message=message)
-        reloaded_record = instance.get_run_records(PipelineRunsFilter(run_ids=[run_id]))[0]
+        reloaded_record = instance.get_run_records(RunsFilter(run_ids=[run_id]))[0]

     return GrapheneTerminateRunSuccess(GrapheneRun(reloaded_record))

@@ -52,7 +52,7 @@ def terminate_pipeline_execution(graphene_info, run_id, terminate_policy):
     check.str_param(run_id, "run_id")

     instance = graphene_info.context.instance
-    records = instance.get_run_records(PipelineRunsFilter(run_ids=[run_id]))
+    records = instance.get_run_records(RunsFilter(run_ids=[run_id]))

     force_mark_as_canceled = (
         terminate_policy == GrapheneTerminateRunPolicy.MARK_AS_CANCELED_IMMEDIATELY
@@ -126,7 +126,7 @@ def get_pipeline_run_observable(graphene_info, run_id, after=None):
     check.str_param(run_id, "run_id")
     check.opt_int_param(after, "after")
     instance = graphene_info.context.instance
-    records = instance.get_run_records(PipelineRunsFilter(run_ids=[run_id]))
+    records = instance.get_run_records(RunsFilter(run_ids=[run_id]))

     if not records:

----------------------------------------
@@ -1,7 +1,7 @@
 from graphql.execution.base import ResolveInfo

 from dagster import check
-from dagster.core.storage.pipeline_run import PipelineRunsFilter
+from dagster.core.storage.pipeline_run import RunsFilter

 from ..external import get_external_pipeline_or_raise
 from ..utils import ExecutionMetadata, ExecutionParams, capture_error
@@ -50,8 +50,6 @@ def _launch_pipeline_execution(graphene_info, execution_params, is_reexecuted=Fa
     check.bool_param(is_reexecuted, "is_reexecuted")

     run = do_launch(graphene_info, execution_params, is_reexecuted)
-    records = graphene_info.context.instance.get_run_records(
-        PipelineRunsFilter(run_ids=[run.run_id])
-    )
+    records = graphene_info.context.instance.get_run_records(RunsFilter(run_ids=[run.run_id]))

     return GrapheneLaunchRunSuccess(run=GrapheneRun(records[0]))
----------------------------------------
@@ -7,7 +7,7 @@
     RepositoryHandle,
     RepositorySelector,
 )
-from dagster.core.storage.pipeline_run import PipelineRunsFilter
+from dagster.core.storage.pipeline_run import RunsFilter
 from dagster.core.storage.tags import PARTITION_NAME_TAG, PARTITION_SET_TAG, TagType, get_tag_type

 from .utils import capture_error
@@ -180,7 +180,7 @@ def get_partition_set_partition_statuses(graphene_info, repository_handle, parti
         repository_handle, partition_set_name
     )
     all_partition_set_runs = graphene_info.context.instance.get_runs(
-        PipelineRunsFilter(tags={PARTITION_SET_TAG: partition_set_name})
+        RunsFilter(tags={PARTITION_SET_TAG: partition_set_name})
     )
     runs_by_partition = {}
     for run in all_partition_set_runs:
----------------------------------------
@@ -9,7 +9,7 @@
 from dagster.core.errors import DagsterRunNotFoundError
 from dagster.core.execution.stats import StepEventStatus
 from dagster.core.host_representation import PipelineSelector
-from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunsFilter
+from dagster.core.storage.pipeline_run import PipelineRun, RunsFilter
 from dagster.core.storage.tags import TagType, get_tag_type
 from dagster.utils import utc_datetime_from_timestamp

@@ -51,7 +51,7 @@ def get_run_by_id(graphene_info, run_id):
     from ..schema.pipelines.pipeline import GrapheneRun

     instance = graphene_info.context.instance
-    records = instance.get_run_records(PipelineRunsFilter(run_ids=[run_id]))
+    records = instance.get_run_records(RunsFilter(run_ids=[run_id]))
     if not records:
         return GrapheneRunNotFoundError(run_id)
     else:
@@ -84,7 +84,7 @@ def get_run_group(graphene_info, run_id):
     run_group_run_ids = [run.run_id for run in run_group]
     records_by_id = {
         record.pipeline_run.run_id: record
-        for record in instance.get_run_records(PipelineRunsFilter(run_ids=run_group_run_ids))
+        for record in instance.get_run_records(RunsFilter(run_ids=run_group_run_ids))
     }
     return GrapheneRunGroup(
         root_run_id=root_run_id,
@@ -95,7 +95,7 @@ def get_run_group(graphene_info, run_id):
 def get_runs(graphene_info, filters, cursor=None, limit=None):
     from ..schema.pipelines.pipeline import GrapheneRun

-    check.opt_inst_param(filters, "filters", PipelineRunsFilter)
+    check.opt_inst_param(filters, "filters", RunsFilter)
     check.opt_str_param(cursor, "cursor")
     check.opt_int_param(limit, "limit")

@@ -129,9 +129,7 @@ def get_in_progress_runs_by_step(graphene_info, job_names, step_keys):
     in_progress_records = []
     for job_name in job_names:
         in_progress_records.extend(
-            instance.get_run_records(
-                PipelineRunsFilter(pipeline_name=job_name, statuses=PENDING_STATUSES)
-            )
+            instance.get_run_records(RunsFilter(pipeline_name=job_name, statuses=PENDING_STATUSES))
         )

     in_progress_runs_by_step = defaultdict(list)
@@ -216,7 +214,7 @@ def get_latest_asset_run_by_step_key(graphene_info, asset_nodes):
     run_records = []
     for job_name in job_names:
         run_records.extend(
-            instance.get_run_records(PipelineRunsFilter(pipeline_name=job_name), limit=5)
+            instance.get_run_records(RunsFilter(pipeline_name=job_name), limit=5)
         )

     if len(run_records) == 0:
@@ -262,7 +260,7 @@ def get_asset_runs_count_by_step(graphene_info, asset_nodes):
     runs_count = sum(
         [
             instance.get_runs_count(
-                PipelineRunsFilter(
+                RunsFilter(
                     pipeline_name=job_name,
                     updated_after=utc_datetime_from_timestamp(event.timestamp)
                     if event
@@ -292,7 +290,7 @@ def get_run_groups(graphene_info, filters=None, cursor=None, limit=None):
     from ..schema.pipelines.pipeline import GrapheneRun
     from ..schema.runs import GrapheneRunGroup

-    check.opt_inst_param(filters, "filters", PipelineRunsFilter)
+    check.opt_inst_param(filters, "filters", RunsFilter)
     check.opt_str_param(cursor, "cursor")
     check.opt_int_param(limit, "limit")

@@ -301,7 +299,7 @@ def get_run_groups(graphene_info, filters=None, cursor=None, limit=None):
     run_ids = {run.run_id for run_group in run_groups.values() for run in run_group.get("runs", [])}
     records_by_ids = {
         record.pipeline_run.run_id: record
-        for record in instance.get_run_records(PipelineRunsFilter(run_ids=list(run_ids)))
+        for record in instance.get_run_records(RunsFilter(run_ids=list(run_ids)))
     }

     for root_run_id in run_groups:
----------------------------------------
@@ -7,7 +7,7 @@
 from dagster.core.events.log import EventLogEntry
 from dagster.core.host_representation import ExternalRepository
 from dagster.core.scheduler.instigation import InstigatorType
-from dagster.core.storage.pipeline_run import JobBucket, PipelineRunsFilter, RunRecord, TagBucket
+from dagster.core.storage.pipeline_run import JobBucket, RunRecord, RunsFilter, TagBucket
 from dagster.core.storage.tags import SCHEDULE_NAME_TAG, SENSOR_NAME_TAG


@@ -67,7 +67,7 @@ def _fetch(self, data_type, limit):
             records.extend(
                 list(
                     self._instance.get_run_records(
-                        filters=PipelineRunsFilter(pipeline_name=job_name), limit=limit
+                        filters=RunsFilter(pipeline_name=job_name), limit=limit
                     )
                 )
             )
@@ -92,7 +92,7 @@ def _fetch(self, data_type, limit):
             records.extend(
                 list(
                     self._instance.get_run_records(
-                        filters=PipelineRunsFilter(tags={SCHEDULE_NAME_TAG: schedule_name}),
+                        filters=RunsFilter(tags={SCHEDULE_NAME_TAG: schedule_name}),
                         limit=limit,
                     )
                 )
@@ -116,7 +116,7 @@ def _fetch(self, data_type, limit):
             records.extend(
                 list(
                     self._instance.get_run_records(
-                        filters=PipelineRunsFilter(tags={SENSOR_NAME_TAG: sensor_name}),
+                        filters=RunsFilter(tags={SENSOR_NAME_TAG: sensor_name}),
                         limit=limit,
                     )
                 )
@@ -203,7 +203,7 @@ def get_run_record_by_run_id(self, run_id: str) -> Optional[RunRecord]:
         return self._records.get(run_id)

     def _fetch(self):
-        records = self._instance.get_run_records(PipelineRunsFilter(run_ids=list(self._run_ids)))
+        records = self._instance.get_run_records(RunsFilter(run_ids=list(self._run_ids)))
         for record in records:
             self._records[record.pipeline_run.run_id] = record

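The _fetch methods above belong to batch loaders: each collects every matching run record in one storage query, then answers individual lookups from a local cache. A condensed sketch of the by-run-id variant (class name and wiring are assumed; only get_run_records, RunsFilter, and the record attributes come from this diff):

    from dagster.core.storage.pipeline_run import RunsFilter

    class RunRecordsByIdLoader:
        """Fetches a batch of run records once, then serves them from a dict."""

        def __init__(self, instance, run_ids):
            self._instance = instance
            self._run_ids = set(run_ids)
            self._records = {}  # run_id -> RunRecord
            self._fetched = False

        def get_run_record_by_run_id(self, run_id):
            if not self._fetched:
                self._fetch()
            return self._records.get(run_id)

        def _fetch(self):
            records = self._instance.get_run_records(RunsFilter(run_ids=list(self._run_ids)))
            for record in records:
                self._records[record.pipeline_run.run_id] = record
            self._fetched = True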
----------------------------------------
@@ -2,7 +2,7 @@

 from dagster import check
 from dagster.core.execution.backfill import BulkActionStatus, PartitionBackfill
-from dagster.core.storage.pipeline_run import PipelineRunsFilter
+from dagster.core.storage.pipeline_run import RunsFilter

 from .errors import (
     GrapheneInvalidOutputError,
@@ -116,7 +116,7 @@ def __init__(self, backfill_job):
     def resolve_runs(self, graphene_info, **kwargs):
         from .pipelines.pipeline import GrapheneRun

-        filters = PipelineRunsFilter.for_backfill(self._backfill_job.backfill_id)
+        filters = RunsFilter.for_backfill(self._backfill_job.backfill_id)
         return [
             GrapheneRun(record)
             for record in graphene_info.context.instance.get_run_records(
@@ -126,7 +126,7 @@ def resolve_runs(self, graphene_info, **kwargs):
         ]

     def resolve_numRequested(self, graphene_info):
-        filters = PipelineRunsFilter.for_backfill(self._backfill_job.backfill_id)
+        filters = RunsFilter.for_backfill(self._backfill_job.backfill_id)
         run_count = graphene_info.context.instance.get_runs_count(filters)
         if self._backfill_job.status == BulkActionStatus.COMPLETED:
             return len(self._backfill_job.partition_names)
----------------------------------------
@@ -1,7 +1,7 @@
 import graphene
 import pendulum

-from dagster.core.storage.pipeline_run import PipelineRunStatus, PipelineRunsFilter
+from dagster.core.storage.pipeline_run import PipelineRunStatus, RunsFilter

 from .pipelines.status import GrapheneRunStatus
 from .runs import GrapheneRunConfigData
@@ -55,7 +55,7 @@ def to_selector(self):
         updated_after = pendulum.from_timestamp(self.updatedAfter) if self.updatedAfter else None
         created_before = pendulum.from_timestamp(self.createdBefore) if self.createdBefore else None

-        return PipelineRunsFilter(
+        return RunsFilter(
             run_ids=self.runIds,
             pipeline_name=self.pipelineName,
             tags=tags,
----------------------------------------
@@ -14,7 +14,7 @@
     ScheduleInstigatorData,
     SensorInstigatorData,
 )
-from dagster.core.storage.pipeline_run import PipelineRunsFilter
+from dagster.core.storage.pipeline_run import RunsFilter
 from dagster.core.storage.tags import TagType, get_tag_type
 from dagster.seven.compat.pendulum import to_timezone
 from dagster.utils.error import SerializableErrorInfo, serializable_error_info_from_exc_info
@@ -130,7 +130,7 @@ def resolve_runs(self, graphene_info):

         records_by_id = {
             record.pipeline_run.run_id: record
-            for record in instance.get_run_records(PipelineRunsFilter(run_ids=run_ids))
+            for record in instance.get_run_records(RunsFilter(run_ids=run_ids))
         }

         return [GrapheneRun(records_by_id[run_id]) for run_id in run_ids if run_id in records_by_id]
@@ -333,9 +333,9 @@ def resolve_runs(self, graphene_info, **kwargs):
             return [GrapheneRun(record) for record in records]

         if self._instigator_state.instigator_type == InstigatorType.SENSOR:
-            filters = PipelineRunsFilter.for_sensor(self._instigator_state)
+            filters = RunsFilter.for_sensor(self._instigator_state)
         else:
-            filters = PipelineRunsFilter.for_schedule(self._instigator_state)
+            filters = RunsFilter.for_schedule(self._instigator_state)
         return [
             GrapheneRun(record)
             for record in graphene_info.context.instance.get_run_records(
@@ -346,9 +346,9 @@ def resolve_runs(self, graphene_info, **kwargs):

     def resolve_runsCount(self, graphene_info):
         if self._instigator_state.instigator_type == InstigatorType.SENSOR:
-            filters = PipelineRunsFilter.for_sensor(self._instigator_state)
+            filters = RunsFilter.for_sensor(self._instigator_state)
         else:
-            filters = PipelineRunsFilter.for_schedule(self._instigator_state)
+            filters = RunsFilter.for_schedule(self._instigator_state)
         return graphene_info.context.instance.get_runs_count(filters=filters)

     def resolve_tick(self, graphene_info, timestamp):
----------------------------------------
@@ -10,7 +10,7 @@

 from dagster import check
 from dagster.core.host_representation import ExternalPartitionSet, RepositoryHandle
-from dagster.core.storage.pipeline_run import PipelineRunsFilter
+from dagster.core.storage.pipeline_run import RunsFilter
 from dagster.core.storage.tags import PARTITION_NAME_TAG, PARTITION_SET_TAG
 from dagster.utils import merge_dicts

@@ -133,14 +133,14 @@ def resolve_runs(self, graphene_info, **kwargs):
         }
         if filters is not None:
             filters = filters.to_selector()
-            runs_filter = PipelineRunsFilter(
+            runs_filter = RunsFilter(
                 run_ids=filters.run_ids,
-                pipeline_name=filters.pipeline_name,
+                pipeline_name=filters.job_name,
                 statuses=filters.statuses,
                 tags=merge_dicts(filters.tags, partition_tags),
             )
         else:
-            runs_filter = PipelineRunsFilter(tags=partition_tags)
+            runs_filter = RunsFilter(tags=partition_tags)

         return get_runs(
             graphene_info, runs_filter, cursor=kwargs.get("cursor"), limit=kwargs.get("limit")
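This hunk also carries the "remove pipeline_name property access" bullet from the commit message: to_selector() returns a RunsFilter (see the schema file above), and the callsite now reads its job_name field instead of the legacy pipeline_name accessor, even though the constructor keyword is still pipeline_name. A hypothetical reading of that internal layout, only to make the switch concrete (the real class definition is not in this diff):

    # hypothetical internals of RunsFilter, inferred from these callsites
    from typing import Any, Dict, List, NamedTuple, Optional

    class RunsFilterSketch(NamedTuple):
        run_ids: Optional[List[str]] = None
        job_name: Optional[str] = None  # the stored field
        statuses: Optional[List[Any]] = None
        tags: Optional[Dict[str, str]] = None

        @property
        def pipeline_name(self):
            # deprecated accessor; this commit moves callsites to job_name
            return self.job_name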
----------------------------------------
@@ -4,7 +4,7 @@
 from dagster import check
 from dagster.core.host_representation.external import ExternalExecutionPlan, ExternalPipeline
 from dagster.core.host_representation.external_data import ExternalPresetData
-from dagster.core.storage.pipeline_run import PipelineRunStatus, PipelineRunsFilter, RunRecord
+from dagster.core.storage.pipeline_run import PipelineRunStatus, RunRecord, RunsFilter
 from dagster.core.storage.tags import TagType, get_tag_type
 from dagster.utils import datetime_as_float

@@ -362,9 +362,7 @@ def resolve_events(self, graphene_info, after=-1):

     def _get_run_record(self, instance):
         if not self._run_record:
-            self._run_record = instance.get_run_records(PipelineRunsFilter(run_ids=[self.run_id]))[
-                0
-            ]
+            self._run_record = instance.get_run_records(RunsFilter(run_ids=[self.run_id]))[0]
         return self._run_record

     def resolve_startTime(self, graphene_info):
@@ -521,7 +519,7 @@ def resolve_solidSelection(self, _graphene_info):
         return self.get_represented_pipeline().solid_selection

     def resolve_runs(self, graphene_info, **kwargs):
-        runs_filter = PipelineRunsFilter(pipeline_name=self.get_represented_pipeline().name)
+        runs_filter = RunsFilter(pipeline_name=self.get_represented_pipeline().name)
         return get_runs(graphene_info, runs_filter, kwargs.get("cursor"), kwargs.get("limit"))

     def resolve_schedules(self, graphene_info):
----------------------------------------
@@ -1,7 +1,7 @@
 import graphene

 from dagster.core.scheduler.instigation import TickStatus
-from dagster.core.storage.pipeline_run import PipelineRunsFilter
+from dagster.core.storage.pipeline_run import RunsFilter

 from ..errors import GraphenePythonError
 from ..instigation import GrapheneInstigationTickStatus
@@ -27,7 +27,7 @@ def tick_specific_data_from_dagster_tick(graphene_info, tick):
     if tick.status == TickStatus.SUCCESS:
         if tick.run_ids and graphene_info.context.instance.has_run(tick.run_ids[0]):
             record = graphene_info.context.instance.get_run_records(
-                PipelineRunsFilter(run_ids=[tick.run_ids[0]])
+                RunsFilter(run_ids=[tick.run_ids[0]])
             )[0]
             return GrapheneScheduleTickSuccessData(run=GrapheneRun(record))
         return GrapheneScheduleTickSuccessData(run=None)
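Taken together, a callsite after this commit passes the same arguments as before; only the class name changes. A minimal usage sketch ("my_job" and "my_schedule" are placeholder names, and DagsterInstance.get() assumes a configured DAGSTER_HOME):

    from dagster import DagsterInstance
    from dagster.core.storage.pipeline_run import RunsFilter

    instance = DagsterInstance.get()

    # Before this commit: PipelineRunsFilter(pipeline_name="my_job", ...)
    records = instance.get_run_records(
        RunsFilter(pipeline_name="my_job", tags={"dagster/schedule_name": "my_schedule"}),
        limit=5,
    )
    for record in records:
        print(record.pipeline_run.run_id, record.pipeline_run.status)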
