# Introduction to the Code-First Pipelines

In [1]:
import pandas as pd
from sklearn import datasets
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.metrics import ConfusionMatrixDisplay
import matplotlib.pyplot as plt

In [2]:
from cf_pipelines import Pipeline

A pipeline has a name, ideally the name of the project, and a location. If you don't provide a location, a temporary one will be selected for you.

In [3]:
iris_pipeline = Pipeline("Iris pipeline", location='outputs')

## Steps

### First step – data ingestion

A pipeline is composed of steps. To include a function in the pipeline, decorate it with the pipeline's `step` decorator, passing as an argument the name of the group the step belongs to. Here I have chosen *"data_ingestion"*, but you can use any name you want.

In [4]:
@iris_pipeline.step("data_ingestion")
def get_data():
    d = datasets.load_iris()
    df = pd.DataFrame(d["data"])
    df.columns = d["feature_names"]
    df["target"] = d["target"]
    return {"raw_data.csv": df}
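
To make the decorator mechanics concrete, here is a minimal sketch of how a `step`-style decorator that takes a group name could register functions. `MiniPipeline` is a hypothetical stand-in written for this illustration; it is not the `cf_pipelines` implementation, which may work quite differently.

```python
from collections import defaultdict


class MiniPipeline:
    """Hypothetical stand-in for illustration; not the cf_pipelines implementation."""

    def __init__(self, name):
        self.name = name
        # Maps a group name to the functions registered under it, in order.
        self.groups = defaultdict(list)

    def step(self, group):
        # `step` is a decorator factory: it receives the group name and
        # returns the decorator that receives the function itself.
        def decorator(func):
            self.groups[group].append(func)
            return func  # hand the function back unchanged

        return decorator


demo = MiniPipeline("demo")


@demo.step("data_ingestion")
def get_numbers():
    return {"numbers.csv": [1, 2, 3]}


registered = {group: [f.__name__ for f in funcs] for group, funcs in demo.groups.items()}
print(registered)
```

Because the decorator returns the function unchanged, `get_numbers()` can still be called directly, which is handy when testing a step in isolation.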

A step can contain any Python code, but it must return a dictionary with at least one key-value pair:

 - The **key** is any valid Python identifier (a combination of uppercase or lowercase letters, digits and underscores that does not start with a digit), like "raw_data", followed by a file extension, like ".csv".
 - The **value** is the object to associate with that key, such as a dataframe.

Each of these key-value pairs is what we will call a **product**.

The products a function returns will be available for each subsequent step within the pipeline.
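
As a rough sketch of the naming rule above, a product key can be split into the identifier that later steps refer to and the file extension. The helper `split_product_key` is hypothetical and only illustrates the convention; it assumes nothing about how the framework parses keys internally.

```python
import os


def split_product_key(key):
    """Split a product key such as "raw_data.csv" into (name, extension)."""
    name, extension = os.path.splitext(key)
    if not name.isidentifier():
        raise ValueError(f"{name!r} is not a valid Python identifier")
    return name, extension


print(split_product_key("raw_data.csv"))
print(split_product_key("engineered_features.parquet"))
print(split_product_key("model"))
```

A key without an extension, like `"model"`, yields an empty string as its extension.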

#### Why do we need a file extension?

The extension determines how the product is persisted on disk. Yes, the pipeline stores a copy of each product, which helps with reproducibility and data provenance.
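
One way to picture extension-driven persistence is a lookup table from extension to writer function. The sketch below uses only the standard library; the `WRITERS` table, the `persist` helper and the choice of supported extensions are all assumptions made for illustration, not a description of what the framework does.

```python
import csv
import json
import tempfile
from pathlib import Path


def save_csv(rows, path):
    # newline="" lets the csv module control line endings itself
    with open(path, "w", newline="") as handle:
        csv.writer(handle).writerows(rows)


def save_json(value, path):
    with open(path, "w") as handle:
        json.dump(value, handle)


# Hypothetical extension-to-writer table; the extensions and writers
# the real framework supports may differ.
WRITERS = {".csv": save_csv, ".json": save_json}


def persist(key, value, folder):
    """Write one product into `folder`, picking the writer from the key's extension."""
    path = Path(folder) / key
    try:
        writer = WRITERS[path.suffix]
    except KeyError:
        raise ValueError(f"no writer registered for {path.suffix!r}") from None
    writer(value, path)
    return path


with tempfile.TemporaryDirectory() as folder:
    csv_path = persist("table.csv", [["a", "b"], [1, 2]], folder)
    csv_lines = csv_path.read_text().splitlines()
    json_path = persist("settings.json", {"depth": 3}, folder)
    json_value = json.loads(json_path.read_text())

print(csv_lines)
print(json_value)
```

Keeping the writers in a table means supporting a new format is a matter of adding one entry rather than touching the steps themselves.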

### A second step – feature engineering

Say a step depends on a product generated by a previous step. To access it, add the product's name (the key without its file extension) as a named argument of the function that needs it. In the function below I want to use the dataframe from the previous step, so I specify `raw_data` as a named argument and then treat its value like any other dataframe.

 > 🚨 Named arguments should appear after a single asterisk `*` in the function's parameter list.

In [5]:
@iris_pipeline.step("feature_engineering")
def create_feature_interactions(*, raw_data):
    ft = raw_data["sepal length (cm)"] * raw_data["sepal width (cm)"]
    df = pd.DataFrame({"sepal area (cm2)": ft})
    return {"engineered_features.parquet": df}

See? I am returning yet another product, this time with a different file extension, and it will be stored and made available for the next steps to consume.
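
The single asterisk is plain Python syntax for keyword-only parameters. The sketch below, with a hypothetical function `double_values`, shows that such parameters cannot be passed positionally, and that their names can be read with `inspect.signature`. Whether the framework discovers dependencies this way is an assumption on my part.

```python
import inspect


def double_values(*, values):
    return {"doubled.csv": [value * 2 for value in values]}


# Parameters declared after the bare `*` can only be passed by name.
try:
    double_values([1, 2, 3])
    positional_call_failed = False
except TypeError:
    positional_call_failed = True

# A framework could read the keyword-only names to decide which products to pass in.
needed = [
    name
    for name, param in inspect.signature(double_values).parameters.items()
    if param.kind == inspect.Parameter.KEYWORD_ONLY
]

print(positional_call_failed)
print(needed)
print(double_values(values=[1, 2, 3]))
```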

### Third step – ???

In the next step I want to demonstrate two things:

 - A group name can be totally unrelated to machine learning (and can be almost any string); see the decorator.
 - A step can consume the products generated by any previous steps, not only the one immediately before it. In the function below I am using both `raw_data` and `engineered_features`.

In [6]:
@iris_pipeline.step("another_name")
def join_features_and_split(*, raw_data, engineered_features):
    training_data = raw_data.join(engineered_features)
    
    x = training_data.drop("target", axis="columns")
    y = training_data[["target"]]
    
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.33, random_state=42)

    return {
        "train_x.csv": x_train,
        "test_x.csv": x_test,
        "train_y.csv": y_train,
        "test_y.csv": y_test,
    }
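
The idea that a step can draw on products from any earlier step can be sketched with a shared dictionary of products that grows as steps run. The `run_steps` helper and the three toy steps below are hypothetical; they illustrate the concept and are not how `cf_pipelines` is implemented.

```python
import inspect
import os


def run_steps(steps):
    """Run functions in order, passing each one the products it asks for by name."""
    products = {}
    for func in steps:
        wanted = inspect.signature(func).parameters
        outputs = func(**{name: products[name] for name in wanted})
        for key, value in outputs.items():
            # Store each product under its key without the file extension.
            name, _extension = os.path.splitext(key)
            products[name] = value
    return products


def load():
    return {"raw.csv": [1, 2, 3]}


def square(*, raw):
    return {"squared.csv": [v * v for v in raw]}


def combine(*, raw, squared):
    # Uses products from two different earlier steps.
    return {"pairs.csv": list(zip(raw, squared))}


products = run_steps([load, square, combine])
print(products["pairs"])
```

Here `combine` receives `raw` from the first step and `squared` from the second, much like `join_features_and_split` receives `raw_data` and `engineered_features`.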

### Other steps – training and testing

I have shown you almost all the cool bits of the `Pipelines` framework, but there is some more machine learning left to do.

In [7]:
@iris_pipeline.step("training")
def model_training(*, train_x, train_y):
    clf = RandomForestClassifier()
    # train_y is a one-column dataframe; pass the column itself as a 1-D target
    clf.fit(train_x, train_y["target"])
    return {"model": clf}

As you can see, the product above does not specify an extension, which means it will be persisted using the `pickle` module. This may not be the best way to store something on disk, so my recommendation is that you always try to specify an extension.
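
For reference, this is what a pickle round trip looks like for a key without an extension. The sketch uses a plain dictionary as a stand-in for a trained estimator and a temporary folder instead of the pipeline's location; both are assumptions made for illustration.

```python
import os
import pickle
import tempfile

key = "model"
_name, extension = os.path.splitext(key)  # extension is empty for this key

# Plain dictionary standing in for a trained estimator (illustration only).
fake_model = {"kind": "threshold", "cutoff": 2.5}

with tempfile.TemporaryDirectory() as folder:
    path = os.path.join(folder, key)
    with open(path, "wb") as handle:
        pickle.dump(fake_model, handle)
    with open(path, "rb") as handle:
        restored = pickle.load(handle)

print(repr(extension))
print(restored == fake_model)
```

Two general caveats apply to pickle: unpickling data from an untrusted source can execute arbitrary code, and a pickled estimator may not load correctly under a different version of the library that created it.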

In [8]:
@iris_pipeline.step("model_testing")
def test_my_model(*, test_x, test_y, model):
    fig = plt.figure(figsize=(5, 5))
    ax = fig.gca()
    # from_estimator computes the predictions itself, so no explicit predict call is needed
    ConfusionMatrixDisplay.from_estimator(model, test_x, test_y, ax=ax)
    return {"matrix.png": fig}

Finally, we can execute the pipeline by using its `run` method.

In [14]:
iris_pipeline.run()

0it [00:00, ?it/s]

## Reviewing the outputs

In [10]:
!tree outputs

zsh:1: command not found: tree


In [11]:
!head outputs/another_name/test_x.csv

head: outputs/another_name/test_x.csv: No such file or directory


### Making predictions with the trained model

In [12]:
import pickle

with open("outputs/training/model", "rb") as rb:
    classifier = pickle.load(rb)

FileNotFoundError: [Errno 2] No such file or directory: 'outputs/training/model'

In [None]:
new_instance = {
    "sepal length (cm)": 6.0,
    "sepal width (cm)": 3.1,
    "petal length (cm)": 3.8,
    "petal width (cm)": 0.5,
    "sepal area (cm2)": 18.6,
}
instance = pd.DataFrame([new_instance])

In [None]:
classifier.predict(instance)
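
The `sepal area (cm2)` value of 18.6 in the new instance is the sepal length multiplied by the sepal width, the same interaction computed in the feature engineering step. As a small sketch, the hypothetical helper `add_sepal_area` (not part of the notebook) derives it from the raw measurements instead of hard-coding it, so the two cannot drift apart.

```python
def add_sepal_area(instance):
    """Return a copy of `instance` with the engineered feature filled in."""
    enriched = dict(instance)
    enriched["sepal area (cm2)"] = (
        instance["sepal length (cm)"] * instance["sepal width (cm)"]
    )
    return enriched


measurements = {
    "sepal length (cm)": 6.0,
    "sepal width (cm)": 3.1,
    "petal length (cm)": 3.8,
    "petal width (cm)": 0.5,
}
enriched = add_sepal_area(measurements)
print(list(enriched))
print(round(enriched["sepal area (cm2)"], 2))
```

The resulting dictionary keeps the raw measurements first and the engineered feature last, the same column order produced by joining `raw_data` with `engineered_features`.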