Skip to content

Commit

Permalink
replace internal uses of MetadataEntry static api (#7126)
Browse files Browse the repository at this point in the history
* replace internal uses of metadataentry static api
  • Loading branch information
smackesey committed Mar 19, 2022
1 parent 8aaa2e6 commit dfeff6c
Show file tree
Hide file tree
Showing 41 changed files with 263 additions and 245 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
from dagster.core.asset_defs import SourceAsset, asset, build_assets_job
from dagster.core.definitions.decorators.sensor import sensor
from dagster.core.definitions.executor_definition import in_process_executor
from dagster.core.definitions.metadata import MetadataValue
from dagster.core.definitions.reconstructable import ReconstructableRepository
from dagster.core.definitions.sensor_definition import RunRequest, SkipReason
from dagster.core.log_manager import coerce_valid_log_level
Expand Down Expand Up @@ -694,28 +695,32 @@ def materialize(_):
asset_key="all_types",
description="a materialization with all metadata types",
metadata_entries=[
MetadataEntry.text("text is cool", "text"),
MetadataEntry.url("https://bigty.pe/neato", "url"),
MetadataEntry.fspath("/tmp/awesome", "path"),
MetadataEntry.json({"is_dope": True}, "json"),
MetadataEntry.python_artifact(MetadataEntry, "python class"),
MetadataEntry.python_artifact(file_relative_path, "python function"),
MetadataEntry.float(1.2, "float"),
MetadataEntry.int(1, "int"),
MetadataEntry.float(float("nan"), "float NaN"),
MetadataEntry.int(LONG_INT, "long int"),
MetadataEntry.pipeline_run("fake_run_id", "pipeline run"),
MetadataEntry.asset(AssetKey("my_asset"), "my asset"),
MetadataEntry.table(
label="table",
records=[
TableRecord(foo=1, bar=2),
TableRecord(foo=3, bar=4),
],
MetadataEntry("text", value="text is cool"),
MetadataEntry("url", value=MetadataValue.url("https://bigty.pe/neato")),
MetadataEntry("path", value=MetadataValue.path("/tmp/awesome")),
MetadataEntry("json", value={"is_dope": True}),
MetadataEntry("python class", value=MetadataValue.python_artifact(MetadataEntry)),
MetadataEntry(
"python function", value=MetadataValue.python_artifact(file_relative_path)
),
MetadataEntry.table_schema(
label="table_schema",
schema=TableSchema(
MetadataEntry("float", value=1.2),
MetadataEntry("int", value=1),
MetadataEntry("float NaN", value=float("nan")),
MetadataEntry("long int", value=LONG_INT),
MetadataEntry("pipeline run", value=MetadataValue.pipeline_run("fake_run_id")),
MetadataEntry("my asset", value=AssetKey("my_asset")),
MetadataEntry(
"table",
value=MetadataValue.table(
records=[
TableRecord(foo=1, bar=2),
TableRecord(foo=3, bar=4),
],
),
),
MetadataEntry(
"table_schema",
value=TableSchema(
columns=[
TableColumn(
name="foo",
Expand Down Expand Up @@ -1324,18 +1329,20 @@ def backcompat_materialize(_):
asset_key="all_types",
description="a materialization with all metadata types",
metadata_entries=[
MetadataEntry.text("text is cool", "text"),
MetadataEntry.url("https://bigty.pe/neato", "url"),
MetadataEntry.fspath("/tmp/awesome", "path"),
MetadataEntry.json({"is_dope": True}, "json"),
MetadataEntry.python_artifact(MetadataEntry, "python class"),
MetadataEntry.python_artifact(file_relative_path, "python function"),
MetadataEntry.float(1.2, "float"),
MetadataEntry.int(1, "int"),
MetadataEntry.float(float("nan"), "float NaN"),
MetadataEntry.int(LONG_INT, "long int"),
MetadataEntry.pipeline_run("fake_run_id", "pipeline run"),
MetadataEntry.asset(AssetKey("my_asset"), "my asset"),
MetadataEntry("text", value="text is cool"),
MetadataEntry("url", value=MetadataValue.url("https://bigty.pe/neato")),
MetadataEntry("path", value=MetadataValue.path("/tmp/awesome")),
MetadataEntry("json", value={"is_dope": True}),
MetadataEntry("python class", value=MetadataValue.python_artifact(MetadataEntry)),
MetadataEntry(
"python function", value=MetadataValue.python_artifact(file_relative_path)
),
MetadataEntry("float", value=1.2),
MetadataEntry("int", value=1),
MetadataEntry("float NaN", value=float("nan")),
MetadataEntry("long int", value=LONG_INT),
MetadataEntry("pipeline run", value=MetadataValue.pipeline_run("fake_run_id")),
MetadataEntry("my asset", value=AssetKey("my_asset")),
],
)
yield Output(None)
Expand Down
5 changes: 3 additions & 2 deletions python_modules/dagster/dagster/core/definitions/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from .metadata import (
MetadataEntry,
MetadataValue,
PartitionMetadataEntry,
RawMetadataValue,
last_file_comp,
Expand Down Expand Up @@ -434,7 +435,7 @@ def file(
return AssetMaterialization(
asset_key=cast(Union[str, AssetKey, List[str]], asset_key),
description=description,
metadata_entries=[MetadataEntry.fspath(path)],
metadata_entries=[MetadataEntry("path", value=MetadataValue.path(path))],
)


Expand Down Expand Up @@ -541,7 +542,7 @@ def file(
return Materialization(
label=last_file_comp(path),
description=description,
metadata_entries=[MetadataEntry.fspath(path)],
metadata_entries=[MetadataEntry("path", value=MetadataValue.path(path))],
asset_key=asset_key,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ def normalize_metadata(


def normalize_metadata_value(raw_value: RawMetadataValue):
from dagster.core.definitions.events import AssetKey

if isinstance(raw_value, MetadataValue):
return raw_value
Expand All @@ -112,18 +113,13 @@ def normalize_metadata_value(raw_value: RawMetadataValue):
elif isinstance(raw_value, int):
return MetadataValue.int(raw_value)
elif isinstance(raw_value, dict):
try:
# check that the value is JSON serializable
seven.dumps(raw_value)
return MetadataValue.json(raw_value)
except TypeError:
raise DagsterInvalidMetadata(
"Value is a dictionary but is not JSON serializable. "
"Consider wrapping the value with the appropriate MetadataValue type."
)

if isinstance(raw_value, os.PathLike):
return MetadataValue.json(raw_value)
elif isinstance(raw_value, os.PathLike):
return MetadataValue.path(raw_value)
elif isinstance(raw_value, AssetKey):
return MetadataValue.asset(raw_value)
elif isinstance(raw_value, TableSchema):
return MetadataValue.table_schema(raw_value)

raise DagsterInvalidMetadata(
f"Its type was {type(raw_value)}. Consider wrapping the value with the appropriate "
Expand Down Expand Up @@ -390,7 +386,9 @@ def validate_table(context, df):

@staticmethod
@experimental
def table(records: List[TableRecord], schema: TableSchema) -> "TableMetadataValue":
def table(
records: List[TableRecord], schema: Optional[TableSchema] = None
) -> "TableMetadataValue":
"""Static constructor for a metadata value wrapping arbitrary tabular data as
:py:class:`TableMetadataValue`. Can be used as the value type for the `metadata`
parameter for supported events. For example:
Expand Down Expand Up @@ -420,7 +418,7 @@ def emit_metadata(context):
Args:
records (List[TableRecord]): The data as a list of records (i.e. rows).
schema (TableSchema): A schema for the table.
schema (Optional[TableSchema]): A schema for the table.
"""
return TableMetadataValue(records, schema)

Expand Down Expand Up @@ -545,9 +543,13 @@ class JsonMetadataValue(
"""

def __new__(cls, data: Optional[Dict[str, Any]]):
return super(JsonMetadataValue, cls).__new__(
cls, check.opt_dict_param(data, "data", key_type=str)
)
data = check.opt_dict_param(data, "data", key_type=str)
try:
# check that the value is JSON serializable
seven.dumps(data)
except TypeError:
raise DagsterInvalidMetadata("Value is a dictionary but is not JSON serializable.")
return super(JsonMetadataValue, cls).__new__(cls, data)


@whitelist_for_serdes(storage_name="MarkdownMetadataEntryData")
Expand Down Expand Up @@ -706,11 +708,30 @@ def infer_column_type(value):
else:
return "string"

def __new__(cls, records: List[TableRecord], schema: TableSchema):
def __new__(cls, records: List[TableRecord], schema: Optional[TableSchema]):

check.list_param(records, "records", of_type=TableRecord)
check.opt_inst_param(schema, "schema", TableSchema)

if len(records) == 0:
schema = check.not_none(schema, "schema must be provided if records is empty")
else:
columns = set(records[0].data.keys())
for record in records[1:]:
check.invariant(
set(record.data.keys()) == columns, "All records must have the same fields"
)
schema = schema or TableSchema(
columns=[
TableColumn(name=k, type=TableMetadataValue.infer_column_type(v))
for k, v in records[0].data.items()
]
)

return super(TableMetadataValue, cls).__new__(
cls,
check.list_param(records, "records", of_type=TableRecord),
check.inst_param(schema, "schema", TableSchema),
records,
schema,
)


Expand Down Expand Up @@ -1124,20 +1145,6 @@ def emit_metadata(context):
`"bool"` or `"float"` inferred from the first record's values. If a value does
not directly match one of the above types, it will be treated as a string.
"""
if len(records) == 0:
schema = check.not_none(schema, "schema must be provided if records is empty")
else:
columns = set(records[0].data.keys())
for record in records[1:]:
check.invariant(
set(record.data.keys()) == columns, "All records must have the same fields"
)
schema = schema or TableSchema(
columns=[
TableColumn(name=k, type=TableMetadataValue.infer_column_type(v))
for k, v in records[0].data.items()
]
)
return MetadataEntry(label, description, TableMetadataValue(records, schema))

@staticmethod
Expand Down
12 changes: 7 additions & 5 deletions python_modules/dagster/dagster/core/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ def resource_init_success(
key,
value=MetadataValue.python_artifact(resource_instances[key].__class__),
),
MetadataEntry(f"{key}:init_time_ms", value=resource_init_times[key]),
MetadataEntry(f"{key}:init_time", value=resource_init_times[key]),
]
)

Expand Down Expand Up @@ -1026,7 +1026,9 @@ def object_store_operation(
value_name=value_name,
address=object_store_operation_result.key,
metadata_entries=[
MetadataEntry.path(object_store_operation_result.key, label="key")
MetadataEntry(
"key", value=MetadataValue.path(object_store_operation_result.key)
),
],
version=object_store_operation_result.version,
mapping_key=object_store_operation_result.mapping_key,
Expand Down Expand Up @@ -1345,9 +1347,9 @@ def multiprocess(
pid: int, step_keys_to_execute: Optional[List[str]] = None
) -> "EngineEventData":
return EngineEventData(
metadata_entries=[MetadataEntry.text(str(pid), "pid")]
metadata_entries=[MetadataEntry("pid", value=str(pid))]
+ (
[MetadataEntry.text(str(step_keys_to_execute), "step_keys")]
[MetadataEntry("step_keys", value=str(step_keys_to_execute))]
if step_keys_to_execute
else []
)
Expand All @@ -1356,7 +1358,7 @@ def multiprocess(
@staticmethod
def interrupted(steps_interrupted: List[str]) -> "EngineEventData":
return EngineEventData(
metadata_entries=[MetadataEntry.text(str(steps_interrupted), "steps_interrupted")]
metadata_entries=[MetadataEntry("steps_interrupted", value=str(steps_interrupted))]
)

@staticmethod
Expand Down
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/core/executor/multiprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ def execute(self):
self.pipeline_run,
EngineEventData(
[
MetadataEntry.text(str(os.getpid()), "pid"),
MetadataEntry.text(self.step_key, "step_key"),
MetadataEntry("pid", value=str(os.getpid())),
MetadataEntry("step_key", value=self.step_key),
],
marker_end=DELEGATE_MARKER,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,9 @@ def execute(self, plan_context: PlanOrchestrationContext, execution_plan: Execut
"run will be resumed",
EngineEventData(
metadata_entries=[
MetadataEntry.text(str(running_steps.keys()), "steps_in_flight")
MetadataEntry(
"steps_in_flight", value=str(running_steps.keys())
)
]
),
)
Expand Down
6 changes: 4 additions & 2 deletions python_modules/dagster/dagster/core/storage/fs_io_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from dagster.config import Field
from dagster.config.source import StringSource
from dagster.core.definitions.events import AssetKey, AssetMaterialization
from dagster.core.definitions.metadata import MetadataEntry
from dagster.core.definitions.metadata import MetadataEntry, MetadataValue
from dagster.core.errors import DagsterInvariantViolationError
from dagster.core.execution.context.input import InputContext
from dagster.core.execution.context.output import OutputContext
Expand Down Expand Up @@ -192,7 +192,9 @@ def handle_output(self, context, obj):

return AssetMaterialization(
asset_key=AssetKey([context.pipeline_name, context.step_key, context.name]),
metadata_entries=[MetadataEntry.fspath(os.path.abspath(filepath))],
metadata_entries=[
MetadataEntry("path", value=MetadataValue.path(os.path.abspath(filepath)))
],
)

def load_input(self, context):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,8 @@ def type_check_method(_, value):
return TypeCheck(
True,
metadata_entries=[
MetadataEntry.text(label="row_count", text=str(len(value))),
MetadataEntry.text(label="series_names", text=", ".join(value.keys())),
MetadataEntry("row_count", value=str(len(value))),
MetadataEntry("series_names", value=", ".join(value.keys())),
],
)

Expand Down Expand Up @@ -461,7 +461,7 @@ def test_raise_on_error_true_type_check_returns_unsuccessful_type_check():
FalsyType = DagsterType(
name="FalsyType",
type_check_fn=lambda _, _val: TypeCheck(
success=False, metadata_entries=[MetadataEntry.text("foo", "bar", "baz")]
success=False, metadata_entries=[MetadataEntry("bar", value="foo")]
),
)

Expand All @@ -477,7 +477,6 @@ def foo_pipeline():
execute_pipeline(foo_pipeline)
assert e.value.metadata_entries[0].label == "bar"
assert e.value.metadata_entries[0].entry_data.text == "foo"
assert e.value.metadata_entries[0].description == "baz"
assert isinstance(e.value.dagster_type, DagsterType)

pipeline_result = execute_pipeline(foo_pipeline, raise_on_error=False)
Expand Down Expand Up @@ -550,7 +549,7 @@ def test_raise_on_error_true_type_check_returns_successful_type_check():
TruthyExceptionType = DagsterType(
name="TruthyExceptionType",
type_check_fn=lambda _, _val: TypeCheck(
success=True, metadata_entries=[MetadataEntry.text("foo", "bar", "baz")]
success=True, metadata_entries=[MetadataEntry("bar", value="foo")]
),
)

Expand All @@ -572,9 +571,7 @@ def foo_pipeline():
event.event_specific_data.type_check_data.metadata_entries[0].entry_data.text
== "foo"
)
assert (
event.event_specific_data.type_check_data.metadata_entries[0].description == "baz"
)
assert event.event_specific_data.type_check_data.metadata_entries[0]

pipeline_result = execute_pipeline(foo_pipeline, raise_on_error=False)
assert pipeline_result.success
Expand Down

0 comments on commit dfeff6c

Please sign in to comment.