# Tutorial: Train a room occupancy prediction model with Azure Machine Learning and score with ADX

Open dataset from UCI Repository: __[Occupancy Detection](https://archive.ics.uci.edu/ml/datasets/Occupancy+Detection+)__

Experimental data used for binary classification (room occupancy) from Temperature, Humidity, Light and CO2 measurements.
Ground-truth occupancy was obtained from time-stamped pictures that were taken every minute.


## Prerequisites

* Enable Python plugin on your ADX cluster (see the Onboarding section of the __[python() plugin doc](https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/pythonplugin?pivots=azuredataexplorer)__)
* Whitelist a blob container to be accessible by ADX Python sandbox (see the Appendix section of the doc)
* Create a Python environment (conda or virtual env) that reflects the Python sandbox image
* Install the AML SDK in that environment
* Install the Azure Blob Storage SDK in that environment (install the older version, v2.1, as the newer version is currently incompatible with the azure-kusto-ingest package)

## Set up your AML environment

* Import Python packages
* Create (or connect to) an AML workspace
* Create (or connect to) a remote compute target to use for training
* Create an experiment to track all your runs

### Importing AML packages

In [None]:
import sys
import azureml.core
from azureml.core import Workspace
from azureml.core import Experiment
from azureml.core.compute import AmlCompute
from azureml.core.compute import ComputeTarget
from azureml.core.environment import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.train.sklearn import SKLearn

print(sys.version)
print("Azure ML SDK Version: ", azureml.core.VERSION)

### Create workspace

If the workspace already exists, this call connects to it.

In [None]:
ws = Workspace.create(
    name = "Your Workspace Name",
    subscription_id = "Your Subscription Id",
    resource_group = "Your Resource Group",
    location = "Your location",  # e.g. "westus"
    exist_ok = True,
    show_output = True)

ws.write_config()

In [None]:
# Just for testing: load workspace configuration from the config.json file in the current folder.
ws = Workspace.from_config()
print(ws.name, ws.location, ws.resource_group, sep='\t')

### Create experiment

Create an experiment to track the runs in your workspace

In [None]:
exp = Experiment(workspace=ws, name="Prediction-Occupancy")

### Create or attach existing compute resource
With Azure Machine Learning Compute, a managed service, data scientists can train machine learning models on clusters of Azure virtual machines. Here you create an Azure Machine Learning Compute target for model training.

**Creation of compute takes approximately 5 minutes.** If the AmlCompute with that name is already in your workspace the code will skip the creation process.

In [None]:
compute_name = "cpu-cluster"
vm_sku = "STANDARD_D2_V2"

if compute_name in ws.compute_targets:
    compute_target = ws.compute_targets[compute_name]
    if compute_target and type(compute_target) is AmlCompute:
        print("found compute target: " + compute_name)
else:
    print("creating new compute target...")
    provisioning_config = AmlCompute.provisioning_configuration(vm_size=vm_sku, min_nodes=1,max_nodes=2)
    compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=10)

## Explore data

Before you train a model, you need to understand the data that you are using to train it. In this section you learn how to:

* Fetch the occupancy detection dataset from Kusto using __[KqlMagic](https://docs.microsoft.com/en-us/azure/data-explorer/kqlmagic)__

* Display some records

In [None]:
%reload_ext Kqlmagic

In [None]:
%kql kusto://code;cluster='demo11.westus';database='ML'

In [None]:
%kql res << OccupancyDetection
df = res.to_dataframe() 
print(df.shape)
df[:4]

## Let's copy the data from ADX to blob container to access it from AML

Notes:
1. We copy the input data using KqlMagic to a blob container in the storage account that was allocated for the AML workspace
2. You can create the blob container using __[Azure Storage Explorer](https://azure.microsoft.com/en-us/features/storage-explorer/)__, and extract its SAS token by right-clicking it
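
The export command in the following cells expects the container URI with the SAS token appended as a query string, and the notebook assumes the token already starts with `?`. A minimal sketch of assembling such a URI so that it tolerates tokens with or without the leading `?`; the helper name `build_container_uri` and the sample values are hypothetical and not part of the notebook:

```python
def build_container_uri(storage_account: str, container: str, sas_token: str) -> str:
    """Return https://<account>.blob.core.windows.net/<container>?<sas>."""
    # Accept tokens with or without the leading '?'
    token = sas_token.lstrip("?")
    return f"https://{storage_account}.blob.core.windows.net/{container}?{token}"


# Placeholder values for illustration only
print(build_container_uri("mystorageaccount", "kusto", "?sv=2019-02-02&sig=PLACEHOLDER"))
```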

In [None]:
aml_storage_account = "Your storage account" # you can use the storage account that was created automatically as part of the AML workspace
aml_container_name = "kusto"
aml_sas_token = "Your SAS Token for this container"

In [None]:
blob_container_uri = f"https://{aml_storage_account}.blob.core.windows.net/{aml_container_name}{aml_sas_token}"
copy_query = f".export to csv (h@'{blob_container_uri}') with(includeHeaders=all) <| OccupancyDetection"
print(copy_query)

%kql res << -query copy_query
data_blob_name = res.to_dataframe()["Path"][0].split('/')[-1]
print("\ndata blob name is: ", data_blob_name)

In [None]:
# Test downloading the blob

import pandas as pd
from azure.storage.blob import BlockBlobService  # v2.1
block_blob_service = BlockBlobService(account_name=aml_storage_account, sas_token=aml_sas_token)
block_blob_service.get_blob_to_path(aml_container_name, data_blob_name, 'data.csv')
df = pd.read_csv('data.csv')

print(df.shape)
df[-4:]

## Training on a remote cluster

Here we submit the job to run on the remote training cluster we set up earlier. To submit a job we:
* Create a directory for all files to be uploaded to the remote cluster
* Create a training script
* Create an estimator object
* Submit the job 

### Create a directory

Create a directory to upload all files to the remote cluster

In [None]:
import os
script_folder = os.path.join(os.getcwd(), "to-upload")
os.makedirs(script_folder, exist_ok=True)

### Create a training script

To submit the job to the cluster, we need to create a training script. Here we create `train.py` in the `to-upload` directory

In [None]:
%%writefile "$script_folder/train.py"

import os
import pickle
import argparse
import pandas as pd
from azure.storage.blob import BlockBlobService  # v2.1

from sklearn import tree
from sklearn import neighbors
from sklearn import naive_bayes
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

from azureml.core import Run

parser = argparse.ArgumentParser()
parser.add_argument('--account', type=str, dest='storage_account', help='storage account name')
parser.add_argument('--container', type=str, dest='container_name', help='blob container name')
parser.add_argument('--blob', type=str, dest='blob_name', help='blob name')
parser.add_argument('--sas', type=str, dest='sas_token', help='SAS token')
args = parser.parse_args()

storage_account = args.storage_account
container_name = args.container_name
blob_name = args.blob_name
sas_token = args.sas_token

# Download the blob to a local file 'data.csv' and read it into a dataframe

block_blob_service = BlockBlobService(account_name=storage_account, sas_token=sas_token)
block_blob_service.get_blob_to_path(container_name, blob_name, 'data.csv')
df = pd.read_csv('data.csv')

train_x = df[df['Test'] == False][['Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio']]
train_y = df[df['Test'] == False]['Occupancy']
test_x = df[df['Test'] == True][['Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio']]
test_y = df[df['Test'] == True]['Occupancy']

print(train_x.shape, train_y.shape, test_x.shape, test_y.shape)

run = Run.get_context()

#four classifier types
clf1 = tree.DecisionTreeClassifier()
clf2 = LogisticRegression(solver='liblinear')
clf3 = neighbors.KNeighborsClassifier()
clf4 = naive_bayes.GaussianNB()

clf1 = clf1.fit(train_x, train_y)
clf2 = clf2.fit(train_x, train_y)
clf3 = clf3.fit(train_x, train_y)
clf4 = clf4.fit(train_x, train_y)

os.makedirs('outputs', exist_ok=True) # note files saved in the outputs folder are automatically uploaded into experiment

# Cross-validated accuracy on the training set
for clf, label in zip([clf1, clf2, clf3, clf4], ['Decision Tree', 'Logistic Regression', 'K Nearest Neighbour', 'Naive Bayes']):
    scores = cross_val_score(clf, train_x, train_y, cv=5, scoring='accuracy')
    print("Accuracy: %0.4f (+/- %0.4f) [%s]" % (scores.mean(), scores.std(), label))
    run.log("training accuracy", scores.mean(), description='cross-validated accuracy over the training set')

# Accuracy of the fitted models on the held-out testing set.
# cross_val_score would refit clones on the test data, so score the already fitted models instead.
for clf, label in zip([clf1, clf2, clf3, clf4], ['Decision Tree', 'Logistic Regression', 'K Nearest Neighbour', 'Naive Bayes']):
    accuracy = clf.score(test_x, test_y)
    print("Accuracy: %0.4f [%s]" % (accuracy, label))
    run.log("model type", label)
    run.log("testing accuracy", accuracy, description='accuracy over the testing set')
    with open('outputs/' + label + '.pkl', 'wb') as handle:
        pickle.dump(clf, handle)

### Create an estimator

An estimator object is used to submit the run. Azure Machine Learning has pre-configured estimators for common machine learning frameworks, as well as a generic Estimator. We create an SKLearn estimator for the scikit-learn model by specifying:

* The name of the estimator object, `est`
* The directory to be uploaded to the cluster nodes for execution
* The compute target that we created
* The training script name `train.py`
* Parameters required from the training script
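
The parameters reach `train.py` as command-line arguments, which the script reads with `argparse`. A minimal local sketch of that hand-off, assuming each dictionary entry is passed as a flag followed by its value; the sample values are placeholders and the flattening step is an illustration, not the SDK's internal code:

```python
import argparse

# Placeholder values standing in for the notebook's variables
script_params = {
    "--account": "mystorageaccount",
    "--container": "kusto",
    "--blob": "export_1.csv",
    "--sas": "?sv=PLACEHOLDER",
}

# Flatten the dictionary into an argv-style list: [flag, value, flag, value, ...]
argv = [item for pair in script_params.items() for item in pair]

# Same argument definitions as in train.py
parser = argparse.ArgumentParser()
parser.add_argument("--account", type=str, dest="storage_account", help="storage account name")
parser.add_argument("--container", type=str, dest="container_name", help="blob container name")
parser.add_argument("--blob", type=str, dest="blob_name", help="blob name")
parser.add_argument("--sas", type=str, dest="sas_token", help="SAS token")
args = parser.parse_args(argv)

print(args.storage_account, args.container_name, args.blob_name)
```

Running the parser locally like this is a quick way to check that the flag names in `script_params` match the ones the training script declares before submitting a remote run.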

In [None]:
# Create Environment to install required packages

env = Environment('adx_sandbox_env')
# set scikit-learn==0.20.3 to match Kusto Python sandbox image (as of 4/2020)
cd = CondaDependencies.create(pip_packages=['azureml-sdk','scikit-learn==0.20.3','pandas==0.24.1','azure.storage.blob==2.1.0','azureml-dataprep[pandas,fuse]>=1.1.14'])
env.python.conda_dependencies = cd

script_params = {
    '--account': aml_storage_account,
    '--container': aml_container_name,
    '--blob': data_blob_name,
    '--sas': aml_sas_token
}

# Create the Estimator

est = SKLearn(source_directory=script_folder,
              script_params=script_params,
              compute_target=compute_target,
              environment_definition=env,
              entry_script='train.py')

### Submit the job to the cluster

We run the experiment by submitting the estimator object; we can navigate to the Azure portal to monitor the run.

In [None]:
run = exp.submit(config=est)
run

Since the call is asynchronous, it returns a **Preparing** or **Running** state as soon as the job is started.

## Monitor a remote run

In total, the first run takes **approximately 10 minutes**. For subsequent runs, as long as the dependencies (the packages listed in the `Environment` passed to the estimator constructor above) don't change, the same image is reused, so container startup is much faster.

Here is what's happening:

- **Image creation**: A Docker image is created matching the Python environment specified by the estimator. The image is built and stored in the ACR (Azure Container Registry) associated with your workspace. Image creation and uploading takes **about 5 minutes**. 

  This stage happens once for each Python environment since the container is cached for subsequent runs.  During image creation, logs are streamed to the run history. We can monitor the image creation progress using these logs.

- **Scaling**: If the remote cluster requires more nodes to execute the run than currently available, additional nodes are added automatically. Scaling typically takes **about 5 minutes.**

- **Running**: In this stage, the necessary scripts and files are sent to the compute target, then data stores are mounted/copied (not relevant in this example as we read the data from blob), then the entry_script is run. While the job is running, stdout and the files in the ./logs directory are streamed to the run history. We can monitor the run's progress using these logs.

- **Post-Processing**: The ./outputs directory of the run is copied over to the run history in your workspace so we can access these results.


We can check the progress of a running job in multiple ways. This tutorial uses a Jupyter widget as well as a `wait_for_completion` method. 

### Jupyter widget

Watch the progress of the run with a Jupyter widget.  Like the run submission, the widget is asynchronous and provides live updates every 10-15 seconds until the job completes.

In [None]:
from azureml.widgets import RunDetails
RunDetails(run).show()

By the way, if you need to cancel a run, you can follow [these instructions](https://aka.ms/aml-docs-cancel-run).

### Get log results upon completion

Model training happens in the background. We can use `wait_for_completion` to block and wait until the model has completed training before running more code. 

In [None]:
# specify show_output to True for a verbose log
run.wait_for_completion(show_output=True) 

### Display run results

We now have a model trained on a remote cluster.  Retrieve all the metrics logged during the run, including the accuracy of the model:

In [None]:
print(run.get_metrics())

## Register model

The training script pickled the models to files and wrote them in a directory named `outputs` in the VM of the cluster where the job is executed. `outputs` is a special directory in that all content in this  directory is automatically uploaded to our workspace.  This content appears in the run record in the experiment under the workspace. Hence, the model file is now also available in the workspace.

We can see files associated with that run.

In [None]:
print(run.get_file_names())

Register the model in the workspace so that we can later query, examine, and deploy this model.

In [None]:
# register model 
model = run.register_model(model_name='LogisticRegression', model_path='outputs/Logistic Regression.pkl')
print(model.name, model.id, model.version, sep='\t')

## Scoring in ADX

There are two options for retrieving the model for scoring:
- serialize the model to a string to be stored in a standard table in ADX
- copy the model to a blob container (that was previously whitelisted for access by ADX Python sandbox)

### Download the model to local file

In [None]:
model_path = model.download(exist_ok=True)
model_path

### Scoring from serialized model which is stored in ADX table

Serialize the model and store it in the ADX models table using KqlMagic.
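
The model travels as a hex string: the notebook encodes the pickled bytes with `bytes.hex()`, and the scoring query decodes them with `binascii.unhexlify` before unpickling. A minimal round-trip sketch of that encoding, in which a plain dictionary stands in for the trained classifier (an assumption for illustration):

```python
import binascii
import pickle

# A plain dictionary stands in for the trained classifier (illustration only)
stand_in_model = {"name": "stand-in model", "coef": [0.1, 0.2, 0.3]}

# Notebook side: pickle the object and encode the bytes as a hex string
smodel = pickle.dumps(stand_in_model).hex()

# Sandbox side: decode the hex string back to bytes and unpickle
restored = pickle.loads(binascii.unhexlify(smodel))

print(type(smodel).__name__, len(smodel))
print(restored == stand_in_model)
```

Hex encoding doubles the size of the payload, but it keeps the model in an ordinary `string` column without any escaping concerns.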

In [None]:
import datetime

models_tbl = 'ML_Models'
model_name = 'AML-Occupancy'

with open(model_path, 'rb') as handle:
    buf = handle.read()

smodel = buf.hex()
now = datetime.datetime.now()
dfm = pd.DataFrame({'name':[model_name], 'timestamp':[now], 'model':[smodel]})
dfm

In [None]:
set_query = '''
.set-or-append {0} <|
let tbl = dfm;
tbl
'''.format(models_tbl)
print(set_query)

In [None]:
%kql -query set_query

Score the test set using the serialized model that is stored in the ADX table.
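
The scoring query embeds the Python code as a sequence of adjacent single-quoted KQL string literals, one per line of code, each ending in `\n`. A minimal sketch of generating that form from ordinary Python source, assuming backslashes and single quotes are escaped with a backslash inside the literal; the helper name `to_kql_string_literal` is hypothetical:

```python
def to_kql_string_literal(py_source: str) -> str:
    """Render Python source as adjacent single-quoted KQL string literals, one per line."""
    literals = []
    for line in py_source.strip("\n").split("\n"):
        # Escape backslashes first, then single quotes
        escaped = line.replace("\\", "\\\\").replace("'", "\\'")
        # Each literal ends with the two characters '\' and 'n', which KQL reads as a newline
        literals.append("'" + escaped + "\\n'")
    return "\n".join(literals)


sample = 'import pickle\n\npred_col = kargs["pred_col"]'
print(to_kql_string_literal(sample))
```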

In [None]:
# NOTE: we run ADX scoring query here using KqlMagic by embedding the query from Kusto Explorer
# with r'''Kusto Explorer query''':

scoring_from_table_query = r'''
let classify_sf=(samples:(*), models_tbl:(name:string, timestamp:datetime, model:string), model_name:string, features_cols:dynamic, pred_col:string)
{
    let model_str = toscalar(models_tbl | where name == model_name | top 1 by timestamp desc | project model);
    let kwargs = pack('smodel', model_str, 'features_cols', features_cols, 'pred_col', pred_col);
    let code =
    'import pickle\n'
    'import binascii\n'
    '\n'
    'smodel = kargs["smodel"]\n'
    'features_cols = kargs["features_cols"]\n'
    'pred_col = kargs["pred_col"]\n'
    'bmodel = binascii.unhexlify(smodel)\n'
    'clf1 = pickle.loads(bmodel)\n'
    'df1 = df[features_cols]\n'
    'predictions = clf1.predict(df1)\n'
    '\n'
    'result = df\n'
    'result[pred_col] = pd.DataFrame(predictions, columns=[pred_col])'
    '\n'
    ;
    samples | evaluate python(typeof(*), code, kwargs)
};
OccupancyDetection 
| where Test == 1
| extend pred_Occupancy=bool(0)
| invoke classify_sf(ML_Models, 'AML-Occupancy', pack_array('Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio'), 'pred_Occupancy')
| summarize n=count() by Occupancy, pred_Occupancy      //  confusion matrix
'''

In [None]:
%kql res << -query scoring_from_table_query
df = res.to_dataframe()
print('Confusion Matrix')
df

### Scoring from model which is stored in blob storage

Copy the model to blob

Note again that the blob container should be whitelisted to be accessible by ADX Python sandbox (see the appendix section of the __[python() plugin doc](https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/pythonplugin?pivots=azuredataexplorer)__)

In [None]:
adx_storage_account = "Your Storage Account"
adx_container_name = "Your container"
model_blob_name = model_name + '.pkl'
adx_sas_token = "Your SAS Token for this container"

In [None]:
from azure.storage.blob import BlockBlobService  # v2.1
block_blob_service = BlockBlobService(account_name=adx_storage_account, sas_token=adx_sas_token)
block_blob_service.create_blob_from_path(adx_container_name, model_blob_name, model_path)

In [None]:
model_uri = f'https://{adx_storage_account}.blob.core.windows.net/{adx_container_name}/{model_blob_name}{adx_sas_token}'
model_uri

In [None]:
scoring_from_blob_query = r'''
let classify_sf=(samples:(*), model_sas:string, features_cols:dynamic, pred_col:string)
{
    let kwargs = pack('model_sas', model_sas, 'features_cols', features_cols, 'pred_col', pred_col);
    let code =
    '\n'
    'import pickle\n'
    '\n'
    'model_sas = kargs["model_sas"]\n'
    'features_cols = kargs["features_cols"]\n'
    'pred_col = kargs["pred_col"]\n'
    'with open("/Temp/model.pkl", "rb") as f:\n'
    '   bmodel = f.read()\n'
    'clf1 = pickle.loads(bmodel)\n'
    'df1 = df[features_cols]\n'
    'predictions = clf1.predict(df1)\n'
    '\n'
    'result = df\n'
    'result[pred_col] = pd.DataFrame(predictions, columns=[pred_col])'
    '\n'
    ;
    samples | evaluate python(typeof(*), code, kwargs,
        external_artifacts=pack('model.pkl', model_sas))
};
OccupancyDetection 
| where Test == 1
| extend pred_Occupancy=bool(0)
| invoke classify_sf('$model_uri$',
                     pack_array('Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio'), 'pred_Occupancy')
| summarize n=count() by Occupancy, pred_Occupancy      //  confusion matrix
'''

In [None]:
scoring_from_blob_query = scoring_from_blob_query.replace('$model_uri$', model_uri)

In [None]:
%kql res << -query scoring_from_blob_query
df = res.to_dataframe()
print('Confusion Matrix')
df
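
The confusion matrix rows can be reduced to summary metrics. A minimal sketch, assuming each row is an `(Occupancy, pred_Occupancy, n)` triple as produced by the `summarize` line of the query; the function name and the sample counts are hypothetical and are not results from this tutorial:

```python
def classification_metrics(rows):
    """Compute accuracy, precision and recall from (actual, predicted, count) rows."""
    tp = fp = fn = tn = 0
    for actual, predicted, n in rows:
        if actual and predicted:
            tp += n
        elif actual:
            fn += n
        elif predicted:
            fp += n
        else:
            tn += n
    total = tp + fp + fn + tn
    return {
        "accuracy": (tp + tn) / total if total else 0.0,
        "precision": tp / (tp + fp) if (tp + fp) else 0.0,
        "recall": tp / (tp + fn) if (tp + fn) else 0.0,
    }


# Hypothetical counts for illustration only
sample_rows = [(True, True, 90), (True, False, 10), (False, True, 5), (False, False, 95)]
print(classification_metrics(sample_rows))
```

With the dataframe returned by the query, the rows could be passed as `df[['Occupancy', 'pred_Occupancy', 'n']].itertuples(index=False)`.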

### Summary

In this tutorial you learned how to train a model in AML and then use ADX for scoring. This is a win-win scenario as:
* AML has the infrastructure for batch training, which can run on scalable compute nodes of various SKUs
* AML enables MLOps: full management of the ML workflow (including the training data, ML model selection, hyperparameter tuning, etc.)
* ADX scoring is done near the data, on the same ADX compute nodes, enabling near-real-time processing of large amounts of new data. There is no need to export the data to an external scoring service and import the results back. Consequently, the scoring architecture is simpler, and performance is much faster and more scalable