# Used Car Prices CarGurus on Azure: Create Datastore & Train Multilayer Perceptron

## Create an Azure Blob Datastore
Connect to the Azure Machine Learning workspace with `MLClient`.

In [None]:
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

# Authenticate
credential = DefaultAzureCredential()

# Get a handle to the workspace
ml_client = MLClient(
    credential=credential,
    subscription_id='a134465f-eb15-4297-b902-3c97d4c81838',
    resource_group_name='aschultzdata',
    workspace_name='ds-ml-env',
)
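
The workspace identifiers above are hard-coded in the cell. As a minimal sketch of an alternative, they could be read from environment variables so they stay out of the notebook. The variable names `AZURE_SUBSCRIPTION_ID`, `AZURE_RESOURCE_GROUP` and `AZURE_ML_WORKSPACE` and the helper `load_workspace_settings` are assumptions made for illustration, not something the Azure SDK defines or requires.

```python
import os
from typing import Dict, Mapping

# Hypothetical environment variable names chosen for this sketch
REQUIRED_VARS = (
    "AZURE_SUBSCRIPTION_ID",
    "AZURE_RESOURCE_GROUP",
    "AZURE_ML_WORKSPACE",
)


def load_workspace_settings(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Collect the MLClient keyword arguments from environment variables."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "subscription_id": env["AZURE_SUBSCRIPTION_ID"],
        "resource_group_name": env["AZURE_RESOURCE_GROUP"],
        "workspace_name": env["AZURE_ML_WORKSPACE"],
    }
```

Under these assumptions, the client could then be created with `MLClient(credential=credential, **load_workspace_settings())`.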

Create the datastore by specifying its name, a description, the storage account, the container name, the protocol and the account key.

In [None]:
from azure.ai.ml.entities import AzureBlobDatastore
from azure.ai.ml.entities import AccountKeyConfiguration

store = AzureBlobDatastore(
    name='usedcars_datastore',
    description='Datastore for Used Cars',
    account_name='dsmlenv8898281366',
    container_name='usedcarscargurus',
    protocol='https',
    credentials=AccountKeyConfiguration(
        account_key='XXXxxxXXXxXXXXxxXXXXXxXXXXXxXxxXxXXXxXXXxXXxxxXXxxXXXxXxXXXxxXxxXXXXxxxxxXXxxxxxxXXXxXXX'
    ),
)

ml_client.create_or_update(store)

AzureBlobDatastore({'type': <DatastoreType.AZURE_BLOB: 'AzureBlob'>, 'name': 'usedcars_datastore', 'description': 'Datastore for Used Cars', 'tags': {}, 'properties': {}, 'print_as_yaml': True, 'id': '/subscriptions/a134465f-eb15-4297-b902-3c97d4c81838/resourceGroups/aschultzdata/providers/Microsoft.MachineLearningServices/workspaces/ds-ml-env/datastores/usedcars_datastore', 'Resource__source_path': None, 'base_path': '/mnt/batch/tasks/shared/LS_root/mounts/clusters/cpu-standardds12v2/code/Users/aschultz.data/UsedCarsCarGurus/Models/DL/MLP', 'creation_context': None, 'serialize': <msrest.serialization.Serializer object at 0x7f543aae2740>, 'credentials': {'type': 'account_key'}, 'container_name': 'usedcarscargurus', 'account_name': 'dsmlenv8898281366', 'endpoint': 'core.windows.net', 'protocol': 'https'})

Then upload the train/test sets to the `usedcarscargurus` container.
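
Once uploaded, the files are referenced later in this notebook through `azureml://` URIs of the form `azureml://datastores/<datastore>/paths/<file>`. The sketch below only builds such a string; `datastore_uri` is a hypothetical helper written for illustration and is not part of the Azure SDK.

```python
def datastore_uri(datastore_name: str, relative_path: str) -> str:
    """Build an azureml:// URI for a file stored in a registered datastore."""
    if not datastore_name:
        raise ValueError("datastore_name must not be empty")
    # Drop a leading slash so the path segment is not doubled
    cleaned_path = relative_path.lstrip("/")
    return f"azureml://datastores/{datastore_name}/paths/{cleaned_path}"


train_uri = datastore_uri("usedcars_datastore", "usedCars_trainSet.csv")
test_uri = datastore_uri("usedcars_datastore", "usedCars_testSet.csv")
```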

# Train a model
First, create a compute instance to run a notebook in Azure Machine Learning studio. Then connect to the workspace and create the compute resource (CPU or GPU) if it has not already been created.




## Create a compute cluster to run the job

In [None]:
from azure.ai.ml.entities import AmlCompute

# Define the name of the compute cluster
gpu_compute_target = 'gpu-cluster-NC4as-T4-v3'

try:
    # Examine if the cluster already exists
    gpu_cluster = ml_client.compute.get(gpu_compute_target)
    print(
        f"You already have a cluster named {gpu_compute_target}, we'll reuse it as is."
    )

except Exception:
    print('Creating a new gpu compute target...')

    # Create the Azure Machine Learning compute object with specified components
    gpu_cluster = AmlCompute(
        name=gpu_compute_target,
        # On-demand VM service
        type='amlcompute',
        # VM Family
        size='Standard_NC4as_T4_v3',
        # Minimum nodes in the cluster
        min_instances=0,
        # Nodes in the cluster
        max_instances=1,
        # Time (seconds) the node will run after the job finishes/terminates
        idle_time_before_scale_down=180,
        # Type of tier: Dedicated or LowPriority
        tier='Dedicated',
    )
    print(
        f"AMLCompute with name {gpu_cluster.name} will be created, with compute size {gpu_cluster.size}"
    )
    # Pass the object to MLClient's begin_create_or_update method and wait for the operation to finish
    gpu_cluster = ml_client.compute.begin_create_or_update(gpu_cluster).result()

Creating a new gpu compute target...
AMLCompute with name gpu-cluster-NC4as-T4-v3 will be created, with compute size Standard_NC4as_T4_v3


## Create the job environment
The environment lists the components of the runtime and the libraries installed on the compute for training the model.

In [None]:
import os

dependencies_dir = './dependencies'
os.makedirs(dependencies_dir, exist_ok=True)

Now we can write the `conda` file into the `dependencies` directory.

In [None]:
%%writefile {dependencies_dir}/conda.yaml
name: model-env
channels:
  - conda-forge
dependencies:
  - python=3.8.5
  - pip=23.1.2
  - numpy=1.21.6
  - scikit-learn==1.1.2
  - scipy
  - pandas>=1.1,<1.2
  - pip:
    - inference-schema[numpy-support]==1.3.0
    - mlflow
    - azureml-mlflow==1.50.0
    - psutil==5.9.0
    - tqdm
    - ipykernel
    - matplotlib
    - tensorflow==2.12
    - keras-tuner==1.1.3

Writing ./dependencies/conda.yaml
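
Several entries in the file above (for example `scipy`, `mlflow` and `tqdm`) carry no version constraint, so the resolved environment may differ between builds. As a rough sketch, the helper below lists such entries from the text of a conda file. It is a simple line-based scan written for this layout, not a full YAML parser, and `unpinned_packages` is a hypothetical name.

```python
import re
from typing import List


def unpinned_packages(conda_yaml_text: str) -> List[str]:
    """Return dependency entries that have no version constraint."""
    unpinned = []
    in_dependencies = False
    for raw_line in conda_yaml_text.splitlines():
        line = raw_line.strip()
        if line == "dependencies:":
            in_dependencies = True
            continue
        # Skip everything before the dependencies section (e.g. channels)
        if not in_dependencies or not line.startswith("- "):
            continue
        spec = line[2:].strip()
        # Skip the nested "pip:" header, which is not a package itself
        if spec.endswith(":"):
            continue
        # Any of = < > ~ ! marks a version constraint
        if not re.search(r"[=<>~!]", spec):
            unpinned.append(spec)
    return unpinned
```

Passing the contents of `./dependencies/conda.yaml` to this function would show which packages could be pinned for a more reproducible environment.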


With the `conda.yaml` file written, the environment can be created and registered in the workspace.

In [None]:
from azure.ai.ml.entities import Environment

custom_env_name = 'aml-usedcars-gpu-mlp'

custom_job_env = Environment(
    name=custom_env_name,
    description='Custom environment for Used Cars MLP job',
    tags={'tensorflow': '2.12'},
    conda_file=os.path.join(dependencies_dir, 'conda.yaml'),
    image='mcr.microsoft.com/azureml/curated/tensorflow-2.12-cuda11:6',
)
custom_job_env = ml_client.environments.create_or_update(custom_job_env)

print(
    f'Environment with name {custom_job_env.name} is registered to workspace, the environment version is {custom_job_env.version}'
)

Environment with name aml-usedcars-gpu-mlp is registered to workspace, the environment version is 5


## Create training script
First, create the source folder that will hold the training script, `main.py`.

In [None]:
train_src_dir = './src'
os.makedirs(train_src_dir, exist_ok=True)

The training script prepares the environment, reads and preprocesses the data, trains the model, saves it and evaluates it. It imports the dependencies, sets the seed, defines the input/output arguments with `argparse`, reads the train/test sets, creates dummy variables for the categorical features and scales the features with `StandardScaler`. The number of samples and features are then logged with `MLflow`. Next, an `MLP` is trained using the best parameters found with `keras-tuner`, and a custom callback logs `Loss` and `Val_Loss` to `MLflow` at the end of every epoch. Finally, the model is saved and evaluated: the loss curves are logged as `MLflow` artifacts, and the train/test metrics and the predicted vs. actual minimum/average/maximum price are logged as metrics.



In [None]:
%%writefile {train_src_dir}/main.py
import os
import random
import numpy as np
import warnings
import argparse
import pandas as pd
from sklearn.preprocessing import StandardScaler
import mlflow
import tensorflow as tf
import datetime
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.callbacks import Callback
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import matplotlib.pyplot as plt
warnings.filterwarnings('ignore')

# Set seed for reproducibility
def init_seeds(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    tf.random.set_seed(seed)
    # Run ops single-threaded so results are reproducible
    session_conf = tf.compat.v1.ConfigProto(intra_op_parallelism_threads=1,
                                            inter_op_parallelism_threads=1)
    os.environ['TF_CUDNN_DETERMINISTIC'] = 'True'
    os.environ['TF_DETERMINISTIC_OPS'] = 'True'
    sess = tf.compat.v1.Session(graph=tf.compat.v1.get_default_graph(),
                                config=session_conf)
    tf.compat.v1.keras.backend.set_session(sess)

    return sess

init_seeds(seed=42)

def main():
    """Main function of the script."""

    # Input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument('--train_data', type=str, help='path to input train data')
    parser.add_argument('--test_data', type=str, help='path to input test data')
    parser.add_argument('--epochs', required=False, default=30, type=int)
    parser.add_argument('--batch_size', required=False, default=1, type=int)
    args = parser.parse_args()

    # Start Logging
    mlflow.start_run()

    ###################
    #<prepare the data>
    ###################
    print(' '.join(f'{k}={v}' for k, v in vars(args).items()))

    print('input train data:', args.train_data)
    print('input test data:', args.test_data)

    trainDF = pd.read_csv(args.train_data, low_memory=False)
    testDF = pd.read_csv(args.test_data, low_memory=False)

    train_label = trainDF[['price']]
    test_label = testDF[['price']]

    train_features = trainDF.drop(columns = ['price'])
    test_features = testDF.drop(columns = ['price'])

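    train_features = pd.get_dummies(train_features, drop_first=True)
    test_features = pd.get_dummies(test_features, drop_first=True)
    # Align the test columns with the train columns in case a category is missing from one set
    test_features = test_features.reindex(columns=train_features.columns, fill_value=0)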

    sc = StandardScaler()
    train_features = pd.DataFrame(sc.fit_transform(train_features))
    test_features = pd.DataFrame(sc.transform(test_features))

    mlflow.log_metric('num_samples', train_features.shape[0])
    mlflow.log_metric('num_features', train_features.shape[1])

    print(f'Training with data of shape {train_features.shape}')

    ####################
    #</prepare the data>
    ####################

    ##################
    #<train the model>
    ##################
    model = Sequential()
    # Input size follows the prepared features (53 in the original run)
    model.add(Dense(130, input_dim=train_features.shape[1],
                    kernel_initializer='normal', activation='relu'))
    model.add(Dense(130, kernel_initializer='normal', activation='relu'))
    model.add(Dense(130, kernel_initializer='normal', activation='relu'))
    model.add(Dense(70, kernel_initializer='normal', activation='relu'))
    model.add(Dense(130, kernel_initializer='normal', activation='relu'))
    model.add(Dense(70, kernel_initializer='normal', activation='relu'))
    model.add(Dense(130, kernel_initializer='normal', activation='relu'))
    model.add(Dense(70, kernel_initializer='normal', activation='relu'))
    model.add(Dense(130, kernel_initializer='normal', activation='relu'))
    model.add(Dense(70, kernel_initializer='normal', activation='relu'))
    model.add(Dense(130, kernel_initializer='normal', activation='relu'))
    model.add(Dense(130, kernel_initializer='normal', activation='relu'))
    model.add(Dense(130, kernel_initializer='normal', activation='relu'))
    model.add(Dropout(0.3))
    model.add(Dense(1))

    opt = tf.keras.optimizers.Adam(learning_rate=0.001)
    model.compile(loss='mae', metrics=['mse'], optimizer=opt)
    model.summary()

    # Log metrics
    class LogRunMetrics(Callback):
        # Callback at the end of every epoch
        def on_epoch_end(self, epoch, logs=None):
            # Logging the same key every epoch builds a per-epoch metric history
            mlflow.log_metric('Loss', logs['loss'])
            mlflow.log_metric('Val_Loss', logs['val_loss'])

    log_folder = 'logs/fit/' + datetime.datetime.now().strftime('%Y%m%d-%H%M%S')

    filepath = 'MLP_weights_only_HPO1_bestModel.tf'
    checkpoint_dir = os.path.dirname(filepath)

    callbacks_list = [EarlyStopping(monitor='val_loss', patience=5),
                      ModelCheckpoint(filepath, monitor='mse',
                                      save_best_only=True, mode='min'),
                      LogRunMetrics()]

    history = model.fit(train_features, train_label, epochs=args.epochs,
                        batch_size=args.batch_size, validation_split=0.2,
                        callbacks=callbacks_list)

    model.save('./MLP_HPO1_bestModel', save_format='tf')

    # Load model for more training or later use
    #filepath = 'MLP_weights_only_b4_HPO1_bestModel.h5'
    #model = tf.keras.models.load_model('./MLP_HPO1_bestModel_tf.h5')
    #model.load_weights(filepath)

    ##################
    #</train the model>
    ##################

    #####################
    #<evaluate the model>
    #####################
    plt.title('Model Error for Price')
    plt.plot(history.history['loss'], label='train')
    plt.plot(history.history['val_loss'], label='val_loss')
    plt.ylabel('Error [Price]')
    plt.xlabel('Epoch')
    plt.legend()
    plt.grid(True)
    plt.savefig('Loss vs. Price.png')
    mlflow.log_artifact('Loss vs. Price.png')
    plt.close()

    losses = pd.DataFrame(model.history.history)
    losses.plot()
    plt.title('Model Error for Price')
    plt.ylabel('Error [Price]')
    plt.xlabel('Epoch')
    plt.legend(loc='upper right')
    plt.grid(True)
    plt.savefig('Error vs. Price.png')
    mlflow.log_artifact('Error vs. Price.png')
    plt.close()

    pred_train = model.predict(train_features)

    # Metrics: Train set
    train_mae = mean_absolute_error(train_label[:], pred_train[:])
    train_mse = mean_squared_error(train_label[:], pred_train[:])
    train_rmse = np.sqrt(mean_squared_error(train_label[:], pred_train[:]))
    train_r2 = r2_score(train_label[:], pred_train[:])

    pred_test = model.predict(test_features)

    # Metrics: Test set
    test_mae = mean_absolute_error(test_label[:], pred_test[:])
    test_mse = mean_squared_error(test_label[:], pred_test[:])
    test_rmse = np.sqrt(mean_squared_error(test_label[:], pred_test[:]))
    test_r2 = r2_score(test_label[:], pred_test[:])

    mlflow.log_metric('train_mae', train_mae)
    mlflow.log_metric('train_mse', train_mse)
    mlflow.log_metric('train_rmse', train_rmse)
    mlflow.log_metric('train_r2', train_r2)
    mlflow.log_metric('test_mae', test_mae)
    mlflow.log_metric('test_mse', test_mse)
    mlflow.log_metric('test_rmse', test_rmse)
    mlflow.log_metric('test_r2', test_r2)

    MaximumPrice = np.amax(test_label)
    PredictedMaxPrice = np.amax(pred_test)
    AveragePrice = np.average(test_label)
    PredictedAveragePrice = np.average(pred_test)
    MinimumPrice = np.amin(test_label)
    PredictedMinimumPrice = np.amin(pred_test)

    mlflow.log_metric('Maximum Price', MaximumPrice)
    mlflow.log_metric('Predicted Maximum Price', PredictedMaxPrice)
    mlflow.log_metric('Average Price', AveragePrice)
    mlflow.log_metric('Predicted Average Price', PredictedAveragePrice)
    mlflow.log_metric('Minimum Price', MinimumPrice)
    mlflow.log_metric('Predicted Minimum Price', PredictedMinimumPrice)

    ###################
    #</evaluate the model>
    ###################

    # Stop Logging
    mlflow.end_run()

if __name__ == "__main__":
    main()

Writing ./src/main.py
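
The script stops training through `EarlyStopping(monitor='val_loss', patience=5)`. To make the effect of `patience` concrete, here is a simplified pure-Python sketch of that rule: training stops once the validation loss has failed to improve for `patience` consecutive epochs. This is an illustration of the idea, not Keras's actual implementation, and `early_stopping_epoch` is a hypothetical helper.

```python
from typing import Optional, Sequence


def early_stopping_epoch(val_losses: Sequence[float], patience: int = 5) -> Optional[int]:
    """Return the 0-based epoch at which training would stop, or None."""
    best = float("inf")
    epochs_without_improvement = 0
    for epoch, loss in enumerate(val_losses):
        if loss < best:
            # A new best value resets the counter
            best = loss
            epochs_without_improvement = 0
        else:
            epochs_without_improvement += 1
            if epochs_without_improvement >= patience:
                return epoch
    # The loss kept improving often enough, so all epochs run
    return None
```

With a small `patience`, a noisy validation loss can end training early, which is one reason the `--epochs` argument acts as an upper bound rather than a fixed number of epochs.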


## Train the model with specified components
To train the model, submit a `command job`. The job is configured with its inputs (the train/test data, the number of epochs and the batch size) and runs the training script on the specified compute resource and environment, logging the parameters and metrics defined in the script.

In [None]:
from azure.ai.ml import command
from azure.ai.ml import Input

registered_model_name = 'usedcars_mlp_model'

job = command(
    inputs=dict(
        train_data=Input(
            type='uri_file',
            path='azureml://datastores/usedcars_datastore/paths/usedCars_trainSet.csv',
        ),
        test_data=Input(
            type='uri_file',
            path = 'azureml://datastores/usedcars_datastore/paths/usedCars_testSet.csv',
        ),
        epochs=50,
        batch_size=4,
        registered_model_name=registered_model_name,
    ),

    code='./src/',
    command='python main.py --train_data ${{inputs.train_data}} --test_data ${{inputs.test_data}} --epochs ${{inputs.epochs}} --batch_size ${{inputs.batch_size}}',
    environment='aml-usedcars-gpu-mlp@latest',
    compute='gpu-cluster-NC4as-T4-v3',
    display_name='usedcars_mlp_prediction',
)
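
In the job above, the `command` string refers to inputs through `${{inputs.<name>}}` placeholders, and the `registered_model_name` input is declared but never referenced. A small check like the following sketch can surface such unused inputs before submission. The regular expression and the helper names are assumptions made for illustration and do not reproduce how Azure Machine Learning resolves placeholders.

```python
import re
from typing import Dict, List

# Matches placeholders such as ${{inputs.train_data}}
PLACEHOLDER = re.compile(r"\$\{\{\s*inputs\.(\w+)\s*\}\}")


def referenced_inputs(command: str) -> List[str]:
    """Return the input names referenced in a command string, in order."""
    return PLACEHOLDER.findall(command)


def unused_inputs(command: str, inputs: Dict[str, object]) -> List[str]:
    """Return the declared inputs that the command string never references."""
    used = set(referenced_inputs(command))
    return [name for name in inputs if name not in used]
```

An unused input is not necessarily an error, but it is worth confirming whether the training script was meant to receive it as an argument.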

## Submit the job

Then this job can be submitted to run in `Azure Machine Learning Studio` using the `create_or_update` command with `ml_client`.

In [None]:
ml_client.create_or_update(job)

Class AutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class AutoDeleteConditionSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseAutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class IntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class ProtectionLevelSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseIntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Uploading src (0.01 MBs): 100%

| Experiment | Name | Type | Status | Details Page |
| --- | --- | --- | --- | --- |
| MLP | plucky_frame_0ncbbdml4f | command | Starting | Link to Azure Machine Learning studio |


## View Job Output
The submitted job can be viewed by selecting the link in the output of the previous cell. When the job completes, the information logged with `MLflow`, including the model metrics and the saved graphs, can be viewed or downloaded.
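
To help read the logged values, the sketch below computes the same four metrics (MAE, MSE, RMSE and R²) by hand in plain Python. The training script itself uses `scikit-learn` for this; the function name `regression_metrics` is hypothetical and any prices passed to it here are made-up illustration values, not results from this job.

```python
import math
from typing import Dict, Sequence


def regression_metrics(y_true: Sequence[float], y_pred: Sequence[float]) -> Dict[str, float]:
    """Compute MAE, MSE, RMSE and R^2 for two equally long sequences."""
    n = len(y_true)
    if n == 0 or n != len(y_pred):
        raise ValueError("y_true and y_pred must be non-empty and equally long")
    errors = [t - p for t, p in zip(y_true, y_pred)]
    mae = sum(abs(e) for e in errors) / n
    mse = sum(e * e for e in errors) / n
    mean_true = sum(y_true) / n
    ss_res = sum(e * e for e in errors)
    # Note: ss_tot is zero when every true value is identical
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)
    r2 = 1.0 - ss_res / ss_tot
    return {"mae": mae, "mse": mse, "rmse": math.sqrt(mse), "r2": r2}
```

MAE and RMSE are in the same units as the price, which makes them easier to interpret than MSE, and RMSE penalizes large errors more heavily than MAE.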