Skip to content

Commit

Permalink
[instance] rm events_for_asset_key and get_asset_events (#7602)
Browse files Browse the repository at this point in the history
  • Loading branch information
alangenfeld committed Jun 6, 2022
1 parent ad8a0a2 commit b89f4d0
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 293 deletions.
44 changes: 0 additions & 44 deletions python_modules/dagster/dagster/core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1382,50 +1382,6 @@ def get_asset_records(
) -> Iterable["AssetRecord"]:
return self._event_storage.get_asset_records(asset_keys)

@traced
def events_for_asset_key(
self,
asset_key,
partitions=None,
before_cursor=None,
after_cursor=None,
cursor=None,
before_timestamp=None,
limit=None,
ascending=False,
):
check.inst_param(asset_key, "asset_key", AssetKey)

warnings.warn(
"""
The method `events_for_asset_key` on DagsterInstance has been deprecated as of `0.12.0` in favor of
the method `get_event_records`. The method `get_event_records` takes in an `EventRecordsFilter`
argument that allows for filtering by asset key and asset key partitions. The return value is a
list of `EventLogRecord` objects, each of which contains a storage_id and an event log entry.
Example:
records = instance.get_event_records(
EventRecordsFilter(
asset_key=asset_key,
asset_partitions=partitions,
after_cursor=after_cursor,
),
)
"""
)

return self._event_storage.get_asset_events(
asset_key,
partitions,
before_cursor,
after_cursor,
limit,
before_timestamp=before_timestamp,
ascending=ascending,
include_cursor=True,
cursor=cursor,
)

@traced
def run_ids_for_asset_key(self, asset_key):
check.inst_param(asset_key, "asset_key", AssetKey)
Expand Down
53 changes: 1 addition & 52 deletions python_modules/dagster/dagster/core/storage/event_log/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,7 @@
from abc import ABC, abstractmethod
from datetime import datetime
from enum import Enum
from typing import (
Callable,
Iterable,
List,
Mapping,
NamedTuple,
Optional,
Sequence,
Set,
Tuple,
Union,
)
from typing import Callable, Iterable, List, Mapping, NamedTuple, Optional, Sequence, Set, Union

import dagster._check as check
from dagster.core.assets import AssetDetails
Expand Down Expand Up @@ -375,21 +364,6 @@ def get_latest_materialization_events(
) -> Mapping[AssetKey, Optional[EventLogEntry]]:
pass

@abstractmethod
def get_asset_events(
self,
asset_key: AssetKey,
partitions: Optional[List[str]] = None,
before_cursor: Optional[int] = None,
after_cursor: Optional[int] = None,
limit: Optional[int] = None,
ascending: bool = False,
include_cursor: bool = False,
before_timestamp=None,
cursor: Optional[int] = None, # deprecated
) -> Union[Iterable[EventLogEntry], Iterable[Tuple[int, EventLogEntry]]]:
pass

@abstractmethod
def get_asset_run_ids(self, asset_key: AssetKey) -> Iterable[str]:
pass
Expand All @@ -406,28 +380,3 @@ def get_materialization_count_by_partition(

def alembic_version(self):
return None


def extract_asset_events_cursor(cursor, before_cursor, after_cursor, ascending):
if cursor:
warnings.warn(
"Parameter `cursor` is a deprecated for `get_asset_events`. Use `before_cursor` or `after_cursor` instead"
)
if ascending and after_cursor is None:
after_cursor = cursor
if not ascending and before_cursor is None:
before_cursor = cursor

if after_cursor is not None:
try:
after_cursor = int(after_cursor)
except ValueError:
after_cursor = None

if before_cursor is not None:
try:
before_cursor = int(before_cursor)
except ValueError:
before_cursor = None

return before_cursor, after_cursor
33 changes: 0 additions & 33 deletions python_modules/dagster/dagster/core/storage/event_log/in_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
EventLogStorage,
EventRecordsFilter,
RunShardedEventsCursor,
extract_asset_events_cursor,
)


Expand Down Expand Up @@ -359,38 +358,6 @@ def get_latest_materialization_events(

return materializations_by_key

def get_asset_events(
self,
asset_key,
partitions=None,
before_cursor=None,
after_cursor=None,
limit=None,
ascending=False,
include_cursor=False,
before_timestamp=None,
cursor=None,
):
before_cursor, after_cursor = extract_asset_events_cursor(
cursor, before_cursor, after_cursor, ascending
)
event_records = self.get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.ASSET_MATERIALIZATION,
asset_key=asset_key,
asset_partitions=partitions,
before_cursor=before_cursor,
after_cursor=after_cursor,
before_timestamp=before_timestamp,
),
limit=limit,
ascending=ascending,
)
if include_cursor:
return [tuple([record.storage_id, record.event_log_entry]) for record in event_records]
else:
return [record.event_log_entry for record in event_records]

def get_asset_run_ids(self, asset_key):
asset_run_ids = set()
for run_id, records in self._logs.items():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
EventLogStorage,
EventRecordsFilter,
RunShardedEventsCursor,
extract_asset_events_cursor,
)
from .migration import ASSET_DATA_MIGRATIONS, ASSET_KEY_INDEX_COLS, EVENT_LOG_DATA_MIGRATIONS
from .schema import AssetKeyTable, SecondaryIndexMigrationTable, SqlEventLogStorageTable
Expand Down Expand Up @@ -1085,40 +1084,6 @@ def _add_assets_wipe_filter_to_query(

return query

def get_asset_events(
self,
asset_key,
partitions=None,
before_cursor=None,
after_cursor=None,
limit=None,
ascending=False,
include_cursor=False, # deprecated
before_timestamp=None,
cursor=None, # deprecated
):
check.inst_param(asset_key, "asset_key", AssetKey)
check.opt_list_param(partitions, "partitions", of_type=str)
before_cursor, after_cursor = extract_asset_events_cursor(
cursor, before_cursor, after_cursor, ascending
)
event_records = self.get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.ASSET_MATERIALIZATION,
asset_key=asset_key,
asset_partitions=partitions,
before_cursor=before_cursor,
after_cursor=after_cursor,
before_timestamp=before_timestamp,
),
limit=limit,
ascending=ascending,
)
if include_cursor:
return [tuple([record.storage_id, record.event_log_entry]) for record in event_records]
else:
return [record.event_log_entry for record in event_records]

def get_asset_run_ids(self, asset_key):
check.inst_param(asset_key, "asset_key", AssetKey)
query = (
Expand Down

0 comments on commit b89f4d0

Please sign in to comment.