Revert "Add failures to partition status cache (#12599)"
This reverts commit 83c875d.
johannkm committed Mar 1, 2023
1 parent 5d9e03c commit f8acfcd
Showing 2 changed files with 20 additions and 299 deletions.
176 changes: 19 additions & 157 deletions python_modules/dagster/dagster/_core/storage/partition_status_cache.py
@@ -1,11 +1,9 @@
-from typing import Dict, List, Mapping, NamedTuple, Optional, Sequence, Set, cast
+from typing import List, NamedTuple, Optional, Sequence, Set, cast
 
 from dagster import (
     AssetKey,
     DagsterEventType,
     DagsterInstance,
-    DagsterRunStatus,
-    EventLogRecord,
     EventRecordsFilter,
     _check as check,
 )
@@ -21,7 +19,6 @@
 )
 from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsDefinition
 from dagster._core.instance import DynamicPartitionsStore
-from dagster._core.storage.pipeline_run import RunsFilter
 from dagster._core.storage.tags import (
     MULTIDIMENSIONAL_PARTITION_PREFIX,
     get_dimension_from_partition_tag,
@@ -43,7 +40,7 @@ class AssetStatusCacheValue(
             ("latest_storage_id", int),
             ("partitions_def_id", Optional[str]),
             ("serialized_materialized_partition_subset", Optional[str]),
-            ("serialized_failed_partition_subset", Optional[str]),
+            # Future partition subset features will go here, e.g. stale partition subsets
         ],
     )
 ):
@@ -59,33 +56,23 @@ class AssetStatusCacheValue(
         serialized_materialized_partition_subset (Optional(str)): The serialized representation of the
             materialized partition subsets, up to the latest storage id. None if the asset is
             unpartitioned.
-        serialized_failed_partition_subset (Optional(str)): The serialized representation of the failed
-            partition subsets, up to the latest storage id. None if the asset is unpartitioned.
     """
 
     def __new__(
         cls,
         latest_storage_id: int,
         partitions_def_id: Optional[str] = None,
         serialized_materialized_partition_subset: Optional[str] = None,
-        serialized_failed_partition_subset: Optional[str] = None,
     ):
         check.int_param(latest_storage_id, "latest_storage_id")
-        check.opt_str_param(partitions_def_id, "partitions_def_id")
         check.opt_inst_param(
             serialized_materialized_partition_subset,
             "serialized_materialized_partition_subset",
             str,
         )
-        check.opt_inst_param(
-            serialized_failed_partition_subset, "serialized_failed_partition_subset", str
-        )
+        check.opt_str_param(partitions_def_id, "partitions_def_id")
         return super(AssetStatusCacheValue, cls).__new__(
-            cls,
-            latest_storage_id,
-            partitions_def_id,
-            serialized_materialized_partition_subset,
-            serialized_failed_partition_subset,
+            cls, latest_storage_id, partitions_def_id, serialized_materialized_partition_subset
        )
 
     @staticmethod
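
The reverted cache value is again a plain NamedTuple whose partition subset travels as an opaque serialized string. A minimal, dagster-free sketch of that shape — SimpleCacheValue and the JSON-based (de)serialization below are illustrative stand-ins, not dagster APIs:

import json
from typing import NamedTuple, Optional, Set


class SimpleCacheValue(NamedTuple):
    # Stand-in for AssetStatusCacheValue after the revert: three fields,
    # no failed-partition subset.
    latest_storage_id: int
    partitions_def_id: Optional[str] = None
    serialized_materialized_partition_subset: Optional[str] = None

    def deserialize_materialized_partition_subsets(self) -> Set[str]:
        # Mirrors the real method's contract: an empty subset when nothing
        # has been serialized yet.
        if not self.serialized_materialized_partition_subset:
            return set()
        return set(json.loads(self.serialized_materialized_partition_subset))


# Unpartitioned asset: only the high-water-mark storage id is tracked.
unpartitioned = SimpleCacheValue(latest_storage_id=42)

# Partitioned asset: the subset is carried as an opaque serialized string.
partitioned = SimpleCacheValue(
    latest_storage_id=42,
    partitions_def_id="daily-partitions-v1",
    serialized_materialized_partition_subset=json.dumps(["2023-02-27", "2023-02-28"]),
)
assert partitioned.deserialize_materialized_partition_subsets() == {"2023-02-27", "2023-02-28"}
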
@@ -107,14 +94,6 @@ def deserialize_materialized_partition_subsets(
 
         return partitions_def.deserialize_subset(self.serialized_materialized_partition_subset)
 
-    def deserialize_failed_partition_subsets(
-        self, partitions_def: PartitionsDefinition
-    ) -> PartitionsSubset:
-        if not self.serialized_failed_partition_subset:
-            return partitions_def.empty_subset()
-
-        return partitions_def.deserialize_subset(self.serialized_failed_partition_subset)
-
 
 def get_materialized_multipartitions(
     instance: DagsterInstance, asset_key: AssetKey, partitions_def: MultiPartitionsDefinition
@@ -218,83 +197,6 @@ def _build_status_cache(
         latest_storage_id=latest_storage_id,
         partitions_def_id=partitions_def.serializable_unique_identifier,
         serialized_materialized_partition_subset=serialized_materialized_partition_subset.serialize(),
-        serialized_failed_partition_subset=_build_failed_partition_subset(
-            instance, asset_key, partitions_def
-        ).serialize(),
     )
-
-
-def _filter_incomplete_materialized_runs_to_failed(
-    instance: DagsterInstance, incomplete_materialization_runs: Mapping[str, str]
-) -> Set[str]:
-    if not incomplete_materialization_runs:
-        return set()
-
-    failed_run_ids = {
-        r.run_id
-        for r in instance.get_runs(
-            filters=RunsFilter(
-                run_ids=list(incomplete_materialization_runs.values()),
-                statuses=[DagsterRunStatus.FAILURE],
-            )
-        )
-    }
-    return {p for p, run_id in incomplete_materialization_runs.items() if run_id in failed_run_ids}
-
-
-def _build_failed_partition_subset(
-    instance: DagsterInstance, asset_key: AssetKey, partitions_def: PartitionsDefinition
-) -> PartitionsSubset:
-    incomplete_materialization_runs = instance.event_log_storage.get_latest_asset_partition_materialization_attempts_without_materializations(
-        asset_key
-    )
-
-    if not incomplete_materialization_runs:
-        return partitions_def.empty_subset()
-
-    return partitions_def.empty_subset().with_partition_keys(
-        get_validated_partition_keys(
-            instance,
-            partitions_def,
-            _filter_incomplete_materialized_runs_to_failed(
-                instance, incomplete_materialization_runs
-            ),
-        )
-    )
-
-
-def _get_updated_failed_partition_subset(
-    instance: DagsterInstance,
-    asset_key: AssetKey,
-    partitions_def: PartitionsDefinition,
-    current_cached_subset: PartitionsSubset,
-    unevaluated_event_records: Sequence[EventLogRecord],
-) -> PartitionsSubset:
-    current_failed_partitions = set(current_cached_subset.get_partition_keys())
-
-    incomplete_materialization_runs: Dict[str, str] = {}
-    for record in sorted(unevaluated_event_records, key=lambda r: r.storage_id):
-        event = check.not_none(record.event_log_entry.dagster_event)
-        if event.is_asset_materialization_planned:
-            if event.partition:
-                # If we have a planned materilization for a partition, keep track of it to see if we
-                # also find a materialization. If not, we'll check if the run failed and add it to
-                # the failed partitions.
-                incomplete_materialization_runs[event.partition] = record.run_id
-        if event.is_step_materialization:
-            if event.partition:
-                incomplete_materialization_runs.pop(event.partition, None)
-                # if we have a new materialization for a partition, that negates the old failure
-                current_failed_partitions.discard(event.partition)
-
-    new_failed_partitions = _filter_incomplete_materialized_runs_to_failed(
-        instance, incomplete_materialization_runs
-    )
-
-    return partitions_def.empty_subset().with_partition_keys(
-        get_validated_partition_keys(
-            instance, partitions_def, new_failed_partitions | current_failed_partitions
-        )
-    )
 
 
@@ -308,27 +210,17 @@ def _get_updated_status_cache(
     This method accepts the current asset status cache value, and fetches unevaluated
     records from the event log. It then updates the cache value with the new materializations.
     """
-    unevaluated_planned_event_records = instance.get_event_records(
-        event_records_filter=EventRecordsFilter(
-            event_type=DagsterEventType.ASSET_MATERIALIZATION_PLANNED,
-            asset_key=asset_key,
-            after_cursor=current_status_cache_value.latest_storage_id,
-        )
-    )
-    unevaluated_materialization_event_records = instance.get_event_records(
+    unevaluated_event_records = instance.get_event_records(
         event_records_filter=EventRecordsFilter(
             event_type=DagsterEventType.ASSET_MATERIALIZATION,
             asset_key=asset_key,
             after_cursor=current_status_cache_value.latest_storage_id,
         )
     )
 
-    if not (unevaluated_materialization_event_records or unevaluated_planned_event_records):
+    if not unevaluated_event_records:
         return current_status_cache_value
 
-    unevaluated_event_records = list(unevaluated_planned_event_records)
-    unevaluated_event_records.extend(list(unevaluated_materialization_event_records))
-
     latest_storage_id = max([record.storage_id for record in unevaluated_event_records])
     if not partitions_def or not isinstance(partitions_def, CACHEABLE_PARTITION_TYPES):
         return AssetStatusCacheValue(latest_storage_id=latest_storage_id)
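
The docstring above describes the incremental path this revert restores: fetch only the event records past the cached cursor, short-circuit when there are none, and otherwise advance the cursor to the newest storage id. A rough sketch of that cursor pattern, where FakeRecord and advance_cursor are hypothetical stand-ins for event log records and instance.get_event_records:

from typing import List, NamedTuple, Optional


class FakeRecord(NamedTuple):
    # Stand-in for an event log record: only the fields the cursor needs.
    storage_id: int
    partition: Optional[str] = None


def advance_cursor(cached_cursor: int, log: List[FakeRecord]) -> int:
    # Stand-in for instance.get_event_records(..., after_cursor=...): keep
    # only records past the cached high-water mark.
    unevaluated = [r for r in log if r.storage_id > cached_cursor]
    if not unevaluated:
        # Nothing new since the last pass: the cache value is reused as-is.
        return cached_cursor
    # Otherwise the cursor moves to the newest record seen.
    return max(r.storage_id for r in unevaluated)


log = [FakeRecord(10, "a"), FakeRecord(11, "b"), FakeRecord(15, "a")]
assert advance_cursor(11, log) == 15
assert advance_cursor(15, log) == 15
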
@@ -347,40 +239,24 @@ def _get_updated_status_cache(
     )
     newly_materialized_partitions = set()
 
-    for record in unevaluated_event_records:
-        if not record.event_log_entry.dagster_event:
-            check.failed("Expected dagster event")
-
-        if record.event_log_entry.dagster_event.is_step_materialization:
-            if record.event_log_entry.dagster_event.partition:
-                newly_materialized_partitions.add(record.event_log_entry.dagster_event.partition)
-        elif not record.event_log_entry.dagster_event.is_asset_materialization_planned:
-            check.failed("Expected materialization or materialization planned event")
+    for unevaluated_materializations in unevaluated_event_records:
+        if not (
+            unevaluated_materializations.event_log_entry.dagster_event
+            and unevaluated_materializations.event_log_entry.dagster_event.is_step_materialization
+        ):
+            check.failed("Expected materialization event")
+        if unevaluated_materializations.event_log_entry.dagster_event.partition:
+            newly_materialized_partitions.add(
+                unevaluated_materializations.event_log_entry.dagster_event.partition
+            )
 
     materialized_subset = materialized_subset.with_partition_keys(
         get_validated_partition_keys(instance, partitions_def, newly_materialized_partitions)
     )
 
-    failed_subset: PartitionsSubset = (
-        partitions_def.deserialize_subset(
-            current_status_cache_value.serialized_failed_partition_subset
-        )
-        if current_status_cache_value
-        and current_status_cache_value.serialized_failed_partition_subset
-        else partitions_def.empty_subset()
-    )
-
     return AssetStatusCacheValue(
         latest_storage_id=latest_storage_id,
         partitions_def_id=current_status_cache_value.partitions_def_id,
         serialized_materialized_partition_subset=materialized_subset.serialize(),
-        serialized_failed_partition_subset=_get_updated_failed_partition_subset(
-            instance,
-            asset_key,
-            partitions_def,
-            failed_subset,
-            unevaluated_event_records,
-        ).serialize(),
     )
 

@@ -409,33 +285,19 @@ def _get_fresh_asset_status_cache_value(
     if cached_status_data is None or cached_status_data.partitions_def_id != (
         partitions_def.serializable_unique_identifier if partitions_def else None
     ):
-        planned_event_records = instance.get_event_records(
-            event_records_filter=EventRecordsFilter(
-                event_type=DagsterEventType.ASSET_MATERIALIZATION_PLANNED,
-                asset_key=asset_key,
-            ),
-            limit=1,
-        )
-        materialized_event_records = instance.get_event_records(
+        event_records = instance.get_event_records(
             event_records_filter=EventRecordsFilter(
                 event_type=DagsterEventType.ASSET_MATERIALIZATION,
                 asset_key=asset_key,
             ),
             limit=1,
         )
 
-        if materialized_event_records or planned_event_records:
-            latest_storage_id = max(
-                next(iter(materialized_event_records)).storage_id
-                if materialized_event_records
-                else 0,
-                next(iter(planned_event_records)).storage_id if planned_event_records else 0,
-            )
+        if event_records:
             updated_cache_value = _build_status_cache(
                 instance=instance,
                 asset_key=asset_key,
                 partitions_def=partitions_def,
-                latest_storage_id=latest_storage_id,
+                latest_storage_id=next(iter(event_records)).storage_id,
             )
         else:
             updated_cache_value = _get_updated_status_cache(

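The last hunk restores the top-level decision in _get_fresh_asset_status_cache_value: rebuild the cache from scratch when there is no cached value or the partitions definition's unique identifier has changed, and update incrementally otherwise. A simplified sketch of that branch, with rebuild and update as hypothetical stand-ins for _build_status_cache and _get_updated_status_cache:

from typing import Callable, Optional


def get_fresh_cache_value(
    cached_partitions_def_id: Optional[str],
    current_partitions_def_id: Optional[str],
    have_cached_value: bool,
    rebuild: Callable[[], str],
    update: Callable[[], str],
) -> str:
    # Mirrors the branch above: a missing cache entry or a changed partitions
    # definition invalidates the whole cache, since previously serialized
    # subsets may no longer deserialize against the new definition.
    if not have_cached_value or cached_partitions_def_id != current_partitions_def_id:
        return rebuild()
    return update()


# A changed identifier (e.g. the partition grid was redefined) forces a rebuild.
assert get_fresh_cache_value("v1", "v2", True, lambda: "rebuilt", lambda: "updated") == "rebuilt"
assert get_fresh_cache_value("v2", "v2", True, lambda: "rebuilt", lambda: "updated") == "updated"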