# <span style="font-weight:bold; font-size: 3rem; color:#2656a3;">**Data Engineering and Machine Learning Operations in Business** </span> <span style="font-weight:bold; font-size: 3rem; color:#333;">- Part 03: Training Pipeline</span>

## 🗒️ This notebook is divided into the following sections:
1. Feature selection.
2. Feature transformations.
3. Training datasets creation.
4. Loading the training data.
5. Train the model.
6. Register model to Hopsworks model registry.

## <span style='color:#2656a3'> ⚙️ Import of libraries and packages</span>

First, we install the Python packages required for this notebook. The `pip` line is commented out, so uncomment it if a package is missing; the `--quiet` flag suppresses pip's installation output. Then we import all the necessary libraries.

In [1]:
# !pip install tensorflow --quiet

In [2]:
# Import the libraries needed in this notebook

import inspect 
import datetime

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf

# Ignore warnings
import warnings
warnings.filterwarnings('ignore')

## <span style="color:#2656a3;"> 📡 Connecting to Hopsworks Feature Store</span>

We now connect to the Hopsworks Feature Store so we can retrieve the feature groups.

In [3]:
import hopsworks

project = hopsworks.login()

fs = project.get_feature_store() 

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/554133


In [4]:
# Retrieve feature groups
electricity_fg = fs.get_feature_group(
    name='electricity_prices',
    version=1,
)

weather_fg = fs.get_feature_group(
    name='weather_measurements',
    version=1,
)

danish_holidays_fg = fs.get_feature_group(
    name='danish_holidays',
    version=1,
)

forecast_renewable_energy_fg = fs.get_feature_group(
    name='forecast_renewable_energy',
    version=1,
)

## <span style="color:#2656a3;"> 🖍 Feature View Creation and Retrieval </span>

Let's start by selecting all the features you want to include for model training/inference.

In [5]:
# Select features for training data
selected_features = electricity_fg.select_all()\
    .join(weather_fg.select_except(["timestamp", "time"]))\
    .join(forecast_renewable_energy_fg.select_except(["timestamp", "time"]))\
    .join(danish_holidays_fg.select_all())

In [6]:
# Uncomment this if you would like to view your selected features
# selected_features.show(5)
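The chained `join()` calls combine the feature groups into a single query. As a rough pandas analogy (not how Hopsworks executes the query), the sketch below merges two hypothetical miniature feature groups on a shared key and drops the duplicated `time` column from the right-hand side, mirroring `select_except`. The `timestamp` join key and the inner join are assumptions made for illustration only; by default Hopsworks derives the join keys from the feature groups' primary keys.

```python
import pandas as pd

# Hypothetical miniature feature groups that share 'timestamp' and 'time' columns
electricity = pd.DataFrame({
    "timestamp": [1, 2, 3],
    "time": ["00:00", "01:00", "02:00"],
    "dk1_spotpricedkk_kwh": [0.9, 1.1, 1.0],
})
weather = pd.DataFrame({
    "timestamp": [1, 2, 3],
    "time": ["00:00", "01:00", "02:00"],
    "temperature_2m": [3.2, 2.9, 2.7],
})

# select_except(["time"]) ~ drop the duplicated column from the right-hand side;
# pandas keeps a single 'timestamp' column because it is the merge key
weather_subset = weather.drop(columns=["time"])

# join() ~ merge on the shared key (an inner join is assumed here)
joined = electricity.merge(weather_subset, on="timestamp", how="inner")
print(joined)
```

The result has one row per timestamp with the price and temperature side by side, which is the shape of data the feature view reads.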

### <span style="color:#2656a3;"> 🤖 Transformation Functions</span>

Hopsworks Feature Store provides functionality to attach transformation functions to feature views and comes with built-in transformation functions such as `min_max_scaler`, `standard_scaler`, `robust_scaler` and `label_encoder`.

You will preprocess the data using *min-max scaling* on numerical features and *label encoding* on categorical features. To do this, you simply define a mapping between the features and their transformation functions. Attaching the transformations to the feature view ensures that the statistics they depend on, such as the minimum and maximum used by *min-max scaling*, are computed only on the training data (not on the validation/test data), which prevents data leakage.
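To make the leakage point concrete, here is a minimal NumPy sketch of fitting min-max scaling and label encoding on the training split only and then reusing those statistics on later data. It is not the Hopsworks implementation; `fit_min_max`, `apply_min_max`, `fit_label_encoder` and the toy values are hypothetical, and the scaling uses the standard formula (x - min) / (max - min).

```python
import numpy as np

def fit_min_max(train_values):
    """Learn the scaling statistics (min, max) from the training split only."""
    return float(np.min(train_values)), float(np.max(train_values))

def apply_min_max(values, stats):
    """Scale with (x - min) / (max - min) using statistics learned on the training split."""
    lo, hi = stats
    return (np.asarray(values, dtype=float) - lo) / (hi - lo)

def fit_label_encoder(train_labels):
    """Map each category seen in the training split to an integer code."""
    return {label: code for code, label in enumerate(sorted(set(train_labels)))}

# Hypothetical temperatures for a chronological train/test split
train_temp = np.array([2.0, 4.0, 6.0, 10.0])
test_temp = np.array([8.0, 12.0])

stats = fit_min_max(train_temp)                 # computed without looking at test_temp
train_scaled = apply_min_max(train_temp, stats)
test_scaled = apply_min_max(test_temp, stats)   # 12.0 exceeds the training max, so it maps above 1

# Hypothetical categorical values
encoder = fit_label_encoder(["workday", "holiday", "workday"])
encoded = [encoder[label] for label in ["holiday", "workday"]]
```

Fitting the scaler on all splits at once would let test-period extremes shape the training features; keeping the fit on the training split means validation/test values can fall outside [0, 1], which is expected.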

In [7]:
transformation_functions = {
        "dk1_spotpricedkk_kwh": fs.get_transformation_function(name="min_max_scaler"), 
        "dk1_offshore_wind_forecastintraday_kwh": fs.get_transformation_function(name="min_max_scaler"), 
        "dk1_onshore_wind_forecastintraday_kwh": fs.get_transformation_function(name="min_max_scaler"), 
        "dk1_solar_forecastintraday_kwh": fs.get_transformation_function(name="min_max_scaler"), 
        "temperature_2m": fs.get_transformation_function(name="min_max_scaler"), 
        "relative_humidity_2m": fs.get_transformation_function(name="min_max_scaler"), 
        "precipitation": fs.get_transformation_function(name="min_max_scaler"), 
        "rain": fs.get_transformation_function(name="min_max_scaler"), 
        "snowfall": fs.get_transformation_function(name="min_max_scaler"), 
        "weather_code": fs.get_transformation_function(name="min_max_scaler"), 
        "cloud_cover": fs.get_transformation_function(name="min_max_scaler"), 
        "wind_speed_10m": fs.get_transformation_function(name="min_max_scaler"),
        "wind_gusts_10m": fs.get_transformation_function(name="min_max_scaler"),
        "type": fs.get_transformation_function(name="label_encoder"),
    }

A `Feature View` sits between **Feature Groups** and **Training Datasets**. By combining **Feature Groups**, we create a **Feature View**, which stores metadata about our data. From a **Feature View**, we can then create **Training Datasets**.

A Feature View defines its schema as a query (optionally with filters) and can also specify the model's target feature(s)/label(s) and the transformation functions to apply.

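To create a Feature View, we use the `FeatureStore.get_or_create_feature_view()` method, which returns the existing feature view if one with the same name and version already exists, and creates it otherwise.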

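We can specify the following parameters:

- `name` - name of the feature view.

- `version` - version of the feature view.

- `labels` - our target variable(s); left empty here because the target is extracted manually later.

- `transformation_functions` - mapping from feature names to the transformation functions applied to them.

- `query` - the query object that selects the features (here `selected_features`).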

In [8]:
#version = 1
#feature_view = fs.create_feature_view(
#    name='electricity_feature_view',
#    version=version,
#    labels=[], # will define our 'y' later manually
#    transformation_functions=transformation_functions,
#    query=selected_features,
#)

In [9]:
version = 1
feature_view = fs.get_or_create_feature_view(
    name='electricity_feature_view',
    version=version,
    labels=[], # will define our 'y' later manually
    transformation_functions=transformation_functions,
    query=selected_features,
)

## <span style="color:#2656a3;"> 🏋️ Training Dataset Creation</span>

### <span style="color:#2656a3;"> ⛳️ Dataset with train, test and validation splits</span>

In [10]:
# No labels were specified when creating the feature view, so the three label outputs are None and are discarded
X_train, X_val, X_test, _, _, _ = feature_view.train_validation_test_split(
    train_start="2022-01-01",
    train_end="2023-06-30",
    validation_start="2023-07-01",
    validation_end="2023-09-30",
    test_start="2023-10-01",
    test_end="2023-12-31",
    description='Electricity price prediction dataset',
)

Finished: Reading data from Hopsworks, using ArrowFlight (335.40s) 
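The split above is chronological: the model trains on 2022-01-01 to 2023-06-30, is validated on Q3 2023 and tested on Q4 2023, so it never sees data from the future of the period it is evaluated on. A minimal pandas sketch of the same idea, on hypothetical daily data; `split_by_dates` is a hypothetical helper, and it treats both boundary dates as inclusive, which is an assumption of this sketch rather than a statement about how Hopsworks handles the end date.

```python
import pandas as pd

def split_by_dates(frame, ranges, col="timestamp"):
    """Return one sub-frame per (start, end) range; both ends are treated as inclusive here."""
    return [
        frame[(frame[col] >= pd.Timestamp(start)) & (frame[col] <= pd.Timestamp(end))]
        for start, end in ranges
    ]

# Hypothetical daily data around the notebook's validation period
df = pd.DataFrame({"timestamp": pd.date_range("2023-06-29", "2023-10-02", freq="D")})
df["value"] = range(len(df))

train, val, test = split_by_dates(df, [
    ("2023-06-29", "2023-06-30"),  # tail of the training period
    ("2023-07-01", "2023-09-30"),  # validation period used in the notebook
    ("2023-10-01", "2023-10-02"),  # start of the test period
])
print(len(train), len(val), len(test))
```

Because the ranges do not overlap and are ordered in time, every validation row is later than every training row, and every test row is later than every validation row.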




In [11]:
# Sort the training, validation and test datasets chronologically by the 'timestamp' column
X_train.sort_values(["timestamp"], inplace=True)
X_val.sort_values(["timestamp"], inplace=True)
X_test.sort_values(["timestamp"], inplace=True)

In [12]:
# Define 'y_train', 'y_val' and 'y_test'
y_train = X_train[["dk1_spotpricedkk_kwh"]]
y_val = X_val[["dk1_spotpricedkk_kwh"]]
y_test = X_test[["dk1_spotpricedkk_kwh"]]

In [13]:
# Drop the 'date', 'time' and 'timestamp' columns from the training, validation and test feature sets
X_train.drop(["date", "time", "timestamp"], axis=1, inplace=True)
X_val.drop(["date", "time", "timestamp"], axis=1, inplace=True)
X_test.drop(["date", "time", "timestamp"], axis=1, inplace=True)

In [14]:
# Drop the target (y) column 'dk1_spotpricedkk_kwh' from the training, validation and test feature sets
X_train.drop(["dk1_spotpricedkk_kwh"], axis=1, inplace=True)
X_val.drop(["dk1_spotpricedkk_kwh"], axis=1, inplace=True)
X_test.drop(["dk1_spotpricedkk_kwh"], axis=1, inplace=True)

In [15]:
# Display the first 5 rows of the training features (X_train)
X_train.head()

|         | temperature_2m | relative_humidity_2m | precipitation | rain | snowfall | weather_code | cloud_cover | wind_speed_10m | wind_gusts_10m | dk1_offshore_wind_forecastintraday_kwh | dk1_onshore_wind_forecastintraday_kwh | dk1_solar_forecastintraday_kwh | type |
|---------|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2284741 | 0.426339 | 0.986667 | 0.0 | 0.0 | 0.0 | 0.04 | 1.0 | 0.313131 | 0.295525 | 0.27299 | 0.04676 | 0.010849 | 1 |
| 8103388 | 0.412946 | 0.973333 | 0.0 | 0.0 | 0.0 | 0.04 | 1.0 | 0.094949 | 0.078044 | 0.773045 | 0.264375 | 1.8e-05 | 1 |
| 8106168 | 0.426339 | 0.933333 | 0.0 | 0.0 | 0.0 | 0.04 | 1.0 | 0.19596 | 0.187305 | 0.949089 | 0.498277 | 0.0 | 1 |
| 6991514 | 0.408482 | 0.973333 | 0.0 | 0.0 | 0.0 | 0.04 | 1.0 | 0.133333 | 0.138398 | 0.663453 | 0.190924 | 0.000928 | 1 |
| 1108939 | 0.404018 | 0.986667 | 0.0 | 0.0 | 0.0 | 0.04 | 1.0 | 0.09697 | 0.115505 | 0.91042 | 0.45118 | 0.0 | 1 |


In [16]:
# Display the first 5 rows of the training target (y_train)
y_train.head()

|         | dk1_spotpricedkk_kwh |
|---------|---|
| 2284741 | 0.179988 |
| 8103388 | 0.179988 |
| 8106168 | 0.179988 |
| 6991514 | 0.179988 |
| 1108939 | 0.179988 |


## <span style="color:#2656a3;">🗃 Windowed time-series dataset </span>
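This section has no code in the notebook. As a minimal NumPy sketch of what a windowed time-series dataset usually means, each training example below is a run of `window_size` consecutive rows paired with the target `horizon` steps after the window. `make_windows`, `window_size` and `horizon` are hypothetical names. The sketch assumes the rows are sorted by time with exactly one row per time step; the training split above has 5,012,736 rows for roughly 13,100 hours (2022-01-01 to 2023-06-30), so that assumption is worth checking, e.g. for duplicate timestamps, before windowing.

```python
import numpy as np

def make_windows(features, target, window_size, horizon=1):
    """Pair each run of `window_size` consecutive rows with the target `horizon` steps later.

    features: array of shape (n_steps, n_features), sorted by time, one row per time step
    target:   array of shape (n_steps,)
    Returns X with shape (n_windows, window_size, n_features) and y with shape (n_windows,).
    """
    n_windows = len(features) - window_size - horizon + 1
    if n_windows < 1:
        raise ValueError("series is too short for this window size and horizon")
    X = np.stack([features[i:i + window_size] for i in range(n_windows)])
    y = np.array([target[i + window_size + horizon - 1] for i in range(n_windows)])
    return X, y

# Hypothetical toy series: 6 time steps with 2 features each
features = np.arange(12, dtype=float).reshape(6, 2)
target = np.arange(6, dtype=float) * 10
X, y = make_windows(features, target, window_size=3)
```

With 6 steps, a window of 3 and a horizon of 1, there are 3 windows; the first covers steps 0 to 2 and is paired with the target at step 3.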

## <span style="color:#2656a3;">🧬 Modeling</span>

In [31]:
# PyTorch for deep learning
import torch

In [21]:
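# Cast every feature column to float (e.g. the label-encoded 'type' column) before building tensors
X_train = X_train.astype(float)
X_test = X_test.astype(float)
X_val = X_val.astype(float)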

In [23]:
# Convert the pandas DataFrames to PyTorch tensors (only the training and test sets are converted here)

# Training sets
tensor_data_X_train = torch.tensor(X_train.values, dtype=torch.float32)  # Convert X_train to a tensor
tensor_data_y_train = torch.tensor(y_train.values, dtype=torch.float32)  # Convert y_train to a tensor

# Test sets
tensor_data_X_test = torch.tensor(X_test.values, dtype=torch.float32)    # Convert X_test to a tensor
tensor_data_y_test = torch.tensor(y_test.values, dtype=torch.float32)    # Convert y_test to a tensor

In [24]:
# Examining the size of the tensors
print('Training sets:')
print(tensor_data_X_train.size())
print(tensor_data_y_train.size())

print('')

print('Test sets:')
print(tensor_data_X_test.size())
print(tensor_data_y_test.size())

Training sets:
torch.Size([5012736, 13])
torch.Size([5012736, 1])

Test sets:
torch.Size([838656, 13])
torch.Size([838656, 1])


In [25]:
# Display the first data point in the training set tensor
tensor_data_X_train[0]

tensor([0.4263, 0.9867, 0.0000, 0.0000, 0.0000, 0.0400, 1.0000, 0.3131, 0.2955,
        0.2730, 0.0468, 0.0108, 1.0000])

In [26]:
# Reshape the first data point in the training set tensor to a 1D tensor (it is already 1D, so the values are unchanged)
# A tensor is a multi-dimensional array whose elements all share one data type (here float32).
tensor_data_X_train[0].reshape(-1)

tensor([0.4263, 0.9867, 0.0000, 0.0000, 0.0000, 0.0400, 1.0000, 0.3131, 0.2955,
        0.2730, 0.0468, 0.0108, 1.0000])

In [27]:
# Display the shape of the training set tensor
tensor_data_X_train.shape

torch.Size([5012736, 13])

In [28]:
# Flatten all values into a single column, shape (5012736 * 13, 1); shown for illustration only
tensor_data_X_train.reshape(-1, 1)

tensor([[0.4263],
        [0.9867],
        [0.0000],
        ...,
        [0.0184],
        [0.1327],
        [0.0000]])

In [37]:
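import ipywidgets  # widget backend used by tqdm's notebook progress bars
from tqdm.auto import tqdm  # chooses the notebook or console progress bar automatically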

In [38]:
# Initializing hyperparameters
epochs = 10
learning_rate = 0.01
batch_size = 13  # mini-batch size; larger batches mean fewer optimizer steps per epoch

# Initializing parameters
w = 50

loss_set = {}  # average training loss per epoch

# 1. Creating a feed-forward network
# 1.1 Structure (architecture) of the NN
model_ex5 = torch.nn.Sequential(
    torch.nn.Linear(13, 25),  # 13 input features -> 25 hidden units
    torch.nn.ReLU(),
    torch.nn.Dropout(0.33),

    torch.nn.Linear(25, 25),  # 25 -> 25 hidden units
    torch.nn.ReLU(),
    torch.nn.Dropout(0.33),

    torch.nn.Linear(25, 1),   # 25 hidden units -> 1 output (the predicted price)
    torch.nn.Identity(),
)

# 1.2 Loss function
loss_mse = torch.nn.MSELoss()

# 1.3 Optimization approach
optimizer = torch.optim.SGD(model_ex5.parameters(), lr=learning_rate)

model_ex5.train()  # training mode: dropout is active

# Loop over the number of epochs
for epoch in tqdm(range(epochs), desc="Epochs"):
    epoch_loss = 0.0
    n_batches = 0

    # Loop over mini-batches of data
    for i in range(0, tensor_data_X_train.size(0), batch_size):
        batch_X = tensor_data_X_train[i:i + batch_size]
        batch_Y = tensor_data_y_train[i:i + batch_size]  # shape: (batch, 1)

        # 2. Forward pass
        output = model_ex5(batch_X)  # shape: (batch, 1)

        # 3. Loss evaluation: prediction and target must have the same shape;
        #    comparing (batch, 1) with (batch,) would broadcast to (batch, batch)
        loss = loss_mse(output, batch_Y)

        # 4. Backward pass / gradient calculation
        optimizer.zero_grad()
        loss.backward()

        # 5. Update weights
        optimizer.step()

        epoch_loss += loss.item()
        n_batches += 1

    # Calculate and display the average loss over the mini-batches of this epoch
    epoch_loss /= n_batches
    loss_set[epoch] = epoch_loss
    print(f"\nEpoch {epoch+1} Average Loss: {epoch_loss:.4f}\n{'-'*50}\n")




[A[A[A


[A[A[A


Epoch 1 Average Loss: nan
--------------------------------------------------






[A[A[A


Epoch 2 Average Loss: nan
--------------------------------------------------



Epochs:  20%|██        | 2/10 [03:42<14:50, 111.28s/it]
Exception ignored in: <function tqdm.__del__ at 0x316150a40>
Traceback (most recent call last):
  File "/Users/tobiasmjensen/anaconda3/envs/bds-mlops/lib/python3.11/site-packages/tqdm/std.py", line 1148, in __del__
    self.close()
  File "/Users/tobiasmjensen/anaconda3/envs/bds-mlops/lib/python3.11/site-packages/tqdm/notebook.py", line 279, in close
    self.disp(bar_style='danger', check_delay=False)
    ^^^^^^^^^
AttributeError: 'tqdm_notebook' object has no attribute 'disp'


KeyboardInterrupt: 
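Both completed epochs report an average loss of `nan`, and the notebook does not show the cause. Two common culprits worth ruling out are NaN or infinite values in the features or target (a single one propagates through the network into the loss) and, for min-max-scaled features, columns whose training minimum equals their maximum, where the scaling formula divides by zero. A minimal pandas/NumPy sketch for checking both; `nan_report` and `constant_columns` are hypothetical helpers and the demo frame is made-up data.

```python
import numpy as np
import pandas as pd

def nan_report(frame):
    """Count NaN and infinite values per column of a numeric DataFrame."""
    values = frame.to_numpy(dtype=float)
    return pd.DataFrame(
        {"n_nan": np.isnan(values).sum(axis=0), "n_inf": np.isinf(values).sum(axis=0)},
        index=frame.columns,
    )

def constant_columns(frame):
    """Columns whose min equals max (NaNs ignored); min-max scaling divides by zero on these."""
    return [col for col in frame.columns if frame[col].min() == frame[col].max()]

# Hypothetical frame: 'snowfall' never varies and 'rain' has a missing value
demo = pd.DataFrame({
    "temperature_2m": [0.1, 0.5, 0.9],
    "snowfall": [0.0, 0.0, 0.0],
    "rain": [0.2, np.nan, 0.4],
})
report = nan_report(demo)
print(report[(report.n_nan > 0) | (report.n_inf > 0)])
print(constant_columns(demo))
```

In the notebook, `nan_report(X_train)` and `nan_report(y_train)` would cover the transformed splits, while `constant_columns` is meaningful on the untransformed feature data.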

## <span style='color:#2656a3'>🗄 Model Registry</span>

In [None]:
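import os

# Exporting the trained model to a directory
model_dir = "electricity_price_model"
os.makedirs(model_dir, exist_ok=True)
print('Exporting trained model to: {}'.format(model_dir))

# The trained model is a PyTorch module, so save it with torch.save rather than
# tf.saved_model.save (which only handles TensorFlow models); this pickles the full module
torch.save(model_ex5, os.path.join(model_dir, "model.pt"))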

In [None]:
# Retrieving the Model Registry
mr = project.get_model_registry()

# Use the average training loss of the last completed epoch as the metric
# (this notebook does not compute a validation loss)
metrics = {'loss': loss_set[max(loss_set)]}

# Creating a PyTorch model entry in the Model Registry
torch_model = mr.torch.create_model(
    name="electricity_price_prediction_model",
    metrics=metrics,
    description="Daily electricity price prediction model.",
    input_example=X_train.sample().values,
)

# Uploading the exported model directory to the Model Registry
torch_model.save(model_dir)

---

## <span style="color:#2656a3;">⏭️ **Next:** Part 04: Batch Inference </span>

In the next notebook, you will use the registered model to make predictions on batch data.