# SDK v1 code

In [None]:
import pandas as pd
import numpy as np
from azureml.core import Workspace, Experiment, Environment, ScriptRunConfig
from azureml.core.compute import AmlCompute,ComputeTarget, ComputeInstance
from azureml.exceptions import ComputeTargetException
from azureml.core.datastore import Datastore
from azureml.widgets import RunDetails
from azureml.core.environment import CondaDependencies

In [None]:
ws = Workspace.from_config(path='../../config/config.json')

## Environment

In [None]:
env=Environment.from_pip_requirements("conv_sum",  '../config/requirements.txt')
env.register(ws)

## Compute

In [None]:
from ComputeManagement import create_cluster, create_instance, delete_compute

In [None]:
cc=create_cluster(
    workspaceRef=ws,
    name="q34",
    vmSize="Standard_DS3_v2",
    minNodes=0,
    maxNodes=4,
    idleTime=180
)

## Training

In [None]:
# training_script_config = ScriptRunConfig(
#     source_directory = 'src',
#     script = 'training_script.py',
#     arguments=['--data',___],
#     environment = env,
#     compute_target = cc
# )
# experiment = Experiment(
#     workspace = ws,
#     name="maiden_experiment"
# )
# run = experiment.submit(config=training_script_config, tags=[])

# RunDetails(run).show()
# run.wait_for_completion(show_output=True)

In [None]:
import yfinance as yf

In [None]:
# Every symbol must be its own list element: yfinance only splits on commas when
# `tickers` is a plain string, so a list holding one comma-joined string is
# looked up as a single (non-existent) symbol.
# NOTE(review): 'MM.NS' is presumably meant to be Mahindra & Mahindra, whose
# Yahoo symbol is likely 'M&M.NS' - confirm before relying on that column.
sensexTickerYFinance = [
    'HDFCLIFE.NS', 'NESTLEIND.NS', 'KOTAKBANK.NS', 'INDUSINDBK.NS', 'TATASTEEL.NS',
    'ITC.NS', 'ONGC.NS', 'TITAN.NS', 'ULTRACEMCO.NS', 'BAJAJFINSV.NS',
    'BAJFINANCE.NS', 'BRITANNIA.NS', 'BAJAJ-AUTO.NS', 'COALINDIA.NS', 'BHARTIARTL.NS',
    'TATACONSUM.NS', 'LTI.NS', 'CIPLA.NS', 'MARUTI.NS', 'ICICIBANK.NS',
    'APOLLOHOSP.NS', 'NTPC.NS', 'HEROMOTOCO.NS', 'HINDALCO.NS', 'WIPRO.NS',
    'TCS.NS', 'ADANIENT.NS', 'MM.NS', 'TECHM.NS', 'RELIANCE.NS',
]
stock_data = yf.download(
    tickers=sensexTickerYFinance,
    start='2000-01-01',
    end='2022-12-31',
    interval='1mo',
)

In [None]:
stock_data.loc[:,'Adj Close']

### TODO
1. Read up on the state of the art (SOTA) in stock price prediction and on what determines whether a price-prediction model succeeds
2. Choose stocks to monitor - Nifty 50
3. Build as below

Think of a common use case where the data updates regularly and the model drifts as a result:
1. Stock price prediction
2. Automatic data retrieval using API to store into Azure storage
3. Automatic model training at intervals depending on error rate

Tie everything together in an RL (reinforcement learning) portfolio-optimization application

In [None]:
import yfinance as yf

In [None]:
# "1d" is the bar size, which yfinance calls `interval`; `period` is a lookback
# window and should not be combined with an explicit start/end range.
tickerData = yf.download(tickers="RELIANCE.NS", start="2022-01-01", end="2023-01-10", interval="1d")
tickerData['Date'] = [str(x)[:10] for x in tickerData.index]
tickerData['Ticker'] = "RELIANCE.NS"
# One dict per row; unlike transpose().to_dict(), this does not collapse rows
# that happen to share an index label.
tickerDataToPersist = tickerData.to_dict("records")

In [None]:
from src.TickerData import query, download

In [None]:
download(ticker="RELIANCE.NS", start="2022-12-01",end="2023-01-10", period="1d")

In [None]:
# NOTE(review): `train_data` is first assigned in the next cell, so on a fresh
# kernel (Restart & Run All) this line raises NameError. Called without a path,
# to_csv() only returns the CSV text; nothing is written to disk here.
train_data.to_csv()

In [None]:
train_data = query(ticker="RELIANCE.NS", start="2022-12-01",end="2023-01-10")

In [None]:
train_data

In [None]:
import pandas as pd

In [None]:
import torch

In [None]:
ticker="RELIANCE.NS"

In [None]:
train_data = train_data[f"{ticker}_Close"]

In [None]:
import pandas as pd

In [None]:
pd.Series(train_data).to_csv('./data/ril.csv')

In [None]:
import numpy as np

In [None]:
def training_data(series, loookaheadSize=5):
    """Build a sliding-window ``TensorDataset`` from a 1-D series.

    Each sample pairs the window ``series[i - loookaheadSize:i]`` with the
    target ``series[i + 1]``.

    Args:
        series: 1-D sequence of numbers (list, NumPy array or pandas Series).
        loookaheadSize: Number of consecutive values in every input window.

    Returns:
        torch.utils.data.TensorDataset with features of shape
        ``(n_samples, 1, loookaheadSize)`` and targets of shape
        ``(n_samples, 1)``, where
        ``n_samples = max(len(series) - loookaheadSize - 1, 0)``.

    Raises:
        ValueError: If ``loookaheadSize`` is smaller than 1.
    """
    if loookaheadSize < 1:
        raise ValueError("loookaheadSize must be at least 1")

    # Work on a plain array so indexing is positional; integer indexing on a
    # pandas Series is label-based and breaks for non-default indexes.
    values = np.asarray(series)

    # The window size drives both the first usable position and the feature
    # shape (these were previously hard-coded to 5).
    # NOTE(review): the target is values[i + 1], so the element directly after
    # the window (values[i]) is skipped. Kept as-is to preserve existing
    # behaviour - confirm this is intended.
    positions = range(loookaheadSize, len(values) - 1)
    n_samples = len(positions)

    X = np.array([values[i - loookaheadSize:i] for i in positions])
    y = np.array([values[i + 1] for i in positions])
    X = X.reshape(n_samples, 1, loookaheadSize)
    y = y.reshape(n_samples, 1)

    train_dataset = torch.utils.data.TensorDataset(torch.from_numpy(X), torch.from_numpy(y))

    return train_dataset

In [None]:
tx=training_data(train_data)

In [None]:
next(iter(tx))

In [None]:
torch.save(tx,'txx.pt')

In [None]:
typ = torch.load('txx.pt')

In [None]:
next(iter(typ))

In [None]:
next(iter(tx))

In [None]:
import numpy as np

In [None]:
np.array([0.0026]).shape

In [None]:
from datetime import datetime
str(datetime.now().date())

# SDK v2 code

In [41]:
import json
from azure.ai.ml import MLClient, Input, Output, command
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml.entities import AmlCompute, Environment, Model, Data
from azure.ai.ml.constants import AssetTypes
import webbrowser

## Init

In [2]:
# Read the workspace coordinates from the shared JSON config.
with open('../config/config.json','r') as config_file:
    configs = json.load(config_file)

subscription_id = configs['subscription_id']
resource_group = configs['resource_group']
workspace = configs['workspace_name']

try:
    credential = DefaultAzureCredential()
    # Probe the credential: make sure it can actually obtain a token.
    credential.get_token("https://management.azure.com/.default")
except Exception:
    # DefaultAzureCredential did not work; fall back to an interactive browser login.
    credential = InteractiveBrowserCredential()

# Client handle used by every SDK v2 cell below.
ml_client = MLClient(credential, subscription_id, resource_group, workspace)

## Compute creation

In [4]:
def create_train_cluster(
    TargetName = "cpu-cluster",
    computeSize="STANDARD_DS3_V2",
    minInstances=1,
    maxInstances=4,
    idleTime=180,
    ):
    """Return the named AmlCompute cluster, creating it when the lookup fails.

    Args:
        TargetName: Name of the compute target to fetch or create.
        computeSize: VM size passed as ``size`` to ``AmlCompute``.
        minInstances: Passed as ``min_instances``.
        maxInstances: Passed as ``max_instances``.
        idleTime: Passed as ``idle_time_before_scale_down``.

    Returns:
        The compute entity, either the existing one or the newly provisioned
        one, so callers can always read attributes such as ``.name``.
    """
    try:
        return ml_client.compute.get(TargetName)
    except Exception:
        # NOTE(review): any lookup failure (not only "not found") ends up here
        # and triggers a create attempt.
        print("Creating a new cpu compute target...")

    # Let's create the Azure ML compute object with the intended parameters
    cluster_spec = AmlCompute(
        name=TargetName,
        type="amlcompute",
        size=computeSize,
        min_instances=minInstances,
        max_instances=maxInstances,
        idle_time_before_scale_down=idleTime,
    )
    # begin_create_or_update only starts a long-running operation and hands
    # back a poller; wait on .result() so the caller receives the compute
    # entity itself rather than the poller.
    return ml_client.begin_create_or_update(cluster_spec).result()

In [14]:
compute = create_train_cluster()

## Environment creation

In [20]:
# `os` is not imported by any earlier cell of this notebook, so import it here
# to keep the cell runnable on a fresh kernel.
import os

dependencies_dir = "../config"
os.makedirs(dependencies_dir, exist_ok=True)

In [21]:
%%writefile {dependencies_dir}/conda.yml
name: model-env
channels:
  - conda-forge
dependencies:
  - python=3.8
  - numpy=1.21.2
  - pip=21.2.4
  # train.py imports sklearn (MinMaxScaler, train_test_split).
  - scikit-learn=0.24.2
  - scipy=1.7.1
  - pandas>=1.1,<1.2
  - pip:
    - inference-schema[numpy-support]==1.3.0
    - xlrd==2.0.1
    - mlflow==1.26.1
    - azureml-mlflow==1.42.0
    - yfinance==0.2.4
    - pymongo==4.3.3
    # "torch" is the PyPI distribution name (the conda package is "pytorch"),
    # so it has to be installed through pip.
    - torch==1.13.1

Overwriting ../config/conda.yml


In [22]:
def register_env(
    dependencies_dir,
    envName = "stock-pred"
    ):
    """Create or update the custom training environment in the workspace.

    Args:
        dependencies_dir: Directory that contains ``conda.yml``.
        envName: Name under which the environment is registered.

    Returns:
        The environment object returned by
        ``ml_client.environments.create_or_update``.
    """
    conda_spec_path = os.path.join(dependencies_dir, "conda.yml")

    # Environment definition: base image plus the conda specification.
    env_definition = Environment(
        name=envName,
        description="Custom environment for creating MLOps project for stock prediction",
        tags={"torch": "1.13.1"},
        conda_file=conda_spec_path,
        image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
        version="0.1.0",
    )
    registered_env = ml_client.environments.create_or_update(env_definition)

    print(
        f"Environment with name {registered_env.name} is registered to workspace, the environment version is {registered_env.version}"
    )
    return registered_env

In [23]:
env = register_env(dependencies_dir=dependencies_dir)

Environment with name stock-pred is registered to workspace, the environment version is 0.1.0


## Upload data to Azure

In [43]:
my_path = '../data/ril.csv'

my_data = Data(
    path=my_path,
    type=AssetTypes.URI_FILE,
    description="ril_stock_data",
    name="ril",
    version='1'
)

ml_client.data.create_or_update(my_data)

Data({'skip_validation': False, 'mltable_schema_url': None, 'referenced_uris': None, 'type': 'uri_file', 'is_anonymous': False, 'auto_increment_version': False, 'name': 'ril', 'description': 'ril_stock_data', 'tags': {}, 'properties': {}, 'id': '/subscriptions/5d2e45e0-cd7b-4338-b279-455fa4a4c42d/resourceGroups/RG/providers/Microsoft.MachineLearningServices/workspaces/AzureMLWorkspace/data/ril/versions/1', 'Resource__source_path': None, 'base_path': '/Users/anupam/Documents/Codebase/MLOps_stock_prediction/notebooks', 'creation_context': <azure.ai.ml.entities._system_data.SystemData object at 0x10ce04b10>, 'serialize': <msrest.serialization.Serializer object at 0x10ce06410>, 'version': '1', 'latest_version': None, 'path': 'azureml://subscriptions/5d2e45e0-cd7b-4338-b279-455fa4a4c42d/resourcegroups/RG/workspaces/AzureMLWorkspace/datastores/workspaceblobstore/paths/LocalUpload/3e91b5da3fd7435efb8a68549a7c027a/ril.csv', 'datastore': None})

In [53]:
ril_data = ml_client.data.get(name='ril', version=1)

In [54]:
ril_data.path

'azureml://subscriptions/5d2e45e0-cd7b-4338-b279-455fa4a4c42d/resourcegroups/RG/workspaces/AzureMLWorkspace/datastores/workspaceblobstore/paths/LocalUpload/3e91b5da3fd7435efb8a68549a7c027a/ril.csv'

## Training

In [30]:
%%writefile ../src/train.py
import argparse
import logging
import os
import pickle
from datetime import datetime

import mlflow
import numpy as np
import pandas as pd
import torch
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

def series_to_tensors(series, lookaheadSize=5):
    """Turn a 1-D series into a sliding-window ``TensorDataset``.

    Each sample pairs the window ``series[i - lookaheadSize:i]`` with the
    target ``series[i + 1]``.

    Args:
        series: 1-D sequence of numbers (list, NumPy array or pandas Series).
        lookaheadSize: Number of consecutive values in every input window.

    Returns:
        torch.utils.data.TensorDataset with features of shape
        ``(n_samples, 1, lookaheadSize)`` and targets of shape
        ``(n_samples, 1)``, where
        ``n_samples = max(len(series) - lookaheadSize - 1, 0)``.

    Raises:
        ValueError: If ``lookaheadSize`` is smaller than 1.
    """
    if lookaheadSize < 1:
        raise ValueError("lookaheadSize must be at least 1")

    # Plain array => positional indexing regardless of any pandas index.
    values = np.asarray(series)

    # The window size drives both the first usable position and the feature
    # shape (these were previously hard-coded to 5).
    # NOTE(review): the target is values[i + 1], so the element directly after
    # the window (values[i]) is skipped. Kept as-is to preserve existing
    # behaviour - confirm this is intended.
    positions = range(lookaheadSize, len(values) - 1)
    n_samples = len(positions)

    X = np.array([values[i - lookaheadSize:i] for i in positions])
    y = np.array([values[i + 1] for i in positions])
    X = X.reshape(n_samples, 1, lookaheadSize)
    y = y.reshape(n_samples, 1)

    dataset = torch.utils.data.TensorDataset(torch.from_numpy(X), torch.from_numpy(y))

    return dataset

def dataprep(args):
    """Load the price CSV, split it chronologically, scale it and window it.

    Args:
        args: Parsed command-line namespace. Reads ``args.data`` (path of the
            CSV file) and ``args.test_train_ratio`` (fraction held out as the
            test split).

    Returns:
        Tuple ``(scaler, train_tensors, test_tensors)``: the ``MinMaxScaler``
        fitted on the training split only, and the windowed datasets built by
        ``series_to_tensors`` for the train and test splits.
    """
    stockData = pd.read_csv(args.data)

    # Assumes the price series is the last column of the CSV (a Series written
    # with to_csv stores the index first and the values last) - TODO confirm
    # against the uploaded file. Kept 2-D (n, 1) because the scaler expects
    # a feature matrix.
    prices = stockData.iloc[:, [-1]].to_numpy(dtype=float)

    # shuffle=False keeps chronological order; a shuffled split would destroy
    # the sequence the sliding windows are built from.
    stock_train, stock_test = train_test_split(
        prices, test_size=args.test_train_ratio, shuffle=False
    )

    # fit() must be called on an instance, not on the MinMaxScaler class.
    scaler = MinMaxScaler().fit(stock_train)

    # Flatten back to 1-D: series_to_tensors windows a flat series.
    train_scaled = scaler.transform(stock_train).ravel()
    test_scaled = scaler.transform(stock_test).ravel()

    train_tensors = series_to_tensors(train_scaled)
    test_tensors = series_to_tensors(test_scaled)

    return scaler, train_tensors, test_tensors

class lstm_model(torch.nn.Module):
    """LSTM (5 input features, hidden size 1) followed by a 1-to-1 linear layer."""

    def __init__(self):
        super(lstm_model, self).__init__()
        self.lstm1=torch.nn.LSTM(batch_first=True, input_size=5, hidden_size=1)
        self.out=torch.nn.Linear(1,1)

    def forward(self, x, hidden=None):
        """Run the LSTM and the linear read-out, returning a flattened tensor.

        Args:
            x: Input tensor whose last dimension has size 5.
            hidden: Optional ``(h_0, c_0)`` initial state for the LSTM.
                ``None`` lets the LSTM start from a zero state.

        Returns:
            1-D tensor holding the linear layer's output for every LSTM step.
        """
        # Forward the caller-supplied state; it used to be accepted but
        # silently ignored.
        x, _ = self.lstm1(x, hidden)
        x = self.out(x)
        return x.flatten()

def train(trainset, epochs=10, lr=0.0001):
    """Fit a fresh ``lstm_model`` on ``trainset`` with Adam and MSE loss.

    The dataset is iterated directly, so one optimiser step is taken per
    sample.

    Args:
        trainset: Iterable yielding ``(features, target)`` tensor pairs.
        epochs: Number of passes over ``trainset``.
        lr: Learning rate handed to ``torch.optim.Adam``.

    Returns:
        The trained ``lstm_model`` instance.
    """
    seq_model = lstm_model()
    optim = torch.optim.Adam(lr=lr, params=seq_model.parameters())

    # Plain range: no NumPy needed just to count epochs.
    for epoch in range(epochs):

        Loss=0

        for feats, target in trainset:
            optim.zero_grad()

            y_p = seq_model(feats.float())
            loss = torch.nn.functional.mse_loss(y_p.float(), target.float())

            loss.backward()
            optim.step()
            # Summed (not averaged) per-sample loss for the epoch.
            Loss += loss.item()

        print(f"Epoch: {epoch}, loss: {Loss}")
    return seq_model

def main():
    """Parse the job arguments, train the model and save the artifacts.

    Writes ``scaler.pkl`` (pickled scaler) and
    ``modelstock_pred_<YYYY-MM-DD>.pth`` (the trained model) to the current
    working directory.
    """

    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--registered_model_name", type=str, help="model name")
    parser.add_argument("--data", type=str, help="Path to input data")
    parser.add_argument("--test_train_ratio", type=float, required=False, default=0.25)
    # The job command line passes --debug; without this definition argparse
    # aborts with "unrecognized arguments".
    parser.add_argument("--debug", action="store_true", help="Enable debug logging")

    args = parser.parse_args()

    if args.debug:
        logging.basicConfig(level=logging.DEBUG)

    # The scaler is returned so it can be persisted and reused to scale data later.
    scaler, trainset, _ = dataprep(args)

    trainedModel = train(trainset)

    # Context manager so the file handle is closed (and flushed) deterministically.
    with open('scaler.pkl','wb') as scaler_file:
        pickle.dump(scaler, scaler_file)

    model_file = f"modelstock_pred_{str(datetime.now().date())}.pth"
    # torch.save takes the destination as its second positional argument;
    # it has no `path` keyword.
    torch.save(trainedModel, model_file)

    # Registering the model to the workspace (sketch, not active yet):
    #
    # job_name = "<JOB_NAME>"
    #
    # scaler_model = Model(
    #     path=f"azureml://jobs/{job_name}/outputs/artifacts/paths/scaler.pkl",
    #     name="MinMaxScaler",
    #     description="Scaler object",
    #     type=AssetTypes.MLFLOW_MODEL,
    # )
    # run_model = Model(
    #     path=f"azureml://jobs/{job_name}/outputs/artifacts/paths/{model_file}",
    #     name="run-model-example",
    #     description="Model created from run.",
    #     type=AssetTypes.MLFLOW_MODEL,
    # )

if __name__ == "__main__":
    main()

Overwriting ../src/train.py


In [65]:
registered_model_name = "stock_pred_v1"

job = command(   
    inputs={
        "data": Input(type=AssetTypes.URI_FILE, mode="ro_mount", path=ril_data.path),
        "test_train_ratio": 0.25,
        "registered_model_name":registered_model_name
        },
    code="../src/",  # location of source code
    command="python train.py --data ${{inputs.data}} --test_train_ratio ${{inputs.test_train_ratio}} --registered_model_name ${{inputs.registered_model_name}} --debug",
    environment=env,
    compute=compute.name,
    experiment_name="train_model_stock_price_prediction",
    display_name="stock_price_prediction",
)

In [66]:
# Keep the job object returned by the service: the locally built command has
# not been submitted yet, so only the returned job carries the populated
# `studio_url` that the next cell opens.
job = ml_client.create_or_update(job)



In [None]:
webbrowser.open(job.studio_url)