warn for instance.get_event_records calls without an event type filter (#7848)

* force instance.get_event_records calls to have a filter applied with an event type

* add missing call site

* remove event log storage

* fix broken storage tests

* lint

* downgrade from erroring to warning

* add hooks to register runs for internal
prha committed May 12, 2022
1 parent 042d3d2 commit f06689b
Showing 7 changed files with 89 additions and 68 deletions.
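
In short: bare instance.get_event_records(...) calls now emit a deprecation warning and must carry an EventRecordsFilter with an event type before 0.15.0. A minimal before/after sketch of the call pattern this commit targets, assuming the 0.14-era import path (EventRecordsFilter re-exported from dagster.core.storage.event_log):

    from dagster import DagsterEventType, DagsterInstance
    from dagster.core.storage.event_log import EventRecordsFilter

    instance = DagsterInstance.ephemeral()

    # Deprecated as of this commit: unfiltered scan over the whole event log.
    # Emits a warning now, slated to error starting in 0.15.0.
    records = instance.get_event_records(ascending=False, limit=1)

    # Recommended: scope the query to a single event type.
    records = instance.get_event_records(
        EventRecordsFilter(event_type=DagsterEventType.RUN_SUCCESS),
        ascending=False,
        limit=1,
    )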
@@ -373,14 +373,17 @@ def __init__(
self._run_status_sensor_fn = check.callable_param(
run_status_sensor_fn, "run_status_sensor_fn"
)
event_type = PIPELINE_RUN_STATUS_TO_EVENT_TYPE[pipeline_run_status]

def _wrapped_fn(context: SensorEvaluationContext):
# initialize the cursor to (most recent event id, current timestamp) when:
# * it's the first time starting the sensor
# * or, the cursor isn't in a valid format (backcompat)
if context.cursor is None or not RunStatusSensorCursor.is_valid(context.cursor):
most_recent_event_records = list(
context.instance.get_event_records(ascending=False, limit=1)
context.instance.get_event_records(
EventRecordsFilter(event_type=event_type), ascending=False, limit=1
)
)
most_recent_event_id = (
most_recent_event_records[0].storage_id
@@ -409,7 +412,7 @@ def _wrapped_fn(context: SensorEvaluationContext):
id=record_id,
run_updated_after=cast(datetime, pendulum.parse(update_timestamp)),
),
event_type=PIPELINE_RUN_STATUS_TO_EVENT_TYPE[pipeline_run_status],
event_type=event_type,
),
ascending=True,
limit=5,
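
Hoisting event_type = PIPELINE_RUN_STATUS_TO_EVENT_TYPE[pipeline_run_status] out of the wrapped function lets both queries, the cursor-initialization lookup and the polling query below it, share one event-type filter. For orientation, this code path backs sensors declared with the run_status_sensor decorator; a hypothetical declaration (sensor name and body are placeholders):

    from dagster import PipelineRunStatus, run_status_sensor

    @run_status_sensor(pipeline_run_status=PipelineRunStatus.SUCCESS)
    def announce_success(context):
        # context.pipeline_run and context.dagster_event describe the run
        # whose status change triggered this evaluation.
        ...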
@@ -131,9 +131,16 @@ def __new__(
before_timestamp: Optional[float] = None,
):
check.opt_list_param(asset_partitions, "asset_partitions", of_type=str)
event_type = check.opt_inst_param(event_type, "event_type", DagsterEventType)
if not event_type:
warnings.warn(
"The use of `EventRecordsFilter` without an event type is deprecated and will "
"begin erroring starting in 0.15.0"
)

return super(EventRecordsFilter, cls).__new__(
cls,
event_type=check.opt_inst_param(event_type, "event_type", DagsterEventType),
event_type=event_type,
asset_key=check.opt_inst_param(asset_key, "asset_key", AssetKey),
asset_partitions=asset_partitions,
after_cursor=check.opt_inst_param(
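
A quick way to see the new EventRecordsFilter warning fire (assuming the constructor is importable as above):

    import warnings

    from dagster.core.storage.event_log import EventRecordsFilter

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        EventRecordsFilter()  # no event_type: triggers the deprecation warning
    assert any("deprecated" in str(w.message) for w in caught)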
@@ -1,5 +1,6 @@
import logging
import time
import warnings
from collections import OrderedDict, defaultdict
from typing import Dict, Iterable, Mapping, Optional, Sequence

@@ -173,6 +174,12 @@ def get_event_records(
limit: Optional[int] = None,
ascending: bool = False,
) -> Iterable[EventLogRecord]:
if not event_records_filter:
warnings.warn(
"The use of `get_event_records` without an `EventRecordsFilter` is deprecated and "
"will begin erroring starting in 0.15.0"
)

after_id = (
(
event_records_filter.after_cursor.id
@@ -352,6 +359,7 @@ def get_asset_events(
)
event_records = self.get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.ASSET_MATERIALIZATION,
asset_key=asset_key,
asset_partitions=partitions,
before_cursor=before_cursor,
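
The get_asset_events change is representative of the fix applied at each call site: asset-scoped queries now name DagsterEventType.ASSET_MATERIALIZATION explicitly instead of relying on the unfiltered default. A minimal sketch of the resulting query shape, with my_asset as a placeholder asset key:

    from dagster import AssetKey, DagsterEventType, DagsterInstance
    from dagster.core.storage.event_log import EventRecordsFilter

    instance = DagsterInstance.ephemeral()
    records = instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=AssetKey("my_asset"),
        ),
        limit=10,
    )
    for record in records:
        print(record.storage_id, record.event_log_entry.timestamp)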
@@ -1,4 +1,5 @@
import logging
import warnings
from abc import abstractmethod
from collections import OrderedDict
from datetime import datetime
@@ -674,6 +675,12 @@ def get_event_records(
check.opt_int_param(limit, "limit")
check.bool_param(ascending, "ascending")

if not event_records_filter:
warnings.warn(
"The use of `get_event_records` without an `EventRecordsFilter` is deprecated and "
"will begin erroring starting in 0.15.0"
)

query = db.select([SqlEventLogStorageTable.c.id, SqlEventLogStorageTable.c.event])
if event_records_filter and event_records_filter.asset_key:
asset_details = next(iter(self._get_assets_details([event_records_filter.asset_key])))
@@ -899,7 +899,11 @@ def _solids():
event.dagster_event.event_type_value == DagsterEventType.ASSET_MATERIALIZATION.value
)

records = storage.get_event_records(EventRecordsFilter(asset_key=asset_key))
records = storage.get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.ASSET_MATERIALIZATION, asset_key=asset_key
)
)
assert len(records) == 1
record = records[0]
assert isinstance(record, EventLogRecord)
@@ -1147,7 +1151,7 @@ def _pipeline():
@pytest.mark.parametrize(
"cursor_dt", cursor_datetime_args()
) # test both tz-aware and naive datetimes
def test_get_event_records(self, storage, cursor_dt, test_run_id):
def test_get_event_records(self, storage, instance, cursor_dt):
if isinstance(storage, SqliteEventLogStorage):
# test sqlite in test_get_event_records_sqlite
pytest.skip()
@@ -1170,57 +1174,58 @@ def materialize_one(_):
def _solids():
materialize_one()

events, _ = _synthesize_events(_solids, run_id=test_run_id)
def _store_run_events(run_id):
events, _ = _synthesize_events(_solids, run_id=run_id)
for event in events:
storage.store_event(event)

for event in events:
storage.store_event(event)
# store events for three runs
[run_id_1, run_id_2, run_id_3] = [make_new_run_id(), make_new_run_id(), make_new_run_id()]

all_records = storage.get_event_records()
# all logs returned in descending order
assert all_records
min_record_num = all_records[-1].storage_id
max_record_num = min_record_num + len(all_records) - 1
assert [r[0] for r in all_records] == list(range(max_record_num, min_record_num - 1, -1))
assert _event_types([all_records[0].event_log_entry]) == [DagsterEventType.PIPELINE_SUCCESS]
assert _event_types([all_records[-1].event_log_entry]) == [DagsterEventType.PIPELINE_START]

# after cursor
def _build_cursor(record_id_cursor, run_cursor_dt):
if not run_cursor_dt:
return record_id_cursor
return RunShardedEventsCursor(id=record_id_cursor, run_updated_after=run_cursor_dt)

assert not list(
filter(
lambda r: r.storage_id <= 2,
storage.get_event_records(
EventRecordsFilter(after_cursor=_build_cursor(2, cursor_dt))
),
)
)
assert [
i.storage_id
for i in storage.get_event_records(
EventRecordsFilter(after_cursor=_build_cursor(min_record_num + 2, cursor_dt)),
ascending=True,
limit=2,
with create_and_delete_test_runs(instance, [run_id_1, run_id_2, run_id_3]):
_store_run_events(run_id_1)
_store_run_events(run_id_2)
_store_run_events(run_id_3)

all_success_events = storage.get_event_records(
EventRecordsFilter(event_type=DagsterEventType.RUN_SUCCESS)
)
] == [min_record_num + 3, min_record_num + 4]
assert [
i.storage_id
for i in storage.get_event_records(
EventRecordsFilter(after_cursor=_build_cursor(min_record_num + 2, cursor_dt)),
ascending=False,
limit=2,

assert len(all_success_events) == 3
min_success_record_id = all_success_events[-1].storage_id

# after cursor
def _build_cursor(record_id_cursor, run_cursor_dt):
if not run_cursor_dt:
return record_id_cursor
return RunShardedEventsCursor(id=record_id_cursor, run_updated_after=run_cursor_dt)

assert not list(
filter(
lambda r: r.storage_id <= min_success_record_id,
storage.get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.RUN_SUCCESS,
after_cursor=_build_cursor(min_success_record_id, cursor_dt),
)
),
)
)
] == [max_record_num, max_record_num - 1]
assert [
i.storage_id
for i in storage.get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.RUN_SUCCESS,
after_cursor=_build_cursor(min_success_record_id, cursor_dt),
),
ascending=True,
limit=2,
)
] == [record.storage_id for record in all_success_events[:2][::-1]]

filtered_records = storage.get_event_records(
EventRecordsFilter(event_type=DagsterEventType.PIPELINE_SUCCESS)
)
assert _event_types([r.event_log_entry for r in filtered_records]) == [
DagsterEventType.PIPELINE_SUCCESS
]
assert set(_event_types([r.event_log_entry for r in all_success_events])) == {
DagsterEventType.RUN_SUCCESS
}

def test_get_event_records_sqlite(self, storage):
if not isinstance(storage, SqliteEventLogStorage):
@@ -1269,15 +1274,6 @@ def a_pipe():
run_records = instance.get_run_records()
assert len(run_records) == 1

# all logs returned in descending order
all_event_records = storage.get_event_records()
assert _event_types([all_event_records[0].event_log_entry]) == [
DagsterEventType.PIPELINE_SUCCESS
]
assert _event_types([all_event_records[-1].event_log_entry]) == [
DagsterEventType.PIPELINE_START
]

# second run
events = []
execute_run(
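
The rewritten test wraps its event writes in create_and_delete_test_runs, the run-registration hook mentioned in the commit message ("add hooks to register runs for internal"). Its real implementation lives in the storage test utilities; the following is only a plausible reconstruction of the contract, assuming it registers a run record per id and cleans up on exit (create_run_for_test does exist in dagster.core.test_utils in this era):

    from contextlib import contextmanager

    from dagster.core.test_utils import create_run_for_test

    @contextmanager
    def create_and_delete_test_runs(instance, run_ids):
        # Hypothetical sketch: register each run so storages that join
        # events to runs can resolve them, then delete the runs afterward
        # to keep test storages isolated.
        for run_id in run_ids:
            create_run_for_test(instance, pipeline_name="a_pipe", run_id=run_id)
        try:
            yield
        finally:
            for run_id in run_ids:
                instance.delete_run(run_id)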
@@ -116,10 +116,10 @@ def test_local():
@mock.patch("dagster_aws.emr.emr.EmrJobRunner.is_emr_step_complete")
def test_pyspark_emr(mock_is_emr_step_complete, mock_read_events, mock_s3_bucket):
with instance_for_test() as instance:
execute_pipeline(reconstructable(define_do_nothing_pipe), mode="local", instance=instance)
mock_read_events.return_value = [
record.event_log_entry for record in instance.get_event_records()
]
result = execute_pipeline(
reconstructable(define_do_nothing_pipe), mode="local", instance=instance
)
mock_read_events.return_value = instance.all_logs(result.run_id)

run_job_flow_args = dict(
Instances={
@@ -175,13 +175,13 @@ def test_pyspark_databricks(
mock_get_run_state.side_effect = [running_state] * 5 + [final_state]

with instance_for_test() as instance:
execute_pipeline(
result = execute_pipeline(
pipeline=reconstructable(define_do_nothing_pipe), mode="local", instance=instance
)
mock_get_step_events.return_value = [
record.event_log_entry
for record in instance.get_event_records()
if record.event_log_entry.step_key == "do_nothing_solid"
event
for event in instance.all_logs(result.run_id)
if event.step_key == "do_nothing_solid"
]
config = BASE_DATABRICKS_PYSPARK_STEP_LAUNCHER_CONFIG.copy()
config.pop("local_pipeline_package_path")
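
Both step-launcher tests switch from scanning the entire event log to instance.all_logs(result.run_id), which returns the EventLogEntry objects for a single run and therefore needs no event-type filter at all. A minimal sketch of that access pattern (pipeline and solid names are placeholders):

    from dagster import DagsterInstance, execute_pipeline, pipeline, solid

    @solid
    def do_nothing(_):
        pass

    @pipeline
    def do_nothing_pipe():
        do_nothing()

    instance = DagsterInstance.ephemeral()
    result = execute_pipeline(do_nothing_pipe, instance=instance)

    # Per-run fetch: every EventLogEntry for this run, no storage-wide scan.
    for entry in instance.all_logs(result.run_id):
        event_type = entry.dagster_event.event_type_value if entry.dagster_event else None
        print(entry.step_key, event_type)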
