In [None]:
# Show the installed Azure ML SDK v2 package (`%pip` runs pip for this kernel's environment).
%pip show azure-ai-ml

In [None]:
import os

# Enter the details of your Azure ML workspace. Each value can also be supplied through
# the matching environment variable, so real IDs never need to be committed with the notebook.
subscription_id = os.environ.get("AZURE_SUBSCRIPTION_ID", "YOUR-SUBSCRIPTION-ID")
resource_group = os.environ.get("AZURE_RESOURCE_GROUP", "rg-churn-pred-proj")
workspace = os.environ.get("AZURE_ML_WORKSPACE", "churn-pred-proj")

In [None]:
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

# Authenticate with DefaultAzureCredential and bind a client to the workspace
# configured in the previous cell.
credential = DefaultAzureCredential()
ml_client = MLClient(credential, subscription_id, resource_group, workspace)

# Creating a Data Asset
This is the dataset we take from the Hugging Face website and will use for model development.

In [None]:
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes
import time

# Raw-file URL of the dataset. Hugging Face serves file contents under `/resolve/`;
# the `/blob/` URL is the HTML viewer page, which cannot be read as CSV.
web_path = "https://huggingface.co/datasets/scikit-learn/churn-prediction/resolve/main/dataset.csv"
# Version the data asset with the current UTC time so every run registers a new version.
v1 = time.strftime("%Y.%m.%d.%H%M%S", time.gmtime())

churn_data = Data(
    name="telco-churn",
    version=v1,
    description="Churning customers of a telecommunication company",
    path=web_path,
    type=AssetTypes.URI_FILE,
    tags={"source_type": "web", "source": "Hugging Face"},
)

# Register the data asset and keep the object returned by the service.
churn_data = ml_client.data.create_or_update(churn_data)

print(f"Data asset created. Name: {churn_data.name}, version: {churn_data.version}")

Access Data

In [None]:
# Install (or upgrade) azureml-fsspec into the environment of the running kernel.
%pip install --upgrade azureml-fsspec

In [None]:
import pandas as pd

# Look up the exact data asset version registered above and report where it points.
data_asset = ml_client.data.get(name="telco-churn", version=v1)
asset_uri = data_asset.path
print(f"Data asset URI: {asset_uri}")

# Load the CSV directly from that URI and preview the first rows.
df = pd.read_csv(asset_uri)
df.head()

# Compute Cluster for Pipeline

In [None]:
from azure.ai.ml.entities import AmlCompute
from azure.core.exceptions import ResourceNotFoundError

cpu_compute_target = "cpu-cluster"

try:
    # Reuse the compute target if it already exists in the workspace.
    cpu_cluster = ml_client.compute.get(cpu_compute_target)
    print(
        f"You already have a cluster named {cpu_compute_target}, we'll reuse it as is."
    )

except ResourceNotFoundError:
    # Only a missing cluster triggers creation; auth/network errors still surface.
    print("Creating a new cpu compute target...")

    cpu_cluster = AmlCompute(
        # Same name as the lookup above, so the next run finds and reuses this cluster.
        name=cpu_compute_target,
        # Azure ML Compute is the on-demand VM service
        type="amlcompute",
        # VM Family
        size="STANDARD_DS3_V2",
        # Minimum running nodes when there is no job running
        min_instances=0,
        # Maximum nodes in the cluster
        max_instances=4,
        # Seconds a node keeps running after a job finishes before scaling down
        idle_time_before_scale_down=180,
        # Dedicated or LowPriority. The latter is cheaper but there is a chance of job termination
        tier="Dedicated",
    )

    # begin_create_or_update returns a long-running-operation poller; .result()
    # waits for provisioning and returns the AmlCompute whose name/size are printed below.
    cpu_cluster = ml_client.begin_create_or_update(cpu_cluster).result()

print(
    f"AMLCompute with name {cpu_cluster.name} is created, the compute size is {cpu_cluster.size}"
)

# Pipeline job environment creation

In [None]:
import os

# Folder that will hold the conda specification for the pipeline environment.
dependencies_dir = "./dependencies"
os.makedirs(name=dependencies_dir, exist_ok=True)  # no-op when the folder already exists

Create the conda YAML file in the dependencies directory


In [None]:
%%writefile {dependencies_dir}/conda.yaml
name: churn-env
channels:
  - conda-forge
dependencies:
  - python=3.8
  - numpy=1.23.5
  - pip=22.3.1
  - scikit-learn=1.2.2
  - pandas=1.5.3
  - matplotlib=3.7.1
  - xgboost=1.7.5
  # Entries under `pip:` are pip requirement specifiers, which pin with `==`
  # (a single `=` is conda syntax and is rejected by pip).
  - pip:
    - imbalanced-learn==0.10.1
    - mlflow==1.26.1
    - azureml-mlflow==1.42.0

Create Environment

In [None]:
from azure.ai.ml.entities import Environment

# Registered name of the job environment. The "Chrun" spelling is the asset's
# identifier in the workspace; renaming it means updating every reference to it.
custom_env_name = "Chrun-Proj-scikit-learn"

# Build the environment (conda spec on top of the OpenMPI Ubuntu base image)
# and register it, keeping the registered object the service returns.
pipeline_job_env = ml_client.environments.create_or_update(
    Environment(
        name=custom_env_name,
        description="Custom environment for Customer Churn pipeline",
        tags={"scikit-learn": "1.2.2"},
        conda_file=os.path.join(dependencies_dir, "conda.yaml"),
        image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
        version="0.1.0",
    )
)

print(
    f"Environment with name {pipeline_job_env.name} is registered to workspace, the environment version is {pipeline_job_env.version}"
)

# Building Pipeline

## Component 1: Data Prep
This involves fixing missing values, dropping some columns, encoding categorical features and scaling numeric ones.

In [None]:
import os

# Folder that receives the component scripts written by the following cells.
script_folder = 'src'
os.makedirs(script_folder, exist_ok=True)
print(f"{script_folder} folder created")

prep-data.py script
This is the preparation component of the pipeline: it performs data cleaning, encoding and scaling.

In [None]:
%%writefile $script_folder/prep-data.py

# import libraries
import argparse
import pandas as pd
from pathlib import Path
from sklearn.preprocessing import OrdinalEncoder
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import MinMaxScaler


def main(args):
    """Run the data-preparation step end to end.

    Reads the raw CSV at ``args.input_data``, then cleans, encodes and normalizes
    it, and writes the result to ``<args.output_data>/churn_prepped.csv``.
    """
    # read data
    print('Reading data ...')
    df = get_data(args.input_data)

    print('Cleaning data ...')
    cleaned_data = clean_data(df)

    print('Encoding data ...')
    encoded_data = encode_data(cleaned_data)

    print('Normalizing data ...')
    normalized_data = normalize_data(encoded_data)

    # Create the output folder if needed (e.g. for local runs), then write the CSV.
    # DataFrame.to_csv returns None when given a path, so nothing is kept from it.
    output_dir = Path(args.output_data)
    output_dir.mkdir(parents=True, exist_ok=True)
    normalized_data.to_csv(output_dir / "churn_prepped.csv", index=False)


def parse_args():
    """Parse the command-line options of the prep step.

    Returns:
        argparse.Namespace with ``input_data`` (path of the raw CSV) and
        ``output_data`` (folder for the prepared CSV); both are strings or None.
    """
    parser = argparse.ArgumentParser()
    for option in ("input_data", "output_data"):
        parser.add_argument(f"--{option}", dest=option, type=str)
    return parser.parse_args()

def get_data(path):
    """Load the raw churn CSV from ``path`` and report how many rows it holds."""
    frame = pd.read_csv(path)
    print(f'Preparing {len(frame)} rows of data')
    return frame


def clean_data(df):
    """Fix ``TotalCharges`` and drop columns that are not used for modelling.

    ``TotalCharges`` may be read as text because some rows hold a blank (``' '``)
    instead of a number. Every value is coerced to float; values that cannot be
    parsed (such as the blanks) become NaN and are imputed with the column mean.

    Returns:
        A new DataFrame; the caller's frame is left unmodified.
    """
    total_charges = pd.to_numeric(df['TotalCharges'], errors='coerce')
    cleaned = df.assign(TotalCharges=total_charges.fillna(total_charges.mean()))

    # Drop useless columns (high cardinality and low correlation columns - based on the ANOVA test done in EDA)
    return cleaned.drop(columns=['customerID', 'gender', 'PhoneService', 'MultipleLines',
                                 'InternetService', 'StreamingTV', 'StreamingMovies'])

def encode_data(df):
    """Encode the categorical feature columns and the ``Churn`` target as numbers.

    The categorical columns are ordinal-encoded and ``Churn`` is label-encoded,
    both written back into ``df``. Only columns that survive ``clean_data`` are
    listed: ``StreamingTV`` is dropped there, so listing it here raised a KeyError.

    Returns:
        The same DataFrame, modified in place.
    """
    cat_cols = ['Partner', 'Dependents', 'OnlineSecurity', 'OnlineBackup',
                'DeviceProtection', 'TechSupport', 'PaperlessBilling',
                'Contract', 'PaymentMethod']  # categorical columns

    # Encode categorical columns (fit_transform already returns a new array)
    ord_enc = OrdinalEncoder()
    df[cat_cols] = ord_enc.fit_transform(df[cat_cols])
    # Map the target (Churn column) to integer labels
    lb = LabelEncoder()
    df['Churn'] = lb.fit_transform(df['Churn'])

    return df


# Backward-compatible alias for the original (misspelled) function name.
encode_date = encode_data

def normalize_data(df):
    """Min-max scale the numeric columns of ``df`` in place and return it.

    MinMaxScaler scales each column independently, so fitting one scaler on the
    three columns together gives the same values as fitting one per column.
    """
    numeric_cols = ['tenure', 'MonthlyCharges', 'TotalCharges']
    df[numeric_cols] = MinMaxScaler().fit_transform(df[numeric_cols])
    return df


# run script
if __name__ == "__main__":
    separator = "*" * 60

    # leave blank lines around the run in the job logs
    print("\n\n")
    print(separator)

    # parse the command line into the module-level `args`, then run the prep step
    args = parse_args()
    main(args)

    print(separator)
    print("\n\n")

Preparing YAML file for prep-data

In [None]:

%%writefile prep-data.yml
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: prep_data
display_name: Prepare training data
version: 1
type: command
inputs:
  input_data:
    type: uri_file
outputs:
  output_data:
    type: uri_folder
code: ./src
# Environment registered earlier in this notebook (azureml:<name>:<version>).
environment: azureml:Chrun-Proj-scikit-learn:0.1.0
# The script written to ./src is prep-data.py.
command: >-
  python prep-data.py
  --input_data ${{inputs.input_data}}
  --output_data ${{outputs.output_data}}

## Component 2: Train and Evaluate Model
We use MLflow to track the training run.

train-model.py script

In [None]:
%%writefile $script_folder/train-model.py

# import libraries
import mlflow
import glob
import argparse

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from imblearn.over_sampling import SMOTE

from sklearn.model_selection import train_test_split
from sklearn.metrics import ConfusionMatrixDisplay
from sklearn.metrics import roc_auc_score
from sklearn.metrics import RocCurveDisplay
from sklearn.model_selection import cross_val_score
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score
from sklearn.model_selection import RepeatedStratifiedKFold

from xgboost import XGBClassifier

def main(args):
    """Train and evaluate an XGBoost churn classifier.

    Reads the prepared CSV file(s) in ``args.training_data``, splits them, trains
    with ``args.learning_rate`` / ``args.max_depth``, saves the MLflow model to
    ``args.model_output`` and prints evaluation metrics for the held-out set.
    """
    # enable autologging
    mlflow.autolog()

    # read data
    df = get_data(args.training_data)

    # split data
    X_train, X_test, y_train, y_test = split_data(df)

    # train model
    model = train(args.learning_rate, args.max_depth, X_train, y_train,
                  model_output=args.model_output)

    # evaluate model
    evaluate(model, X_test, y_test)


def get_data(data_path):
    """Read and concatenate every ``*.csv`` file in the folder ``data_path``.

    Raises:
        FileNotFoundError: if the folder contains no CSV files.
    """
    # sorted() makes the concatenation order deterministic across runs.
    all_files = sorted(glob.glob(data_path + "/*.csv"))
    if not all_files:
        raise FileNotFoundError(f"No CSV files found in {data_path!r}")
    return pd.concat((pd.read_csv(f) for f in all_files), sort=False, ignore_index=True)


def split_data(df, target_col='Churn', test_size=0.20, random_state=0):
    """Split into train/test sets, then oversample only the training set with SMOTE.

    ``target_col`` is the label (the prep step writes it as ``Churn``); every
    other column is a feature. The split is stratified so both sets keep the
    original class ratio. SMOTE runs *after* the split: oversampling first
    would create synthetic training points interpolated from test rows (and
    vice versa), which leaks information and inflates the test metrics.
    """
    print("Splitting data...")

    features = df.drop(columns=target_col).values
    target = df[target_col].values

    X_train, X_test, y_train, y_test = train_test_split(
        features, target, test_size=test_size, random_state=random_state, stratify=target)

    # sampling_strategy=1: grow the minority class to the size of the majority class.
    oversample = SMOTE(sampling_strategy=1, random_state=random_state)
    X_train, y_train = oversample.fit_resample(X_train, y_train)

    return X_train, X_test, y_train, y_test


def train(learning_rate, max_depth, X_train, y_train, model_output=None):
    """Fit an XGBoost classifier, log its hyperparameters and save it with MLflow.

    Args:
        learning_rate: XGBoost learning rate.
        max_depth: maximum tree depth (integer).
        X_train, y_train: training features and labels.
        model_output: folder to save the MLflow model into; skipped when None.

    Returns:
        The fitted ``XGBClassifier``.
    """
    mlflow.log_param("learning_rate", learning_rate)
    mlflow.log_param("max_depth", max_depth)

    model = XGBClassifier(learning_rate=learning_rate, max_depth=max_depth, n_estimators=1000)
    model.fit(X_train, y_train)

    if model_output is not None:
        mlflow.xgboost.save_model(model, model_output)

    return model


def evaluate(model, X_test, y_test):
    """Report accuracy, AUC and a classification report for ``model`` on the test set.

    Also saves a row-normalized confusion matrix (``Confusion-Matrix.png``) and
    a ROC curve (``ROC-Curve.png``) in the working directory, and logs the two
    metrics and both images to the active MLflow run.
    """
    # calculate accuracy
    y_hat = model.predict(X_test)
    acc = np.average(y_hat == y_test)
    print('Accuracy:', acc)

    # calculate AUC from the predicted probability of the positive class
    y_scores = model.predict_proba(X_test)
    auc = roc_auc_score(y_test, y_scores[:, 1])
    print('AUC: ' + str(auc))

    mlflow.log_metric("test_accuracy", acc)
    mlflow.log_metric("test_auc", auc)

    # Confusion matrix normalized over the true labels. The ConfusionMatrixDisplay
    # constructor expects a precomputed matrix, so build it from the predictions.
    fig, ax = plt.subplots()
    ConfusionMatrixDisplay.from_predictions(y_test, y_hat, normalize='true', cmap='Blues', ax=ax)
    fig.savefig("Confusion-Matrix.png")
    plt.close(fig)
    mlflow.log_artifact("Confusion-Matrix.png")

    # ROC curve with the diagonal of a random classifier for reference
    fig, ax = plt.subplots()
    RocCurveDisplay.from_estimator(model, X_test, y_test, ax=ax)
    ax.plot([0, 1], [0, 1], 'k--')
    ax.set_xlabel('False Positive Rate')
    ax.set_ylabel('True Positive Rate')
    ax.set_title('ROC Curve')
    fig.savefig("ROC-Curve.png")
    plt.close(fig)
    mlflow.log_artifact("ROC-Curve.png")

    # Classification Report
    print(classification_report(y_test, y_hat))


def parse_args():
    """Parse the command-line options of the training step.

    Returns:
        argparse.Namespace with ``training_data`` (str), ``learning_rate``
        (float, default 0.01), ``max_depth`` (int, default 3) and
        ``model_output`` (str).
    """
    def int_like(value):
        # Tree depth must be an integer. Accept "3" as well as "3.0" in case the
        # pipeline renders numeric inputs with a decimal point; reject "2.5".
        number = float(value)
        if not number.is_integer():
            raise argparse.ArgumentTypeError(f"expected an integer, got {value!r}")
        return int(number)

    parser = argparse.ArgumentParser()

    parser.add_argument("--training_data", dest='training_data', type=str)
    parser.add_argument("--learning_rate", dest='learning_rate', type=float, default=0.01)
    parser.add_argument("--max_depth", dest='max_depth', type=int_like, default=3)
    parser.add_argument("--model_output", dest='model_output', type=str)

    return parser.parse_args()

# run script
if __name__ == "__main__":
    separator = "*" * 60

    # leave blank lines around the run in the job logs
    print("\n\n")
    print(separator)

    # parse the command line into the module-level `args`, then train and evaluate
    args = parse_args()
    main(args)

    print(separator)
    print("\n\n")


train-model.yml file

In [None]:
%%writefile train-model.yml
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: train_model
display_name: Train an XGBoost classifier model
version: 1
type: command
inputs:
  training_data:
    type: uri_folder
  learning_rate:
    type: number
    default: 0.01
  max_depth:
    type: integer
    default: 3
outputs:
  model_output:
    type: mlflow_model
code: ./src
# Environment registered earlier in this notebook (azureml:<name>:<version>).
environment: azureml:Chrun-Proj-scikit-learn:0.1.0
# Runs the training script with the arguments its parse_args() defines.
command: >-
  python train-model.py
  --training_data ${{inputs.training_data}}
  --learning_rate ${{inputs.learning_rate}}
  --max_depth ${{inputs.max_depth}}
  --model_output ${{outputs.model_output}}

## Load Components

In [None]:
from azure.ai.ml import load_component

# Prefix prepended to each component spec path (empty: specs sit next to the notebook).
parent_dir = ""

prep_data = load_component(source=f"{parent_dir}./prep-data.yml")
train_XGBoost = load_component(source=f"{parent_dir}./train-model.yml")

## Build Pipeline

In [None]:
from azure.ai.ml import Input
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.dsl import pipeline


# Run every step on the compute cluster created earlier in this notebook.
@pipeline(default_compute=cpu_compute_target)
def customer_churn_classification(pipeline_job_input):
    """Two-step pipeline: prepare the raw churn CSV, then train and evaluate XGBoost."""
    clean_data = prep_data(input_data=pipeline_job_input)
    train_model = train_XGBoost(training_data=clean_data.outputs.output_data)

    return {
        "pipeline_job_transformed_data": clean_data.outputs.output_data,
        "pipeline_job_trained_model": train_model.outputs.model_output,
    }


pipeline_job = customer_churn_classification(
    Input(type=AssetTypes.URI_FILE, path=data_asset.path)
)

In [None]:

# Print the pipeline job definition before submitting it.
print(f"{pipeline_job}")

## Submit the pipeline

In [None]:

# Submit the pipeline to the workspace under the "pipeline_churn" experiment and
# keep the job object returned by the service.
experiment = "pipeline_churn"
pipeline_job = ml_client.jobs.create_or_update(pipeline_job, experiment_name=experiment)
pipeline_job