# DISTRIBUTED GPU TRAINING JOB FOR FOOD DATA CLASSIFICATION 

This notebook documents how to speed up model training with distributed GPU training. It runs a multi-node, multi-GPU PyTorch job on AzureML and uses MLflow to log and analyze the training metrics.

**Requirements & Dependencies:**
1. Provisioned AzureML workspace with Azure subscription
2. Appropriate permissions to provision minimal CPU and GPU cluster 
3. Azure ML Python SDK V2


## CONNECTION TO AZUREML CLIENT 🔗

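An `MLClient` instance is created to connect to the AzureML service. `DefaultAzureCredential` authenticates against the workspace and resource group that the job runs in. It tries several credential sources in turn (for example environment variables, a managed identity, or an Azure CLI login), so the same code can authenticate securely in different environments. If none of them yields a token, the code falls back to an interactive browser login.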

In [2]:
# Import the required libraries for this auth step
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential

# Try to obtain a token with the default credential chain
try:
    credential = DefaultAzureCredential()
    credential.get_token("https://management.azure.com/.default")

# If that fails, fall back to an interactive browser login
except Exception as error:
    credential = InteractiveBrowserCredential()
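
The fallback idea in the cell above can be sketched without any Azure dependency. This is only an illustration of "try each credential source in order and use the first one that works"; the provider functions `from_environment` and `from_cli_login` are hypothetical stand-ins, not part of `azure.identity`, and the real library's chain is more involved.

```python
from typing import Callable, Optional, Sequence


def first_working_token(providers: Sequence[Callable[[], str]]) -> Optional[str]:
    """Return the token from the first provider that succeeds, else None."""
    for provider in providers:
        try:
            return provider()
        except Exception:
            # This source is unavailable; fall through to the next one.
            continue
    return None


# Hypothetical providers standing in for real credential sources.
def from_environment() -> str:
    raise RuntimeError("no credentials found in environment variables")


def from_cli_login() -> str:
    return "token-from-cli"


token = first_working_token([from_environment, from_cli_login])
print(token)
```

The order of the list decides which source wins, which is why a non-interactive source is usually tried before an interactive one.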





In [3]:
# Import the ml client library
from azure.ai.ml import MLClient

# Prepare the information needed to access it in the account
ml_client = MLClient(
    subscription_id="<SUBSCRIPTION_ID>",
    resource_group_name="resource1",
    workspace_name="workspace1",
    credential=credential,
)
cpu_cluster = None
gpu_cluster = None




## CREATION OF CLUSTERS 🎯
This project needs two kinds of AzureML compute cluster: a CPU cluster and a GPU cluster.

1. **CPU**: VMs for general-purpose computing such as running applications, serving web applications, and data processing. These workloads do not rely on massively parallel processing.

2. **GPU**: VMs for massively parallel, compute-heavy work such as machine learning, scientific simulations, and video rendering. Azure's NC-series VMs, including the `STANDARD_NC6s_v3` size used here, carry NVIDIA Tesla GPUs for deep learning tasks. A VM is a software emulation of a physical computer that runs its own guest OS independently of other VMs on the same host machine.

In [5]:
# Import the cpu library from Azure
from azure.ai.ml.entities import AmlCompute

cpu_compute_target = "project-cpu-cluster"

# Determine if the compute target already exists and return a message for it
try:
    cpu_cluster = ml_client.compute.get(cpu_compute_target)
    print(f"You already have a cluster of the same name which is {cpu_compute_target}")

# The lookup raises an exception when the cluster does not exist yet, so create it
except Exception:
    print("Creating a new CPU compute target...")

    # Create an Azure ML Compute Object 
    cpu_cluster = AmlCompute(
        # Name of cluster
        name = cpu_compute_target,

        # Describe the VM service 
        type = "amlcompute",

        # VM Family
        size = "STANDARD_DS3_V2",

        # Min nodes 
        min_instances = 0,

        # Max nodes
        max_instances = 5,

        # Idle time in seconds before an unused node is scaled down
        idle_time_before_scale_down = 200,

        # Define the cost tier - LowPriority or Dedicated. 
        tier = "Dedicated",
    )

    # Pass the object to the MLClient and wait for provisioning to finish
    cpu_cluster = ml_client.compute.begin_create_or_update(cpu_cluster).result()

# Report the cluster that is now available (already existing or newly created)
print(f"The compute with the name is {cpu_cluster.name} is made and the size is {cpu_cluster.size}")

Creating a new CPU compute target...
The compute with the name is project-cpu-cluster is made and the size is STANDARD_DS3_V2
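
The `min_instances` and `max_instances` settings bound how far the cluster can scale. The sketch below is a simplified model of that bound, not AzureML's actual scheduler; `target_node_count` is a hypothetical helper introduced only for illustration.

```python
def target_node_count(nodes_requested: int, min_instances: int, max_instances: int) -> int:
    """Clamp the number of requested nodes to the cluster's configured bounds."""
    return max(min_instances, min(nodes_requested, max_instances))


# With min_instances=0 the cluster can shrink to zero nodes when nothing is queued,
# and max_instances=5 caps it no matter how many nodes the jobs ask for.
for requested in (0, 2, 8):
    print(requested, "->", target_node_count(requested, min_instances=0, max_instances=5))
```

Keeping `min_instances` at 0 means no nodes are billed while the cluster sits idle, at the price of a start-up delay when the next job arrives.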


In [6]:
# Import the required libraries for AML compute
from azure.ai.ml.entities import AmlCompute

gpu_cluster_target = "project-gpu-cluster"

# check if the gpu cluster exists
try:
    gpu_cluster = ml_client.compute.get(gpu_cluster_target)
    print(f"There is already a GPU cluster named {gpu_cluster_target}")


# compute using gpu compute cluster by making one
except Exception:
    print("Creating a new gpu compute target...")

    gpu_cluster = AmlCompute(
        # Name of cluster
        name = "project-gpu-cluster", 

        # Describe the VM service 
        type = "amlcompute",

        # VM Family
        size = "STANDARD_NC6s_v3",

        # Min number of nodes
        min_instances = 0,

        # Max number of nodes
        max_instances = 5,

        # Idle time in seconds before an unused node is scaled down
        idle_time_before_scale_down = 200,

        # Define the cost tier
        tier = "Dedicated",

    )

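    # Pass the object to the MLClient and wait for provisioning to finish
    gpu_cluster = ml_client.compute.begin_create_or_update(gpu_cluster).result()

# Report the cluster that is now available (already existing or newly created)
print(f"AML Compute with the name {gpu_cluster.name} and the size of {gpu_cluster.size} ")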


Creating a new gpu compute target...
AML Compute with the name project-gpu-cluster and the size of STANDARD_NC6s_v3


## UNZIPPING IMAGE ARCHIVES 🖼️
To train the classifier, the image dataset first has to be extracted from its archive so that the images can be placed into training and validation folders. The extraction is done with the following `tar` command:

```
tar xvfm ${{inputs.archive}} --no-same-owner -C ${{outputs.images}}
```

Parameters such as the archive location and the output directory are injected into the command through the `${{...}}` placeholder syntax. The job definition in the next cell supplies their values.
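
As a rough illustration of how such placeholders get filled in, the sketch below substitutes `${{inputs.*}}` and `${{outputs.*}}` with concrete values. It is not how AzureML implements the substitution; `render_command` and the two mount paths are hypothetical and exist only for this example.

```python
import re

# Matches ${{inputs.name}} or ${{outputs.name}}
PLACEHOLDER = re.compile(r"\$\{\{\s*(inputs|outputs)\.(\w+)\s*\}\}")


def render_command(template: str, inputs: dict, outputs: dict) -> str:
    """Replace every placeholder with the matching entry from inputs/outputs."""
    values = {"inputs": inputs, "outputs": outputs}

    def substitute(match: re.Match) -> str:
        kind, name = match.group(1), match.group(2)
        return str(values[kind][name])

    return PLACEHOLDER.sub(substitute, template)


template = "tar xvfm ${{inputs.archive}} --no-same-owner -C ${{outputs.images}}"
rendered = render_command(
    template,
    inputs={"archive": "/mnt/in/archive.tar"},  # hypothetical resolved input path
    outputs={"images": "/mnt/out/images"},      # hypothetical resolved output path
)
print(rendered)
```

The point of the indirection is that the command text stays the same while the service decides at run time where the input is mounted and where the output is written.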

In [7]:
# import the required libraries
from azure.ai.ml import command 
from azure.ai.ml import Input, Output
from azure.ai.ml.constants import AssetTypes 

# Command for unzipping the files in the directory
dataset_untar_command_jar = command(
    # Name for the UI (optional)
    display_name = "untarring_command",

    # apply the command
    command = "tar xvfm ${{inputs.archive}} --no-same-owner -C ${{outputs.images}}",

    # inputs
    inputs = {
        "archive": Input(
            type = AssetTypes.URI_FILE, 
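            # NOTE: this is a Google Drive sharing-page link; the job may need a direct download URL for the archive
            path = "https://drive.google.com/file/d/1BGnigrVXeQ-Oeh04wIyG0HcEH4_f_kgf/view?usp=sharing"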
        )
    },

    # outputs 
    outputs = {
        "images": Output(
            type = AssetTypes.URI_FOLDER,
            mode = "upload",
            path="azureml://datastores/workspaceblobstore/paths/datasets",

        ),
    },

    # define the environment
    environment = "AzureML-sklearn-1.0-ubuntu20.04-py38-cpu:1",

    # define the compute: use the CPU cluster if it exists, otherwise pass None
    compute = "project-cpu-cluster" if cpu_cluster else None,
    
)

Creating command job for unzipping files...
Command job created successfully.


In [10]:
# import the required libraries
import webbrowser

# submit the required command object to the ml client
job_object = ml_client.create_or_update(dataset_untar_command_jar)

# obtain the URL for job status to unzip the files
print(f"Here is the URL for the live job.... {job_object.studio_url}")

# Open the browser with this URL 
webbrowser.open(job_object.studio_url)

#print the pipeline
print(f"The pipeline details can be accessed through the job: {job_object.name}")


Here is the URL for the live job.... 
The pipeline details can be accessed through the job: food-classification


## DISTRIBUTED GPU TRAINING JOB 🤖
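Distributed training can be carried out in several ways. This notebook uses data parallelism: the training script wraps the model in PyTorch's `DistributedDataParallel` and runs one process per GPU across multiple nodes, with each process training on its own shard of the data.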
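
To make the data-sharding idea concrete, here is a small pure-Python sketch of how a dataset's indices can be split across processes. It mirrors the general scheme of PyTorch's `DistributedSampler` with shuffling turned off, but it is a simplified emulation, and `shard_indices` is a hypothetical helper, not a PyTorch function. It assumes the dataset has at least as many samples as there are processes.

```python
from typing import List


def shard_indices(dataset_size: int, world_size: int, rank: int) -> List[int]:
    """Return the sample indices that the process with this rank would train on."""
    indices = list(range(dataset_size))
    # Pad by repeating the first indices so every rank receives the same count.
    per_rank = -(-dataset_size // world_size)  # ceiling division
    total = per_rank * world_size
    indices += indices[: total - dataset_size]
    # Each rank takes every world_size-th index, starting at its own rank.
    return indices[rank:total:world_size]


# Two processes (one GPU on each of two nodes) sharing seven samples.
for rank in range(2):
    print(f"rank {rank}: {shard_indices(7, world_size=2, rank=rank)}")
```

Because every rank gets the same number of samples, all processes run the same number of steps and stay in sync when gradients are averaged.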

In [None]:
import os 
train_src_dir = "/Users/harinikarthik/Desktop/Waterloo/Leetcode/Smart-Fridge/SRC"
os.makedirs(train_src_dir, exist_ok = True)

In [None]:
%%writefile {train_src_dir}/main.py
import os
import argparse
import pandas as pd
import mlflow
import mlflow.pytorch
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, DistributedSampler
from torch.nn.parallel import DistributedDataParallel

class CNNModel(nn.Module):
    def __init__(self):
        super(CNNModel, self).__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.fc_layers = nn.Sequential(
            nn.Linear(32 * 128 * 128, 128),
            nn.ReLU(),
            nn.Linear(128, 16)  # Output size 16 for 16 classes
        )

    def forward(self, x):
        x = self.conv_layers(x)
        x = x.view(x.size(0), -1)
        x = self.fc_layers(x)
        return x

def main():
    """Main function of the script."""
    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", type=str, help="path to input data")
    parser.add_argument("--batch_size", type=int, help="batch size")
    parser.add_argument("--num_workers", type=int, help="number of workers")
    parser.add_argument("--prefetch_factor", type=int, help="prefetch factor")
    parser.add_argument("--model_arch", type=str, help="model architecture")
    # argparse's type=bool treats any non-empty string (even "False") as True, so parse the text explicitly
    parser.add_argument("--model_arch_pretrained", type=lambda s: s.lower() in ("true", "1", "yes"), help="whether to use pretrained model architecture")
    parser.add_argument("--num_epochs", type=int, help="number of epochs")
    parser.add_argument("--learning_rate", type=float, help="learning rate")
    parser.add_argument("--momentum", type=float, help="momentum")
    parser.add_argument("--register_model_as", type=str, help="model registration name")
    parser.add_argument("--enable_profiling", type=lambda s: s.lower() in ("true", "1", "yes"), help="whether to enable profiling")
    args = parser.parse_args()
   
    # Initialize distributed backend
    torch.distributed.init_process_group(backend='nccl')

    # Start Logging
    mlflow.start_run()

    ###################
    #<prepare the data>
    ###################
    print(" ".join(f"{k}={v}" for k, v in vars(args).items()))

    print("input data:", args.data)
    
    # Assuming you have a train dataset in args.data directory
    train_dataset = torchvision.datasets.ImageFolder(
        root=args.data,
        transform=transforms.Compose([
                      transforms.ToTensor(),
                      transforms.Resize((512, 512))
        ])
    )

    # Use DistributedSampler for distributed training
    train_sampler = DistributedSampler(train_dataset)

    train_loader = DataLoader(train_dataset, batch_size=args.batch_size, sampler=train_sampler, 
                              num_workers=args.num_workers, prefetch_factor=args.prefetch_factor)
    ####################
    #</prepare the data>
    ####################

    ##################
    #<train the model>
    ##################
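    # AzureML's PyTorch distribution sets LOCAL_RANK for each process;
    # bind this process to its own GPU (the NCCL backend requires CUDA tensors)
    local_rank = int(os.environ.get("LOCAL_RANK", 0))
    torch.cuda.set_device(local_rank)
    device = torch.device("cuda", local_rank)

    model = CNNModel().to(device)

    # Wrap model with DistributedDataParallel so gradients are averaged across processes
    model = DistributedDataParallel(model, device_ids=[local_rank])

    # Loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    # Adam has no `momentum` argument, so --momentum is not passed here;
    # switch to torch.optim.SGD if a momentum term is wanted
    optimizer = torch.optim.Adam(model.parameters(), lr=args.learning_rate)

    # Training loop
    loss = None
    for epoch in range(args.num_epochs):
        model.train()
        # Reshuffle the per-process shards differently each epoch
        train_sampler.set_epoch(epoch)
        for images, labels in train_loader:
            # Move the batch to the same GPU as the model
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()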

    ###################
    #</train the model>
    ###################

    ##########################
    #<save and register model>
    ##########################
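    # Only rank 0 saves and logs the model, so the processes do not write duplicate artifacts
    if torch.distributed.get_rank() == 0:
        # Unwrap the DistributedDataParallel container to get the underlying model
        trained_model = model.module

        # Save the trained weights
        torch.save(trained_model.state_dict(), "trained_model.pth")

        # Log the model to the run and register it in the workspace via MLflow
        print("Registering the model via MLFlow")
        mlflow.pytorch.log_model(
            trained_model,
            args.register_model_as,
            registered_model_name=args.register_model_as,
        )

        # Saving the model to a local directory
        mlflow.pytorch.save_model(trained_model, args.register_model_as)
    ###########################
    #</save and register model>
    ###########################

    # Log the loss of the last training batch (skipped if no batch was processed)
    if loss is not None:
        mlflow.log_metric("training_loss", loss.item())

    # Stop Logging
    mlflow.end_run()

    # Shut down the distributed backend cleanly
    torch.distributed.destroy_process_group()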

if __name__ == "__main__":
    main()
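
The first fully connected layer in `CNNModel` expects `32 * 128 * 128` input features. The short calculation below shows where that number comes from, using the standard output-size formula for convolution and pooling layers and the 512x512 resize applied in the data transform. `conv_out` is a helper written for this illustration only.

```python
def conv_out(size: int, kernel: int, stride: int = 1, padding: int = 0) -> int:
    """Spatial output size of a convolution or pooling layer along one dimension."""
    return (size + 2 * padding - kernel) // stride + 1


side = 512                                            # images are resized to 512x512
side = conv_out(side, kernel=3, stride=1, padding=1)  # first Conv2d keeps the size
side = conv_out(side, kernel=2, stride=2)             # first MaxPool2d halves it
side = conv_out(side, kernel=3, stride=1, padding=1)  # second Conv2d keeps the size
side = conv_out(side, kernel=2, stride=2)             # second MaxPool2d halves it again

channels = 32                                         # out_channels of the second Conv2d
flat_features = channels * side * side
first_fc_params = flat_features * 128 + 128           # weights plus biases of Linear(flat, 128)

print(side, flat_features, first_fc_params)
```

If the resize in the transform is changed, the `nn.Linear` input size has to change with it, and most of the model's parameters sit in that first fully connected layer.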


In [None]:
from azure.ai.ml import command
from azure.ai.ml import Input
from azure.ai.ml.entities import ResourceConfiguration

training_job = command(
    # local folder that holds main.py (written by the %%writefile cell above)
    code=train_src_dir,
    # describe the command to run the python script, with all its parameters
    # use the syntax below to inject parameter values from code
    command="""python main.py \
        --data ${{inputs.train_images}} \
        --batch_size ${{inputs.batch_size}} \
        --num_workers ${{inputs.num_workers}} \
        --prefetch_factor ${{inputs.prefetch_factor}} \
        --model_arch ${{inputs.model_arch}} \
        --model_arch_pretrained ${{inputs.model_arch_pretrained}} \
        --num_epochs ${{inputs.num_epochs}} \
        --learning_rate ${{inputs.learning_rate}} \
        --momentum ${{inputs.momentum}} \
        --register_model_as ${{inputs.register_model_as}} \
        --enable_profiling ${{inputs.enable_profiling}}
    """,
    inputs={
        "train_images": Input(
            type="uri_folder",
            path="Users/guess_karthik/AzureML/src",
            # path="azureml://datastores/workspaceblobstore/paths/tutorial-datasets/places2/train/",
            mode="download",  # use download to make access faster, mount if dataset is larger than VM
        ),
        "batch_size": 64,
        "num_workers": 5,  # number of cpus for pre-fetching
        "prefetch_factor": 2,  # number of batches fetched in advance
        "model_arch": "resnet18",
        "model_arch_pretrained": True,
        "num_epochs": 7,
        "learning_rate": 0.01,
        "momentum": 0.01,
        "register_model_as": "dogs_dev",
        # "register_model_as": "places_dev",
        "enable_profiling": False,
    },
    environment="AzureML-pytorch-1.10-ubuntu18.04-py38-cuda11-gpu@latest",
    compute="project-gpu-cluster"
    if (gpu_cluster)
    else None,  # No compute needs to be passed to use serverless
    distribution={
        "type": "PyTorch",
        # set process count to the number of gpus on the node
        # NC6 has only 1
        "process_count_per_instance": 1,
    },
    # set instance count to the number of nodes you want to use
    instance_count=2,
    display_name="pytorch_training_sample",
    description="training a torchvision model",
)
if gpu_cluster is None:
    training_job.resources = ResourceConfiguration(
        instance_type="Standard_NC6s_v3", instance_count=2
    )  # resources for serverless job
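
The job settings above determine how many training processes run and how large the effective batch is. The sketch below works through that arithmetic for `instance_count=2`, `process_count_per_instance=1` and `batch_size=64`; the dataset size of 10,000 images is a made-up figure for illustration, and both helper functions are hypothetical.

```python
import math


def world_size(instance_count: int, process_count_per_instance: int) -> int:
    """Total number of training processes across all nodes."""
    return instance_count * process_count_per_instance


def steps_per_epoch(dataset_size: int, per_process_batch: int, world: int) -> int:
    """Optimizer steps each process takes per epoch when the data is sharded evenly."""
    samples_per_process = math.ceil(dataset_size / world)
    return math.ceil(samples_per_process / per_process_batch)


world = world_size(instance_count=2, process_count_per_instance=1)
global_batch = 64 * world               # samples consumed per step across all processes
steps = steps_per_epoch(10_000, 64, world)  # 10_000 is an assumed dataset size

print(world, global_batch, steps)
```

Since the effective batch grows with the number of processes, the learning rate may need retuning when `instance_count` changes.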

In [33]:
ml_client.jobs.create_or_update(training_job)


Uploading src (0.01 MBs):   0%|          | 0/5613 [00:00<?, ?it/s]
Uploading src (0.01 MBs): 100%|██████████| 5613/5613 [00:00<00:00, 425972.14it/s]


| Experiment | Name | Type | Status | Details Page |
| --- | --- | --- | --- | --- |
| Foods-Project | quirky_night_36q3t31071 | command | Starting | Link to Azure Machine Learning studio |
| Foods-Project | quirky_night_36q3t31072 | command | Starting | Link to Azure Machine Learning studio |


## CREATING AN ONLINE ENDPOINT FOR REAL-TIME INFERENCE

In [None]:
import uuid

# Creating a unique name for the endpoint
online_endpoint_name = "food-endpoint-" + str(uuid.uuid4())[:8]

In [34]:
# Expect the endpoint creation to take a few minutes
from azure.ai.ml.entities import (
    ManagedOnlineEndpoint,
    ManagedOnlineDeployment,
    Model,
    Environment,
)

# create an online endpoint
endpoint = ManagedOnlineEndpoint(
    name=online_endpoint_name,
    description="this is an online endpoint",
    auth_mode="key",
    tags={
        "training_dataset": "credit_defaults",
        "model_type": "torch.nn.Sequential",
    },
)

endpoint = ml_client.online_endpoints.begin_create_or_update(endpoint).result()

print(f"Endpoint {endpoint.name} provisioning state: {endpoint.provisioning_state}")

Endpoint food-endpoint-7880d4a9 provisioning state: Succeeded


In [35]:
endpoint = ml_client.online_endpoints.get(name=online_endpoint_name)

print(
    f'Endpoint "{endpoint.name}" with provisioning state "{endpoint.provisioning_state}" is retrieved'
)

Endpoint "food-endpoint-7880d4a9" with provisioning state "Succeeded" is retrieved


In [37]:
# List all models
all_models = ml_client.models.list()

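# Iterate over the models and print their details
# (the list view returns each model without a version, hence "Version: None")
for model in all_models:
    print(f"Name: {model.name}, Version: {model.version}, Description: {model.description}")
    # NOTE: this only changes the local Python object, not the registered model
    model.version = "2"
    print(f"Name: {model.name}, Version: {model.version}, Description: {model.description}")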


Name: azureml_mango_shirt_vhw92tbjc8_output_mlflow_log_model_1974916105, Version: None, Description: None
Name: azureml_mango_shirt_vhw92tbjc8_output_mlflow_log_model_1974916105, Version: 2, Description: None
Name: azureml_mango_shirt_vhw92tbjc8_output_mlflow_log_model_1099368587, Version: None, Description: None
Name: azureml_mango_shirt_vhw92tbjc8_output_mlflow_log_model_1099368587, Version: 2, Description: None
Name: credit_defaults_model, Version: None, Description: None
Name: credit_defaults_model, Version: 2, Description: None
Name: azureml_wheat_garden_dmr7j4sjgl_output_mlflow_log_model_1982509246, Version: None, Description: None
Name: azureml_wheat_garden_dmr7j4sjgl_output_mlflow_log_model_1982509246, Version: 2, Description: None
Name: azureml_teal_market_mpkbjj7gmb_output_mlflow_log_model_1893551161, Version: None, Description: None
Name: azureml_teal_market_mpkbjj7gmb_output_mlflow_log_model_1893551161, Version: 2, Description: None
Name: azureml_elated_parsnip_vv3s61rxjb_o

In [38]:
# Pick the model to deploy. Here we pin version 1 of the registered model
model = ml_client.models.get(name="Food_Model", version = "1")

# Expect this deployment to take approximately 6 to 8 minutes.
# create an online deployment.
# if you run into an out of quota error, change the instance_type to a comparable VM that is available.
# Learn more on https://azure.microsoft.com/en-us/pricing/details/machine-learning/.
blue_deployment = ManagedOnlineDeployment(
    name="bluenew",
    endpoint_name=online_endpoint_name,
    model=model,
    instance_type="Standard_E2s_v3",
    instance_count=1,
)

blue_deployment = ml_client.begin_create_or_update(blue_deployment).result()

Check: endpoint food-endpoint-7880d4a9 exists