PCRLv2 source code: https://github.com/seanreed1111/PCRLv2

In [2]:
from omegaconf import OmegaConf
import logging, argparse, os, pathlib
from azure.ai.ml import command, Input, Output
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.entities import AmlCompute
import json, time
import webbrowser
import datetime
from azure.ai.ml import MLClient
from dotenv import load_dotenv
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from pathlib import Path
from src.loggers import create_python_logger
# The log directory must exist before the logger is created; an already
# existing directory is fine.
Path.cwd().joinpath("logs").mkdir(exist_ok=True)

# Notebook-wide logger built by the project's helper; the debug message
# confirms that logging is wired up.
logger = create_python_logger(__name__)
logger.debug("logging started")

2023-01-09 23:04:57,486 : DEBUG : __main__ : logging started


In [3]:
# Authenticate against Azure. DefaultAzureCredential is tried first; if it
# cannot obtain a management-plane token we fall back to an interactive
# browser login.
try:
    credential = DefaultAzureCredential()
    # Check if given credential can get token successfully.
    credential.get_token("https://management.azure.com/.default")
except Exception as ex:
    # Deliberately broad: any failure of the default chain triggers the
    # fallback. Record the reason instead of discarding it silently.
    logger.warning(
        "DefaultAzureCredential failed (%s); "
        "falling back to InteractiveBrowserCredential",
        ex,
    )
    credential = InteractiveBrowserCredential()


# load_dotenv does not expand "~" itself; a literal "~/.env" path is simply
# not found and the variables below would silently stay unset.
load_dotenv(os.path.expanduser("~/.env"))

# get a handle to the workspace
ml_client = MLClient(
    subscription_id=os.getenv("SUBSCRIPTION_ID"),
    resource_group_name=os.getenv("RESOURCE_NAME"),
    workspace_name=os.getenv("WORKSPACE_NAME"),
    credential=credential,
)
### setup
# Hardware profiles shared by several clusters. Each profile records the VM
# size ("instance_type"), whether it has GPUs, a human-readable machine name
# used for display, and how many processes to launch per node.
_CPU_DS12_SPEC = {
    "instance_type": "STANDARD_DS12_V2",
    "gpu": False,
    "machine_name": "STANDARD_DS12_V2",
    "process_count_per_instance": 1,
}
_SINGLE_V100_SPEC = {
    "instance_type": "STANDARD_NC6s_v3",
    "gpu": True,
    "machine_name": "Tesla V100",
    "process_count_per_instance": 1,
}
_QUAD_V100_SPEC = {
    "instance_type": "Standard_NC24s_v3",
    "gpu": True,
    "machine_name": "Tesla 4xV100",
    "process_count_per_instance": 4,
}

# Known compute clusters keyed by cluster name. Every entry gets its own copy
# of the profile so that mutating one cluster's settings never leaks into
# another.
compute = {
    "sean-cpu-cluster-2": dict(_CPU_DS12_SPEC),
    "gpu-cluster-2-V100s": dict(_SINGLE_V100_SPEC),
    "gpu-cluster-2-V100s-LP": dict(_SINGLE_V100_SPEC),
    "gpu-cluster-4-V100s": dict(_SINGLE_V100_SPEC),
    "new-gpu-cluster-4-V100s-LP": dict(_SINGLE_V100_SPEC),
    "gpu-cluster-1-4xV100s-LP": dict(_QUAD_V100_SPEC),
    "gpu-cluster-1-4xV100s": dict(_QUAD_V100_SPEC),
}

In [None]:
#################### INPUT ####################
compute_target = "gpu-cluster-2-V100s-LP"
##############################################################

# Everything below is derived from the chosen cluster's entry in `compute`.
environment = "env-PCRLv2"
device = 'gpu' if compute[compute_target]['gpu'] else 'cpu'
# Cluster names containing "LP" are treated as low-priority clusters.
tier = "LowPriority" if ("LP" in compute_target) else "Dedicated"
instance_type = compute[compute_target]["instance_type"]
process_count_per_instance = compute[compute_target]["process_count_per_instance"]
### Metric Display Variables
machine_name = compute[compute_target]["machine_name"]
display_name = compute_target

try:
    # let's see if the compute target already exists
    gpu_cluster = ml_client.compute.get(compute_target)
    print(
        f"You already have a cluster named {compute_target}, we'll reuse it as is."
    )

except Exception:
    # NOTE(review): any failure of the lookup ends up here, not only a
    # missing cluster — narrow the exception type once confirmed.
    print("Creating a new compute target...")

    cluster = AmlCompute(
        name=compute_target,
        type="amlcompute",
        size=instance_type,
        min_instances=0,
        max_instances=2,
        idle_time_before_scale_down=200,
        tier=tier,  # Dedicated, LowPriority
    )

    # begin_create_or_update only *starts* a long-running operation. Block on
    # the poller so that the "is available" message below is actually true
    # and a job submitted right afterwards finds the cluster.
    cluster = ml_client.begin_create_or_update(cluster).result()

print(
    f"AMLCompute with name {compute_target} is available"
)

In [None]:
#################### INPUT  ####################
device_count = 1
precision = 32
batch_size = 1


##############################################################
## Sanity checks
# cpu compute must be 32, so the requested precision is overridden there.
if not compute[compute_target]['gpu']:
    precision = 32


# Disabled naming helpers kept for reference; the variables they read are not
# defined in this notebook.
# if cache_dataset:
#     p0 = "cache_dataset"
# elif persistent_dataset:
#     p0 = "persistent_dataset"
# else:
#     p0 = ""
# p1 = "fastdevrun" if fast_dev_run else ""
# p2 = "debug_loader" if debug_get_loader else ""
# p3 ="accumulate_grad_batches" if accumulate_grad_batches else ""
# p4 = f"overfit{overfit_batches}-batches" if overfit_batches else ""
# p5 = f"batch-size{batch_size}-sw_batch_size{sw_batch_size}"

# The experiment is named after the current working directory.
experiment_name = Path.cwd().stem

# Snapshot of the settings chosen in this notebook.
run_config = {
    'device': device,
    "precision": precision,
    "device_count": device_count,
    "compute_target": compute_target,
    "compute_target_args": compute[compute_target],
    "experiment_name": experiment_name,
    "environment": environment,
    "batch_size": batch_size,
    "tier": tier,
}

# Write the snapshot as YAML. OmegaConf.save opens the target itself, so the
# path is handed over directly instead of opening the file a second time
# around the call.
p = Path("src/cfg/run_config.yaml")
p.parent.mkdir(parents=True, exist_ok=True)
OmegaConf.save(config=OmegaConf.create(run_config), f=p)

# Definition of the AzureML command job: uploads ./src, mounts the two input
# folders read-only and runs test.py on the selected compute target.
training_job = command(
    # local path where the code is stored
    code="./src",
    # describe the command to run the python script, with all its parameters
    # use the syntax below to inject parameter values from code
    # The arguments are separated by whitespace only: a comma after the first
    # value would be passed to test.py as part of base_data_dir.
    command=(
        "python test.py "
        "base_data_dir=${{inputs.base_data_dir}} "
        "base_weight_path=${{inputs.base_weight_path}}"
    ),
    inputs={
        "base_data_dir": Input(
            type="uri_folder",
            path="azureml://datastores/workspaceworkingdirectory/paths/Users/sean.reed/data/base-data-dir-v1",
            mode="ro_mount",  # use mode="download" to make access faster, "ro_mount" if dataset is larger than VM
        ),
        "base_weight_path": Input(
            type="uri_folder",
            path="azureml://datastores/workspaceworkingdirectory/paths/Users/sean.reed/PCRLv2/pretrained_weights",
            mode="ro_mount",  # use mode="download" to make access faster, "ro_mount" if dataset is larger than VM
        ),
    },
    environment=environment,
    compute=compute_target,
    distribution={
        "type": "PyTorch",
        # set process count to the number of gpus on the node
        "process_count_per_instance": process_count_per_instance,
    },
    # set instance count to the number of nodes you want to use
    instance_count=device_count,
    display_name=display_name,
    description=f"This job is using {device_count} x  {process_count_per_instance} x{machine_name}s in environment {environment} on the {tier} tier",
)

import webbrowser

# Hand the job definition to the workspace; it is filed under the experiment
# named by `experiment_name`.
returned_job = ml_client.jobs.create_or_update(
    training_job,
    experiment_name=experiment_name,  # Project's name
)

# Show the studio URL of the running job and open it in the default browser.
print(
    "The url to see your live job running is returned by the sdk:",
    returned_job.studio_url,
    sep="\n",
)
webbrowser.open(returned_job.studio_url)

# Keep the job name around: it is the identifier for later programmatic
# access in this notebook.
small_scale_run_id = returned_job.name
print(
    f"The pipeline details can be access programmatically using identifier: {returned_job.name}"
)