In [1]:
import datetime
import os
from urllib.parse import urlencode

import pandas as pd

from azure.ai.ml import command, Input, Output
from azure.ai.ml import MLClient
from azure.ai.ml.entities import Environment
from azure.core.exceptions import ResourceNotFoundError
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential

import constants

In [9]:
def _resolve_credential():
    """Return a credential for Azure Resource Manager, preferring the non-interactive chain.

    DefaultAzureCredential is probed once for an ARM token; if constructing or
    probing it raises, an InteractiveBrowserCredential is returned instead.
    """
    try:
        candidate = DefaultAzureCredential()
        # Probe once so a broken credential chain surfaces here rather than on first use.
        candidate.get_token("https://management.azure.com/.default")
    except Exception:
        # Fall back to an interactive browser login when the default chain fails.
        return InteractiveBrowserCredential()
    return candidate


credential = _resolve_credential()

# Handle to the workspace identified by the settings in `constants`.
ml_client = MLClient(
    credential=credential,
    subscription_id=constants.SUBSCRIPTION_ID,
    resource_group_name=constants.RESOURCE_GROUP_NAME,
    workspace_name=constants.WORKSPACE_NAME,
)

In [3]:
# all versions will have the date encoded in the version number
version_num = f"{datetime.datetime.now():%Y%m%d}"  # date-based version tag, e.g. "20230101"

In [4]:
%%writefile components/get_data.py

from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes

import pandas as pd
import argparse

def main():
    """Read the raw AlphaVantage CSV, engineer features, write it out and register it.

    Command-line arguments:
        --input_data: path or URL of the raw daily-price CSV.
        --train_data: output folder; the prepared data is written to
            ``<train_data>/stock-data.csv`` and registered as data asset "stock-data".
    """
    # This file runs standalone on the compute target, so it cannot rely on
    # names (os, ml_client) that only exist in the notebook kernel.
    import os

    parser = argparse.ArgumentParser()
    parser.add_argument("--input_data", type=str, help="path or URL to input data")
    parser.add_argument("--train_data", type=str)
    args = parser.parse_args()

    df = pd.read_csv(args.input_data)
    df = df.drop("adjusted_close", axis=1)

    df = prep_data(df)
    # Forward-fill gaps; DataFrame.ffill() replaces the deprecated
    # fillna(method="ffill"). Leading NaNs (first rolling windows) remain.
    df = df.ffill()

    os.makedirs(args.train_data, exist_ok=True)
    path = os.path.join(args.train_data, "stock-data.csv")
    # The index is written on purpose: train_model.py drops the resulting
    # "Unnamed: 0" column when it reads this file back.
    df.to_csv(path)

    stock_data = Data(
        name="stock-data",
        path=path,
        type=AssetTypes.URI_FILE,
        description="Dataset to train a model on the IBM stock data.",
        tags={"source_type": "web", "source": "AlphaVantage"},
    )
    _get_ml_client().data.create_or_update(stock_data)


def _get_ml_client():
    """Return an MLClient for the workspace the current Azure ML job runs in."""
    import os

    from azure.ai.ml import MLClient
    from azure.identity import DefaultAzureCredential

    # NOTE(review): assumes the Azure ML job runtime sets the AZUREML_ARM_*
    # variables and that the compute has an identity DefaultAzureCredential can
    # use (e.g. a managed identity) — confirm for the target cluster.
    return MLClient(
        credential=DefaultAzureCredential(),
        subscription_id=os.environ["AZUREML_ARM_SUBSCRIPTION"],
        resource_group_name=os.environ["AZUREML_ARM_RESOURCEGROUP"],
        workspace_name=os.environ["AZUREML_ARM_WORKSPACE_NAME"],
    )

def prep_data(dataframe):
    """Add lagged rolling/EWMA features, calendar features and the next-day target.

    Rows are first sorted chronologically so that ``shift(1)`` always refers to
    the previous trading day and ``shift(-1)`` to the next one, whatever order
    the source delivered (AlphaVantage CSVs are presumably newest-first).

    Args:
        dataframe: frame with at least ``timestamp`` (parseable by
            ``pd.to_datetime``) and ``close`` columns. It is not modified.

    Returns:
        A new DataFrame with a fresh 0..n-1 index and without ``timestamp``.
        Added columns: ``rolling_3_mean``, ``rolling_7_mean`` and ``ewma`` (all
        computed on previous closes only), ``unix_timestamp`` (seconds),
        ``weekday`` (Monday=0), ``month`` and ``close_shifted`` (next day's
        close, the training target).
    """
    # Work on a copy so the caller's frame is not mutated as a side effect.
    dataframe = dataframe.copy()
    dataframe["timestamp"] = pd.to_datetime(dataframe["timestamp"])
    dataframe = dataframe.sort_values("timestamp", kind="stable").reset_index(drop=True)

    lagged_close = dataframe["close"].shift(1)

    # rolling means and exponentially weighted moving average of previous closes
    dataframe["rolling_3_mean"] = lagged_close.rolling(3).mean()
    dataframe["rolling_7_mean"] = lagged_close.rolling(7).mean()
    dataframe["ewma"] = lagged_close.ewm(alpha=0.5).mean()

    # Unix time in seconds. Force nanoseconds + int64 explicitly: plain `int`
    # is 32-bit on Windows builds of NumPy < 2 and overflows here.
    nanoseconds = dataframe["timestamp"].values.astype("datetime64[ns]").astype("int64")
    dataframe["unix_timestamp"] = nanoseconds / 10**9

    # day of week (Monday=0) and month
    dataframe["weekday"] = dataframe["timestamp"].dt.dayofweek
    dataframe["month"] = dataframe["timestamp"].dt.month
    dataframe = dataframe.drop("timestamp", axis=1)

    # target: the next day's close
    dataframe["close_shifted"] = dataframe["close"].shift(-1)
    return dataframe

# Run main() only when executed as a script (`python get_data.py ...`), not when imported.
if __name__ == "__main__": 
    main()

Overwriting components/get_data.py


In [5]:
%%writefile components/train_model.py
from azure.ai.ml.entities import Model
from azure.ai.ml.constants import AssetTypes

import lightgbm as lgbm 
import pandas as pd
import numpy as np
import argparse
import pickle
import datetime

def main():
    """Train a LightGBM regressor on the prepared stock data and register it.

    Command-line arguments:
        --train_data: the data-prep output folder containing ``stock-data.csv``
            (a direct path to a CSV file is also accepted).

    The fitted model is pickled to ``ibm_model.pkl`` in the working directory
    and registered as model "IBM-Model" with a YYYYMMDD version.
    """
    # This file runs standalone on the compute target, so names beyond the
    # file-level imports must be brought in here.
    import os

    parser = argparse.ArgumentParser()
    parser.add_argument("--train_data", type=str, help="path to input data")
    args = parser.parse_args()

    # all versions will have the date encoded in the version number
    version_num = datetime.datetime.now().strftime("%Y%m%d")

    # The pipeline passes a uri_folder; get_data.py writes stock-data.csv into it.
    data_path = args.train_data
    if os.path.isdir(data_path):
        data_path = os.path.join(data_path, "stock-data.csv")
    df = pd.read_csv(data_path)

    # "Unnamed: 0" is the index column get_data.py writes with to_csv;
    # "close_shifted" is the target and is excluded from the features with "close".
    X = np.array(df.drop(["Unnamed: 0", "close", "close_shifted"], axis=1))
    y = np.array(df["close_shifted"])

    # Train model on whole dataset
    lgbm_model = lgbm.LGBMRegressor(max_depth=100, reg_alpha=0.05, reg_lambda=0.05).fit(X, y)

    with open("ibm_model.pkl", "wb") as f:
        pickle.dump(lgbm_model, f)

    file_model = Model(
        path="ibm_model.pkl",
        type=AssetTypes.CUSTOM_MODEL,
        name="IBM-Model",
        description="Model created from local file.",
        version=version_num,
    )
    _get_ml_client().models.create_or_update(file_model)


def _get_ml_client():
    """Return an MLClient for the workspace the current Azure ML job runs in."""
    import os

    from azure.ai.ml import MLClient
    from azure.identity import DefaultAzureCredential

    # NOTE(review): assumes the Azure ML job runtime sets the AZUREML_ARM_*
    # variables and that the compute has an identity DefaultAzureCredential can
    # use (e.g. a managed identity) — confirm for the target cluster.
    return MLClient(
        credential=DefaultAzureCredential(),
        subscription_id=os.environ["AZUREML_ARM_SUBSCRIPTION"],
        resource_group_name=os.environ["AZUREML_ARM_RESOURCEGROUP"],
        workspace_name=os.environ["AZUREML_ARM_WORKSPACE_NAME"],
    )

# Run main() only when executed as a script (`python train_model.py ...`), not when imported.
if __name__ == "__main__":
    main()

Overwriting components/train_model.py


In [6]:
%%writefile dependencies/conda.yml
name: model-env
channels:
  - conda-forge
dependencies:
  - python=3.9
  - numpy
  - pip
  # Pinned to the version the notebook records in the environment's
  # {"lightgbm": "3.3.3"} tag, so the built environment matches its tag and
  # rebuilds do not silently pick up a different lightgbm release.
  - lightgbm=3.3.3
  - pandas
  - pip:
    - inference-schema[numpy-support]
    - xlrd
    - mlflow
    - azureml-mlflow
    - azure-ai-ml
    - azureml-fsspec
    - mltable

Overwriting dependencies/conda.yml


In [12]:
custom_env_name = "stock-training-env"
# Bump this whenever dependencies/conda.yml changes: an already registered
# name/version is reused as-is and the conda file is not re-read.
custom_env_version = "1.5"

try:
    # Reuse the environment if this name/version is already registered.
    pipeline_job_env = ml_client.environments.get(custom_env_name, version=custom_env_version)
except ResourceNotFoundError:
    # Register it only when it is genuinely missing; other failures (auth,
    # network, KeyboardInterrupt) now propagate instead of being swallowed by a
    # bare `except:` that then attempted the create anyway.
    pipeline_job_env = Environment(
        name=custom_env_name,
        description="Custom environment for stock training pipeline",
        tags={"lightgbm": "3.3.3"},
        conda_file=os.path.join("dependencies", "conda.yml"),
        image="mcr.microsoft.com/azureml/curated/lightgbm-3.2-ubuntu18.04-py37-cpu:48",
        version=custom_env_version,
    )
    pipeline_job_env = ml_client.environments.create_or_update(pipeline_job_env)

    print(
        f"Environment with name {pipeline_job_env.name} is registered to workspace, the environment version is {pipeline_job_env.version}"
    )

In [None]:
# Step 1: fetch the raw CSV (uri_file input), engineer features, and emit the
# prepared data as a uri_folder output for the training step.
data_prep_component = command(
    name="data_prep",
    display_name="Data preparation for training",
    description="Loads data via AlphaVantage API input, preps data and stores to as data asset",
    code="./components/get_data.py",
    command="python get_data.py --input_data ${{inputs.input_data}} --train_data ${{outputs.train_data}}",
    inputs={"input_data": Input(type="uri_file")},
    outputs={"train_data": Output(type="uri_folder")},
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
    compute="ava",
)

In [None]:
# Step 2: train the LightGBM model on the folder produced by data_prep.
train_model_component = command(
    name="train_model",
    display_name="Model training with data from previous step",
    description="Trains a LightGBM model with preprocessed data",
    code="./components/train_model.py",
    command="python train_model.py --train_data ${{inputs.train_data}}",
    inputs={"train_data": Input(type="uri_folder")},
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
    compute="ava",
)

In [None]:
# Retrieve an already attached Azure Machine Learning Compute.
cluster_name = "ava"  # name of the already attached Azure ML compute cluster
print(ml_client.compute.get(name=cluster_name))

AmlCompute({'type': 'amlcompute', 'created_on': None, 'provisioning_state': 'Succeeded', 'provisioning_errors': None, 'name': 'ava', 'description': None, 'tags': {}, 'properties': {}, 'id': '/subscriptions/5a361d37-b562-4eee-981b-0936493063e9/resourceGroups/MlGroup/providers/Microsoft.MachineLearningServices/workspaces/mlworkspace/computes/ava', 'Resource__source_path': None, 'base_path': 'c:\\Users\\Leo\\OneDrive\\Programming\\Python\\azure\\stock-pipeline', 'creation_context': None, 'serialize': <msrest.serialization.Serializer object at 0x00000245A243F100>, 'resource_id': None, 'location': 'northeurope', 'size': 'STANDARD_A1_V2', 'min_instances': 0, 'max_instances': 1, 'idle_time_before_scale_down': 180.0, 'identity': None, 'ssh_public_access_enabled': True, 'ssh_settings': None, 'network_settings': <azure.ai.ml.entities._compute.compute.NetworkSettings object at 0x0000024582230910>, 'tier': 'low_priority', 'subnet': None})


In [None]:
from azure.ai.ml.dsl import pipeline
from components import get_data, train_model

In [None]:
@pipeline(compute="ava")
def pipeline_with_non_python_components(input_data):
    """Two-step pipeline: prepare the stock data, then train on the prepared output.

    Args:
        input_data: ``Input`` pointing at the raw AlphaVantage CSV.

    Returns:
        Mapping that exposes the data-prep output folder as pipeline output ``out``.
    """
    data_prep_job = data_prep_component(input_data=input_data)
    # feed the output of the previous step into the training job
    # (kept as a named step even though the name is unused below)
    train_model_job = train_model_component(train_data=data_prep_job.outputs.train_data)

    return {"out": data_prep_job.outputs.train_data}


# AlphaVantage request for IBM's full daily (adjusted) history as CSV. Building
# the query with urlencode prevents missing-"&" mistakes such as
# "symbol=IBMdatatype=csv", which silently requested an unknown symbol as JSON.
# The API key comes from the environment instead of being committed here.
# NOTE(review): the previously hardcoded key should be rotated, and the key
# still becomes part of the job's input URL stored in the workspace.
alphavantage_query = urlencode(
    {
        "function": "TIME_SERIES_DAILY_ADJUSTED",
        "symbol": "IBM",
        "datatype": "csv",
        "outputsize": "full",
        "apikey": os.environ["ALPHAVANTAGE_API_KEY"],
    }
)

pipeline_job = pipeline_with_non_python_components(
    input_data=Input(
        type="uri_file",  # match data_prep's uri_file input (Input defaults to uri_folder)
        path=f"https://www.alphavantage.co/query?{alphavantage_query}",
    )  # stock data via AlphaVantage
)

# set pipeline level compute
pipeline_job.settings.default_compute = "ava"

In [None]:
# submit job to workspace
# Submit the pipeline; the returned job object (with its server-assigned name)
# replaces the local definition and is displayed as the cell output.
pipeline_job = ml_client.jobs.create_or_update(
    job=pipeline_job,
    experiment_name="stock-training-pipeline",
)
pipeline_job

Experiment,Name,Type,Status,Details Page
stock-training-pipeline,patient_oxygen_wx0yf5prjr,pipeline,Preparing,Link to Azure Machine Learning studio


In [None]:
# Wait until the job completes
ml_client.jobs.stream(name=pipeline_job.name)  # block and stream logs until the run finishes

RunId: patient_oxygen_wx0yf5prjr
Web View: https://ml.azure.com/runs/patient_oxygen_wx0yf5prjr?wsid=/subscriptions/5a361d37-b562-4eee-981b-0936493063e9/resourcegroups/MlGroup/workspaces/mlworkspace

Streaming logs/azureml/executionlogs.txt

[2023-01-01 15:41:38Z] Submitting 1 runs, first five are: f597c5ec:b86666db-10bb-4ccf-b020-7a93ef9ae6da
[2023-01-01 15:44:51Z] Execution of experiment failed, update experiment status and cancel running nodes.

Execution Summary
RunId: patient_oxygen_wx0yf5prjr
Web View: https://ml.azure.com/runs/patient_oxygen_wx0yf5prjr?wsid=/subscriptions/5a361d37-b562-4eee-981b-0936493063e9/resourcegroups/MlGroup/workspaces/mlworkspace


JobException: Exception : 
 {
    "error": {
        "code": "UserError",
        "message": "Pipeline has some failed steps. See child run or execution logs for more details.",
        "message_format": "Pipeline has some failed steps. {0}",
        "message_parameters": {},
        "reference_code": "PipelineHasStepJobFailed",
        "details": []
    },
    "environment": "northeurope",
    "location": "northeurope",
    "time": "2023-01-01T15:44:51.433546Z",
    "component_name": ""
} 