# Video Identification of Suspicious Behavior: Train with Azure ML

This notebook processes your video data as follows:
* Use the data processed in the `Video Identification of Suspicious Behavior: Preparation` notebook
* Load the training data
* Train the model on the training data, using Azure ML to track runs and manage the model
* Generate predictions on the test data using this model
* Identify any suspicious activity in the videos

The source data used in this notebook can be found at [EC Funded CAVIAR project/IST 2001 37540](http://homepages.inf.ed.ac.uk/rbf/CAVIAR/)

A related blog post is [here](https://databricks.com/blog/2018/09/13/identify-suspicious-behavior-in-video-with-databricks-runtime-for-machine-learning.html).

<img src="https://databricks.com/wp-content/uploads/2018/09/mnt_raela_video_splash.png" width=900/>

### Prerequisite
* Execute the `Video Identification of Suspicious Behavior: Preparation` notebook to set up the image and feature datasets
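Before running this notebook, you can confirm that the Preparation notebook's outputs exist. The following is a minimal sketch, assuming the cluster exposes DBFS through the local `/dbfs` FUSE mount (as the `/dbfs/` paths used later in this notebook do) and that the feature folders follow the `imgFeaturesPath` / `imgFeaturesTestPath` pattern defined below. `missing_paths` and the placeholder name are hypothetical.

```python
import os


def missing_paths(paths):
    """Return the entries of `paths` that do not exist on the local filesystem."""
    return [p for p in paths if not os.path.exists(p)]


# Hypothetical placeholder: use the same MY_NAME as in the Preparation notebook
my_name = "your-name"
required = [
    "/dbfs/mnt/tardis6/videos/cctv_features/train-%s/" % my_name,
    "/dbfs/mnt/tardis6/videos/cctv_features/test-%s/" % my_name,
]

missing = missing_paths(required)
if missing:
    print("Run the Preparation notebook first; missing:", missing)
else:
    print("Prepared feature datasets found.")
```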

### Cluster Configuration
* Suggested cluster configuration:
  * Databricks Runtime Version: `Databricks Runtime for ML` (e.g. 4.1 ML, 4.2 ML, etc.)
  * Driver: 64GB RAM instance (e.g. `Azure: Standard_D16s_v3, AWS: r4.4xlarge`)
  * Workers: 2x 64GB RAM instances (e.g. `Azure: Standard_D16s_v3, AWS: r4.4xlarge`)
  * Python: `Python 3`
 
### Libraries to Install Manually
To install, refer to **Upload a Python PyPI package or Python Egg** [Databricks](https://docs.databricks.com/user-guide/libraries.html#upload-a-python-pypi-package-or-python-egg) | [Azure Databricks](https://docs.azuredatabricks.net/user-guide/libraries.html#upload-a-python-pypi-package-or-python-egg)

* Python Libraries:
  * `opencv-python`: 3.4.2
 
### Libraries Already Included in Databricks Runtime for ML
Because we're using *Databricks Runtime for ML*, you do **not** need to install the following libraries:
* Python Libraries:
  * `h5py`: 2.7.1
  * `tensorflow`: 1.7.1
  * `keras`: 2.1.5 (using the TensorFlow backend)
  * *You can check by running `import tensorflow as tf; print(tf.__version__)`*

* JARs:
  * `spark-deep-learning-1.0.0-spark2.3-s_2.11.jar`
  * `tensorframes-0.3.0-s_2.11.jar`
  * *You can check by reviewing the cluster's Spark UI > Environment tab*
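If you prefer one check over importing each package, the sketch below compares installed distribution versions against the ones listed above. It assumes Python 3.8+ for `importlib.metadata` (older runtimes would need `pkg_resources` instead), and the PyPI distribution names are assumptions about how the packages were installed. `version_report` and `EXPECTED` are hypothetical names.

```python
from importlib import metadata  # Python 3.8+

# Versions listed in this notebook; the runtime, not this code, determines what is installed
EXPECTED = {
    "h5py": "2.7.1",
    "tensorflow": "1.7.1",
    "keras": "2.1.5",
    "opencv-python": "3.4.2",
}


def version_report(expected):
    """Map each distribution name to (expected_version, installed_version or None)."""
    report = {}
    for name, want in expected.items():
        try:
            have = metadata.version(name)
        except metadata.PackageNotFoundError:
            have = None
        report[name] = (want, have)
    return report


for name, (want, have) in version_report(EXPECTED).items():
    status = "ok" if have == want else "check"
    print("%-15s expected %-8s installed %s [%s]" % (name, want, have, status))
```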

In [2]:
import azureml.core
from azureml.core import Workspace
from azureml.core.run import Run
from azureml.core.experiment import Experiment

import numpy as np
import os
import shutil
#from haikunator import Haikunator

from pyspark.ml import PipelineModel
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import expr
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Check core SDK version number
print("SDK version:", azureml.core.VERSION)

# Use the same name as in the Preparation notebook; it is used in experiment and resource names
MY_NAME = 'silinskiy'

In [3]:
config_name = "/mnt/azureml/" + MY_NAME + "_config.json"
print("Config file name: " + config_name)

# Contents to write - FILL IN
json_to_write = """
{
    "subscription_id": "f29420a1-6637-4252-bc28-67131742f1f7",
    "resource_group": "Axon-AI-Camp",
    "workspace_name": "mlsilinskiy"
}
"""
dbutils.fs.mkdirs("dbfs:/mnt/azureml")
dbutils.fs.put(config_name, json_to_write, overwrite=True)
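Hand-editing a JSON string makes it easy to leave a placeholder or a stray comma. As an alternative sketch, you could build the same three-key config with `json.dumps`, which always emits valid JSON and can reject empty values. `build_workspace_config` and `write_config` are hypothetical helpers; writing through the `/dbfs` mount is an assumption that stands in for `dbutils.fs.put`.

```python
import json
import os

REQUIRED_KEYS = ("subscription_id", "resource_group", "workspace_name")


def build_workspace_config(subscription_id, resource_group, workspace_name):
    """Return JSON text for an Azure ML workspace config, rejecting empty values."""
    config = {
        "subscription_id": subscription_id,
        "resource_group": resource_group,
        "workspace_name": workspace_name,
    }
    empty = [k for k in REQUIRED_KEYS if not config[k]]
    if empty:
        raise ValueError("Empty config values: %s" % ", ".join(empty))
    return json.dumps(config, indent=4)


def write_config(path, text):
    """Write config text to a local path, e.g. one under the /dbfs mount."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w") as f:
        f.write(text)


# On the cluster (assumption): write_config("/dbfs" + config_name, build_workspace_config(...))
print(build_workspace_config("<subscription-id>", "<resource-group>", "<workspace-name>"))
```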

In [4]:
ws = Workspace.from_config(path="/dbfs" + config_name)

In [5]:
srcVideoPath = "/databricks-datasets/cctvVideos/train/"
srcTestVideoPath = "/databricks-datasets/cctvVideos/test/"
targetImgPath = "/mnt/tardis6/videos/cctvFrames/train-" + MY_NAME + "/"
targetImgTestPath = "/mnt/tardis6/videos/cctvFrames/test-" + MY_NAME + "/"
imgFeaturesPath = "/mnt/tardis6/videos/cctv_features/train-" + MY_NAME + "/"
imgFeaturesTestPath = "/mnt/tardis6/videos/cctv_features/test-" + MY_NAME + "/"
labeledDataPath = "/databricks-datasets/cctvVideos/labels/"
srcVideoMP4Path = "/databricks-datasets/cctvVideos/mp4/train/"
srcTestVideoMP4Path = "/databricks-datasets/cctvVideos/mp4/test/"

# displayVid(): Shows video from mounted cloud storage
def displayVid(filepath):
  return displayHTML("""
  <video width="480" height="320" controls>
  <source src="/files/%s" type="video/mp4">
  </video>
  """ % filepath)

# displayDbfsVid(): Shows video from DBFS
def displayDbfsVid(filepath):
  return displayHTML("""
  <video width="480" height="320" controls>
  <source src="/dbfs/%s" type="video/mp4">
  </video>
  """ % filepath)

# displayImg(): Shows image from dbfs/cloud storage
def displayImg(filepath):
  dbutils.fs.cp(filepath, "FileStore/%s" % filepath)
  return displayHTML("""
  <img src="/files/%s">
  """ % filepath)

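`displayImg` copies an image into `FileStore` so the browser can fetch it, because files under `dbfs:/FileStore/` are served by the workspace at the `/files/` URL path. The sketch below makes that mapping explicit and avoids the double slash that `"FileStore/%s" % filepath` produces for paths starting with `/`. `filestore_locations` is a hypothetical helper, not a Databricks API.

```python
def filestore_locations(filepath):
    """Map a DBFS path to (copy destination under FileStore, browser URL path).

    Assumes files under dbfs:/FileStore/ are served by the workspace at /files/.
    """
    rel = filepath
    if rel.startswith("dbfs:"):
        rel = rel[len("dbfs:"):]
    rel = rel.lstrip("/")
    return "dbfs:/FileStore/" + rel, "/files/" + rel


# Inside a Databricks notebook this could replace the body of displayImg():
#   dest, url = filestore_locations(filepath)
#   dbutils.fs.cp(filepath, dest)
#   displayHTML('<img src="%s">' % url)
print(filestore_locations("/mnt/tardis6/videos/cctvFrames/test-me/Browse1frame0028.jpg"))
```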
* Read the Parquet files generated by the Preparation notebook, which contain the training and test image features
* Read the hand-labeled data and join it to the features

In [7]:
# Prefix to add prior to join
prefix = "dbfs:" + targetImgPath

# Read in hand-labeled data 
labels = spark.read.csv(labeledDataPath, header=True, inferSchema=True)
labels_df = labels.withColumn("filePath", expr("concat('" + prefix + "', ImageName)")).drop('ImageName')

# Read in features data (saved in Parquet format)
featureDF = spark.read.parquet(imgFeaturesPath)

# Create the training dataset by joining labels and features
train = featureDF.join(labels_df, featureDF.origin == labels_df.filePath).select("features", "label", featureDF.origin)

# Validate number of images used for training
train.count()
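The join only matches when a label's `ImageName`, prefixed with the frame directory, equals the `origin` column written by the Preparation notebook. The same labels CSV therefore serves both the training and test sets, with the prefix selecting which frames match. The pure-Python sketch below mimics that inner join on made-up rows; `label_paths` and `inner_join` are hypothetical, and it assumes each `ImageName` appears once in the labels (Spark's join would instead emit one row per duplicate).

```python
def label_paths(label_rows, prefix):
    """Mimic withColumn('filePath', concat(prefix, ImageName)): map filePath -> label."""
    return {prefix + row["ImageName"]: row["label"] for row in label_rows}


def inner_join(feature_rows, labels_by_path):
    """Keep feature rows whose `origin` equals a labeled filePath, like an inner join."""
    return [
        {"features": r["features"], "label": labels_by_path[r["origin"]], "origin": r["origin"]}
        for r in feature_rows
        if r["origin"] in labels_by_path
    ]


# Made-up rows for illustration only
sample_prefix = "dbfs:/mnt/example/cctvFrames/train-me/"
sample_labels = [{"ImageName": "Walk1frame0001.jpg", "label": 0},
                 {"ImageName": "Fight1frame0001.jpg", "label": 1}]
sample_features = [
    {"origin": sample_prefix + "Fight1frame0001.jpg", "features": [0.2, 0.7]},
    # A test-set frame: the train prefix does not match it, so it is dropped
    {"origin": "dbfs:/mnt/example/cctvFrames/test-me/Walk1frame0001.jpg", "features": [0.5, 0.1]},
]
print(inner_join(sample_features, label_paths(sample_labels, sample_prefix)))
```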

In [8]:
# Prefix to add prior to join
prefix = "dbfs:" + targetImgTestPath

# Read in hand-labeled data 
labels = spark.read.csv(labeledDataPath, header=True, inferSchema=True)
labels_df = labels.withColumn("filePath", expr("concat('" + prefix + "', ImageName)")).drop('ImageName')

# Read in features data (saved in Parquet format)
featureTestDF = spark.read.parquet(imgFeaturesTestPath)

# Create the test dataset by joining labels and features
test = featureTestDF.join(labels_df, featureTestDF.origin == labels_df.filePath).select("features", "label", featureTestDF.origin)

# Validate number of images used for testing
test.count()

In [9]:
model_name = "suspicious_behavior.mml"
model_dbfs = os.path.join("/dbfs", model_name)
run_history_name = 'spark-ml-notebook'

# start a training run by defining an experiment
myexperiment = Experiment(ws, "test-for-camp-" + MY_NAME)
root_run = myexperiment.start_logging()

# Regularization rates to try
regs = [0.0001, 0.001, 0.01, 0.1]
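Each child run in the next cell fits `LogisticRegression` with `regParam` set to one of these rates and `elasticNetParam=0.3`. Spark ML's elastic-net penalty is `regParam * (elasticNetParam * ||w||_1 + (1 - elasticNetParam) / 2 * ||w||_2^2)`, so larger rates shrink the coefficients harder. The sketch evaluates that penalty for a hypothetical weight vector; it ignores the intercept (which Spark does not penalize) and the feature standardization Spark applies by default. `elastic_net_penalty` is a hypothetical helper.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """regParam * (a * ||w||_1 + (1 - a) / 2 * ||w||_2^2), with a = elasticNetParam."""
    a = elastic_net_param
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (a * l1 + (1 - a) / 2 * l2_squared)


# Hypothetical weight vector; the rates match `regs` above
example_weights = [1.0, -2.0]
for rate in [0.0001, 0.001, 0.01, 0.1]:
    print(rate, elastic_net_penalty(example_weights, rate, 0.3))
```

With `elasticNetParam=0.3` the penalty leans toward L2 while keeping some L1 sparsity pressure; `elasticNetParam=1.0` would make it a pure L1 (lasso) penalty.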

In [10]:
# Try several regularization rates in a logistic regression model
for reg in regs:
  print("Regularization rate: {}".format(reg))
  # Create one child run per regularization rate
  with root_run.child_run("reg-" + str(reg)) as run:
    # Fit LogisticRegression Model
    lr = LogisticRegression(maxIter=20, regParam=reg, elasticNetParam=0.3, labelCol="label")
    lrModel = lr.fit(train)
    
    # Predict on test dataset extracted features
    pred = lrModel.transform(test)
    
    # Evaluate. BinaryClassificationEvaluator supports only two metrics: areaUnderROC and areaUnderPR
    bce = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')
    au_roc = bce.setMetricName('areaUnderROC').evaluate(pred)
    au_prc = bce.setMetricName('areaUnderPR').evaluate(pred)
    
    print("Area under ROC: {}".format(au_roc))
    print("Area Under PR: {}".format(au_prc))
    
    # Log reg, au_roc, au_prc and the training column names in run history
    run.log("reg", reg)
    run.log("au_roc", au_roc)
    run.log("au_prc", au_prc)
    run.log_list("columns", train.columns)
    
    # Save the model to DBFS; the code below expects it at model_dbfs (/dbfs/suspicious_behavior.mml)
    lrModel.write().overwrite().save(model_name)
    
    # Zip the saved model directory and upload the archive to the run history record
    mdl, ext = model_name.split(".")
    model_zip = mdl + ".zip"
    shutil.make_archive(mdl, 'zip', model_dbfs)
    run.upload_file("outputs/" + model_name, model_zip)
    
    # Delete the saved model and the zip file now that they are uploaded to run history
    shutil.rmtree(model_dbfs)
    os.remove(model_zip)

# Declare run completed
root_run.complete()
root_run_id = root_run.id
print("run id:", root_run_id)
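The `au_roc` metric logged for each child run is the area under the ROC curve. Equivalently, it is the probability that a randomly chosen positive frame scores higher than a randomly chosen negative one, with ties counted as one half. The quadratic-time sketch below computes that pairwise form on toy scores; Spark's `BinaryClassificationEvaluator` integrates the ROC curve instead, so its value can differ slightly (for example, if scores are binned). `roc_auc` is a hypothetical helper.

```python
def roc_auc(scores, labels):
    """Pairwise ROC AUC: P(score of a positive > score of a negative), ties count 0.5."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("Need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Toy scores: 3 of the 4 positive/negative pairs are ordered correctly
print(roc_auc([0.1, 0.4, 0.35, 0.8], [0, 0, 1, 1]))  # prints 0.75
```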

In [11]:
metrics = root_run.get_metrics(recursive=True)
# Pick the run with the highest AUC; runs without an 'au_roc' metric (e.g. the parent) are skipped
best_run_id = max(metrics, key=lambda k: metrics[k].get('au_roc', float('-inf')))
print(best_run_id, metrics[best_run_id]['au_roc'], metrics[best_run_id]['reg'])

# Look up the best child run by id
child_runs = {r.id: r for r in root_run.get_children()}
best_run = child_runs[best_run_id]

In [12]:
# Download the model from the best run to a local folder
best_model_file_name = "best_model.zip"
best_run.download_file(name='outputs/' + model_name, output_file_path=best_model_file_name)

# Remove any stale copy of the model (rmtree fails on plain files), then unzip the archive in its place
if os.path.isdir(model_dbfs):
    shutil.rmtree(model_dbfs)
elif os.path.isfile(model_dbfs):
    os.remove(model_dbfs)
shutil.unpack_archive(best_model_file_name, model_dbfs)

# Load the best model
lrModel = LogisticRegressionModel.read().load(model_name)
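The training loop zips the saved model directory before uploading it, and the cell above unzips the best run's copy back into place. The sketch below reproduces that round trip with a stand-in directory so it can run anywhere, including the case where a stale plain file sits at the target path. `zip_model_dir`, `unzip_model_dir`, `roundtrip_demo`, and the stand-in file layout are hypothetical.

```python
import os
import shutil
import tempfile


def zip_model_dir(model_dir, archive_base):
    """Zip the contents of model_dir into archive_base + '.zip' and return its path."""
    return shutil.make_archive(archive_base, "zip", root_dir=model_dir)


def unzip_model_dir(archive_path, model_dir):
    """Replace model_dir with the archive contents, removing any stale copy first."""
    if os.path.isdir(model_dir):
        shutil.rmtree(model_dir)
    elif os.path.exists(model_dir):
        os.remove(model_dir)
    shutil.unpack_archive(archive_path, model_dir, format="zip")


def roundtrip_demo():
    """Zip a stand-in model directory, restore it elsewhere, and return the restored text."""
    with tempfile.TemporaryDirectory() as tmp:
        src = os.path.join(tmp, "model")
        os.makedirs(os.path.join(src, "metadata"))
        with open(os.path.join(src, "metadata", "part-00000"), "w") as f:
            f.write('{"class": "stand-in"}')
        archive = zip_model_dir(src, os.path.join(tmp, "suspicious_behavior"))
        restored = os.path.join(tmp, "restored")
        # Simulate a stale plain file at the target path
        with open(restored, "w") as f:
            f.write("stale")
        unzip_model_dir(archive, restored)
        with open(os.path.join(restored, "metadata", "part-00000")) as f:
            return f.read()


print(roundtrip_demo())
```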

In [13]:
# Generate predictions on test data
result = lrModel.transform(featureTestDF)
# Register the predictions as a temporary view for SQL queries
result.createOrReplaceTempView("result")

In [14]:
# Extract the first and second elements of the probability vector
firstelement = udf(lambda v: float(v[0]), FloatType())
secondelement = udf(lambda v: float(v[1]), FloatType())

# The second element is the probability of the positive (suspicious) class, label 1
predictions = result.withColumn("prob2", secondelement('probability'))
predictions.createOrReplaceTempView("predictions")
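For binary logistic regression, Spark ML derives the `probability` vector from the margin `w · x + b` through the sigmoid, and `prediction` is 1 when the second element exceeds `threshold` (0.5 by default). Also, because `secondelement` returns `FloatType`, `prob2` is rounded to single precision, which is why a probability of 0.999995861688609 appears as 0.9999959 in the output below. The sketch reproduces both steps for one row under those assumptions; `lr_outputs` and `to_float32` are hypothetical helpers.

```python
import math
import struct


def lr_outputs(margin, threshold=0.5):
    """Return (rawPrediction, probability, prediction) for one row, given its margin."""
    # Numerically stable sigmoid for large positive or negative margins
    if margin >= 0:
        p1 = 1.0 / (1.0 + math.exp(-margin))
    else:
        e = math.exp(margin)
        p1 = e / (1.0 + e)
    raw_prediction = [-margin, margin]
    probability = [1.0 - p1, p1]
    prediction = 1.0 if probability[1] > threshold else 0.0
    return raw_prediction, probability, prediction


def to_float32(x):
    """Round a Python float to single precision, as a FloatType column stores it."""
    return struct.unpack("f", struct.pack("f", x))[0]


sample_outputs = lr_outputs(2.0)
print(sample_outputs)
print(to_float32(0.999995861688609))
```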

In [15]:
%sql
select origin, probability, prob2, prediction from predictions where prediction = 1 order by prob2 desc

| origin | probability | prob2 | prediction |
|---|---|---|---|
| dbfs:/mnt/tardis6/videos/cctvFrames/test-silinskiy/Browse1frame0028.jpg | [2.8642088700417144E-37, 1.0] | 1.0 | 1.0 |
| dbfs:/mnt/tardis6/videos/cctvFrames/test-silinskiy/Fight_OneManDownframe0024.jpg | [4.0414734478555E-9, 0.9999999959585266] | 1.0 | 1.0 |
| dbfs:/mnt/tardis6/videos/cctvFrames/test-silinskiy/Browse1frame0035.jpg | [4.138311390962073E-6, 0.999995861688609] | 0.9999959 | 1.0 |
| dbfs:/mnt/tardis6/videos/cctvFrames/test-silinskiy/Fight_OneManDownframe0019.jpg | [2.6386266324274145E-5, 0.9999736137336758] | 0.9999736 | 1.0 |
| dbfs:/mnt/tardis6/videos/cctvFrames/test-silinskiy/LeftBoxframe0027.jpg | [2.6863909282152595E-4, 0.9997313609071785] | 0.99973136 | 1.0 |
| dbfs:/mnt/tardis6/videos/cctvFrames/test-silinskiy/Rest_SlumpOnFloorframe0033.jpg | [0.0010216157067265824, 0.9989783842932735] | 0.9989784 | 1.0 |
| dbfs:/mnt/tardis6/videos/cctvFrames/test-silinskiy/Rest_SlumpOnFloorframe0035.jpg | [0.001361066830799293, 0.9986389331692008] | 0.9986389 | 1.0 |
| dbfs:/mnt/tardis6/videos/cctvFrames/test-silinskiy/Rest_SlumpOnFloorframe0027.jpg | [0.0017702536912118758, 0.9982297463087881] | 0.99822974 | 1.0 |
| dbfs:/mnt/tardis6/videos/cctvFrames/test-silinskiy/Rest_SlumpOnFloorframe0025.jpg | [0.00208178739733657, 0.9979182126026634] | 0.9979182 | 1.0 |
| dbfs:/mnt/tardis6/videos/cctvFrames/test-silinskiy/LeftBoxframe0033.jpg | [0.003795366577071483, 0.9962046334229285] | 0.9962046 | 1.0 |
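The query keeps frames with `prediction = 1` and sorts them by `prob2`. To answer the question at the video level, you could also count flagged frames per source video. The sketch below does both on the first five rows of the output above, assuming frame files are named `<video>frame<NNNN>.jpg` as they appear here. `split_frame_name`, `top_k`, and `flagged_frames_per_video` are hypothetical helpers; in the notebook itself this aggregation would more naturally be a Spark SQL `GROUP BY`.

```python
import re
from collections import Counter

# (origin, prob2) for the first five rows of the query output above
TOP_ROWS = [
    ("dbfs:/mnt/tardis6/videos/cctvFrames/test-silinskiy/Browse1frame0028.jpg", 1.0),
    ("dbfs:/mnt/tardis6/videos/cctvFrames/test-silinskiy/Fight_OneManDownframe0024.jpg", 1.0),
    ("dbfs:/mnt/tardis6/videos/cctvFrames/test-silinskiy/Browse1frame0035.jpg", 0.9999959),
    ("dbfs:/mnt/tardis6/videos/cctvFrames/test-silinskiy/Fight_OneManDownframe0019.jpg", 0.9999736),
    ("dbfs:/mnt/tardis6/videos/cctvFrames/test-silinskiy/LeftBoxframe0027.jpg", 0.99973136),
]

# Assumed naming convention: <video>frame<NNNN>.jpg
FRAME_RE = re.compile(r"^(?P<video>.+)frame(?P<frame>\d+)\.jpg$")


def split_frame_name(origin):
    """'.../Fight_OneManDownframe0024.jpg' -> ('Fight_OneManDown', 24)."""
    m = FRAME_RE.match(origin.rsplit("/", 1)[-1])
    if m is None:
        raise ValueError("Unexpected frame file name: %s" % origin)
    return m.group("video"), int(m.group("frame"))


def top_k(rows, k):
    """Highest prob2 first; ties keep their original order (sorted() is stable)."""
    return sorted(rows, key=lambda r: r[1], reverse=True)[:k]


def flagged_frames_per_video(rows):
    """Count flagged frames for each source video."""
    return Counter(split_frame_name(origin)[0] for origin, _ in rows)


print([split_frame_name(o) for o, _ in top_k(TOP_ROWS, 3)])
print(flagged_frames_per_video(TOP_ROWS))
```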


View sample frames from the `Fight_OneManDown` video; frame 0024 is among the highest-scoring images by `prob2`.

In [17]:
displayImg(f"/mnt/tardis6/videos/cctvFrames/test-{MY_NAME}/Fight_OneManDownframe0024.jpg")

In [18]:
displayImg(f"/mnt/tardis6/videos/cctvFrames/test-{MY_NAME}/Fight_OneManDownframe0014.jpg")

In [19]:
displayImg(f"/mnt/tardis6/videos/cctvFrames/test-{MY_NAME}/Fight_OneManDownframe0017.jpg")

## View the Source Video
View the source video of the suspicious frames.

![](https://s3.us-east-2.amazonaws.com/databricks-dennylee/media/Fight_OneManDown.gif)