You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Hello, I've discovered that inside the multi_asset, I can use async functions. It appears to be a substitute for the async executor. I'm attempting to migrate a portion of our transformation into the multi_asset, but I'm unsure how to handle Exceptions. Here is an example:
import asyncio
from dagster import AssetKey, AssetOut, Output, multi_asset
src = {
"a": 2,
"b": 3,
"c": 2,
"d": 1,
"e": 11,
}
outs = {t: AssetOut() for t, _ in src.items()}
internal_asset_deps = {
"a": set(),
"b": {AssetKey("a")},
"c": {AssetKey("b")},
"d": {AssetKey("b"), AssetKey("c")},
"e": set(),
}
async def fn(var: str, sleep_time: int):
if var == "b":
raise ValueError("Exemple Error")
await asyncio.sleep(sleep_time)
return var
@multi_asset(outs=outs, internal_asset_deps=internal_asset_deps)
async def assets(context):
success = True
for t, i in src.items():
try:
await asyncio.create_task(fn(t, i))
context.log.info(t)
yield Output(None, output_name=t)
except Exception:
success = False
if not success:
raise ValueError("Some assets failed")
I am using internal_asset_deps to specify dependencies between internal assets. But what happens when e.g. b asset fails; I cannot rise an exception in fn since whole op fails (and e.g. asset e will fail, even it is not necessary). If I catch exceptions, than asset c and d are Unsynced but Materialized:
Is there any better way how to work with exceptions in multi_assets?
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
-
Hello, I've discovered that inside the
multi_asset
, I can use async functions. It appears to be a substitute for the async executor. I'm attempting to migrate a portion of our transformation into themulti_asset
, but I'm unsure how to handle Exceptions. Here is an example:I am using
internal_asset_deps
to specify dependencies between internal assets. But what happens when e.g.b
asset fails; I cannot rise an exception infn
since wholeop
fails (and e.g. assete
will fail, even it is not necessary). If I catch exceptions, than assetc
andd
areUnsynced
butMaterialized
:Is there any better way how to work with exceptions in
multi_assets
?Thanks a lot!!!
Beta Was this translation helpful? Give feedback.
All reactions