In [1]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('1').getOrCreate()

In [2]:
# Importing data.
df = spark.read.csv('dataset/ri_statewide_2019_02_25.csv', header=True, inferSchema=True)

In [3]:
#Intial look at the data
df.show()

+--------------+----------+--------+----+------------+-----------+-------------+---------+-----------+---------------+--------------+--------+----------------+----------------+------------------+------------------+----------------+---------------+----------------+------------+-----------------+--------------------+------------+-------------+
+--------------+----------+--------+----+------------+-----------+-------------+---------+-----------+---------------+--------------+--------+----------------+----------------+------------------+------------------+----------------+---------------+----------------+------------+-----------------+--------------------+------------+-------------+
|             1|2005-11-22|11:15:00|  X3|       white|       male|          200|vehicular|      FALSE|           TRUE|         FALSE|citation|              NA|              NA|                NA|                NA|           false|          FALSE|           FALSE|          NA|               NA|            Speed

In [4]:
#Print the schema
df.printSchema()

root
 |-- raw_row_number: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- zone: string (nullable = true)
 |-- subject_race: string (nullable = true)
 |-- subject_sex: string (nullable = true)
 |-- department_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- arrest_made: string (nullable = true)
 |-- citation_issued: string (nullable = true)
 |-- outcome: string (nullable = true)
 |-- contraband_found: string (nullable = true)
 |-- contraband_drugs: string (nullable = true)
 |-- contraband_weapons: string (nullable = true)
 |-- contraband_alcohol: string (nullable = true)
 |-- contraband_other: boolean (nullable = true)
 |-- frisk_performed: string (nullable = true)
 |-- search_conducted: string (nullable = true)
 |-- search_basis: string (nullable = true)
 |-- reason_for_search: string (nullable = true)
 |-- reason_for_stop: string (nullable = true)
 |-- vehicle_make: string (nullable = true)
 |-- vehicle_model:

In [5]:
#Total number of values
df.count()

509681

In [6]:
df.groupBy("subject_sex").count().show()

+-----------+------+
|subject_sex| count|
+-----------+------+
|         NA| 29097|
|     female|131138|
|       male|349446|
+-----------+------+



In [7]:
df.groupBy(["zone", "subject_sex"]).count().show()

+----+-----------+-----+
|zone|subject_sex|count|
+----+-----------+-----+
|  X3|     female|26653|
|  X3|         NA| 4627|
|  X3|       male|62778|
|  X4|         NA| 9679|
|  X4|       male|94953|
|  K3|     female|29097|
|  X1|         NA| 3491|
|  X1|     female| 2702|
|  K3|       male|79771|
|  X1|       male|10522|
|  K1|         NA| 2252|
|  K2|     female|28114|
|  K1|       male|32255|
|  NA|         NA|   10|
|  K3|         NA| 4916|
|  K1|     female|13855|
|  K2|         NA| 4122|
|  X4|     female|30717|
|  K2|       male|69167|
+----+-----------+-----+



In [8]:
df.groupBy(["subject_sex", "reason_for_stop"]).count().show()

+-----------+--------------------+------+
|subject_sex|     reason_for_stop| count|
+-----------+--------------------+------+
|     female|Other Traffic Vio...| 17911|
|       male|Special Detail/Di...| 12977|
|     female|Equipment/Inspect...| 14039|
|     female|                 APB|   109|
|     female|Violation of City...|   216|
|         NA|    Call for Service|     4|
|         NA|Equipment/Inspect...|     2|
|       male|    Call for Service|  5237|
|       male|Registration Viol...| 14181|
|       male|            Speeding|182538|
|         NA|            Speeding|     8|
|       male|Motorist Assist/C...|   657|
|       male|   Suspicious Person|   268|
|     female|  Seatbelt Violation|  3550|
|     female|Registration Viol...|  5649|
|       male|Other Traffic Vio...| 72317|
|         NA|                  NA| 29073|
|     female|   Suspicious Person|    74|
|         NA|  Seatbelt Violation|     3|
|       male|                 APB|   376|
+-----------+--------------------+

In [9]:
#Drop the rows from which subject_sex is missing
#Regular dropna wasn't working cuz data is a String "NA"
df = df.filter(df.subject_sex.endswith('ale'))

In [10]:
df.groupBy("subject_sex").count().show()

+-----------+------+
|subject_sex| count|
+-----------+------+
|     female|131138|
|       male|349446|
+-----------+------+



In [11]:
df.groupBy("contraband_found").count().show()

+----------------+------+
|contraband_found| count|
+----------------+------+
|           FALSE| 11183|
|              NA|462822|
|            TRUE|  6579|
+----------------+------+



In [12]:
#total number of values in data BEFORE dropping duplicates
df.count()

480584

In [13]:
#dropping duplicates from data
df = df.dropDuplicates()

In [14]:
#total number of values in data AFTER dropping duplicates
df.count()

480584

In [15]:
#For checking what to drop
df.printSchema()

root
 |-- raw_row_number: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- zone: string (nullable = true)
 |-- subject_race: string (nullable = true)
 |-- subject_sex: string (nullable = true)
 |-- department_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- arrest_made: string (nullable = true)
 |-- citation_issued: string (nullable = true)
 |-- outcome: string (nullable = true)
 |-- contraband_found: string (nullable = true)
 |-- contraband_drugs: string (nullable = true)
 |-- contraband_weapons: string (nullable = true)
 |-- contraband_alcohol: string (nullable = true)
 |-- contraband_other: boolean (nullable = true)
 |-- frisk_performed: string (nullable = true)
 |-- search_conducted: string (nullable = true)
 |-- search_basis: string (nullable = true)
 |-- reason_for_search: string (nullable = true)
 |-- reason_for_stop: string (nullable = true)
 |-- vehicle_make: string (nullable = true)
 |-- vehicle_model:

In [16]:
#Drop Prim keys from data
df = df.drop("raw_row_number", "department_id")

In [17]:
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- zone: string (nullable = true)
 |-- subject_race: string (nullable = true)
 |-- subject_sex: string (nullable = true)
 |-- type: string (nullable = true)
 |-- arrest_made: string (nullable = true)
 |-- citation_issued: string (nullable = true)
 |-- outcome: string (nullable = true)
 |-- contraband_found: string (nullable = true)
 |-- contraband_drugs: string (nullable = true)
 |-- contraband_weapons: string (nullable = true)
 |-- contraband_alcohol: string (nullable = true)
 |-- contraband_other: boolean (nullable = true)
 |-- frisk_performed: string (nullable = true)
 |-- search_conducted: string (nullable = true)
 |-- search_basis: string (nullable = true)
 |-- reason_for_search: string (nullable = true)
 |-- reason_for_stop: string (nullable = true)
 |-- vehicle_make: string (nullable = true)
 |-- vehicle_model: string (nullable = true)



In [18]:
#Check feature type
df.groupBy("type").count().show()

+---------+------+
|     type| count|
+---------+------+
|vehicular|480584|
+---------+------+



In [19]:
#Drop type feature from data, since it has only one value
df = df.drop('type')

In [20]:
#Checking if type was dropped
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- zone: string (nullable = true)
 |-- subject_race: string (nullable = true)
 |-- subject_sex: string (nullable = true)
 |-- arrest_made: string (nullable = true)
 |-- citation_issued: string (nullable = true)
 |-- outcome: string (nullable = true)
 |-- contraband_found: string (nullable = true)
 |-- contraband_drugs: string (nullable = true)
 |-- contraband_weapons: string (nullable = true)
 |-- contraband_alcohol: string (nullable = true)
 |-- contraband_other: boolean (nullable = true)
 |-- frisk_performed: string (nullable = true)
 |-- search_conducted: string (nullable = true)
 |-- search_basis: string (nullable = true)
 |-- reason_for_search: string (nullable = true)
 |-- reason_for_stop: string (nullable = true)
 |-- vehicle_make: string (nullable = true)
 |-- vehicle_model: string (nullable = true)



In [21]:
df.groupBy("subject_race").count().show()

+--------------------+------+
|        subject_race| count|
+--------------------+------+
|               white|344716|
|               black| 68577|
|            hispanic| 53123|
|       other/unknown|  1344|
|asian/pacific isl...| 12824|
+--------------------+------+



In [22]:
#NA Values in some fields
df.groupBy("search_conducted").count().show()

+----------------+------+
|search_conducted| count|
+----------------+------+
|           FALSE|462822|
|            TRUE| 17762|
+----------------+------+



In [23]:
#NA Values in some fields
df.groupBy("contraband_found").count().show()

+----------------+------+
|contraband_found| count|
+----------------+------+
|           FALSE| 11183|
|              NA|462822|
|            TRUE|  6579|
+----------------+------+



In [24]:
from pyspark.sql.functions import when
#Create a new column for contraband_found 
df = df.withColumn("contraband_found_resolved", when(df.contraband_found == "NA", 0).otherwise(1))

In [25]:
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- zone: string (nullable = true)
 |-- subject_race: string (nullable = true)
 |-- subject_sex: string (nullable = true)
 |-- arrest_made: string (nullable = true)
 |-- citation_issued: string (nullable = true)
 |-- outcome: string (nullable = true)
 |-- contraband_found: string (nullable = true)
 |-- contraband_drugs: string (nullable = true)
 |-- contraband_weapons: string (nullable = true)
 |-- contraband_alcohol: string (nullable = true)
 |-- contraband_other: boolean (nullable = true)
 |-- frisk_performed: string (nullable = true)
 |-- search_conducted: string (nullable = true)
 |-- search_basis: string (nullable = true)
 |-- reason_for_search: string (nullable = true)
 |-- reason_for_stop: string (nullable = true)
 |-- vehicle_make: string (nullable = true)
 |-- vehicle_model: string (nullable = true)
 |-- contraband_found_resolved: integer (nullable = false)



In [26]:
#Check newly created column
df.groupBy("contraband_found_resolved").count().show()

+-------------------------+------+
|contraband_found_resolved| count|
+-------------------------+------+
|                        1| 17762|
|                        0|462822|
+-------------------------+------+



In [27]:
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- zone: string (nullable = true)
 |-- subject_race: string (nullable = true)
 |-- subject_sex: string (nullable = true)
 |-- arrest_made: string (nullable = true)
 |-- citation_issued: string (nullable = true)
 |-- outcome: string (nullable = true)
 |-- contraband_found: string (nullable = true)
 |-- contraband_drugs: string (nullable = true)
 |-- contraband_weapons: string (nullable = true)
 |-- contraband_alcohol: string (nullable = true)
 |-- contraband_other: boolean (nullable = true)
 |-- frisk_performed: string (nullable = true)
 |-- search_conducted: string (nullable = true)
 |-- search_basis: string (nullable = true)
 |-- reason_for_search: string (nullable = true)
 |-- reason_for_stop: string (nullable = true)
 |-- vehicle_make: string (nullable = true)
 |-- vehicle_model: string (nullable = true)
 |-- contraband_found_resolved: integer (nullable = false)



In [28]:
#import split function
from pyspark.sql.functions import split



In [29]:
#splitting date into different columns for LR
from pyspark.sql.types import IntegerType
split_date = split(df['Date'], '-')     
df = df.withColumn('Year', split_date.getItem(0).cast(IntegerType()))
df= df.withColumn('Month', split_date.getItem(1).cast(IntegerType()))
df= df.withColumn('Day', split_date.getItem(2).cast(IntegerType()))

In [30]:
#splitting time into different columns for modeling

split_time = split(df['time'], '-')     
df = df.withColumn('Hour', split_date.getItem(0).cast(IntegerType()))
df= df.withColumn('Minute', split_date.getItem(1).cast(IntegerType()))
df= df.withColumn('Second', split_date.getItem(2).cast(IntegerType()))

In [31]:
#Check seconds column
df.groupBy("Second").count().show()

+------+-----+
|Second|count|
+------+-----+
|    31| 9308|
|    28|15712|
|    26|15435|
|    27|15113|
|    12|15510|
|    22|15733|
|     1|15291|
|    13|15791|
|    16|15617|
|     6|16072|
|     3|15698|
|    20|15869|
|     5|16197|
|    19|15579|
|    15|15634|
|    17|15228|
|     9|16208|
|     4|16195|
|     8|16034|
|    23|16010|
+------+-----+
only showing top 20 rows



In [32]:
#from pyspark.sql.functions import to_date, to_utc_timestamp


#Creating a date time 
#df = df.select(to_date(df.date).alias('date'), 'zone', 'subject_race', 'subject_sex', 'arrest_made', 'citation_issued'
#              , 'warning_issued', 'outcome', 'contraband_found', 'contraband_drugs', 'contraband_weapons', 'contraband_alcohol'
#               , 'contraband_other', 'frisk_performed', 'search_conducted', 'search_basis', 'reason_for_search'
#               , 'reason_for_stop', 'vehicle_make', 'vehicle_model', 'contraband_found_resolved')



#New Discovery, dates can't be used in modeling in Spark

In [33]:
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- zone: string (nullable = true)
 |-- subject_race: string (nullable = true)
 |-- subject_sex: string (nullable = true)
 |-- arrest_made: string (nullable = true)
 |-- citation_issued: string (nullable = true)
 |-- outcome: string (nullable = true)
 |-- contraband_found: string (nullable = true)
 |-- contraband_drugs: string (nullable = true)
 |-- contraband_weapons: string (nullable = true)
 |-- contraband_alcohol: string (nullable = true)
 |-- contraband_other: boolean (nullable = true)
 |-- frisk_performed: string (nullable = true)
 |-- search_conducted: string (nullable = true)
 |-- search_basis: string (nullable = true)
 |-- reason_for_search: string (nullable = true)
 |-- reason_for_stop: string (nullable = true)
 |-- vehicle_make: string (nullable = true)
 |-- vehicle_model: string (nullable = true)
 |-- contraband_found_resolved: integer (nullable = false)
 |-- Year: integer (nullable = true)
 |-- M

In [34]:
#Check year column
df.groupBy("Year").count().show()

+----+-----+
|Year|count|
+----+-----+
|2007|50139|
|2015|45199|
|2006|55552|
|2013|41922|
|2014|48878|
|2012|57625|
|2009|39820|
|2005|13809|
|2010|39440|
|2011|42630|
|2008|45570|
+----+-----+



In [35]:
#Check month column
df.groupBy("Month").count().show()

+-----+-----+
|Month|count|
+-----+-----+
|   12|37648|
|    1|43810|
|    6|39448|
|    3|41310|
|    5|41375|
|    9|39228|
|    4|39535|
|    8|39654|
|    7|39896|
|   10|40947|
|   11|41588|
|    2|36145|
+-----+-----+



In [36]:
#Check day column
df.groupBy("Day").count().show()

+---+-----+
|Day|count|
+---+-----+
| 31| 9308|
| 28|15712|
| 26|15435|
| 27|15113|
| 12|15510|
| 22|15733|
|  1|15291|
| 13|15791|
| 16|15617|
|  6|16072|
|  3|15698|
| 20|15869|
|  5|16197|
| 19|15579|
| 15|15634|
| 17|15228|
|  9|16208|
|  4|16195|
|  8|16034|
| 23|16010|
+---+-----+
only showing top 20 rows



In [37]:
#Check outcome column
df.groupBy("outcome").count().show()

+--------+------+
| outcome| count|
+--------+------+
|      NA|  6763|
|citation|428378|
|  arrest| 16603|
+--------+------+



In [38]:
df = df.drop('outcome')

In [39]:
#Drop features linearly dependent
df = df.drop('contraband_drugs', 'contraband_weapons', 'contraband_alcohol', 'contraband_other')

In [40]:
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- zone: string (nullable = true)
 |-- subject_race: string (nullable = true)
 |-- subject_sex: string (nullable = true)
 |-- arrest_made: string (nullable = true)
 |-- citation_issued: string (nullable = true)
 |-- contraband_found: string (nullable = true)
 |-- frisk_performed: string (nullable = true)
 |-- search_conducted: string (nullable = true)
 |-- search_basis: string (nullable = true)
 |-- reason_for_search: string (nullable = true)
 |-- reason_for_stop: string (nullable = true)
 |-- vehicle_make: string (nullable = true)
 |-- vehicle_model: string (nullable = true)
 |-- contraband_found_resolved: integer (nullable = false)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- Minute: integer (nullable = true)
 |-- Second: integer (nullable = true)



In [41]:
#Drop features not known prior to making the stop
df = df.drop('arrest_made', 'frisk_performed', 'search_basis', 'reason_for_search', 'reason_for_stop')

In [42]:
#Drop features not known prior to making the stop
df = df.drop('warning_issued', 'citation_issued')

In [43]:
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- zone: string (nullable = true)
 |-- subject_race: string (nullable = true)
 |-- subject_sex: string (nullable = true)
 |-- contraband_found: string (nullable = true)
 |-- search_conducted: string (nullable = true)
 |-- vehicle_make: string (nullable = true)
 |-- vehicle_model: string (nullable = true)
 |-- contraband_found_resolved: integer (nullable = false)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- Minute: integer (nullable = true)
 |-- Second: integer (nullable = true)



In [44]:
#Check vehicle make column
df.groupBy("vehicle_make").count().show()

+------------+------+
|vehicle_make| count|
+------------+------+
|        MERK|     1|
|          PC|     1|
|        DODG| 14170|
|        MASE|    30|
|        PEUG|     1|
|        DATS|     7|
|        EAGL|     1|
|        STLG|    93|
|        FRUE|     9|
|        LINC|  2659|
|        INTE|     1|
|        AMER|    15|
|        MERZ|  3159|
|        TRIM|     1|
|        AMGN|    71|
|        FIAT|    44|
|        ISUZ|     1|
|          NA|162525|
|        CHRY|  7550|
|        TRIU|    18|
+------------+------+
only showing top 20 rows



In [45]:
from pyspark.sql.functions import countDistinct, avg, stddev
df.select(countDistinct("vehicle_make").alias("Distinct vehicle makes")).show()

+----------------------+
|Distinct vehicle makes|
+----------------------+
|                    97|
+----------------------+



In [46]:
#Check vehicle model column
df.groupBy("vehicle_model").count().show()

+-------------+-----+
|vehicle_model|count|
+-------------+-----+
|        ASTRO|   92|
|          MDX|  590|
|       CHR300|  123|
|          BOX|   43|
|           LT|    7|
|         545I|   30|
|      MAZDA3I|    4|
|     6 TOURIN|    2|
|     LE SABLE|    6|
|    GEO PRIZM|    5|
|          E.S|    1|
|    TL S-TYPE|    1|
|          ETK|    1|
|     TEMPO GL|    4|
|         PROS|    5|
|      LLUMINA|    3|
|    ENTOURAGE|   14|
| LEGACY WAGON|    2|
|     6 SERIES|    4|
|          ...|   15|
+-------------+-----+
only showing top 20 rows



In [47]:
from pyspark.sql.functions import countDistinct, avg, stddev
df.select(countDistinct("vehicle_model").alias("Distinct vehicle models")).show()

+-----------------------+
|Distinct vehicle models|
+-----------------------+
|                   9184|
+-----------------------+



In [48]:
#Drop vehicle model
df = df.drop('vehicle_model')

In [49]:
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- zone: string (nullable = true)
 |-- subject_race: string (nullable = true)
 |-- subject_sex: string (nullable = true)
 |-- contraband_found: string (nullable = true)
 |-- search_conducted: string (nullable = true)
 |-- vehicle_make: string (nullable = true)
 |-- contraband_found_resolved: integer (nullable = false)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- Minute: integer (nullable = true)
 |-- Second: integer (nullable = true)



In [50]:
#NA Values in some fields
df.groupBy("search_conducted").count().show()

+----------------+------+
|search_conducted| count|
+----------------+------+
|           FALSE|462822|
|            TRUE| 17762|
+----------------+------+



In [51]:
from pyspark.sql.functions import when
#Create a new column for contraband_found 
df = df.withColumn("search_conducted", when(df.search_conducted == "FALSE", 0).otherwise(1))

In [52]:
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- zone: string (nullable = true)
 |-- subject_race: string (nullable = true)
 |-- subject_sex: string (nullable = true)
 |-- contraband_found: string (nullable = true)
 |-- search_conducted: integer (nullable = false)
 |-- vehicle_make: string (nullable = true)
 |-- contraband_found_resolved: integer (nullable = false)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- Minute: integer (nullable = true)
 |-- Second: integer (nullable = true)



In [53]:
from pyspark.sql.functions import when
#Create a new column for contraband_found 
df = df.withColumn("contraband_found", when(df.contraband_found == "FALSE", 0).otherwise(1))

In [54]:
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- zone: string (nullable = true)
 |-- subject_race: string (nullable = true)
 |-- subject_sex: string (nullable = true)
 |-- contraband_found: integer (nullable = false)
 |-- search_conducted: integer (nullable = false)
 |-- vehicle_make: string (nullable = true)
 |-- contraband_found_resolved: integer (nullable = false)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- Minute: integer (nullable = true)
 |-- Second: integer (nullable = true)



In [55]:
#Imports for assembler, encoder, indexer
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,
                                OneHotEncoder,StringIndexer)

In [56]:
#Working with categorical data

# Create a string indexer (convert every string into a number, such as male = 0 and female = 1).
# A number will be assigned to every category in the column.
gender_indexer = StringIndexer(inputCol='subject_sex',outputCol='SexIndex')

# Now we can one hot encode these numbers. This converts the various outputs into a single vector.
gender_encoder = OneHotEncoder(inputCol='SexIndex',outputCol='SexVec')

#Similar to the above.
race_indexer = StringIndexer(inputCol='subject_race',outputCol='raceIndex')
race_encoder = OneHotEncoder(inputCol='raceIndex',outputCol='raceVec')

#Similar to the above.
zone_indexer = StringIndexer(inputCol='zone',outputCol='zoneIndex')
zone_encoder = OneHotEncoder(inputCol='zoneIndex',outputCol='zoneVec')

#Similar to the above.
vehicle_indexer = StringIndexer(inputCol='vehicle_make',outputCol='vehicleIndex')
vehicle_encoder = OneHotEncoder(inputCol='vehicleIndex',outputCol='vehicleVec')

In [57]:
# Now we can assemble all of this as one vector in the features column. 
assembler = VectorAssembler(inputCols=['Year', 'Month', 'Day', 'Hour', 'Minute', 'Second', 
 'SexVec',
 'zoneVec',
 'search_conducted',
 'vehicleVec',                                      
 'raceVec'],outputCol='features')

In [58]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

In [59]:
log_reg = LogisticRegression(featuresCol='features',labelCol='contraband_found')

In [60]:
# Pipeline for complex workflow
pipeline = Pipeline(stages=[gender_indexer,race_indexer, zone_indexer, vehicle_indexer, 
                           gender_encoder,race_encoder, zone_encoder, vehicle_encoder,
                           assembler,log_reg])

In [61]:
# Train/test split. 
train_data, test_data = df.randomSplit([0.7,.3])

In [62]:
#modeling
fit_model = pipeline.fit(train_data)

In [63]:
# Transform test data. 
results = fit_model.transform(test_data)

In [64]:
# Evaluate the model using the binary classifer.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                       labelCol='contraband_found')

# If we select the actual and predicted results, we can see that some predictions were correct while others were wrong.
results.select('contraband_found','prediction').show()



+----------------+----------+
|contraband_found|prediction|
+----------------+----------+
|               1|       1.0|
|               1|       1.0|
|               1|       1.0|
|               1|       1.0|
|               1|       1.0|
|               1|       1.0|
|               1|       1.0|
|               1|       1.0|
|               1|       1.0|
|               1|       1.0|
|               1|       1.0|
|               1|       1.0|
|               1|       1.0|
|               1|       1.0|
|               1|       1.0|
|               0|       1.0|
|               1|       1.0|
|               1|       1.0|
|               1|       1.0|
|               1|       1.0|
+----------------+----------+
only showing top 20 rows



In [65]:
# We can then evaluate using AUC (area under the curve). AUC is linked to ROC.
AUC = my_eval.evaluate(results)

AUC

Py4JJavaError: An error occurred while calling o438.evaluate.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4884.0 failed 1 times, most recent failure: Lost task 4.0 in stage 4884.0 (TID 336829, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$4: (string) => double)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Unseen label: TOYO.
	at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:170)
	at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:166)
	... 15 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
	at org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:266)
	at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:128)
	at org.apache.spark.rdd.OrderedRDDFunctions$$anonfun$sortByKey$1.apply(OrderedRDDFunctions.scala:62)
	at org.apache.spark.rdd.OrderedRDDFunctions$$anonfun$sortByKey$1.apply(OrderedRDDFunctions.scala:61)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.OrderedRDDFunctions.sortByKey(OrderedRDDFunctions.scala:61)
	at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.x$4$lzycompute(BinaryClassificationMetrics.scala:155)
	at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.x$4(BinaryClassificationMetrics.scala:146)
	at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.confusions$lzycompute(BinaryClassificationMetrics.scala:148)
	at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.confusions(BinaryClassificationMetrics.scala:148)
	at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.createCurve(BinaryClassificationMetrics.scala:223)
	at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.roc(BinaryClassificationMetrics.scala:86)
	at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.areaUnderROC(BinaryClassificationMetrics.scala:97)
	at org.apache.spark.ml.evaluation.BinaryClassificationEvaluator.evaluate(BinaryClassificationEvaluator.scala:87)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$4: (string) => double)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.spark.SparkException: Unseen label: TOYO.
	at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:170)
	at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:166)
	... 15 more
