How to make @graph_asset return Output or equivalent? #22368
Replies: 3 comments 1 reply
-
I think that the best way to go about this is creating an I/O Manager for the job:
-
Ran into the same problem. It would be very nice if … If you need different metadata per asset_graph, a different solution is to add a final metadata …
-
I think you can just return an `Output` with the metadata from whatever the last op is, and the asset's materialization value will be that output with the metadata. Another thing that might be confusing you is that you don't need to put your `graph_asset` inside another asset:

```python
from typing import List

import requests
from dagster import (
    AssetExecutionContext,
    DynamicOut,
    DynamicOutput,
    OpExecutionContext,
    Output,
    asset,
    graph_asset,
    op,
)


@asset(
    description='Extracts the category links from the supermarket website.',
    compute_kind='python',
    group_name='01_extract'
)
def get_category_links(context: AssetExecutionContext, supermarket: SupermarketResource) -> Output[List[str]]:
    # does some computation that produces category_urls
    # (SupermarketResource is your own resource, defined elsewhere)
    return Output(
        category_urls,  # ['example.com/alpha', 'example.com/beta', ...]
        metadata={
            "n_categories": len(category_urls),
            "preview": category_urls[:5],
        }
    )


@op(out=DynamicOut(str))
def split_links(context: OpExecutionContext, category_links: List[str]):
    # Fan out: one dynamic output per link (only the first five here)
    for idx, link in enumerate(category_links[:5]):
        yield DynamicOutput(value=link, mapping_key=f'link_{idx}')


@op
def retrieve_page(context: OpExecutionContext, link: str) -> List[str]:
    context.log.info(f"Retrieving link: {link}")
    response = requests.get(link)
    # process each page, get a list of products ['Product A', 'Product B', ...]
    return products


@op
def consolidate_links(context: OpExecutionContext, results: List[List[str]]) -> List[List[str]]:
    context.log.info(f"Consolidated Results: {results}")
    # The last op in the graph returns the Output that carries the metadata
    return Output(
        results,
        metadata={
            "n_items": len(results),
            "preview": results[:5],
        }
    )


@graph_asset(
    description='Dynamic Mapping Graph',
    group_name='02_transform'
)
def transform_links(get_category_links: List[str]) -> List[List[str]]:
    links = split_links(get_category_links)
    results = links.map(retrieve_page)
    return consolidate_links(results.collect())


# Not necessary, won't do anything except return a function handle, which might break on output. transform_links is already an asset
#@asset(
#    description='Extracts the product links from the supermarket website.',
#    compute_kind='python',
#    group_name='03_load'
#)
#def load_links(context: AssetExecutionContext, transform_links: List[List[str]]):
#    return transform_links
```
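If it helps to check the shape of the values and metadata before wiring this into Dagster, the same fan-out, map, and collect flow can be mirrored in plain Python. This is only a rough sketch: `fake_retrieve_page` and `consolidate` are hypothetical stand-ins invented for illustration, not Dagster APIs, and no HTTP request is made.

```python
from typing import Any, Dict, List, Tuple


def split_links(category_links: List[str], limit: int = 5) -> Dict[str, str]:
    """Mimic the dynamic fan-out: one mapping key per link, capped at `limit`."""
    return {f"link_{idx}": link for idx, link in enumerate(category_links[:limit])}


def fake_retrieve_page(link: str) -> List[str]:
    """Hypothetical stand-in for retrieve_page; returns made-up product names."""
    return [f"{link}/product_a", f"{link}/product_b"]


def consolidate(results: List[List[str]]) -> Tuple[List[List[str]], Dict[str, Any]]:
    """Return the value plus the metadata dict the final op would attach."""
    metadata = {"n_items": len(results), "preview": results[:5]}
    return results, metadata


links = split_links(["example.com/alpha", "example.com/beta"])
# Equivalent of links.map(retrieve_page) followed by .collect()
results = [fake_retrieve_page(link) for link in links.values()]
value, metadata = consolidate(results)
print(metadata["n_items"])
```

Note that `n_items` counts the collected pages (the outer list), not the individual products inside each page.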
-
Hi,
So I have a very simple scraper that I would like to run. E -> T -> L, but the Transformation step is a `@graph_asset` computed from a couple of `@op`s. I would like the result of my Transform step to be type `Output` with `metadata`; however, I'm getting an error if I add it to my Transform step.

How can I write something that looks similar to:
So that I can annotate and get metadata at every step of my pipeline?