# Titanic Challenge as an Azure ML Pipeline Job

## Connect to Workspace

In [1]:
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient

try:
    credential = DefaultAzureCredential()
    # Check if given credential can get token successfully.
    credential.get_token("https://management.azure.com/.default")
except Exception as ex:
    # Fall back to InteractiveBrowserCredential when DefaultAzureCredential
    # cannot get a token. Report why instead of silently discarding the error,
    # so a misconfigured environment is visible in the cell output.
    print(f"DefaultAzureCredential could not get a token ({type(ex).__name__}); "
          "falling back to InteractiveBrowserCredential.")
    credential = InteractiveBrowserCredential()

In [2]:
# Get a handle to the workspace described by a local config.json file
# (MLClient.from_config locates it and reports where it was found).
ml_client = MLClient.from_config(credential=credential)

Found the config file in: /config.json


## Create Script Folder

In [3]:
import os

# Folder holding the step scripts written with %%writefile; each component
# YAML points at it via `code: ./src`.
script_folder = 'src'
if not os.path.isdir(script_folder):
    os.makedirs(script_folder)
print(script_folder, 'folder created')

src folder created


## Create Data Preprocessing Script

In [4]:
%%writefile $script_folder/prep-data.py
# import libraries
import argparse
import pandas as pd
from pathlib import Path
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import LabelEncoder
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import SimpleImputer, IterativeImputer

def main(args):
    """Read the raw CSV, preprocess it and write ``prepped_data.csv``.

    Args:
        args: parsed CLI namespace with ``input_data`` (path of the raw CSV
            file) and ``output_data`` (folder that receives the prepped CSV).
    """
    # read data
    df_raw = get_data(args.input_data)

    # prep data
    df_prep = prep_data(df_raw)

    print("Saving preprocessed data...")
    output_dir = Path(args.output_data)
    # Create the folder if needed (e.g. for local runs) instead of failing in to_csv.
    output_dir.mkdir(parents=True, exist_ok=True)
    # to_csv with a path returns None, so there is nothing to keep.
    df_prep.to_csv(output_dir / "prepped_data.csv", index=False)

# function reads in data
def get_data(path):
    """Load the raw CSV found at ``path`` and return it as a DataFrame."""
    print('Reading raw data...')
    return pd.read_csv(path)

# function preprocesses data
def prep_data(df):
    """Impute, scale and label-encode the Titanic features.

    Column names are lower-cased and a ``name`` column is dropped if present.
    Numerical features are imputed with ``IterativeImputer`` and min-max
    scaled; categorical features are imputed with their most frequent value
    and label-encoded. Every other column (e.g. the passenger id or label) is
    passed through unchanged.

    Args:
        df: raw DataFrame that (after lower-casing) contains every column in
            ``numFeats`` and ``catFeats``. It is not modified.

    Returns:
        A new DataFrame with the pass-through columns first, then the
        numerical and the categorical features, indexed like ``df``.

    Note:
        The imputers, scaler and encoders are fit on ``df`` itself. The
        pipeline runs this step separately on training and submission data,
        so their encodings are not guaranteed to match — TODO review.
    """
    print('Preprocessing raw data...')
    # lowercase column names (rename returns a copy, so the caller's frame is untouched)
    df = df.rename(columns=lambda col: str(col).lower())

    # drop unwanted features
    dropFeats = ['name']
    df = df.drop(columns=[col for col in dropFeats if col in df.columns])

    # numerical and categorical features
    numFeats = ['age', 'sibsp', 'parch', 'fare']
    catFeats = ['pclass', 'sex', 'ticket', 'cabin', 'embarked']
    nonImpFeats = [feat for feat in df.columns if feat not in numFeats + catFeats]

    # impute numerical data; keep df's index so the concat below aligns rows
    # even when the input does not carry a default RangeIndex
    ii = IterativeImputer(random_state=2)
    dfNum = pd.DataFrame(ii.fit_transform(df[numFeats]), columns=numFeats, index=df.index)

    # impute categorical data (same index for the same reason)
    si = SimpleImputer(strategy='most_frequent')
    dfCat = pd.DataFrame(si.fit_transform(df[catFeats]), columns=catFeats, index=df.index)

    # concatenate df
    df = pd.concat([df[nonImpFeats], dfNum, dfCat], axis=1)

    # scale numerical features
    scaler = MinMaxScaler()
    df[numFeats] = scaler.fit_transform(df[numFeats])

    # encode categorical features, one freshly fitted encoder per column
    for col in catFeats:
        df[col] = LabelEncoder().fit_transform(df[col])

    return df

def parse_args():
    """Parse the command-line options of the data preparation step.

    Returns:
        argparse.Namespace with string attributes ``input_data`` and
        ``output_data`` (``None`` when an option is omitted).
    """
    parser = argparse.ArgumentParser()
    for option in ("input_data", "output_data"):
        parser.add_argument(f"--{option}", dest=option, type=str)
    return parser.parse_args()

# run script
if __name__ == "__main__":
    separator = "*" * 60

    # blank lines plus a row of asterisks frame this step's output in the logs
    print("\n\n")
    print(separator)

    # read the CLI options and run the step
    args = parse_args()
    main(args)

    # close the frame
    print(separator)
    print("\n\n")
    

Overwriting src/prep-data.py


## Create Model Training Script

In [5]:
%%writefile $script_folder/train-model.py
# import libraries
import mlflow
import glob
import argparse
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
import matplotlib.pyplot as plt

def main(args):
    """Load the prepped training data, then train and evaluate the model.

    Args:
        args: parsed CLI namespace; ``args.training_data`` is the folder of
            prepped CSV files to train on.
    """
    # enable MLflow autologging before any training happens
    mlflow.autolog()

    X_train, X_test, y_train, y_test = split_data(get_data(args.training_data))

    trained = train_model(X_train, y_train)
    eval_model(trained, X_test, y_test)

# function reads in data
def get_data(path):
    """Load and concatenate every ``*.csv`` file in the folder ``path``.

    Files are read in sorted order, so the row order (and therefore the seeded
    train/test split downstream) is reproducible. Rows with any missing value
    are dropped and the result gets a fresh 0..n-1 index.

    Raises:
        FileNotFoundError: if ``path`` contains no CSV files.
    """
    print("Reading training data...")
    # glob.escape keeps characters such as '[' in the folder name literal;
    # sorting makes the order independent of the filesystem listing.
    all_files = sorted(glob.glob(glob.escape(path) + "/*.csv"))
    if not all_files:
        raise FileNotFoundError(f"No CSV files found in {path!r}")
    df = pd.concat((pd.read_csv(f) for f in all_files), ignore_index=True, sort=False)
    return df.dropna().reset_index(drop=True)

# function that splits data
def split_data(df):
    """Split ``df`` into train/test feature matrices and label vectors.

    Uses the nine feature columns listed below and the ``survived`` label,
    holding out 33% of the rows with a fixed seed (``random_state=2``).

    Returns:
        Tuple ``(X_train, X_test, y_train, y_test)``.
    """
    print("Splitting data...")
    feature_cols = ['pclass', 'sex', 'age', 'sibsp', 'parch', 'ticket', 'fare', 'cabin', 'embarked']
    features = df[feature_cols].values
    labels = df['survived'].values
    return tuple(train_test_split(features, labels, test_size=0.33, random_state=2))

# function that trains the model
def train_model(X_train, y_train, model_output=None):
    """Fit a random forest and save it in MLflow format.

    Args:
        X_train: training feature matrix.
        y_train: training labels.
        model_output: folder to save the MLflow model to. When omitted, falls
            back to ``args.model_output`` from the module-level ``args`` set in
            the ``__main__`` guard, which keeps existing callers working.

    Returns:
        The fitted ``RandomForestClassifier``.
    """
    print("Training model...")
    if model_output is None:
        # Backward-compatible fallback to the script-level CLI arguments.
        model_output = args.model_output
    model = RandomForestClassifier(max_depth=7, random_state=2)
    model.fit(X_train, y_train)
    mlflow.sklearn.save_model(model, model_output)

    return model

# function that evaluates the model
def eval_model(model, X_test, y_test):
    """Report test accuracy and ROC AUC, and save a ROC curve plot.

    The metrics are printed and logged to the active MLflow run. The ROC curve
    is written to ``ROC-Curve.png`` and attached to the run as an artifact.

    Args:
        model: fitted binary classifier exposing ``predict``/``predict_proba``.
        X_test: held-out feature matrix.
        y_test: held-out labels.

    Returns:
        Tuple ``(accuracy, auc)``; earlier callers that ignore it are unaffected.
    """
    # calculate accuracy
    y_pred = model.predict(X_test)
    acc = np.average(y_pred == y_test)
    print('Accuracy:', acc)
    mlflow.log_metric('Accuracy', acc)

    # calculate AUC from the predicted probability of the positive class
    y_pos_scores = model.predict_proba(X_test)[:, 1]
    auc = roc_auc_score(y_test, y_pos_scores)
    print('AUC: ' + str(auc))
    mlflow.log_metric('AUC', auc)

    # plot ROC curve
    fpr, tpr, _ = roc_curve(y_test, y_pos_scores)
    fig, ax = plt.subplots(figsize=(6, 4))
    # Plot the diagonal 50% line
    ax.plot([0, 1], [0, 1], 'k--')
    # Plot the FPR and TPR achieved by our model
    ax.plot(fpr, tpr)
    ax.set_xlabel('False Positive Rate')
    ax.set_ylabel('True Positive Rate')
    ax.set_title('ROC Curve')
    fig.savefig("ROC-Curve.png")
    # release the figure; it is no longer needed once saved
    plt.close(fig)
    mlflow.log_artifact("ROC-Curve.png")

    return acc, auc

def parse_args():
    """Parse the command-line options of the training step.

    Returns:
        argparse.Namespace with string attributes ``training_data`` and
        ``model_output`` (``None`` when an option is omitted).
    """
    parser = argparse.ArgumentParser()
    for option in ("training_data", "model_output"):
        parser.add_argument(f"--{option}", dest=option, type=str)
    return parser.parse_args()

# run script
if __name__ == "__main__":
    separator = "*" * 60

    # blank lines plus a row of asterisks frame this step's output in the logs
    print("\n\n")
    print(separator)

    # `args` must stay a module-level name: train_model reads args.model_output
    args = parse_args()
    main(args)

    # close the frame
    print(separator)
    print("\n\n")


Overwriting src/train-model.py


## Create Prediction Script

In [6]:
%%writefile $script_folder/make-predictions.py
# import libraries
import mlflow
import glob
import argparse
import pandas as pd
from pathlib import Path

def main(args):
    """Score the prepped submission data and write ``predictions_data.csv``.

    Args:
        args: parsed CLI namespace with ``submission_data`` (folder of prepped
            CSV files), ``model_input`` (MLflow model location) and
            ``predictions_data`` (folder that receives the output CSV).
    """
    df = get_data(args.submission_data)
    preds = make_predictions(df, args.model_input)
    df_preds = prepare_predictions(df, preds)

    print("Saving predictions data...")
    output_dir = Path(args.predictions_data)
    # Create the folder if needed (e.g. for local runs) instead of failing in to_csv.
    output_dir.mkdir(parents=True, exist_ok=True)
    # to_csv with a path returns None, so there is nothing to keep.
    df_preds.to_csv(output_dir / "predictions_data.csv", index=False)

# function prepares predictions
def prepare_predictions(df, preds_arr):
    """Pair each passenger id with its prediction in submission format.

    Args:
        df: submission features; must contain a ``passengerid`` column.
        preds_arr: one prediction per row of ``df``, in the same row order.

    Returns:
        DataFrame with columns ``PassengerId`` and ``Survived``.

    Raises:
        ValueError: if the number of predictions differs from ``len(df)``.
    """
    print("Preparing predictions...")
    if len(preds_arr) != len(df):
        raise ValueError(
            f"Got {len(preds_arr)} predictions for {len(df)} passengers"
        )
    # Align positionally: df may carry a non-default (even duplicated) index,
    # e.g. after concatenating several CSV files, while the predictions do not.
    ids = df[['passengerid']].reset_index(drop=True)
    preds = pd.DataFrame(preds_arr).reset_index(drop=True)
    df_preds = pd.concat([ids, preds], axis=1)
    df_preds.columns = ['PassengerId', 'Survived']

    return df_preds

# function reads in data
def get_data(path):
    """Load and concatenate every ``*.csv`` file in the folder ``path``.

    Files are read in sorted order and the result gets a fresh 0..n-1 index,
    so row order is reproducible and positions line up with predictions.
    Missing values are kept.

    Raises:
        FileNotFoundError: if ``path`` contains no CSV files.
    """
    print("Reading submission data...")
    # glob.escape keeps characters such as '[' in the folder name literal;
    # sorting makes the order independent of the filesystem listing.
    all_files = sorted(glob.glob(glob.escape(path) + "/*.csv"))
    if not all_files:
        raise FileNotFoundError(f"No CSV files found in {path!r}")
    return pd.concat((pd.read_csv(f) for f in all_files), ignore_index=True, sort=False)

# function that makes predictions
def make_predictions(df, model_path):
    """Score ``df`` with the MLflow model stored at ``model_path``.

    Args:
        df: prepped submission data containing the feature columns below.
        model_path: location of an MLflow model (loaded as a pyfunc model).

    Returns:
        Whatever the pyfunc model's ``predict`` returns, one entry per row.
    """
    print("Making predictions...")
    # Same feature columns, in the same order, as split_data in train-model.py.
    # Selecting them directly makes dropping `passengerid` first unnecessary.
    features = ['pclass', 'sex', 'age', 'sibsp', 'parch', 'ticket', 'fare', 'cabin', 'embarked']
    # get model
    model = mlflow.pyfunc.load_model(model_path)
    return model.predict(df[features])

def parse_args():
    """Parse the command-line options of the prediction step.

    Returns:
        argparse.Namespace with string attributes ``submission_data``,
        ``predictions_data`` and ``model_input`` (``None`` when omitted).
    """
    parser = argparse.ArgumentParser()
    for option in ("submission_data", "predictions_data", "model_input"):
        parser.add_argument(f"--{option}", dest=option, type=str)
    return parser.parse_args()

# run script
if __name__ == "__main__":
    separator = "*" * 60

    # blank lines plus a row of asterisks frame this step's output in the logs
    print("\n\n")
    print(separator)

    # read the CLI options and run the step
    args = parse_args()
    main(args)

    # close the frame
    print(separator)
    print("\n\n")


Overwriting src/make-predictions.py


## Define Components

In [7]:
%%writefile prep-data.yml
# Command component that cleans one raw CSV file (imputation, scaling and
# label encoding) by running src/prep-data.py. The pipeline uses it for both
# the training and the submission data.
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: prep_data
display_name: Prepare data for training/predictions
version: 1
type: command
inputs:
  # a single raw CSV file (read with pandas.read_csv)
  input_data: 
    type: uri_file
outputs:
  # folder that receives prepped_data.csv
  output_data:
    type: uri_folder
code: ./src
environment: azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu@latest
# Folded scalar (>-): the lines below are joined with spaces into one command.
command: >-
  python prep-data.py 
  --input_data ${{inputs.input_data}}
  --output_data ${{outputs.output_data}}

Overwriting prep-data.yml


In [8]:
%%writefile train-model.yml
# Command component that trains a random forest by running src/train-model.py
# on a folder of prepped CSV files and emits the model in MLflow format.
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: train_model
display_name: Train a random forest model
version: 1
type: command
inputs:
  # folder of prepped CSV files (train-model.py reads every *.csv in it)
  training_data: 
    type: uri_folder
outputs:
  # written by mlflow.sklearn.save_model in train-model.py
  model_output:
    type: mlflow_model
code: ./src
environment: azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu@latest
# Folded scalar (>-): the lines below are joined with spaces into one command.
command: >-
  python train-model.py 
  --training_data ${{inputs.training_data}} 
  --model_output ${{outputs.model_output}} 

Overwriting train-model.yml


In [9]:
%%writefile make-predictions.yml
# Command component that scores prepped submission data with a trained MLflow
# model by running src/make-predictions.py.
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: make_predictions
display_name: Makes predictions using a trained random forest model
version: 1
type: command
inputs:
  # folder of prepped CSV files (make-predictions.py reads every *.csv in it)
  submission_data: 
    type: uri_folder
  # trained model, loaded with mlflow.pyfunc.load_model
  model_input:
    type: mlflow_model
outputs:
  # folder that receives predictions_data.csv
  predictions_data:
    type: uri_folder
code: ./src
environment: azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu@latest
# Folded scalar (>-): the lines below are joined with spaces into one command.
command: >-
  python make-predictions.py 
  --submission_data ${{inputs.submission_data}} 
  --model_input ${{inputs.model_input}} 
  --predictions_data ${{outputs.predictions_data}} 

Overwriting make-predictions.yml


## Load Components

In [10]:
from azure.ai.ml import load_component
from pathlib import Path

# Folder containing the component YAML specs ("" means the current directory).
parent_dir = ""
# Join with pathlib instead of string concatenation: `parent_dir + "./x.yml"`
# yields a broken path such as "components./x.yml" as soon as parent_dir is
# set without a trailing slash.
component_dir = Path(parent_dir)

prep_data = load_component(source=component_dir / "prep-data.yml")
train_random_forest = load_component(source=component_dir / "train-model.yml")
make_predictions = load_component(source=component_dir / "make-predictions.yml")

## Build Pipeline

In [11]:
from azure.ai.ml import Input
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.dsl import pipeline

# Four-step pipeline: clean the training CSV, train a random forest on it,
# clean the submission CSV with the same prep_data component, then score it.
# NOTE(review): the local variable names below (clean_data, train_model, ...)
# appear as the node names under `jobs:` in the printed pipeline YAML, and the
# function parameters become the pipeline-level inputs, so rename with care.
@pipeline()
def titanic_classification(pipeline_job_input1, pipeline_job_input2):
    # 1 clean training data
    clean_data = prep_data(input_data=pipeline_job_input1)
    # 2 train model on the cleaned training data
    train_model = train_random_forest(training_data=clean_data.outputs.output_data)
    # 3 clean submission data
    # NOTE(review): this is a separate prep_data run, and prep-data.py fits its
    # imputers, scaler and label encoders on whatever data it receives. The
    # submission encodings (e.g. for `ticket` / `cabin`) are therefore not
    # guaranteed to match the training ones; confirm this is acceptable.
    clean_sub_data = prep_data(input_data=pipeline_job_input2)
    # 4 make predictions with clean sub data
    get_predictions = make_predictions(submission_data=clean_sub_data.outputs.output_data,
                                   model_input=train_model.outputs.model_output)

    # Expose the cleaned training data, the trained model and the predictions
    # as pipeline-level outputs (the `outputs:` section of the pipeline YAML).
    return {
        "pipeline_job_transformed_data": clean_data.outputs.output_data,
        "pipeline_job_trained_model": train_model.outputs.model_output,
        "pipeline_job_prediction_data": get_predictions.outputs.predictions_data,
    }

# Bind the two inputs to version 1 of the `titanic-local` and
# `titanic-sub-local` data assets, each passed as a single file (uri_file).
pipeline_job = titanic_classification(
    Input(type=AssetTypes.URI_FILE, path="azureml:titanic-local:1"),
    Input(type=AssetTypes.URI_FILE, path="azureml:titanic-sub-local:1")
)

## Configure Pipeline Settings

In [12]:
# Pipeline-level defaults: the compute target the steps run on and the
# datastore used for their outputs.
COMPUTE_NAME = "aml-cluster"
DATASTORE_NAME = "workspaceblobstore"

pipeline_job.settings.default_compute = COMPUTE_NAME
pipeline_job.settings.default_datastore = DATASTORE_NAME

# show the resulting pipeline definition to review the changes
print(pipeline_job)

Class AutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class AutoDeleteConditionSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseAutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class IntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class ProtectionLevelSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseIntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.


display_name: titanic_classification
type: pipeline
inputs:
  pipeline_job_input1:
    type: uri_file
    path: azureml:titanic-local:1
  pipeline_job_input2:
    type: uri_file
    path: azureml:titanic-sub-local:1
outputs:
  pipeline_job_transformed_data:
    type: uri_folder
  pipeline_job_trained_model:
    type: mlflow_model
  pipeline_job_prediction_data:
    type: uri_folder
jobs:
  clean_data:
    type: command
    inputs:
      input_data:
        path: ${{parent.inputs.pipeline_job_input1}}
    outputs:
      output_data: ${{parent.outputs.pipeline_job_transformed_data}}
    component:
      $schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
      name: prep_data
      version: '1'
      display_name: Prepare data for training/predictions
      type: command
      inputs:
        input_data:
          type: uri_file
      outputs:
        output_data:
          type: uri_folder
      command: python prep-data.py  --input_data ${{inputs.input_dat

## Submit Pipeline Job

In [13]:
# Submit the pipeline to the workspace under this experiment. The returned job
# replaces the local definition, so displaying it shows the submitted run.
EXPERIMENT_NAME = "pipeline_titanic"
pipeline_job = ml_client.jobs.create_or_update(pipeline_job, experiment_name=EXPERIMENT_NAME)
pipeline_job

[32mUploading src (0.01 MBs): 100%|██████████| 7507/7507 [00:00<00:00, 145404.60it/s]
[39m

pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored
pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.MLFlowModelJobOutput'> and will be ignored
pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored


Experiment,Name,Type,Status,Details Page
pipeline_titanic,zen_orange_zjsg3d1xjy,pipeline,NotStarted,Link to Azure Machine Learning studio


---
# End of Notebook