avoid framework code triggering metadata_entries warning (#7412)
sryza committed Apr 15, 2022
1 parent 9ad8f54 commit 920c759
Showing 8 changed files with 107 additions and 121 deletions.
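
All of the changes below follow one pattern: the list-of-`MetadataEntry` `metadata_entries` argument is replaced with the structured `metadata` dictionary, whose values are plain Python values or explicit `MetadataValue` wrappers. A minimal before/after sketch of the pattern, assuming a Dagster version from this era (the op name and values are illustrative):

from dagster import MetadataValue, Output, op


@op
def fetch_items():
    n_rows = 42  # illustrative value

    # Old style, as removed throughout this commit:
    #   yield Output(n_rows, metadata_entries=[MetadataEntry.int(value=n_rows, label="rows")])

    # New style: a plain dict; values are coerced to metadata types, or
    # wrapped explicitly (e.g. MetadataValue.md) to pin a display type.
    yield Output(n_rows, metadata={"rows": n_rows, "note": MetadataValue.md("**sampled**")})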
24 changes: 12 additions & 12 deletions examples/hacker_news/hacker_news/ops/id_range_for_time.py
@@ -1,7 +1,7 @@
 from datetime import datetime, timezone
 from typing import Tuple

-from dagster import MetadataEntry, Out, Output, check, op
+from dagster import Out, Output, check, op


 def binary_search_nearest_left(get_value, start, end, min_target):
@@ -75,17 +75,17 @@ def _get_item_timestamp(item_id):
     start_timestamp = str(datetime.fromtimestamp(_get_item_timestamp(start_id), tz=timezone.utc))
     end_timestamp = str(datetime.fromtimestamp(_get_item_timestamp(end_id), tz=timezone.utc))

-    metadata_entries = [
-        MetadataEntry.int(value=max_item_id, label="max_item_id"),
-        MetadataEntry.int(value=start_id, label="start_id"),
-        MetadataEntry.int(value=end_id, label="end_id"),
-        MetadataEntry.int(value=end_id - start_id, label="items"),
-        MetadataEntry.text(text=start_timestamp, label="start_timestamp"),
-        MetadataEntry.text(text=end_timestamp, label="end_timestamp"),
-    ]
+    metadata = {
+        "max_item_id": max_item_id,
+        "start_id": start_id,
+        "end_id": end_id,
+        "items": end_id - start_id,
+        "start_timestamp": start_timestamp,
+        "end_timestamp": end_timestamp,
+    }

     id_range = (start_id, end_id)
-    return id_range, metadata_entries
+    return id_range, metadata


 @op(
@@ -99,9 +99,9 @@ def id_range_for_time(context):
     """
     For the configured time partition, searches for the range of ids that were created in that time.
     """
-    id_range, metadata_entries = _id_range_for_time(
+    id_range, metadata = _id_range_for_time(
         context.resources.partition_bounds["start"],
         context.resources.partition_bounds["end"],
         context.resources.hn_client,
     )
-    yield Output(id_range, metadata_entries=metadata_entries)
+    yield Output(id_range, metadata=metadata)
24 changes: 12 additions & 12 deletions examples/hacker_news/hacker_news/resources/dbt_asset_resource.py
@@ -1,9 +1,10 @@
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Mapping

 import pandas
 from dagster_dbt import DbtOutput

-from dagster import AssetKey, AssetMaterialization, MetadataEntry
+from dagster import AssetKey, AssetMaterialization, MetadataValue
+from dagster.core.definitions.metadata import RawMetadataValue

 from .snowflake_io_manager import connect_snowflake

@@ -22,10 +23,8 @@ class DbtAssetResource:
     def __init__(self, asset_key_prefix: List[str]):
         self._asset_key_prefix = asset_key_prefix

-    def _get_metadata(self, result: Dict[str, Any]) -> List[MetadataEntry]:
-        return [
-            MetadataEntry.float(value=result["execution_time"], label="Execution Time (seconds)")
-        ]
+    def _get_metadata(self, result: Dict[str, Any]) -> Mapping[str, RawMetadataValue]:
+        return {"Execution Time (seconds)": result["execution_time"]}

     def get_asset_materializations(self, dbt_output: DbtOutput) -> List[AssetMaterialization]:
         ret = []
@@ -46,7 +45,7 @@ def get_asset_materializations(self, dbt_output: DbtOutput) -> List[AssetMaterialization]:
             ret.append(
                 AssetMaterialization(
                     description=f"dbt node: {unique_id}",
-                    metadata_entries=self._get_metadata(result),
+                    metadata=self._get_metadata(result),
                     asset_key=asset_key,
                 )
             )
@@ -68,7 +67,7 @@ def __init__(self, snowflake_config: Dict[str, str], dbt_schema: str):
         self._dbt_schema = dbt_schema
         super().__init__(asset_key_prefix=["snowflake", dbt_schema])

-    def _get_metadata(self, result: Dict[str, Any]) -> List[MetadataEntry]:
+    def _get_metadata(self, result: Dict[str, Any]) -> Mapping[str, RawMetadataValue]:
         """
         Here, we run queries against our output Snowflake database tables to add additional context
         to our asset materializations.
@@ -80,7 +79,8 @@ def _get_metadata(self, result: Dict[str, Any]) -> List[MetadataEntry]:
             sample_rows = pandas.read_sql_query(
                 f"SELECT * FROM {table_name} SAMPLE ROW (10 rows)", con
             )
-        return super()._get_metadata(result) + [
-            MetadataEntry.int(int(n_rows.iloc[0][0]), "dbt Model Number of Rows"),
-            MetadataEntry.md(sample_rows.astype("str").to_markdown(), "dbt Model Sample Rows"),
-        ]
+        return {
+            **super()._get_metadata(result),
+            "dbt Model Number of Rows": int(n_rows.iloc[0][0]),
+            "dbt Model Sample Rows": MetadataValue.md(sample_rows.astype("str").to_markdown()),
+        }
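
For reference, `RawMetadataValue` is the union type covering both plain Python values and explicit `MetadataValue` wrappers, which is why `_get_metadata` can mix the two in one dict. A standalone sketch under that assumption (the helper name and arguments are illustrative):

from typing import Mapping

from dagster import MetadataValue
from dagster.core.definitions.metadata import RawMetadataValue


def build_metadata(execution_time: float, sample_rows_md: str) -> Mapping[str, RawMetadataValue]:
    # Plain floats and ints are coerced by Dagster; MetadataValue.md pins
    # the sample-rows entry to markdown rendering in the UI.
    return {
        "Execution Time (seconds)": execution_time,
        "dbt Model Sample Rows": MetadataValue.md(sample_rows_md),
    }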
@@ -2,7 +2,7 @@

 from hacker_news_assets.partitions import hourly_partitions

-from dagster import MetadataEntry, Output, asset, check
+from dagster import Output, asset, check


 def binary_search_nearest_left(get_value, start, end, min_target):
@@ -71,17 +71,17 @@ def _get_item_timestamp(item_id):
     start_timestamp = str(datetime.fromtimestamp(_get_item_timestamp(start_id), tz=timezone.utc))
     end_timestamp = str(datetime.fromtimestamp(_get_item_timestamp(end_id), tz=timezone.utc))

-    metadata_entries = [
-        MetadataEntry.int(value=max_item_id, label="max_item_id"),
-        MetadataEntry.int(value=start_id, label="start_id"),
-        MetadataEntry.int(value=end_id, label="end_id"),
-        MetadataEntry.int(value=end_id - start_id, label="items"),
-        MetadataEntry.text(text=start_timestamp, label="start_timestamp"),
-        MetadataEntry.text(text=end_timestamp, label="end_timestamp"),
-    ]
+    metadata = {
+        "max_item_id": max_item_id,
+        "start_id": start_id,
+        "end_id": end_id,
+        "items": end_id - start_id,
+        "start_timestamp": start_timestamp,
+        "end_timestamp": end_timestamp,
+    }

     id_range = (start_id, end_id)
-    return id_range, metadata_entries
+    return id_range, metadata


 @asset(
@@ -94,7 +94,7 @@ def id_range_for_time(context):
     For the configured time partition, searches for the range of ids that were created in that time.
     """
     start, end = context.output_asset_partitions_time_window()
-    id_range, metadata_entries = _id_range_for_time(
+    id_range, metadata = _id_range_for_time(
         start.timestamp(), end.timestamp(), context.resources.hn_client
     )
-    yield Output(id_range, metadata_entries=metadata_entries)
+    yield Output(id_range, metadata=metadata)
41 changes: 26 additions & 15 deletions python_modules/dagster/dagster/core/execution/plan/execute_step.py
@@ -1,4 +1,5 @@
 import inspect
+import warnings
 from collections import defaultdict
 from typing import AbstractSet, Any, Dict, Iterator, List, Optional, Set, Tuple, Union, cast

@@ -42,7 +43,7 @@
 from dagster.core.storage.tags import MEMOIZED_RUN_TAG
 from dagster.core.types.dagster_type import DagsterType, DagsterTypeKind
 from dagster.utils import ensure_gen, iterate_with_context
-from dagster.utils.backcompat import experimental_functionality_warning
+from dagster.utils.backcompat import ExperimentalWarning, experimental_functionality_warning
 from dagster.utils.timing import time_execution_scope

 from .compute import SolidOutputUnion
@@ -484,19 +485,25 @@ def _get_output_asset_materializations(
                 metadata_mapping[partition].append(entry)

         for partition in asset_partitions:
-            yield AssetMaterialization(
-                asset_key=asset_key,
-                partition=partition,
-                metadata_entries=metadata_mapping[partition],
-            )
+            with warnings.catch_warnings():
+                warnings.simplefilter("ignore", category=ExperimentalWarning)
+
+                yield AssetMaterialization(
+                    asset_key=asset_key,
+                    partition=partition,
+                    metadata_entries=metadata_mapping[partition],
+                )
     else:
         for entry in all_metadata:
             if isinstance(entry, PartitionMetadataEntry):
                 raise DagsterInvariantViolationError(
                     f"Output {output_def.name} got a PartitionMetadataEntry ({entry}), but "
                     "is not associated with any specific partitions."
                 )
-        yield AssetMaterialization(asset_key=asset_key, metadata_entries=all_metadata)
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", category=ExperimentalWarning)
+
+            yield AssetMaterialization(asset_key=asset_key, metadata_entries=all_metadata)


 def _store_output(
@@ -574,15 +581,19 @@ def _gen_fn():
                 raise DagsterInvariantViolationError(
                     f"When handling output '{output_context.name}' of {output_context.solid_def.node_type_str} '{output_context.solid_def.name}', received a materialization with metadata, while context.add_output_metadata was used within the same call to handle_output. Due to potential conflicts, this is not allowed. Please specify metadata in one place within the `handle_output` function."
                 )

         if manager_metadata_entries:
-            materialization = AssetMaterialization(
-                asset_key=materialization.asset_key,
-                description=materialization.description,
-                metadata_entries=manager_metadata_entries,
-                partition=materialization.partition,
-                tags=materialization.tags,
-                metadata=None,
-            )
+            with warnings.catch_warnings():
+                warnings.simplefilter("ignore", category=ExperimentalWarning)
+
+                materialization = AssetMaterialization(
+                    asset_key=materialization.asset_key,
+                    description=materialization.description,
+                    metadata_entries=manager_metadata_entries,
+                    partition=materialization.partition,
+                    tags=materialization.tags,
+                    metadata=None,
+                )
         yield DagsterEvent.asset_materialization(step_context, materialization, input_lineage)

     asset_key, partitions = _asset_key_and_partitions_for_output(
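
The framework-side fix is the same at all three call sites above: Dagster itself still constructs `AssetMaterialization` with `metadata_entries`, which would surface the experimental-API warning for code the user never wrote, so the construction is wrapped in a warnings filter. Distilled to a standalone sketch (the function name and arguments are illustrative):

import warnings

from dagster import AssetMaterialization
from dagster.utils.backcompat import ExperimentalWarning


def emit_framework_materialization(asset_key, metadata_entries):
    # Suppress the warning only around framework-internal construction,
    # so user-authored call sites still see it.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=ExperimentalWarning)
        return AssetMaterialization(asset_key=asset_key, metadata_entries=metadata_entries)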
5 changes: 1 addition & 4 deletions python_modules/libraries/dagster-aws/dagster_aws/s3/ops.py
@@ -3,7 +3,6 @@
     Field,
     FileHandle,
     In,
-    MetadataEntry,
     MetadataValue,
     Out,
     Output,
@@ -75,9 +74,7 @@ def file_handle_to_s3(context, file_handle):

     yield AssetMaterialization(
         asset_key=s3_file_handle.s3_path,
-        metadata_entries=[
-            MetadataEntry(last_key(key), value=MetadataValue.path(s3_file_handle.s3_path))
-        ],
+        metadata={last_key(key): MetadataValue.path(s3_file_handle.s3_path)},
     )

     yield Output(value=s3_file_handle, output_name="s3_file_handle")
34 changes: 16 additions & 18 deletions python_modules/libraries/dagster-dbt/dagster_dbt/rpc/utils.py
@@ -6,7 +6,7 @@
 from requests import Response
 from requests.exceptions import RequestException

-from dagster import Failure, MetadataEntry, RetryRequested
+from dagster import Failure, RetryRequested
 from dagster.core.execution.context.compute import SolidExecutionContext

@@ -50,35 +50,33 @@ def raise_for_rpc_error(context: SolidExecutionContext, resp: Response) -> None:
     elif error["code"] == DBTErrors.project_compile_failure_error.value:
         raise Failure(
             description=error["message"],
-            metadata_entries=[
-                MetadataEntry("RPC Error Code", value=str(error["code"])),
-                MetadataEntry("RPC Error Cause", value=error["data"]["cause"]["message"]),
-            ],
+            metadata={
+                "RPC Error Code": str(error["code"]),
+                "RPC Error Cause": error["data"]["cause"]["message"],
+            },
         )
     elif error["code"] == DBTErrors.rpc_process_killed_error.value:
         raise Failure(
             description=error["message"],
-            metadata_entries=[
-                MetadataEntry("RPC Error Code", value=str(error["code"])),
-                MetadataEntry("RPC Signum", value=str(error["data"]["signum"])),
-                MetadataEntry("RPC Error Message", value=error["data"]["message"]),
-            ],
+            metadata={
+                "RPC Error Code": str(error["code"]),
+                "RPC Signum": str(error["data"]["signum"]),
+                "RPC Error Message": error["data"]["message"],
+            },
         )
     elif error["code"] == DBTErrors.rpc_timeout_error.value:
         raise Failure(
             description=error["message"],
-            metadata_entries=[
-                MetadataEntry("RPC Error Code", value=str(error["code"])),
-                MetadataEntry("RPC Timeout", value=str(error["data"]["timeout"])),
-                MetadataEntry("RPC Error Message", value=error["data"]["message"]),
-            ],
+            metadata={
+                "RPC Error Code": str(error["code"]),
+                "RPC Timeout": str(error["data"]["timeout"]),
+                "RPC Error Message": error["data"]["message"],
+            },
        )
     else:
         raise Failure(
             description=error["message"],
-            metadata_entries=[
-                MetadataEntry("RPC Error Code", value=str(error["code"])),
-            ],
+            metadata={"RPC Error Code": str(error["code"])},
         )


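The dbt RPC helpers get the same treatment: each `Failure` now carries its context as a `metadata` dict rather than a list of `MetadataEntry` objects. A minimal sketch of the resulting call shape (the helper name and error payload are illustrative):

from dagster import Failure


def fail_for_rpc_error(error: dict) -> None:
    # error is an illustrative RPC payload, e.g. {"code": 10011, "message": "..."}
    raise Failure(
        description=error["message"],
        metadata={"RPC Error Code": str(error["code"])},
    )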
