Skip to content

Commit

Permalink
context.partition_time_window (#7795)
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed May 11, 2022
1 parent 2572d66 commit 62b2ffb
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
from .hook_definition import HookDefinition
from .logger_definition import LoggerDefinition
from .mode import ModeDefinition
from .partition import PartitionSetDefinition, PartitionedConfig
from .partition import PartitionSetDefinition, PartitionedConfig, PartitionsDefinition
from .pipeline_definition import PipelineDefinition
from .preset import PresetDefinition
from .resource_definition import ResourceDefinition
Expand Down Expand Up @@ -292,6 +292,14 @@ def get_partition_set_def(self) -> Optional["PartitionSetDefinition"]:

return self._cached_partition_set

@property
def partitions_def(self) -> Optional[PartitionsDefinition]:
mode = self.get_mode_definition()
if not mode.partitioned_config:
return None

return mode.partitioned_config.partitions_def

def run_request_for_partition(self, partition_key: str, run_key: Optional[str]) -> RunRequest:
partition_set = self.get_partition_set_def()
if not partition_set:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,15 @@ def partition_key(self) -> str:
"""
return self._step_execution_context.partition_key

@property
def partition_time_window(self) -> str:
"""The partition time window for the current run.
Raises an error if the current run is not a partitioned run, or if the job's partition
definition is not a TimeWindowPartitionsDefinition.
"""
return self._step_execution_context.partition_time_window

def output_asset_partition_key(self, output_name: str = "result") -> str:
"""Returns the asset partition key for the given output. Defaults to "result", which is the
name of the default output.
Expand Down
19 changes: 19 additions & 0 deletions python_modules/dagster/dagster/core/execution/context/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import dagster._check as check
from dagster.core.definitions.events import AssetKey, AssetLineageInfo
from dagster.core.definitions.hook_definition import HookDefinition
from dagster.core.definitions.job_definition import JobDefinition
from dagster.core.definitions.mode import ModeDefinition
from dagster.core.definitions.op_definition import OpDefinition
from dagster.core.definitions.partition_key_range import PartitionKeyRange
Expand Down Expand Up @@ -318,6 +319,24 @@ def partition_key(self) -> str:
)
return tags[PARTITION_NAME_TAG]

@property
def partition_time_window(self) -> str:
pipeline_def = self._execution_data.pipeline_def
if not isinstance(pipeline_def, JobDefinition):
check.failed(
# isinstance(pipeline_def, JobDefinition),
"Can only call 'partition_time_window', when using jobs, not legacy pipelines",
)
partitions_def = pipeline_def.partitions_def

if not isinstance(partitions_def, TimeWindowPartitionsDefinition):
check.failed(
f"Expected a TimeWindowPartitionsDefinition, but instead found {type(partitions_def)}",
)

# mypy thinks partitions_def is <nothing> here because ????
return partitions_def.time_window_for_partition_key(self.partition_key) # type: ignore

@property
def has_partition_key(self) -> bool:
return PARTITION_NAME_TAG in self._plan_data.pipeline_run.tags
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import enum
import json

import pendulum
import pytest

from dagster import (
Expand Down Expand Up @@ -29,7 +30,7 @@
StaticPartitionsDefinition,
)
from dagster.core.definitions.pipeline_definition import PipelineSubsetDefinition
from dagster.core.definitions.time_window_partitions import DailyPartitionsDefinition
from dagster.core.definitions.time_window_partitions import DailyPartitionsDefinition, TimeWindow
from dagster.core.errors import (
DagsterConfigMappingFunctionError,
DagsterInvalidConfigError,
Expand Down Expand Up @@ -965,6 +966,9 @@ def test_job_partitions_def():
def my_op(context):
assert context.has_partition_key
assert context.partition_key == "2020-01-01"
assert context.partition_time_window == TimeWindow(
pendulum.parse("2020-01-01"), pendulum.parse("2020-01-02")
)

@graph
def my_graph():
Expand Down

0 comments on commit 62b2ffb

Please sign in to comment.