Skip to content

Commit

Permalink
Fix problems from "remove old asset lineage from docs (#8382)" (#8389)
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza authored and clairelin135 committed Jun 14, 2022
1 parent db08e07 commit 531e5da
Show file tree
Hide file tree
Showing 4 changed files with 259 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# isort: skip_file
# pylint: disable=reimported
import os

import pandas as pd
from dagster import AssetKey, AssetMaterialization, IOManager


def read_csv(_path):
    """Stand-in for a real CSV reader: ignore *_path*, return an empty frame."""
    frame = pd.DataFrame()
    return frame


# start_marker_0
from dagster import AssetMaterialization, IOManager


class PandasCsvIOManager(IOManager):
    """IO manager that persists op outputs as CSV files under ``my_base_dir``."""

    @staticmethod
    def _storage_path(context):
        # Outputs are addressed by the producing step plus the output/input name.
        return os.path.join("my_base_dir", context.step_key, context.name)

    def load_input(self, context):
        return read_csv(self._storage_path(context))

    def handle_output(self, context, obj):
        file_path = self._storage_path(context)
        obj.to_csv(file_path)

        # Record the write so the asset shows up in Dagster's asset catalog.
        context.log_event(
            AssetMaterialization(
                asset_key=AssetKey(file_path),
                description="Persisted result to storage.",
            )
        )


# end_marker_0


# start_marker_1
from dagster import AssetMaterialization, IOManager


class PandasCsvIOManagerWithAsset(IOManager):
    """CSV-backed IO manager whose materialization events carry dataframe metadata."""

    @staticmethod
    def _storage_path(context):
        # Outputs are addressed by the producing step plus the output/input name.
        return os.path.join("my_base_dir", context.step_key, context.name)

    def load_input(self, context):
        return read_csv(self._storage_path(context))

    def handle_output(self, context, obj):
        file_path = self._storage_path(context)
        obj.to_csv(file_path)

        # Attach summary statistics so the materialization is inspectable in the UI.
        context.log_event(
            AssetMaterialization(
                asset_key=AssetKey(file_path),
                description="Persisted result to storage.",
                metadata={
                    "number of rows": obj.shape[0],
                    "some_column mean": obj["some_column"].mean(),
                },
            )
        )


# end_marker_1
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# isort: skip_file
# pylint: disable=unused-argument,reimported


def read_df():
    """Stand-in for reading a dataframe; returns a dummy sentinel value."""
    return 1


def read_df_for_date(_):
    """Stand-in for reading one date partition of a dataframe; the date is ignored."""
    return 1


def persist_to_storage(df):
    """Stand-in for persisting *df*; always reports the same storage path."""
    return "tmp"


def calculate_bytes(df):
    """Stand-in for measuring *df*'s size in bytes; always reports 1.0."""
    return 1.0


# start_materialization_ops_marker_0
from dagster import op


@op
def my_simple_op():
    """Read a dataframe, persist it, and return where it was stored."""
    return persist_to_storage(read_df())


# end_materialization_ops_marker_0

# start_materialization_ops_marker_1
from dagster import AssetMaterialization, op


@op
def my_materialization_op(context):
    """Persist a dataframe and log an AssetMaterialization recording the write."""
    df = read_df()
    storage_path = persist_to_storage(df)
    materialization = AssetMaterialization(
        asset_key="my_dataset", description="Persisted result to storage"
    )
    context.log_event(materialization)
    return storage_path


# end_materialization_ops_marker_1


# start_partitioned_asset_materialization
from dagster import op, AssetMaterialization


@op(config_schema={"date": str})
def my_partitioned_asset_op(context):
    """Materialize a single date partition of ``my_dataset``."""
    date = context.op_config["date"]
    storage_path = persist_to_storage(read_df_for_date(date))
    # Tag the materialization with its partition so Dagster tracks it per date.
    context.log_event(AssetMaterialization(asset_key="my_dataset", partition=date))
    return storage_path


# end_partitioned_asset_materialization


# start_materialization_ops_marker_2
from dagster import op, AssetMaterialization, MetadataValue


@op
def my_metadata_materialization_op(context):
    """Persist a dataframe and log a materialization enriched with metadata."""
    df = read_df()
    storage_path = persist_to_storage(df)
    # Metadata values render as typed entries (text, path, url, number) in the UI.
    metadata = {
        "text_metadata": "Text-based metadata for this event",
        "path": MetadataValue.path(storage_path),
        "dashboard_url": MetadataValue.url(
            "http://mycoolsite.com/url_for_my_data"
        ),
        "size (bytes)": calculate_bytes(df),
    }
    context.log_event(
        AssetMaterialization(
            asset_key="my_dataset",
            description="Persisted result to storage",
            metadata=metadata,
        )
    )
    return storage_path


# end_materialization_ops_marker_2


# start_materialization_ops_marker_3
from dagster import op, AssetKey, AssetMaterialization, job, Output


@op
def my_asset_key_materialization_op(context):
    """Yield a materialization keyed by a multi-part AssetKey, then the op output."""
    df = read_df()
    storage_path = persist_to_storage(df)
    metadata = {
        "dashboard_url": MetadataValue.url("http://mycoolsite.com/dashboard"),
        "size (bytes)": calculate_bytes(df),
    }
    yield AssetMaterialization(
        asset_key=AssetKey(["dashboard", "my_cool_site"]),
        description="Persisted result to storage",
        metadata=metadata,
    )
    # Ops that yield events must also yield their output explicitly.
    yield Output(storage_path)


# end_materialization_ops_marker_3


@job
def my_asset_job():
    """Job wiring a single op that logs an asset materialization."""
    my_materialization_op()
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import pandas as pd

from dagster import ModeDefinition, execute_pipeline, io_manager, pipeline, solid
from docs_snippets.concepts.assets.materialization_io_managers import (
PandasCsvIOManager,
PandasCsvIOManagerWithAsset,
)


class DummyClass(pd.DataFrame):
    """DataFrame whose ``to_csv`` is a no-op so tests never touch the filesystem."""

    def to_csv(self, _path):  # pylint: disable=arguments-differ
        return None


def _generate_pipeline_for_io_manager(manager, config_schema=None):
    """Build a one-solid pipeline whose ``io_manager`` resource is *manager*."""
    schema = config_schema if config_schema else {}

    @io_manager(output_config_schema=schema)
    def custom_io_manager(_):
        # Close over the caller-supplied manager instance.
        return manager

    @solid
    def dummy_solid():
        return DummyClass.from_dict({"some_column": [2]})

    mode = ModeDefinition(resource_defs={"io_manager": custom_io_manager})

    @pipeline(mode_defs=[mode])
    def dummy_pipeline():
        dummy_solid()

    return dummy_pipeline


def test_pipelines_compile_and_execute():
    """Each docs IO manager should drive a pipeline to successful completion."""
    for manager in (PandasCsvIOManager(), PandasCsvIOManagerWithAsset()):
        result = execute_pipeline(_generate_pipeline_for_io_manager(manager))
        assert result
        assert result.success
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from dagster import build_op_context
from docs_snippets.concepts.assets.materialization_ops import (
my_asset_key_materialization_op,
my_materialization_op,
my_metadata_materialization_op,
my_partitioned_asset_op,
my_simple_op,
)


def test_ops_compile_and_execute():
    """Invoke each docs op directly; context-taking ops get a None context."""
    ops = [
        (my_asset_key_materialization_op, True),
        (my_materialization_op, True),
        (my_metadata_materialization_op, True),
        (my_simple_op, False),
    ]

    for op, has_context_arg in ops:
        if has_context_arg:
            op(None)
        else:
            op()


def test_partition_config_ops_compile_and_execute():
    """Partitioned ops need a context that carries their date config."""
    for op in [my_partitioned_asset_op]:
        context = build_op_context(config={"date": "2020-01-01"})
        op(context)

0 comments on commit 531e5da

Please sign in to comment.