In [None]:
# See https://docs.kedro.org/en/stable/notebooks_and_ipython/kedro_and_notebooks.html
# %load_ext kedro.ipython

In [None]:
# catalog.list()

In [1]:
# <project_root>/register_prefect_flow.py
import click

In [5]:
from dagster import get_dagster_logger


def kedro_init(
    pipeline_name: str,
    project_path: Path,
    env: str,
):
    """
    Initializes a Kedro session and returns the DataCatalog and KedroSession.


    """
    # bootstrap project within task / flow scope

    logger = get_dagster_logger()
    logger.info("Bootstrapping project")
    bootstrap_project(project_path)

    session = KedroSession.create(
        project_path=project_path,
        env=env,
    )
    # Note that for logging inside a Prefect task logger is used.
    logger.info("Session created with ID %s", session.session_id)
    pipeline = pipelines.get(pipeline_name)
    logger.info("Loading context...")
    context = session.load_context()
    catalog = context.catalog
    logger.info("Registering datasets...")
    unregistered_ds = pipeline.datasets() - set(catalog.list())
    for ds_name in unregistered_ds:
        catalog.add(ds_name, MemoryDataset())
    return {"catalog": catalog, "sess_id": session.session_id}

In [22]:
execution_config = kedro_init(
    pipeline_name="data_processing", project_path="../", env=None
)

In [26]:
catalog = execution_config["catalog"]
catalog.list()


[1m[[0m
    [32m'companies'[0m,
    [32m'reviews'[0m,
    [32m'shuttles'[0m,
    [32m'preprocessed_companies'[0m,
    [32m'preprocessed_shuttles'[0m,
    [32m'model_input_table'[0m,
    [32m'regressor'[0m,
    [32m'parameters'[0m,
    [32m'params:model_options'[0m,
    [32m'params:model_options.test_size'[0m,
    [32m'params:model_options.random_state'[0m,
    [32m'params:model_options.features'[0m
[1m][0m

In [30]:
pipelines["__default__"].nodes


[1m[[0m
    [1;35mNode[0m[1m([0mpreprocess_companies, [32m'companies'[0m, [32m'preprocessed_companies'[0m, [32m'preprocess_companies_node'[0m[1m)[0m,
    [1;35mNode[0m[1m([0mpreprocess_shuttles, [32m'shuttles'[0m, [32m'preprocessed_shuttles'[0m, [32m'preprocess_shuttles_node'[0m[1m)[0m,
    [1;35mNode[0m[1m([0mcreate_model_input_table, [1m[[0m[32m'preprocessed_shuttles'[0m, [32m'preprocessed_companies'[0m, [32m'reviews'[0m[1m][0m, [32m'model_input_table'[0m, [32m'create_model_input_table_node'[0m[1m)[0m,
    [1;35mNode[0m[1m([0msplit_data, [1m[[0m[32m'model_input_table'[0m, [32m'params:model_options'[0m[1m][0m, [1m[[0m[32m'X_train'[0m, [32m'X_test'[0m, [32m'y_train'[0m, [32m'y_test'[0m[1m][0m, [32m'split_data_node'[0m[1m)[0m,
    [1;35mNode[0m[1m([0mtrain_model, [1m[[0m[32m'X_train'[0m, [32m'y_train'[0m[1m][0m, [32m'regressor'[0m, [32m'train_model_node'[0m[1m)[0m,
    [1;35mNode[0m[1m([0mev

In [25]:
from kedro.io import DataCatalog, MemoryDataset

memory_assets = {
    asset_name: asset
    for asset_name in catalog.list()
    if isinstance(asset, MemoryDataset)
}
memory_assets

In [20]:
from pydantic import BaseModel


# Define the first Pydantic model
class ModelA(BaseModel):
    field1: str
    field2: int


# Define the second Pydantic model
class ModelB(BaseModel):
    field3: float
    field4: bool


class CombinedModel(ModelA, ModelB):
    pass


# Example usage
combined_instance = CombinedModel(field1="example", field2=42, field3=3.14, field4=True)

# Print the instance of the combined model
print(combined_instance)

field3=3.14 field4=True field1='example' field2=42


In [27]:
combined_instance.__annotations__

[1m{[0m[32m'field1'[0m: [1m<[0m[1;95mclass[0m[39m [0m[32m'str'[0m[39m>, [0m[32m'field2'[0m[39m: <class [0m[32m'int'[0m[1m>[0m[1m}[0m

In [None]:
def init_kedro_tasks_by_execution_layer(
    pipeline_name: str,
    execution_config: Union[None, Dict[str, Union[DataCatalog, str]]] = None,
) -> List[List[Callable]]:
    """
    Inits the Kedro tasks ordered topologically in groups, which implies that an earlier group
    is the dependency of later one.

    Args:
        pipeline_name (str): The pipeline name to execute
        execution_config (Union[None, Dict[str, Union[DataCatalog, str]]], optional):
        The required execution config for each node. Defaults to None.

    Returns:
        List[List[Callable]]: A list of topologically ordered task groups
    """

    pipeline = pipelines.get(pipeline_name)

    execution_layers = []

    # Return a list of the pipeline nodes in topologically ordered groups,
    #  i.e. if node A needs to be run before node B, it will appear in an
    #  earlier group.
    for layer in pipeline.grouped_nodes:
        execution_layer = []
        for node in layer:
            # Use a function for task instantiation which avoids duplication of
            # tasks
            task = instantiate_task(node, execution_config)
            execution_layer.append(task)
        execution_layers.append(execution_layer)

    return execution_layers

In [None]:
@flow(name="my_flow")
def my_flow(pipeline_name: str, env: str):
    logger = get_dagster_logger()  # Use context?
    project_path = Path.cwd()

    metadata = bootstrap_project(project_path)
    logger.info("Project name: %s", metadata.project_name)

    logger.info("Initializing Kedro...")
    execution_config = kedro_init(
        pipeline_name=pipeline_name, project_path=project_path, env=env
    )

    logger.info("Building execution layers...")
    execution_layers = init_kedro_tasks_by_execution_layer(
        pipeline_name, execution_config
    )

    for layer in execution_layers:
        logger.info("Running layer...")
        for node_task in layer:
            logger.info("Running node...")
            node_task()

In [None]:
@click.command()
@click.option("-p", "--pipeline", "pipeline_name", default="__default__")
@click.option("--env", "-e", type=str, default="base")
@click.option("--deployment_name", "deployment_name", default="example")
@click.option("--work_pool_name", "work_pool_name", default="default")
@click.option("--work_queue_name", "work_queue_name", default="default")
@click.option("--version", "version", default="1.0")
def dagster_deploy(
    pipeline_name, env, deployment_name, work_pool_name, work_queue_name, version
):
    """Register a Kedro pipeline as a Dagster job."""

    # Pipeline name to execute
    pipeline_name = pipeline_name or "__default__"

    # Use standard deployment configuration for local execution. If you require a different
    # infrastructure, check the API docs for Deployments at: https://docs.prefect.io/latest/api-ref/prefect/deployments/
    deployment = Deployment.build_from_flow(
        flow=my_flow,
        name=deployment_name,
        path=str(Path.cwd()),
        version=version,
        parameters={
            "pipeline_name": pipeline_name,
            "env": env,
        },
        infra_overrides={"env": {"PREFECT_LOGGING_LEVEL": "DEBUG"}},
        work_pool_name=work_pool_name,
        work_queue_name=work_queue_name,
    )

    deployment.apply()