Commit

add caching
JamieDeMaria committed Sep 30, 2022
1 parent 3bf8d41 commit eb57d32
Showing 1 changed file with 82 additions and 45 deletions.
@@ -1,7 +1,7 @@
# pylint: disable=anomalous-backslash-in-string
import json
from collections import defaultdict
from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Sequence, Set, Tuple
from typing import TYPE_CHECKING, Dict, Mapping, Optional, Set, Tuple

import toposort

@@ -40,21 +40,27 @@ def _get_parent_updates(
context,
current_asset: AssetKey,
parent_assets: Set[AssetKey],
cursor_timestamp: float,
cursor_storage_id: int,
will_materialize_set: Set[AssetKey],
wait_for_in_progress_runs: bool,
) -> Mapping[AssetKey, Tuple[bool, float]]:
planned_materialization_cache: Dict[AssetKey, "EventLogRecord"],
latest_materialization_cache: Dict[AssetKey, "EventLogRecord"],
) -> Tuple[
Mapping[AssetKey, Tuple[bool, int]],
Dict[AssetKey, "EventLogRecord"],
Dict[AssetKey, "EventLogRecord"],
]:
"""The bulk of the logic in the sensor is in this function. At the end of the function we return a
dictionary that maps each asset to a Tuple. The Tuple contains a boolean, indicating if the asset
has materialized or will materialize, and a float representing the timestamp the parent asset
has materialized or will materialize, and an int representing the storage id the parent asset
would update the cursor to if it is the most recent materialization of a parent asset. In some cases
we set the timestamp to 0.0 so that the timestamps of other parent materializations will take precedence.
we set the storage id to 0 so that the storage ids of other parent materializations will take precedence.
Args:
current_asset: We want to determine if this asset should materialize, so we gather information about
if its parents have materialized.
parent_assets: the parents of current_asset.
cursor_timestamp: In the cursor for the sensor we store the timestamp of the most recent materialization
cursor_storage_id: In the cursor for the sensor we store the storage id of the most recent materialization
of current_asset's parents. This allows us to see if any of the parents have been materialized
more recently.
will_materialize_set: A set of all of the assets the sensor has already determined it will materialize.
@@ -72,7 +78,7 @@ def _get_parent_updates(
We iterate through each parent of the asset and determine its materialization info. The parent
asset's materialization status can be one of three options:
1. The parent has materialized since the last time the child was materialized (determined by comparing
the timestamp of the parent materialization to the cursor_timestamp).
the storage_id of the parent materialization to the cursor_storage_id).
2. The parent is slated to be materialized (i.e. included in will_materialize_list)
3. The parent has not been materialized and will not be materialized by the sensor.
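Condensed into runnable form, a minimal sketch of how those three cases map to the (updated, storage_id) entries the function returns. All names and numbers here are hypothetical, and the in-progress-run and same-run checks performed by the real code are omitted:

# Hypothetical stand-ins for the sensor's real state.
will_materialize_set = {"parent_b"}   # parents the sensor has already decided to materialize
cursor_storage_id = 100               # from the sensor cursor for current_asset


def classify(parent, latest_storage_id):
    """Reduce one parent to the (updated, storage_id) entry described above."""
    if parent in will_materialize_set:
        return (True, 0)  # case 2: slated for this tick, so its storage id is unknown
    if latest_storage_id is not None and latest_storage_id > cursor_storage_id:
        return (True, latest_storage_id)  # case 1: materialized since the cursor
    return (False, 0)  # case 3: nothing new from this parent


print(classify("parent_a", 105))   # (True, 105)
print(classify("parent_b", None))  # (True, 0)
print(classify("parent_c", None))  # (False, 0)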
@@ -91,15 +97,15 @@ def _get_parent_updates(
from dagster._core.events import DagsterEventType
from dagster._core.storage.event_log.base import EventRecordsFilter

parent_asset_event_records: Dict[AssetKey, Tuple[bool, float]] = {}
parent_asset_event_records: Dict[AssetKey, Tuple[bool, int]] = {}

for p in parent_assets:
if p in will_materialize_set:
# if p will be materialized by this sensor, then we can also materialize current_asset
# we don't know what storage id asset p's materialization will have, so we set the cursor val to 0
parent_asset_event_records[p] = (
True,
0.0,
0,
)
# TODO - when source asset versioning lands, add a check here that will see if the version has
# updated if p is a source asset
@@ -108,16 +114,22 @@ def _get_parent_updates(
# if p is currently being materialized, then we don't want to materialize current_asset

# get the most recent planned materialization
materialization_planned_event_records = context.instance.get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.ASSET_MATERIALIZATION_PLANNED,
asset_key=p,
),
ascending=False,
limit=1,
)
if p in planned_materialization_cache:
# put it in a list so the indexing later works
materialization_planned_event_records = [planned_materialization_cache[p]]
else:
materialization_planned_event_records = context.instance.get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.ASSET_MATERIALIZATION_PLANNED,
asset_key=p,
),
ascending=False,
limit=1,
)

if materialization_planned_event_records:
# add it to the cache
planned_materialization_cache[p] = materialization_planned_event_records[0]
# see if the most recent planned materialization is part of an in progress run
in_progress = context.instance.get_runs(
filters=RunsFilter(
@@ -131,20 +143,30 @@ def _get_parent_updates(
# we don't want to materialize current_asset because p is
# being materialized. We'll materialize the asset on the next tick when the
# materialization of p is complete
parent_asset_event_records = {pp: (False, 0.0) for pp in parent_assets}
parent_asset_event_records = {pp: (False, 0) for pp in parent_assets}

return parent_asset_event_records
return (
parent_asset_event_records,
planned_materialization_cache,
latest_materialization_cache,
)
# check if there is a completed materialization for p
event_records = context.instance.get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.ASSET_MATERIALIZATION,
asset_key=p,
),
ascending=False,
limit=1,
)
if p in latest_materialization_cache:
# put it in a list so the indexing later works
event_records = [latest_materialization_cache[p]]
else:
event_records = context.instance.get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.ASSET_MATERIALIZATION,
asset_key=p,
),
ascending=False,
limit=1,
)

if event_records and event_records[0].event_log_entry.timestamp > cursor_timestamp:
if event_records and event_records[0].storage_id > cursor_storage_id:
# add it to the cache
latest_materialization_cache[p] = event_records[0]
# if the run for the materialization of p also materialized current_asset, we
# don't consider p "updated" when determining if current_asset should materialize
other_materialized_asset_records = context.instance.get_records_for_run(
@@ -160,19 +182,23 @@ def _get_parent_updates(
# on the next sensor tick
parent_asset_event_records[p] = (
False,
event_records[0].event_log_entry.timestamp,
event_records[0].storage_id,
)
else:
# current_asset was not updated along with p, so we consider p updated
parent_asset_event_records[p] = (
True,
event_records[0].event_log_entry.timestamp,
event_records[0].storage_id,
)
else:
# p has not been materialized and will not be materialized by the sensor
parent_asset_event_records[p] = (False, 0.0)
parent_asset_event_records[p] = (False, 0)

return parent_asset_event_records
return (
parent_asset_event_records,
planned_materialization_cache,
latest_materialization_cache,
)
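One subtlety worth a concrete example: when the run that materialized p also materialized current_asset, p is reported as not updated, yet its storage id is still returned so the cursor advances past that run. With made-up numbers:

# Hypothetical: run r1 materialized parent p with storage_id 200 and also materialized current_asset.
run_also_materialized_current_asset = True
p_storage_id = 200

entry = (not run_also_materialized_current_asset, p_storage_id)
print(entry)  # (False, 200) -- p does not count as updated, but the cursor still moves to 200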


def _make_sensor(
@@ -187,12 +213,12 @@ def _make_sensor(
"""Creates the sensor that will monitor the parents of all provided assets and determine
which assets should be materialized (ie their parents have been updated).
The cursor for this sensor is a dictionary mapping stringified AssetKeys to a timestamp (float). For each
asset we keep track of the timestamp of the most recent materialization of a parent asset. For example
if asset X has parents A, B, and C where A was materialized at time 1, B at time 2 and C at time 3. When
the sensor runs, the cursor for X will be set to 3. This way, the next time the sensor runs, we can ignore
the materializations prior to time 3. If asset A materialized again at time 4, we would know that this materialization
has not been incorporated into the child asset yet.
The cursor for this sensor is a dictionary mapping stringified AssetKeys to a storage_id (int). For each
asset we keep track of the storage_id of the most recent materialization of a parent asset. For example,
if asset X has parents A, B, and C, where A was materialized with storage_id 1, B with storage_id 2, and
C with storage_id 3, then when the sensor runs, the cursor for X will be set to 3. This way, the next time
the sensor runs, we can ignore the materializations prior to storage_id 3. If asset A materialized
again with storage_id 4, we would know that this materialization has not been incorporated into the child asset yet.
"""

def sensor_fn(context):
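A small sketch of the cursor round-trip described in the docstring above, with a made-up key and storage id (the real keys are str(AssetKey) values):

import json

# Hypothetical cursor contents: the newest parent materialization seen for this asset had storage_id 3.
cursor_update_dict = {"my_asset": 3}
cursor = json.dumps(cursor_update_dict)        # persisted as the sensor cursor string
assert json.loads(cursor) == {"my_asset": 3}   # recovered as a Dict[str, int] on the next tick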
@@ -208,9 +234,14 @@ def sensor_fn(context):
source_assets=source_asset_defs_by_key.values(),
)

cursor_dict: Dict[str, float] = json.loads(context.cursor) if context.cursor else {}
should_materialize: Set[AssetKey] = {}
cursor_update_dict: Dict[str, float] = {}
cursor_dict: Dict[str, int] = json.loads(context.cursor) if context.cursor else {}
should_materialize: Set[AssetKey] = set()
cursor_update_dict: Dict[str, int] = {}
# keep track of the latest planned materialization for each parent so we don't repeat
# calls to the db
planned_materialization_cache: Dict[AssetKey, "EventLogRecord"] = {}
# keep track of the latest materialization for each parent so we don't repeat calls to the db
latest_materialization_cache: Dict[AssetKey, "EventLogRecord"] = {}

# sort the assets topologically so that we process them in order
toposort_assets = list(toposort.toposort(upstream))
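For reference, the toposort library yields dependency layers, parents before children; a tiny sketch with hypothetical asset names, where the mapping (like upstream here) goes from each asset to the set of its parents:

import toposort

# Hypothetical dependency map standing in for `upstream`.
upstream_example = {"child": {"parent_a", "parent_b"}, "parent_a": set(), "parent_b": set()}
print(list(toposort.toposort(upstream_example)))
# [{'parent_a', 'parent_b'}, {'child'}] -- each layer only depends on earlier layers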
@@ -222,15 +253,21 @@ def sensor_fn(context):
# determine which assets should materialize based on the materialization status of their
# parents
for a in toposort_assets:
a_cursor = cursor_dict.get(str(a), 0.0)
a_cursor = cursor_dict.get(str(a), 0)
cursor_update_dict[str(a)] = a_cursor
parent_update_records = _get_parent_updates(
(
parent_update_records,
planned_materialization_cache,
latest_materialization_cache,
) = _get_parent_updates(
context,
current_asset=a,
parent_assets=upstream[a],
cursor_timestamp=a_cursor,
will_materialize_list=should_materialize,
cursor_storage_id=a_cursor,
will_materialize_set=should_materialize,
wait_for_in_progress_runs=wait_for_in_progress_runs,
planned_materialization_cache=planned_materialization_cache,
latest_materialization_cache=latest_materialization_cache,
)

condition = all if wait_for_all_upstream else any
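The heart of this commit is the check-the-cache-before-querying pattern used for both the planned and the completed materialization lookups. Below is a self-contained sketch of that pattern under the same assumptions as the diff (Dagster's EventRecordsFilter, DagsterEventType, and a per-tick cache dict keyed by parent AssetKey). It is a simplified illustration (for instance, it caches whenever a record is found, while the diff only caches after the cursor comparison), not a helper that exists in the module:

from typing import Dict, Optional

from dagster import AssetKey
from dagster._core.events import DagsterEventType
from dagster._core.storage.event_log.base import EventRecordsFilter


def cached_latest_materialization(
    instance, asset_key: AssetKey, cache: Dict[AssetKey, "EventLogRecord"]
) -> Optional["EventLogRecord"]:
    # Return the newest ASSET_MATERIALIZATION record for asset_key, querying the event
    # log at most once per sensor tick. "EventLogRecord" is a forward reference, mirroring
    # the string annotations used in the diff.
    if asset_key in cache:
        return cache[asset_key]
    records = instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=asset_key,
        ),
        ascending=False,
        limit=1,
    )
    if records:
        cache[asset_key] = records[0]
        return records[0]
    return None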