### Setup

In [1]:
# Import required libraries
import os
import datetime
import json
import yaml
import azure.ai.ml
import pandas as pd
from time import sleep
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient, Input, Output
from azure.ai.ml import load_component
from azure.ai.ml.constants import AssetTypes, InputOutputModes
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import (
    BatchEndpoint,
    BatchDeployment,
    AmlCompute,
    PipelineComponentBatchDeployment,
    Environment,
)

print(f"SDK version: {azure.ai.ml.__version__}")

# Workspace coordinates. Each value can be overridden with an environment
# variable so the notebook can target a different workspace without being
# edited; the defaults are the values the notebook was written against.
subscription_id = os.environ.get(
    "AZURE_SUBSCRIPTION_ID", "25f559b2-3a60-46ab-bf85-6e8a8359d5e4"
)
resource_group = os.environ.get("AZURE_RESOURCE_GROUP", "Peak-rg-MIWG")
workspace = os.environ.get("AZURE_ML_WORKSPACE", "forecasting_demo")

# Connect to the Azure ML workspace. Keyword arguments make it explicit which
# identifier goes where.
ml_client = MLClient(
    credential=DefaultAzureCredential(),
    subscription_id=subscription_id,
    resource_group_name=resource_group,
    workspace_name=workspace,
)

print(ml_client)

SDK version: 1.8.0
MLClient(credential=<azure.identity._credentials.default.DefaultAzureCredential object at 0x7fc2e6e56aa0>,
         subscription_id=25f559b2-3a60-46ab-bf85-6e8a8359d5e4,
         resource_group_name=Peak-rg-MIWG,
         workspace_name=forecasting_demo)


### Create Compute

In [2]:
# Name of the Azure ML compute cluster the pipeline will run on.
cpu_compute_target = "forecasting-compute-cluster"

try:
    # Reuse the cluster if it already exists in the workspace.
    cpu_cluster = ml_client.compute.get(cpu_compute_target)
    print(
        f"You already have a cluster named {cpu_compute_target}, we'll reuse it as is."
    )

except Exception:
    # NOTE(review): this catches every failure of `compute.get`, not only
    # "cluster not found" — consider narrowing it once the exact exception
    # type is confirmed.
    print("Creating a new cpu compute target...")

    # Describe the Azure ML compute cluster with the intended parameters
    cpu_cluster = AmlCompute(
        name=cpu_compute_target,
        # Azure ML Compute is the on-demand VM service
        type="amlcompute",
        # VM Family
        size="STANDARD_DS11_V2",
        # Minimum running nodes when there is no job running
        min_instances=0,
        # Maximum number of nodes in the cluster
        max_instances=1,
        # How many seconds a node keeps running after its job has terminated
        idle_time_before_scale_down=180,
        # Dedicated or LowPriority. The latter is cheaper but there is a chance of job termination
        tier="Dedicated",
    )

    # begin_create_or_update starts a long-running operation and returns a
    # poller. Waiting on .result() makes `cpu_cluster` the provisioned compute
    # entity (as in the reuse branch) instead of the poller object.
    cpu_cluster = ml_client.compute.begin_create_or_update(cpu_cluster).result()
    print(
        f"AMLCompute with name {cpu_cluster.name} is created, the compute size is {cpu_cluster.size}"
    )

You already have a cluster named forecasting-compute-cluster, we'll reuse it as is.


### Getting the data

In [3]:
# --------------------------------------------------------------
# Path formats accepted for pipeline inputs and outputs:
#   local:      ./<path>
#   Blob:       wasbs://<container_name>@<account_name>.blob.core.windows.net/<path>
#   ADLS:       abfss://<file_system>@<account_name>.dfs.core.windows.net/<path>
#   Datastore:  azureml://datastores/<data_store_name>/paths/<path>
#   Data Asset: azureml:<my_data>:<version>
#
# Asset types: AssetTypes.URI_FILE (a specific file), AssetTypes.URI_FOLDER
# (a folder), AssetTypes.MLTABLE (a table).
# Commonly used input modes:  InputOutputModes.RO_MOUNT, InputOutputModes.DOWNLOAD
# Commonly used output modes: InputOutputModes.RW_MOUNT, InputOutputModes.UPLOAD
# --------------------------------------------------------------

# --- Pipeline input -------------------------------------------
# A data asset reference (name:version) that is treated as a single file and
# mounted read-only.
input_path = "azureml:MasterSourceData:1"
data_type = AssetTypes.URI_FILE
input_mode = InputOutputModes.RO_MOUNT

# --- Pipeline output ------------------------------------------
# A folder on a named datastore, mounted read-write.
output_path = "azureml://datastores/forecastingdemosourcedatalake/paths/outputfiles/"
output_type = AssetTypes.URI_FOLDER
output_mode = InputOutputModes.RW_MOUNT

### Create a Job Environment for Pipeline Steps

In [4]:
# Folder that will hold the conda specification of the job environment;
# created up front so the %%writefile cell below has somewhere to write.
dependencies_dir = "./src/dependencies"
os.makedirs(name=dependencies_dir, exist_ok=True)

In [5]:
%%writefile {dependencies_dir}/conda.yaml
# Conda specification for the environment referenced by the training and
# scoring components defined in this notebook.
name: forecast
channels:
- conda-forge
- anaconda
dependencies:
- python=3.8
- pip=22.1.2
- numpy~=1.22.3
- pandas~=1.3.5
- scikit-learn=1.2.2
- py-xgboost=1.3.3
- holidays=0.10.3
# NOTE(review): the compute cluster created in this notebook uses the
# STANDARD_DS11_V2 size — confirm whether a CUDA toolkit is actually needed.
- cudatoolkit=11.1
- pyopenssl=23.2.0
- psutil>=5.2.2,<6.0.0
- GitPython=3.1.32
- tqdm
- setuptools=65.5.1
- wheel=0.38.1
- openssl=1.1.1s
- gunicorn
- flask
# Packages installed with pip inside the conda environment.
- pip:
  - inference-schema
  - azure-ai-ml
  - fsspec
  - azureml-mlflow==1.53.0
  - azureml-defaults==1.53.0
  - azureml-telemetry==1.53.0
  - azureml-interpret==1.53.0
  - cryptography>=41.0.2
  - scipy==1.10.1
  # NOTE(review): spacy is not imported by train.py or score.py in this
  # notebook — confirm it is still required.
  - spacy==2.1.8
  # Imported by both train.py and score.py.
  - prophet==1.1.4
  - py-cpuinfo==5.0.0 

Overwriting ./src/dependencies/conda.yaml


In [6]:
# Name under which the environment is registered; the component YAML files
# reference it as azureml:<name>:<version>.
custom_env_name = "forecastingDemo_pipline_env"

# Describe the environment (base image + conda specification) and register it
# in the workspace in a single step; the registered entity is kept.
pipeline_job_env = ml_client.environments.create_or_update(
    Environment(
        name=custom_env_name,
        version="1",
        description="Custom environment for forecasting demo pipeline",
        image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
        conda_file=os.path.join(dependencies_dir, "conda.yaml"),
        tags={},
    )
)

print(
    f"Environment with name {pipeline_job_env.name} is registered to workspace, "
    f"the environment version is {pipeline_job_env.version}"
)

Environment with name forecastingDemo_pipline_env is registered to workspace, the environment version is 1


### Create Component 1: Training

In [7]:
# Source folder of the training component (script + YAML definition).
train_src_dir = "./src/components/train"
os.makedirs(name=train_src_dir, exist_ok=True)

In [8]:
%%writefile {train_src_dir}/train.py

# imports
import pandas as pd
import numpy as np
import argparse
from prophet import Prophet
import pickle
import os

# Local directory that receives a copy of every trained model
output_dir = './outputs'


def parse_args():
    """Parse the command-line arguments passed in by the component definition.

    --registered_model_name is accepted for interface compatibility but is not
    used by this script.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--sensor_data", type=str, help="path to sensor data")
    parser.add_argument("--registered_model_name", type=str, help="model name")
    parser.add_argument("--model", type=str, help="path to model file")
    return parser.parse_args()


def clean_sensor_values(df_sensor):
    """Return a copy of ``df_sensor`` with suspect readings replaced by the mean.

    A reading is suspect when it is zero, negative or missing. The replacement
    value is the mean of the remaining (strictly positive) readings. The input
    frame is left untouched: working on an explicit copy avoids pandas'
    SettingWithCopyWarning and accidental writes into the full dataset.
    """
    cleaned = df_sensor.copy()

    # NaN > 0 evaluates to False, so `~valid` covers zero, negative and missing
    valid = cleaned["Value"] > 0
    mean_value = cleaned.loc[valid, "Value"].mean()
    cleaned.loc[~valid, "Value"] = mean_value
    return cleaned


def fit_forecast_model(df_sensor):
    """Fit and return a Prophet model on one sensor's cleaned readings.

    Uses the "Timestamp" column as Prophet's ``ds`` and "Value" as ``y``.
    """
    df = pd.DataFrame(
        {'ds': pd.to_datetime(df_sensor["Timestamp"]), 'y': df_sensor["Value"]}
    )

    # Create the Prophet model with custom seasonality
    model = Prophet(
        daily_seasonality=True,
        interval_width=0.8,
        changepoint_prior_scale=0.15,
        seasonality_prior_scale=0.1,
        uncertainty_samples=1000
    )

    # modify seasonality and fourier_order (daily)
    model.add_seasonality(name='daily', period=1, fourier_order=50)

    # modify seasonality and fourier_order (weekly)
    model.add_seasonality(name='weekly', period=7, fourier_order=1)

    # NOTE(review): a fortnight is 14 days but the period here is 15 —
    # kept as in the original; confirm which value is intended.
    model.add_seasonality(name='fortnight', period=15, fourier_order=10)

    return model.fit(df)


def save_model(forecast_model, sensor, target_dirs):
    """Pickle ``forecast_model`` as ``<sensor>_forecast_model.pkl`` in each directory.

    The file name is built with an f-string (the same way score.py builds it),
    so sensor names that are not strings are handled too. Each target directory
    is created if it does not exist yet.
    """
    file_name = f"{sensor}_forecast_model.pkl"
    for target_dir in target_dirs:
        os.makedirs(target_dir, exist_ok=True)
        # The with-block closes the file; no explicit close() is needed
        with open(os.path.join(target_dir, file_name), 'wb') as file:
            pickle.dump(forecast_model, file)


def main():
    """Train one forecasting model per sensor and save it to both output locations."""
    args = parse_args()
    os.makedirs(output_dir, exist_ok=True)

    sensor_data = pd.read_csv(args.sensor_data)
    sensors = sensor_data["Sensor Name"].unique()

    trained = 0
    for sensor in sensors:
        # Best effort: a failure for one sensor must not stop the others
        try:
            df_sensor = clean_sensor_values(
                sensor_data[sensor_data["Sensor Name"] == sensor]
            )
            forecast_model = fit_forecast_model(df_sensor)
            save_model(forecast_model, sensor, [output_dir, args.model])
            trained += 1
        except Exception as e:
            # f-string so that reporting cannot itself fail on non-string names
            print(f"{sensor} has error:", str(e))

    print(f"Trained {trained} of {len(sensors)} sensor models")


if __name__ == "__main__":
    main()

Overwriting ./src/components/train/train.py


In [9]:
%%writefile {train_src_dir}/train.yml
# <component>
# Command component that runs train.py.
name: forecastdemo_model
display_name: ForecastDemo Model
type: command
inputs:
  # train.py reads this path with pandas.read_csv, and the pipeline binds a
  # single-file input to it, so it is declared as a file rather than a folder.
  sensor_data:
    type: uri_file
  registered_model_name:
    type: string
outputs:
  # Folder that receives one <sensor>_forecast_model.pkl file per sensor.
  model:
    type: uri_folder
code: ./
environment: azureml:forecastingDemo_pipline_env:1
command: >-
  python train.py
  --sensor_data ${{inputs.sensor_data}}
  --registered_model_name ${{inputs.registered_model_name}}
  --model ${{outputs.model}}

Overwriting ./src/components/train/train.yml


In [10]:
# Build the training component from its YAML definition and register it in
# the workspace; the registered component is what the pipeline calls later.
train_component = ml_client.create_or_update(
    load_component(source=os.path.join(train_src_dir, "train.yml"))
)

# Report the name and version assigned at registration
print(
    f"Component {train_component.name} "
    f"with Version {train_component.version} is registered"
)

Component forecastdemo_model with Version 2023-11-27-13-56-47-8929439 is registered


### Create Component 2: Score

In [11]:
# Source folder of the scoring component (script + YAML definition).
score_src_dir = "./src/components/score"
os.makedirs(name=score_src_dir, exist_ok=True)

In [12]:
%%writefile {score_src_dir}/score.py
import argparse
import pandas as pd
import numpy as np
from prophet import Prophet
import pickle
import matplotlib.pyplot as plt
import os
from pathlib import Path
import mlflow

# Number of future daily periods forecast for every sensor
FORECAST_DAYS = 14


def parse_args():
    """Parse the command-line arguments passed in by the component definition."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", type=str, help="Path to the input model directory")
    parser.add_argument("--data", type=str, help="Path to the data to score")
    parser.add_argument("--scoredata", type=str, help="Path to save the score data")
    return parser.parse_args()


def extract_sensor_locations(sensor_data):
    """Return the distinct (name, centroid latitude, centroid longitude) rows."""
    location_columns = [
        "Sensor Name",
        "Sensor Centroid Latitude",
        "Sensor Centroid Longitude",
    ]
    return sensor_data[location_columns].drop_duplicates()


def load_model(model_dir, sensor):
    """Load the pickled model saved for ``sensor`` from ``model_dir``."""
    model_file_path = os.path.join(model_dir, f"{sensor}_forecast_model.pkl")

    # NOTE: unpickling executes code stored in the file — only point --model
    # at artifacts produced by a trusted training step.
    with open(model_file_path, 'rb') as model_file:
        return pickle.load(model_file)


def forecast_sensor(model, sensor, periods=FORECAST_DAYS):
    """Return the last ``periods`` daily predictions of ``model`` for ``sensor``.

    The result keeps every column produced by ``model.predict`` and adds
    'sensor reading' (a copy of 'yhat') and 'Sensor Name'.
    """
    future = model.make_future_dataframe(periods=periods, freq='D')
    forecast = model.predict(future)

    # .copy() makes the tail an independent frame, so the column assignments
    # below do not write into a slice of `forecast` (SettingWithCopyWarning).
    fortnight_forecast = forecast.tail(periods).copy()
    fortnight_forecast['sensor reading'] = fortnight_forecast['yhat']
    fortnight_forecast["Sensor Name"] = sensor
    return fortnight_forecast


def main():
    """Score every sensor found in the input data and write one CSV per sensor."""
    args = parse_args()

    lines = [
        f"model path: {args.model}",
        f"input data path: {args.data}",
        f"Output data path: {args.scoredata}",
    ]
    for line in lines:
        print(f"\t{line}")

    sensor_data = pd.read_csv(args.data)
    sensors = sensor_data["Sensor Name"].unique()

    # Make sure the output folder exists before anything is written to it
    os.makedirs(args.scoredata, exist_ok=True)

    # Save the sensor locations next to the predictions
    sensor_locs_file_path = os.path.join(args.scoredata, 'sensor_locs.csv')
    extract_sensor_locations(sensor_data).to_csv(sensor_locs_file_path, index=False)

    for sensor in sensors:
        # Best effort: a failure for one sensor must not stop the others
        try:
            model = load_model(args.model, sensor)
            fortnight_forecast = forecast_sensor(model, sensor)

            # Save the predictions to the specified path
            output_file_path = os.path.join(args.scoredata, f'{sensor}_predictions.csv')
            fortnight_forecast.to_csv(output_file_path, index=False)
        except Exception as e:
            print(f"{sensor} has error:", str(e))


if __name__ == "__main__":
    # start logging; the context manager ends the run even if scoring raises
    with mlflow.start_run():
        main()

Overwriting ./src/components/score/score.py


In [13]:
%%writefile {score_src_dir}/score.yml
# Command component that runs score.py.
name: forecastdemo_model_score
display_name: ForecastDemo Model Scoring
type: command
inputs:
  # score.py joins per-sensor pickle file names onto this path, and the
  # pipeline wires it to the training component's folder output, so it is
  # declared as a folder rather than a single file.
  model:
    type: uri_folder
  # Single CSV file read with pandas.read_csv.
  data:
    type: uri_file
outputs:
  # Folder that receives sensor_locs.csv and one predictions CSV per sensor.
  scoredata:
    type: uri_folder
code: ./
environment: azureml:forecastingDemo_pipline_env:1
command: >-
  python score.py
  --model ${{inputs.model}}
  --data ${{inputs.data}}
  --scoredata ${{outputs.scoredata}}

Overwriting ./src/components/score/score.yml


In [14]:
# `load_component` is already imported in the setup cell at the top of the
# notebook, so it is not imported again here.

# Load the scoring component from its YAML definition
score_component = load_component(source=os.path.join(score_src_dir, "score.yml"))

# Register the component in the workspace
score_component = ml_client.create_or_update(score_component)

# Report the name and version assigned at registration
print(
    f"Component {score_component.name} with Version {score_component.version} is registered"
)

Component forecastdemo_model_score with Version 2023-11-27-13-56-49-3496529 is registered


### Create Pipeline from Components

In [15]:
# Two-step pipeline: the training component writes its models to a folder
# output, and the scoring component reads that folder plus the same input data.
# NOTE(review): documented with `#` comments only — the `@pipeline` decorator
# may read the function's docstring or annotations when building the pipeline;
# confirm against the SDK documentation before adding either.
@pipeline
def forecastingdemo_pipeline(
    pipeline_job_data_input: Input,
    pipeline_job_registered_model_name: str,
):

    # Training step: the component is called like a Python function with its own inputs.
    # NOTE(review): the printed job definition keys this step as `train_job`,
    # which matches this local variable name — check before renaming it.
    train_job = train_component(
        sensor_data=pipeline_job_data_input,
        registered_model_name=pipeline_job_registered_model_name,
    )

    # Scoring step: receives the original data and the training step's model output
    score_job = score_component(
        data=pipeline_job_data_input,
        model=train_job.outputs.model,
    )

    # Expose both step outputs as pipeline-level outputs under these names
    return {
        "pipeline_job_train_data": train_job.outputs.model,
        "pipeline_job_score_predictions": score_job.outputs.scoredata,
    }


In [16]:
registered_model_name = "forecastingdemo_model"

# Instantiate the pipeline with the input data and the model name
pipeline_job = forecastingdemo_pipeline(
    pipeline_job_data_input=Input(type=data_type, path=input_path, mode=input_mode),
    pipeline_job_registered_model_name=registered_model_name,
)

# Send the scoring output to the configured location instead of the default one
pipeline_job.outputs.pipeline_job_score_predictions = Output(
    type=output_type, mode=output_mode, path=output_path
)

# Pipeline-level settings. The two flags are real booleans rather than the
# strings 'true' / 'false': a non-empty string such as 'false' is truthy in
# Python, so relying on string values is fragile.
pipeline_job.settings.default_compute = cpu_compute_target
pipeline_job.settings.force_rerun = True
pipeline_job.settings.continue_on_step_failure = False
pipeline_job.display_name = 'DemoForecasting_EndtoEnd'
pipeline_job.description = 'Forecasting Demo End to End'

In [17]:
# Show the assembled pipeline job definition before it is submitted
print(pipeline_job)

Class AutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class AutoDeleteConditionSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseAutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class IntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class ProtectionLevelSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseIntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.


display_name: DemoForecasting_EndtoEnd
description: Forecasting Demo End to End
type: pipeline
inputs:
  pipeline_job_data_input:
    mode: ro_mount
    type: uri_file
    path: azureml:MasterSourceData:1
  pipeline_job_registered_model_name: forecastingdemo_model
outputs:
  pipeline_job_train_data:
    type: uri_folder
  pipeline_job_score_predictions:
    mode: rw_mount
    type: uri_folder
    path: azureml://datastores/forecastingdemosourcedatalake/paths/outputfiles/
jobs:
  train_job:
    type: command
    inputs:
      sensor_data:
        path: ${{parent.inputs.pipeline_job_data_input}}
      registered_model_name:
        path: ${{parent.inputs.pipeline_job_registered_model_name}}
    outputs:
      model: ${{parent.outputs.pipeline_job_train_data}}
    resources:
      instance_count: 1
    component:
      name: forecastdemo_model
      version: 2023-11-27-13-56-47-8929439
      display_name: ForecastDemo Model
      type: command
      inputs:
        sensor_data:
          

### Submit Job

In [18]:
# Submit the pipeline job under the project's experiment name; the returned
# job replaces the local definition so its assigned name can be used below.
pipeline_job = ml_client.jobs.create_or_update(
    pipeline_job, experiment_name="forecastdemo_experiment"
)

# Stream the submitted job's logs into the notebook output
ml_client.jobs.stream(pipeline_job.name)

RunId: willing_yacht_2hc0wwd7dn
Web View: https://ml.azure.com/runs/willing_yacht_2hc0wwd7dn?wsid=/subscriptions/25f559b2-3a60-46ab-bf85-6e8a8359d5e4/resourcegroups/Peak-rg-MIWG/workspaces/forecasting_demo

Streaming logs/azureml/executionlogs.txt

[2023-11-27 13:56:55Z] Submitting 1 runs, first five are: 3c3de4db:6144b836-bcba-4ba7-aa50-6a0ab72e04b0
