In [None]:
# Installing Java and spark 
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install pyspark==2.4.7

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"


0% [Working]            Hit:1 http://security.ubuntu.com/ubuntu bionic-security InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.152)] [Waiting for headers] [Co                                                                               Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.152)] [Connecting to ppa.launch0% [1 InRelease gpgv 88.7 kB] [Connecting to archive.ubuntu.com (91.189.88.152)                                                                               Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 88.7 kB] [Connecting to archive.ubuntu.com (91.189.88.152)                                                                               Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 88.7 kB] [Waiting for headers] [Connecting to ppa.

In [None]:
# Attach Google Drive to the runtime so the dataset stored there can be read.
from google.colab import drive

drive_mount_point = '/content/gdrive'
drive.mount(drive_mount_point)



Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [None]:
# Location of the shootings dataset inside the mounted Google Drive folder.
csvFile_path = ("/content/gdrive/My Drive"
                "/Colab Notebooks/shootings.csv")

In [None]:
from pyspark import SparkConf, SparkContext

# Single-threaded local Spark context named "First App".
# getOrCreate reuses a context that is already running, so re-running this cell
# no longer fails with "Cannot run multiple SparkContexts at once".
spark_conf = SparkConf().setMaster("local").setAppName("First App")
sc = SparkContext.getOrCreate(conf=spark_conf)

In [None]:
from pyspark.sql import SQLContext

# SQL entry point bound to the SparkContext created above; later cells read the CSV through it.
sqlContext = SQLContext(sparkContext=sc)

In [None]:
# Libraries
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, MinMaxScaler, VectorAssembler

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [None]:
# Load the dataset using the first row as column names (schema inference is not enabled).
df = sqlContext.read.option("header", True).csv(csvFile_path)

In [None]:
# Inspect the column types, then preview the first five rows.
df.printSchema()
df.show(n=5)

In [None]:
# Subset of columns used by the recommender and the classifier below.
recsys_columns = [
    'manner_of_death', 'armed', 'age', 'gender', 'race', 'state',
    'signs_of_mental_illness', 'threat_level', 'flee', 'body_camera', 'arms_category',
]
recsys_df = df.select(recsys_columns)

In [None]:
# Preview the modelling subset.
recsys_df.show(n=5)

+----------------+----------+----+------+--------+-----+-----------------------+------------+-----------+-----------+--------------------+
| manner_of_death|     armed| age|gender|    race|state|signs_of_mental_illness|threat_level|       flee|body_camera|       arms_category|
+----------------+----------+----+------+--------+-----+-----------------------+------------+-----------+-----------+--------------------+
|            shot|       gun|53.0|     M|   Asian|   WA|                   True|      attack|Not fleeing|      False|                Guns|
|            shot|       gun|47.0|     M|   White|   OR|                  False|      attack|Not fleeing|      False|                Guns|
|shot and Tasered|   unarmed|23.0|     M|Hispanic|   KS|                  False|       other|Not fleeing|      False|             Unarmed|
|            shot|toy weapon|32.0|     M|   White|   CA|                   True|      attack|Not fleeing|      False|Other unusual obj...|
|            shot|  nail gu

# Task I.1: Exploratory data analysis

•	Report the number of rows and columns.

•	Clean the data (handle missing values and duplicate records) if necessary.


In [None]:
# Convert categorical columns to numeric indices and assemble a feature vector.
# Every selected column except 'age' gets a StringIndexer ("<col>_index").
# Iterating over recsys_df.columns (instead of a set difference) keeps the stage
# order, and hence the output column order, deterministic from run to run.
indexed_columns = [column for column in recsys_df.columns if column != 'age']
indexers = [StringIndexer(inputCol=column, outputCol=column + "_index") for column in indexed_columns]

# race_index is deliberately left out of the features: it is the label of the
# Logistic Regression section further down.
feature_columns = ['manner_of_death_index', 'armed_index', 'gender_index', 'state_index',
                   'signs_of_mental_illness_index', 'threat_level_index', 'flee_index',
                   'body_camera_index', 'arms_category_index']
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
# NOTE(review): the models in this notebook consume 'features', not 'scaled'.
scaler = MinMaxScaler(inputCol='features', outputCol='scaled')
data_pipeline = Pipeline().setStages(indexers + [assembler, scaler])
# NOTE(review): the pipeline is fit on all rows before the train/test split, so the
# indexers and scaler also see test data; consider fitting on the training split only.
data = data_pipeline.fit(recsys_df).transform(recsys_df)

data.show(5)

+----------------+----------+----+------+--------+-----+-----------------------+------------+-----------+-----------+--------------------+-----------------+---------------------+-----------------------------+-------------------+----------+----------+------------+-----------+------------------+-----------+--------------------+--------------------+
| manner_of_death|     armed| age|gender|    race|state|signs_of_mental_illness|threat_level|       flee|body_camera|       arms_category|body_camera_index|manner_of_death_index|signs_of_mental_illness_index|arms_category_index|race_index|flee_index|gender_index|armed_index|threat_level_index|state_index|            features|              scaled|
+----------------+----------+----+------+--------+-----+-----------------------+------------+-----------+-----------+--------------------+-----------------+---------------------+-----------------------------+-------------------+----------+----------+------------+-----------+------------------+--------

## Group B
### Recommendation engine
•	Model training and predictions

•	Model evaluation using mean squared error (MSE)

In [None]:
# Train/test split: 70% of rows for fitting, 30% held out for evaluation.
# A fixed seed makes the split, and every metric computed on it, reproducible.
train, test = data.randomSplit([0.7, 0.3], seed=42)

In [None]:
# Recommender System
# ALS factorizes a race (user) x state (item) matrix whose "ratings" are the
# indexed manner_of_death values.
# coldStartStrategy='drop' removes test rows whose race or state never occurred in
# the training split. Under the default ('nan') those rows get NaN predictions,
# and the MSE computed from them is NaN.
# The fixed seed makes the random factor initialization reproducible.
als_rec = ALS(userCol='race_index', itemCol='state_index', ratingCol='manner_of_death_index',
              coldStartStrategy='drop', seed=42)
rec_model = als_rec.fit(train)

In [None]:
# Score the held-out rows; the predicted rating lands in the 'prediction' column.
rec_predict = rec_model.transform(dataset=test)

In [None]:
# Recommender System Evaluation
# ALS predicts its ratingCol (manner_of_death_index), so the MSE must compare
# 'prediction' against that column. race_index is the ALS user id, not a target.
# NaN predictions (race/state unseen during training) are dropped first, so a
# single cold-start row cannot turn the metric into NaN.
evaluator = RegressionEvaluator(metricName='mse', labelCol='manner_of_death_index', predictionCol='prediction')
evaluator.evaluate(rec_predict.na.drop(subset=['prediction']))

1.650200266889782

# Section C
### Classification

•	Logistic Regression model training

•	Model evaluation (F1 score)

In [None]:
# Logistic Regression: predict the indexed race from the assembled feature vector.
logreg_clf = LogisticRegression(labelCol='race_index', featuresCol='features')
model = logreg_clf.fit(dataset=train)

In [None]:
# Apply the fitted classifier to the held-out rows.
logreg_predict = model.transform(dataset=test)

In [None]:
# Logistic Regression Evaluation
# "f1" is this evaluator's default metric; it is spelled out so the reported
# number is self-describing.
evaluator = MulticlassClassificationEvaluator(
    labelCol='race_index',
    predictionCol="prediction",
    metricName="f1",
)
evaluator.evaluate(logreg_predict)

0.40792297694056273