-
I'm trying to figure out how to run DAG steps in parallel while passing data between them through memory, to minimize I/O latency. I've written the following code to test this scenario. When I run `./run_materialize.py`, the data transfer between the DAG steps is fast (thanks to it being in-memory), but the parallel parts within `graph_asset_example` are not running in parallel as expected. Is there a way to run these portions in parallel (multi-process) while keeping the data in-memory between steps, to avoid additional I/O latency?

Here is the content of `./tutorial/__init__.py`:

```python
from dagster import (
    asset,
    define_asset_job,
    AssetSelection,
    Definitions,
    FilesystemIOManager,
    op,
    # graph_asset,
    graph,
    DynamicOut,
    DynamicOutput,
    AssetsDefinition,
    AssetKey,
)
from typing import List, Any
import random
import time

@asset()
def example_asset_0() -> int:
    return 1

@asset(io_manager_key="fs_io_manager")
def example_asset_1(example_asset_0: int) -> int:
    return example_asset_0 + 1

@asset()
def example_asset_2(example_asset_1: int) -> int:
    return example_asset_1 + 1

@op(out=DynamicOut(int))
def return_dynamic(input_val: int):
    print(input_val)
    # outputs = []
    for idx, page_key in enumerate(range(random.randint(5, 10))):
        yield DynamicOutput(page_key, mapping_key=str(idx))

@op()
def op1(value: int) -> int:
    time.sleep(10)
    return value

@op()
def op2(value: int) -> int:
    return value

@op()
def path_through(collected_list: List[Any]) -> List[Any]:
    return collected_list

@graph()
def graph_asset_example(input_val: int):
    result = path_through(return_dynamic(input_val).map(op1).map(op2).collect())
    return result

dynamic_graph_asset = AssetsDefinition.from_graph(
    graph_asset_example,
    keys_by_input_name={"input_val": AssetKey("example_asset_2")},
    keys_by_output_name={"result": AssetKey("dynamic_graph_asset_yeah")},
    metadata_by_output_name={"result": {"num_records": 1}},
)

images_job = define_asset_job(name="images_job", selection=AssetSelection.all())

defs = Definitions(
    assets=[
        example_asset_0,
        example_asset_1,
        example_asset_2,
        dynamic_graph_asset,
    ],
    jobs=[
        images_job,
    ],
    resources={
        "fs_io_manager": FilesystemIOManager(),
    },
)
```

And here is the content of `./run_materialize.py`:

```python
import tutorial
from dagster import load_assets_from_modules, materialize_to_memory

all_assets = load_assets_from_modules([tutorial])
materialize_to_memory(all_assets)
```
-
Hi @takerfume - it's not currently possible to run DAG steps in parallel while passing data in memory. This largely boils down to limitations of Python itself: because of Python's global interpreter lock (GIL), multithreaded Python code mostly cannot take advantage of underlying OS-level parallelism. #4041 is the issue where we're tracking adding multi-threaded in-process execution. The reason we haven't prioritized it so far is that, because of the GIL, it would only help with parallelism when most of the computation is delegated outside of Python (e.g. to native extensions).
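To see the GIL limitation concretely outside of Dagster, here is a minimal, self-contained sketch: the same CPU-bound pure-Python function run serially and then across four threads. On CPython the results match, but the threaded run typically takes about as long as the serial one, because the interpreter lock serializes pure-Python bytecode.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def cpu_bound(n: int) -> int:
    # Pure-Python loop: it holds the GIL while it runs, so threads
    # cannot execute it in parallel on CPython.
    total = 0
    for i in range(n):
        total += i * i
    return total

N = 1_000_000

start = time.perf_counter()
serial = [cpu_bound(N) for _ in range(4)]
serial_time = time.perf_counter() - start

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    threaded = list(pool.map(cpu_bound, [N] * 4))
threaded_time = time.perf_counter() - start

print(serial == threaded)  # True: same results either way
# On CPython, threaded_time is usually close to serial_time (no speedup).
# Work that releases the GIL (NumPy, I/O, native extensions) is the exception.
print(f"serial: {serial_time:.2f}s, threaded: {threaded_time:.2f}s")
```

This is why multi-threaded in-process execution would mostly help workloads that delegate computation outside of Python; pure-Python ops need separate processes to run in parallel.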
-
Reopened discussion because closed discussions don't appear in search results
-
@takerfume thanks for sharing your code. I am facing a very similar problem, and it's helpful.
@sryza can you elaborate on how you would use those hooks? My main concern is the scenario where an Op
Yes, I do think it would be possible to implement a custom IO manager that uses Redis as a backend for in-memory data exchange. You might be able to use op hooks for freeing the memory.
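For concreteness, here is a rough, stdlib-only sketch of that idea. A dict-backed `FakeRedis` stands in for a real redis-py client, and plain `(run_id, step_key, output_name)` arguments stand in for Dagster's `OutputContext`/`InputContext`; in a real deployment you would subclass `dagster.IOManager` and trigger `free()` from something like a success hook once downstream steps have consumed the value. All class and method names here are hypothetical, not part of Dagster's API.

```python
import pickle

class FakeRedis:
    """Stand-in for a redis-py client; only the calls this sketch needs."""
    def __init__(self):
        self._store = {}
    def set(self, key, value):
        self._store[key] = value
    def get(self, key):
        return self._store.get(key)
    def delete(self, key):
        self._store.pop(key, None)

class RedisIOManager:
    """Sketch of an IO manager that shuttles step outputs through Redis.

    In real Dagster this would subclass dagster.IOManager and derive the
    key from the OutputContext/InputContext instead of raw arguments.
    """
    def __init__(self, client):
        self.client = client

    def _key(self, run_id: str, step_key: str, name: str) -> str:
        return f"{run_id}/{step_key}/{name}"

    def handle_output(self, run_id, step_key, name, obj):
        # Pickle so arbitrary Python objects survive the round trip
        # between processes.
        self.client.set(self._key(run_id, step_key, name), pickle.dumps(obj))

    def load_input(self, run_id, step_key, name):
        return pickle.loads(self.client.get(self._key(run_id, step_key, name)))

    def free(self, run_id, step_key, name):
        # Called from e.g. a success hook after downstream steps finish,
        # so intermediate values don't pile up in Redis for the whole run.
        self.client.delete(self._key(run_id, step_key, name))

# Usage: store an upstream output, load it downstream, then free it.
client = FakeRedis()
manager = RedisIOManager(client)
manager.handle_output("run1", "example_asset_1", "result", 2)
print(manager.load_input("run1", "example_asset_1", "result"))  # 2
manager.free("run1", "example_asset_1", "result")
```

Because Redis lives outside the worker processes, this would let a multiprocess executor pass data between steps without touching disk, at the cost of a serialize/deserialize hop per edge.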