# Building Parametrized Pipelines

<div class="alert alert-info">

This tutorial is a sequel to `02_build_pipelines.ipynb` and requires the artifacts and files stored by it. Please ensure `02_build_pipelines.ipynb` has been run before running this tutorial.

</div>

<div class="alert alert-info">

If you encounter issues you cannot resolve, simply ask in our [Slack community](https://join.slack.com/t/lineacommunity/shared_invite/zt-18kizfn3b-1Qu_HDT3ahGudnAwoFAw9Q)'s `#support` channel. We are always happy and ready to help you!

</div>

<div class="alert alert-info">

You can ignore `# NBVAL_*` comments in certain cell blocks. They are for passing unit tests only, which we do to make sure the examples are always functional as we update the codebase.

</div>

Data scientists and engineers often need to run the same pipeline with different parameters. For instance, they may want to use a different dataset for model training and/or prediction. To produce a parametrized pipeline, we can use the optional `input_parameters` argument of the pipeline API.

In [1]:
# NBVAL_IGNORE_OUTPUT
import lineapy
lineapy.tag("Building Parameterized Pipelines Demo")

As a concrete example, consider the pipeline created in `02_build_pipelines.ipynb`, which gave us an "inflexible" pipeline with the data source (`url`) hard-coded as a fixed value:

In [2]:
# NBVAL_IGNORE_OUTPUT
%cat ./output/pipeline_basics/iris_pipeline_module.py

import pandas as pd
from sklearn.linear_model import LinearRegression


def get_iris_preprocessed():
    url = "https://raw.githubusercontent.com/LineaLabs/lineapy/main/examples/tutorials/data/iris.csv"
    df = pd.read_csv(url)
    color_map = {"Setosa": "green", "Versicolor": "blue", "Virginica": "red"}
    df["variety_color"] = df["variety"].map(color_map)
    df["d_versicolor"] = df["variety"].apply(lambda x: 1 if x == "Versicolor" else 0)
    df["d_virginica"] = df["variety"].apply(lambda x: 1 if x == "Virginica" else 0)
    return df


def get_iris_model(df):
    mod = LinearRegression()
    mod.fit(
        X=df[["petal.width", "d_versicolor", "d_virginica"]],
        y=df["sepal.width"],
    )
    return mod


def run_session_including_iris_preprocessed():
    # Given multiple artifacts, we need to save each right after
    # its calculation to protect from any irrelevant downstream
    # mutations (e.g., inside other artifact calculations)
    import

Instead, we can run:

In [3]:
# NBVAL_IGNORE_OUTPUT

# Build an Airflow pipeline using artifacts
lineapy.to_pipeline(
    pipeline_name="iris_pipeline_parametrized",
    artifacts=["iris_preprocessed", "iris_model"],
    dependencies={"iris_model": {"iris_preprocessed"}},
    input_parameters=["url"],  # Specify variable(s) to parametrize
    output_dir="./output/pipeline_parametrization/",
    framework="AIRFLOW",
)

Generated module file: output/pipeline_parametrization/iris_pipeline_parametrized_module.py
Generated requirements file: output/pipeline_parametrization/iris_pipeline_parametrized_requirements.txt
Generated DAG file: output/pipeline_parametrization/iris_pipeline_parametrized_dag.py
Generated Docker file: output/pipeline_parametrization/iris_pipeline_parametrized_Dockerfile


PosixPath('output/pipeline_parametrization')

to get a parametrized pipeline, like so:

In [4]:
# NBVAL_IGNORE_OUTPUT
%cat ./output/pipeline_parametrization/iris_pipeline_parametrized_module.py

import argparse

import pandas as pd
from sklearn.linear_model import LinearRegression


def get_iris_preprocessed(url):
    df = pd.read_csv(url)
    color_map = {"Setosa": "green", "Versicolor": "blue", "Virginica": "red"}
    df["variety_color"] = df["variety"].map(color_map)
    df["d_versicolor"] = df["variety"].apply(lambda x: 1 if x == "Versicolor" else 0)
    df["d_virginica"] = df["variety"].apply(lambda x: 1 if x == "Virginica" else 0)
    return df


def get_iris_model(df):
    mod = LinearRegression()
    mod.fit(
        X=df[["petal.width", "d_versicolor", "d_virginica"]],
        y=df["sepal.width"],
    )
    return mod


def run_session_including_iris_preprocessed(
    url="https://raw.githubusercontent.com/LineaLabs/lineapy/main/examples/tutorials/data/iris.csv",
):
    # Given multiple artifacts, we need to save each right after
    # its calculation to protect from any irrelevant downstream
    # mutations (e.g., inside other artifact ca

As shown, `url` is now factored out as an easily tunable parameter of the pipeline, so we can run it with data sources other than the one we started with, which increases the pipeline's utility.
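
The generated module above imports `argparse`, which suggests that the parameter can also be supplied from the command line. The sketch below illustrates that general pattern only. The names `run_pipeline`, `build_parser`, `main`, and the `DEFAULT_URL` value are assumptions made for illustration, not the generated code:

```python
import argparse

# Hypothetical default; the generated module uses the original iris.csv URL instead.
DEFAULT_URL = "data/iris.csv"


def run_pipeline(url=DEFAULT_URL):
    # Stand-in for the generated session function: it only reports
    # which data source it would read, without loading anything.
    return {"data_source": url}


def build_parser():
    # Expose the pipeline parameter as a command-line option with the same default.
    parser = argparse.ArgumentParser(description="Run the pipeline with a chosen data source")
    parser.add_argument("--url", type=str, default=DEFAULT_URL)
    return parser


def main(argv=None):
    # Passing argv explicitly keeps the function easy to call from a notebook or a test.
    args = build_parser().parse_args(argv)
    return run_pipeline(url=args.url)


print(main([]))                            # falls back to the default data source
print(main(["--url", "data/other.csv"]))   # overrides the data source
```

Keeping the original value as the default means existing callers keep working unchanged, while new callers can point the same pipeline at a different file.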

Note that the parametrization is reflected in the framework-specific DAG file as well:

In [5]:
# NBVAL_IGNORE_OUTPUT
%cat ./output/pipeline_parametrization/iris_pipeline_parametrized_dag.py

import pathlib
import pickle

import iris_pipeline_parametrized_module
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


def dag_setup():
    pickle_folder = pathlib.Path("/tmp").joinpath("iris_pipeline_parametrized")
    if not pickle_folder.exists():
        pickle_folder.mkdir()


def dag_teardown():
    pickle_files = (
        pathlib.Path("/tmp").joinpath("iris_pipeline_parametrized").glob("*.pickle")
    )
    for f in pickle_files:
        f.unlink()


def task_iris_preprocessed(url):

    url = str(url)

    df = iris_pipeline_parametrized_module.get_iris_preprocessed(url)

    pickle.dump(df, open("/tmp/iris_pipeline_parametrized/variable_df.pickle", "wb"))


def task_iris_model():

    df = pickle.load(open("/tmp/iris_pipeline_parametrized/variable_df.pickle", "rb"))

    mod = iris_pipeline_parametrized_module.get_iris_model(df)

    pickle.dump(mod, open("/tm

Hence, we can easily modify pipeline runs in the target system (Airflow in this case).
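
The task functions in the DAG file above hand data to one another through pickle files: the first task receives `url` and writes its result, and the next task reads that result back. Below is a minimal sketch of this handoff pattern. It uses a temporary directory and hypothetical stand-in tasks (`task_preprocess`, `task_model`) that operate on a plain dictionary, rather than the generated code or a real DataFrame:

```python
import pickle
import tempfile
from pathlib import Path


def task_preprocess(url, pickle_folder):
    # Stand-in for the real preprocessing: record the data source
    # instead of downloading and transforming a CSV file.
    df = {"source": str(url), "rows": 3}
    with open(pickle_folder / "variable_df.pickle", "wb") as f:
        pickle.dump(df, f)


def task_model(pickle_folder):
    # Read the upstream task's output back from disk.
    with open(pickle_folder / "variable_df.pickle", "rb") as f:
        df = pickle.load(f)
    return f"model trained on {df['rows']} rows from {df['source']}"


with tempfile.TemporaryDirectory() as tmp:
    folder = Path(tmp)
    task_preprocess("data/iris.csv", folder)  # only this task needs the parameter
    result = task_model(folder)

print(result)
```

Only the task that consumes the parameter takes it as an argument, so changing the data source does not require touching downstream tasks.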

<div class="alert alert-warning">

Currently, `input_parameters` only accepts variables defined by a literal assignment such as `a = "123"`. For each variable to be parametrized, there should be exactly one literal assignment across all artifact code for the pipeline. For instance, if both `a = "123"` and `a = "abc"` exist in the pipeline's artifact code, we cannot make `a` an input parameter because the reference is ambiguous: we cannot tell which literal assignment `a` refers to.

</div>
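
To make the restriction above concrete, here is a rough sketch of how one could check whether a variable has exactly one literal assignment in a piece of code, using Python's standard `ast` module. This is an illustration under simplifying assumptions, not LineaPy's actual implementation. The helpers `count_literal_assignments` and `can_parametrize` are hypothetical, and "literal" is taken to mean a plain constant such as a string or number:

```python
import ast


def count_literal_assignments(source, name):
    """Count statements of the form `name = <constant>` in `source`."""
    count = 0
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Assign):
            continue
        # Simplification: only plain constants count as literals here,
        # so e.g. `a = -1` (a unary operation) or `a = [1, 2]` are not counted.
        is_literal = isinstance(node.value, ast.Constant)
        for target in node.targets:
            if isinstance(target, ast.Name) and target.id == name and is_literal:
                count += 1
    return count


def can_parametrize(source, name):
    # Exactly one literal assignment means the reference is unambiguous.
    return count_literal_assignments(source, name) == 1


unambiguous = 'url = "a.csv"\ndf = load(url)\n'
ambiguous = 'a = "123"\nprint(a)\na = "abc"\n'

print(can_parametrize(unambiguous, "url"))  # True
print(can_parametrize(ambiguous, "a"))      # False
```

The code is only parsed, never executed, so undefined names such as `load` in the sample strings are harmless.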