remove get_event_records calls from multi asset sensor #21784

Merged · 6 commits · May 16, 2024
@@ -21,7 +21,7 @@
     SensorType,
 )
 from dagster._core.errors import DagsterInvariantViolationError
-from dagster._core.event_api import EventRecordsFilter
+from dagster._core.event_api import AssetRecordsFilter
 from dagster._core.events import DagsterEventType
 from dagster._core.remote_representation import CodeLocation, ExternalRepository
 from dagster._core.remote_representation.external import ExternalJob, ExternalSensor

@@ -577,15 +577,14 @@ def resolve_assetMaterializationUsedData(
             instance=graphene_info.context.instance, asset_graph=asset_graph
         )
         data_time_resolver = CachingDataTimeResolver(instance_queryer=instance_queryer)
-        event_records = instance.get_event_records(
-            EventRecordsFilter(
-                event_type=DagsterEventType.ASSET_MATERIALIZATION,
+        event_records = instance.fetch_materializations(
+            AssetRecordsFilter(
+                asset_key=asset_key,
                 before_timestamp=int(timestampMillis) / 1000.0 + 1,
                 after_timestamp=int(timestampMillis) / 1000.0 - 1,
-                asset_key=asset_key,
             ),
             limit=1,
-        )
+        ).records
 
         if not event_records:
             return []

@@ -88,8 +88,7 @@ def __init__(
     ):
         self._asset_key = check.inst_param(asset_key, "asset_key", AssetKey)
 
-        from dagster._core.events import DagsterEventType
-        from dagster._core.storage.event_log.base import EventRecordsFilter
+        from dagster._core.event_api import AssetRecordsFilter
 
         resource_arg_names: Set[str] = {
             arg.name for arg in get_resource_args(asset_materialization_fn)

@@ -109,15 +108,14 @@ def _fn(context) -> Any:
             except ValueError:
                 after_cursor = None
 
-            event_records = context.instance.get_event_records(
-                EventRecordsFilter(
-                    event_type=DagsterEventType.ASSET_MATERIALIZATION,
+            event_records = context.instance.fetch_materializations(
+                AssetRecordsFilter(
                     asset_key=self._asset_key,
-                    after_cursor=after_cursor,
+                    after_storage_id=after_cursor,
                 ),
                 ascending=False,
                 limit=1,
-            )
+            ).records
 
             if not event_records:
                 yield SkipReason(

@@ -33,7 +33,7 @@
 from dagster._core.instance import DagsterInstance
 from dagster._core.instance.ref import InstanceRef
 from dagster._utils import normalize_to_repository
-from dagster._utils.warnings import normalize_renamed_param
+from dagster._utils.warnings import deprecation_warning, normalize_renamed_param
 
 from .events import AssetKey
 from .run_request import RunRequest, SensorResult, SkipReason

@@ -55,6 +55,7 @@
     from dagster._core.storage.event_log.base import EventLogRecord
 
 MAX_NUM_UNCONSUMED_EVENTS = 25
+FETCH_MATERIALIZATION_BATCH_SIZE = 1000
 
 
 class MultiAssetSensorAssetCursorComponent(

@@ -296,8 +297,7 @@ def __init__(
         )
 
     def _cache_initial_unconsumed_events(self) -> None:
-        from dagster._core.events import DagsterEventType
-        from dagster._core.storage.event_log.base import EventRecordsFilter
+        from dagster._core.event_api import AssetRecordsFilter
 
         # This method caches the initial unconsumed events for each asset key. To generate the
         # current unconsumed events, call get_trailing_unconsumed_events instead.

@@ -309,12 +309,13 @@ def _cache_initial_unconsumed_events(self) -> None:
                 self._get_cursor(asset_key).trailing_unconsumed_partitioned_event_ids.values()
             )
             if unconsumed_event_ids:
-                event_records = self.instance.get_event_records(
-                    EventRecordsFilter(
-                        event_type=DagsterEventType.ASSET_MATERIALIZATION,
+                event_records = self.instance.fetch_materializations(
+                    AssetRecordsFilter(
                         asset_key=asset_key,
                         storage_ids=unconsumed_event_ids,
-                    )
-                )
+                    ),
+                    limit=len(unconsumed_event_ids),
+                ).records
                 self._initial_unconsumed_events_by_id.update(
                     {event_record.storage_id: event_record for event_record in event_records}
                 )

@@ -440,26 +441,40 @@ def materialization_records_for_key(
             asset_key (AssetKey): The asset to fetch materialization events for
             limit (Optional[int]): The number of events to fetch
         """
-        from dagster._core.events import DagsterEventType
-        from dagster._core.storage.event_log.base import EventRecordsFilter
+        from dagster._core.event_api import AssetRecordsFilter
 
         asset_key = check.inst_param(asset_key, "asset_key", AssetKey)
         if asset_key not in self._assets_by_key:
             raise DagsterInvalidInvocationError(f"Asset key {asset_key} not monitored by sensor.")
 
-        events = list(
-            self.instance.get_event_records(
-                EventRecordsFilter(
-                    event_type=DagsterEventType.ASSET_MATERIALIZATION,
-                    asset_key=asset_key,
-                    after_cursor=self._get_cursor(asset_key).latest_consumed_event_id,
-                ),
-                ascending=True,
-                limit=limit,
-            )
-        )
+        if not limit:
+            deprecation_warning("Calling materialization_records_for_key without a limit", "1.8")
+            records = []
+            has_more = True
+            cursor = None
+            while has_more:
+                result = self.instance.fetch_materializations(
+                    AssetRecordsFilter(
+                        asset_key=asset_key,
+                        after_storage_id=self._get_cursor(asset_key).latest_consumed_event_id,
+                    ),
+                    ascending=True,
+                    limit=FETCH_MATERIALIZATION_BATCH_SIZE,
+                    cursor=cursor,
+                )
+                cursor = result.cursor
+                has_more = result.has_more
+                records.extend(result.records)
+            return records
 
-        return events
+        return self.instance.fetch_materializations(
+            AssetRecordsFilter(
+                asset_key=asset_key,
+                after_storage_id=self._get_cursor(asset_key).latest_consumed_event_id,
+            ),
+            ascending=True,
+            limit=limit,
+        ).records
 
     def _get_cursor(self, asset_key: AssetKey) -> MultiAssetSensorAssetCursorComponent:
         """Returns the MultiAssetSensorAssetCursorComponent for the asset key.

Review thread on this hunk:

Contributor: not blocking, but there's probably a way to avoid duplicate code here, e.g. by tracking the total number of records we've fetched so far across all iterations and seeing when it reaches the user-provided limit.

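A minimal sketch of that suggestion (a hypothetical `_paginated_materializations` helper, not part of this PR; it assumes only the `fetch_materializations` / `AssetRecordsFilter` API and the `records` / `cursor` / `has_more` result shape already used in the diff above):

```python
from typing import List, Optional

from dagster import DagsterInstance
from dagster._core.definitions.events import AssetKey
from dagster._core.event_api import AssetRecordsFilter
from dagster._core.storage.event_log.base import EventLogRecord

FETCH_MATERIALIZATION_BATCH_SIZE = 1000


def _paginated_materializations(
    instance: DagsterInstance,
    asset_key: AssetKey,
    after_storage_id: Optional[int],
    limit: Optional[int] = None,
) -> List[EventLogRecord]:
    # Hypothetical helper: one pagination loop serving both the capped
    # (user-provided limit) and uncapped (batched) call sites.
    records: List[EventLogRecord] = []
    cursor = None
    has_more = True
    while has_more:
        # Never request more than the caller still wants.
        batch_size = FETCH_MATERIALIZATION_BATCH_SIZE
        if limit is not None:
            batch_size = min(batch_size, limit - len(records))
        result = instance.fetch_materializations(
            AssetRecordsFilter(asset_key=asset_key, after_storage_id=after_storage_id),
            ascending=True,
            limit=batch_size,
            cursor=cursor,
        )
        records.extend(result.records)
        cursor = result.cursor
        # Stop when storage is exhausted or the limit has been reached.
        has_more = result.has_more and (limit is None or len(records) < limit)
    return records
```

With a helper like this, both branches of `materialization_records_for_key` collapse into a single call.
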
Member Author: yeah... this is the PR that I think could be rewritten the most. I haven't quite grokked all the fetching patterns - it feels like there's a better way to do this.

Also, it feels like we should revisit some of the APIs that we expose here to have better data fetching hygiene. Probably not the highest priority item, but if you had time to pair on this, I'd definitely be willing.

Contributor: I wouldn't be surprised if we remove multi-asset sensor after auto-materialize policies support user-defined functions.

Contributor: (Which is to say I think it's probably not worth putting too much effort here.)

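Relatedly, the `deprecation_warning` in the hunk above flags unbounded calls to `materialization_records_for_key` to break in 1.8. A sketch of the caller-side pattern that avoids the deprecated path (hypothetical sensor; `my_op`, `my_job`, `my_asset`, and the limit of `10` are placeholders):

```python
from dagster import AssetKey, RunRequest, job, multi_asset_sensor, op


@op
def my_op():
    pass


@job
def my_job():
    my_op()


@multi_asset_sensor(monitored_assets=[AssetKey("my_asset")], job=my_job)
def my_sensor(context):
    # Passing an explicit limit keeps the call on the single-fetch path
    # instead of the deprecated fetch-everything loop.
    for record in context.materialization_records_for_key(AssetKey("my_asset"), limit=10):
        # Advance the per-asset cursor so the same event is not re-delivered.
        context.advance_cursor({AssetKey("my_asset"): record})
        yield RunRequest(run_key=str(record.storage_id))
```
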
@@ -506,8 +521,7 @@ def my_sensor(context):
             # returns {"2022-07-05": EventLogRecord(...)}
 
         """
-        from dagster._core.events import DagsterEventType
-        from dagster._core.storage.event_log.base import EventLogRecord, EventRecordsFilter
+        from dagster._core.event_api import AssetRecordsFilter, EventLogRecord
 
         asset_key = check.inst_param(asset_key, "asset_key", AssetKey)
 

@@ -547,26 +561,31 @@ def my_sensor(context):
                 # Add partition and materialization to the end of the OrderedDict
                 materialization_by_partition[partition] = unconsumed_event
 
-        partition_materializations = self.instance.get_event_records(
-            EventRecordsFilter(
-                event_type=DagsterEventType.ASSET_MATERIALIZATION,
-                asset_key=asset_key,
-                asset_partitions=partitions_to_fetch,
-                after_cursor=self._get_cursor(asset_key).latest_consumed_event_id,
-            ),
-            ascending=True,
-        )
-        for materialization in partition_materializations:
-            partition = materialization.partition_key
+        has_more = True
+        cursor = None
+        while has_more:
+            result = self.instance.fetch_materializations(
+                AssetRecordsFilter(
+                    asset_key=asset_key,
+                    asset_partitions=partitions_to_fetch,
+                    after_storage_id=self._get_cursor(asset_key).latest_consumed_event_id,
+                ),
+                ascending=True,
+                limit=FETCH_MATERIALIZATION_BATCH_SIZE,
+                cursor=cursor,
+            )
+            cursor = result.cursor
+            has_more = result.has_more
+            for materialization in result.records:
+                if not isinstance(materialization.partition_key, str):
+                    continue
 
-            if isinstance(partition, str):
-                if partition in materialization_by_partition:
+                if materialization.partition_key in materialization_by_partition:
                     # Remove partition to ensure materialization_by_partition preserves
                     # the order of materializations
-                    materialization_by_partition.pop(partition)
+                    materialization_by_partition.pop(materialization.partition_key)
                 # Add partition and materialization to the end of the OrderedDict
-                materialization_by_partition[partition] = materialization
-
+                materialization_by_partition[materialization.partition_key] = materialization
         return materialization_by_partition
 
     @public

@@ -929,21 +948,12 @@ def get_asset_cursor_with_advances(
 def get_cursor_from_latest_materializations(
     asset_keys: Sequence[AssetKey], instance: DagsterInstance
 ) -> str:
-    from dagster._core.events import DagsterEventType
-    from dagster._core.storage.event_log.base import EventRecordsFilter
-
     cursor_dict: Dict[str, MultiAssetSensorAssetCursorComponent] = {}
 
     for asset_key in asset_keys:
-        materializations = instance.get_event_records(
-            EventRecordsFilter(
-                DagsterEventType.ASSET_MATERIALIZATION,
-                asset_key=asset_key,
-            ),
-            limit=1,
-        )
+        materializations = instance.fetch_materializations(asset_key, limit=1).records
         if materializations:
-            last_materialization = list(materializations)[-1]
+            last_materialization = materializations[0]
 
             cursor_dict[str(asset_key)] = MultiAssetSensorAssetCursorComponent(
                 last_materialization.partition_key,

52 changes: 28 additions & 24 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
@@ -49,8 +49,7 @@
     DagsterDefinitionChangedDeserializationError,
     DagsterInvariantViolationError,
 )
-from dagster._core.event_api import EventRecordsFilter
-from dagster._core.events import DagsterEventType
+from dagster._core.event_api import AssetRecordsFilter
 from dagster._core.instance import DagsterInstance, DynamicPartitionsStore
 from dagster._core.storage.dagster_run import (
     CANCELABLE_RUN_STATUSES,

@@ -79,7 +78,7 @@
 from .backfill import PartitionBackfill
 
 RUN_CHUNK_SIZE = 25
-
+MATERIALIZATION_CHUNK_SIZE = 1000
 
 MAX_RUNS_CANCELED_PER_ITERATION = 50
 
@@ -1112,29 +1111,34 @@ def get_asset_backfill_iteration_materialized_partitions(
     """
     recently_materialized_asset_partitions = AssetGraphSubset()
     for asset_key in asset_backfill_data.target_subset.asset_keys:
-        records = instance_queryer.instance.get_event_records(
-            EventRecordsFilter(
-                event_type=DagsterEventType.ASSET_MATERIALIZATION,
-                asset_key=asset_key,
-                after_cursor=asset_backfill_data.latest_storage_id,
-            )
-        )
-        records_in_backfill = [
-            record
-            for record in records
-            if instance_queryer.run_has_tag(
-                run_id=record.run_id, tag_key=BACKFILL_ID_TAG, tag_value=backfill_id
-            )
-        ]
-        recently_materialized_asset_partitions |= AssetGraphSubset.from_asset_partition_set(
-            {
-                AssetKeyPartitionKey(asset_key, record.partition_key)
-                for record in records_in_backfill
-            },
-            asset_graph,
-        )
-
-        yield None
+        cursor = None
+        has_more = True
+        while has_more:
+            result = instance_queryer.instance.fetch_materializations(
+                AssetRecordsFilter(
+                    asset_key=asset_key,
+                    after_storage_id=asset_backfill_data.latest_storage_id,
+                ),
+                cursor=cursor,
+                limit=MATERIALIZATION_CHUNK_SIZE,
+            )
+            records_in_backfill = [
+                record
+                for record in result.records
+                if instance_queryer.run_has_tag(
+                    run_id=record.run_id, tag_key=BACKFILL_ID_TAG, tag_value=backfill_id
+                )
+            ]
+            recently_materialized_asset_partitions |= AssetGraphSubset.from_asset_partition_set(
+                {
+                    AssetKeyPartitionKey(asset_key, record.partition_key)
+                    for record in records_in_backfill
+                },
+                asset_graph,
+            )
+            cursor = result.cursor
+            has_more = result.has_more
+        yield None
 
     updated_materialized_subset = (
         asset_backfill_data.materialized_subset | recently_materialized_asset_partitions

@@ -7,8 +7,7 @@
 )
 from dagster._core.definitions.events import AssetKey, AssetMaterialization
 from dagster._core.definitions.metadata import TextMetadataValue
-from dagster._core.event_api import EventRecordsFilter
-from dagster._core.events import DagsterEventType
+from dagster._core.event_api import AssetRecordsFilter
 from dagster._core.events.log import EventLogEntry
 from dagster._core.instance import DagsterInstance
 from dagster._core.storage.io_manager import IOManager

@@ -22,16 +21,13 @@ def get_text_metadata_value(materialization: AssetMaterialization, key: str) ->
 def latest_materialization_log_entry(
     instance: DagsterInstance, asset_key: AssetKey, partition_key: Optional[str] = None
 ) -> Optional[EventLogEntry]:
-    event_records = [
-        *instance.get_event_records(
-            event_records_filter=EventRecordsFilter(
-                event_type=DagsterEventType.ASSET_MATERIALIZATION,
-                asset_key=asset_key,
-                asset_partitions=[partition_key] if partition_key else None,
-            ),
-            limit=1,
-        )
-    ]
+    event_records = instance.fetch_materializations(
+        AssetRecordsFilter(
+            asset_key=asset_key,
+            asset_partitions=[partition_key] if partition_key else None,
+        ),
+        limit=1,
+    ).records
     return event_records[0].event_log_entry if event_records else None