## Data Pipeline Orchestration with Dagster Ops

In this notebook we will get to know the basics of Dagster ops. To do so, we will build a simple pipeline that trains an XGBoost classifier.

Dagster's definition of ops:
> Ops are the core unit of computation in Dagster. The computational core of a software-defined asset is an op. 
> An individual op should perform relatively simple tasks, such as:
> * Deriving a dataset from other datasets
> * Executing a database query
> * Initiating a Spark job in a remote cluster
> * Querying an API and storing the result in a data warehouse
> * Sending an email or Slack message

Based on the data created in the Dagster assets exercise, we want to derive training and test data, train the classifier, create predictions for the test data, and finally create an analysis to determine how well the classifier performs on the test data.

To do this, we planned a small training pipeline which will perform the following steps:

1. Split data into subsets -> Create a training and a test dataset (`split_data` op)
2. Train classifier -> Fit an XGBoost classifier (`train_classifier` op)
3. Create predictions -> Use the classifier to create predictions for the test data (`predict` op)
4. Analyze predictions -> Create a confusion matrix and a classification report for the predictions (`analyze` op)

The code for these tasks is already provided. All you need to do is put their logic together in the form of a Dagster op job.

After the definitions are complete, we will have a look at the Dagster UI and run the op job.

Here are the imports we will need for the whole task. You can ignore them for now.

In [None]:
from typing import Tuple

import matplotlib.pyplot as plt
import mlflow
import numpy as np
import pandas as pd
from dagster import AssetKey, Config, Definitions, In, OpExecutionContext, Out, job, op
from dagster_mlflow import end_mlflow_on_run_finished, mlflow_tracking
from pydantic import Field
from sklearn.metrics import (
    ConfusionMatrixDisplay,
    classification_report,
    confusion_matrix,
)
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
from xgboost.callback import TrainingCallback

## 1) Split data into subsets
The function `split_data` contains the logic with which the subsets are generated.
`SplitDataConfig` is a Dagster config class that makes it possible to adjust the configuration of the op. You will see that this config can be modified via the Dagster UI without changing the underlying code.

By adding the `op` decorator to the `split_data` function, you define it as a Dagster op. Please add the decorator.
We also want to give some more information about the op. As you can see, the function returns a total of five output values. Please define these output values as `Out`s for the op. You can do this with the `out` parameter of the op decorator, which takes a dictionary (e.g. `"test_value": Out()`).
The outputs should be named as follows to stay consistent with the downstream ops:
`input_train`, `input_test`, `target_train`, `target_test`, `target_names`.
 

The separator (`sep`) used to read the CSV files may also change over time. Please add it as an additional configuration parameter `seperator` to `SplitDataConfig` and use the parameter in the `split_data` function.

In [None]:
class SplitDataConfig(Config):
    data_path: str = Field(
        description="File path of the input data",
        default="./data/genres_standardized.csv",
    )
    target_column: str = Field(
        description="Column name of the target column", default="genre"
    )
    test_set_size: float = Field(
        description="Fraction of the data to use as the test set", default=0.2
    )
    seperator: str = Field(
        description="Separator used when loading the data as a DataFrame",
        default=";",
    )

In [None]:
@op(
    out={
        "input_train": Out(),
        "input_test": Out(),
        "target_train": Out(),
        "target_test": Out(),
        "target_names": Out(),
    },
    # ...
)
def split_data(
    config: SplitDataConfig,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series, pd.Series]:
    data = pd.read_csv(config.data_path, sep=config.seperator)
    columns = list(data.columns)
    columns.remove(config.target_column)
    data[config.target_column] = data[config.target_column].astype("category")
    mlflow.log_param(key="test_size", value=config.test_set_size)
    data["target"] = data[config.target_column].cat.codes
    X_train, X_test, y_train, y_test = train_test_split(
        data[columns], data["target"], test_size=config.test_set_size
    )
    return X_train, X_test, y_train, y_test, data[config.target_column]

## 2) Train classifier
The `train_classifier` function uses the previously created training subset and the associated targets to fit an XGBoost classifier. The config class `TrainConfig` is used here to configure the `train_classifier` op. To make the op's dependencies explicit, please add the inputs (`In`) `input_train` and `target_train` to the op decorator.

In [None]:
class TrainConfig(Config):
    number_of_estimators: int = Field(description="Number of boosting rounds")
    learning_rate: float = Field(description="Boosting learning rate", default=0.1)
    max_depth: int = Field(
        description="Maximum tree depth for base learners", default=8
    )
    min_child_weight: float = Field(
        description="Minimum sum of instance weight(hessian) needed in a child",
        default=1,
    )
    gamma: float = Field(
        description="Minimum loss reduction required to make a further partition on a leaf node of the tree",
        default=0,
    )
    number_of_jobs: int = Field(
        description="Number of parallel threads used to run xgboost", default=4
    )

In [None]:
class MlflowCallback(TrainingCallback):
    def after_iteration(self, model, epoch, evals_log) -> bool:
        for data, metric in evals_log.items():
            for metric_name, log in metric.items():
                mlflow.log_metric(
                    key=metric_name, value=sum(log) / len(log), step=epoch
                )
        return False
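
To see what the callback above iterates over, the following sketch walks a hand-written evaluation history in the shape XGBoost passes as `evals_log`: a dictionary of dataset names, each mapping metric names to a list with one value per finished boosting round. The numbers and the helper `summarize` are made up for illustration. Note that `sum(log) / len(log)` is the running mean over all rounds so far, which differs from the latest value `log[-1]`.

```python
# Hypothetical evaluation history after three boosting rounds.
evals_log = {"validation_0": {"mlogloss": [0.9, 0.7, 0.6]}}


def summarize(evals_log):
    """Collect the running mean and the latest value for every metric."""
    summary = {}
    for dataset_name, metrics in evals_log.items():
        for metric_name, history in metrics.items():
            summary[(dataset_name, metric_name)] = {
                "running_mean": sum(history) / len(history),
                "latest": history[-1],
            }
    return summary


summary = summarize(evals_log)
print(summary)
```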

In [None]:
@op(
    ins={
        "input_train": In(),
        "target_train": In(),
    },
    out={"classifier": Out()},
    # ...
)
def train_classifier(
    config: TrainConfig, input_train: pd.DataFrame, target_train: pd.Series
) -> XGBClassifier:
    model = XGBClassifier(
        learning_rate=config.learning_rate,
        n_estimators=config.number_of_estimators,
        max_depth=config.max_depth,
        min_child_weight=config.min_child_weight,
        gamma=config.gamma,
        n_jobs=config.number_of_jobs,
        callbacks=[MlflowCallback()],
    )
    model.fit(
        input_train, target_train, eval_set=[(input_train, target_train)], verbose=False
    )
    mlflow.xgboost.log_model(model, "spotify_genre_classifier")
    return model

## 3) Create predictions
The classifier created in the `train_classifier` op is used in the `predict` function to create predictions for `input_test` from the `split_data` op. 
To make it clear to other users of the pipeline in the Dagster UI what exactly happens in this op, please add a docstring (`""" """`) to the function.

The docstring could look like this: 

`In this project, an XGBoost classifier is used to generate predictions for a test set from the Spotify genres dataset.` 

In [None]:
@op(ins={"classifier": In(), "input_test": In()}, out={"predictions": Out()})
def predict(classifier: XGBClassifier, input_test: pd.DataFrame) -> np.ndarray:
    """In this project, an XGBoost classifier is used to generate predictions for a test set from the Spotify genres dataset."""
    predictions = classifier.predict(input_test)
    return predictions

In [None]:
class AnalyzeConfig(Config):
    confusion_matrix_path: str = Field(default="./data/confusion_materix.png")
    report_path: str = Field(default="./data/classification_report.csv")

## 4) Analyze predictions
Finally, the generated predictions must be analyzed so that the classifier can be evaluated. The analysis is performed by the `analyze` function. 
In contrast to the other ops, this op has a `context` parameter of type `OpExecutionContext`. This context gives access to values and functions related to the execution of the op. It also provides a logger (`context.log`), which can be used to log messages while the op runs.

Please log the accuracy (`df_classification_report.loc["accuracy"].mean()`) via the context logger. 
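
The expression above may look odd at first, so here is a minimal sketch with made-up labels. In the dictionary returned by `classification_report(..., output_dict=True)`, the `accuracy` entry is a single number, while the other entries are dictionaries. When this is turned into a DataFrame and transposed, the `accuracy` row repeats that number in every column, so its mean is simply the accuracy.

```python
import pandas as pd
from sklearn.metrics import classification_report

# Made-up labels: three of four predictions are correct.
y_true = [0, 0, 1, 1]
y_pred = [0, 1, 1, 1]

report = classification_report(y_true, y_pred, output_dict=True)
df_report = pd.DataFrame(report).transpose()

# Every column of the "accuracy" row holds the same value.
accuracy = df_report.loc["accuracy"].mean()
print(accuracy)
```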



In [None]:
@op(
    ins={"target_test": In(), "predictions": In(), "target_names": In()},
    # ...
)
def analyze(
    context: OpExecutionContext,
    config: AnalyzeConfig,
    target_test: pd.Series,
    predictions: np.ndarray,
    target_names: pd.Series,
):
    target_test = np.asarray(target_test)
    category_labels = target_names.cat.categories
    fig, ax = plt.subplots(figsize=(10, 10))
    ConfusionMatrixDisplay.from_predictions(
        target_test, predictions, ax=ax, display_labels=category_labels
    )
    ax.tick_params(axis="x", labelrotation=70, labelbottom=True)
    fig.savefig(config.confusion_matrix_path, pad_inches=20)
    report = classification_report(target_test, predictions, output_dict=True)
    df_classification_report = pd.DataFrame(report).transpose()
    df_classification_report.to_csv(config.report_path)
    context.log.info("Accuracy: %s", df_classification_report.loc["accuracy"].mean())
    # Log the files that were actually written, even if the config paths change.
    mlflow.log_artifact(config.report_path)
    mlflow.log_artifact(config.confusion_matrix_path)

## 5) Create an op job
To define a job from the individual ops, the ops (functions) must be linked to each other via their return values. 
Create an op job that uses the previously created ops. 

> *Note*: The parameters `config` and `context` do not have to be set when calling the ops. Dagster will do this for us later.

First call the `split_data` op and save the return values in variables so that you know later which subsets and targets are behind which variables. 

Then call the `train_classifier` op and pass the parameters `input_train` and `target_train`. Save the return value (`classifier`) in a variable as well.

Proceed in the same way with the ops `predict` and `analyze`. 

In [None]:
# ...
@job(
    # ...
)
def spotify_genre_classification():
    input_train, input_test, target_train, target_test, target_names = split_data()
    classifier = train_classifier(input_train=input_train, target_train=target_train)
    predictions = predict(classifier=classifier, input_test=input_test)
    analyze(target_test=target_test, predictions=predictions, target_names=target_names)

In [None]:
defs = Definitions(
    jobs=[spotify_genre_classification], resources={"mlflow": mlflow_tracking}
)

## 6) Start the job via the Dagster UI
Open the [Dagster UI](http://localhost:3000)
> **_NOTE:_** To ensure that the latest code is used, update the code location (Deployment -> `dagster_exercise_ops_jobs.py` -> Reload).


You will see the `Overview` page by default. Click on the `Jobs` tab and open the `spotify_genre_classification` job. In addition to the `Overview` tab, there is also a `Launchpad` tab. Open the Launchpad. You should see something like this:
![](./data/assets/dagster_ui_ops_job.png)

You can start the job using the `Launch Run` button in the bottom right-hand corner. Start the job. As soon as the job has run successfully, you should see your logged accuracy relatively far down in the events. You can find the created confusion matrix and the other metrics in the `data` folder here in JupyterLab.

## 7) Adjust train config 
The pipelines can be easily configured via the Dagster UI. 

First, remove any parameter from the configuration (e.g. `seperator`). This activates the `Scaffold all default config` button. Click that button. The deleted entry is added again with the default value from the source code.

Now edit the `TrainConfig` in the source code. 
Remove the default value for the parameter `number_of_estimators`. Save this notebook.

In the Launchpad of the Dagster UI, you will find a small reload button to the right of the job title (`spotify_genre_classification`) with which you can update the code. Update the code and remove the `number_of_estimators` parameter from the Launchpad. 

![](./data/assets/dagster_ui_ops_job_missing_config.png)

You will notice that Dagster displays an error that a configuration entry is missing. Unlike a parameter with a default value, Dagster cannot execute the job without the `number_of_estimators` parameter. If you click on the `Scaffold missing config` button, Dagster adds the missing entry to the Launchpad and initializes it with the value `0`.