In [1]:
# import required libraries
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential

from azure.ai.ml import MLClient
from azure.ai.ml.dsl import pipeline
from azure.ai.ml import load_component

In [2]:
import os
## Workspace coordinates. Each value is taken from an environment variable when it is set;
## otherwise the hard-coded fallback (the second argument) is used.
## For now the fallbacks are filled in; later on the environment variables take over.
subscription_id = os.getenv('SUBSCRIPTION_ID', '7c50f9c3-289b-4ae0-a075-08784b3b9042')
resource_group = os.getenv('RESOURCE_GROUP', 'mlops')
workspace_name = os.getenv('WORKSPACE', 'mlops-nathan')

In [3]:
# Prefer DefaultAzureCredential and only fall back to an interactive browser login
# when it cannot obtain a token.
try:
    credential = DefaultAzureCredential()
    # Request a token up front so a credential that cannot authenticate fails here
    # instead of on the first workspace call.
    credential.get_token("https://management.azure.com/.default")
except Exception as credential_error:
    # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential does not work
    print(
        f"DefaultAzureCredential failed ({credential_error}); falling back to InteractiveBrowserCredential."
    )
    credential = InteractiveBrowserCredential()

In [4]:
# Handle to the Azure ML workspace; the cells below use it for every workspace operation.
ml_client = MLClient(
    credential=credential,
    subscription_id=subscription_id,
    resource_group_name=resource_group,
    workspace_name=workspace_name,
)

Class FeatureStoreOperations: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class FeatureSetOperations: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class FeatureStoreEntityOperations: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.


## Prepare a Virtual PC if needed

In [5]:
from azure.ai.ml.entities import AmlCompute
from azure.core.exceptions import ResourceNotFoundError

cpu_compute_target = "cpu-nathan"

try:
    # let's see if the compute target already exists
    cpu_machine = ml_client.compute.get(cpu_compute_target)
    print(
        f"You already have a machine named {cpu_compute_target}, we'll reuse it as is."
    )
except ResourceNotFoundError:
    # Only a missing compute target triggers creation; any other failure
    # (authentication, network, ...) is still raised to the reader.
    print(f"No machine named {cpu_compute_target} was found, creating a new one...")
    cpu_machine = AmlCompute(
        name=cpu_compute_target,
        type="amlcompute",
        # NOTE(review): VM size and scale limits are example values — adjust to your quota.
        size="STANDARD_DS3_V2",
        min_instances=0,  # scale down to zero nodes when idle
        max_instances=1,
        idle_time_before_scale_down=180,  # seconds
        tier="Dedicated",
    )
    # begin_create_or_update returns a poller; .result() waits for the creation to finish
    cpu_machine = ml_client.compute.begin_create_or_update(cpu_machine).result()
    print(
        f"AmlCompute with name {cpu_machine.name} is created, the compute size is {cpu_machine.size}"
    )

You already have a machine named cpu-nathan, we'll reuse it as is.


In [6]:
from azure.ai.ml.entities import Environment
import os

# Name under which the image-processing environment is registered.
custom_env_name = "aml-Pillow"

# Environment definition: a base image plus the conda file stored next to the
# data-preparation scripts.
pipeline_job_env = Environment(
    name=custom_env_name,
    version="0.1.0",
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
    conda_file=os.path.join("components", "dataprep", "conda.yaml"),
    description="Custom environment for Image Processing (with Pillow)",
    tags={"Pillow": "0.0.1"},
)

# Register the environment; from here on the name refers to the object returned by the workspace.
pipeline_job_env = ml_client.environments.create_or_update(pipeline_job_env)

print(
    "Environment with name {} is registered to workspace, the environment version is {}".format(
        pipeline_job_env.name, pipeline_job_env.version
    )
)

Environment with name aml-Pillow is registered to workspace, the environment version is 0.1.0


In [7]:
from azure.ai.ml import Input

In [8]:
import os

# Local folder for the data-preparation scripts; the %%writefile cells below write into it.
data_prep_src_dir = "./components/dataprep"
os.makedirs(name=data_prep_src_dir, exist_ok=True)

## Data Prep

In [9]:
%%writefile {data_prep_src_dir}/dataprep.py
import os
import argparse
import logging
import mlflow
from glob import glob
from PIL import Image



def main():
    """Resize every ``*.jpg`` image of the input folder and save it to the output folder.

    Command-line arguments:
        --data: path to the folder that contains the input images (required).
        --output_data: path to the folder the resized images are written to (required).

    Each image is resized to 64x64 pixels and saved under its original file name.
    The processing happens inside an MLflow run, which is closed even when an
    image fails to process.
    """

    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", type=str, required=True, help="path to input data")
    parser.add_argument("--output_data", type=str, required=True, help="path to output data")
    args = parser.parse_args()

    # Start Logging; the context manager ends the run on success as well as on failure
    with mlflow.start_run():
        print(" ".join(f"{k}={v}" for k, v in vars(args).items()))

        print("input data:", args.data)
        print("output folder:", args.output_data)

        output_dir = args.output_data
        size = (64, 64)  # Later we can also pass this as a property

        # Make sure the destination exists before the first image is written
        os.makedirs(output_dir, exist_ok=True)

        for file in glob(os.path.join(args.data, "*.jpg")):
            # Open inside a `with` block so the source file handle is released per image
            with Image.open(file) as img:
                img_resized = img.resize(size)

            # Save the resized image to the output directory
            output_file = os.path.join(output_dir, os.path.basename(file))
            img_resized.save(output_file)


if __name__ == "__main__":
    main()

Overwriting ./components/dataprep/dataprep.py


In [10]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

# Command component around dataprep.py: one folder of images in, one folder of resized images out.
data_prep_component = command(
    name="data_prep_image_resize",
    display_name="Data preparation, Image Resizing",
    description="Reads a data asset of images and preprocesses them by resizing them to 64 to 64.",
    # The source folder of the component
    code=data_prep_src_dir,
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
    inputs=dict(data=Input(type="uri_folder")),
    outputs=dict(output_data=Output(type="uri_folder", mode="rw_mount")),
    command="""python dataprep.py \
            --data ${{inputs.data}} \
            --output_data ${{outputs.output_data}} \
            """,
)

In [11]:
# Now we register the component to the workspace.
# The object built by `command(...)` exposes its component definition as `.component`.
# Once this cell has run, `data_prep_component` is rebound to the registered component
# (which presumably has no `.component` attribute — TODO confirm), so fall back to the
# object itself to keep the cell re-runnable.
data_prep_component = ml_client.create_or_update(
    getattr(data_prep_component, "component", data_prep_component)
)

# Create (register) the component in your workspace
print(
    f"Component {data_prep_component.name} with Version {data_prep_component.version} is registered"
)

Component data_prep_image_resize with Version 2023-05-15-07-10-44-9498611 is registered


In [12]:
from typing import List

In [13]:
# the dsl decorator tells the sdk that we are defining an Azure ML pipeline
from azure.ai.ml import dsl, Input, Output

# Pipeline that creates one image-resizing job per animal data asset and exposes
# each job's output folder as a pipeline output named after the animal.
@dsl.pipeline(
    compute=cpu_compute_target,
    description="Custom data_prep pipeline",
)
def animal_images_preprocessing_pipeline(
    input_version: str, # Currently we don't use these version numbers, but we will use them later on.
    output_version: str,
):
    # using data_prep_function like a python call with its own inputs
    # These are the animals with the version name as a second item in the tuple
    animals = [
        ('pandas', "1"),
        ('cats', "1"),
        ('dogs', "1")
    ] # They are hardcoded in here, because we should give them from another component otherwise.

    # Root folder for the processed images on the workspace blob store. It is built from the
    # workspace settings of the configuration cell instead of repeating them literally.
    output_root = (
        f"azureml://subscriptions/{subscription_id}/resourcegroups/{resource_group}"
        f"/workspaces/{workspace_name}/datastores/workspaceblobstore/paths/processed_animals/"
    )

    jobs = {}
    for animal_name, animal_version in animals:

        data_prep_job = data_prep_component(
            data=Input(
                type="uri_folder",
                # "azureml:<name>:<version>"; the version is formatted as a plain string,
                # not as a one-element list
                path=f"azureml:{animal_name}:{animal_version}"
            ),
        )

        output_name = animal_name + "_resized"
        output_path = output_root + animal_name

        data_prep_job.outputs.output_data = Output(
            type="uri_folder",
            path=output_path,
            name=output_name,
            mode="rw_mount"
        )

        jobs[animal_name] = data_prep_job

    # a pipeline returns a dictionary of outputs
    # keys will code for the pipeline output identifier
    return {
        k: v.outputs.output_data for k,v in jobs.items()
    }

In [14]:

# Instantiate the preprocessing pipeline. No values are passed for the declared
# `input_version` / `output_version` parameters, which the pipeline body does not read.
# NOTE(review): this rebinds `pipeline`, the name imported from `azure.ai.ml.dsl` in the
# first cell; the pipeline definitions in this notebook use `dsl.pipeline` instead.
pipeline = animal_images_preprocessing_pipeline()

In [23]:
import webbrowser

# Submit the preprocessing pipeline as a job of the given experiment
pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    experiment_name="image_preprocessing_pipeline",  # Project's name
)
# Show the submitted job in the web browser
webbrowser.open(url=pipeline_job.studio_url)

True

## Train test split

In [33]:
%%writefile {data_prep_src_dir}/traintestsplit.py
import os
import argparse
import logging
import mlflow
from glob import glob
import math
import random

def main():
    """Combine several image folders and split them into a training and a testing folder.

    Command-line arguments:
        --datasets: one or more folders containing ``*.jpg`` images (required).
        --training_data_output: folder that receives the training images (required).
        --testing_data_output: folder that receives the testing images (required).
        --split_size: percentage (0-100) of every dataset to use as testing data (required).

    Every dataset is sorted, shuffled with a fixed seed and split: the first
    ``split_size`` percent (rounded up) is copied to the testing folder, the rest to
    the training folder. Files keep their base name.
    """
    import shutil  # local import: only needed here, for copying the image files

    SEED = 42

    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--datasets", type=str, nargs="+", required=True, help="All the datasets to combine")
    parser.add_argument("--training_data_output", type=str, required=True, help="path to training output data")
    parser.add_argument("--testing_data_output", type=str, required=True, help="path to testing output data")
    # float instead of int, so that a value such as "20.0" is accepted as well
    parser.add_argument("--split_size", type=float, required=True, help="Percentage to use as Testing data")
    args = parser.parse_args()

    if not 0 <= args.split_size <= 100:
        parser.error("--split_size must be a percentage between 0 and 100")

    # Start Logging; the context manager ends the run on success as well as on failure
    with mlflow.start_run():
        print(" ".join(f"{k}={v}" for k, v in vars(args).items()))

        print("input data:", args.datasets)
        print("Training folder:", args.training_data_output)
        print("Testing folder:", args.testing_data_output)
        print("Split size:", args.split_size)

        train_test_split_factor = args.split_size / 100 # Alias

        # Make sure both destinations exist before the first file is copied
        os.makedirs(args.training_data_output, exist_ok=True)
        os.makedirs(args.testing_data_output, exist_ok=True)

        training_datapaths = []
        testing_datapaths = []

        for dataset in args.datasets:
            # glob returns files in arbitrary order; sorting first makes the seeded
            # shuffle below produce the same split on every run
            animal_images = sorted(glob(os.path.join(dataset, "*.jpg")))
            print(f"Found {len(animal_images)} images for {dataset}")

            random.seed(SEED) # Re-seed per dataset so every dataset is shuffled the same way
            random.shuffle(animal_images) # Shuffle the data so it's randomized

            ## Testing images
            amount_of_test_images = math.ceil(len(animal_images) * train_test_split_factor) # Get a small percentage of testing images

            animal_test_images = animal_images[:amount_of_test_images]
            animal_training_images = animal_images[amount_of_test_images:]

            # Add them all to the other ones
            testing_datapaths.extend(animal_test_images)
            training_datapaths.extend(animal_training_images)

            # Copy the testing images to the testing output folder
            for img in animal_test_images:
                shutil.copyfile(img, os.path.join(args.testing_data_output, os.path.basename(img)))

            # Copy the training images to the training output folder
            for img in animal_training_images:
                shutil.copyfile(img, os.path.join(args.training_data_output, os.path.basename(img)))

        print(f"Copied {len(training_datapaths)} training and {len(testing_datapaths)} testing images")
        print(testing_datapaths[:5]) # Examples


if __name__ == "__main__":
    main()

Overwriting ./components/dataprep/traintestsplit.py


In [34]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

# Command component around traintestsplit.py: three image folders in, a training and a
# testing folder out.
data_split_component = command(
    name="data_split",
    display_name="Data Splitting to Train and Test",
    description="Reads a data asset of images and combines them into a training and testing dataset",
    inputs={
        "animal_1": Input(type="uri_folder"),
        "animal_2": Input(type="uri_folder"),
        "animal_3": Input(type="uri_folder"),
        "train_test_split_factor": Input(type="number") # The percentage of the data to use as testing data, always a positive value
    },
    outputs={
        "training_data": Output(type="uri_folder", mode="rw_mount"),
        "testing_data": Output(type="uri_folder", mode="rw_mount")
    },
    # The source folder of the component
    code=data_prep_src_dir,
    # The flags are spelled exactly as traintestsplit.py declares them
    # (--training_data_output / --testing_data_output) instead of relying on
    # argparse's prefix matching of abbreviated options.
    command="""python traintestsplit.py \
            --datasets ${{inputs.animal_1}} ${{inputs.animal_2}} ${{inputs.animal_3}} \
            --training_data_output ${{outputs.training_data}} \
            --testing_data_output ${{outputs.testing_data}} \
            --split_size ${{inputs.train_test_split_factor}}
            """,
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
)

In [35]:
# Now we register the component to the workspace.
# The object built by `command(...)` exposes its component definition as `.component`.
# Once this cell has run, `data_split_component` is rebound to the registered component
# (which presumably has no `.component` attribute — TODO confirm), so fall back to the
# object itself to keep the cell re-runnable.
data_split_component = ml_client.create_or_update(
    getattr(data_split_component, "component", data_split_component)
)

# Create (register) the component in your workspace
print(
    f"Component {data_split_component.name} with Version {data_split_component.version} is registered"
)

[32mUploading dataprep (0.0 MBs): 100%|██████████| 4374/4374 [00:00<00:00, 14333.70it/s]
[39m



Component data_split with Version 2023-05-15-07-33-41-2937143 is registered


In [44]:
# the dsl decorator tells the sdk that we are defining an Azure ML pipeline
from azure.ai.ml import dsl, Input, Output

# Pipeline with a single step: it hands the three animal folders and the split percentage
# to the data-split component and exposes the resulting training and testing folders.
@dsl.pipeline(
    compute=cpu_compute_target,
    description="Custom data_prep pipeline",
)
def animal_images_traintest_split_pipeline(
    train_test_split: int, # Percentage of the images to use as testing data
    animal_1: Input,
    animal_2: Input,
    animal_3: Input,
):
    # The three animal inputs are forwarded one by one; the component declares
    # exactly these three folder inputs.
    data_split_job = data_split_component(
        animal_1=animal_1,
        animal_2=animal_2,
        animal_3=animal_3,
        train_test_split_factor=train_test_split
    )

    # Let Azure decide a unique place everytime
    data_split_job.outputs.training_data = Output(
        type="uri_folder",
        name="training_data",
        mode="rw_mount"
    )
    data_split_job.outputs.testing_data = Output(
        type="uri_folder",
        name="testing_data",
        mode="rw_mount"
    )

    # a pipeline returns a dictionary of outputs
    # keys will code for the pipeline output identifier
    return {
        "training_data": data_split_job.outputs.training_data,
        "testing_data": data_split_job.outputs.testing_data
    }

In [45]:

# Instantiate the split pipeline: one uri_folder Input per resized animal data asset
# (all at the same version), passed as animal_1 .. animal_3, with 20 percent testing data.
version = "2"
animals = ["pandas", "cats", "dogs"]

animals_datasets = dict(
    (
        f"animal_{position}",
        Input(type="uri_folder", path=f"azureml:{animal}_resized:{version}"),
    )
    for position, animal in enumerate(animals, start=1)
)

print(animals_datasets)

train_test_pipeline = animal_images_traintest_split_pipeline(
    train_test_split=20,
    **animals_datasets,
)

{'animal_1': {'type': 'uri_folder', 'path': 'azureml:pandas_resized:2'}, 'animal_2': {'type': 'uri_folder', 'path': 'azureml:cats_resized:2'}, 'animal_3': {'type': 'uri_folder', 'path': 'azureml:dogs_resized:2'}}


In [46]:
import webbrowser

In [47]:
# Submit the train/test split pipeline as a job of the given experiment
train_test_pipeline_job = ml_client.jobs.create_or_update(
    train_test_pipeline,
    experiment_name="image_preprocessing_pipeline",  # Project's name
)
# Show the submitted job in the web browser
webbrowser.open(url=train_test_pipeline_job.studio_url)

True

## Training

In [48]:
import os

# Local folder for the training script; the %%writefile cell below writes train.py into it.
training_src_dir = "./components/training"
os.makedirs(name=training_src_dir, exist_ok=True)

In [74]:
from azure.ai.ml.entities import Environment
import os

# Name under which the training environment is registered.
custom_env_name = "aml-Tensorflow-Pillow"

# Environment definition: a base image plus the conda file stored next to the training script.
# From this cell on, `pipeline_job_env` refers to the training environment.
pipeline_job_env = Environment(
    name=custom_env_name,
    version="0.1.1",
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
    conda_file=os.path.join("components", "training", "conda.yaml"),
    description="Custom environment for AI Training (with Pillow)",
    tags={"Pillow": "0.0.1", "Tensorflow": "2.4.1"},
)

# Register the environment; the name now refers to the object returned by the workspace.
pipeline_job_env = ml_client.environments.create_or_update(pipeline_job_env)

print(
    "Environment with name {} is registered to workspace, the environment version is {}".format(
        pipeline_job_env.name, pipeline_job_env.version
    )
)

Environment with name aml-Tensorflow-Pillow is registered to workspace, the environment version is 0.1.1


In [87]:
%%writefile {training_src_dir}/train.py

import argparse
import os
from glob import glob
import random
import tensorflow as tf
import mlflow

# This time we will need our Tensorflow Keras libraries, as we will be working with the AI training now
from tensorflow import keras
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.metrics import classification_report, confusion_matrix

# This AzureML package will allow to log our metrics etc.
from azureml.core import Run

# Important to load in the utils as well!
from utils import *


### HARDCODED VARIABLES FOR NOW
### TODO for the students:
### Make sure to adapt the ArgumentParser in main() to include these parameters
### You can base your answer on the lines that are already there

SEED = 42
INITIAL_LEARNING_RATE = 0.01
BATCH_SIZE = 32
PATIENCE = 11
model_name = 'animal-cnn-test'

def main():
    """Train the animal image classifier and store the model and its confusion matrix.

    Command-line arguments:
        --training_folder: folder with the training ``*.jpg`` images (required).
        --testing_folder: folder with the testing ``*.jpg`` images (required).
        --output_folder: folder that receives the saved model and
            ``confusion_matrix.npy`` (required).
        --epochs: maximum number of epochs to train, at least 1 (required).

    Feature extraction, label encoding and the model architecture come from the
    ``utils`` module. Training happens inside an MLflow run, which is closed even
    when training fails.
    """
    # numpy is needed to save the confusion matrix; import it explicitly instead of
    # relying on `from utils import *` to provide the name `np`.
    import numpy as np

    parser = argparse.ArgumentParser()
    parser.add_argument('--training_folder', type=str, dest='training_folder', required=True, help='training folder mounting point')
    parser.add_argument('--testing_folder', type=str, dest='testing_folder', required=True, help='testing folder mounting point')
    parser.add_argument('--output_folder', type=str, dest='output_folder', required=True, help='Output folder')
    parser.add_argument('--epochs', type=int, dest='epochs', required=True, help='The amount of Epochs to train')
    args = parser.parse_args()

    # The epoch count is used as a divisor for the learning-rate decay below
    if args.epochs < 1:
        parser.error('--epochs must be a positive integer')

    # Start Logging; the context manager ends the run on success as well as on failure
    with mlflow.start_run():
        print(" ".join(f"{k}={v}" for k, v in vars(args).items()))

        training_folder = args.training_folder
        print('Training folder:', training_folder)

        testing_folder = args.testing_folder
        print('Testing folder:', testing_folder)

        output_folder = args.output_folder
        print('Output folder:', output_folder)

        MAX_EPOCHS = args.epochs

        # The training and testing folders are mounted, so the images can be listed with glob.
        # glob returns files in arbitrary order; sorting first makes the seeded shuffle
        # below reproducible.
        training_paths = sorted(glob(training_folder + "/*.jpg"))
        testing_paths = sorted(glob(testing_folder + "/*.jpg"))

        print("Training samples:", len(training_paths))
        print("Testing samples:", len(testing_paths))

        # Shuffle both lists with the same fixed seed
        random.seed(SEED)
        random.shuffle(training_paths)
        random.seed(SEED)
        random.shuffle(testing_paths)

        print(training_paths[:3]) # Examples
        print(testing_paths[:3]) # Examples

        # Parse to Features and Targets for both Training and Testing. Refer to the Utils package for more information
        X_train = getFeatures(training_paths)
        y_train = getTargets(training_paths)

        X_test = getFeatures(testing_paths)
        y_test = getTargets(testing_paths)

        print('Shapes:')
        print(X_train.shape)
        print(X_test.shape)
        print(len(y_train))
        print(len(y_test))

        # Make sure the data is one-hot-encoded
        LABELS, y_train, y_test = encodeLabels(y_train, y_test)
        print('Labels:', LABELS)
        print('One Hot Shapes:')

        print(y_train.shape)
        print(y_test.shape)

        # Create the directory inside the output folder where the AI model will be saved to.
        model_path = os.path.join(output_folder, model_name)
        os.makedirs(model_path, exist_ok=True)

        # Save the best model, not the last
        cb_save_best_model = keras.callbacks.ModelCheckpoint(filepath=model_path,
                                                             monitor='val_loss',
                                                             save_best_only=True,
                                                             verbose=1)

        # Early stop when the val_loss isn't improving for PATIENCE epochs
        cb_early_stop = keras.callbacks.EarlyStopping(monitor='val_loss',
                                                      patience=PATIENCE,
                                                      verbose=1,
                                                      restore_best_weights=True)

        # Reduce the Learning Rate when not learning more for 4 epochs.
        cb_reduce_lr_on_plateau = keras.callbacks.ReduceLROnPlateau(factor=.5, patience=4, verbose=1)

        # Define the Optimizer; `learning_rate` replaces the deprecated `lr` alias
        opt = tf.keras.optimizers.legacy.SGD(learning_rate=INITIAL_LEARNING_RATE,
                                             decay=INITIAL_LEARNING_RATE / MAX_EPOCHS)

        model = buildModel((64, 64, 3), 3) # Create the AI model as defined in the utils script.

        model.compile(loss="categorical_crossentropy", optimizer=opt, metrics=["accuracy"])

        # Construct & initialize the image data generator for data augmentation
        # Image augmentation allows us to construct "additional" training data from our existing training data
        # by randomly rotating, shifting, shearing, zooming, and flipping. This is to avoid overfitting.
        aug = ImageDataGenerator(rotation_range=30, width_shift_range=0.1,
                                 height_shift_range=0.1, shear_range=0.2, zoom_range=0.2,
                                 horizontal_flip=True, fill_mode="nearest")

        # train the network; at least one step per epoch, also when there are fewer
        # training samples than one batch
        history = model.fit(aug.flow(X_train, y_train, batch_size=BATCH_SIZE),
                            validation_data=(X_test, y_test),
                            steps_per_epoch=max(1, len(X_train) // BATCH_SIZE),
                            epochs=MAX_EPOCHS,
                            callbacks=[cb_save_best_model, cb_early_stop, cb_reduce_lr_on_plateau])

        print("[INFO] evaluating network...")
        predictions = model.predict(X_test, batch_size=BATCH_SIZE)
        # NOTE(review): the target names are hard-coded; confirm that their order matches
        # the label order printed above.
        print(classification_report(y_test.argmax(axis=1), predictions.argmax(axis=1), target_names=['cats', 'dogs', 'panda'])) # Give the target names to easier refer to them.
        # If you want, you can enter the target names as a parameter as well, in case you ever adapt your AI model to more animals.

        cf_matrix = confusion_matrix(y_test.argmax(axis=1), predictions.argmax(axis=1))
        print(cf_matrix)

        ### TODO for students
        ### Find a way to log more information to the Run context.

        # Save the confusion matrix to the outputs. The file name must not start with a
        # slash: os.path.join discards output_folder when a later part is an absolute path.
        np.save(os.path.join(output_folder, 'confusion_matrix.npy'), cf_matrix)

        print("DONE TRAINING")


if __name__ == "__main__":
    main()

Overwriting ./components/training/train.py


In [93]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

# Command component around train.py: a training and a testing folder plus the number of
# epochs in, one output folder out.
training_component = command(
    name="training",
    display_name="Training an AI model",
    description="Trains an AI model by inputting a lot of training and testing data.",
    inputs={
        "training_folder": Input(type="uri_folder"),
        "testing_folder": Input(type="uri_folder"),
        # The amount of epochs to train; train.py parses it with type=int, so the input
        # is declared as an integer rather than a general number
        "epochs": Input(type="integer")
    },
    outputs={
        "output_folder": Output(type="uri_folder", mode="rw_mount"),
    },
    # The source folder of the component
    code=training_src_dir,
    command="""python train.py \
            --training_folder ${{inputs.training_folder}} \
            --testing_folder ${{inputs.testing_folder}} \
            --output_folder ${{outputs.output_folder}} \
            --epochs ${{inputs.epochs}} \
            """,
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
)

In [94]:
# Now we register the component to the workspace.
# The object built by `command(...)` exposes its component definition as `.component`.
# Once this cell has run, `training_component` is rebound to the registered component
# (which presumably has no `.component` attribute — TODO confirm), so fall back to the
# object itself to keep the cell re-runnable.
training_component = ml_client.create_or_update(
    getattr(training_component, "component", training_component)
)

# Create (register) the component in your workspace
print(
    f"Component {training_component.name} with Version {training_component.version} is registered"
)

[32mUploading training (0.01 MBs): 100%|██████████| 10244/10244 [00:00<00:00, 158264.55it/s]
[39m



Component training with Version 2023-05-16-08-58-57-7385595 is registered


In [95]:
# the dsl decorator tells the sdk that we are defining an Azure ML pipeline
from azure.ai.ml import dsl, Input, Output

# Pipeline with a single training step: it forwards the two data folders and the epoch
# count to the training component and exposes the component's output folder.
@dsl.pipeline(
    description="Custom Animals Training pipeline",
    compute=cpu_compute_target,
)
def animals_training_pipeline(
    training_folder: Input,  # folder with the training images
    testing_folder: Input,  # folder with the testing images
    epochs: int,  # amount of epochs handed to the training script
):
    training_job = training_component(
        epochs=epochs,
        testing_folder=testing_folder,
        training_folder=training_folder,
    )

    # No path is given, so Azure decides on a unique place every time
    training_job.outputs.output_folder = Output(
        mode="rw_mount",
        name="output_data",
        type="uri_folder",
    )

    # The returned dictionary defines the pipeline outputs; its keys are the output identifiers
    return dict(output_data=training_job.outputs.output_folder)

In [96]:

# Instantiate the training pipeline with registered data assets ("azureml:<name>:<version>")
# and the number of epochs to train.
training_data_input = Input(type="uri_folder", path="azureml:training_data:2")
testing_data_input = Input(type="uri_folder", path="azureml:testing_data:12")

training_pipeline = animals_training_pipeline(
    training_folder=training_data_input,
    testing_folder=testing_data_input,
    epochs=5,
)

In [97]:
import webbrowser
# Submit the training pipeline as a job of the given experiment
training_pipeline_job = ml_client.jobs.create_or_update(
    training_pipeline,
    experiment_name="training_pipeline",  # Project's name
)
# Show the submitted job in the web browser
webbrowser.open(url=training_pipeline_job.studio_url)

True