From 8a07033f1655b8ab0f55c3e5915d6c38cc161223 Mon Sep 17 00:00:00 2001 From: prha Date: Wed, 8 May 2024 15:52:13 -0700 Subject: [PATCH 1/6] remove get_event_records calls from graphql --- .../dagster_graphql/schema/asset_graph.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py index b154fdd33de17..19c0a61dc6e4b 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py @@ -21,7 +21,7 @@ SensorType, ) from dagster._core.errors import DagsterInvariantViolationError -from dagster._core.event_api import EventRecordsFilter +from dagster._core.event_api import AssetRecordsFilter from dagster._core.events import DagsterEventType from dagster._core.remote_representation import CodeLocation, ExternalRepository from dagster._core.remote_representation.external import ExternalJob, ExternalSensor @@ -577,15 +577,14 @@ def resolve_assetMaterializationUsedData( instance=graphene_info.context.instance, asset_graph=asset_graph ) data_time_resolver = CachingDataTimeResolver(instance_queryer=instance_queryer) - event_records = instance.get_event_records( - EventRecordsFilter( - event_type=DagsterEventType.ASSET_MATERIALIZATION, + event_records = instance.fetch_materializations( + AssetRecordsFilter( + asset_key=asset_key, before_timestamp=int(timestampMillis) / 1000.0 + 1, after_timestamp=int(timestampMillis) / 1000.0 - 1, - asset_key=asset_key, ), limit=1, - ) + ).records if not event_records: return [] From a2fa956955fdda908cada915bb06cd1ab2fb601b Mon Sep 17 00:00:00 2001 From: prha Date: Wed, 8 May 2024 16:02:01 -0700 Subject: [PATCH 2/6] remove get_event_records calls - from caching instance queryer --- .../_utils/caching_instance_queryer.py | 53 +++++++++++-------- 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py index 6b1e267a5f6b4..37eaccbcf8eb5 100644 --- a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py +++ b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py @@ -40,7 +40,7 @@ DagsterDefinitionChangedDeserializationError, DagsterInvalidDefinitionError, ) -from dagster._core.event_api import AssetRecordsFilter, EventRecordsFilter +from dagster._core.event_api import AssetRecordsFilter from dagster._core.events import DagsterEventType from dagster._core.instance import DagsterInstance, DynamicPartitionsStore from dagster._core.storage.batch_asset_record_loader import BatchAssetRecordLoader @@ -57,6 +57,8 @@ from dagster._core.storage.event_log.base import AssetRecord from dagster._core.storage.partition_status_cache import AssetStatusCacheValue +RECORD_BATCH_SIZE = 1000 + class CachingInstanceQueryer(DynamicPartitionsStore): """Provides utility functions for querying for asset-materialization related data from the @@ -232,18 +234,21 @@ def _get_latest_materialization_or_observation_record( return None return asset_record.asset_entry.last_materialization_record - records = self.instance.get_event_records( - EventRecordsFilter( - event_type=self._event_type_for_key(asset_partition.asset_key), - asset_key=asset_partition.asset_key, - asset_partitions=( - [asset_partition.partition_key] if asset_partition.partition_key else None - ), - before_cursor=before_cursor, 
+ records_filter = AssetRecordsFilter( + asset_key=asset_partition.asset_key, + asset_partitions=( + [asset_partition.partition_key] if asset_partition.partition_key else None ), - ascending=False, - limit=1, + before_storage_id=before_cursor, ) + if self.asset_graph.get(asset_partition.asset_key).is_observable: + records = self.instance.fetch_observations( + records_filter, ascending=False, limit=1 + ).records + else: + records = self.instance.fetch_materializations( + records_filter, ascending=False, limit=1 + ).records return next(iter(records), None) @cached_method @@ -373,17 +378,21 @@ def next_version_record( after_cursor: Optional[int], data_version: Optional[DataVersion], ) -> Optional["EventLogRecord"]: - for record in self.instance.get_event_records( - EventRecordsFilter( - event_type=DagsterEventType.ASSET_OBSERVATION, - asset_key=asset_key, - after_cursor=after_cursor, - ), - ascending=True, - ): - record_version = extract_data_version_from_entry(record.event_log_entry) - if record_version is not None and record_version != data_version: - return record + has_more = True + cursor = None + while has_more: + result = self.instance.fetch_observations( + AssetRecordsFilter(asset_key=asset_key, after_storage_id=after_cursor), + limit=RECORD_BATCH_SIZE, + cursor=cursor, + ascending=True, + ) + has_more = result.has_more + cursor = result.cursor + for record in result.records: + record_version = extract_data_version_from_entry(record.event_log_entry) + if record_version is not None and record_version != data_version: + return record # no records found with a new data version return None From 2e0a9bf418d83ca86ba34901d60895fb5c62fdf8 Mon Sep 17 00:00:00 2001 From: prha Date: Wed, 8 May 2024 20:55:59 -0700 Subject: [PATCH 3/6] remove get_event_records calls from branching io manager --- .../storage/branching/branching_io_manager.py | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/python_modules/dagster/dagster/_core/storage/branching/branching_io_manager.py b/python_modules/dagster/dagster/_core/storage/branching/branching_io_manager.py index 42e7ef89a568a..b67e58e9baa55 100644 --- a/python_modules/dagster/dagster/_core/storage/branching/branching_io_manager.py +++ b/python_modules/dagster/dagster/_core/storage/branching/branching_io_manager.py @@ -7,8 +7,7 @@ ) from dagster._core.definitions.events import AssetKey, AssetMaterialization from dagster._core.definitions.metadata import TextMetadataValue -from dagster._core.event_api import EventRecordsFilter -from dagster._core.events import DagsterEventType +from dagster._core.event_api import AssetRecordsFilter from dagster._core.events.log import EventLogEntry from dagster._core.instance import DagsterInstance from dagster._core.storage.io_manager import IOManager @@ -22,16 +21,13 @@ def get_text_metadata_value(materialization: AssetMaterialization, key: str) -> def latest_materialization_log_entry( instance: DagsterInstance, asset_key: AssetKey, partition_key: Optional[str] = None ) -> Optional[EventLogEntry]: - event_records = [ - *instance.get_event_records( - event_records_filter=EventRecordsFilter( - event_type=DagsterEventType.ASSET_MATERIALIZATION, - asset_key=asset_key, - asset_partitions=[partition_key] if partition_key else None, - ), - limit=1, - ) - ] + event_records = instance.fetch_materializations( + AssetRecordsFilter( + asset_key=asset_key, + asset_partitions=[partition_key] if partition_key else None, + ), + limit=1, + ).records return event_records[0].event_log_entry if 
event_records else None From 07de82dbc99263351b785eb2b7d45f57b73f1112 Mon Sep 17 00:00:00 2001 From: prha Date: Wed, 8 May 2024 20:59:11 -0700 Subject: [PATCH 4/6] remove get_event_records calls from asset sensor definition --- .../_core/definitions/asset_sensor_definition.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_sensor_definition.py b/python_modules/dagster/dagster/_core/definitions/asset_sensor_definition.py index 580bd4c0b1fc2..7c2ffe78a8741 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_sensor_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_sensor_definition.py @@ -88,8 +88,7 @@ def __init__( ): self._asset_key = check.inst_param(asset_key, "asset_key", AssetKey) - from dagster._core.events import DagsterEventType - from dagster._core.storage.event_log.base import EventRecordsFilter + from dagster._core.event_api import AssetRecordsFilter resource_arg_names: Set[str] = { arg.name for arg in get_resource_args(asset_materialization_fn) @@ -109,15 +108,14 @@ def _fn(context) -> Any: except ValueError: after_cursor = None - event_records = context.instance.get_event_records( - EventRecordsFilter( - event_type=DagsterEventType.ASSET_MATERIALIZATION, + event_records = context.instance.fetch_materializations( + AssetRecordsFilter( asset_key=self._asset_key, - after_cursor=after_cursor, + after_storage_id=after_cursor, ), ascending=False, limit=1, - ) + ).records if not event_records: yield SkipReason( From d86c9a98382bf4aa9fd775e69f21595852086e30 Mon Sep 17 00:00:00 2001 From: prha Date: Wed, 8 May 2024 21:11:01 -0700 Subject: [PATCH 5/6] remove get_event_records calls from asset backfill --- .../dagster/_core/execution/asset_backfill.py | 52 ++++++++++--------- 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index 88f23784c456f..918fda44661f2 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -49,8 +49,7 @@ DagsterDefinitionChangedDeserializationError, DagsterInvariantViolationError, ) -from dagster._core.event_api import EventRecordsFilter -from dagster._core.events import DagsterEventType +from dagster._core.event_api import AssetRecordsFilter from dagster._core.instance import DagsterInstance, DynamicPartitionsStore from dagster._core.storage.dagster_run import ( CANCELABLE_RUN_STATUSES, @@ -79,7 +78,7 @@ from .backfill import PartitionBackfill RUN_CHUNK_SIZE = 25 - +MATERIALIZATION_CHUNK_SIZE = 1000 MAX_RUNS_CANCELED_PER_ITERATION = 50 @@ -1112,29 +1111,34 @@ def get_asset_backfill_iteration_materialized_partitions( """ recently_materialized_asset_partitions = AssetGraphSubset() for asset_key in asset_backfill_data.target_subset.asset_keys: - records = instance_queryer.instance.get_event_records( - EventRecordsFilter( - event_type=DagsterEventType.ASSET_MATERIALIZATION, - asset_key=asset_key, - after_cursor=asset_backfill_data.latest_storage_id, + cursor = None + has_more = True + while has_more: + result = instance_queryer.instance.fetch_materializations( + AssetRecordsFilter( + asset_key=asset_key, + after_storage_id=asset_backfill_data.latest_storage_id, + ), + cursor=cursor, + limit=MATERIALIZATION_CHUNK_SIZE, ) - ) - records_in_backfill = [ - record - for record in records - if 
instance_queryer.run_has_tag( - run_id=record.run_id, tag_key=BACKFILL_ID_TAG, tag_value=backfill_id + records_in_backfill = [ + record + for record in result.records + if instance_queryer.run_has_tag( + run_id=record.run_id, tag_key=BACKFILL_ID_TAG, tag_value=backfill_id + ) + ] + recently_materialized_asset_partitions |= AssetGraphSubset.from_asset_partition_set( + { + AssetKeyPartitionKey(asset_key, record.partition_key) + for record in records_in_backfill + }, + asset_graph, ) - ] - recently_materialized_asset_partitions |= AssetGraphSubset.from_asset_partition_set( - { - AssetKeyPartitionKey(asset_key, record.partition_key) - for record in records_in_backfill - }, - asset_graph, - ) - - yield None + cursor = result.cursor + has_more = result.has_more + yield None updated_materialized_subset = ( asset_backfill_data.materialized_subset | recently_materialized_asset_partitions From 3d672fe6f11f291e808e58efc5fd3306430a733a Mon Sep 17 00:00:00 2001 From: prha Date: Wed, 8 May 2024 21:39:15 -0700 Subject: [PATCH 6/6] remove get_event_records calls from multi asset sensor --- .../multi_asset_sensor_definition.py | 112 ++++++++++-------- 1 file changed, 61 insertions(+), 51 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/multi_asset_sensor_definition.py b/python_modules/dagster/dagster/_core/definitions/multi_asset_sensor_definition.py index 3f13ea01caa61..4c5a48423a3be 100644 --- a/python_modules/dagster/dagster/_core/definitions/multi_asset_sensor_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/multi_asset_sensor_definition.py @@ -33,7 +33,7 @@ from dagster._core.instance import DagsterInstance from dagster._core.instance.ref import InstanceRef from dagster._utils import normalize_to_repository -from dagster._utils.warnings import normalize_renamed_param +from dagster._utils.warnings import deprecation_warning, normalize_renamed_param from .events import AssetKey from .run_request import RunRequest, SensorResult, SkipReason @@ -55,6 +55,7 @@ from dagster._core.storage.event_log.base import EventLogRecord MAX_NUM_UNCONSUMED_EVENTS = 25 +FETCH_MATERIALIZATION_BATCH_SIZE = 1000 class MultiAssetSensorAssetCursorComponent( @@ -296,8 +297,7 @@ def __init__( ) def _cache_initial_unconsumed_events(self) -> None: - from dagster._core.events import DagsterEventType - from dagster._core.storage.event_log.base import EventRecordsFilter + from dagster._core.event_api import AssetRecordsFilter # This method caches the initial unconsumed events for each asset key. To generate the # current unconsumed events, call get_trailing_unconsumed_events instead. 
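
The backfill loop above and the sensor changes that follow repeat the same cursor-driven fetch loop inline. A minimal sketch of that pattern as a standalone helper, built only from the calls the diffs themselves make (the helper name `iter_materialization_records` and the 1000-record batch size are illustrative assumptions, not part of this patch series):

    from typing import Iterator, Optional

    from dagster import AssetKey, DagsterInstance
    from dagster._core.event_api import AssetRecordsFilter, EventLogRecord

    RECORD_BATCH_SIZE = 1000  # assumed batch size, mirroring the constants added above


    def iter_materialization_records(
        instance: DagsterInstance,
        asset_key: AssetKey,
        after_storage_id: Optional[int] = None,
    ) -> Iterator[EventLogRecord]:
        # Page through materializations oldest-first, one storage batch at a time;
        # the storage-id filter stays fixed while the cursor advances, exactly as
        # in the backfill and observation loops above.
        cursor = None
        has_more = True
        while has_more:
            result = instance.fetch_materializations(
                AssetRecordsFilter(asset_key=asset_key, after_storage_id=after_storage_id),
                limit=RECORD_BATCH_SIZE,
                cursor=cursor,
                ascending=True,
            )
            cursor = result.cursor
            has_more = result.has_more
            yield from result.records
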
@@ -309,12 +309,13 @@ def _cache_initial_unconsumed_events(self) -> None: self._get_cursor(asset_key).trailing_unconsumed_partitioned_event_ids.values() ) if unconsumed_event_ids: - event_records = self.instance.get_event_records( - EventRecordsFilter( - event_type=DagsterEventType.ASSET_MATERIALIZATION, + event_records = self.instance.fetch_materializations( + AssetRecordsFilter( + asset_key=asset_key, storage_ids=unconsumed_event_ids, - ) - ) + ), + limit=len(unconsumed_event_ids), + ).records self._initial_unconsumed_events_by_id.update( {event_record.storage_id: event_record for event_record in event_records} ) @@ -440,26 +441,40 @@ def materialization_records_for_key( asset_key (AssetKey): The asset to fetch materialization events for limit (Optional[int]): The number of events to fetch """ - from dagster._core.events import DagsterEventType - from dagster._core.storage.event_log.base import EventRecordsFilter + from dagster._core.event_api import AssetRecordsFilter asset_key = check.inst_param(asset_key, "asset_key", AssetKey) if asset_key not in self._assets_by_key: raise DagsterInvalidInvocationError(f"Asset key {asset_key} not monitored by sensor.") - events = list( - self.instance.get_event_records( - EventRecordsFilter( - event_type=DagsterEventType.ASSET_MATERIALIZATION, - asset_key=asset_key, - after_cursor=self._get_cursor(asset_key).latest_consumed_event_id, - ), - ascending=True, - limit=limit, - ) - ) + if not limit: + deprecation_warning("Calling materialization_records_for_key without a limit", "1.8") + records = [] + has_more = True + cursor = None + while has_more: + result = self.instance.fetch_materializations( + AssetRecordsFilter( + asset_key=asset_key, + after_storage_id=self._get_cursor(asset_key).latest_consumed_event_id, + ), + ascending=True, + limit=FETCH_MATERIALIZATION_BATCH_SIZE, + cursor=cursor, + ) + cursor = result.cursor + has_more = result.has_more + records.extend(result.records) + return records - return events + return self.instance.fetch_materializations( + AssetRecordsFilter( + asset_key=asset_key, + after_storage_id=self._get_cursor(asset_key).latest_consumed_event_id, + ), + ascending=True, + limit=limit, + ).records def _get_cursor(self, asset_key: AssetKey) -> MultiAssetSensorAssetCursorComponent: """Returns the MultiAssetSensorAssetCursorComponent for the asset key. 
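
With the change to materialization_records_for_key above, callers should pass an explicit limit; omitting it now walks every remaining record in batches and emits a deprecation warning targeting 1.8. A hedged usage sketch (the asset key, job name, and cursor-advance policy are illustrative assumptions):

    from dagster import AssetKey, RunRequest, multi_asset_sensor

    DAILY_KEY = AssetKey("daily_table")  # assumed monitored asset


    @multi_asset_sensor(monitored_assets=[DAILY_KEY], job_name="downstream_job")
    def downstream_sensor(context):
        # Bounded fetch: at most 10 new materializations per evaluation.
        for record in context.materialization_records_for_key(DAILY_KEY, limit=10):
            context.advance_cursor({DAILY_KEY: record})
            yield RunRequest(run_key=str(record.storage_id))
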
@@ -506,8 +521,7 @@ def my_sensor(context): # returns {"2022-07-05": EventLogRecord(...)} """ - from dagster._core.events import DagsterEventType - from dagster._core.storage.event_log.base import EventLogRecord, EventRecordsFilter + from dagster._core.event_api import AssetRecordsFilter, EventLogRecord asset_key = check.inst_param(asset_key, "asset_key", AssetKey) @@ -547,26 +561,31 @@ def my_sensor(context): # Add partition and materialization to the end of the OrderedDict materialization_by_partition[partition] = unconsumed_event - partition_materializations = self.instance.get_event_records( - EventRecordsFilter( - event_type=DagsterEventType.ASSET_MATERIALIZATION, - asset_key=asset_key, - asset_partitions=partitions_to_fetch, - after_cursor=self._get_cursor(asset_key).latest_consumed_event_id, - ), - ascending=True, - ) - for materialization in partition_materializations: - partition = materialization.partition_key + has_more = True + cursor = None + while has_more: + result = self.instance.fetch_materializations( + AssetRecordsFilter( + asset_key=asset_key, + asset_partitions=partitions_to_fetch, + after_storage_id=self._get_cursor(asset_key).latest_consumed_event_id, + ), + ascending=True, + limit=FETCH_MATERIALIZATION_BATCH_SIZE, + cursor=cursor, + ) + cursor = result.cursor + has_more = result.has_more + for materialization in result.records: + if not isinstance(materialization.partition_key, str): + continue - if isinstance(partition, str): - if partition in materialization_by_partition: + if materialization.partition_key in materialization_by_partition: # Remove partition to ensure materialization_by_partition preserves # the order of materializations - materialization_by_partition.pop(partition) + materialization_by_partition.pop(materialization.partition_key) # Add partition and materialization to the end of the OrderedDict - materialization_by_partition[partition] = materialization - + materialization_by_partition[materialization.partition_key] = materialization return materialization_by_partition @public @@ -929,21 +948,12 @@ def get_asset_cursor_with_advances( def get_cursor_from_latest_materializations( asset_keys: Sequence[AssetKey], instance: DagsterInstance ) -> str: - from dagster._core.events import DagsterEventType - from dagster._core.storage.event_log.base import EventRecordsFilter - cursor_dict: Dict[str, MultiAssetSensorAssetCursorComponent] = {} for asset_key in asset_keys: - materializations = instance.get_event_records( - EventRecordsFilter( - DagsterEventType.ASSET_MATERIALIZATION, - asset_key=asset_key, - ), - limit=1, - ) + materializations = instance.fetch_materializations(asset_key, limit=1).records if materializations: - last_materialization = list(materializations)[-1] + last_materialization = materializations[0] cursor_dict[str(asset_key)] = MultiAssetSensorAssetCursorComponent( last_materialization.partition_key,
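
The final hunk passes a bare AssetKey to fetch_materializations rather than an AssetRecordsFilter and takes records[0] as the latest event, relying on the default newest-first ordering. A minimal standalone sketch of that access pattern (the function name and the ephemeral instance are illustrative assumptions, not part of this patch):

    from typing import Optional

    from dagster import AssetKey, DagsterInstance


    def latest_materialization_storage_id(
        instance: DagsterInstance, asset_key: AssetKey
    ) -> Optional[int]:
        # limit=1 with the default (descending) order returns only the newest record.
        records = instance.fetch_materializations(asset_key, limit=1).records
        return records[0].storage_id if records else None


    if __name__ == "__main__":
        instance = DagsterInstance.ephemeral()  # assumed: any configured instance works
        print(latest_materialization_storage_id(instance, AssetKey("my_asset")))
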