Skip to content

Commit

Permalink
remove get_event_records calls from asset backfill (#21783)
Browse files Browse the repository at this point in the history
## Summary & Motivation
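We want to deprecate `get_event_records` calls in favor of narrower APIs. The asset backfill now pages through `fetch_materializations` with an `AssetRecordsFilter`, reading at most `MATERIALIZATION_CHUNK_SIZE` records per call.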

## How I Tested These Changes
BK (Buildkite CI run)
  • Loading branch information
prha committed May 16, 2024
1 parent ffa3d5e commit da7848c
Showing 1 changed file with 28 additions and 24 deletions.
52 changes: 28 additions & 24 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@
DagsterDefinitionChangedDeserializationError,
DagsterInvariantViolationError,
)
from dagster._core.event_api import EventRecordsFilter
from dagster._core.events import DagsterEventType
from dagster._core.event_api import AssetRecordsFilter
from dagster._core.instance import DagsterInstance, DynamicPartitionsStore
from dagster._core.storage.dagster_run import (
CANCELABLE_RUN_STATUSES,
Expand Down Expand Up @@ -79,7 +78,7 @@
from .backfill import PartitionBackfill

RUN_CHUNK_SIZE = 25

MATERIALIZATION_CHUNK_SIZE = 1000

MAX_RUNS_CANCELED_PER_ITERATION = 50

Expand Down Expand Up @@ -1112,29 +1111,34 @@ def get_asset_backfill_iteration_materialized_partitions(
"""
recently_materialized_asset_partitions = AssetGraphSubset()
for asset_key in asset_backfill_data.target_subset.asset_keys:
records = instance_queryer.instance.get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.ASSET_MATERIALIZATION,
asset_key=asset_key,
after_cursor=asset_backfill_data.latest_storage_id,
cursor = None
has_more = True
while has_more:
result = instance_queryer.instance.fetch_materializations(
AssetRecordsFilter(
asset_key=asset_key,
after_storage_id=asset_backfill_data.latest_storage_id,
),
cursor=cursor,
limit=MATERIALIZATION_CHUNK_SIZE,
)
)
records_in_backfill = [
record
for record in records
if instance_queryer.run_has_tag(
run_id=record.run_id, tag_key=BACKFILL_ID_TAG, tag_value=backfill_id
records_in_backfill = [
record
for record in result.records
if instance_queryer.run_has_tag(
run_id=record.run_id, tag_key=BACKFILL_ID_TAG, tag_value=backfill_id
)
]
recently_materialized_asset_partitions |= AssetGraphSubset.from_asset_partition_set(
{
AssetKeyPartitionKey(asset_key, record.partition_key)
for record in records_in_backfill
},
asset_graph,
)
]
recently_materialized_asset_partitions |= AssetGraphSubset.from_asset_partition_set(
{
AssetKeyPartitionKey(asset_key, record.partition_key)
for record in records_in_backfill
},
asset_graph,
)

yield None
cursor = result.cursor
has_more = result.has_more
yield None

updated_materialized_subset = (
asset_backfill_data.materialized_subset | recently_materialized_asset_partitions
Expand Down

0 comments on commit da7848c

Please sign in to comment.