Skip to content

Commit

Permalink
Add valid start time check to materialized time partitions subsets (#…
Browse files Browse the repository at this point in the history
…12403)

User reported a negative number of missing partitions in dagit:
https://dagster.slack.com/archives/C01U954MEER/p1676368552724739

We originally checked for partition key validity while iterating through
time windows in time window subset construction. Recent changes made it
possible to add partition keys without iterating through time windows.
In backcompat cases for fetching materialized partitions, constructing
the subset did not check for validity subsequently causing the
materialized partitions subset to contain partition keys that were out
of the valid time window range.

This PR makes a fix for this case and adds a test.
  • Loading branch information
clairelin135 committed Feb 16, 2023
1 parent 532ced5 commit cf0779b
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,9 @@ def get_materialized_partitions_subset(
if not partitions_def:
return None

if instance.can_cache_asset_status_data() and partitions_def in CACHEABLE_PARTITION_TYPES:
if instance.can_cache_asset_status_data() and isinstance(
partitions_def, CACHEABLE_PARTITION_TYPES
):
# When the "cached_status_data" column exists in storage, update the column to contain
# the latest partition status values
updated_cache_value = get_and_update_asset_status_cache_value(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1845,7 +1845,9 @@ def abc_asset(_):

@asset(partitions_def=daily_partitions_def)
def daily_asset(_):
return 1
# invalid partition key
yield AssetMaterialization(asset_key="daily_asset", partition="2021-01-01")
yield Output(5)

multipartitions_def = MultiPartitionsDefinition(
{
Expand Down Expand Up @@ -1922,11 +1924,11 @@ def test_1d_materialized_subset_backcompat():
"b",
}

abc_selector = infer_pipeline_selector(graphql_context, "daily_asset_job")
daily_job_selector = infer_pipeline_selector(graphql_context, "daily_asset_job")
result = execute_dagster_graphql(
graphql_context,
GET_1D_MATERIALIZED_PARTITIONS,
variables={"pipelineSelector": abc_selector},
variables={"pipelineSelector": daily_job_selector},
)
assert result.data
assert len(result.data["assetNodes"]) == 1
Expand All @@ -1938,7 +1940,7 @@ def test_1d_materialized_subset_backcompat():
result = execute_dagster_graphql(
graphql_context,
GET_1D_MATERIALIZED_PARTITIONS,
variables={"pipelineSelector": abc_selector},
variables={"pipelineSelector": daily_job_selector},
)
assert result.data
ranges = result.data["assetNodes"][0]["materializedPartitions"]["ranges"]
Expand All @@ -1948,6 +1950,16 @@ def test_1d_materialized_subset_backcompat():
assert ranges[1]["startKey"] == "2022-03-05"
assert ranges[1]["endKey"] == "2022-03-06"

result = execute_dagster_graphql(
graphql_context,
GET_PARTITION_STATS,
variables={"pipelineSelector": daily_job_selector},
)
assert result.data
assert result.data["assetNodes"]
assert len(result.data["assetNodes"]) == 1
assert result.data["assetNodes"][0]["partitionStats"]["numMaterialized"] == 3


def test_2d_materialized_subset_backcompat():
with instance_for_test() as instance:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -652,10 +652,10 @@ def empty_subset(self) -> "PartitionsSubset":

def is_valid_partition_key(self, partition_key: str) -> bool:
try:
datetime.strptime(partition_key, self.fmt)
time_obj = datetime.strptime(partition_key, self.fmt)
return time_obj.timestamp() >= self.start.timestamp()
except ValueError:
return False
return True

@property
def serializable_unique_identifier(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
)
from dagster._serdes import deserialize_json_to_dagster_namedtuple, whitelist_for_serdes

CACHEABLE_PARTITION_TYPES = {
CACHEABLE_PARTITION_TYPES = (
TimeWindowPartitionsDefinition,
MultiPartitionsDefinition,
StaticPartitionsDefinition,
}
)


@whitelist_for_serdes
Expand Down Expand Up @@ -170,9 +170,7 @@ def _build_status_cache(
This method refreshes the asset status cache for a given asset key. It recalculates
the materialized partition subset for the asset key and updates the cache value.
"""
if not partitions_def or not any(
isinstance(partitions_def, partition_type) for partition_type in CACHEABLE_PARTITION_TYPES
):
if not partitions_def or not isinstance(partitions_def, CACHEABLE_PARTITION_TYPES):
return AssetStatusCacheValue(latest_storage_id=latest_storage_id)

materialized_keys: Sequence[str]
Expand Down Expand Up @@ -224,9 +222,7 @@ def _get_updated_status_cache(
return current_status_cache_value

latest_storage_id = max([record.storage_id for record in unevaluated_event_records])
if not partitions_def or not any(
isinstance(partitions_def, partition_type) for partition_type in CACHEABLE_PARTITION_TYPES
):
if not partitions_def or not isinstance(partitions_def, CACHEABLE_PARTITION_TYPES):
return AssetStatusCacheValue(latest_storage_id=latest_storage_id)

check.invariant(
Expand Down

0 comments on commit cf0779b

Please sign in to comment.