In [1]:
%cd ..

c:\Workspace\PracticeProjects\aml


# Train, Tune - pytorch

In [9]:
from azure.ai.ml import MLClient, command, Input
from azure.ai.ml.constants import AssetTypes, InputOutputModes
from azure.ai.ml.entities import AmlCompute, Data, Environment, Model
from azure.ai.ml.sweep import BanditPolicy, Uniform
from azure.identity import DefaultAzureCredential

from aml.settings import *

In [18]:
HYMENOPTERA_DATA_PATH = "data/hymenoptera"
HYMENOPTERA_DATA_NAME = "hymenoptera"
HYMENOPTERA_DATA_DESCRIPTION = "This dataset contains images of ants and bees intended for training a classification model. It consists of approximately 120 training images for each class (ants and bees) and 75 validation images for each class."

HYMENOPTERA_MODEL_NAME="hymenoptera_model"
HYMENOPTERA_MODEL_DESCRIPTION="Model created for hymenoptera data."

HYMENOPTERA_SWEEP_NAME="hymenoptera_sweep"
HYMENOPTERA_SWEER_DISPLAY_NAME="Hymenoptera Sweep"
HYMENOPTERA_SWEEP_DESCRIPTION="Sweep for training hymenoptera model."

CURATED_ENV_NAME = "AzureML-ACPT-pytorch-1.13-py38-cuda11.7-gpu@latest"

## Connect to Workspace

In [4]:
credential = DefaultAzureCredential()

# Get a handle to the workspace. You can find the info on the workspace tab on ml.azure.com
ml_client = MLClient(
    credential=credential,
    subscription_id=SUBSCRIPTION_ID,  # this will look like xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
    resource_group_name=RESOURCE_GROUP,
    workspace_name=WORKSPACE_NAME,
)

## Create Data asset

In [5]:
# Set the path, supported paths include:
# local: './<path>/<folder>' (this will be automatically uploaded to cloud storage)
# blob:  'wasbs://<container_name>@<account_name>.blob.core.windows.net/<path>/<folder>'
# ADLS gen2: 'abfss://<file_system>@<account_name>.dfs.core.windows.net/<path>/<folder>'
# Datastore: 'azureml://datastores/<data_store_name>/paths/<path>/<folder>'

# Define the Data asset object
my_data = Data(
    path=HYMENOPTERA_DATA_PATH,
    type=AssetTypes.URI_FOLDER,
    description=HYMENOPTERA_DATA_DESCRIPTION,
    name=HYMENOPTERA_DATA_NAME,
)

## create data asset if it doesn't already exist:
try:
    data_asset = ml_client.data.get(name=HYMENOPTERA_DATA_NAME, version=DATA_VERSION)
    print(
        f"Data asset already exists. Name: {my_data.name}, version: {my_data.version}"
    )
except:
    ml_client.data.create_or_update(my_data)
    data_asset = ml_client.data.get(name=HYMENOPTERA_DATA_NAME)
    print(f"Data asset created. Name: {my_data.name}, version: {my_data.version}")

Data asset already exists. Name: hymenoptera, version: None


## Create GPU cluster

In [6]:
create_or_update = False

if GPU_NAME in [com.name for com in ml_client.compute.list()]:
    print(
        f"You already have a cluster named {GPU_NAME}, we'll check whether its attributes match your specifications."
    )
    compute_target = ml_client.compute.get(GPU_NAME)

     # Check if the attributes of the existing compute match the specifications
    differences = []
    if compute_target.type != GPU_TYPE:
        differences.append("type")
    if compute_target.size.upper() != GPU_SIZE.upper():
        differences.append("size")
    if compute_target.min_instances != int(GPU_MIN_INSTANCES):
        differences.append("min_instances")
    if compute_target.max_instances != int(GPU_MAX_INSTANCES):
        differences.append("max_instances")
    if compute_target.idle_time_before_scale_down != float(GPU_IDLE_TIME):
        differences.append("idle_time_before_scale_down")
    if {"low_priority": "LowPriority", "dedicated": "Dedicated"}.get(compute_target.tier) != GPU_TIER:
        differences.append("tier")
        print({"low_priority": "LowPriority", "dedicated": "Dedicated"}.get(compute_target.tier), type({"low_priority": "LowPriority", "dedicated": "Dedicated"}.get(compute_target.tier)), GPU_TIER)

    # Print the differences, if any
    if differences:
        print(f"The following attributes of compute target are different from your specifications: {', '.join(differences)}")
        create_or_update = True
    else:
        print("All attributes of compute_target match the specifications.")
else:
    create_or_update = True

if create_or_update:
    user_input = input("-> Are you sure you want to create/update this Compute? [yes| no]: ")
    print(f"-> Are you sure you want to create/update this Compute? [yes| no]: {user_input.lower()}")
    
    if user_input.upper() == "YES":
        print("Creating/Updating compute target...")
        compute_target = AmlCompute(
            name=GPU_NAME,
            type=GPU_TYPE,
            size=GPU_SIZE,
            min_instances=GPU_MIN_INSTANCES,
            max_instances=GPU_MAX_INSTANCES,
            idle_time_before_scale_down=GPU_IDLE_TIME,
            tier=GPU_TIER,
        )
        compute_target = ml_client.begin_create_or_update(compute_target)
        print(f"AMLCompute with name {compute_target.name} is created/updated, the compute size is {compute_target.size}")
    else:
        print("No compute target created/updated.")

You already have a cluster named a100, we'll check whether its attributes match your specifications.
All attributes of compute_target match the specifications.


## Configure Job

In [7]:
# configure the command job
job = command(
    inputs=dict(
        # uri_file refers to a specific file as a data asset
        data=Input(
            path=data_asset.id,
            type=AssetTypes.URI_FOLDER,
            mode=InputOutputModes.RO_MOUNT
        ),
        num_epochs=30,
        learning_rate=0.001,
        momentum=0.9,
        output_dir="./outputs",
    ),
    code="./src/train-tune-pytorch",  # location of source code
    # The inputs/outputs are accessible in the command via the ${{ ... }} notation
    command="python pytorch_train.py --data ${{inputs.data}} --num_epochs ${{inputs.num_epochs}} --output_dir ${{inputs.output_dir}}",
    # This is the ready-made environment you are using
    environment=CURATED_ENV_NAME,
    # This is the compute you created earlier. You can alternatively remove this line to use serverless compute to run the job
    compute=GPU_NAME,
    # An experiment is a container for all the iterations one does on a certain project. All the jobs submitted under the same experiment name would be listed next to each other in Azure ML studio.
    experiment_name=EXPERIMENT_NAME,
    display_name=EXPERIMENT_DISPLAY_NAME,
)

## Submit Job

In [8]:
ml_client.create_or_update(job)

Class AutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class AutoDeleteConditionSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseAutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class IntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class ProtectionLevelSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseIntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
[32mUploading train-tune-pytorch (0.0

Experiment,Name,Type,Status,Details Page
hymenoptera_classification,tidy_shampoo_bdmqwzsdl7,command,Starting,Link to Azure Machine Learning studio


## Tune model hyperparameters

Now that we've seen how to do a simple PyTorch training run using the SDK, let's see if we can further improve the accuracy of our model. We can optimize our model's hyperparameters using Azure Machine Learning's sweep capabilities.

You will replace some of the parameters passed to the training job with special inputs from the azure.ml.sweep package – that way, you are defining the parameter space in which to search.

Since the training script uses a learning rate schedule to decay the learning rate every several epochs, you can tune the initial learning rate and the momentum parameters.

In [19]:
# we will reuse the command_job created before. we call it as a function so that we can apply inputs
job_for_sweep = job(
    learning_rate=Uniform(min_value=0.0005, max_value=0.005),
    momentum=Uniform(min_value=0.9, max_value=0.99),
)

Then you configure sweep on the command job, with some sweep-specific parameters like the primary metric to watch and the sampling algorithm to use.

- You can use random sampling to try different configuration sets of hyperparameters to maximize the primary metric, the best validation accuracy (best_val_acc).
- You can specify the early termination policy to use to early terminate poorly performing runs. Here you use the BanditPolicy, which will terminate any run that doesn't fall within the slack factor of our primary evaluation metric. You will apply this policy every epoch (since we report our `best_val_acc` metric every epoch and `evaluation_interval`=1). Notice we will delay the first policy evaluation until after the first 10 epochs (`delay_evaluation`=10).

In [21]:
sweep_job = job_for_sweep.sweep(
    compute=GPU_NAME,
    sampling_algorithm="random",
    primary_metric="best_val_acc",
    goal="Maximize",
    max_total_trials=4,
    max_concurrent_trials=4,
    early_termination_policy=BanditPolicy(
        slack_factor=0.15, evaluation_interval=1, delay_evaluation=10
    ),
)

# Specify your experiment details
sweep_job.display_name = HYMENOPTERA_SWEER_DISPLAY_NAME
sweep_job.experiment_name = HYMENOPTERA_SWEEP_NAME
sweep_job.description = HYMENOPTERA_SWEEP_DESCRIPTION

Now you can submit this job as before. This will now run a sweep job that sweeps over our train job.

In [22]:
returned_sweep_job = ml_client.create_or_update(sweep_job)

# stream the output and wait until the job is finished
ml_client.jobs.stream(returned_sweep_job.name)

# refresh the latest status of the job after streaming
returned_sweep_job = ml_client.jobs.get(name=returned_sweep_job.name)

RunId: dreamy_wheel_c8dhz2t4p8
Web View: https://ml.azure.com/runs/dreamy_wheel_c8dhz2t4p8?wsid=/subscriptions/66ac1049-5712-45db-ac73-0472ab01abf8/resourcegroups/DI_INTERNS/workspaces/di-internal-projects

Streaming azureml-logs/hyperdrive.txt

[2024-04-26T08:55:22.131638][GENERATOR][INFO]Trying to sample '4' jobs from the hyperparameter space
[2024-04-26T08:55:22.6385544Z][SCHEDULER][INFO]Scheduling job, id='dreamy_wheel_c8dhz2t4p8_0' 
[2024-04-26T08:55:22.7600954Z][SCHEDULER][INFO]Scheduling job, id='dreamy_wheel_c8dhz2t4p8_1' 
[2024-04-26T08:55:22.8594118Z][SCHEDULER][INFO]Scheduling job, id='dreamy_wheel_c8dhz2t4p8_2' 
[2024-04-26T08:55:22.9763452Z][SCHEDULER][INFO]Scheduling job, id='dreamy_wheel_c8dhz2t4p8_3' 
[2024-04-26T08:55:22.941921][GENERATOR][INFO]Successfully sampled '4' jobs, they will soon be submitted to the execution target.
[2024-04-26T08:55:23.2851255Z][SCHEDULER][INFO]Successfully scheduled a job. Id='dreamy_wheel_c8dhz2t4p8_0' 
[2024-04-26T08:55:23.3682406Z][SCHE

You can monitor the job using the studio UI link presented when you run the job.

### Find the best model

**Once all the runs complete**, you can find the run that produced the model with the highest accuracy.

In [23]:
if returned_sweep_job.status == "Completed":

    # First let us get the run which gave us the best result
    best_run = returned_sweep_job.properties["best_child_run_id"]

    # lets get the model from this run
    model = Model(
        # the script stores the model as "outputs"
        path="azureml://jobs/{}/outputs/artifacts/paths/outputs/".format(best_run),
        name=HYMENOPTERA_MODEL_NAME,
        description=HYMENOPTERA_MODEL_DESCRIPTION,
        type="custom_model",
    )

else:
    print(
        "Sweep job status: {}. Please wait until it completes".format(
            returned_sweep_job.status
        )
    )