# Azure pipelines in action

This Jupyter Notebook contains a pipeline for an ensemble model. The data used by the pipeline is synthetic.

Pipelines are difficult to master all at once. By following the deployment of the ensemble model pipeline in this notebook, you should be able to build your own pipelines, including more complex ones.

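A pipeline is made up of components. Each component is a self-contained piece of code with defined inputs and outputs, so a pipeline is a sequence of such pieces of code in which the outputs of one step feed the inputs of the next.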
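In this notebook, components hand data to each other through folders: a component writes a file into its `uri_folder` output, the pipeline mounts that folder as the next component's input, and the next component reads it back with `select_first_file`. The plain-Python sketch below mimics that hand-off locally. A temporary directory stands in for the mounted storage, and the two component functions are hypothetical stand-ins, not part of the Azure ML SDK.

```python
import csv
import os
import tempfile


def select_first_file(path):
    """Return the first file in a folder (sorted here so the choice is deterministic)."""
    files = sorted(os.listdir(path))
    return os.path.join(path, files[0])


def upstream_component(output_dir):
    """Hypothetical first step: writes its result into its output folder."""
    with open(os.path.join(output_dir, "df_original.csv"), "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["Calendar day", "sales"])
        writer.writerow(["2023-01-01", 10])
        writer.writerow(["2023-01-02", 12])


def downstream_component(input_dir):
    """Hypothetical second step: reads whatever file the previous step left in the folder."""
    with open(select_first_file(input_dir), newline="") as f:
        rows = list(csv.DictReader(f))
    return sum(int(row["sales"]) for row in rows)


# A temporary directory plays the role of the storage the pipeline mounts between steps
with tempfile.TemporaryDirectory() as shared_folder:
    upstream_component(shared_folder)
    total = downstream_component(shared_folder)

print(total)  # 22
```

Passing folders rather than in-memory objects is what lets each component run on its own compute node.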

In [4]:
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential
from azureml.core import Workspace

In [5]:
subscription_id = "<SUBSCRIPTION_ID>"
resource_group = "<RESOURCE_GROUP>"
workspace_name = "<WORKSPACE_NAME>"

workspace = Workspace(subscription_id, resource_group, workspace_name)

In [6]:
# authenticate
credential = DefaultAzureCredential()
# Get a handle to the workspace
ml_client = MLClient(
    credential=credential,
    subscription_id=subscription_id,
    resource_group_name=resource_group,
    workspace_name=workspace_name,
)

As noted in the documentation, `MLClient` is "lazy": creating it does not connect to the workspace. The connection is made the first time the client needs to make a call. Therefore, we load a registered data asset here to trigger that first call and confirm that the client can reach the workspace.
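The idea of a lazy client can be illustrated with a small stand-in class. This is only an analogy. The `LazyClient` class is hypothetical and is not how `MLClient` is implemented. Constructing it just stores configuration, and the connection is opened on the first call that needs it and then reused.

```python
from functools import cached_property


class LazyClient:
    """Hypothetical stand-in for a lazily connecting client (not the MLClient implementation)."""

    def __init__(self, workspace_name):
        self.workspace_name = workspace_name  # cheap: only stores configuration
        self.connections_opened = 0

    @cached_property
    def connection(self):
        # Runs only on first access; the result is then cached on the instance
        self.connections_opened += 1
        return f"connected to {self.workspace_name}"

    def get_data(self, name):
        return f"{name} via {self.connection}"


client = LazyClient("<WORKSPACE_NAME>")
print(client.connections_opened)  # 0: constructing the client did not connect
client.get_data("test-data")
client.get_data("test-data")
print(client.connections_opened)  # 1: the first call connected, the second reused it
```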

In [None]:
credit_data = ml_client.data.get(name="test-data", version="1")
print(f"Data asset URI: {credit_data.path}")

In [8]:
import os

dependencies_dir = "./dependencies"
os.makedirs(dependencies_dir, exist_ok=True)

## Setting Up the Environment

The environment should contain the packages needed to work with the data and models we plan to use. It should also contain the packages for accessing secrets in Key Vault, so that secrets can be used without exposing them directly in the notebook or in the `.py` files.
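Before registering the environment, it can be worth checking that the modules the component scripts import actually resolve inside it. The following is a minimal sketch that uses only the standard library. The `missing_modules` helper and the module list are illustrative assumptions drawn from the imports used later in this notebook. Note that these are import names, which can differ from pip package names (for example, `scikit-learn` is imported as `sklearn`).

```python
import importlib.util

# Import names used by the component scripts in this notebook (not pip package names)
REQUIRED_MODULES = [
    "pandas", "statsmodels", "prophet", "mlflow",
    "azure.identity", "azure.keyvault.secrets",
]


def missing_modules(module_names):
    """Return the modules that cannot be found in the current environment."""
    missing = []
    for name in module_names:
        try:
            if importlib.util.find_spec(name) is None:
                missing.append(name)
        except ModuleNotFoundError:
            # find_spec raises when a parent package (e.g. "azure") is itself missing
            missing.append(name)
    return missing


# Expected to print [] when run inside a correctly built pipeline environment
print(missing_modules(REQUIRED_MODULES))
```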

In [9]:
%%writefile {dependencies_dir}/conda.yaml
name: conda_env
channels:
  - defaults
dependencies:
  - python=3.8
  - pip=20.0
  - pip:
    - azureml-mlflow==1.50.0
    - azure-ai-ml==1.11.1
    - tensorflow==2.7.0
    - numpy==1.21.4
    - scikit-learn==1.0.1
    - pandas==1.5.3
    - matplotlib==3.2.2
    - protobuf==3.20.0
    - mldesigner==0.1.0b12
    - prophet==1.1.4
    - seaborn==0.12.2
    - sklearn-pandas==1.7.0
    - statsmodels
    - openpyxl==3.1.2
    - xlsxwriter==3.1.5
    - azure-identity
    - azure-keyvault-secrets

Overwriting ./dependencies/conda.yaml


## Setting Up Compute

The next crucial step is to configure the compute. The choice of compute resources should match the complexity and requirements of your task. In this example, we create (or reuse) a small CPU cluster named `cluster-demo` on the low-priority tier.

**Note:** If you want your compute to access secrets in `Key Vault`, enable a system-assigned managed identity on it. You can do this from the Compute tab. This step is essential if you want your compute cluster to read SAS tokens from Key Vault. It is particularly important when working with private containers, where you want to avoid exposing tokens in your notebook.

An alternative approach is to create a user-assigned managed identity and assign it to all of your computes. Both solutions have their pros and cons, and the final decision depends on your architecture and security restrictions. We discuss these considerations further in this notebook.



In [10]:
from azure.ai.ml.entities import AmlCompute

# Name assigned to the compute cluster
cpu_compute_target = "cluster-demo"

try:
    # Check whether the compute target already exists
    cpu_cluster = ml_client.compute.get(cpu_compute_target)
    print(
        f"You already have a cluster named {cpu_compute_target}, we'll reuse it as is."
    )

except Exception:
    print("Creating a new cpu compute target...")

    # Create the Azure Machine Learning compute object with the intended parameters
    cpu_cluster = AmlCompute(
        name=cpu_compute_target,
        # Azure Machine Learning Compute is the on-demand VM service
        type="amlcompute",
        # VM size
        size="STANDARD_DS3_V2",
        # Minimum number of running nodes when no job is running
        min_instances=0,
        # Maximum number of nodes in the cluster
        max_instances=2,
        # Seconds an idle node keeps running after a job finishes before the cluster scales down
        idle_time_before_scale_down=180,
        # Dedicated or LowPriority. The latter is cheaper, but jobs can be preempted
        tier="LowPriority",
    )
    print(
        f"AMLCompute with name {cpu_cluster.name} will be created, with compute size {cpu_cluster.size}"
    )
    # Pass the object to MLClient's begin_create_or_update method; result() waits until provisioning finishes
    cpu_cluster = ml_client.compute.begin_create_or_update(cpu_cluster).result()

You already have a cluster named cluster-demo, we'll reuse it as is.


In [11]:
from azure.ai.ml.entities import Environment

custom_env_name = "aml-dss-test"

pipeline_job_env = Environment(
    name=custom_env_name,
    description="Custom environment for Customer Complaints Forecasting pipeline",
    tags={"purpose": "demo"},
    conda_file=os.path.join(dependencies_dir, "conda.yaml"),
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
    version="0.0.2",
)
pipeline_job_env = ml_client.environments.create_or_update(pipeline_job_env)

print(
    f"Environment with name {pipeline_job_env.name} is registered to workspace, the environment version is {pipeline_job_env.version}"
)

Environment with name aml-dss-test is registered to workspace, the environment version is 0.0.2


# Sales Forecasting

This section presents the components used to forecast sales. This part of the pipeline consists of two separate components: model training and forecasting.

## Sales Forecasting with SARIMA Model

This component trains a Seasonal Autoregressive Integrated Moving Average (SARIMA) model on historical sales data and evaluates how well it forecasts future sales.
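For reference, the textbook multiplicative SARIMA$(p, d, q)(P, D, Q)_s$ model is shown below, written with the backshift operator $B$ (where $B y_t = y_{t-1}$). The `order=(p, d, q)` and `seasonal_order=(P, D, Q, s)` arguments passed to `sm.tsa.SARIMAX` select the polynomial degrees. The form is shown without a trend term, since the component does not set one.

$$
\Phi_P(B^s)\,\phi_p(B)\,(1-B)^d\,(1-B^s)^D\,y_t = \Theta_Q(B^s)\,\theta_q(B)\,\varepsilon_t
$$

$$
\phi_p(B) = 1 - \sum_{i=1}^{p}\phi_i B^i,\qquad
\theta_q(B) = 1 + \sum_{j=1}^{q}\theta_j B^j,\qquad
\Phi_P(B^s) = 1 - \sum_{i=1}^{P}\Phi_i B^{is},\qquad
\Theta_Q(B^s) = 1 + \sum_{j=1}^{Q}\Theta_j B^{js}
$$

With the component's default inputs, this is SARIMA$(1, 0, 1)(3, 0, 3)_{12}$. It has no differencing, one non-seasonal AR term, one non-seasonal MA term, and three seasonal AR and MA terms at multiples of lag 12.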

### Key Functions

1. **Data Retrieval**: Reads sales data from Azure Blob Storage using a SAS token retrieved from Azure Key Vault.

2. **Data Splitting**: Splits the sales data chronologically (without shuffling) into training (70%), testing (15%), and validation (15%) sets.

3. **Model Training**: Fits a SARIMA model to the training set to learn patterns in historical sales.

4. **Testing and Validation**: Evaluates the model on the test set and then on the validation set. This gives two estimates of how well it forecasts data it has not seen.

5. **Logging Metrics**: Records Mean Absolute Error (MAE), Mean Squared Error (MSE), Root Mean Squared Error (RMSE), Mean Absolute Percentage Error (MAPE), and normalized RMSE (NRMSE) to MLflow.

6. **Logging Plots**: Captures plots of predicted versus actual sales, which help in understanding model performance.

7. **Saving Model**: Persists the trained SARIMA model so that the forecasting component can reuse it.
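You can check the chronological split in step 2 by hand. This standard-library sketch reproduces the row counts computed by the component's `data_split` function: 70% of the rows go to training, and the remainder is halved, with the test set taking the extra row when the remainder is odd. The names `split_sizes` and `split_rows` are illustrative and are not part of the component.

```python
def split_sizes(n_rows):
    """Row counts of the component's chronological 70/15/15 split."""
    n_train = round(n_rows * 0.7)
    remaining = n_rows - n_train
    # The test set takes the extra row when the remainder is odd
    n_test = remaining // 2 + remaining % 2
    n_val = remaining - n_test
    return n_train, n_test, n_val


def split_rows(rows):
    """Slice a time-ordered sequence into train/test/validation without shuffling."""
    n_train, n_test, _ = split_sizes(len(rows))
    return rows[:n_train], rows[n_train:n_train + n_test], rows[n_train + n_test:]


print(split_sizes(100))  # (70, 15, 15)
print(split_sizes(10))   # (7, 2, 1)
```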

### Usage

1. **Configuration**: Set the name of the Key Vault secret that holds the SAS token, and the SARIMA orders.

2. **Data Access**: Fetches historical sales data securely from Azure Blob Storage.

3. **Model Training**: Fits the SARIMA model to historical data.

4. **Evaluation**: Assesses model accuracy on the test and validation sets.

5. **Metric Logging**: Records key metrics to measure the effectiveness of the model.

6. **Plot Logging**: Plots predictions against actual sales for a visual check.

7. **Model Persistence**: Saves the trained SARIMA model for later use in forecasting.

Make sure the necessary libraries and dependencies are installed before running the component.
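The seven integer inputs are combined into the two tuples that `SARIMAX` expects. This sketch mirrors that step of the training script. Unlike the script, it gives the arguments the component's default values, an assumption made so it can run locally without the pipeline.

```python
import argparse


def parse_orders(argv):
    """Turn the component's integer CLI arguments into SARIMAX order tuples."""
    parser = argparse.ArgumentParser()
    # Defaults mirror the component definition: (1, 0, 1) x (3, 0, 3, 12)
    for name, default in [("p_input", 1), ("d_input", 0), ("q_input", 1),
                          ("p_seasonal_input", 3), ("d_seasonal_input", 0),
                          ("q_seasonal_input", 3), ("s_input", 12)]:
        parser.add_argument(f"--{name}", type=int, default=default)
    args = parser.parse_args(argv)
    order = (args.p_input, args.d_input, args.q_input)
    seasonal_order = (args.p_seasonal_input, args.d_seasonal_input,
                      args.q_seasonal_input, args.s_input)
    return order, seasonal_order


print(parse_orders(["--p_input", "2", "--s_input", "7"]))  # ((2, 0, 1), (3, 0, 3, 7))
```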



**Note:** The training component uses `secret_client` to read the SAS token from Key Vault. This works because a `System Managed Identity` is enabled on the compute cluster. After enabling it, grant the cluster access to the `Secrets`. In the Azure Portal, open the `Key Vault` that stores your secrets, go to `Access Policies`, and create an access policy with permission to read (`Get`) secrets. When selecting the principal for the policy, find your compute by typing `<YOUR_AZUREML_RESOURCE_NAME>/computes/<NAME_OF_THE_VIRTUAL_MACHINE_OR_CLUSTER>` in the search bar. Finally, for the `client_id`, find your compute in `Active Directory` (Microsoft Entra ID) and copy its `Application ID`. The training script reads this value from a `client_id.txt` file in the component folder.
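Because the client ID is read from a text file, a stray trailing newline or a pasted value with spaces can break authentication. Below is a defensive way to read it using only the standard library. The `read_client_id` helper is hypothetical, and the GUID check reflects the format of Azure client IDs.

```python
import uuid
from pathlib import Path


def read_client_id(path="client_id.txt"):
    """Read a managed identity client ID, dropping surrounding whitespace and the trailing newline."""
    raw = Path(path).read_text(encoding="utf-8").strip()
    # Client IDs are GUIDs; uuid.UUID raises ValueError if the value is malformed
    uuid.UUID(raw)
    return raw
```

In the training script, the result would then be passed as `ManagedIdentityCredential(client_id=read_client_id())`.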


In [12]:
import os

sarima_train_src_dir = "./components/demo_sales_model_training"
os.makedirs(sarima_train_src_dir, exist_ok=True)

In [13]:
%%writefile {sarima_train_src_dir}/demo_model_training_sales.py

import os
from pathlib import Path

from mldesigner import command_component, Input, Output
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error
# from statsmodels.tsa.statespace.sarimax import SARIMAX
import matplotlib.pyplot as plt
import seaborn as sns
import mlflow
import datetime
import pickle
import joblib
import argparse
from sklearn.metrics import r2_score
from statsmodels.tools.eval_measures import rmse
from azure.keyvault.secrets import SecretClient
from azure.identity import ManagedIdentityCredential
import statsmodels.api as sm
import json

def select_first_file(path):
    """Selects first file in folder, use under assumption there is only one file in folder
    Args:
        path (str): path to directory or file to choose
    Returns:
        str: full path of selected file
    """
    files = os.listdir(path)
    return os.path.join(path, files[0])

def read_data(sas_token, secret_client):

    data_container_link = "<CONTAINER_URL>"
    filename = '/sales_complaints_synthetic.csv'
    # `sas_token` is the name of the Key Vault secret; its value is the SAS token itself
    sas_token = secret_client.get_secret(name=sas_token).value

    data_asset_url_with_sas = f"{data_container_link}{filename}?{sas_token}" 
    df = pd.read_csv(data_asset_url_with_sas)
    df['Calendar day'] = pd.to_datetime(df['Calendar day'], format='%Y-%m-%d', errors='coerce')

    return df

def model_train_sales(X_train, order: tuple, seasonal_order: tuple):

    model = sm.tsa.SARIMAX(X_train['sales'], order=order, seasonal_order=seasonal_order)
    results = model.fit()

    mlflow.statsmodels.log_model(results, artifact_path="sarima_model")

    return model, results

def data_split(df):

    # Chronological 70/15/15 split; rows are not shuffled, so time order is preserved
    df_70 = round(len(df) * 0.7)
    remaining_rows = len(df) - df_70

    if remaining_rows % 2 != 0:
        df_15 = (remaining_rows // 2) + 1
    else:
        df_15 = remaining_rows // 2

    mlflow.log_metric("70% of the original dataframe", df_70)
    mlflow.log_metric("15% of the original dataframe", df_15)

    X_train = df.iloc[:df_70, :]  
    X_test = df.iloc[df_70:df_70 + df_15, :]  
    X_val = df.iloc[df_70 + df_15:, :]

    return X_train, X_test, X_val

def model_test(y_test, results):

    test  = results.get_prediction(start=y_test.index[0], end=y_test.index[-1])
    y_pred = test.predicted_mean
    test_confidence_intervals = test.conf_int(alpha=0.2)

    return y_pred, test_confidence_intervals

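def model_val(y_val, results):
    # get_forecast starts right after the last training observation, so forecast through
    # the test period as well and keep only the steps that cover the validation rows
    steps = int(y_val.index[-1]) - results.nobs + 1
    forecast = results.get_forecast(steps=steps)
    y_val_pred = forecast.predicted_mean.iloc[-len(y_val):]
    forecast_confidence_intervals = forecast.conf_int(alpha=0.2).iloc[-len(y_val):]

    return y_val_pred, forecast_confidence_intervals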

def log_metrics(y_test, y_pred, test: bool):
       
    def calculate_rmse(predictions, targets):
        return np.sqrt(((predictions - targets) ** 2).mean())
    
    
    mse = mean_squared_error(y_true=y_test,
                                y_pred=y_pred)
    rmse = np.sqrt(mean_squared_error(y_true=y_test,
                                        y_pred=y_pred))
    mae = mean_absolute_error(y_true=y_test,
                                y_pred=y_pred)

    def mean_absolute_percentage_error(y_true, y_pred):


        y_true = np.array(y_true)
        y_pred = np.array(y_pred)
    
        # Check for input shape compatibility
        if y_true.shape != y_pred.shape:
            raise ValueError(f"Input shapes do not match ({y_true.shape}), ({y_pred.shape})")
    
        # Calculate absolute percentage error
        absolute_error = np.abs((y_true - y_pred) / y_true)
    
        # Replace infinite values with zero and ignore NaN values
        absolute_error = np.nan_to_num(absolute_error, nan=0, posinf=0, neginf=0)
    
        # Calculate the mean of absolute percentage error
        mape = np.mean(absolute_error) * 100
    
        return mape
    
    mape = mean_absolute_percentage_error(y_true=y_test, 
                                            y_pred=y_pred)
    smape = np.mean(
        2 * np.abs(y_test.values - y_pred) / (np.abs(y_test.values) + np.abs(y_pred))
        ) * 100

    rmse_seasonal = calculate_rmse(y_pred, y_test.values)

    nrmse_seasonal = rmse_seasonal / (np.max(y_test.values) - np.min(y_test.values))

    if test:

        # Logging all metrics
        mlflow.log_metric('Sales MAE', mae)
        mlflow.log_metric('Sales MSE', mse)
        mlflow.log_metric('Sales RMSE', rmse)
        mlflow.log_metric('Sales MAPE', mape)
        mlflow.log_metric('Sales NRMSE', nrmse_seasonal)
        
    else:
        # Logging all metrics
        mlflow.log_metric('Sales Validation MAE', mae)
        mlflow.log_metric('Sales Validation MSE', mse)
        mlflow.log_metric('Sales Validation RMSE', rmse)
        mlflow.log_metric('Sales Validation MAPE', mape)
        mlflow.log_metric('Sales Validation NRMSE', nrmse_seasonal)

def log_plots(y_test, forecasted_values, confidence_intervals, axis, name):

    # Plot the actual and predicted values
    fig, ax = plt.subplots(figsize=(10, 5))

    # Plot the data
    ax.plot(axis['Calendar day'], forecasted_values, label='Prediction')
    ax.plot(axis['Calendar day'], y_test, label='Actual')

    # Fill the confidence intervals
    ax.fill_between(axis['Calendar day'], confidence_intervals['lower sales'], confidence_intervals['upper sales'],
                    color='gray', alpha=0.3, label='Confidence Intervals')

    ax.set_title(str(name) + " plot for sales")
    ax.set_xlabel('Date')
    ax.set_ylabel('Sales')
    # Create the legend after all artists are added so the confidence band is included
    ax.legend()

    mlflow.log_figure(fig, str(name) + 'plot.png')
    plt.close(fig)

def main():

    """Main function of the script"""
    
    parser = argparse.ArgumentParser()

    # Sas token
    parser.add_argument("--sas_token", type=str)

    # Features arguments
    parser.add_argument("--p_input", type=int)
    parser.add_argument("--d_input", type=int)
    parser.add_argument("--q_input", type=int)
    parser.add_argument("--p_seasonal_input", type=int)
    parser.add_argument("--d_seasonal_input", type=int)
    parser.add_argument("--q_seasonal_input", type=int)
    parser.add_argument("--s_input", type=int)

    # Output dataframe
    parser.add_argument("--df_original", type=str, help="path to the file df_original.csv")
    parser.add_argument("--training_data_sales", type=str, help="path to the file X_train_original.csv")
    parser.add_argument("--testing_data_sales",type=str, help="path to the file X_test_original.csv")
    parser.add_argument("--validation_data_sales", type=str, help="path to the file X_val_original.csv")
    parser.add_argument("--model",type=str, help="path to the model file")

    args = parser.parse_args()

    # Client ID is the Application ID from Active Directory (Entra ID); it is kept in a .txt file so it is not hard-coded.
    # strip() removes the trailing newline that would otherwise corrupt the ID
    client_id = Path('client_id.txt').read_text().strip()
    credential = ManagedIdentityCredential(client_id=client_id)
    # Secret client initialized with access to the desired Key Vault
    secret_client = SecretClient(vault_url='<KEY_VAULT_URL>', credential=credential)

    # Reading data
    df = read_data(args.sas_token, secret_client=secret_client)

    # Splitting data
    X_train, X_test, X_val = data_split(df)
    print(len(X_train),len(X_test),len(X_val))

    print(X_train.head())

    # Save the data so that downstream components reuse it instead of reading from Blob Storage again
    df.to_csv(Path(args.df_original) /  "df_original.csv")
    X_train.to_csv(Path(args.training_data_sales) /  "X_train_original.csv")
    X_test.to_csv(Path(args.testing_data_sales) /  "X_test_original.csv")
    X_val.to_csv(Path(args.validation_data_sales) /  "X_val_original.csv")


    # Specify the SARIMA orders: (p, d, q) for non-seasonal, (P, D, Q, S) for seasonal
    order = (args.p_input, args.d_input, args.q_input)
    seasonal_order = (args.p_seasonal_input, args.d_seasonal_input, args.q_seasonal_input, args.s_input)

    # Model fitting
    model, results = model_train_sales(X_train[['Calendar day', 'sales']], order=order, seasonal_order=seasonal_order)

    print(results.summary())

    # Test the model
    y_pred, test_confidence_intervals = model_test(X_test[['Calendar day', 'sales']], results)

    # Model validation
    y_val_pred, val_confidence_intervals = model_val(X_val[['Calendar day', 'sales']], results)

    # Logging metrics for testing and then for validation
    log_metrics(X_test['sales'], y_pred, test=True)
    log_metrics(X_val['sales'], y_val_pred, test=False)

    # Logging plots
    log_plots(y_test=X_test['sales'], 
              forecasted_values=y_pred, 
              confidence_intervals=test_confidence_intervals, 
              axis=X_test, 
              name="Test")
    log_plots(y_test=X_val['sales'], 
              forecasted_values=y_val_pred,
              confidence_intervals=val_confidence_intervals, 
              axis=X_val, 
              name="Validation")

    # Saving the model
    model_filename = os.path.join(args.model, 'sarima_model_sales.pkl')
    with open(model_filename, 'wb') as f:
        pickle.dump(results, f)

if __name__ == "__main__":
    main()

Overwriting ./components/demo_sales_model_training/demo_model_training_sales.py
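To sanity-check the values that `log_metrics` sends to MLflow, the same formulas can be computed by hand. Below is a dependency-free sketch; the `forecast_metrics` function is illustrative. Like the component's MAPE, it counts observations with a zero actual value as 0% error instead of dividing by zero. As an added assumption, it also treats a 0/0 term in sMAPE as 0. The component computes sMAPE but does not log it.

```python
import math


def forecast_metrics(y_true, y_pred):
    """MAE, MSE, RMSE, MAPE, sMAPE and range-normalised RMSE for two equal-length sequences."""
    if not y_true or len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must be non-empty and of equal length")
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]
    mae = sum(abs(e) for e in errors) / n
    mse = sum(e * e for e in errors) / n
    rmse = math.sqrt(mse)
    # Zero actuals contribute 0 to the mean, mirroring np.nan_to_num in the component
    mape = 100 * sum(abs(e / t) if t != 0 else 0.0 for e, t in zip(errors, y_true)) / n
    smape = 100 * sum(
        2 * abs(e) / (abs(t) + abs(p)) if (abs(t) + abs(p)) > 0 else 0.0
        for e, t, p in zip(errors, y_true, y_pred)
    ) / n
    value_range = max(y_true) - min(y_true)
    nrmse = rmse / value_range if value_range else float("nan")
    return {"MAE": mae, "MSE": mse, "RMSE": rmse, "MAPE": mape, "sMAPE": smape, "NRMSE": nrmse}


metrics = forecast_metrics([10, 20, 30, 40], [12, 18, 33, 40])
print(metrics["MAE"])  # 1.75
```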


In [14]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

demo_sales_model_training_component = command(
    name="demo_sales_model_training",
    display_name="Model training for sales forecasting",
    description="Reads data and splits it into train, test and validation sets. Then the model is created and tested on the test and validation datasets",
    inputs={
        "sas_token": Input(type="string"),
        "p_input": Input(type="integer", default=1),
        "d_input": Input(type="integer", default=0),
        "q_input": Input(type="integer", default=1),
        "p_seasonal_input": Input(type="integer", default=3),
        "d_seasonal_input": Input(type="integer", default=0),
        "q_seasonal_input": Input(type="integer", default=3),
        "s_input": Input(type="integer", default=12),
    },
    outputs=dict(
        model=Output(type="uri_folder", mode="rw_mount"),
        df_original=Output(type="uri_folder", mode="rw_mount"),
        training_data_sales=Output(type="uri_folder", mode="rw_mount"),
        testing_data_sales=Output(type="uri_folder", mode="rw_mount"),
        validation_data_sales=Output(type="uri_folder", mode="rw_mount"),
    ),
    code=sarima_train_src_dir,
    command="""python demo_model_training_sales.py \
            --sas_token ${{inputs.sas_token}} --p_input ${{inputs.p_input}} \
            --d_input ${{inputs.d_input}} --q_input ${{inputs.q_input}} \
            --p_seasonal_input ${{inputs.p_seasonal_input}} --d_seasonal_input ${{inputs.d_seasonal_input}} \
            --q_seasonal_input ${{inputs.q_seasonal_input}} --s_input ${{inputs.s_input}} \
            --model ${{outputs.model}} --df_original ${{outputs.df_original}} \
            --training_data_sales  ${{outputs.training_data_sales}} --testing_data_sales ${{outputs.testing_data_sales}} \
            --validation_data_sales ${{outputs.validation_data_sales}} \
            """,
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
)

In [15]:
# Now we register the component to the workspace
demo_sales_model_training_component = ml_client.create_or_update(
    demo_sales_model_training_component.component
)

# Confirm that the component is registered in your workspace
print(
    f"Component {demo_sales_model_training_component.name} with Version {demo_sales_model_training_component.version} is registered"
)

Component demo_sales_model_training with Version 2023-11-14-13-37-50-4370446 is registered


### Sales forecasting component

In [16]:
import os

sarima_forecast_src_dir = "./components/demo_sarima_forecast"
os.makedirs(sarima_forecast_src_dir, exist_ok=True)

In [17]:
%%writefile {sarima_forecast_src_dir}/demo_sarima_forecast_sales.py

import os
import sys
from pathlib import Path
from mldesigner import command_component, Input, Output
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error
from statsmodels.tsa.statespace.sarimax import SARIMAX
import matplotlib.pyplot as plt
import seaborn as sns
import mlflow
import datetime
import pickle
import joblib
import argparse
from sklearn.metrics import r2_score
from statsmodels.tools.eval_measures import rmse

def select_first_file(path):
    """Selects first file in folder, use under assumption there is only one file in folder
    Args:
        path (str): path to directory or file to choose
    Returns:
        str: full path of selected file
    """
    files = os.listdir(path)
    return os.path.join(path, files[0])

def sarima_model_forecast(X_val, results, n_observed, out_of_sample_forecast_days=60):

    X_val['Calendar day'] = pd.to_datetime(X_val['Calendar day'])

    # Daily dates for the out-of-sample period, starting the day after the last validation date
    forecast_date_rng = pd.date_range(start=X_val['Calendar day'].iloc[-1] + pd.Timedelta(days=1),
                                        periods=out_of_sample_forecast_days,
                                        freq="D"
                                        )

    # The model was fitted on the training split only, so get_forecast starts right after it.
    # Forecast through the test and validation periods first, then keep only the final steps.
    gap = n_observed - results.nobs
    forecast = results.get_forecast(steps=gap + out_of_sample_forecast_days)

    # Extract the forecasted values and confidence intervals for the out-of-sample period
    forecast_values = forecast.predicted_mean.iloc[gap:]

    forecast_confidence_intervals = forecast.conf_int(alpha=0.2).iloc[gap:]

    forecast_df = pd.DataFrame({'Calendar day': forecast_date_rng, 'sales': forecast_values.to_numpy()})

    forecast_df.set_index('Calendar day', inplace=True, drop=False)
    forecast_df.rename_axis(None, inplace=True)

    return forecast_values, forecast_confidence_intervals, forecast_df, forecast_date_rng

def log_plots(X_val, forecast_date_rng, forecast_values, forecast_confidence_intervals, name):

    # Plot the actual and predicted values
    fig, ax = plt.subplots(figsize=(10, 5))

    sns.set_style('whitegrid')

    plt.plot(X_val['Calendar day'], X_val['sales'], label='Observed')
    plt.plot(forecast_date_rng, forecast_values, label='Forecast', color='red')
    plt.fill_between(
        forecast_date_rng,
        forecast_confidence_intervals['lower sales'],
        forecast_confidence_intervals['upper sales'],
        color='pink',
        alpha=0.3,
        )
    plt.xticks(rotation=60)
    plt.grid(visible=True)
    plt.legend()

    mlflow.log_figure(fig,str(name) + 'plot.png')

def main():
    "Main function of the script"

    parser = argparse.ArgumentParser()
    parser.add_argument("--model_forecast", type=str, help='path to the model folder')
    parser.add_argument("--df", type=str, help='path to the file')
    parser.add_argument("--X_val", type=str, help='path to the file')
    parser.add_argument("--df_extended", type=str, help="path to the file")
    args = parser.parse_args()

    # Load the pickled SARIMA results saved by the training component
    files = [f for f in os.listdir(args.model_forecast) if f.endswith(".pkl")]
    with open(os.path.join(args.model_forecast, files[0]), 'rb') as f:
        model = pickle.load(f)
    print(model.summary())

    # Reading data from previous step
    df = pd.read_csv(select_first_file(args.df))
    X_val = pd.read_csv(select_first_file(args.X_val))

    # Forecast the days after the last observed date
    forecast_values, forecast_confidence_intervals, forecast_df, forecast_date_rng = sarima_model_forecast(X_val, model, n_observed=len(df))

    df_extended = pd.concat([df, forecast_df], ignore_index=True)
    # Complaints are unknown for the forecast period, so they are filled with 0.0
    df_extended['complaints'] = df_extended['complaints'].fillna(0.0)
    df_extended.to_csv((Path(args.df_extended) / 'df_extended.csv'))

    log_plots(X_val, forecast_date_rng, forecast_values, forecast_confidence_intervals, 'Sales Forecast ')


if __name__ == "__main__":
    main()

Overwriting ./components/demo_sarima_forecast/demo_sarima_forecast_sales.py
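The forecast horizon needs care. The SARIMA results object was fitted on the training split only, and `get_forecast(steps=n)` always starts right after the last observation the model was fitted on. To obtain values for the dates after the validation period, the forecast therefore has to run through the test and validation rows first. The arithmetic is sketched below with the standard library. The function names are illustrative, and daily data without gaps is assumed.

```python
from datetime import date, timedelta


def out_of_sample_window(n_train, n_observed, horizon):
    """Steps to request from a model fitted on the first n_train rows, and where the wanted tail starts."""
    gap = n_observed - n_train  # test + validation rows the model was not fitted on
    steps = gap + horizon       # the forecast must cover the gap before reaching new dates
    return steps, gap           # keep forecast[gap:] as the out-of-sample part


def forecast_dates(last_observed, horizon):
    """Daily dates starting the day after the last observed date."""
    return [last_observed + timedelta(days=i) for i in range(1, horizon + 1)]


steps, start = out_of_sample_window(n_train=70, n_observed=100, horizon=60)
print(steps, start)  # 90 30
print(forecast_dates(date(2023, 12, 31), 3)[0])  # 2024-01-01
```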


In [18]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

demo_sales_forecast_component = command(
    name="demo_sales_forecast",
    display_name="Demo Sales Forecast",
    description="This component uses the SARIMA model to make an out-of-sample forecast. The forecast is concatenated with the original DataFrame",
    inputs={
        "model_forecast": Input(type="uri_folder"),
        "df": Input(type="uri_folder"),
        "X_val": Input(type="uri_folder"),
    },
    outputs=dict(df_extended=Output(type="uri_folder", mode="rw_mount")),
    code=sarima_forecast_src_dir,
    command="""python demo_sarima_forecast_sales.py \
              --df ${{inputs.df}} --model_forecast ${{inputs.model_forecast}} \
              --X_val ${{inputs.X_val}} --df_extended ${{outputs.df_extended}} \
              """,
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
)

In [19]:
# Now we register the component to the workspace
demo_sales_forecast_component = ml_client.create_or_update(
    demo_sales_forecast_component.component
)

# Confirm that the component is registered in your workspace
print(
    f"Component {demo_sales_forecast_component.name} with Version {demo_sales_forecast_component.version} is registered"
)

Component demo_sales_forecast with Version 2023-11-14-13-37-52-6875169 is registered


# Complaints forecasting

This section is dedicated to the complaints forecasting components.

## Feature Creation

This component enhances sales data by creating additional features for better predictive modeling. It introduces lagged values and a moving average to capture temporal patterns in the data.

### Key Functions

1. **Data Loading**: Retrieves sales data from the previous step for feature creation.

2. **Lagged Values**: Creates lagged versions of a specified column to incorporate historical values as features. The number of lagged values is determined by the `num_lags` parameter.

3. **Moving Average**: Generates a 7-day moving average of sales that excludes the current day, smoothing out short-term fluctuations to highlight longer-term trends. The `min_periods` parameter sets the minimum number of past observations required to compute the average. Rows with too little history are filled with 0.0.

4. **Output**: Saves the enhanced features, including lagged values and the moving average, to a new CSV file (`df_features.csv`).

### Usage

1. **Input Data**: Provide the path to the sales data (`--df`) from the previous step.

2. **Feature Configuration**: Adjust parameters such as the number of lagged values (`--num_lags`), the target column (`--column`), and the minimum periods for the moving average (`--min_periods`).

3. **Feature Creation**: Generates lagged values and a moving average based on the specified parameters.

4. **Output**: Saves the newly created features to a CSV file (`df_features.csv`) for further use in the pipeline.

Ensure the necessary libraries and dependencies are installed before running the component.
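The two pandas calls in the component are `shift(i).fillna(0.0)` and `rolling(window=7, min_periods=..., closed='left').mean().fillna(0.0)`, and their semantics are easy to misread. This pure-Python sketch reproduces them on a plain list so the edge cases are visible. Lags are padded with 0.0, and the moving average for a row uses only the seven preceding rows, never the row itself. The function names mirror the component, but these list-based versions are illustrative only. Note that the component always computes the moving average on `sales`, regardless of `--column`.

```python
def create_lags(values, column_name, num_lags=1):
    """Lagged copies of a series, padded with 0.0 where no history exists."""
    lags = {}
    for i in range(1, num_lags + 1):
        shifted = [float(v) for v in values[:-i]] if i < len(values) else []
        lags[f"{column_name}_lag_{i}"] = [0.0] * (len(values) - len(shifted)) + shifted
    return lags


def create_moving_avg(values, window=7, min_periods=1):
    """Mean of the previous `window` values, excluding the current row; 0.0 when history is too short."""
    out = []
    for i in range(len(values)):
        past = values[max(0, i - window):i]  # rows i-window .. i-1
        if past and len(past) >= min_periods:
            out.append(sum(past) / len(past))
        else:
            out.append(0.0)
    return out


print(create_lags([10, 20, 30], "sales", 2))
# {'sales_lag_1': [0.0, 10.0, 20.0], 'sales_lag_2': [0.0, 0.0, 10.0]}
print(create_moving_avg([1, 2, 3, 4, 5, 6, 7, 8, 9]))
# [0.0, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 5.0]
```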



In [20]:
import os

feature_creation_src_dir = "./components/demo_feature_creation"
os.makedirs(feature_creation_src_dir, exist_ok=True)

In [21]:
%%writefile {feature_creation_src_dir}/demo_feature_creation.py

# Required libraries
import os
import argparse
from pathlib import Path

import pandas as pd

def select_first_file(path):
    """Selects first file in folder, use under assumption there is only one file in folder
    Args:
        path (str): path to directory or file to choose
    Returns:
        str: full path of selected file
    """

    files = os.listdir(path)
    return os.path.join(path, files[0])

def create_lags(df, column_name, num_lags=1):

    for i in range(1, num_lags + 1):
        df[f'{column_name}_lag_{i}'] = df[column_name].shift(i).fillna(0.0)

    return df

def create_moving_avg(df, min_periods):

    df['sales_ma'] = df["sales"].rolling(window=7, min_periods=min_periods, closed='left').mean().fillna(0.0)

    return df


def main():
    """
    Main function of the script.
    """

    # Input and output parameters

    parser = argparse.ArgumentParser()

    # Data argument
    parser.add_argument("--df", type=str)

    # Features arguments
    parser.add_argument("--num_lags", type=int)
    parser.add_argument("--column", type=str)
    parser.add_argument("--min_periods", type=int)

    # Output dataframe
    parser.add_argument("--df_features",type=str)

    args = parser.parse_args()

    # Calling function to read data from previous step
    df = pd.read_csv(select_first_file(args.df))

    df = create_lags(df, args.column, args.num_lags)

    df_features = create_moving_avg(df, args.min_periods)

    df_features.to_csv((Path(args.df_features) / 'df_features.csv'))

if __name__ == "__main__":
    main()

Overwriting ./components/demo_feature_creation/demo_feature_creation.py


In [22]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

demo_feature_creation_component = command(
    name="demo_feature_creation",
    display_name="Demo feature creation for sales",
    description="This component creates lags and a moving average for the DataFrame for the DSS conference.",
    inputs={
        "df": Input(type="uri_folder"),
        "num_lags": Input(type="integer"),
        "column": Input(type="string"),
        "min_periods": Input(type="integer"),
    },
    outputs=dict(df_features=Output(type="uri_folder", mode="rw_mount")),
    code=feature_creation_src_dir,
    command="""python demo_feature_creation.py \
              --df ${{inputs.df}} --num_lags ${{inputs.num_lags}} \
              --column ${{inputs.column}} --min_periods ${{inputs.min_periods}} \
              --df_features ${{outputs.df_features}}
              """,
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
)

In [23]:
# Now we register the component to the workspace
demo_feature_creation_component = ml_client.create_or_update(
    demo_feature_creation_component.component
)

# Confirm that the component is registered in your workspace
print(
    f"Component {demo_feature_creation_component.name} with Version {demo_feature_creation_component.version} is registered"
)

Component demo_feature_creation with Version 2023-11-14-13-37-54-1886876 is registered


## Complaints model component

This component utilizes the Prophet model to forecast complaints based on historical data. It includes data loading, model training, testing, and validation, along with logging metrics and plots.

### Key Functions

1. **Data Loading**: Retrieves feature-enhanced data from the previous step for model training.

2. **Model Training**: Trains a Prophet model with specified hyperparameters such as yearly and weekly seasonality, seasonality mode, and the changepoint, seasonality, and holiday prior scales.

3. **Model Forecasting**: Uses the trained model to forecast complaints for the testing data.

4. **Metric Logging**: Records key metrics like Mean Absolute Error (MAE), Mean Squared Error (MSE), Root Mean Squared Error (RMSE), and others for model evaluation.

5. **Plot Logging**: Captures visual representations of actual vs. predicted complaints, as well as trends and components identified by the Prophet model.

6. **Output**: Saves the training, testing, and validation data for future analysis.

### Usage

1. **Input Data**: Provide the path to the feature-enhanced data (`--df_features`) from the previous step.

2. **Model Configuration**: Adjust hyperparameters such as yearly seasonality, seasonality mode, the changepoint prior scale, and more through command-line arguments.

3. **Model Training and Forecasting**: Trains the Prophet model and uses it to forecast complaints for the testing data.

4. **Metric and Plot Logging**: Captures and logs key metrics and visualizations for model evaluation.

5. **Output Data**: Saves the training, testing, and validation data for future analysis.

The pipeline environment must provide the libraries the script imports, including `prophet`, `mlflow`, `scikit-learn`, `matplotlib` and `seaborn`.
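The chronological split done by `data_split` in the script below can be checked without any data. The sketch mirrors its arithmetic in a hypothetical helper, `split_sizes`: 70% of the rows (rounded) go to training, and when the remaining number of rows is odd, the extra row goes to testing.

```python
def split_sizes(n_rows):
    """Return (train, test, validation) row counts the way data_split computes them.

    split_sizes is a hypothetical helper written for illustration only.
    """
    n_train = round(n_rows * 0.7)            # Python's round() sends exact halves to the even number
    remaining = n_rows - n_train
    n_test = remaining // 2 + remaining % 2  # the odd row, if any, goes to testing
    n_val = remaining - n_test
    return n_train, n_test, n_val


for n in (10, 100, 103):
    print(n, split_sizes(n))
```

Because the rows are taken in order, the validation set is always the most recent slice of the series, which is what the forecasting component relies on when it extends the forecast past the last validation date.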

In [24]:
import os

prophet_src_dir = "./components/demo_prophet_train"
os.makedirs(prophet_src_dir, exist_ok=True)

In [25]:
%%writefile {prophet_src_dir}/demo_prophet_model_complaints.py

import os
import pickle
import argparse
from pathlib import Path

import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt
import seaborn as sns
import mlflow
import mlflow.prophet
from mlflow.models.signature import infer_signature
from prophet import Prophet

def select_first_file(path):
    """Selects first file in folder, use under assumption there is only one file in folder
    Args:
        path (str): path to directory or file to choose
    Returns:
        str: full path of selected file
    """
    files = os.listdir(path)
    return os.path.join(path, files[0])

def data_split(df):

    # Chronological split: first 70% of rows for training, then ~15% for testing, the rest for validation
    df_70 = round(len(df) * 0.7)
    remaining_rows = len(df) - df_70

    if remaining_rows % 2 != 0:
        df_15 = (remaining_rows // 2) + 1
    else:
        df_15 = remaining_rows // 2

    mlflow.log_metric("70% of the original dataframe", df_70)
    mlflow.log_metric("15% of the original dataframe", df_15)

    X_train = df.iloc[:df_70, :]  
    X_test = df.iloc[df_70:df_70 + df_15, :]  
    X_val = df.iloc[df_70 + df_15:, :]

    return X_train, X_test, X_val

def model_train(X_train, changepoint_prior_scale, changepoint_range, seasonality_prior_scale, holidays_prior_scale, weekly_seasonality, yearly_seasonality, seasonality_mode):

    model = Prophet(changepoint_prior_scale=changepoint_prior_scale,
                    changepoint_range=changepoint_range,
                    seasonality_prior_scale=seasonality_prior_scale,
                    holidays_prior_scale=holidays_prior_scale,
                    weekly_seasonality=weekly_seasonality,
                    yearly_seasonality=yearly_seasonality,
                    seasonality_mode=seasonality_mode)

    # Custom monthly seasonality on top of the built-in weekly and yearly components
    model.add_seasonality(period=30.4, name='monthly', fourier_order=2)

    # Prophet expects the columns 'ds' (date) and 'y' (target); fit() returns the fitted model itself
    results = model.fit(X_train.rename(columns={'complaints': 'y', 'Calendar day': 'ds'}))

    return model, results

def model_forecast(X_test, model):

    future = X_test.rename(columns={'complaints': 'y', 'Calendar day': 'ds'})

    forecast = model.predict(future)

    y_pred = forecast.yhat

    return forecast, y_pred

def log_metrics(y_test, y_pred):

    def calculate_rmse(predictions, targets):
        return np.sqrt(((predictions - targets) ** 2).mean())

    mse = mean_squared_error(y_true=y_test,
                                y_pred=y_pred)
    rmse = np.sqrt(mean_squared_error(y_true=y_test,
                                        y_pred=y_pred))
    mae = mean_absolute_error(y_true=y_test,
                                y_pred=y_pred)

    def mean_absolute_percentage_error(y_true, y_pred):

        y_true = np.array(y_true)
        y_pred = np.array(y_pred)

        # Check for input shape compatibility
        if y_true.shape != y_pred.shape:
            raise ValueError(f"Input shapes do not match ({y_true.shape}), ({y_pred.shape})")

        # Calculate absolute percentage error
        absolute_error = np.abs((y_true - y_pred) / y_true)

        # Replace infinite values with zero and ignore NaN values
        absolute_error = np.nan_to_num(absolute_error, nan=0, posinf=0, neginf=0)

        # Calculate the mean of absolute percentage error
        mape = np.mean(absolute_error) * 100

        return mape

    mape = mean_absolute_percentage_error(y_true=y_test,
                                            y_pred=y_pred)
    smape = np.mean(
        2 * np.abs(y_test.values - y_pred) / (np.abs(y_test.values) + np.abs(y_pred))
        ) * 100

    rmse_seasonal = calculate_rmse(y_pred, y_test.values)

    # RMSE normalized by the range of the actual values
    nrmse_seasonal = rmse_seasonal / (np.max(y_test.values) - np.min(y_test.values))

    # Logging all metrics
    mlflow.log_metric('MAE', mae)
    mlflow.log_metric('MSE', mse)
    mlflow.log_metric('RMSE', rmse)
    mlflow.log_metric('MAPE', mape)
    mlflow.log_metric('SMAPE', smape)
    mlflow.log_metric('NRMSE', nrmse_seasonal)

def log_plots(y_test, y_pred, model, forecast):

    fig, ax = plt.subplots(figsize=(10, 5))
    sns.set_style('whitegrid')
    ax.plot(y_test['Calendar day'], y_test['complaints'].squeeze(), label='Actual')
    ax.fill_between(y_test['Calendar day'], forecast['yhat_lower'], forecast['yhat_upper'], alpha=0.2)
    plt.plot(y_test['Calendar day'], forecast['yhat'], label='Predicted')
    plt.xticks(rotation=60)
    plt.grid(visible=True)
    plt.legend()
    mlflow.log_figure(fig, 'forecasting_result.png')

    fig_m = model.plot_components(forecast)
    plt.xticks(rotation=60)
    plt.legend()
    mlflow.log_figure(fig_m, 'plot_forecasted_trends_and_components.png')


def main():
    """Main function of the script"""

    # Input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--df_features", type=str)

    # Prophet hyperparameters. The defaults below only apply when the script is run on its own:
    # in the pipeline the component command always passes every flag, so the defaults declared
    # on the component inputs take precedence.
    parser.add_argument("--yearly_seasonality_input", type=int, help="(Integer) Repeating patterns over year time.", default=19)
    parser.add_argument("--seasonality_mode_input", type=str, help="(String) Mode of the seasonality components. Options: 'additive', 'multiplicative'", default='multiplicative')
    parser.add_argument("--changepoint_prior_scale_input", type=float, help="(Float) Determines the scale of the change at the time series trend change point.", default=2.92877901455971)
    parser.add_argument("--seasonality_prior_scale_input", type=float, help="(Float) Controls the magnitude of the seasonality fluctuation.", default=6.010013757753535)
    parser.add_argument("--holidays_prior_scale_input", type=float, help='(Float) Determines the scale of holiday effects, and is very similar to the seasonality_prior_scale.', default=4.983602164233429)
    parser.add_argument("--changepoint_range_inupt", type=float, help="(Float) Available between 0 and 1, indicating the percentage of historical data that allow a trend change.", default=0.809764143539701)
    parser.add_argument("--weekly_seasonality_input", type=int, help="(Integer) Repeating patterns over week time.", default=14)

    # Output dataframes
    parser.add_argument("--training_data", type=str, default="./", help="output path for training_data")
    parser.add_argument("--testing_data", type=str, default="./", help="output path for testing_data")
    parser.add_argument("--validation_data", type=str, help="output path for validation_data")

    # Model
    parser.add_argument("--model_prophet", type=str, help="Output folder for the pickled model.")
    args = parser.parse_args()

    # Reading data from previous step
    df = pd.read_csv(select_first_file(args.df_features))

    # Splitting data into 70% training, 15% testing and 15% validation
    X_train, X_test, X_val = data_split(df)
    print(X_train.head())
    print(X_test.head())
    print(X_val.head())

    # Identifying parameter variables from the parser
    yearly_seasonality = args.yearly_seasonality_input
    seasonality_mode = args.seasonality_mode_input
    changepoint_prior_scale = args.changepoint_prior_scale_input
    seasonality_prior_scale = args.seasonality_prior_scale_input
    holidays_prior_scale = args.holidays_prior_scale_input
    changepoint_range = args.changepoint_range_inupt
    weekly_seasonality = args.weekly_seasonality_input

    model, results = model_train(X_train, changepoint_prior_scale=changepoint_prior_scale, changepoint_range=changepoint_range,
                                seasonality_prior_scale=seasonality_prior_scale, holidays_prior_scale=holidays_prior_scale,
                                weekly_seasonality=weekly_seasonality, yearly_seasonality=yearly_seasonality,
                                seasonality_mode=seasonality_mode)

    # Registering the model in MLflow; the signature is inferred from the training history
    # and the model's predictions on the test set
    future_test = X_test.rename(columns={'complaints': 'y', 'Calendar day': 'ds'})
    signature = infer_signature(results.history, results.predict(future_test))
    mlflow.prophet.log_model(results, artifact_path="prophet_model_complaints", signature=signature)

    # Saving the model to the component output folder for the forecasting component
    model_filename = os.path.join(args.model_prophet, 'prophet_model.pkl')
    with open(model_filename, 'wb') as f:
        pickle.dump(results, f)
    
    # Testing out model on the X_test
    forecast, y_pred = model_forecast(X_test, model)

    # Logging metrics
    log_metrics(X_test['complaints'].squeeze(), y_pred)

    # Logging plots
    log_plots(X_test, y_pred, model, forecast)

    X_train.to_csv((Path(args.training_data) / "train_data.csv"))
    X_test.to_csv((Path(args.testing_data) / "test_data.csv"))
    X_val.to_csv((Path(args.validation_data) / "validation_data.csv"))

if __name__ == "__main__":
    main()

Overwriting ./components/demo_prophet_train/demo_prophet_model_complaints.py
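The metrics that `log_metrics` sends to MLflow can be sanity-checked offline on a toy series. This sketch recomputes them with NumPy only; `forecast_metrics` is a hypothetical helper, not part of the component, and the toy values are made up.

```python
import numpy as np


def forecast_metrics(y_true, y_pred):
    """Recompute the metrics logged by log_metrics for a quick offline check."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    err = y_pred - y_true

    mae = np.mean(np.abs(err))
    mse = np.mean(err ** 2)
    rmse = np.sqrt(mse)

    # MAPE as in the component: ratios with a zero target (inf or nan) are replaced by 0
    # but still counted in the mean, which pulls MAPE down when the series contains zeros
    with np.errstate(divide="ignore", invalid="ignore"):
        ape = np.abs(err / y_true)
    mape = np.mean(np.nan_to_num(ape, nan=0, posinf=0, neginf=0)) * 100

    smape = np.mean(2 * np.abs(err) / (np.abs(y_true) + np.abs(y_pred))) * 100
    nrmse = rmse / (y_true.max() - y_true.min())  # RMSE normalized by the range of the actuals

    return {"MAE": mae, "MSE": mse, "RMSE": rmse, "MAPE": mape, "SMAPE": smape, "NRMSE": nrmse}


print(forecast_metrics([10, 20, 30, 40], [12, 18, 33, 40]))
```

The zero-handling in MAPE is worth keeping in mind when reading the logged values: days with zero complaints count as perfect predictions, so SMAPE or NRMSE may be the more honest comparison on sparse series.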


In [26]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

demo_prophet_model_complaints_component = command(
    name="demo_prophet_model_complaints",
    display_name="Demo Prophet model for complaints",
    description="Component gets data with features from the feature creation component, splits it into train, test and validation sets, trains a Prophet model and logs the model, metrics and plots",
    inputs={
        "df_features": Input(type="uri_folder"),
        "yearly_seasonality_input": Input(type="integer", default=19),
        "seasonality_mode_input": Input(type="string", default="multiplicative"),
        "changepoint_prior_scale_input": Input(type="number", default=2.92877901455971),
        "seasonality_prior_scale_input": Input(
            type="number", default=6.010013757753535
        ),
        "holidays_prior_scale_input": Input(type="number", default=4.983602164233429),
        "changepoint_range_inupt": Input(type="number", default=0.809764143539701),
        "weekly_seasonality_input": Input(type="integer", default=14),
    },
    outputs=dict(
        training_data=Output(type="uri_folder", mode="rw_mount"),
        testing_data=Output(type="uri_folder", mode="rw_mount"),
        validation_data=Output(type="uri_folder", mode="rw_mount"),
        model_prophet=Output(type="uri_folder", mode="rw_mount"),
    ),
    code=prophet_src_dir,
    command="""python demo_prophet_model_complaints.py \
            --df_features ${{inputs.df_features}} --yearly_seasonality_input ${{inputs.yearly_seasonality_input}} \
            --seasonality_mode_input ${{inputs.seasonality_mode_input}} --changepoint_prior_scale_input ${{inputs.changepoint_prior_scale_input}} \
            --seasonality_prior_scale_input ${{inputs.seasonality_prior_scale_input}} --holidays_prior_scale_input ${{inputs.holidays_prior_scale_input}} \
            --changepoint_range_inupt ${{inputs.changepoint_range_inupt}} --weekly_seasonality_input ${{inputs.weekly_seasonality_input}} \
            --training_data ${{outputs.training_data}} --testing_data ${{outputs.testing_data}} --validation_data ${{outputs.validation_data}} \
            --model_prophet ${{outputs.model_prophet}} \
            """,
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
)
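Because the `command` string above passes every hyperparameter flag, the argparse defaults inside the script only matter when it is run on its own; in the pipeline the values always come from the component inputs, which is why the defaults are repeated there. A minimal sketch with three of the flags and made-up "pipeline" values shows the difference:

```python
import argparse


def build_parser():
    # A subset of the flags declared in demo_prophet_model_complaints.py, with the same defaults
    parser = argparse.ArgumentParser()
    parser.add_argument("--yearly_seasonality_input", type=int, default=19)
    parser.add_argument("--seasonality_mode_input", type=str, default="multiplicative")
    parser.add_argument("--weekly_seasonality_input", type=int, default=14)
    return parser


parser = build_parser()

# Standalone run without flags: the script's own defaults are used
standalone = parser.parse_args([])

# Pipeline-style run: Azure ML substitutes ${{inputs.*}} and passes every flag explicitly,
# so the component input values win (these particular values are made up)
pipeline_run = parser.parse_args([
    "--yearly_seasonality_input", "10",
    "--seasonality_mode_input", "additive",
    "--weekly_seasonality_input", "3",
])

print(standalone)
print(pipeline_run)
```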

In [27]:
# Now we register the component to the workspace
demo_prophet_model_complaints_component = ml_client.create_or_update(
    demo_prophet_model_complaints_component.component
)

# Confirm that the component is registered
print(
    f"Component {demo_prophet_model_complaints_component.name} with Version {demo_prophet_model_complaints_component.version} is registered"
)

Component demo_prophet_model_complaints with Version 2023-11-14-13-37-56-0108665 is registered


## Complaints forecasting component

This component validates the trained complaints model on the held-out validation set and forecasts complaints for a configurable horizon after the last validation date. It loads the training, testing and validation datasets and the pickled model, logs metrics and plots, and writes the forecast to Azure Blob Storage.

### Key Functions

1. **Data Loading**: Retrieves training, testing, and validation datasets from the previous steps for model validation.

2. **Model Validation**: Uses the trained forecasting model to make predictions for the validation dataset and assesses its performance.

3. **Metric Logging**: Records key metrics such as Mean Absolute Error (MAE), Mean Squared Error (MSE), Root Mean Squared Error (RMSE), MAPE and NRMSE on the validation set.

4. **Plot Logging**: Captures visualizations of actual vs. predicted complaints and trends/components identified by the forecasting model.

5. **Results Recording**: Writes the future predictions (date and forecast complaints) to Azure Blob Storage as `result.csv`.

### Usage

1. **Input Data**: Provide the paths to the training, testing, and validation datasets (`--training_data`, `--testing_data`, `--validation_data`) from the previous steps.

2. **Azure Blob Storage Configuration**: Provide the name of the Key Vault secret that holds the Azure Blob Storage connection string (`--conn_str`) and the container name (`--container_name`) where the results are written.

3. **Trained Model Input**: Provide the path to the folder containing the pickled Prophet model from the training component (`--model_prophet`).

4. **Forecasting Parameters**: Specify the forecasting parameters such as the number of periods into the future (`--periods_horizon`) and the frequency of observations (`--freq`).

5. **Model Validation and Results Recording**: Validates the forecasting model on the validation dataset, logs metrics and plots, and records results to Azure Blob Storage.

The pipeline environment must provide the libraries the script imports, including `prophet`, `mlflow`, `azure-storage-blob`, `azure-keyvault-secrets` and `azure-identity`.
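Generating the horizon dates is the one part of the forecasting step that depends only on pandas. A sketch, assuming the goal is `--periods_horizon` dates that start right after the last validation date at frequency `--freq`; `future_dates` is a hypothetical helper:

```python
import pandas as pd


def future_dates(last_date, periods, freq):
    """Return `periods` dates strictly after `last_date` at frequency `freq`."""
    last_date = pd.Timestamp(last_date)
    # date_range may include last_date itself (e.g. for freq="D"), so generate one
    # extra date and keep only the dates after it
    dates = pd.date_range(start=last_date, periods=periods + 1, freq=freq)
    return dates[dates > last_date][:periods]


print(future_dates("2023-10-31", periods=3, freq="D"))
print(future_dates("2023-10-31", periods=2, freq="W"))
```

Filtering on `> last_date` instead of simply dropping the first element keeps the helper correct for anchored frequencies such as `"W"` (weekly, Sundays), where `date_range` starts at the next anchor rather than at `last_date`.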

In [28]:
import os

forecast_src_dir = "./components/demo_prophet_forecast"
os.makedirs(forecast_src_dir, exist_ok=True)

In [29]:
%%writefile {forecast_src_dir}/demo_complaints_forecast.py
import os
import io
import pickle
import joblib
import argparse
import datetime

import pandas as pd
import numpy as np
from prophet import Prophet  # not called directly, but unpickling the model requires prophet
from sklearn.metrics import mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt
import seaborn as sns
import mlflow
from azure.storage.blob import BlobServiceClient
from azure.keyvault.secrets import SecretClient
from azure.identity import ManagedIdentityCredential

def select_first_file(path):
    """Selects first file in folder, use under assumption there is only one file in folder
    Args:
        path (str): path to directory or file to choose
    Returns:
        str: full path of selected file
    """
    files = os.listdir(path)
    return os.path.join(path, files[0])

def model_validate(X_train, X_test, X_val, model, periods, freq):

    # Predict on the validation period; Prophet expects the date column to be called 'ds'
    future_val = X_val.rename(columns={'Calendar day': 'ds'})
    forecast_val = model.predict(future_val)
    y_pred_val = forecast_val['yhat']

    # Future predictions: `periods` dates strictly after the last validation date.
    # date_range may include last_date itself, so generate one extra date and filter it out.
    last_date = forecast_val.iloc[-1]['ds']
    future_dates = pd.date_range(start=last_date, periods=periods + 1, freq=freq)
    future_dates = future_dates[future_dates > last_date][:periods]
    future_df = pd.DataFrame({'ds': future_dates})

    # Fill the remaining columns with zeros; the model adds no extra regressors, so only 'ds' is used
    for col in X_val.columns:
        if col != 'ds':
            future_df[col] = 0

    future_predictions = model.predict(future_df)

    # Keep only the date and the point forecast, renamed back to the original column names
    temp_df = future_predictions[['ds', 'yhat']].rename(columns={'ds': 'Calendar day', 'yhat': 'complaints'})

    return forecast_val, y_pred_val, future_predictions, temp_df

def log_metrics(y_test, y_pred):

    def calculate_rmse(predictions, targets):
        return np.sqrt(((predictions - targets) ** 2).mean())
    
    
    mse = mean_squared_error(y_true=y_test,
                                y_pred=y_pred)
    rmse = np.sqrt(mean_squared_error(y_true=y_test,
                                        y_pred=y_pred))
    mae = mean_absolute_error(y_true=y_test,
                                y_pred=y_pred)

    def mean_absolute_percentage_error(y_true, y_pred):


        y_true = np.array(y_true)
        y_pred = np.array(y_pred)
    
        # Check for input shape compatibility
        if y_true.shape != y_pred.shape:
            raise ValueError(f"Input shapes do not match ({y_true.shape}), ({y_pred.shape})")
    
        # Calculate absolute percentage error
        absolute_error = np.abs((y_true - y_pred) / y_true)
    
        # Replace infinite values with zero and ignore NaN values
        absolute_error = np.nan_to_num(absolute_error, nan=0, posinf=0, neginf=0)
    
        # Calculate the mean of absolute percentage error
        mape = np.mean(absolute_error) * 100
    
        return mape
    
    mape = mean_absolute_percentage_error(y_true=y_test, 
                                            y_pred=y_pred)
    smape = np.mean(
        2 * np.abs(y_test.values - y_pred) / (np.abs(y_test.values) + np.abs(y_pred))
        ) * 100

    rmse_seasonal = calculate_rmse(y_pred, y_test.values)

    nrmse_seasonal = rmse_seasonal / (np.max(y_test.values) - np.min(y_test.values))
    
    # Logging all metrics
    mlflow.log_metric('Validation MAE', mae)
    mlflow.log_metric('Validation MSE', mse)
    mlflow.log_metric('Validation RMSE', rmse)
    mlflow.log_metric('Validation MAPE', mape)
    mlflow.log_metric('Validation SMAPE', smape)
    mlflow.log_metric('Validation NRMSE', nrmse_seasonal)

def log_plots(y_test, y_pred, model, forecast):

    fig, ax = plt.subplots(figsize=(10, 5))
    sns.set_style('whitegrid')
    ax.plot(y_test['Calendar day'], y_test['complaints'].squeeze(), label='Actual')
    ax.fill_between(y_test['Calendar day'], forecast['yhat_lower'], forecast['yhat_upper'], alpha=0.2)
    plt.plot(y_test['Calendar day'], forecast['yhat'], label='Predicted')
    plt.xticks(rotation=60)
    plt.grid(visible=True)
    plt.legend()
    mlflow.log_figure(fig, 'val_forecasting_result.png')

    fig_m = model.plot_components(forecast)
    plt.xticks(rotation=60)
    plt.legend()
    mlflow.log_figure(fig_m, 'val_plot_forecasted_trends_and_components.png')

def record_to_blob(df, conn_str: str, container_name: str, secret_client):

    conString = secret_client.get_secret(name=conn_str).value
    blob_service_client = BlobServiceClient.from_connection_string(conString)
    blob_client = blob_service_client.get_blob_client(container=container_name, blob='result.csv')
    writer = io.BytesIO()
    df.to_csv(writer)
    blob_client.upload_blob(writer.getvalue(), overwrite = True)

def main():
    """Main function of the script"""

    # Input arguments
    parser = argparse.ArgumentParser()

    # Data input
    parser.add_argument("--training_data", type=str)
    parser.add_argument("--testing_data", type=str)
    parser.add_argument("--validation_data", type=str)

    # Connection input
    parser.add_argument("--conn_str", type=str)
    parser.add_argument("--container_name", type=str)

    # Model input
    parser.add_argument("--model_prophet", type=str)

    # Forecasting variables
    parser.add_argument("--periods_horizon", type=int)
    parser.add_argument("--freq", type=str)
    args = parser.parse_args()

    # Reading data from previous component
    X_train = pd.read_csv(select_first_file(args.training_data))
    X_test = pd.read_csv(select_first_file(args.testing_data))
    X_val = pd.read_csv(select_first_file(args.validation_data))

    periods = args.periods_horizon
    freq = args.freq

    # Loading the pickled model saved by the training component
    files = [f for f in os.listdir(args.model_prophet) if f.endswith(".pkl")]
    with open(os.path.join(args.model_prophet, files[0]), "rb") as f:
        model = pickle.load(f)
    print(model)

    # Making future predictions
    forecast_val, y_pred_val, future_predictions, temp_df = model_validate(X_train, X_test, X_val, model, periods=periods, freq=freq)

    # Logging metrics
    log_metrics(X_val['complaints'].squeeze(), y_pred_val)

    # Logging plots
    log_plots(X_val, y_pred_val, model, forecast_val)

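    # Client id is the Application ID of the managed identity from Azure Active Directory,
    # read from client_id.txt in the component's working directory (strip the trailing newline)
    with open('client_id.txt') as f:
        client_id = f.readline().strip()
    credential = ManagedIdentityCredential(client_id=client_id)

    # Secret client with access to the Key Vault that stores the storage connection string
    secret_client = SecretClient(vault_url='<KEY_VAULT_URL>', credential=credential)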
    
    record_to_blob(temp_df, args.conn_str, args.container_name, secret_client)


if __name__ == "__main__":
    main()

Overwriting ./components/demo_prophet_forecast/demo_complaints_forecast.py
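The two components exchange the model through a `uri_folder` output: the training step pickles it into `model_prophet`, and this step loads the first `.pkl` file from the same folder. The hand-off can be tried locally with a stand-in object (a plain dict instead of a fitted Prophet model) and a temporary directory in place of the mounted folder; `save_model` and `load_first_pickle` are hypothetical helper names.

```python
import os
import pickle
import tempfile


def save_model(model, out_dir, filename="prophet_model.pkl"):
    # Training side: pickle the fitted model into the component's output folder
    path = os.path.join(out_dir, filename)
    with open(path, "wb") as f:
        pickle.dump(model, f)
    return path


def load_first_pickle(model_dir):
    # Forecasting side: pick the first .pkl file in the input folder, failing loudly if none exists
    files = sorted(f for f in os.listdir(model_dir) if f.endswith(".pkl"))
    if not files:
        raise FileNotFoundError(f"No .pkl file found in {model_dir}")
    with open(os.path.join(model_dir, files[0]), "rb") as f:
        return pickle.load(f)


with tempfile.TemporaryDirectory() as model_dir:
    # A plain dict stands in for the fitted Prophet model
    save_model({"name": "stand-in model", "seasonality_mode": "multiplicative"}, model_dir)
    restored = load_first_pickle(model_dir)

print(restored)
```

A pickle only loads if the classes it references can be imported, so the loading component needs the same `prophet` package as the training one; both components here use the same `pipeline_job_env` environment.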


In [30]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

demo_prophet_forecast_complaints_component = command(
    name="demo_prophet_forecast_complaints",
    display_name="Demo Prophet forecast for complaints",
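    description="Component gets the train, test and validation data and the trained Prophet model, validates the model, forecasts future complaints and writes the forecast to Azure Blob Storage",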
    inputs={
        "training_data": Input(type="uri_folder"),
        "testing_data": Input(type="uri_folder"),
        "validation_data": Input(type="uri_folder"),
        "conn_str": Input(type="string"),
        "container_name": Input(type="string"),
        "model_prophet": Input(type="uri_folder"),
        "periods_horizon": Input(type="integer"),
        "freq": Input(type="string"),
    },
    code=forecast_src_dir,
    command="""python demo_complaints_forecast.py \
            --training_data ${{inputs.training_data}} --testing_data ${{inputs.testing_data}} \
            --validation_data ${{inputs.validation_data}} --model_prophet ${{inputs.model_prophet}} \
            --conn_str ${{inputs.conn_str}} --container_name ${{inputs.container_name}} \
            --periods_horizon ${{inputs.periods_horizon}} --freq ${{inputs.freq}} \
            """,
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
)

In [31]:
# Now we register the component to the workspace
demo_prophet_forecast_complaints_component = ml_client.create_or_update(
    demo_prophet_forecast_complaints_component.component
)

# Confirm that the component is registered
print(
    f"Component {demo_prophet_forecast_complaints_component.name} with Version {demo_prophet_forecast_complaints_component.version} is registered"
)

Component demo_prophet_forecast_complaints with Version 2023-11-14-13-37-58-3703848 is registered


## Creating the pipeline from components

In [32]:
# The dsl decorator tells the SDK that we are defining an Azure ML pipeline
from azure.ai.ml import dsl, Input, Output


@dsl.pipeline(
    compute=cpu_compute_target,
    description="Complaints forecasting demo",
)
def demo_complaints_forecasting(
    pipeline_job_sas_token,
    pipeline_job_yearly_seasonality,
    pipeline_job_seasonality_mode,
    pipeline_job_changepoint_prior_scale,
    pipeline_job_min_periods,
    pipeline_job_freq,
    pipeline_job_num_lags,
    pipeline_job_column,
    pipeline_job_seasonality_prior_scale,
    pipeline_job_holidays_prior_scale,
    pipeline_job_changepoint_range,
    pipeline_job_weekly_seasonality,
    pipeline_job_conn_str,
    pipeline_job_container_name,
    pipeline_job_periods_horizon,
    pipeline_job_p,
    pipeline_job_d,
    pipeline_job_q,
    pipeline_job_p_seasonal,
    pipeline_job_d_seasonal,
    pipeline_job_q_seasonal,
    pipeline_job_s,
):
    # Each registered component is called like a Python function; the outputs of one step are wired in as inputs of the next
    demo_model_training_billing_job = demo_sales_model_training_component(
        sas_token=pipeline_job_sas_token,
        p_input=pipeline_job_p,
        d_input=pipeline_job_d,
        q_input=pipeline_job_q,
        p_seasonal_input=pipeline_job_p_seasonal,
        d_seasonal_input=pipeline_job_d_seasonal,
        q_seasonal_input=pipeline_job_q_seasonal,
        s_input=pipeline_job_s,
    )

    demo_sales_forecast_job = demo_sales_forecast_component(
        model_forecast=demo_model_training_billing_job.outputs.model,
        df=demo_model_training_billing_job.outputs.df_original,
        X_val=demo_model_training_billing_job.outputs.validation_data_sales,
    )

    demo_feature_creation_job = demo_feature_creation_component(
        df=demo_sales_forecast_job.outputs.df_extended,
        num_lags=pipeline_job_num_lags,
        column=pipeline_job_column,
        min_periods=pipeline_job_min_periods,
    )

    demo_prophet_model_complaints_job = demo_prophet_model_complaints_component(
        df_features=demo_feature_creation_job.outputs.df_features,
        yearly_seasonality_input=pipeline_job_yearly_seasonality,
        seasonality_mode_input=pipeline_job_seasonality_mode,
        changepoint_prior_scale_input=pipeline_job_changepoint_prior_scale,
        seasonality_prior_scale_input=pipeline_job_seasonality_prior_scale,
        holidays_prior_scale_input=pipeline_job_holidays_prior_scale,
        changepoint_range_inupt=pipeline_job_changepoint_range,
        weekly_seasonality_input=pipeline_job_weekly_seasonality,
    )
    demo_prophet_forecast_complaints_job = demo_prophet_forecast_complaints_component(
        training_data=demo_prophet_model_complaints_job.outputs.training_data,
        testing_data=demo_prophet_model_complaints_job.outputs.testing_data,
        validation_data=demo_prophet_model_complaints_job.outputs.validation_data,
        conn_str=pipeline_job_conn_str,
        container_name=pipeline_job_container_name,
        model_prophet=demo_prophet_model_complaints_job.outputs.model_prophet,
        periods_horizon=pipeline_job_periods_horizon,
        freq=pipeline_job_freq,
    )

    return {}

In [33]:
registered_model_name = "demo_complaints_forecasting"

# Let's instantiate the pipeline with the parameters of our choice
pipeline = demo_complaints_forecasting(
    pipeline_job_sas_token="sas-demo",
    pipeline_job_p=1,
    pipeline_job_d=0,
    pipeline_job_q=1,
    pipeline_job_p_seasonal=3,
    pipeline_job_d_seasonal=0,
    pipeline_job_q_seasonal=3,
    pipeline_job_s=12,
    pipeline_job_yearly_seasonality=19,
    pipeline_job_seasonality_mode="multiplicative",
    pipeline_job_changepoint_prior_scale=2.92877901455971,
    pipeline_job_seasonality_prior_scale=6.010013757753535,
    pipeline_job_holidays_prior_scale=4.983602164233429,
    pipeline_job_changepoint_range=0.809764143539701,
    pipeline_job_weekly_seasonality=14,
    pipeline_job_min_periods=1,
    pipeline_job_freq="D",
    pipeline_job_num_lags=3,
    pipeline_job_periods_horizon=60,
    pipeline_job_column="sales",
    pipeline_job_conn_str="connection-string-demo",
    pipeline_job_container_name="dss-demo",
)

## Submit the job

In [None]:
import webbrowser

# submit the pipeline job
pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    # Project's name
    experiment_name="demo_complaints_forecasting_pipeline",
)
# open the pipeline in web browser
webbrowser.open(pipeline_job.studio_url)