Skip to content

Commit

Permalink
Revert "Get partitions with planned but not completed materialization…
Browse files Browse the repository at this point in the history
…s, the event log query (#12404)"

This reverts commit e1f9d7f.
  • Loading branch information
johannkm committed Mar 1, 2023
1 parent f8acfcd commit 7f4676b
Show file tree
Hide file tree
Showing 4 changed files with 0 additions and 317 deletions.
Expand Up @@ -368,12 +368,6 @@ def get_materialization_count_by_partition(
) -> Mapping[AssetKey, Mapping[str, int]]:
pass

@abstractmethod
def get_latest_asset_partition_materialization_attempts_without_materializations(
self, asset_key: AssetKey
) -> Mapping[str, str]:
pass

@abstractmethod
def get_dynamic_partitions(self, partitions_def_name: str) -> Sequence[str]:
"""Get the list of partition keys for a dynamic partitions definition."""
Expand Down
Expand Up @@ -1705,118 +1705,6 @@ def get_materialization_count_by_partition(

return materialization_count_by_partition

def get_latest_asset_partition_materialization_attempts_without_materializations(
self, asset_key: AssetKey
) -> Mapping[str, str]:
"""
Fetch the latest materialzation and materialization planned events for each partition of the given asset.
Return the partitions that have a materialization planned event but no matching (same run) materialization event.
These materializations could be in progress, or they could have failed. A separate query checking the run status
is required to know.
Returns a mapping of partition to run_id.
"""
check.inst_param(asset_key, "asset_key", AssetKey)

latest_event_ids_subquery = (
db.select(
[
SqlEventLogStorageTable.c.dagster_event_type,
SqlEventLogStorageTable.c.partition,
db.func.max(SqlEventLogStorageTable.c.id).label("id"),
]
)
.where(
db.and_(
db.or_(
SqlEventLogStorageTable.c.asset_key == asset_key.to_string(),
SqlEventLogStorageTable.c.asset_key == asset_key.to_string(legacy=True),
),
SqlEventLogStorageTable.c.partition != None, # noqa: E711
)
)
.group_by(
SqlEventLogStorageTable.c.dagster_event_type, SqlEventLogStorageTable.c.partition
)
)

assets_details = self._get_assets_details([asset_key])
latest_event_ids_subquery = self._add_assets_wipe_filter_to_query(
latest_event_ids_subquery, assets_details, [asset_key]
).alias("latest_materialization_event_ids")

latest_events_subquery = (
db.select(
[
SqlEventLogStorageTable.c.dagster_event_type,
SqlEventLogStorageTable.c.partition,
SqlEventLogStorageTable.c.run_id,
]
)
.select_from(
latest_event_ids_subquery.join(
SqlEventLogStorageTable,
SqlEventLogStorageTable.c.id == latest_event_ids_subquery.c.id,
),
)
.alias("latest_materialization_events")
)

materialization_planned_events = (
db.select(
[
latest_events_subquery.c.dagster_event_type,
latest_events_subquery.c.partition,
latest_events_subquery.c.run_id,
]
)
.where(
latest_events_subquery.c.dagster_event_type
== DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value
)
.alias("materialization_planned_events")
)

materialization_events = (
db.select(
[
latest_events_subquery.c.dagster_event_type,
latest_events_subquery.c.partition,
latest_events_subquery.c.run_id,
]
)
.where(
latest_events_subquery.c.dagster_event_type
== DagsterEventType.ASSET_MATERIALIZATION.value
)
.alias("materialization_events")
)

query = (
db.select(
[
materialization_planned_events.c.partition,
materialization_planned_events.c.run_id,
]
)
.select_from(
materialization_planned_events.join(
materialization_events,
db.and_(
materialization_planned_events.c.partition
== materialization_events.c.partition,
materialization_planned_events.c.run_id == materialization_events.c.run_id,
),
isouter=True,
)
)
.where(materialization_events.c.run_id == None) # noqa: E711
)

with self.index_connection() as conn:
rows = conn.execute(query).fetchall()
return {row["partition"]: row["run_id"] for row in rows}

def _check_partitions_table(self) -> None:
# Guards against cases where the user is not running the latest migration for
# partitions storage. Should be updated when the partitions storage schema changes.
Expand Down
Expand Up @@ -488,13 +488,6 @@ def get_materialization_count_by_partition(
asset_keys, after_cursor
)

def get_latest_asset_partition_materialization_attempts_without_materializations(
self, asset_key: "AssetKey"
) -> Mapping[str, str]:
return self._storage.event_log_storage.get_latest_asset_partition_materialization_attempts_without_materializations(
asset_key
)

def get_dynamic_partitions(self, partitions_def_name: str) -> Sequence[str]:
return self._storage.event_log_storage.get_dynamic_partitions(partitions_def_name)

Expand Down
Expand Up @@ -37,7 +37,6 @@
from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionKey
from dagster._core.definitions.pipeline_base import InMemoryPipeline
from dagster._core.events import (
AssetMaterializationPlannedData,
DagsterEvent,
DagsterEventType,
EngineEventData,
Expand Down Expand Up @@ -1998,197 +1997,6 @@ def _fetch_counts(storage, after_cursor=None):
)
assert _fetch_counts(storage, after_cursor=9999999999) == {c: {}, d: {}}

def test_get_latest_asset_partition_materialization_attempts_without_materializations(
self, storage, instance
):
a = AssetKey(["a"])
run_id_1 = make_new_run_id()
run_id_2 = make_new_run_id()
run_id_3 = make_new_run_id()
run_id_4 = make_new_run_id()
with create_and_delete_test_runs(instance, [run_id_1, run_id_2, run_id_3, run_id_4]):
# no events
assert (
storage.get_latest_asset_partition_materialization_attempts_without_materializations(
a
)
== {}
)

storage.store_event(
EventLogEntry(
error_info=None,
level="debug",
user_message="",
run_id=run_id_1,
timestamp=time.time(),
dagster_event=DagsterEvent(
DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value,
"nonce",
event_specific_data=AssetMaterializationPlannedData(a, "foo"),
),
)
)
storage.store_event(
EventLogEntry(
error_info=None,
level="debug",
user_message="",
run_id=run_id_2,
timestamp=time.time(),
dagster_event=DagsterEvent(
DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value,
"nonce",
event_specific_data=AssetMaterializationPlannedData(a, "bar"),
),
)
)

# no materializations yet
assert storage.get_latest_asset_partition_materialization_attempts_without_materializations(
a
) == {
"foo": run_id_1,
"bar": run_id_2,
}

storage.store_event(
EventLogEntry(
error_info=None,
level="debug",
user_message="",
run_id=run_id_1,
timestamp=time.time(),
dagster_event=DagsterEvent(
DagsterEventType.ASSET_MATERIALIZATION.value,
"nonce",
event_specific_data=StepMaterializationData(
AssetMaterialization(asset_key=a, partition="foo")
),
),
)
)

# foo got materialized later in the same run
assert storage.get_latest_asset_partition_materialization_attempts_without_materializations(
a
) == {
"bar": run_id_2
}

storage.store_event(
EventLogEntry(
error_info=None,
level="debug",
user_message="",
run_id=run_id_3,
timestamp=time.time(),
dagster_event=DagsterEvent(
DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value,
"nonce",
event_specific_data=AssetMaterializationPlannedData(a, "foo"),
),
)
)

# a new run has been started for foo
assert storage.get_latest_asset_partition_materialization_attempts_without_materializations(
a
) == {
"foo": run_id_3,
"bar": run_id_2,
}

storage.store_event(
EventLogEntry(
error_info=None,
level="debug",
user_message="",
run_id=run_id_3,
timestamp=time.time(),
dagster_event=DagsterEvent(
DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value,
"nonce",
event_specific_data=AssetMaterializationPlannedData(
AssetKey(["other"]), "foo"
),
),
)
)

# other assets don't get included
assert storage.get_latest_asset_partition_materialization_attempts_without_materializations(
a
) == {
"foo": run_id_3,
"bar": run_id_2,
}

# other assets don't get included
assert storage.get_latest_asset_partition_materialization_attempts_without_materializations(
a
) == {
"foo": run_id_3,
"bar": run_id_2,
}

# wipe asset, make sure we respect that
if self.can_wipe():
storage.wipe_asset(a)
assert (
storage.get_latest_asset_partition_materialization_attempts_without_materializations(
a
)
== {}
)

storage.store_event(
EventLogEntry(
error_info=None,
level="debug",
user_message="",
run_id=run_id_4,
timestamp=time.time(),
dagster_event=DagsterEvent(
DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value,
"nonce",
event_specific_data=AssetMaterializationPlannedData(a, "bar"),
),
)
)

# new materialization planned appears
assert storage.get_latest_asset_partition_materialization_attempts_without_materializations(
a
) == {
"bar": run_id_4,
}

storage.store_event(
EventLogEntry(
error_info=None,
level="debug",
user_message="",
run_id=run_id_4,
timestamp=time.time(),
dagster_event=DagsterEvent(
DagsterEventType.ASSET_MATERIALIZATION.value,
"nonce",
event_specific_data=StepMaterializationData(
AssetMaterialization(asset_key=a, partition="bar")
),
),
)
)

# and goes away
assert (
storage.get_latest_asset_partition_materialization_attempts_without_materializations(
a
)
== {}
)

def test_get_observation(self, storage, test_run_id):
a = AssetKey(["key_a"])

Expand Down

0 comments on commit 7f4676b

Please sign in to comment.