In [None]:
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential


# Credential object handed to the workspace client below.
credential = DefaultAzureCredential()

# Workspace client used by later cells to register the environment and the
# components and to submit the pipeline job.
# NOTE(review): the three identifiers are blank placeholders in this notebook —
# fill them in (or load them from configuration) before running.
ml_client = MLClient(
    credential=credential,
    subscription_id='',
    resource_group_name='',
    workspace_name='',
)

In [None]:
# `os` is imported here because no earlier cell imports it; without this the
# cell raises NameError on a fresh kernel (Restart & Run All).
import os

# Folder that receives the conda specification written by the next cell.
os.makedirs('./dependencies', exist_ok=True)

In [None]:
%%writefile dependencies/conda.yaml
# Conda specification for the environment that every pipeline component
# (data_gather, data_prep, train) runs in.
name: model-env
channels:
  - conda-forge
dependencies:
  - python=3.8
  - numpy=1.21.2
  - pip=21.2.4
  - scikit-learn=0.24.2
  - scipy=1.7.1
  - pandas>=1.1,<1.2
  # Everything below is installed with pip inside the conda environment.
  - pip:
    - inference-schema[numpy-support]==1.3.0
    - xlrd==2.0.1
    - mlflow== 1.26.1
    - azureml-mlflow==1.42.0
    - statsmodels==0.12.0
    # NOTE(review): the remaining packages are unpinned, so a rebuild may
    # resolve different versions. The component scripts also import
    # `azureml.core`, which is not listed explicitly — presumably it arrives
    # transitively; confirm.
    - patsy
    - azure-core
    - arm-mango
    - azureml-dataprep
    

In [None]:
from azure.ai.ml.entities import Environment


# Describe the environment (base image + conda file) and register it in the
# workspace in a single expression. `pipeline_job_env` ends up bound to the
# entity returned by the workspace, whose name and version the component
# definitions below reference.
pipeline_job_env = ml_client.environments.create_or_update(
    Environment(
        image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
        conda_file="./dependencies/conda.yaml",
        name="env-hwes",
        description="Env for HWES",
    )
)

print(
    f"Environment with name {pipeline_job_env.name} is registered to workspace, the environment version is {pipeline_job_env.version}"
)

In [None]:
# `os` is imported here because no earlier cell imports it; without this the
# cell raises NameError on a fresh kernel (Restart & Run All).
import os

# Folder that receives the component scripts written by the %%writefile cells.
os.makedirs('./components', exist_ok=True)

In [None]:
%%writefile components/data_gather.py
import time
import http.client, json
import pandas as pd
from datetime import datetime
from dateutil import parser
import dateutil.relativedelta
import numpy as np
import os
from pytz import timezone 
import argparse
import logging
import mlflow
from azureml.core import Workspace, Datastore, Dataset
from azureml.data.datapath import DataPath

def datetotimestamp(date):
    """Return the Unix timestamp of ``date`` rounded to whole seconds.

    The conversion goes through ``time.mktime``, so ``date`` is interpreted in
    the local timezone of the machine running the script.
    """
    return round(time.mktime(date.timetuple()))

def timestamptodate(timestamp):
    """Convert a Unix timestamp (seconds) to a naive local-time ``datetime``."""
    local_moment = datetime.fromtimestamp(timestamp)
    return local_moment

def _fetch_close_prices(host, url):
    """Download candle data over HTTPS and return it as a DataFrame.

    The JSON response is loaded into a DataFrame, the ``s``/``o``/``h``/``l``/``v``
    columns are dropped, and the ``t`` column is converted with
    ``timestamptodate`` and then shifted forward by 5.5 hours. The connection
    is closed even when the request or the decoding fails.
    """
    conn = http.client.HTTPSConnection(host)
    try:
        conn.request("GET", url, "", {})
        payload = conn.getresponse().read()
    finally:
        conn.close()
    candles = pd.DataFrame(json.loads(payload.decode("utf-8")))
    candles = candles.drop(['s', 'o', 'h', 'l', 'v'], axis=1)
    candles["t"] = candles["t"].apply(timestamptodate) + pd.Timedelta(hours=5.5)
    return candles


def main():
    """Main function of the script.

    Writes ``data.csv`` into the ``--output_data`` folder and uploads that
    folder to ``dataset/mrinmoy`` on the workspace blob datastore.

    If a previous ``data.csv`` can be loaded from the datastore, only newer
    rows are fetched and appended to it. If anything in that incremental path
    fails, the reason is printed and a full twelve-month window is fetched
    instead.

    NOTE(review): the workspace identifiers, the HTTPS host and the request
    URL are blank placeholders in this file and must be filled in.
    """

    # input and output arguments
    # (named arg_parser so it does not shadow the imported dateutil `parser`)
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("--output_data", type=str, help="path to output data data")

    args = arg_parser.parse_args()
    print('output path......', args.output_data)
    ws = Workspace.get(name="",
               subscription_id='',
               resource_group='')

    datastore = Datastore.get(ws, 'workspaceblobstore')

    try:
        dataset = Dataset.Tabular.from_delimited_files(path=(datastore, 'dataset/mrinmoy/data.csv'))
        print("++++DATA IS ALREADY PRESENT ++++")
        result = dataset.to_pandas_dataframe().sort_values(by="t")
        # Read the latest timestamp by column name rather than by position so
        # the lookup does not depend on whether an extra leading column exists.
        start = result["t"].iloc[-1]
        end = datetime.today() + pd.Timedelta(hours=5.5)
        c = end - start
        minutes = c.total_seconds() // 60
        # NOTE(review): count_back / start_date / end_date are not used below;
        # presumably they belong in the (blanked-out) request URL — confirm.
        count_back = int(minutes)
        end_date = datetotimestamp(end)
        start_date = datetotimestamp(start)
        url1 = ''
        actual_df = _fetch_close_prices("", url1)
        actual_df = actual_df.drop_duplicates('t', keep='first')
        # 'Column1' is dropped when present; a file previously written by this
        # script (index=False) has no such column, which must not be an error.
        result = result.drop(columns=['Column1'], errors='ignore')
        df = pd.concat([result, actual_df], ignore_index=False)
        df.to_csv(os.path.join(args.output_data, 'data.csv'), index=False)
        print(df)

    except Exception as exc:
        # Deliberate best-effort fallback, but report why the incremental path
        # was abandoned instead of hiding the cause behind a bare `except:`.
        print("incremental update failed:", repr(exc))
        print("****NO PREVIOUS DATA OF STOCK FOUND, COLLECTING DATA FOR ENTIRE YEAR****")
        date = datetime.today() + pd.Timedelta(hours=5.5)
        print(date)
        prev_yr_date = date + dateutil.relativedelta.relativedelta(months=-12)
        print(prev_yr_date)
        start = datetotimestamp(prev_yr_date)
        end = datetotimestamp(date)
        c = date - prev_yr_date
        minutes = c.total_seconds() // 60
        # NOTE(review): as above, start / end / count_back are presumably
        # inputs of the blanked-out URL — confirm.
        count_back = int(minutes)
        url1 = ''
        actual_df = _fetch_close_prices("", url1)
        actual_df.to_csv(os.path.join(args.output_data, 'data.csv'), index=False)
        print(actual_df)

    # Publish the output folder so the next run can find data.csv.
    Dataset.File.upload_directory(src_dir=args.output_data,
            target=DataPath(datastore, 'dataset/mrinmoy'),
            show_progress=True, overwrite=True)

if __name__ == "__main__":
    main()


In [None]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

# Command component around components/data_gather.py: it declares no inputs and
# one folder output, which is passed to the script as --output_data.
data_gather_component = command(
    name="data_gather",
    display_name="data gather",
    description="fetch data",
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
    # The source folder of the component
    code='./components/',
    command="""python data_gather.py \
            --output_data ${{outputs.output_data}}\
            """,
    outputs={
        "output_data": Output(type="uri_folder", mode="rw_mount", path='./data/'),
    },
)

In [None]:
# Register the component in the workspace and rebind the name to the object
# returned by the workspace, which the pipeline cell calls later.
# NOTE(review): after this cell the name no longer refers to the `command(...)`
# object, so running the cell a second time presumably fails on `.component`
# unless the defining cell is re-run first — confirm.
data_gather_component = ml_client.create_or_update(data_gather_component.component)

print(
    f"Component {data_gather_component.name} with Version {data_gather_component.version} is registered"
)

In [None]:
%%writefile components/data_prep.py

import numpy as np
import pandas as pd
import os
import argparse
import logging
import mlflow
from datetime import datetime, timedelta
from dateutil import parser
import dateutil.relativedelta
import pytz
from azureml.core import Workspace, Datastore, Dataset
from azureml.data.datapath import DataPath

def data_process(start, end, df):
    """Fill per-minute gaps in ``df`` by forward-filling, then drop the last row.

    Parameters
    ----------
    start, end
        Bounds of the one-minute grid (passed to ``pd.date_range``).
    df : pandas.DataFrame
        Must contain a datetime column ``t``. It is not modified.

    Returns
    -------
    pandas.DataFrame
        The rows of ``df`` plus one row for every grid minute absent from
        ``df['t']``, sorted by ``t``, de-duplicated on ``t`` (the earliest
        occurrence in ``df`` wins) and forward-filled. The final row is
        removed; an empty result is returned as-is instead of raising.
    """
    every_minute = pd.DataFrame({'t': pd.date_range(start, end, freq='min')})
    # Grid minutes with no observation; they carry only `t`, so concat fills
    # every other column with NaN for them.
    missing_minutes = every_minute.loc[~every_minute['t'].isin(df['t'])]
    combined = pd.concat([df, missing_minutes], ignore_index=True)
    # A stable sort keeps duplicated timestamps in their original order, which
    # makes keep='first' deterministic.
    combined = combined.sort_values(by='t', kind='mergesort', ignore_index=True)
    combined = combined.drop_duplicates('t', keep='first').ffill()
    return combined.iloc[:-1]



def main():
    """Main function of the script.

    Reads ``data.csv`` from ``--data``, fills per-minute gaps with
    ``data_process`` and writes three files:

    * ``train.csv`` / ``test.csv`` — the gap-filled series split so that the
      last ``--n_test_points`` rows form the test set (0 gives an empty test
      set and the full series as training data);
    * ``refined_data.csv`` — the series gap-filled up to 09:15 of the current
      (5.5-hour shifted) day.

    Each output folder is then uploaded to the workspace blob datastore. The
    MLflow run is closed even when a step fails.
    """

    # input and output arguments
    # (named arg_parser so it does not shadow the imported dateutil `parser`)
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("--data", type=str, help="path to input data")
    arg_parser.add_argument("--n_test_points", type=int, required=False, default=300)
    arg_parser.add_argument("--train_data", type=str, help="path to train data")
    arg_parser.add_argument("--test_data", type=str, help="path to test data")
    arg_parser.add_argument("--refined_data", type=str, help="path to refined data")

    args = arg_parser.parse_args()

    # Start Logging; the context manager also stops logging if a step raises.
    with mlflow.start_run():
        df = pd.read_csv(os.path.join(args.data, 'data.csv'))
        df = df.sort_values(by='t')

        df['t'] = pd.to_datetime(df['t'])
        # NOTE(review): data_gather.py already adds 5.5 hours to `t` before
        # writing data.csv — confirm this second shift is intended.
        df['t'] = df['t'] + pd.Timedelta(hours=5.5)

        # Take the bounds from the `t` column by name instead of relying on it
        # being the first column of the file.
        first_ts = df['t'].iloc[0]
        last_ts = df['t'].iloc[-1]
        new_final_df = data_process(first_ts, last_ts, df)
        print()
        print(new_final_df)
        print()

        # Split on an explicit position: `iloc[:-n]` is empty for n == 0, which
        # would silently send the whole series to the test set.
        split_at = max(len(new_final_df) - args.n_test_points, 0)
        train_df = new_final_df.iloc[:split_at]
        test_df = new_final_df.iloc[split_at:]

        mlflow.log_metric("num_train_samples", train_df.shape[0])
        mlflow.log_metric("num_test_samples", test_df.shape[0])

        print(train_df.shape)
        print(test_df.shape)

        train_path = os.path.join(args.train_data, 'train.csv')
        test_path = os.path.join(args.test_data, 'test.csv')

        print(train_path)
        print(test_path)

        train_df.to_csv(train_path, index=False)
        test_df.to_csv(test_path, index=False)

        # Refined series: gap-fill up to 09:15 of the current shifted day.
        now = datetime.today() + pd.Timedelta(hours=5.5)
        end = pd.to_datetime(now.strftime("%Y-%m-%d") + " 09:15:00")
        print(end)
        new_df = data_process(first_ts, end, df)
        refined_data_path = os.path.join(args.refined_data, 'refined_data.csv')
        print(new_df.tail())
        new_df.to_csv(refined_data_path, index=False)

        # NOTE(review): the workspace identifiers are blank placeholders.
        ws = Workspace.get(name="",
        subscription_id='',
        resource_group='')

        datastore = Datastore.get(ws, 'workspaceblobstore')

        # Upload every output folder to its own location on the datastore.
        uploads = (
            (args.train_data, 'dataset/mrinmoy/train'),
            (args.test_data, 'dataset/mrinmoy/test'),
            (args.refined_data, 'dataset/mrinmoy/refined_data'),
        )
        for local_dir, remote_dir in uploads:
            Dataset.File.upload_directory(src_dir=local_dir,
                    target=DataPath(datastore, remote_dir),
                    show_progress=True, overwrite=True)


if __name__ == "__main__":
    main()

In [None]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

# Command component around components/data_prep.py: one folder input plus the
# size of the test window, and three folder outputs (train / test / refined).
# NOTE(review): all three outputs declare the same path './data_hwes/' —
# confirm they are meant to share one location.
data_prep_component = command(
    name="data_prep_hwes",
    display_name="Data preparation for training",
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
    code='./components/',
    command="""python data_prep.py \
            --data ${{inputs.data}} --n_test_points ${{inputs.n_test_points}} \
            --train_data ${{outputs.train_data}} --test_data ${{outputs.test_data}} --refined_data ${{outputs.refined_data}}\
            """,
    inputs=dict(
        data=Input(type='uri_folder', mode='rw_mount'),
        n_test_points=Input(type="integer"),
    ),
    outputs={
        "train_data": Output(type="uri_folder", mode="rw_mount", path='./data_hwes/'),
        "test_data": Output(type="uri_folder", mode="rw_mount", path='./data_hwes/'),
        "refined_data": Output(type="uri_folder", mode="rw_mount", path='./data_hwes/'),
    },
)

In [None]:
# Register the component in the workspace and rebind the name to the object
# returned by the workspace, which the pipeline cell calls later.
# NOTE(review): after this cell the name no longer refers to the `command(...)`
# object, so running the cell a second time presumably fails on `.component`
# unless the defining cell is re-run first — confirm.
data_prep_component = ml_client.create_or_update(data_prep_component.component)

print(
    f"Component {data_prep_component.name} with Version {data_prep_component.version} is registered"
)

In [None]:
import os
# NOTE(review): the next cell writes the script to components/train.py and the
# train component runs `python train.py` from ./components/, so this
# sub-folder looks unused — confirm before removing.
os.makedirs("./components/train", exist_ok=True)

In [None]:
%%writefile components/train.py
import argparse
from sklearn.metrics import mean_squared_error as mse
from sklearn.model_selection import ParameterGrid
import os
from datetime import datetime, timezone
import pandas as pd
import numpy as np
import mlflow
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.holtwinters import SimpleExpSmoothing   
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from sklearn.model_selection import GridSearchCV
import mango
from mango import Tuner

# Module-level setup: executed as soon as the script is run or imported.

# Make sure a local ./results folder exists.
os.makedirs("./results", exist_ok=True)

# Start Logging — the run opened here is closed by mlflow.end_run() at the end of main().
mlflow.start_run()

def main():
    """Tune, evaluate and save a Holt-Winters exponential smoothing model.

    Steps:

    1. Load ``train.csv`` / ``test.csv`` from ``--train_data`` / ``--test_data``
       and index them by the ``t`` column.
    2. Search ``smoothing_level`` / ``smoothing_trend`` / ``smoothing_seasonal``
       with mango's ``Tuner``; the objective is the RMSE of a forecast over the
       whole test window.
    3. Log the best parameters, then forecast the test window one step at a
       time (each forecast is appended to the history before refitting) and
       log the resulting RMSE.
    4. Refit on train + test and save the model to ``--model`` with
       ``mlflow.statsmodels.save_model``.
    """

    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--train_data", type=str, help="path to train data")
    parser.add_argument("--test_data", type=str, help="path to test data")
    parser.add_argument("--registered_model_name", type=str, help="model name")
    parser.add_argument("--model", type=str, help="path to model")

    args = parser.parse_args()
    # NOTE(review): --registered_model_name is parsed but never used; the model
    # is only saved to --model, not registered — confirm whether registration
    # was intended.

    def load_series(folder, file_name):
        """Read a CSV and return it indexed by its timezone-naive ``t`` column."""
        frame = pd.read_csv(os.path.join(folder, file_name))
        frame['t'] = pd.to_datetime(frame['t'])
        frame['t'] = frame['t'].dt.tz_localize(None)
        return frame.set_index("t")

    def fit_hwes(series, **smoothing):
        """Fit the model configuration used throughout this script with fixed smoothing values."""
        return ExponentialSmoothing(series, trend='MUL', seasonal='MUL', freq='T',
                                    seasonal_periods=1440).fit(optimized=False, **smoothing)

    train_df = load_series(args.train_data, 'train.csv')
    test_df = load_series(args.test_data, 'test.csv')

    print("train info")
    print(train_df.info())

    print("test info")
    print(test_df.info())

    def objective_function(args_list):
        """Return one RMSE per candidate parameter set; failed fits score 1000.0."""
        errors = []
        for params in args_list:
            try:
                model = fit_hwes(train_df['c'], **params)
                forecast = model.forecast(len(test_df)).values
                errors.append(mse(test_df['c'], forecast, squared=False))
            except Exception as exc:
                # Keep the penalty so tuning continues, but report the cause:
                # a bare `except:` hid systematic failures and also trapped
                # KeyboardInterrupt / SystemExit.
                print(f"fit failed for {params}: {exc!r}")
                errors.append(1000.0)
        return errors

    param_grid = dict(
        smoothing_level=np.linspace(0.01, 0.9, 10),
        smoothing_trend=np.linspace(0.01, 0.9, 10),
        smoothing_seasonal=np.linspace(0.01, 0.9, 10),
    )

    conf_Dict = dict()
    conf_Dict['initial_random'] = 100
    conf_Dict['num_iteration'] = 50

    tuner = Tuner(param_grid, objective_function, conf_Dict)
    results = tuner.minimize()
    print("HyperParameter Tuning completed!")

    # Best smoothing values, in the order they are printed and logged.
    best = {
        name: results['best_params'][name]
        for name in ('smoothing_trend', 'smoothing_seasonal', 'smoothing_level')
    }
    for name, value in best.items():
        print(name, value)
    for name, value in best.items():
        mlflow.log_param(name, value)

    # Step-by-step evaluation: forecast one point, append it to the history
    # under the matching test timestamp, refit, repeat.
    df_train_temp = train_df.copy()
    for timestamp in test_df.index:
        forecast = fit_hwes(df_train_temp['c'], **best).forecast(1).values[0]
        df_train_temp.reset_index(inplace=True)
        df_train_temp.loc[len(df_train_temp)] = timestamp, forecast
        df_train_temp.set_index("t", inplace=True)
    df_predictions = df_train_temp.iloc[-len(test_df):]
    rmse = mse(test_df['c'], df_predictions, squared=False)

    mlflow.log_metric('RMSE', rmse)

    # Final model: same parameters, fitted on train and test together.
    train = pd.concat([train_df, test_df], ignore_index=False)
    print(train.info())
    print()
    print(train)
    fitted_model = fit_hwes(train['c'], **best)

    print("saving model with mlflow")
    mlflow.statsmodels.save_model(
        statsmodels_model=fitted_model,
        path=args.model
    )

    mlflow.end_run()
   
if __name__ == "__main__":
    main()

In [None]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

# Command component around components/train.py: it consumes the train and test
# folders plus a model name, and exposes one folder output that is passed to
# the script as --model.
train_component = command(
    name="train_hwes",
    display_name="Train HWES",
    description="Finds Hyper patameter for HWES",
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
    code="./components/",
    command="""python train.py \
            --train_data ${{inputs.train_data}} --test_data ${{inputs.test_data}} --registered_model_name ${{inputs.registered_model_name}} \
            --model ${{outputs.model}} \
            """,
    inputs=dict(
        train_data=Input(type="uri_folder"),
        test_data=Input(type="uri_folder"),
        registered_model_name=Input(type="string"),
    ),
    outputs={
        "model": Output(type='uri_folder', mode='rw_mount', path='./results/'),
    },
)

In [None]:
# Register the component in the workspace and rebind the name to the object
# returned by the workspace, which the pipeline cell calls later.
# NOTE(review): after this cell the name no longer refers to the `command(...)`
# object, so running the cell a second time presumably fails on `.component`
# unless the defining cell is re-run first — confirm.
train_component = ml_client.create_or_update(train_component.component)

print(
    f"Component {train_component.name} with Version {train_component.version} is registered"
)

In [None]:
from azure.ai.ml import dsl, Input, Output


# Pipeline definition: data_gather -> data_prep -> train.
# NOTE(review): `compute` is a blank placeholder and must name a compute target.
# NOTE(review): pipeline_job_train_data, pipeline_job_test_data and
# pipeline_job_refined_data are declared without defaults but are never read in
# the body, and the cell that instantiates the pipeline does not pass them —
# confirm whether they should be removed or made optional.
@dsl.pipeline(
    compute='',
    description="Training pipeline for HWES",
)
def pipeline_hwes(
    pipeline_job_n_test_points,
    pipeline_job_train_data,
    pipeline_job_test_data,
    pipeline_job_refined_data,
    pipeline_job_registered_model_name,


):

    # Step 1: fetch the raw data (the component takes no inputs).
    data_gather_job = data_gather_component()
    
    # Step 2: prepare the gathered data, holding out the last n_test_points rows.
    data_prep_job = data_prep_component(
        data = data_gather_job.outputs.output_data,
        n_test_points=pipeline_job_n_test_points
    )

    # Step 3: tune and train on the prepared train/test folders.
    train_job = train_component(
        train_data=data_prep_job.outputs.train_data,  
        test_data=data_prep_job.outputs.test_data,  
        registered_model_name = pipeline_job_registered_model_name,
        )

    # a pipeline returns a dictionary of outputs
    # keys will code for the pipeline output identifier
    return {
        "pipeline_job_train_data": data_prep_job.outputs.train_data,
        "pipeline_job_test_data": data_prep_job.outputs.test_data,
        "pipeline_job_refined_data": data_prep_job.outputs.refined_data,
        "pipeline_job_model":train_job.outputs.model
    }

In [None]:
# Instantiate the pipeline with a 5-row test window and the model name that is
# forwarded to the train step.
# NOTE(review): pipeline_hwes also declares pipeline_job_train_data,
# pipeline_job_test_data and pipeline_job_refined_data without defaults, and
# none of them is supplied here — confirm this call is accepted as written.
pipeline = pipeline_hwes(
    pipeline_job_n_test_points=5,
    pipeline_job_registered_model_name='holts-winter-model',
)

In [None]:
# Submit the pipeline under the "pipeline-hwes" experiment, then call
# jobs.stream on the submitted job's name.
pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    experiment_name="pipeline-hwes",
)
ml_client.jobs.stream(pipeline_job.name)