# Training Deployment

## Getting the data and connecting to Azure

In [68]:
# Handle to the workspace
from azure.ai.ml import MLClient

# Authentication package
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential

# Prefer the default credential chain, but probe it with a real token request
# so an unusable chain is detected here instead of on the first workspace call.
try:
    _default_credential = DefaultAzureCredential()
    _default_credential.get_token("https://management.azure.com/.default")
except Exception:
    # The default chain could not produce a token: use an interactive browser login.
    credential = InteractiveBrowserCredential()
else:
    credential = _default_credential

In [69]:
# Coordinates of the Azure ML workspace this notebook works against.
_workspace_coordinates = {
    "subscription_id": "32787e59-b8b4-4923-89db-73cff7a82fbf",
    "resource_group_name": "capstone-project",
    "workspace_name": "capstone-project-workspace",
}

# Client handle used by the later cells to register assets and submit jobs.
ml_client = MLClient(credential=credential, **_workspace_coordinates)

## Getting the data asset of the cleaned data for training

In [70]:
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes

# Datastore URI of the cleaned CSV used for training.
my_path = "azureml://subscriptions/32787e59-b8b4-4923-89db-73cff7a82fbf/resourcegroups/capstone-project/workspaces/capstone-project-workspace/datastores/workspaceblobstore/paths/UI/2024-03-05_102332_UTC/cleaned_data.csv"

# Describe that file as a versioned URI_FILE data asset.
my_data = Data(
    name="dataset1",
    version="1",
    type=AssetTypes.URI_FILE,
    path=my_path,
    description="this is my first data asset",
)

In [71]:
# Send the data asset definition to the workspace and rebind my_data to the
# object returned by create_or_update; later cells read my_data.path from it.
my_data = ml_client.data.create_or_update(my_data)
print(
    f"Dataset with name {my_data.name} was registered to workspace, the dataset version is {my_data.version}"
)

Dataset with name dataset1 was registered to workspace, the dataset version is 1


## Create a job environment for pipeline steps

In [72]:
import os

# Local folder that receives the conda specification written by the next cell.
dependencies_dir = "./dependencies"
os.makedirs(dependencies_dir, exist_ok=True)

We added the nltk, gensim and imblearn libraries

In [73]:
%%writefile {dependencies_dir}/conda.yaml
# Conda specification for the custom job environment registered below.
name: model-env
channels:
  - conda-forge
dependencies:
  - python=3.8
  - numpy=1.21.2
  - pip=21.2.4
  - scikit-learn=0.24.2
  - scipy=1.7.1
  - pandas>=1.1,<1.2
  - nltk=3.8.1
  - pip:
    - inference-schema[numpy-support]==1.3.0
    - xlrd==2.0.1
    - azureml-mlflow==1.42.0
    - gensim==4.3.2
    # The PyPI distribution is "imbalanced-learn" ("imblearn" is only the import
    # name and has no 0.12.0 release). The 0.8.x series is used because later
    # releases require a newer scikit-learn than the 0.24.2 pinned above.
    - imbalanced-learn==0.8.1

Overwriting ./dependencies/conda.yaml


The specification lists the packages used by the pipeline steps (for example numpy, pandas, scikit-learn and nltk), plus a few extras installed through pip.


Use the *yaml* file to create and register this custom environment in your workspace:

In [97]:
from azure.ai.ml.entities import Environment

# Name under which the custom environment is registered in the workspace.
custom_env_name = "aml-scikit-learn"

# Environment definition: the conda file written above layered on a base Docker image.
pipeline_job_env = Environment(
    name=custom_env_name,
    description="Custom environment for Capstone Project",
    tags={"scikit-learn": "0.24.2"},
    conda_file=os.path.join(dependencies_dir, "conda.yaml"),
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
    version="0.1.2",
)
# Register the environment and rebind the name to the object returned by the workspace;
# the data-prep component later references it as "<name>:<version>".
pipeline_job_env = ml_client.environments.create_or_update(pipeline_job_env)

print(
    f"Environment with name {pipeline_job_env.name} is registered to workspace, the environment version is {pipeline_job_env.version}"
)

Environment with name aml-scikit-learn is registered to workspace, the environment version is 0.1.2


## Build the training pipeline

### Create data prep component

In [98]:
import os

# Source folder of the data-prep component; data_prep.py is written into it below.
data_prep_src_dir = "./components/data_prep"
os.makedirs(data_prep_src_dir, exist_ok=True)

This script turns the review scores into binary labels (dropping neutral reviews), cleans and normalizes the review text, and then splits the result into train and test datasets.

[MLFlow](https://mlflow.org/docs/latest/tracking.html) will be used to log the parameters and metrics during our pipeline run.

In [99]:
%%writefile {data_prep_src_dir}/data_prep.py
import os
import argparse
import pandas as pd
from sklearn.model_selection import train_test_split
import logging
import mlflow


def data_to_binary(cleaned_data):
    """Build a binary-sentiment frame from the review data.

    Only the ``Score`` and ``Text`` columns are kept. A ``Label`` column is
    added (1 when the integer score is >= 4, 0 when it is <= 2); rows whose
    score maps to neither class are dropped and the index is renumbered.

    Args:
        cleaned_data: data accepted by ``pd.DataFrame`` that provides
            ``Score`` and ``Text`` columns.

    Returns:
        pd.DataFrame: columns ``Score``, ``Text`` and ``Label``.
    """

    def label_score(score):
        # Map a score to its class; anything in between gets no label.
        value = int(score)
        if value >= 4:
            return 1
        if value <= 2:
            return 0
        return None

    reviews = pd.DataFrame(cleaned_data, columns=['Score', 'Text'])
    reviews['Label'] = reviews['Score'].apply(label_score)

    # Unlabelled rows (score 3) are removed, then the index is reset.
    return reviews.dropna(subset=['Label']).reset_index(drop=True)



import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
import string
from nltk.stem.snowball import SnowballStemmer
import re


# Fetch the NLTK resources used below: the stop-word list (remove_stopwords),
# the punkt tokenizer models (word_tokenize) and WordNet (WordNetLemmatizer).
# These calls run every time the script is executed.
nltk.download('stopwords')
nltk.download('punkt')
nltk.download('wordnet')


def stem_text(tokens):
    """Stem every token with the English Snowball stemmer.

    If stemming raises ``TypeError``, the offending ``tokens`` value is
    printed and returned unchanged.
    """
    snowball = SnowballStemmer('english')
    try:
        return [snowball.stem(token) for token in tokens]
    except TypeError:
        print(tokens)
        return tokens

def lemmatize_text(tokens):
    """Return the WordNet lemma of every token, as a list."""
    wordnet = WordNetLemmatizer()
    return list(map(wordnet.lemmatize, tokens))


def remove_punctuation(tokens):
    """Drop tokens that consist solely of punctuation characters.

    The previous check, ``token not in string.punctuation``, was a substring
    test against one long string, so multi-character punctuation tokens such
    as "..." or "''" were kept. Each token is now checked character by
    character. Empty tokens are still dropped, as before.

    Args:
        tokens: iterable of string tokens.

    Returns:
        list: the tokens that contain at least one non-punctuation character.
    """
    punctuation_chars = set(string.punctuation)
    return [
        token for token in tokens
        if not all(char in punctuation_chars for char in token)
    ]


def remove_stopwords(tokens):
    """Remove English stop words while keeping negations.

    Negation words ("not", "don't", "wasn't", ...) are taken out of the NLTK
    stop-word list before filtering, so they remain in the text. Empty tokens
    are dropped as well.

    Args:
        tokens: iterable of string tokens.

    Returns:
        list: the remaining tokens. A list is returned (instead of the one-shot
        ``filter`` iterator used previously) so the result can be iterated more
        than once and matches the other token-level steps.
    """
    default_stopwords = set(stopwords.words('english'))
    excluding = set(['against','not','don', "don't",'ain', 'aren', "aren't", 'couldn', "couldn't", 'didn', "didn't",
             'doesn', "doesn't", 'hadn', "hadn't", 'hasn', "hasn't", 'haven', "haven't", 'isn', "isn't", 
             'mightn', "mightn't", 'mustn', "mustn't", 'needn', "needn't",'shouldn', "shouldn't", 'wasn',
             "wasn't", 'weren', "weren't", 'won', "won't", 'wouldn', "wouldn't"])

    custom_stopwords = default_stopwords - excluding

    return [token for token in tokens if token and token not in custom_stopwords]


def correct_spelling_words_more_same_three_letters(tokens):
    """Drop tokens containing the same lowercase ASCII letter three times in a row."""
    tripled_letters = [letter * 3 for letter in string.ascii_lowercase]
    kept = []
    for token in tokens:
        if any(run in token for run in tripled_letters):
            continue
        kept.append(token)
    return kept


from abc import ABC


class SentenceDfCleaner(ABC):
    """Base class for cleaners that delete regex matches from a pandas string Series.

    The subclasses in this file assign ``self.pattern`` in their own
    ``__init__``; the base ``__init__`` only annotates the attribute.
    """

    def __init__(self):
        # Annotation only: no value is bound to the attribute here.
        self.pattern: str

    def clean(self, df):
        """Return ``df`` with every regex match of ``self.pattern`` replaced by ''."""
        cleaned = df.str.replace(pat=self.pattern, repl='', regex=True)
        return cleaned


class RemoveNumbers(SentenceDfCleaner):
    """Deletes every whitespace-delimited chunk that contains at least one digit."""

    def __init__(self):
        # Raw string: "\S" and "\d" are invalid escape sequences in a plain
        # literal and trigger a warning on recent Python versions. The compiled
        # pattern is unchanged.
        self.pattern = re.compile(r"\S*\d\S*")


class RemoveHtml(SentenceDfCleaner):
    """Deletes anything between '<' and the nearest following '>' (non-greedy)."""

    def __init__(self):
        tag_regex = r'<.*?>'
        self.pattern = re.compile(tag_regex)


class RemoveUrl(SentenceDfCleaner):
    """Deletes chunks that start with 'http' or 'www.' up to the next whitespace."""

    def __init__(self):
        # The dot after "www" is escaped so that only a literal "www." starts a
        # match; unescaped it matched any character (e.g. "wwwx..."). A raw
        # string also avoids the invalid "\S" escape in a plain literal.
        self.pattern = re.compile(r'http\S+|www\.\S+')


class RemovePunctuations(SentenceDfCleaner):
    """Deletes every character that is neither a word character nor whitespace."""

    def __init__(self):
        # Raw string: "\w" and "\s" are invalid escape sequences in a plain
        # literal. The compiled pattern is unchanged.
        self.pattern = re.compile(r'[^\w\s]')


class RemovePatterns(SentenceDfCleaner):
    """
    Deletes whole words that contain a character repeated three or more times
    in a row, together with the whitespace in front of them.

    Examples removed: 'zzzzzzzzzzzzzzzzzzzzzzz', 'testtting', 'grrrrrrreeeettttt'.
    Examples kept: 'looks', 'goods', 'soon'.

    Pattern taken from
    https://stackoverflow.com/questions/37012948/regex-to-match-an-entire-word-that-contains-repeated-character
    """
    def __init__(self):
        repeated_char_word = r'\s*\b(?=\w*(\w)\1{2,})\w*\b'
        self.pattern = re.compile(repeated_char_word)


class RemoveAbbreviations(SentenceDfCleaner):
    """Expands English contractions and collapses whitespace runs to one space.

    ``abbr_dict`` maps each contraction to its expansion. Its last entry,
    ``'\\s+'``, is a regular expression rather than a literal. It used to be
    passed through ``re.escape`` with the other keys, so it only matched the
    literal text ``\\s+`` and whitespace was never collapsed. It is now
    compiled as a real regex alternative, and whitespace matches are replaced
    with the value stored under that key.
    """

    def __init__(self):
        self.abbr_dict = {
            "what's":"what is",
            "what're":"what are",
            "who's":"who is",
            "who're":"who are",
            "where's":"where is",
            "where're":"where are",
            "when's":"when is",
            "when're":"when are",
            "how's":"how is",
            "how're":"how are",

            "i'm":"i am",
            "we're":"we are",
            "you're":"you are",
            "they're":"they are",
            "it's":"it is",
            "he's":"he is",
            "she's":"she is",
            "that's":"that is",
            "there's":"there is",
            "there're":"there are",

            "i've":"i have",
            "we've":"we have",
            "you've":"you have",
            "they've":"they have",
            "who've":"who have",
            "would've":"would have",
            "not've":"not have",

            "i'll":"i will",
            "we'll":"we will",
            "you'll":"you will",
            "he'll":"he will",
            "she'll":"she will",
            "it'll":"it will",
            "they'll":"they will",

            "isn't":"is not",
            "wasn't":"was not",
            "aren't":"are not",
            "weren't":"were not",
            "can't":"can not",
            "couldn't":"could not",
            "don't":"do not",
            "didn't":"did not",
            "shouldn't":"should not",
            "wouldn't":"would not",
            "doesn't":"does not",
            "haven't":"have not",
            "hasn't":"has not",
            "hadn't":"had not",
            "won't":"will not",
            '\\s+':' '
        }
        # Key of the single regex entry; all other keys are literal contractions.
        self._whitespace_key = '\\s+'
        literal_keys = [key for key in self.abbr_dict if key != self._whitespace_key]
        self.pattern = re.compile(
            "|".join(map(re.escape, literal_keys)) + "|" + self._whitespace_key
        )

    def clean(self, df):
        """Return ``df`` with contractions expanded and whitespace runs collapsed."""
        whitespace_replacement = self.abbr_dict[self._whitespace_key]
        # A match that is not a contraction key can only be a whitespace run.
        return df.str.replace(
            self.pattern,
            lambda match: self.abbr_dict.get(match.group(0), whitespace_replacement),
            regex=True,
        )


def ind_preprocess_text(text, processing_steps, tokenized=False):
    """Tokenize one text (if needed) and run the token-level steps over it.

    Lower-casing and sentence-level cleaning are NOT done here; this function
    only tokenizes and applies ``processing_steps`` in order.

    Args:
        text: a string to tokenize with ``word_tokenize``, or an already
            tokenized list that is used as is.
        processing_steps: callables, each taking the tokens and returning the
            processed tokens.
        tokenized: when True, return the tokens; otherwise return them joined
            back into a single string.

    Returns:
        The processed tokens, or the detokenized string.
    """
    tokens = text if isinstance(text, list) else word_tokenize(text)

    for processing_step in processing_steps:
        tokens = processing_step(tokens)

    if tokenized:
        return tokens

    # The module never imported the detokenizer, so this line used to raise
    # NameError. Importing it here keeps the fix local to the function.
    from nltk.tokenize.treebank import TreebankWordDetokenizer

    # Join tokens back into a single string
    return TreebankWordDetokenizer().detokenize(tokens)


def preprocess_text(text_df, processing_steps, tokenized):
    """Clean a Series of texts at sentence level, then at token level.

    Args:
        text_df: pandas Series of strings.
        processing_steps: dict with a ``'sentence'`` list of cleaners (objects
            exposing ``clean(series)``) and a ``'tokens'`` list of callables
            forwarded to ``ind_preprocess_text``.
        tokenized: forwarded to ``ind_preprocess_text``.

    Returns:
        pandas Series holding the processed value for each input text.
    """
    cleaned = text_df.str.lower()

    for cleaner in processing_steps['sentence']:
        cleaned = cleaner.clean(cleaned)

    token_steps = processing_steps['tokens']
    return cleaned.apply(
        lambda text: ind_preprocess_text(
            text, processing_steps=token_steps, tokenized=tokenized
        )
    )


def main():
    """Prepare the review data and write the train/test splits.

    Reads the input CSV, builds binary labels, cleans the review text, splits
    the result, and writes four ``data.csv`` files (train/test texts and
    labels) into the output folders given on the command line. Sample and
    feature counts are logged to MLflow.
    """

    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", type=str, help="path to input data")
    parser.add_argument("--test_train_ratio", type=float, required=False, default=0.25)
    parser.add_argument("--train_data", type=str, help="path to train data")
    parser.add_argument("--train_labels", type=str, help="path to train labels")
    parser.add_argument("--test_data", type=str, help="path to test data")
    parser.add_argument("--test_labels", type=str, help="path to test labels")
    args = parser.parse_args()

    print(args.train_data, args.train_labels, args.test_data, args.test_labels)

    # The context manager closes the MLflow run even when a step below raises.
    with mlflow.start_run():

        print(" ".join(f"{k}={v}" for k, v in vars(args).items()))

        print("input data:", args.data)

        # NOTE(review): only the first 100 rows are used. This looks like a
        # debugging leftover; confirm before running a real training job.
        df = pd.read_csv(args.data).iloc[:100]
        print(df.head())
        print(df.columns)

        mlflow.log_metric("num_samples", df.shape[0])
        mlflow.log_metric("num_features", df.shape[1] - 1)

        df_binary = data_to_binary(df)
        # Log the size of the binary frame (the raw frame was logged here before).
        mlflow.log_metric("binary_num_samples", df_binary.shape[0])
        mlflow.log_metric("binary_num_features", df_binary.shape[1] - 1)

        # Contractions are expanded BEFORE punctuation is stripped; otherwise
        # the apostrophes are already gone and no contraction can match.
        processing_steps = {'sentence': [RemoveNumbers(), RemoveHtml(), RemoveUrl(), RemoveAbbreviations(),
                                        RemovePunctuations(), RemovePatterns()],
                            'tokens': [remove_stopwords, stem_text, lemmatize_text]}

        # The text column of the binary frame is the model input
        # (the undefined name `X` was used here and raised NameError).
        X_processed = preprocess_text(df_binary.Text, processing_steps, tokenized=False)

        X_train, X_test, y_train, y_test = train_test_split(X_processed, df_binary.Label,
                                                            test_size=args.test_train_ratio,
                                                            random_state=99)

        # output paths are mounted as folder, therefore, we are adding a filename to the path
        X_train.to_csv(os.path.join(args.train_data, "data.csv"), index=False)
        y_train.to_csv(os.path.join(args.train_labels, "data.csv"), index=False)

        X_test.to_csv(os.path.join(args.test_data, "data.csv"), index=False)
        y_test.to_csv(os.path.join(args.test_labels, "data.csv"), index=False)


if __name__ == "__main__":
    main()

Overwriting ./components/data_prep/data_prep.py


In [100]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

# Command component that runs data_prep.py inside the custom environment.
data_prep_component = command(
    name="data_prep_credit_defaults",
    display_name="Data preparation for training",
    description="reads a .csv input, split the input to train and test",
    inputs={
        # data_prep.py passes this path straight to pd.read_csv and the pipeline
        # binds a uri_file input to it, so it is declared as a single file.
        "data": Input(type="uri_file"),
        "test_train_ratio": Input(type="number"),
    },
    # Each output is a folder; the script writes a data.csv file into it.
    outputs=dict(
        train_data=Output(type="uri_folder", mode="rw_mount"),
        test_data=Output(type="uri_folder", mode="rw_mount"),
        train_labels=Output(type="uri_folder", mode="rw_mount"),
        test_labels=Output(type="uri_folder", mode="rw_mount"),
    ),
    # The source folder of the component
    code=data_prep_src_dir,
    command="""python data_prep.py \
            --data ${{inputs.data}} --test_train_ratio ${{inputs.test_train_ratio}} \
            --train_data ${{outputs.train_data}} --test_data ${{outputs.test_data}} \
            --train_labels ${{outputs.train_labels}} --test_labels ${{outputs.test_labels}} \
            """,
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
)

In [101]:
# Register the data-prep component in the workspace and rebind the name to the
# registered component, which the pipeline definition calls later.
data_prep_component = ml_client.create_or_update(data_prep_component.component)

# Report the name and version that were registered
print(
    f"Component {data_prep_component.name} with Version {data_prep_component.version} is registered"
)

Component data_prep_credit_defaults with Version 2024-03-05-15-11-54-5196075 is registered


## Create training component

In [102]:
import os

# Source folder of the training component; train.py and train.yml are written into it below.
train_src_dir = "./components/train"
os.makedirs(train_src_dir, exist_ok=True)

Create the training script in the directory:

In [103]:
%%writefile {train_src_dir}/train.py
import argparse
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report
import os
import pandas as pd
import mlflow
from sklearn.feature_extraction.text import TfidfVectorizer


def text_to_vec_transformation(X_train, X_test):
    """Turn raw texts into TF-IDF matrices.

    The vectorizer is fitted on the training texts only and then applied to
    the test texts.

    Returns:
        tuple: (train matrix, test matrix).
    """
    tfidf = TfidfVectorizer()
    train_matrix = tfidf.fit_transform(X_train)
    return train_matrix, tfidf.transform(X_test)


def select_first_file(path):
    """Selects the first file in a folder (by name), or returns ``path`` if it is a file.

    Intended for mounted folders that contain a single file. The listing is
    sorted so that the choice is deterministic when there are several entries.

    Args:
        path (str): path to directory or file to choose
    Returns:
        str: full path of selected file
    Raises:
        FileNotFoundError: if ``path`` is a directory with no entries.
    """
    # A file path is accepted as documented; os.listdir would fail on it.
    if os.path.isfile(path):
        return path

    # os.listdir returns entries in arbitrary order, hence the sort.
    files = sorted(os.listdir(path))
    if not files:
        raise FileNotFoundError(f"No files found in directory: {path}")
    return os.path.join(path, files[0])


# Start Logging
# The run is opened at module level, i.e. as soon as the script is executed;
# main() closes it with mlflow.end_run().
mlflow.start_run()

# enable autologging
mlflow.sklearn.autolog()

# Create a local ./outputs folder (nothing in this script writes to it directly).
os.makedirs("./outputs", exist_ok=True)


def main():
    """Train a logistic-regression sentiment model and register it.

    Reads the train/test texts and labels produced by the data-prep step,
    vectorizes the texts with TF-IDF, fits a ``LogisticRegression``, prints a
    classification report, registers the model through MLflow and saves a copy
    under the ``--model`` output folder.
    """

    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--train_data", type=str, help="path to train data")
    parser.add_argument("--test_data", type=str, help="path to test data")
    parser.add_argument("--train_labels", type=str, help="path to train labels")
    parser.add_argument("--test_labels", type=str, help="path to test labels")
    parser.add_argument("--registered_model_name", type=str, help="model name")
    parser.add_argument("--model", type=str, help="path to model file")
    args = parser.parse_args()

    print(args.train_data, args.train_labels, args.test_data, args.test_labels)

    try:
        # paths are mounted as folder, therefore, we are selecting the file from folder
        # Texts that became empty during preprocessing are read back as NaN by
        # read_csv; they are restored to "" so the vectorizer receives strings.
        X_train = pd.read_csv(select_first_file(args.train_data)).iloc[:, 0].fillna("").values
        X_test = pd.read_csv(select_first_file(args.test_data)).iloc[:, 0].fillna("").values

        # Extracting the label column as a 1-D array, the shape scikit-learn
        # expects for targets (a one-column frame triggers a conversion warning).
        y_train = pd.read_csv(select_first_file(args.train_labels)).iloc[:, 0].values
        y_test = pd.read_csv(select_first_file(args.test_labels)).iloc[:, 0].values

        print(f"Training with data of shape {X_train.shape}")

        X_train, X_test = text_to_vec_transformation(X_train, X_test)

        model = LogisticRegression()
        model.fit(X_train, y_train)

        y_pred = model.predict(X_test)

        print(classification_report(y_test, y_pred))

        # NOTE(review): only the classifier is logged; the fitted TF-IDF
        # vectorizer is not stored with it, so a consumer of the registered
        # model has to supply already-vectorized input. Confirm this is intended.

        # Registering the model to the workspace
        # (the undefined name `clf` was passed here and raised NameError)
        print("Registering the model via MLFlow")
        mlflow.sklearn.log_model(
            sk_model=model,
            registered_model_name=args.registered_model_name,
            artifact_path=args.registered_model_name,
        )

        # Saving the model to a file
        mlflow.sklearn.save_model(
            sk_model=model,
            path=os.path.join(args.model, "trained_model"),
        )
    finally:
        # Stop Logging, also when a step above failed
        mlflow.end_run()


if __name__ == "__main__":
    main()

Overwriting ./components/train/train.py


In [104]:
%%writefile {train_src_dir}/train.yml
# <component>
name: train_credit_defaults_model
display_name: Train Credit Defaults Model
# version: 1 # Not specifying a version will automatically update the version
type: command
inputs:
  train_data: 
    type: uri_folder
  test_data: 
    type: uri_folder
  train_labels: 
    type: uri_folder
  test_labels: 
    type: uri_folder  
  registered_model_name:
    type: string
outputs:
  model:
    type: uri_folder
code: .
environment:
  # for this step, we'll use an AzureML curated environment
  azureml:AzureML-sklearn-1.0-ubuntu20.04-py38-cpu:1
command: >-
  python train.py 
  --train_data ${{inputs.train_data}} 
  --test_data ${{inputs.test_data}} 
  --train_labels ${{inputs.train_labels}} 
  --test_labels ${{inputs.test_labels}} 
  --registered_model_name ${{inputs.registered_model_name}} 
  --model ${{outputs.model}}
# </component>


Overwriting ./components/train/train.yml


Once the `yaml` file and the script are ready, you can create your component using `load_component()`. 

In [105]:
# importing the Component Package
from azure.ai.ml import load_component

# Loading the component from the yml file
# (train.yml sets `code: .`, so train.py in the same folder is the component's source)
train_component = load_component(source=os.path.join(train_src_dir, "train.yml"))

Now create and register the component:

In [106]:
# Register the training component in the workspace and rebind the name to the
# registered component, which the pipeline definition calls later.
train_component = ml_client.create_or_update(train_component)

# Report the name and version that were registered
print(
    f"Component {train_component.name} with Version {train_component.version} is registered"
)

[32mUploading train (0.0 MBs):   0%|          | 0/4055 [00:00<?, ?it/s][32mUploading train (0.0 MBs): 100%|██████████| 4055/4055 [00:00<00:00, 61199.76it/s]
[39m



Component train_credit_defaults_model with Version 2024-03-05-15-11-58-1092337 is registered


## Create the pipeline

In [107]:
# the dsl decorator tells the sdk that we are defining an Azure ML pipeline
from azure.ai.ml import dsl, Input, Output


# Two-step pipeline: data preparation followed by training.
# Parameters:
#   pipeline_job_data_input            - input bound to the data-prep step's `data`
#   pipeline_job_test_train_ratio      - forwarded as `test_train_ratio`
#   pipeline_job_registered_model_name - forwarded to the training step
# NOTE(review): local variable names inside a dsl pipeline may be used as step
# names by the SDK, so they are left untouched here - confirm before renaming.
@dsl.pipeline(
    compute="serverless",
    description="E2E data_perp-train pipeline",
)
def project_training_pipeline(
    pipeline_job_data_input,
    pipeline_job_test_train_ratio,
    pipeline_job_registered_model_name,
):
    # using data_prep_function like a python call with its own inputs
    data_prep_job = data_prep_component(
        data=pipeline_job_data_input,
        test_train_ratio=pipeline_job_test_train_ratio,
    )

    # using train_func like a python call with its own inputs
    # (the returned step object is not needed afterwards, hence `_`)
    _ = train_component(
        train_data=data_prep_job.outputs.train_data,  # note: using outputs from previous step
        test_data=data_prep_job.outputs.test_data,  # note: using outputs from previous step
        train_labels=data_prep_job.outputs.train_labels,  # note: using outputs from previous step
        test_labels=data_prep_job.outputs.test_labels,  # note: using outputs from previous step
        registered_model_name=pipeline_job_registered_model_name,
    )

    # a pipeline returns a dictionary of outputs
    # keys will code for the pipeline output identifier
    # (only the four data-prep outputs are exposed; the trained model is not)
    return {
        "pipeline_job_train_data": data_prep_job.outputs.train_data,
        "pipeline_job_test_data": data_prep_job.outputs.test_data,
        "pipeline_job_train_labels": data_prep_job.outputs.train_labels,
        "pipeline_job_test_labels": data_prep_job.outputs.test_labels,
    }

In [108]:
# Name under which train.py registers the model; the deployment cells look it up by this name.
registered_model_name = "project_model"

# Let's instantiate the pipeline with the parameters of our choice
# (the input is the registered data asset's path, passed as a single file)
pipeline = project_training_pipeline(
    pipeline_job_data_input=Input(type="uri_file", path=my_data.path),
    pipeline_job_test_train_ratio=0.25,
    pipeline_job_registered_model_name=registered_model_name,
)

## Submit the job

In [109]:
# submit the pipeline job
# (the returned job object is kept so its name can be used to stream the logs)
pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    # Project's name
    experiment_name="e2e_registered_components",
)

In [110]:
# Stream the execution log of the submitted pipeline job into the cell output.
ml_client.jobs.stream(pipeline_job.name)

RunId: salmon_raisin_qxwjv5pxw1
Web View: https://ml.azure.com/runs/salmon_raisin_qxwjv5pxw1?wsid=/subscriptions/32787e59-b8b4-4923-89db-73cff7a82fbf/resourcegroups/capstone-project/workspaces/capstone-project-workspace

Streaming logs/azureml/executionlogs.txt

[2024-03-05 15:12:04Z] Submitting 1 runs, first five are: ae93ae40:225630be-2cc6-4316-aef0-79bd6baee9bb
[2024-03-05 15:12:06Z] Completing processing run id 225630be-2cc6-4316-aef0-79bd6baee9bb.
[2024-03-05 15:12:07Z] Submitting 1 runs, first five are: d10343f3:2de057ef-bbea-4edd-98f8-f33f05506316
[2024-03-05 15:13:07Z] Completing processing run id 2de057ef-bbea-4edd-98f8-f33f05506316.

Execution Summary
RunId: salmon_raisin_qxwjv5pxw1
Web View: https://ml.azure.com/runs/salmon_raisin_qxwjv5pxw1?wsid=/subscriptions/32787e59-b8b4-4923-89db-73cff7a82fbf/resourcegroups/capstone-project/workspaces/capstone-project-workspace



You can track the progress of your pipeline, by using the link generated in the cell above or in this notebook using the following code:
```python
    ml_client.jobs.stream(pipeline_job.name)
```

When you select a component, you'll see more information about the results of that component.
The most important part to look at in this stage is:
* `Outputs+logs` > `user_logs` > `std_log.txt`
This section shows the script's stdout.

## Deploy the model as an online endpoint

### Create a new online endpoint

In [26]:
import uuid

# Creating a unique name for the endpoint
# (fixed prefix plus the first 8 characters of a random UUID)
online_endpoint_name = "credit-endpoint-" + str(uuid.uuid4())[:8]

In [27]:
from azure.ai.ml.entities import (
    ManagedOnlineEndpoint,
    ManagedOnlineDeployment,
    Model,
    Environment,
)

# create an online endpoint
endpoint = ManagedOnlineEndpoint(
    name=online_endpoint_name,
    description="this is an online endpoint",
    auth_mode="key",
    # The tags describe what this notebook actually trains and deploys: the
    # "dataset1" data asset and the LogisticRegression model fitted in train.py.
    tags={
        "training_dataset": "dataset1",
        "model_type": "sklearn.LogisticRegression",
    },
)

# begin_create_or_update returns a poller; .result() waits for provisioning to finish.
endpoint_result = ml_client.begin_create_or_update(endpoint).result()

print(
    f"Endpint {endpoint_result.name} provisioning state: {endpoint_result.provisioning_state}"
)

Endpint credit-endpoint-fb7ba6dd provisioning state: Succeeded


In [28]:
# Fetch the endpoint back from the workspace by name and report its provisioning state.
endpoint = ml_client.online_endpoints.get(name=online_endpoint_name)

print(
    f'Endpint "{endpoint.name}" with provisioning state "{endpoint.provisioning_state}" is retrieved'
)

Endpint "credit-endpoint-fb7ba6dd" with provisioning state "Succeeded" is retrieved


### Deploy the model to the endpoint

In [32]:
# Let's pick the latest version of the model
registered_versions = [
    int(m.version) for m in ml_client.models.list(name=registered_model_name)
]
if not registered_versions:
    # max() on an empty list raises an opaque ValueError; say what is missing instead.
    raise ValueError(
        f"No registered versions of model '{registered_model_name}' were found; "
        "run the training pipeline first."
    )
latest_model_version = max(registered_versions)

In [35]:
# picking the model to deploy. Here we use the latest version of our registered model
model = ml_client.models.get(name=registered_model_name, version=latest_model_version)


# create an online deployment.
# No scoring script or environment is passed here; only the registered model
# and the compute size are specified.
blue_deployment = ManagedOnlineDeployment(
    name="blue",
    endpoint_name=online_endpoint_name,
    model=model,
    instance_type="Standard_E2s_v3",
    instance_count=1,
)

# begin_create_or_update returns a poller; .result() waits for the deployment to finish.
blue_deployment_results = ml_client.online_deployments.begin_create_or_update(
    blue_deployment
).result()

print(
    f"Deployment {blue_deployment_results.name} provisioning state: {blue_deployment_results.provisioning_state}"
)

Check: endpoint credit-endpoint-fb7ba6dd exists


........................................................................Deployment blue provisioning state: Succeeded
