
A task that represents a subflow will be annotated as such in its `state_details` via the presence of a `child_flow_run_id` field.  A subflow can be identified via the presence of a `parent_task_run_id` on `state_details`.

In [9]:

def is_notebook() -> bool:
    try:
        shell = get_ipython().__class__.__name__
        if shell == 'ZMQInteractiveShell':
            return True   # Jupyter notebook or qtconsole
        elif shell == 'TerminalInteractiveShell':
            return False  # Terminal running IPython
        else:
            return False  # Other type (?)
    except NameError:
        return False      # Probably standard Python interpreter

print(is_notebook())

True


In [None]:
one = 1
two = 2
num = one + two


# Read from pickle

In [None]:
# import anyio
# from prefect import flow
# from prefect.results import ResultFactory
# from prefect.context import FlowRunContext
# from prefect.client.orion import get_client
# from prefect.task_runners import ConcurrentTaskRunner
# from contextlib import AsyncExitStack
# import cloudpickle
# with open(flow_run_context_filename, "rb") as f:
#     flow_run_context_kwargs = cloudpickle.load(f)

# stack = AsyncExitStack()
# client = await stack.enter_async_context(get_client())
# background_tasks = await stack.enter_async_context(
#     anyio.create_task_group()
# )
# flow_run_context = FlowRunContext(
#     client=client,
#     background_tasks=background_tasks,
#     **flow_run_context_kwargs
# )
# print(flow_run_context)

# Dummy flow and flow_run_id will be injected as a param

In [None]:
import anyio
from prefect import flow
from prefect.results import ResultFactory
from prefect.context import FlowRunContext
from prefect.client.orion import get_client
from prefect.task_runners import ConcurrentTaskRunner
from contextlib import AsyncExitStack


@flow
async def dummy_flow():
    ...

stack = AsyncExitStack()
client = await stack.enter_async_context(get_client())

dummy_result_factory = await ResultFactory.from_flow(dummy_flow, client=client)
flow_run = await client.read_flow_run(flow_run_id)
background_tasks = await stack.enter_async_context(
    anyio.create_task_group()
)
sync_portal = stack.enter_context(anyio.start_blocking_portal())
flow_run_context = FlowRunContext(
    flow=dummy_flow,  # dummy flow,
    flow_run=flow_run,  # the desired flow run
    client=client,
    result_factory=dummy_result_factory,
    task_runner=ConcurrentTaskRunner(),
    background_tasks=background_tasks,
    sync_portal=sync_portal
)

# Make context available through whole notebook 

In [None]:
flow_run_context.__enter__()

# Run subflow

In [None]:
from prefect import flow, get_run_logger
from prefect.context import FlowRunContext

@flow
def inner_flow():
    logger = get_run_logger()
    flow_run_context = FlowRunContext.get()
    logger.info(str(flow_run_context))
    logger.info("IT RAN")

inner_flow()

#  Exit to stop pending

In [None]:
flow_run_context.__exit__()

# idea for injecting code into notebook

In [None]:
import nbformat
from papermill.iorw import papermill_io
from papermill import execute_notebook

notebook_path = "test_notebook.ipynb"

nb = nbformat.reads(papermill_io.read(notebook_path), as_version=4)
# add injected code here...
# then run
execute_notebook(nb, None)