# 3.- Azure ML Resources

In [29]:
import yaml
import os
from tqdm import tqdm

from azure.identity import DefaultAzureCredential

from azure.mgmt.resource import ResourceManagementClient

from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError, HttpResponseError

from azure.ai.ml import MLClient
from azure.ai.ml.entities import Workspace
from azure.mgmt.storage import StorageManagementClient
from azure.storage.blob import BlobServiceClient

from azure.ai.ml.entities import AzureBlobDatastore

from azure.ai.ml.entities import AmlCompute

## Define Variables

In [30]:
# Load configuration from the YAML file.
# The encoding is pinned to UTF-8 so parsing does not depend on the platform's
# default locale encoding (which is not UTF-8 on every OS).
with open("../config.yaml", "r", encoding="utf-8") as file:
    config = yaml.safe_load(file)

In [31]:
# Every setting used below lives under the "azure" section of the config file;
# grab that section once and unpack the individual values from it.
azure_cfg = config["azure"]

subscription_id = azure_cfg["subscription_id"]
resource_group_name = azure_cfg["resource_group_name"]
workspace_name = azure_cfg["workspace_name"]
container_name = azure_cfg["container_name"]
location = azure_cfg["location"]
datastore_name = azure_cfg["datastore_name"]
training_gpu_cluster = azure_cfg["training_gpu_cluster"]

## Azure Authentication

In [4]:
# Create one credential object; it is passed to every Azure client built in this notebook.
credential = DefaultAzureCredential()

## Resource Group

In [5]:
# Initialize the Resource Management client for the configured subscription
resource_client = ResourceManagementClient(credential, subscription_id)

In [6]:
def create_resource_group(resource_client, resource_group_name, location):
    """
    Return the named resource group, creating it when it does not exist yet.

    Parameters:
    - resource_client: Client whose `resource_groups` operations are used.
    - resource_group_name: Name of the resource group to look up or create.
    - location: Location used only when the group has to be created.

    Returns:
    The existing or newly created resource group, or None when the lookup
    step raises anything other than ResourceNotFoundError.
    """
    try:
        # Look the group up first so an existing one is reused untouched
        existing_group = resource_client.resource_groups.get(resource_group_name)
        print(f"Resource Group '{resource_group_name}' already exists in '{existing_group.location}'.")
        return existing_group
    except ResourceNotFoundError:
        # The group does not exist yet: create it in the requested location
        created_group = resource_client.resource_groups.create_or_update(
            resource_group_name,
            {"location": location},
        )
        print(f"Resource Group '{resource_group_name}' created in '{created_group.location}'.")
        return created_group
    except Exception as err:
        # Any other lookup failure is reported and signalled to the caller with None
        print(f"An error occurred: {err}")
        return None

In [7]:
# Make sure the resource group exists (reused if present, created otherwise).
# The result is None when the lookup failed with an unexpected error.
resource_group = create_resource_group(resource_client, resource_group_name, location)

Resource Group 'test_group' already exists in 'eastus'.


## Workspace

In [8]:
# Client built without a workspace name; it is used below to look up or create the workspace.
ml_client = MLClient(credential, subscription_id, resource_group_name)

In [9]:
def create_workspace(ml_client, workspace_name, location):
    """
    Return the named Azure ML workspace, creating it when it does not exist yet.

    Parameters:
    - ml_client: Client whose `workspaces` operations are used.
    - workspace_name: Name of the workspace to look up or create.
    - location: Location used only when the workspace has to be created.

    Returns:
    The existing or newly created workspace, or None when the lookup step
    raises anything other than ResourceNotFoundError.
    """
    try:
        # Prefer an existing workspace over creating a new one
        found = ml_client.workspaces.get(workspace_name)
        print(f"Workspace '{workspace_name}' already exists in '{found.location}'.")
        return found
    except ResourceNotFoundError:
        # Not there yet: start the creation and block until the poller finishes
        workspace_spec = Workspace(name=workspace_name, location=location)
        created = ml_client.workspaces.begin_create(workspace_spec).result()
        print(f"Workspace '{workspace_name}' created in '{created.location}'.")
        return created
    except Exception as err:
        # Any other lookup failure is reported and signalled to the caller with None
        print(f"An error occurred: {err}")
        return None


In [10]:
# Reuse the workspace when it already exists, otherwise create it; None signals an unexpected error.
workspace = create_workspace(ml_client, workspace_name, location)

Workspace 'machine_que_tal' already exists in 'eastus'.


## Get Workspace Storage Account Name

In [11]:
# create_workspace() returns None when the lookup fails; stop here with a clear
# message instead of an AttributeError on the attribute access below.
if workspace is None or not workspace.storage_account:
    raise RuntimeError(
        f"Workspace '{workspace_name}' is unavailable or has no storage account; "
        "cannot resolve the storage account name."
    )

# Keep only the last '/'-separated segment of the workspace's storage account
# reference (presumably a resource ID ending in the account name; verify).
storage_account_name = workspace.storage_account.split('/')[-1]

## Get Storage Account Keys

In [19]:
# Management client used below to list the storage account keys
storage_client = StorageManagementClient(credential, subscription_id)

In [20]:
def get_storage_account_keys(storage_client, resource_group_name, account_name):
    """
    Fetch the access keys of a storage account.

    Parameters:
    - storage_client: Client whose `storage_accounts.list_keys` operation is called.
    - resource_group_name: Resource group that contains the storage account.
    - account_name: Name of the storage account.

    Returns:
    A dict mapping each key's `key_name` to its `value`.
    """
    listing = storage_client.storage_accounts.list_keys(resource_group_name, account_name)

    keys_by_name = {}
    for entry in listing.keys:
        # The values are account secrets: keep them out of printed output
        keys_by_name[entry.key_name] = entry.value
    return keys_by_name

In [21]:
# Keys can only be requested once a storage account name has been resolved.
if not storage_account_name:
    print("Failed to create or retrieve the storage account.")
else:
    storage_keys = get_storage_account_keys(
        storage_client,
        resource_group_name,
        storage_account_name,
    )
    print("Successfully retrieved the storage account keys.")

Successfully retrieved the storage account keys.


## Upload Data to Azure Blob Storage

In [32]:
def upload_files_to_blob(account_name, account_key, container_name, source_folder):
    """
    Upload every file found under a local folder to a blob container.

    The container is created when it is missing. Blob names are the file paths
    relative to `source_folder`, always written with forward slashes so the
    folder layout in the container is the same whichever OS runs the upload.
    Blobs that already exist under the same name are overwritten.

    Parameters:
    - account_name: Storage account name, used to build the blob endpoint URL.
    - account_key: Credential handed to BlobServiceClient.
    - container_name: Name of the target container.
    - source_folder: Local folder that is walked recursively.

    Returns:
    None. Files that could not be uploaded are listed on stdout once the
    progress bar has finished.
    """
    account_url = f"https://{account_name}.blob.core.windows.net"
    blob_service_client = BlobServiceClient(account_url=account_url, credential=account_key)
    container_client = blob_service_client.get_container_client(container_name)

    try:
        container_client.create_container()
        print(f"Container '{container_name}' created.")
    except Exception as e:
        if "ContainerAlreadyExists" in str(e):
            print(f"Container '{container_name}' already exists.")
        else:
            # Best effort: report and still attempt the uploads; failures are summarized below
            print(f"Error creating container: {e}")

    files_to_upload = [
        os.path.join(root, file_name)
        for root, _dirs, file_names in os.walk(source_folder)
        for file_name in file_names
    ]
    progress_bar = tqdm(files_to_upload)
    failed_uploads = []

    for file_path in progress_bar:
        # relpath uses the OS path separator; normalize it so a Windows run does
        # not produce blob names containing backslashes.
        blob_path = os.path.relpath(file_path, start=source_folder).replace(os.sep, "/")
        blob_client = container_client.get_blob_client(blob_path)

        try:
            with open(file_path, "rb") as data:
                blob_client.upload_blob(data, overwrite=True)
        except Exception as e:
            progress_bar.set_description(f"Failed {os.path.basename(file_path)}")
            # The bar description is overwritten by later files, so keep a record
            failed_uploads.append((file_path, e))

    if failed_uploads:
        print(f"{len(failed_uploads)} of {len(files_to_upload)} file(s) failed to upload:")
        for failed_path, error in failed_uploads:
            print(f"  {failed_path}: {error}")


In [33]:
# Authenticate the upload with the storage account key named 'key1'
account_key = storage_keys['key1']
# Local folder whose files are uploaded (relative path, resolved against the working directory)
source_folder = '../data/'

upload_files_to_blob(storage_account_name, account_key, container_name, source_folder)

Container 'containerdatatrain' created.


100%|██████████| 87/87 [00:08<00:00, 10.39it/s]


## Azure Blob Datastore

In [34]:
# Rebind ml_client, this time passing the workspace name; the datastore and compute cells below use it.
ml_client = MLClient(credential, subscription_id, resource_group_name, workspace_name)

Overriding of current TracerProvider is not allowed
Overriding of current LoggerProvider is not allowed
Overriding of current MeterProvider is not allowed
Attempting to instrument while already instrumented
Attempting to instrument while already instrumented
Attempting to instrument while already instrumented


In [35]:
def create_datastore(datastore_name, account_name, container_name):
    """
    Ensure a datastore exists in the Azure ML workspace. If it does not exist, create it.

    Uses the notebook-level `ml_client`, which must already be defined when
    this function is called.

    Parameters:
    - datastore_name: Name of the datastore to check or create.
    - account_name: Azure storage account name associated with the datastore.
    - container_name: Azure storage container name associated with the datastore.

    Returns:
    None

    Raises:
    Any error other than ResourceNotFoundError raised by the lookup (for
    example authentication or network failures) is propagated instead of
    being mistaken for "datastore not found".
    """

    # Check if the datastore already exists
    try:
        ml_client.datastores.get(datastore_name)
    except ResourceNotFoundError:
        print(f"Datastore '{datastore_name}' not found. Creating new datastore.")
        # Create a new datastore if it does not exist.
        # NOTE(review): no credentials are attached to the datastore here;
        # confirm that this is the intended access mode.
        blob_datastore = AzureBlobDatastore(
            name=datastore_name,
            description="Datastore for storing training data and other blobs",
            account_name=account_name,
            container_name=container_name,
        )

        # Register the datastore in the workspace
        ml_client.datastores.create_or_update(blob_datastore)
        print(f"Datastore '{datastore_name}' has been created and registered.")
    else:
        print(f"Datastore '{datastore_name}' already exists.")

In [None]:
# Register the uploaded container as a datastore in the workspace (nothing is created if it already exists)
create_datastore(datastore_name, storage_account_name, container_name)

## Create a Compute Resource

In [40]:
try:
    # Reuse the compute target when it already exists
    gpu_cluster = ml_client.compute.get(training_gpu_cluster)
    print(
        f"You already have a cluster named {training_gpu_cluster}, we'll reuse it as is."
    )

except ResourceNotFoundError:
    # Only a genuine "not found" triggers creation; other failures (authentication,
    # network, permissions) propagate instead of starting a create attempt.
    print("Creating a new gpu compute target...")

    # Build the Azure ML compute object with the intended parameters.
    # NOTE(review): the cluster is named as a GPU cluster, but STANDARD_D2_V3 looks
    # like a general-purpose (non-GPU) size; confirm the intended VM size.
    gpu_cluster = AmlCompute(
        # Name assigned to the compute cluster
        name=training_gpu_cluster,
        # Azure ML Compute is the on-demand VM service
        type="amlcompute",
        # VM family
        size="STANDARD_D2_V3",
        # Minimum number of nodes kept running when no job is running
        min_instances=0,
        # Maximum number of nodes in the cluster
        max_instances=4,
        # Seconds a node stays up after its job terminates before scaling down
        idle_time_before_scale_down=180,
        # Dedicated or LowPriority. The latter is cheaper but jobs may be terminated
        tier="Dedicated",
    )

    # Submit the object through MLClient and wait for provisioning to finish
    gpu_cluster = ml_client.begin_create_or_update(gpu_cluster).result()

print(
    f"AMLCompute with name {gpu_cluster.name} is created, the compute size is {gpu_cluster.size}"
)

Creating a new gpu compute target...
AMLCompute with name training-gpu-cluster is created, the compute size is STANDARD_D2_V3
