### First cells

In [1]:
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient

In [2]:
import os
## Either get environment variables, or a fallback name, which is the second parameter.
## Currently, fill in the fallback values. Later on, we will make sure to work with Environment values. So we're already preparing for it in here!
workspace_name = os.environ.get('WORKSPACE', 'pollet-daymon-ml')
subscription_id = os.environ.get('SUBSCRIPTION_ID', '6a36bb7a-aee5-4e15-a3c7-2e362d2c2387')
resource_group = os.environ.get('RESOURCE_GROUP', 'mlops-demo')

In [3]:
# The credential "DefaultAzureCredential" will use the same name as your logged in user.
credential = DefaultAzureCredential()

In [4]:
ml_client = MLClient(
    credential, subscription_id, resource_group, workspace_name
)

### Create a Compute Machine from the SDK

In [5]:
# Compute Instances need to have a unique name across the region.
from azure.ai.ml.entities import ComputeInstance, AmlCompute

ci_basic_name = "cpu-daymon-auto" # I add the suffix Auto, because we are automatically creating this instance.
idle_shutdown_minutes = 30

ci_basic = ComputeInstance(
    name=ci_basic_name,
    size="STANDARD_DS3_v2",
    idle_time_before_shutdown_minutes=idle_shutdown_minutes # by adding this line we can configure a shutdown if idle
)

print(f"Creating or updating compute instance '{ci_basic_name}' with size '{ci_basic.size}' and idle shutdown after {idle_shutdown_minutes} minutes of inactivity...")

ml_client.begin_create_or_update(ci_basic).result()

print(f"Compute instance '{ci_basic_name}' created/updated successfully!")

Creating or updating compute instance 'cpu-daymon-auto' with size 'STANDARD_DS3_v2' and idle shutdown after 30 minutes of inactivity...
Compute instance 'cpu-daymon-auto' created/updated successfully!


pasting contents of the yaml file to the right directory using code.

In [6]:
import os

directory_path = "components/dataprep"

file_path = os.path.join(directory_path, "conda.yaml")

conda_yaml_content = """
name: aml-Pillow
channels:
  - conda-forge
dependencies:
  - python=3.8
  - numpy=1.21.2
  - pip=21.2.4
  - scikit-learn=0.24.2
  - scipy=1.7.1
  - pandas>=1.1,<1.2
  - pip:
    - inference-schema[numpy-support]==1.3.0
    - xlrd==2.0.1
    - mlflow==1.26.1
    - azureml-mlflow==1.42.0
    - Pillow==10.0.1
"""

os.makedirs(directory_path, exist_ok=True)

with open(file_path, "w") as f:
    f.write(conda_yaml_content.strip()) 

print(f"Successfully created '{file_path}' with the specified content.")

# verify reading back contents
with open(file_path, "r") as f:
    print("\n--- Content of conda.yaml ---")
    print(f.read())
    print("-----------------------------")

Successfully created 'components/dataprep/conda.yaml' with the specified content.

--- Content of conda.yaml ---
name: aml-Pillow
channels:
  - conda-forge
dependencies:
  - python=3.8
  - numpy=1.21.2
  - pip=21.2.4
  - scikit-learn=0.24.2
  - scipy=1.7.1
  - pandas>=1.1,<1.2
  - pip:
    - inference-schema[numpy-support]==1.3.0
    - xlrd==2.0.1
    - mlflow==1.26.1
    - azureml-mlflow==1.42.0
    - Pillow==10.0.1
-----------------------------


Using magic

In [7]:
!mkdir -p components/dataprep # -p is the same as 'create if not exists' 

# Use %%writefile to write the content directly to the file
# this command must be the first line in the cell and there can't be any comments or it won't work
# After executing this cell, the file 'components/dataprep/conda.yaml' will be created/overwritten.

In [8]:
%%writefile components/dataprep/conda.yaml 
name: aml-Pillow
channels:
  - conda-forge
dependencies:
  - python=3.8
  - numpy=1.21.2
  - pip=21.2.4
  - scikit-learn=0.24.2
  - scipy=1.7.1
  - pandas>=1.1,<1.2
  - pip:
    - inference-schema[numpy-support]==1.3.0
    - xlrd==2.0.1
    - mlflow==1.26.1
    - azureml-mlflow==1.42.0
    - Pillow==10.0.1

Overwriting components/dataprep/conda.yaml


In [9]:
# Create the environment “aml-Pillow”. aml stands for “Azure Machine Learning” and Pillow  is a well known Image Processing library.
from azure.ai.ml.entities import Environment
import os

custom_env_name = "aml-Pillow"

pipeline_job_env = Environment(
    name=custom_env_name,
    description="Custom environment for Image Processing (with Pillow)",
    tags={"Pillow": "10.0.1"},
    conda_file=os.path.join("components", "dataprep", "conda.yaml"),
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
    )
pipeline_job_env = ml_client.environments.create_or_update(pipeline_job_env)

print(
    f"Environment with name {pipeline_job_env.name} is registered to workspace, the environment version is {pipeline_job_env.version}"
)

Environment with name aml-Pillow is registered to workspace, the environment version is 1


## creating data prep component

In [10]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

# This registers a component with the name "data_prep_image_resize"
# Which can then be used in the Pipeline editor of the Azure Portal
data_prep_component = command(
    name="data_prep_image_resize",
    display_name="Data preparation, Image Resizing",
    description="Reads a data asset of images and preprocesses them by resizing them to 64 to 64.",
    # Which input data will we receive? We will be splitting each batch of images individually for each animal type.
    inputs={
        "data": Input(type="uri_folder"),
    },
    # We need the "rw_mount" (Read/Write mount) so that our code can also write to the output folder and save the images
    outputs={
        "output_data": Output(type="uri_folder", mode="rw_mount"),
    },
    # The source folder of the components code. It will upload all the files in that directory
    code=os.path.join("components", "dataprep"),
    command="""python dataprep.py \
            --data ${{inputs.data}} \
            --output_data ${{outputs.output_data}} \
            """,
    environment=f"aml-Pillow@latest",
)

# Now we register the component to the workspace
data_prep_component = ml_client.create_or_update(data_prep_component.component)

# Create (register) the component in your workspace
print(
    f"Component {data_prep_component.name} with Version {data_prep_component.version} is registered"
)

Component data_prep_image_resize with Version 2025-05-30-17-29-52-6002454 is registered


### first pipeline component

In [11]:
# the dsl decorator tells the sdk that we are defining an Azure ML pipeline
from azure.ai.ml import dsl, Input, Output

@dsl.pipeline(
    compute="cpu-daymon-auto",
    description="Custom data_prep pipeline",
)
def animal_images_preprocessing_pipeline(
    input_version: str, # Currently we don't use these version numbers, but we will use them later on.
    output_version: str,
):
    # using data_prep_function like a python call with its own inputs
    # These are the animals with the version name as a second item in the tuple
    animals = [
        ('pandas', "1"),
        ('cats', "1"),
        ('dogs', "1")
    ] # They are hardcoded in here, because we should give them from another component otherwise.
    
    jobs = {}
    for animal in animals:

        data_prep_job = data_prep_component(
            data=Input(
                type="uri_folder",
                path=f"azureml:{animal[0]}:{animal[1]}" 
            ),
        )
        
        output_name = animal[0] + "_resized"
        # Update the subscriptionID, resourcegroup and workspace name here as well
        workspace_name = os.environ.get('WORKSPACE', 'pollet-daymon-ml')
        subscription_id = os.environ.get('SUBSCRIPTION_ID', '6a36bb7a-aee5-4e15-a3c7-2e362d2c2387')
        resource_group = os.environ.get('RESOURCE_GROUP', 'mlops-demo')
        output_path = f"azureml://subscriptions/{subscription_id}/resourcegroups/{resource_group}/workspaces/{workspace_name}/datastores/workspaceblobstore/paths/processed_animals/" + animal[0]

        data_prep_job.outputs.output_data = Output(
            type="uri_folder",
            path=output_path,
            name=output_name,
            mode="rw_mount"
        )

        jobs[animal[0]] = data_prep_job

    # a pipeline returns a dictionary of outputs
    # keys will code for the pipeline output identifier
    return {
        k: v.outputs.output_data for k,v in jobs.items()
    }

In [12]:
# Let's instantiate the pipeline with the parameters of our choice
pipeline = animal_images_preprocessing_pipeline()

In [13]:
import webbrowser

# submit the pipeline job
pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    # Project's name
    experiment_name="image_preprocessing_pipeline",
)
# open the pipeline in web browser
webbrowser.open(pipeline_job.studio_url)

Class AutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class AutoDeleteConditionSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseAutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class IntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class ProtectionLevelSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseIntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
pathOnCompute is not a known attribute

False

In [14]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output
import os

# Define the data split component
data_split_component = command(
    name="data_split",
    display_name="Dataset Train/Test Split",
    description="Splits input image datasets into training and testing sets based on a specified ratio.",
    # Define the inputs for the component
    inputs={
        "animal_1": Input(type="uri_folder", description="Path to the first animal image dataset."),
        "animal_2": Input(type="uri_folder", description="Path to the second animal image dataset."),
        "animal_3": Input(type="uri_folder", description="Path to the third animal image dataset."),
        "train_test_split_factor": Input(type="number", description="Percentage of data to use for testing (e.g., 0.2 for 20%)."),
    },
    # Define the outputs for the component
    outputs={
        "training_data": Output(type="uri_folder", mode="rw_mount", description="Output path for the training dataset."),
        "testing_data": Output(type="uri_folder", mode="rw_mount", description="Output path for the testing dataset."),
    },
    # specify source code dir relative to where we are right now in this notebook
    code=os.path.join("components", "dataprep", "code"),
    # mapping input and output to script arguments
    command="""python traintestsplit.py \
            --datasets ${{inputs.animal_1}} ${{inputs.animal_2}} ${{inputs.animal_3}} \
            --split_size ${{inputs.train_test_split_factor}} \
            --training_data_output ${{outputs.training_data}} \
            --testing_data_output ${{outputs.testing_data}} \
            """,
    # command for running this 
    environment=f"aml-Pillow@latest",
)

# this is how to register a component to a workspace
data_split_component = ml_client.create_or_update(data_split_component.component)

# check by printing for comformation
print(
    f"Component {data_split_component.name} with Version {data_split_component.version} is registered."
)


Component data_split with Version 2025-05-30-17-29-58-7844951 is registered.


using the above component in a pipeline:

In [15]:
from azure.ai.ml import dsl, Input, Output

@dsl.pipeline(
    compute="cpu-daymon-auto",
    description="Custom data_split pipeline",
)
def animal_images_traintest_split_pipeline(
    train_test_split: int, # Currently we don't use these version numbers, but we will use them later on.
    animal_1: Input,
    animal_2: Input,
    animal_3: Input,
):
    # using data_prep_function like a python call with its own inputs
    # These are the animals with the version name as a second item in the tuple

    # Combining arguments starting with "animals_" into a dictionary
    animals_args = {k: v for k, v in locals().items() if k.startswith("animals_")}

    # Create a component instance by calling the component factory
    data_split_job = data_split_component(
            animal_1=animal_1,
            animal_2=animal_2,
            animal_3=animal_3,
            train_test_split_factor=train_test_split
        )
    
    # Override the training data output and testing data output to a file named "trainingdata" and "testingdata
    data_split_job.outputs.training_data = Output(
        type="uri_folder",
        name="training_data",
        mode="rw_mount"
    )
    data_split_job.outputs.testing_data = Output(
        type="uri_folder",
        name="testing_data",
        mode="rw_mount"
    )

    # a pipeline returns a dictionary of outputs
    # keys will code for the pipeline output identifier
    return {
        "training_data": data_split_job.outputs.training_data,
        "testing_data": data_split_job.outputs.testing_data
    }

In [16]:
import os
from azure.ai.ml import Input 

version = "1" 
animals = ["pandas", "cats", "dogs"]

animals_datasets = {
    f"animal_{i+1}": Input(type="uri_folder", path=f"azureml:{animal}_resized:{version}")
    for i, animal in enumerate(animals)
}

split_percentage_for_pipeline = 20


print("Creating pipeline job instance...")
pipeline_job = animal_images_traintest_split_pipeline(
    train_test_split=split_percentage_for_pipeline,
    **animals_datasets # **animal_dataset_inputs unpacks the dictionary into keyword arguments (we finally get to use it again)
)

pipeline_job.experiment_name = "data_preparation_and_split_pipeline_run"
pipeline_job.display_name = "Animal Image Train-Test Split Pipeline"

returned_pipeline_job = ml_client.jobs.create_or_update(
    pipeline_job,
    wait=True 
)

print(f"\nPipeline Job submitted: {returned_pipeline_job.name}")
print(f"Pipeline Job Status: {returned_pipeline_job.status}")
print(f"Pipeline Job URL: {returned_pipeline_job.studio_url}")



pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored
pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored



Pipeline Job submitted: gifted_fennel_zpshq4757h
Pipeline Job Status: NotStarted
Pipeline Job URL: https://ml.azure.com/runs/gifted_fennel_zpshq4757h?wsid=/subscriptions/6a36bb7a-aee5-4e15-a3c7-2e362d2c2387/resourcegroups/mlops-demo/workspaces/pollet-daymon-ml&tid=4f3f75e5-d447-48c8-9483-c82b6c655896


## Training pipeline

In [17]:
from azure.ai.ml.entities import Environment
import os

custom_env_name = "aml-Tensorflow-Pillow"

pipeline_job_env = Environment(
    name=custom_env_name,
    description="Custom environment for AI Training (with Pillow)",
    tags={"Pillow": "10.0.1", "Tensorflow": "2.4.1"},
    conda_file=os.path.join("components", "training", "conda.yaml"),
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
)
pipeline_job_env = ml_client.environments.create_or_update(pipeline_job_env)

print(
    f"Environment with name {pipeline_job_env.name} is registered to workspace, the environment version is {pipeline_job_env.version}"
)

Environment with name aml-Tensorflow-Pillow is registered to workspace, the environment version is 2


define the training component

In [18]:
import os
from azure.ai.ml import command
from azure.ai.ml import Input, Output

training_component = command(
    name="training",
    display_name="Training an AI model",
    description="Trains an AI model by inputting a lot of training and testing data with configurable hyperparameters.",
    inputs={
        "training_folder": Input(type="uri_folder"),
        "testing_folder": Input(type="uri_folder"),
        "epochs": Input(type="integer"), # Changed to integer as epochs are typically integers
        # --- ADDED NEW DYNAMIC INPUTS HERE ---
        "seed": Input(type="integer", default=42, description="Random seed for reproducibility."),
        "initial_learning_rate": Input(type="number", default=0.001, description="Initial learning rate for optimizer."),
        "batch_size": Input(type="integer", default=32, description="Batch size for training."),
        "patience": Input(type="integer", default=5, description="Patience for early stopping."),
        "model_name": Input(type="string", default="animal-cnn", description="Name for the saved AI model."),
    },
    outputs={
        "output_folder": Output(type="uri_folder", mode="rw_mount"),
    },
    code=os.path.join("components", "training", "code"),
    command="""python train.py \
            --training_folder ${{inputs.training_folder}} \
            --testing_folder ${{inputs.testing_folder}} \
            --output_folder ${{outputs.output_folder}} \
            --epochs ${{inputs.epochs}} \
            --seed ${{inputs.seed}} \
            --initial_learning_rate ${{inputs.initial_learning_rate}} \
            --batch_size ${{inputs.batch_size}} \
            --patience ${{inputs.patience}} \
            --model_name ${{inputs.model_name}} \
            """,
    environment=f"aml-Tensorflow-Pillow@latest",
)

register the training component

In [19]:
training_component = ml_client.create_or_update(training_component.component)

print(
    f"Component {training_component.name} with Version {training_component.version} is registered."
)

Component training with Version 2025-05-30-17-30-10-0222014 is registered.


create the pipeline for this component

In [20]:
# the dsl decorator tells the sdk that we are defining an Azure ML pipeline
from azure.ai.ml import dsl, Input, Output

@dsl.pipeline(
    compute="cpu-daymon-auto",
    description="Custom Animals Training pipeline",
)
def animals_training_pipeline(
    training_folder: Input, # Currently we don't use these version numbers, but we will use them later on.
    testing_folder: Input,
    epochs: int,
):

    training_job = training_component(
        training_folder=training_folder,
        testing_folder=testing_folder,
        epochs=epochs
    )
    
    # Let Azure decide a unique place everytime
    training_job.outputs.output_folder = Output(
        type="uri_folder",
        name="output_data",
        mode="rw_mount"
    )


    # a pipeline returns a dictionary of outputs
    # keys will code for the pipeline output identifier
    return {
        "output_data": training_job.outputs.output_folder,
    }

instantiate the pipeline

In [21]:
# Let's instantiate the pipeline with the parameters of our choice

# Make sure to use the correct version number here!
training_pipeline = animals_training_pipeline(
    # Change these versions if you want to override the choices
    training_folder=Input(type="uri_folder", path=f"azureml:training_data:1"),
    testing_folder=Input(type="uri_folder", path=f"azureml:testing_data:1"),
    epochs=5 # This isn't super important now, the quality of the AI model isn't the most important, so training it longer will just waste resources anyways ...
)

create the pipeline

In [22]:
import webbrowser
# submit the pipeline job
training_pipeline_job = ml_client.jobs.create_or_update(
    training_pipeline,
    # Project's name
    experiment_name="training_pipeline",
)
# open the pipeline in web browser
webbrowser.open(training_pipeline_job.studio_url)

pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored


False

## Wrapping everything into one pipeline

In [23]:
import os
from azure.ai.ml import dsl, Input, Output


# --- Define the Master End-to-End Pipeline ---

@dsl.pipeline(
    compute="cpu-daymon-auto", 
    description="Master End-to-End Animal Image Classification Pipeline - Chaining Components Directly",
    default_compute="cpu-daymon-auto", 
    display_name="Master Animal ML Pipeline (Components Chained)"
)
def master_animal_ml_pipeline_from_components(
    pandas_raw_data_input: Input,
    cats_raw_data_input: Input,
    dogs_raw_data_input: Input,
    
    train_test_split_percentage: int = 20, 
    
    epochs: int = 50, 
    seed: int = 42,
    initial_learning_rate: float = 0.001,
    batch_size: int = 32,
    patience: int = 5,
    final_model_name: str = "master_animal_classifier_direct_chain", 
):
    # --- Step 1: Image Preprocessing (Resizing) using data_prep_component ---
    pandas_resized_job = data_prep_component(data=pandas_raw_data_input)
    cats_resized_job = data_prep_component(data=cats_raw_data_input)
    dogs_resized_job = data_prep_component(data=dogs_raw_data_input)

    # --- Step 2: Train/Test Split using data_split_component ---
    split_job = data_split_component(
        animal_1=pandas_resized_job.outputs.output_data, 
        animal_2=cats_resized_job.outputs.output_data,   
        animal_3=dogs_resized_job.outputs.output_data,   
        train_test_split_factor=train_test_split_percentage 
    )
    split_job.outputs.training_data = Output(type="uri_folder", name="combined_training_data", mode="rw_mount")
    split_job.outputs.testing_data = Output(type="uri_folder", name="combined_testing_data", mode="rw_mount")


    # --- Step 3: Model Training using training_component ---

    training_job = training_component(
        training_folder=split_job.outputs.training_data, 
        testing_folder=split_job.outputs.testing_data,  
        epochs=epochs,
        seed=seed,
        initial_learning_rate=initial_learning_rate,
        batch_size=batch_size,
        patience=patience,
        model_name=final_model_name, 
    )

    # to be used by the 'train.py' script for model registration.
    training_job.outputs.output_folder = Output(
        type="uri_folder",
        name="trained_model_artifacts", # Use a static, descriptive name for the output folder
        mode="rw_mount"
    )

    return {
        "final_model_output_asset": training_job.outputs.output_folder, 
        "final_training_data_asset": split_job.outputs.training_data,
        "final_testing_data_asset": split_job.outputs.testing_data,
    }

In [24]:
raw_animal_inputs_for_master_pipeline = {
    "pandas_raw_data_input": Input(type="uri_folder", path="azureml:pandas:1"),
    "cats_raw_data_input": Input(type="uri_folder", path="azureml:cats:1"),
    "dogs_raw_data_input": Input(type="uri_folder", path="azureml:dogs:1")
}

print("Creating master end-to-end pipeline job instance (chaining components directly)...")
master_components_pipeline_job_instance = master_animal_ml_pipeline_from_components(
    **raw_animal_inputs_for_master_pipeline, 
    train_test_split_percentage=25, 
    epochs=75, 
    seed=12345, 
    initial_learning_rate=0.0001, 
    batch_size=128, 
    patience=15, 
    final_model_name="my_e2e_animal_classifier_direct_chain_v1"
)

master_components_pipeline_job_instance.experiment_name = "master_e2e_component_chaining"
master_components_pipeline_job_instance.display_name = "Master Animal ML Pipeline (Direct Component Chain)"

print(f"\nSubmitting the master pipeline job '{master_components_pipeline_job_instance.display_name}'...")
returned_master_components_pipeline_job = ml_client.jobs.create_or_update(
    master_components_pipeline_job_instance,
    wait=True 
)

print(f"\nMaster Chained Components Pipeline Job submitted: {returned_master_components_pipeline_job.name}")
print(f"Master Chained Components Pipeline Job Status: {returned_master_components_pipeline_job.status}")
print(f"Master Chained Components Pipeline Job URL: {returned_master_components_pipeline_job.studio_url}")

Creating master end-to-end pipeline job instance (chaining components directly)...

Submitting the master pipeline job 'Master Animal ML Pipeline (Direct Component Chain)'...

Master Chained Components Pipeline Job submitted: boring_hamster_bw56rs21r6
Master Chained Components Pipeline Job Status: NotStarted
Master Chained Components Pipeline Job URL: https://ml.azure.com/runs/boring_hamster_bw56rs21r6?wsid=/subscriptions/6a36bb7a-aee5-4e15-a3c7-2e362d2c2387/resourcegroups/mlops-demo/workspaces/pollet-daymon-ml&tid=4f3f75e5-d447-48c8-9483-c82b6c655896


pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored
pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored
pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored
