Skip to content

Commit

Permalink
make EventLogEntry.message change safe to downgrade (#6912)
Browse files Browse the repository at this point in the history
someone hit a forward compat issue with the removal of EventLogEntry.message that got me concerned about people doing runs on >= 0.14.3 but reading them back out on < 0.14.3 for some reason such as having to downgrade or not applied everywhere yet upgrades. 

Here we put `message: ""` in to the `EventLogEntry` payload so it can deserialize safely in older versions

## Test Plan

added a back compat test and verfieid it failed without the serdes changes
  • Loading branch information
alangenfeld committed Mar 3, 2022
1 parent d894501 commit e4fabe9
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 4 deletions.
22 changes: 19 additions & 3 deletions python_modules/dagster/dagster/core/events/log.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from typing import NamedTuple, Optional, Union
from typing import Any, Dict, NamedTuple, Optional, Union

from dagster import check
from dagster.core.errors import DagsterInvariantViolationError
from dagster.core.events import DagsterEvent
from dagster.core.utils import coerce_valid_log_level
from dagster.serdes import (
from dagster.serdes.serdes import (
DefaultNamedTupleSerializer,
WhitelistMap,
deserialize_json_to_dagster_namedtuple,
register_serdes_tuple_fallbacks,
serialize_dagster_namedtuple,
Expand All @@ -19,7 +21,21 @@
)


@whitelist_for_serdes
class EventLogEntrySerializer(DefaultNamedTupleSerializer):
@classmethod
def value_to_storage_dict(
cls,
value: NamedTuple,
whitelist_map: WhitelistMap,
descent_path: str,
) -> Dict[str, Any]:
storage_dict = super().value_to_storage_dict(value, whitelist_map, descent_path)
# include an empty string for the message field to allow older versions of dagster to load the events
storage_dict["message"] = ""
return storage_dict


@whitelist_for_serdes(serializer=EventLogEntrySerializer)
class EventLogEntry(
NamedTuple(
"_EventLogEntry",
Expand Down
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/serdes/serdes.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ def unpack_inner_value(val: Any, whitelist_map: WhitelistMap, descent_path: str)
)
if not whitelist_map.has_tuple_entry(lookup_name):
name_str = (
'"klass_name"'
f'"{klass_name}"'
if klass_name == lookup_name
else f'"{klass_name}" (mapped to: "{lookup_name}")'
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
import os
import re
import sqlite3
import time
from collections import namedtuple
from enum import Enum
from gzip import GzipFile
from typing import NamedTuple, Optional, Union

import pytest

Expand Down Expand Up @@ -38,6 +40,7 @@
serialize_dagster_namedtuple,
serialize_value,
)
from dagster.utils.error import SerializableErrorInfo
from dagster.utils.test import copy_directory


Expand Down Expand Up @@ -713,3 +716,71 @@ def test_schedule_namedtuple_job_instigator_backcompat():
assert tick.tick_data
assert tick.instigator_type
assert tick.instigator_name


def test_legacy_event_log_load():
# ensure EventLogEntry 0.14.3+ can still be loaded by older dagster versions
# to avoid downgrades etc from creating operational issues
legacy_env = WhitelistMap.create()

# snapshot of EventLogEntry pre commit ea19544
@_whitelist_for_serdes(
whitelist_map=legacy_env,
storage_name="EventLogEntry", # use this to avoid collision with current EventLogEntry
)
class OldEventLogEntry(
NamedTuple(
"_OldEventLogEntry",
[
("error_info", Optional[SerializableErrorInfo]),
("message", str),
("level", Union[str, int]),
("user_message", str),
("run_id", str),
("timestamp", float),
("step_key", Optional[str]),
("pipeline_name", Optional[str]),
("dagster_event", Optional[DagsterEvent]),
],
)
):
def __new__(
cls,
error_info,
message,
level,
user_message,
run_id,
timestamp,
step_key=None,
pipeline_name=None,
dagster_event=None,
job_name=None,
):
pipeline_name = pipeline_name or job_name
return super().__new__(
cls,
check.opt_inst_param(error_info, "error_info", SerializableErrorInfo),
check.str_param(message, "message"),
level, # coerce_valid_log_level call omitted
check.str_param(user_message, "user_message"),
check.str_param(run_id, "run_id"),
check.float_param(timestamp, "timestamp"),
check.opt_str_param(step_key, "step_key"),
check.opt_str_param(pipeline_name, "pipeline_name"),
check.opt_inst_param(dagster_event, "dagster_event", DagsterEvent),
)

# current event log entry
new_event = EventLogEntry(
user_message="test 1 2 3",
error_info=None,
level="debug",
run_id="fake_run_id",
timestamp=time.time(),
)

storage_str = serialize_dagster_namedtuple(new_event)

result = _deserialize_json(storage_str, legacy_env)
assert result.message is not None

0 comments on commit e4fabe9

Please sign in to comment.