# Create a handle to the workspace

In [1]:
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

# authenticate
credential = DefaultAzureCredential()

# Workspace coordinates: replace each placeholder string with your own value.
# They are string literals so the cell parses before it has been filled in.
SUBSCRIPTION = "<subscription id>"
RESOURCE_GROUP = "<name of resource group>"
WS_NAME = "<Name of Workspace>"

# Get a handle to the workspace
ml_client = MLClient(
    credential=credential,
    subscription_id=SUBSCRIPTION,
    resource_group_name=RESOURCE_GROUP,
    workspace_name=WS_NAME,
)

In [2]:
# Sanity check: fetching the workspace through the handle shows that it works.
ws = ml_client.workspaces.get(WS_NAME)
print(f"{ws.location} : {ws.resource_group}")

northeurope : from_model_to_production


# Create component 1: data prep

In [3]:
import os

# Folder that holds the data-prep component's source code; the %%writefile
# cell and the component's `code=` argument below both point at it.
data_prep_src_dir = "./components_pred/data_prep"
os.makedirs(data_prep_src_dir, exist_ok=True)

In [4]:
%%writefile {data_prep_src_dir}/data_prep.py
import os
import argparse
import pandas as pd
from sklearn.model_selection import train_test_split
import mlflow
from PIL import Image
import cv2
import numpy as np

def main():
    """Flatten every image found under ``--data`` and write train/test pickles.

    Command-line arguments:
        --data: Folder whose immediate sub-folders contain the images.
        --train_data: Output folder that receives ``train_data.pkl``.
        --test_data: Output folder that receives ``test_data.pkl``.

    Each pickle holds a DataFrame with an ``images`` column (one flattened
    64x64 BGR pixel array per row) and a ``file_name`` column.

    Raises:
        ValueError: If fewer than two supported images are found, because the
            80/20 split needs at least one image on each side.
    """

    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", type=str, required=True, help="path to input data")
    parser.add_argument("--train_data", type=str, required=True, help="path to train data")
    parser.add_argument("--test_data", type=str, required=True, help="path to test data")
    args = parser.parse_args()

    # Matched case-insensitively so that e.g. "photo.JPG" is not skipped.
    image_suffixes = (".jpg", ".jpeg", ".png")

    # The context manager ends the MLflow run even if a step below raises.
    with mlflow.start_run():
        data_dir = args.data

        # os.listdir returns entries in arbitrary order; sorting keeps the
        # seeded split below reproducible across runs and machines.
        folder_paths = sorted(
            os.path.join(data_dir, d)
            for d in os.listdir(data_dir)
            if os.path.isdir(os.path.join(data_dir, d))
        )

        images = []
        file_names = []  # File name of each image, aligned with `images`

        for folder in folder_paths:
            image_files = sorted(
                os.path.join(folder, f)
                for f in os.listdir(folder)
                if f.lower().endswith(image_suffixes)
            )

            for img_file in image_files:
                # Close the file handle promptly and force three RGB channels:
                # grayscale or palette images would otherwise break the
                # RGB->BGR conversion below.
                with Image.open(img_file) as pil_img:
                    rgb_pixels = np.array(pil_img.convert("RGB"))

                img = cv2.cvtColor(rgb_pixels, cv2.COLOR_RGB2BGR)  # Convert to BGR format
                img = cv2.resize(img, (64, 64))  # Resize image to 64x64 pixels

                images.append(img.flatten())  # One row of pixel data per image
                file_names.append(os.path.basename(img_file))

        if len(images) < 2:
            raise ValueError(
                f"Found {len(images)} supported image(s) under {data_dir!r}; "
                "at least 2 are required to build the train/test split"
            )

        # Convert lists to numpy arrays
        images = np.array(images)
        file_names = np.array(file_names)

        # Split the data into training and testing datasets (without labels)
        X_train, X_test, file_names_train, file_names_test = train_test_split(
            images, file_names, test_size=0.2, train_size=0.8, random_state=42
        )

        # Create DataFrames for training and testing data
        train_df = pd.DataFrame({
            "images": list(X_train),
            "file_name": file_names_train,
        })

        test_df = pd.DataFrame({
            "images": list(X_test),
            "file_name": file_names_test,
        })

        # Make sure the output folders exist before writing into them.
        os.makedirs(args.train_data, exist_ok=True)
        os.makedirs(args.test_data, exist_ok=True)

        # Save the train and test datasets as pickle files
        train_df.to_pickle(os.path.join(args.train_data, "train_data.pkl"))
        test_df.to_pickle(os.path.join(args.test_data, "test_data.pkl"))


if __name__ == "__main__":
    main()

Overwriting ./components_pred/data_prep/data_prep.py


In [5]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

# Command component that runs data_prep.py: it reads a folder of images and
# writes the train and test splits to two output folders.
data_prep_component = command(
    name="data_proc_img_pred",
    # The previous display name described registering a CSV as a data asset,
    # which is not what this step does.
    display_name="Data preparation for animal image prediction",
    description="reads a URI_FOLDER input, split the input to train and test",
    inputs={
        "data": Input(type="uri_folder"),
    },
    outputs={
        "train_data": Output(type="uri_folder", mode="rw_mount"),
        "test_data": Output(type="uri_folder", mode="rw_mount"),
    },
    # The source folder of the component
    code=data_prep_src_dir,
    command="""python data_prep.py \
            --data ${{inputs.data}} \
            --train_data ${{outputs.train_data}} --test_data ${{outputs.test_data}} \
            """,
    environment="aml-scikit-learn@latest",
)

In [6]:
# Register the component in the workspace. The name is rebound to the object
# returned by create_or_update, which is what the pipeline definition calls.
data_prep_component = ml_client.create_or_update(data_prep_component.component)

# Report the name and version under which it was registered
registration_message = (
    f"Component {data_prep_component.name} with Version {data_prep_component.version} is registered"
)
print(registration_message)

Component data_proc_img_pred with Version 2024-09-29-17-12-11-2339491 is registered


# Create component 2: Prediction

In [7]:
import os

# Folder that holds the prediction component's source code; the %%writefile
# cell and the component's `code=` argument below both point at it.
predict_src_dir = "./components_pred/predict"
os.makedirs(predict_src_dir, exist_ok=True)

In [8]:
%%writefile {predict_src_dir}/predict.py
import os
import argparse
import pandas as pd
import mlflow
from mlflow.tracking import MlflowClient
import numpy as np


from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes

from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

# authenticate
credential = DefaultAzureCredential()

# Workspace coordinates. Inside an Azure ML job they are read from the
# AZUREML_ARM_* environment variables of the job container (assumed to be set
# by the service - confirm on your compute). The placeholder strings are the
# fallback to edit by hand when running the script elsewhere.
SUBSCRIPTION = os.environ.get("AZUREML_ARM_SUBSCRIPTION", "<subscription id>")
RESOURCE_GROUP = os.environ.get("AZUREML_ARM_RESOURCEGROUP", "<name of resource group>")
WS_NAME = os.environ.get("AZUREML_ARM_WORKSPACE_NAME", "<Name of Workspace>")

# Get a handle to the workspace
ml_client = MLClient(
    credential=credential,
    subscription_id=SUBSCRIPTION,
    resource_group_name=RESOURCE_GROUP,
    workspace_name=WS_NAME,
)



def main():
    """Score the prepared images with a registered model and publish the result.

    Command-line arguments:
        --model_name: Name of the registered model to load.
        --model_version: Optional model version; ``latest`` is used when it
            is omitted.
        --test_data: Folder containing ``test_data.pkl`` with the columns
            ``images`` and ``file_name``.
        --predictions_output: Folder that receives ``predictions.csv``.

    Side effects:
        Writes ``predictions.csv`` and registers it in the workspace as a new
        version of the ``animal_predictions`` data asset.

    Raises:
        ValueError: If ``test_data.pkl`` contains no rows.
    """

    # Parse input arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--model_name", type=str, required=True, help="Registered model name")
    parser.add_argument("--model_version", type=str, help="Model version")
    parser.add_argument("--test_data", type=str, required=True, help="Path to the test data (without labels)")
    parser.add_argument("--predictions_output", type=str, required=True, help="Path to save the predictions CSV file")
    args = parser.parse_args()

    # The context manager ends the MLflow run even if a step below raises.
    with mlflow.start_run():
        # Load the registered model
        model_uri = f"models:/{args.model_name}/{args.model_version}" if args.model_version else f"models:/{args.model_name}/latest"
        print(f"Loading model from {model_uri}")
        model = mlflow.sklearn.load_model(model_uri)

        # Load the test data (preprocessed images).
        # NOTE: unpickling can execute arbitrary code; only read files that
        # were produced by the trusted data-prep step.
        test_data = pd.read_pickle(os.path.join(args.test_data, "test_data.pkl"))
        if test_data.empty:
            raise ValueError("test_data.pkl contains no rows; there is nothing to predict")

        X_test = np.stack(test_data["images"].values)  # Convert image data to numpy array
        file_names = test_data["file_name"].values  # Get corresponding file names

        # Make predictions
        print(f"Making predictions on test data of shape {X_test.shape}")
        y_pred_proba = model.predict_proba(X_test)  # Get prediction probabilities

        # Prepare output DataFrame (file name and predicted probabilities)
        predictions_df = pd.DataFrame(
            data=y_pred_proba,
            columns=[f"label_{i}_prob" for i in range(y_pred_proba.shape[1])],
        )
        predictions_df.insert(0, "file_name", file_names)  # File name is the first column

        # Save predictions to CSV
        os.makedirs(args.predictions_output, exist_ok=True)
        output_csv_path = os.path.join(args.predictions_output, "predictions.csv")
        predictions_df.to_csv(output_csv_path, index=False)
        print(f"Predictions saved to {output_csv_path}")

        # Register the CSV as a data asset. No explicit version is passed so
        # that each run gets the next free version; a fixed version would
        # collide with the asset created by an earlier run.
        offline_data_asset = Data(
            name="animal_predictions",
            description="This is the result of the prediction pipeline",
            path=output_csv_path,
            type=AssetTypes.URI_FILE,
        )

        # Create data asset on Azure ML
        online_data_asset = ml_client.data.create_or_update(offline_data_asset)
        print(
            f"Registered data asset {online_data_asset.name} with version {online_data_asset.version}"
        )


if __name__ == "__main__":
    main()


Overwriting ./components_pred/predict/predict.py


In [9]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

# Command component that runs predict.py on the prepared test data.
predict_component = command(
    name="predict_animal_img",
    display_name="Predictions for Animal Image Classification",
    description="This component creates a CSV that contains predictions for different images",
    # The source folder containing the prediction script
    code=predict_src_dir,
    environment="aml-scikit-learn@latest",
    # Inputs: the prepared test data and the registered model name. No model
    # version input is declared, so predict.py uses its "latest" fallback.
    inputs=dict(
        test_data=Input(type="uri_folder"),
        model_name=Input(type="string"),
    ),
    # Output: folder in which the predictions are saved
    outputs={
        "predictions_output": Output(type="uri_folder", mode="rw_mount"),
    },
    command="""python predict.py \
            --test_data ${{inputs.test_data}} \
            --model_name ${{inputs.model_name}} \
            --predictions_output ${{outputs.predictions_output}}""",
)


In [10]:
# Register the component in the workspace. The name is rebound to the object
# returned by create_or_update, which is what the pipeline definition calls.
predict_component = ml_client.create_or_update(predict_component.component)

# Report the name and version under which it was registered
registration_message = (
    f"Component {predict_component.name} with Version {predict_component.version} is registered"
)
print(registration_message)

[32mUploading predict (0.0 MBs): 100%|██████████| 3158/3158 [00:00<00:00, 642554.19it/s]
[39m



Component predict_animal_img with Version 2024-09-29-17-12-13-7599307 is registered


# Create the pipeline from components

In [11]:
from azure.ai.ml import dsl, Input, Output

@dsl.pipeline(
    compute="MLprodComputeCheap",
    description="Prediction pipeline on animal images. This version creates a dataset with the CSV",
)
def animal_img_prediction_pipeline(
    pipeline_job_data_input,  # The input data (images)
    pipeline_job_registered_model_name,  # The registered model name
):
    """Prepare the input images, then score them with a registered model.

    Args:
        pipeline_job_data_input: Input holding the images, passed to the
            data-prep step as ``data``.
        pipeline_job_registered_model_name: Name of the registered model,
            passed to the predict step as ``model_name``.

    Returns:
        A dict whose ``predictions_output`` entry is the predict step's output.
    """
    # Step 1: data preparation, called like a Python function with its inputs
    data_prep_job = data_prep_component(
        data=pipeline_job_data_input,
    )

    # Step 2: prediction with the pre-trained model.
    # NOTE(review): only the test split is wired in, and data_prep.py uses
    # test_size=0.2, so the images in the train split are never scored -
    # confirm that this is intended for a prediction pipeline.
    predict_job = predict_component(
        test_data=data_prep_job.outputs.test_data,
        model_name=pipeline_job_registered_model_name,
    )

    # Expose the predict step's output as the pipeline output
    return {
        "predictions_output": predict_job.outputs.predictions_output,
    }


In [12]:
# Define the registered model name
registered_model_name = "image_classification_model"

# Define the image data input (your URI path). Replace the placeholder string
# with the path of the blob-storage folder to read the images from.
img_data = "<Azure Path to Blob storage to read data from>"

# Let's instantiate the prediction pipeline with the parameters of our choice.
# The pipeline has no model-version parameter: predict.py falls back to the
# latest registered version when no version is passed to it.
pipeline = animal_img_prediction_pipeline(
    pipeline_job_data_input=Input(type="uri_folder", path=img_data),  # Data input
    pipeline_job_registered_model_name=registered_model_name,  # Model name
)

In [13]:
# Submit the pipeline under the project's experiment name, then stream the
# logs of the submitted job into the notebook.
pipeline_job = ml_client.jobs.create_or_update(pipeline, experiment_name="animal_img_prediction")
ml_client.jobs.stream(pipeline_job.name)

Class AutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class AutoDeleteConditionSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseAutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class IntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class ProtectionLevelSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseIntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
pathOnCompute is not a known attribute

RunId: nice_music_9mb8zjzb21
Web View: https://ml.azure.com/runs/nice_music_9mb8zjzb21?wsid=/subscriptions/28cf8bdb-6861-4764-a3b5-46cfa8cf8d81/resourcegroups/from_model_to_production/workspaces/from_model_to_production

Streaming logs/azureml/executionlogs.txt

[2024-09-29 17:12:19Z] Submitting 1 runs, first five are: 960caeba:7922e8c9-a15e-416d-9135-a8bfbb9bdcc7
[2024-09-29 17:12:21Z] Completing processing run id 7922e8c9-a15e-416d-9135-a8bfbb9bdcc7.
[2024-09-29 17:12:22Z] Submitting 1 runs, first five are: d59c7d0a:385c455b-ad91-4a55-9cfc-404c47e2ae72
[2024-09-29 17:12:50Z] Completing processing run id 385c455b-ad91-4a55-9cfc-404c47e2ae72.

Execution Summary
RunId: nice_music_9mb8zjzb21
Web View: https://ml.azure.com/runs/nice_music_9mb8zjzb21?wsid=/subscriptions/28cf8bdb-6861-4764-a3b5-46cfa8cf8d81/resourcegroups/from_model_to_production/workspaces/from_model_to_production

