Event log methods for event log consumer (#8253)
johannkm committed Jun 13, 2022
1 parent 2e46b28 commit 233575a
Showing 4 changed files with 238 additions and 2 deletions.
16 changes: 16 additions & 0 deletions python_modules/dagster/dagster/core/storage/event_log/base.py
@@ -316,6 +316,22 @@ def get_event_records(
    ) -> Iterable[EventLogRecord]:
        pass

    def supports_event_consumer_queries(self) -> bool:
        return False

    def get_logs_for_all_runs_by_log_id(
        self,
        after_cursor: int = -1,
        dagster_event_type: Optional[Union[DagsterEventType, Set[DagsterEventType]]] = None,
        limit: Optional[int] = None,
    ) -> Mapping[int, EventLogEntry]:
        """Get event records across all runs. Only supported for non-sharded SQL storage."""
        raise NotImplementedError()

    def get_maximum_record_id(self) -> Optional[int]:
        """Get the current greatest record id in the event log. Only supported for non-sharded SQL storage."""
        raise NotImplementedError()

    @abstractmethod
    def get_asset_records(
        self, asset_keys: Optional[Sequence[AssetKey]] = None
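Together these base-class hooks support a polling event consumer: seed a cursor from get_maximum_record_id, fetch batches with get_logs_for_all_runs_by_log_id, and advance the cursor to the highest record id returned. A minimal sketch of that loop (the handle_event callback, the batch size, and the seed-from-the-end policy are illustrative assumptions, not part of this commit):

from typing import Optional

from dagster.core.events import DagsterEventType
from dagster.core.storage.event_log.base import EventLogStorage


def consume_new_events(storage: EventLogStorage, handle_event, cursor: Optional[int]) -> Optional[int]:
    # Sharded storages cannot serve cross-run queries; leave the cursor alone.
    if not storage.supports_event_consumer_queries():
        return cursor

    if cursor is None:
        # First iteration: start from the current end of the log instead of
        # replaying every historical event.
        max_id = storage.get_maximum_record_id()
        cursor = max_id if max_id is not None else -1

    events = storage.get_logs_for_all_runs_by_log_id(
        after_cursor=cursor,
        dagster_event_type={DagsterEventType.PIPELINE_SUCCESS, DagsterEventType.PIPELINE_FAILURE},
        limit=500,
    )
    for record_id, entry in events.items():
        handle_event(record_id, entry)

    # Record ids increase monotonically, so the largest id seen becomes the next cursor.
    return max(events.keys()) if events else cursor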
@@ -2,7 +2,7 @@
from abc import abstractmethod
from collections import OrderedDict
from datetime import datetime
from typing import Any, Dict, Iterable, List, Mapping, Optional, Sequence, cast
from typing import Any, Dict, Iterable, List, Mapping, Optional, Sequence, Set, Union, cast

import pendulum
import sqlalchemy as db
@@ -710,6 +710,64 @@ def get_event_records(

        return event_records

    def supports_event_consumer_queries(self) -> bool:
        return True

    def get_logs_for_all_runs_by_log_id(
        self,
        after_cursor: int = -1,
        dagster_event_type: Optional[Union[DagsterEventType, Set[DagsterEventType]]] = None,
        limit: Optional[int] = None,
    ) -> Mapping[int, EventLogEntry]:
        check.int_param(after_cursor, "after_cursor")
        check.invariant(
            after_cursor >= -1,
            f"Don't know what to do with negative cursor {after_cursor}",
        )
        dagster_event_types = (
            {dagster_event_type}
            if isinstance(dagster_event_type, DagsterEventType)
            else check.opt_set_param(
                dagster_event_type, "dagster_event_type", of_type=DagsterEventType
            )
        )

        query = (
            db.select([SqlEventLogStorageTable.c.id, SqlEventLogStorageTable.c.event])
            .where(SqlEventLogStorageTable.c.id > after_cursor)
            .order_by(SqlEventLogStorageTable.c.id.asc())
        )

        if dagster_event_types:
            query = query.where(
                SqlEventLogStorageTable.c.dagster_event_type.in_(
                    [dagster_event_type.value for dagster_event_type in dagster_event_types]
                )
            )

        if limit:
            query = query.limit(limit)

        with self.index_connection() as conn:
            results = conn.execute(query).fetchall()

        events = {}
        try:
            for (
                record_id,
                json_str,
            ) in results:
                events[record_id] = deserialize_as(json_str, EventLogEntry)
        except (seven.JSONDecodeError, check.CheckError):
            # Stop at the first record that fails to deserialize; return what was parsed so far.
            logging.warning("Could not parse event record id `%s`.", record_id)

        return events

    def get_maximum_record_id(self) -> Optional[int]:
        with self.index_connection() as conn:
            result = conn.execute(db.select([db.func.max(SqlEventLogStorageTable.c.id)])).fetchone()
            return result[0]

    def _construct_asset_record_from_row(self, row, last_materialization: Optional[EventLogEntry]):
        asset_key = AssetKey.from_db_string(row[1])
        if asset_key:
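Note that the cursor is exclusive: the query filters on id > after_cursor, so the default of -1 returns every record, and re-passing the largest id from a previous batch yields only records written after it. Because the result dict is built in ascending id order, max(events) is always a safe next cursor. A small illustration against a hypothetical storage whose log currently holds record ids 1 through 5:

# Hypothetical: the event log currently holds record ids 1..5.
all_events = storage.get_logs_for_all_runs_by_log_id()  # returns ids 1-5
batch = storage.get_logs_for_all_runs_by_log_id(after_cursor=2)  # returns ids 3, 4, 5
assert sorted(batch.keys()) == [3, 4, 5]
next_cursor = max(batch.keys())  # 5
assert storage.get_logs_for_all_runs_by_log_id(after_cursor=next_cursor) == {}  # nothing newer yet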
@@ -342,6 +342,9 @@ def get_event_records(

        return event_records[:limit]

    def supports_event_consumer_queries(self) -> bool:
        return False

    def delete_events(self, run_id):
        with self.run_connection(run_id) as conn:
            self.delete_events_for_run(conn, run_id)
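The sharded SQLite storage keeps one database per run, so it cannot answer cross-run queries and opts out via the capability flag. Callers are expected to gate on it first, roughly as sketched below (the fallback shown is illustrative; get_logs_for_run is the existing per-run query on EventLogStorage):

if storage.supports_event_consumer_queries():
    events = storage.get_logs_for_all_runs_by_log_id(after_cursor=cursor)
else:
    # Fall back to querying one run at a time, e.g. via the existing
    # per-run API: storage.get_logs_for_run(run_id)
    events = {}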
@@ -4,7 +4,7 @@
import time
from collections import Counter
from contextlib import ExitStack, contextmanager
from typing import List, Optional
from typing import List, Optional, cast

import mock
import pendulum
@@ -2089,3 +2089,162 @@ def never_materializes_asset():
        records = storage.get_asset_records([asset_key])
        assert len(records) == 1
        assert result.run_id == records[0].asset_entry.last_run_id

    def test_get_logs_for_all_runs_by_log_id_of_type(self, storage):
        if not storage.supports_event_consumer_queries():
            pytest.skip("storage does not support event consumer queries")

        @op
        def return_one(_):
            return 1

        def _ops():
            return_one()

        for _ in range(2):
            events, _ = _synthesize_events(_ops)
            for event in events:
                storage.store_event(event)

        assert _event_types(
            storage.get_logs_for_all_runs_by_log_id(
                dagster_event_type=DagsterEventType.PIPELINE_SUCCESS,
            ).values()
        ) == [DagsterEventType.PIPELINE_SUCCESS, DagsterEventType.PIPELINE_SUCCESS]

        assert _event_types(
            storage.get_logs_for_all_runs_by_log_id(
                dagster_event_type=DagsterEventType.STEP_SUCCESS,
            ).values()
        ) == [DagsterEventType.STEP_SUCCESS, DagsterEventType.STEP_SUCCESS]

        assert _event_types(
            storage.get_logs_for_all_runs_by_log_id(
                dagster_event_type={
                    DagsterEventType.STEP_SUCCESS,
                    DagsterEventType.PIPELINE_SUCCESS,
                },
            ).values()
        ) == [
            DagsterEventType.STEP_SUCCESS,
            DagsterEventType.PIPELINE_SUCCESS,
            DagsterEventType.STEP_SUCCESS,
            DagsterEventType.PIPELINE_SUCCESS,
        ]

    def test_get_logs_for_all_runs_by_log_id_cursor(self, storage):
        if not storage.supports_event_consumer_queries():
            pytest.skip("storage does not support event consumer queries")

        @op
        def return_one(_):
            return 1

        def _ops():
            return_one()

        for _ in range(2):
            events, _ = _synthesize_events(_ops)
            for event in events:
                storage.store_event(event)

        events_by_log_id = storage.get_logs_for_all_runs_by_log_id(
            dagster_event_type={
                DagsterEventType.STEP_SUCCESS,
                DagsterEventType.PIPELINE_SUCCESS,
            },
        )

        assert _event_types(events_by_log_id.values()) == [
            DagsterEventType.STEP_SUCCESS,
            DagsterEventType.PIPELINE_SUCCESS,
            DagsterEventType.STEP_SUCCESS,
            DagsterEventType.PIPELINE_SUCCESS,
        ]

        after_cursor_events_by_log_id = storage.get_logs_for_all_runs_by_log_id(
            after_cursor=min(events_by_log_id.keys()),
            dagster_event_type={
                DagsterEventType.STEP_SUCCESS,
                DagsterEventType.PIPELINE_SUCCESS,
            },
        )

        assert _event_types(after_cursor_events_by_log_id.values()) == [
            DagsterEventType.PIPELINE_SUCCESS,
            DagsterEventType.STEP_SUCCESS,
            DagsterEventType.PIPELINE_SUCCESS,
        ]

    def test_get_logs_for_all_runs_by_log_id_limit(self, storage):
        if not storage.supports_event_consumer_queries():
            pytest.skip("storage does not support event consumer queries")

        @op
        def return_one(_):
            return 1

        def _ops():
            return_one()

        for _ in range(2):
            events, _ = _synthesize_events(_ops)
            for event in events:
                storage.store_event(event)

        events_by_log_id = storage.get_logs_for_all_runs_by_log_id(
            dagster_event_type={
                DagsterEventType.STEP_SUCCESS,
                DagsterEventType.PIPELINE_SUCCESS,
            },
            limit=3,
        )

        assert _event_types(events_by_log_id.values()) == [
            DagsterEventType.STEP_SUCCESS,
            DagsterEventType.PIPELINE_SUCCESS,
            DagsterEventType.STEP_SUCCESS,
        ]

    def test_get_maximum_record_id(self, storage):
        if not storage.supports_event_consumer_queries():
            pytest.skip("storage does not support event consumer queries")

        storage.wipe()
        assert storage.get_maximum_record_id() is None

        storage.store_event(
            EventLogEntry(
                error_info=None,
                level="debug",
                user_message="",
                run_id="foo_run",
                timestamp=time.time(),
                dagster_event=DagsterEvent(
                    DagsterEventType.ENGINE_EVENT.value,
                    "nonce",
                    event_specific_data=EngineEventData.in_process(999),
                ),
            )
        )

        index = cast(int, storage.get_maximum_record_id())
        assert isinstance(index, int)

        for i in range(10):
            storage.store_event(
                EventLogEntry(
                    error_info=None,
                    level="debug",
                    user_message="",
                    run_id=f"foo_run_{i}",
                    timestamp=time.time(),
                    dagster_event=DagsterEvent(
                        DagsterEventType.ENGINE_EVENT.value,
                        "nonce",
                        event_specific_data=EngineEventData.in_process(999),
                    ),
                )
            )

        assert storage.get_maximum_record_id() == index + 10
