Skip to content

Commit

Permalink
Speed up get_assets_for_run_id when there are a lot of events (#6735)
Browse files Browse the repository at this point in the history
Summary:
Current impl of this method gets every log and filters them down for events that happen to contain asset materializations. This is potentially very expensive if there are a lot of non-system logs though. Instead, use our index to filter down to events with a particular event type.

I think to make this method truly efficient we are going to need a method on run_id + event_type though...
  • Loading branch information
gibsondan committed Feb 22, 2022
1 parent f55ef00 commit 8cba839
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from dagster import AssetKey, DagsterEventType, EventRecordsFilter, check, seven
from dagster.core.events import ASSET_EVENTS

from .utils import capture_error

Expand Down Expand Up @@ -160,7 +161,7 @@ def get_assets_for_run_id(graphene_info, run_id):

check.str_param(run_id, "run_id")

records = graphene_info.context.instance.all_logs(run_id)
records = graphene_info.context.instance.all_logs(run_id, of_type=ASSET_EVENTS)
asset_keys = [
record.dagster_event.asset_key
for record in records
Expand Down
5 changes: 5 additions & 0 deletions python_modules/dagster/dagster/core/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ class DagsterEventType(Enum):

PIPELINE_RUN_STATUS_TO_EVENT_TYPE = {v: k for k, v in EVENT_TYPE_TO_PIPELINE_RUN_STATUS.items()}

ASSET_EVENTS = {
DagsterEventType.ASSET_MATERIALIZATION,
DagsterEventType.ASSET_OBSERVATION,
}


def _assert_type(
method: str, expected_type: DagsterEventType, actual_type: DagsterEventType
Expand Down
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1133,7 +1133,7 @@ def logs_after(
)

@traced
def all_logs(self, run_id, of_type: "DagsterEventType" = None):
def all_logs(self, run_id, of_type: Union["DagsterEventType", Set["DagsterEventType"]] = None):
return self._event_storage.get_logs_for_run(run_id, of_type=of_type)

def watch_event_logs(self, run_id, cursor, cb):
Expand Down
15 changes: 13 additions & 2 deletions python_modules/dagster/dagster/core/storage/event_log/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
import warnings
from abc import ABC, abstractmethod, abstractproperty
from datetime import datetime
from typing import Callable, Iterable, List, Mapping, NamedTuple, Optional, Sequence, Tuple, Union
from typing import (
Callable,
Iterable,
List,
Mapping,
NamedTuple,
Optional,
Sequence,
Set,
Tuple,
Union,
)

from dagster import check
from dagster.core.definitions.events import AssetKey
Expand Down Expand Up @@ -118,7 +129,7 @@ def get_logs_for_run(
self,
run_id: str,
cursor: Optional[int] = -1,
of_type: Optional[DagsterEventType] = None,
of_type: Optional[Union[DagsterEventType, Set[DagsterEventType]]] = None,
limit: Optional[int] = None,
) -> Iterable[EventLogEntry]:
"""Get all of the logs corresponding to a run.
Expand Down
23 changes: 19 additions & 4 deletions python_modules/dagster/dagster/core/storage/event_log/in_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,29 @@ def get_logs_for_run(
cursor >= -1,
"Don't know what to do with negative cursor {cursor}".format(cursor=cursor),
)
check.opt_inst_param(of_type, "of_type", DagsterEventType)

of_types = (
(
{of_type.value}
if isinstance(of_type, DagsterEventType)
else (
{
dagster_event_type.value
for dagster_event_type in check.set_param(
of_type, "of_type", DagsterEventType
)
}
)
)
if of_type
else None
)

cursor = cursor + 1
if of_type:
if of_types:
events = list(
filter(
lambda r: r.is_dagster_event
and r.dagster_event.event_type_value == of_type.value,
lambda r: r.is_dagster_event and r.dagster_event.event_type_value in of_types,
self._logs[run_id][cursor:],
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,16 +202,25 @@ def get_logs_for_run_by_log_id(
cursor >= -1,
"Don't know what to do with negative cursor {cursor}".format(cursor=cursor),
)
check.opt_inst_param(dagster_event_type, "dagster_event_type", DagsterEventType)

dagster_event_types = (
{dagster_event_type}
if isinstance(dagster_event_type, DagsterEventType)
else check.opt_set_param(
dagster_event_type, "dagster_event_type", of_type=DagsterEventType
)
)

query = (
db.select([SqlEventLogStorageTable.c.id, SqlEventLogStorageTable.c.event])
.where(SqlEventLogStorageTable.c.run_id == run_id)
.order_by(SqlEventLogStorageTable.c.id.asc())
)
if dagster_event_type:
query = query.where(
SqlEventLogStorageTable.c.dagster_event_type == dagster_event_type.value
if dagster_event_types:
query = query.filter(
SqlEventLogStorageTable.c.dagster_event_type.in_(
[dagster_event_type.value for dagster_event_type in dagster_event_types]
)
)

# adjust 0 based index cursor to SQL offset
Expand Down Expand Up @@ -259,7 +268,12 @@ def get_logs_for_run(
cursor >= -1,
"Don't know what to do with negative cursor {cursor}".format(cursor=cursor),
)
check.opt_inst_param(of_type, "of_type", DagsterEventType)

check.invariant(
not of_type
or isinstance(of_type, DagsterEventType)
or isinstance(of_type, (frozenset, set))
)

events_by_id = self.get_logs_for_run_by_log_id(run_id, cursor, of_type, limit)
return [event for id, event in sorted(events_by_id.items(), key=lambda x: x[0])]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
sensor,
)
from dagster.core.errors import DagsterInvalidInvocationError
from dagster.core.events import DagsterEventType
from dagster.core.test_utils import instance_for_test


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,16 @@ def _solids():
storage.get_logs_for_run(result.run_id, of_type=DagsterEventType.STEP_SUCCESS)
) == [DagsterEventType.STEP_SUCCESS]

assert (
_event_types(
storage.get_logs_for_run(
result.run_id,
of_type={DagsterEventType.STEP_SUCCESS, DagsterEventType.PIPELINE_SUCCESS},
)
)
== [DagsterEventType.STEP_SUCCESS, DagsterEventType.PIPELINE_SUCCESS]
)

def test_basic_get_logs_for_run_cursor(self, storage):
@solid
def return_one(_):
Expand Down

0 comments on commit 8cba839

Please sign in to comment.