have fs io manager record path metadata entry (#7261)
sryza committed Apr 11, 2022
1 parent bdd4dd7 commit 3d82da9
Showing 5 changed files with 20 additions and 11 deletions.
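In effect, every output handled and input loaded by fs_io_manager now carries a "path" metadata entry pointing at the pickled file, in place of the old debug log lines. A minimal sketch of the user-visible behavior, assuming the 0.14-era APIs exercised in the tests below; the job itself is illustrative, not part of this commit:

    from dagster import fs_io_manager, job, op

    @op
    def emit():
        return [1, 2, 3]

    @op
    def consume(nums):
        return len(nums)

    @job(resource_defs={"io_manager": fs_io_manager})
    def my_job():
        consume(emit())

    result = my_job.execute_in_process()
    handled = [e for e in result.all_events if e.is_handled_output]
    entry = handled[0].event_specific_data.metadata_entries[0]
    assert entry.label == "path"  # entry.value is MetadataValue.path(<pickle file on disk>)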
@@ -307,16 +307,13 @@ def _csv_hello_world_event_sequence(self):
"ExecutionStepStartEvent",
"ExecutionStepInputEvent",
"ExecutionStepOutputEvent",
"LogMessageEvent",
"HandledOutputEvent",
"ExecutionStepSuccessEvent",
"LogsCapturedEvent",
"ExecutionStepStartEvent",
"LogMessageEvent",
"LoadedInputEvent",
"ExecutionStepInputEvent",
"ExecutionStepOutputEvent",
"LogMessageEvent",
"HandledOutputEvent",
"ExecutionStepSuccessEvent",
"RunSuccessEvent",
@@ -205,6 +205,9 @@ def resources(self) -> Any:

@property
def asset_key(self) -> Optional[AssetKey]:
+ if not self._name:
+     return None
+
matching_input_defs = [
input_def
for input_def in cast(SolidDefinition, self._solid_def).input_defs
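The early return above changes what happens for an input context that has no name: it now reports no asset key instead of tripping a check downstream, which is what lets add_input_metadata be called on contexts built outside of execution (see the renamed test further down). A small sketch of that behavior, assuming the property shown here belongs to InputContext:

    from dagster import build_input_context, op

    @op
    def my_op():
        pass

    # Mirrors the renamed test below; the op and metadata are taken from that test.
    context = build_input_context(op_def=my_op)
    assert context.asset_key is None  # no input name supplied, so no asset key
    context.add_input_metadata({"foo": "bar"})  # previously wrapped in pytest.raises(CheckError)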
5 changes: 3 additions & 2 deletions python_modules/dagster/dagster/core/storage/fs_io_manager.py
@@ -114,7 +114,6 @@ def handle_output(self, context, obj):
check.inst_param(context, "context", OutputContext)

filepath = self._get_path(context)
context.log.debug(f"Writing file at: {filepath}")

# Ensure path exists
mkdir_p(os.path.dirname(filepath))
@@ -143,12 +142,14 @@ def handle_output(self, context, obj):
"https://docs.dagster.io/deployment/executors#overview"
)

+ context.add_output_metadata({"path": MetadataValue.path(os.path.abspath(filepath))})
+
def load_input(self, context):
"""Unpickle the file and Load it to a data object."""
check.inst_param(context, "context", InputContext)

filepath = self._get_path(context.upstream_output)
context.log.debug(f"Loading file from: {filepath}")
context.add_input_metadata({"path": MetadataValue.path(os.path.abspath(filepath))})

with open(filepath, self.read_mode) as read_obj:
return pickle.load(read_obj)
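The same pattern applies to user-defined IO managers: any handle_output / load_input pair can report where it put the value by attaching a "path" entry through the context metadata APIs used above. A rough sketch, assuming the dagster.IOManager base class and the io_manager decorator from this release; the class name and directory layout below are illustrative only:

    import os
    import pickle

    from dagster import IOManager, MetadataValue, io_manager

    class PathReportingIOManager(IOManager):
        """Pickle values to disk and record each file's location as metadata."""

        def __init__(self, base_dir):
            self._base_dir = base_dir

        def _get_path(self, context):
            # run_id/step_key/output_name layout; an assumption for this sketch.
            return os.path.join(self._base_dir, context.run_id, context.step_key, context.name)

        def handle_output(self, context, obj):
            filepath = self._get_path(context)
            os.makedirs(os.path.dirname(filepath), exist_ok=True)
            with open(filepath, "wb") as f:
                pickle.dump(obj, f)
            # Same call the built-in manager now makes after writing the file.
            context.add_output_metadata({"path": MetadataValue.path(os.path.abspath(filepath))})

        def load_input(self, context):
            filepath = self._get_path(context.upstream_output)
            # And the input-side counterpart before unpickling.
            context.add_input_metadata({"path": MetadataValue.path(os.path.abspath(filepath))})
            with open(filepath, "rb") as f:
                return pickle.load(f)

    @io_manager
    def path_reporting_io_manager(_):
        return PathReportingIOManager(base_dir="/tmp/dagster_storage")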
@@ -170,7 +170,7 @@ def before():

@op(ins={"a": In(asset_key=in_asset_key)}, out={})
def after(a):
- pass
+ del a

@job(resource_defs={"io_manager": my_io_manager})
def my_job():
@@ -239,7 +239,7 @@ def asset_1():

@asset(partitions_def=partitions_def)
def asset_2(asset_1):
- return 2
+ return asset_1 + 1

class MyIOManager(IOManager):
def handle_output(self, context, obj):
@@ -270,14 +270,13 @@ def my_io_manager(_):
)


- def test_context_error_add_input_metadata():
+ def test_build_input_context_add_input_metadata():
@op
def my_op():
pass

context = build_input_context(op_def=my_op)
- with pytest.raises(CheckError):
-     context.add_input_metadata({"foo": "bar"})
+ context.add_input_metadata({"foo": "bar"})


def test_io_manager_single_partition_materialization():
@@ -4,7 +4,7 @@

import pytest

- from dagster import ModeDefinition, execute_pipeline, graph, op, pipeline, solid
+ from dagster import MetadataValue, ModeDefinition, execute_pipeline, graph, op, pipeline, solid
from dagster.core.definitions.version_strategy import VersionStrategy
from dagster.core.errors import DagsterInvariantViolationError
from dagster.core.execution.api import create_execution_plan
@@ -41,15 +41,24 @@ def test_fs_io_manager():
assert len(handled_output_events) == 2

filepath_a = os.path.join(tmpdir_path, result.run_id, "solid_a", "result")
+ result_metadata_entry_a = handled_output_events[0].event_specific_data.metadata_entries[0]
+ assert result_metadata_entry_a.label == "path"
+ assert result_metadata_entry_a.value == MetadataValue.path(filepath_a)
assert os.path.isfile(filepath_a)
with open(filepath_a, "rb") as read_obj:
assert pickle.load(read_obj) == [1, 2, 3]

loaded_input_events = list(filter(lambda evt: evt.is_loaded_input, result.event_list))
+ input_metadata_entry_a = loaded_input_events[0].event_specific_data.metadata_entries[0]
+ assert input_metadata_entry_a.label == "path"
+ assert input_metadata_entry_a.value == MetadataValue.path(filepath_a)
assert len(loaded_input_events) == 1
assert "solid_a" == loaded_input_events[0].event_specific_data.upstream_step_key

filepath_b = os.path.join(tmpdir_path, result.run_id, "solid_b", "result")
+ result_metadata_entry_b = handled_output_events[1].event_specific_data.metadata_entries[0]
+ assert result_metadata_entry_b.label == "path"
+ assert result_metadata_entry_b.value == MetadataValue.path(filepath_b)
assert os.path.isfile(filepath_b)
with open(filepath_b, "rb") as read_obj:
assert pickle.load(read_obj) == 1
