<div class='bar_title'></div>

*Enterprise AI*

# Assignment 2 - ZenML & Feature Engineering
Gunther Gust / Justus Ameling<br>
Chair of Enterprise AI

Summer Semester 24

<img src="https://github.com/GuntherGust/tds2_data/blob/main/images/d3.png?raw=true" style="width:20%; float:left;" />

In this assignment we look at ZenML for the first time. ZenML is an open source machine learning lifecycle management tool that aims to simplify the workflow from development to production. It enables data scientists and engineers to create reproducible pipelines that ensure consistent and comparable experiments. With its modular architecture, ZenML supports the integration of different ML tools and frameworks, making it a flexible solution for a wide range of deployment scenarios.

Our objective for this assignment is to develop a training pipeline to predict whether it will rain in Perth, Australia, tomorrow. To achieve this, our pipeline will load the necessary data, perform feature engineering, build a model, and evaluate it.

First of all, before we can start setting up our pipeline, we need to run three commands.
- The first one is `!rm -rf .zen`. This command removes the `.zen` folder and all its subfolders, so that we start with a fresh ZenML repository.
- The `!zenml init` command then creates this repository for us by creating a folder that initially contains a YAML file.
- The last command, `!zenml integration install sklearn -y`, installs the sklearn library in a version that is compatible with ZenML.
    - ZenML is capable of working with several libraries, and sklearn is just one of them - <a href="https://www.zenml.io/integrations">see more</a>

In [None]:
!rm -rf .zen
!zenml init
!zenml integration install sklearn -y

## Data Inspection

First of all, we need to understand our dataset. To do so, let us load the `Weather_Perth.csv` file with the pandas library and apply the following three descriptive measures:
- Print the first 5 lines with the head command
- Print the dtypes of all columns
- Print the number of null values per column

In [18]:
import pandas as pd
pd.set_option('display.max_columns', None) #This command allows us to inspect all the columns

In [19]:
data = pd.read_csv("Weather_Perth.csv")
data.head()

| | Date | MinTemp | MaxTemp | Rainfall | Evaporation | Sunshine | WindGustDir | WindGustSpeed | WindDir9am | WindDir3pm | WindSpeed9am | WindSpeed3pm | Humidity9am | Humidity3pm | Pressure9am | Pressure3pm | Cloud9am | Cloud3pm | Temp9am | Temp3pm | RainTomorrow |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2008-07-01 | 2.7 | 18.8 | 0.0 | 0.8 | 9.1 | ENE | 20.0 | NaN | E | 0.0 | 7.0 | 97.0 | 53.0 | 1027.6 | 1024.5 | 2.0 | 3.0 | 8.5 | 18.1 | No |
| 1 | 2008-07-02 | 6.4 | 20.7 | 0.0 | 1.8 | 7.0 | NE | 22.0 | ESE | ENE | 6.0 | 9.0 | 80.0 | 39.0 | 1024.1 | 1019.0 | 0.0 | 6.0 | 11.1 | 19.7 | No |
| 2 | 2008-07-03 | 6.5 | 19.9 | 0.4 | 2.2 | 7.3 | NE | 31.0 | NaN | WNW | 0.0 | 4.0 | 84.0 | 71.0 | 1016.8 | 1015.6 | 1.0 | 3.0 | 12.1 | 17.7 | Yes |
| 3 | 2008-07-04 | 9.5 | 19.2 | 1.8 | 1.2 | 4.7 | W | 26.0 | NNE | NNW | 11.0 | 6.0 | 93.0 | 73.0 | 1019.3 | 1018.4 | 6.0 | 6.0 | 13.2 | 17.7 | Yes |
| 4 | 2008-07-05 | 9.5 | 16.4 | 1.8 | 1.4 | 4.9 | WSW | 44.0 | W | SW | 13.0 | 17.0 | 69.0 | 57.0 | 1020.4 | 1022.1 | 7.0 | 5.0 | 15.9 | 16.0 | Yes |


In [20]:
data.dtypes

Date              object
MinTemp          float64
MaxTemp          float64
Rainfall         float64
Evaporation      float64
Sunshine         float64
WindGustDir       object
WindGustSpeed    float64
WindDir9am        object
WindDir3pm        object
WindSpeed9am     float64
WindSpeed3pm     float64
Humidity9am      float64
Humidity3pm      float64
Pressure9am      float64
Pressure3pm      float64
Cloud9am         float64
Cloud3pm         float64
Temp9am          float64
Temp3pm          float64
RainTomorrow      object
dtype: object

In [21]:
data.isna().sum()

Date               0
MinTemp            0
MaxTemp            1
Rainfall           0
Evaporation        1
Sunshine           5
WindGustDir        5
WindGustSpeed      5
WindDir9am       134
WindDir3pm         7
WindSpeed9am       0
WindSpeed3pm       1
Humidity9am        9
Humidity3pm        8
Pressure9am        1
Pressure3pm        1
Cloud9am           2
Cloud3pm           4
Temp9am            0
Temp3pm            1
RainTomorrow       0
dtype: int64

## ZenML

Now that we have a good understanding of our data, we can begin developing our pipeline. A ZenML **pipeline** comprises multiple **steps** that are necessary for a successful ML process. Each step in the pipeline represents a different task, such as data loading, feature engineering, or model tuning. We define these steps by writing functions that adhere to the framework's requirements. Once we have constructed all of the steps, we combine them in a single Python function that represents our pipeline.

Let's begin by importing some more packages, and then let us look at the first step of our future pipeline in more detail.

In [22]:
from zenml import pipeline, step
from typing_extensions import Annotated
from typing import Tuple
from sklearn.base import ClassifierMixin
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, LabelEncoder, MinMaxScaler
from sklearn.linear_model import LogisticRegression

Our first step is the `loading_data` function, which loads our CSV file using the pandas library. To understand the concept of ZenML better, we will go through all of its parts from top to bottom.

1. *Decorator*: `@step(enable_cache=False)`
    - Behind the scenes, the decorator wraps our function in another function and thereby transforms it into a step.
    - Steps have some additional functionality. One example is caching, which allows ZenML to skip the execution of a step when no new information is available.
        - For this step, we have turned off caching because ZenML cannot detect whether external objects, such as our CSV file, have changed - <a href="https://docs.zenml.io/user-guide/starter-guide/cache-previous-executions">see more</a>
2. *Function Header*: `def loading_data(filename: str) -> Annotated[pd.DataFrame,"input_data"]:`
    - The next interesting thing about ZenML is that it stores all inputs and outputs of our steps as <a href="https://docs.zenml.io/getting-started/core-concepts#artifacts">Artifacts</a>. For this purpose, ZenML needs to know the types of the arguments and return values, which we declare with type hints - <a href="https://docs.python.org/3/library/typing.html">see more</a>
    - Our input parameter `filename` has the type string (`str`).
    - The return value of the function is a `pd.DataFrame`. We have additionally given it the name `input_data` so that we can find it more easily later.
3. *Function Body*:
    - We use the already familiar pandas function `read_csv` to load our CSV file, with the `Date` column as index. After that, we return the resulting DataFrame.

In [23]:
@step(enable_cache=False)
def loading_data(filename: str) -> Annotated[pd.DataFrame,"input_data"]:
    """Loads a CSV file into a pandas DataFrame, using the `Date` column as index."""
    data = pd.read_csv(filename,index_col="Date")
    return data
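
To make the roles of the decorator and the `Annotated` return type more tangible, here is a toy sketch in plain Python. It is not how ZenML is implemented: `toy_step`, its cache dictionary and the `output_name` attribute are hypothetical names invented for this illustration. The sketch only shows that a decorator can wrap a function, read the name attached via `Annotated`, and skip re-execution for inputs it has already seen.

```python
import functools
from typing import Annotated, get_type_hints


def toy_step(enable_cache: bool = True):
    """Hypothetical stand-in for a step decorator (NOT ZenML's implementation)."""
    def decorator(func):
        # Read the return annotation, including the extra metadata of Annotated.
        hints = get_type_hints(func, include_extras=True)
        metadata = getattr(hints.get("return"), "__metadata__", ())
        cache = {}

        @functools.wraps(func)
        def wrapper(*args):
            if enable_cache and args in cache:
                return cache[args]  # same inputs as before: skip execution
            result = func(*args)
            cache[args] = result
            return result

        # Expose the output name declared in the function header.
        wrapper.output_name = metadata[0] if metadata else "output"
        return wrapper
    return decorator


calls = []


@toy_step(enable_cache=True)
def double(x: int) -> Annotated[int, "doubled_value"]:
    calls.append(x)  # record every real execution of the function body
    return 2 * x


first = double(21)
second = double(21)  # served from the toy cache, the body does not run again
print(double.output_name, first, second, len(calls))
```

A toy cache like this only compares the arguments it receives. It would not notice if a file behind an unchanged filename was modified, which is the situation in which the assignment disables caching for `loading_data`.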

Our next step takes the loaded DataFrame and a parameter `label` and splits our data into a training and a test set. For this, we can use the `train_test_split` function from the sklearn library. Since our data is sorted by date, we want the last 20% of the data as our test set. By default, `train_test_split` shuffles the data, so we must pass the argument `shuffle=False` to keep the original order. Finally, the step returns the four variables, which are also declared in the function header of the step.

In [24]:
@step
def split_data(dataset:pd.DataFrame, label: str) -> Tuple[
    Annotated[pd.DataFrame,"X_train"],
    Annotated[pd.DataFrame,"X_test"],
    Annotated[pd.Series,"y_train"],
    Annotated[pd.Series,"y_test"]]:
    """
    Splits a dataset into training and testing sets.

    This function takes a pandas DataFrame and a specified label column, then
    divides the data into training and testing subsets. The splitting process does
    not shuffle the data, which preserves the original ordering in the training
    and testing sets.
    """
    X = dataset.drop(label,axis=1)
    Y = dataset[label]
    X_train,X_test,y_train,y_test = train_test_split(X,Y,test_size=0.2,shuffle=False)
    return X_train,X_test,y_train,y_test
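
The effect of `shuffle=False` can be checked on a small example. The following sketch uses ten made-up daily observations (the values are invented for illustration and are not taken from `Weather_Perth.csv`) and shows that the test set consists of the most recent rows.

```python
import pandas as pd
from sklearn.model_selection import train_test_split

# Ten made-up daily observations, sorted by date like the weather data.
toy = pd.DataFrame(
    {"Rainfall": range(10), "RainTomorrow": ["No", "Yes"] * 5},
    index=pd.date_range("2024-01-01", periods=10, freq="D"),
)

X = toy.drop("RainTomorrow", axis=1)
y = toy["RainTomorrow"]
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, shuffle=False
)

# Without shuffling, every training date lies before every test date.
print(X_train.index.max(), X_test.index.min())
print(len(X_train), len(X_test))
```

Keeping the chronological order means the model is evaluated on days that come after all training days, which is closer to how a forecast would be used in practice.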

### Feature Engineering

As we have seen in the beginning, our dataset contains null values. Thus, we want to define a step that eliminates all of them. Our step function should impute all numerical columns with a simple median strategy and all non-numerical columns with a most-frequent strategy. Both imputers should be fitted on the training set and then applied to the training and the test set, which are finally returned. Hint: Have a look at assignment one again!

In [25]:
@step
def impute_missing_values(X_train:pd.DataFrame,X_test:pd.DataFrame) -> Tuple[Annotated[pd.DataFrame,"X_train_imputed"],Annotated[pd.DataFrame,"X_test_imputed"]]:
    """
    Imputes missing values in training and testing datasets.

    This function separately imputes missing values in categorical and numerical
    columns of the provided dataframes. For numerical columns, the median of the
    column is used to replace missing values. For categorical columns, the most frequent
    value in the column is used as the replacement.
    """
    categorical_imputer = SimpleImputer(strategy="most_frequent")
    numerical_imputer = SimpleImputer(strategy="median")
    categorical_columns = X_train.select_dtypes(include="object").columns
    numerical_columns = X_train.select_dtypes(exclude="object").columns

    # Fit the numerical imputer on the training data only, then apply it to both sets.
    X_train[numerical_columns] = pd.DataFrame(
        numerical_imputer.fit_transform(X_train[numerical_columns]),
        index=X_train.index,
        columns=numerical_columns,
    )
    X_test[numerical_columns] = pd.DataFrame(
        numerical_imputer.transform(X_test[numerical_columns]),
        index=X_test.index,
        columns=numerical_columns,
    )

    # Same procedure for the categorical columns.
    X_train[categorical_columns] = pd.DataFrame(
        categorical_imputer.fit_transform(X_train[categorical_columns]),
        index=X_train.index,
        columns=categorical_columns,
    )
    X_test[categorical_columns] = pd.DataFrame(
        categorical_imputer.transform(X_test[categorical_columns]),
        index=X_test.index,
        columns=categorical_columns,
    )
    return X_train, X_test
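
As a small illustration of why the imputer is fitted on the training data and only applied to the test data, the sketch below uses made-up `Sunshine` values (they are not taken from the assignment's dataset). The median is learned from the training column and then reused to fill the gap in the test column.

```python
import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer

# Made-up values for illustration only.
train = pd.DataFrame({"Sunshine": [1.0, 3.0, 100.0, np.nan]})
test = pd.DataFrame({"Sunshine": [np.nan, 50.0]})

imputer = SimpleImputer(strategy="median")
train_imputed = imputer.fit_transform(train)  # learns the median of 1, 3 and 100
test_imputed = imputer.transform(test)        # reuses the training median

print(imputer.statistics_)   # the value learned from the training data
print(test_imputed.ravel())  # the missing test value is filled with it
```

The median is also less sensitive to the outlier `100.0` than the mean would be, which is one reason it is a common choice for numerical columns.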

The next feature engineering step is the encoding of the categorical variables. We want to create a step that applies one-hot encoding to all categorical columns of the training and the test set. Finally, the function must return both sets, with the original categorical columns replaced by the one-hot-encoded columns.

In [26]:
@step
def encode_categorical_values(X_train:pd.DataFrame,X_test:pd.DataFrame) -> Tuple[Annotated[pd.DataFrame,"X_train_encoded"],Annotated[pd.DataFrame,"X_test_encoded"]]:
    """
    Encodes categorical columns in the training and testing datasets using one-hot encoding.

    This function identifies all columns with data type 'object' in the training dataset as
    categorical and applies one-hot encoding to these columns. The one-hot encoded columns
    replace the original categorical columns in both datasets. The encoder is fitted only on
    the training data to avoid data leakage into the testing set.
    """
    one_hot_encoder = OneHotEncoder(sparse_output=False)
    categorical_columns = X_train.select_dtypes(include="object").columns
    # Fit the encoder on the training data only and reuse it for the test data.
    encoded_values_train = pd.DataFrame(
        one_hot_encoder.fit_transform(X_train[categorical_columns]),
        index=X_train.index,
        columns=one_hot_encoder.get_feature_names_out(),
    )
    encoded_values_test = pd.DataFrame(
        one_hot_encoder.transform(X_test[categorical_columns]),
        index=X_test.index,
        columns=one_hot_encoder.get_feature_names_out(),
    )
    X_train.drop(categorical_columns, axis=1, inplace=True)
    X_train = pd.concat([X_train, encoded_values_train], axis=1)

    X_test.drop(categorical_columns, axis=1, inplace=True)
    X_test = pd.concat([X_test, encoded_values_test], axis=1)
    return X_train, X_test

The next feature engineering step we want to conduct in our pipeline is label encoding. We use a `LabelEncoder` because our target variable `RainTomorrow` is represented as text. The `LabelEncoder` will transform the 'No' and 'Yes' values to 0 and 1.

In [27]:
@step
def label_encoding(y_train:pd.Series,y_test:pd.Series) -> Tuple[Annotated[pd.Series,"y_train_encoded"],Annotated[pd.Series,"y_test_encoded"]]:
    """
    Applies label encoding to the target variable for both training and testing datasets.

    This function converts target labels into a numerical format using label encoding.
    The encoder is fitted on the training data to ensure consistency in encoding between
    the training and testing sets, preventing data leakage.
    """
    encoder = LabelEncoder()
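    # Keep the original index and name so that the labels stay aligned with the features.
    y_train = pd.Series(encoder.fit_transform(y_train), index=y_train.index, name=y_train.name)
    y_test = pd.Series(encoder.transform(y_test), index=y_test.index, name=y_test.name)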
    return y_train, y_test
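
To see which integer each class receives, we can inspect the fitted encoder. The sketch below uses a few made-up labels. `LabelEncoder` sorts the classes, so 'No' is mapped to 0 and 'Yes' to 1, and `inverse_transform` maps the integers back to the original text.

```python
import pandas as pd
from sklearn.preprocessing import LabelEncoder

# Made-up labels for illustration only.
y_train = pd.Series(["No", "Yes", "No", "No"])
y_test = pd.Series(["Yes", "No"])

encoder = LabelEncoder()
y_train_encoded = encoder.fit_transform(y_train)
y_test_encoded = encoder.transform(y_test)

print(encoder.classes_)  # position in this array = encoded integer
print(y_train_encoded, y_test_encoded)
print(encoder.inverse_transform([0, 1]))  # back to the original labels
```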

The last feature engineering step we apply is the scaling of the values. For this, we use a `MinMaxScaler`, which scales the values to the range between 0 and 1. The scaler is fitted on the training set and applied to both datasets, which are finally returned as DataFrames.

In [28]:
@step 
def scale_values(X_train:pd.DataFrame,X_test:pd.DataFrame) -> Tuple[Annotated[pd.DataFrame,"X_train_scaled"],Annotated[pd.DataFrame,"X_test_scaled"]]:
    """
    Scales numerical features to a range between 0 and 1 using MinMax scaling.

    This function applies MinMaxScaler to scale the feature values in both the training
    and testing datasets. The scaler is fitted only on the training data to prevent data leakage from the test set.
    """
    scaler = MinMaxScaler()
    X_train = pd.DataFrame(scaler.fit_transform(X_train),index=X_train.index, columns=X_train.columns)
    X_test = pd.DataFrame(scaler.transform(X_test),index=X_test.index, columns=X_test.columns)

    return X_train,X_test
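
Because the scaler learns the minimum and maximum from the training data only, scaled test values are not guaranteed to lie between 0 and 1. The following sketch with made-up temperatures illustrates this: a test value above the training maximum is mapped to a value greater than 1.

```python
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

# Made-up values for illustration only.
train = pd.DataFrame({"MaxTemp": [10.0, 20.0, 30.0]})
test = pd.DataFrame({"MaxTemp": [25.0, 40.0]})  # 40.0 exceeds the training maximum

scaler = MinMaxScaler()
train_scaled = scaler.fit_transform(train)  # computes (x - 10) / (30 - 10)
test_scaled = scaler.transform(test)        # uses the same minimum and maximum

print(train_scaled.ravel())
print(test_scaled.ravel())
```

This is expected behavior and not an error. It simply reflects that the test set is treated as unseen data.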

## Modeling and Evaluation

Now that we have prepared all the necessary steps to create a clean and usable dataset, we can develop and evaluate our machine-learning model. Two final steps remain to be defined before we stack everything together and execute our pipeline.

To train our model, we define the `model_trainer` step. As input, it receives the `X_train` features and the labels given by `y_train`. Then, a `LogisticRegression` is created inside the step and fitted to the training data. Lastly, the step returns two artifacts: the trained model and its accuracy on the training data (the in-sample accuracy).

In [29]:
@step
def model_trainer(X_train: pd.DataFrame, y_train: pd.Series)-> Tuple[Annotated[ClassifierMixin,"Model"],Annotated[float,"In_Sample_Accuracy"]]:
    """
    Trains a logistic regression model using the provided training data and computes the in-sample accuracy.
    """
    model = LogisticRegression()
    model.fit(X_train,y_train)
    in_sample_score = model.score(X_train,y_train)
    return model,in_sample_score

Finally, we need to evaluate the performance of our model. For this performance test, we are creating the last step for our pipeline. This step has three input arguments: The model that our `model_trainer` returns, the cleaned features of the test set, and the corresponding labels. Overall, the `evaluate_model` step should calculate the accuracy of our model on the test dataset and return the metric.

In [30]:
@step
def evaluate_model(model:ClassifierMixin,X_test:pd.DataFrame,y_test:pd.Series) -> Annotated[float,"Accuracy"]:
    """
    Evaluates the accuracy of a trained model using the testing dataset.
    """
    score = model.score(X_test,y_test)
    return score
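
Both `model_trainer` and `evaluate_model` rely on `model.score`. For sklearn classifiers, this method returns the mean accuracy, that is, the share of correctly predicted labels. The sketch below checks this on a tiny made-up dataset (not the weather data) by computing the accuracy in three equivalent ways.

```python
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# Tiny made-up dataset: one feature, the label is 1 for the larger values.
X = np.array([[0.0], [0.1], [0.2], [0.3], [0.7], [0.8], [0.9], [1.0]])
y = np.array([0, 0, 0, 0, 1, 1, 1, 1])

model = LogisticRegression()
model.fit(X, y)

predictions = model.predict(X)
manual_accuracy = (predictions == y).mean()  # share of correct predictions

# All three numbers describe the same quantity.
print(model.score(X, y), accuracy_score(y, predictions), manual_accuracy)
```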

### Pipeline

We have now defined all the steps we need to train our model. In ZenML, we can stack these steps together in one function to define our pipeline. To achieve this, we simply define a new function, such as `training_pipeline()`, and place the decorator `@pipeline` above it, so that ZenML knows that this function represents our pipeline. Inside this function, we call all of our predefined steps, each of which receives the return values of the previous steps as its input.
For example, our `loading_data` step returns the dataset, which is then the input to our next step, the `split_data` step.

In [37]:
@pipeline
def training_pipeline():
    """
    Executes a full training pipeline on weather data to predict rain tomorrow.
    """
    dataset = loading_data("Weather_Perth.csv")
    X_train,X_test,y_train,y_test = split_data(dataset,"RainTomorrow")
    X_train,X_test = impute_missing_values(X_train,X_test)
    X_train,X_test = encode_categorical_values(X_train,X_test)
    X_train,X_test = scale_values(X_train,X_test)
    y_train,y_test = label_encoding(y_train,y_test)
    model, in_sample_score = model_trainer(X_train,y_train)
    score = evaluate_model(model,X_test,y_test)

Let us execute our pipeline by calling our `training_pipeline()` function.

In [None]:
training_pipeline()

In the beginning, we explained that ZenML has the advantage of storing all the artifacts of our run. So, let us retrieve one of these artifacts, our test accuracy. To do so, we need to conduct the following steps:
- First, we need to import the Python `Client` class from `zenml.client`.
- Second, we call the `get_artifact_version()` method on a `Client` object and store the returned artifact in a variable. As a parameter, we pass the artifact name that we defined in the return type of our step. For example, to retrieve the test accuracy from the `evaluate_model` step, we need to pass "Accuracy".
    - ZenML stores all artifacts with a specific version ID; however, when we only pass the artifact name, we will receive the latest artifact as our response.
- Finally, to receive the value, we call the `load()` function on the artifact returned in the second step.

In [None]:
from zenml.client import Client
artifact = Client().get_artifact_version("Accuracy")
artifact.load()

Now, retrieve the latest artifact of the in-sample score and compare it to the out-of-sample score.

In [None]:
artifact = Client().get_artifact_version("In_Sample_Accuracy")
artifact.load()

### ZenML Dashboard

The last feature of ZenML we want to look at in this assignment is the dashboard. Out of the box, ZenML provides a visualization of our pipelines and artifacts for every run we have conducted. To use the dashboard, we can simply run `!zenml up`. Start the dashboard now and take a look at your pipeline. (Hint: you do not need to enter an email address if you do not want to receive news.)

In [None]:
!zenml up