In [1]:
import os
import atexit
import sys

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
import findspark
from sparkhpc import sparkjob

#Exit handler to clean up the Spark cluster if the script exits or crashes
def exitHandler(sj,sc):
    try:
        print('Trapped Exit cleaning up Spark Context')
        sc.stop()
    except:
        pass
    try:
        print('Trapped Exit cleaning up Spark Job')
        sj.stop()
    except:
        pass

findspark.init()

#Parameters for the Spark cluster
nodes=2
tasks_per_node=3 
memory_per_task=1024 #1 gig per process, adjust accordingly
# Please estimate walltime carefully to keep unused Spark clusters from sitting 
# idle so that others may use the resources.
walltime="1:00" #1 hour
os.environ['SBATCH_PARTITION']='single' #Set the appropriate ARC partition

sj = sparkjob.sparkjob(
     ncores=nodes*tasks_per_node,
     cores_per_executor=tasks_per_node,
     memory_per_core=memory_per_task,
     walltime=walltime
    )

sj.wait_to_start()
sc = sj.start_spark()

#Register the exit handler                                                                                                     
atexit.register(exitHandler,sj,sc)

#You need this line if you want to use SparkSQL
sqlCtx=SQLContext(sc)

INFO:sparkhpc.sparkjob:Submitted batch job 674643

INFO:sparkhpc.sparkjob:Submitted cluster 0


# Question 1

In [2]:
texts = sc.textFile('Dataset/romeo-and-juliet.txt')

In [3]:
import string
def stripPunct(s):
    """Remove punctuation from a given string."""
    return s.translate(str.maketrans('','', string.punctuation))    

In [4]:
def stringToBigrams(s):
    """Convert strings to list of words."""
    words = s.lower().split(' ')
    words = [word for word in words if len(word)>0]
    return (zip(words, words[1:]))

In [5]:
bigrams = texts.map(lambda s: stripPunct(s))\
        .flatMap(lambda s: stringToBigrams(s))

In [6]:
bigrams.take(5)

[('the', 'project'),
 ('project', 'gutenberg'),
 ('gutenberg', 'ebook'),
 ('ebook', 'of'),
 ('of', 'romeo')]

In [7]:
# map out the bigrams, reduce and add up count for each bigram
counts = bigrams.map(lambda b: (b,1)).reduceByKey(lambda i,j: i+j)

In [8]:
# result
counts.collect()

[(('nurse', 'theres'), 1),
 (('theres', 'no'), 2),
 (('no', 'faith'), 1),
 (('faith', 'no'), 1),
 (('no', 'honesty'), 1),
 (('honesty', 'in'), 1),
 (('men', 'all'), 1),
 (('all', 'naught'), 1),
 (('naught', 'all'), 1),
 (('all', 'dissemblers'), 1),
 (('ah', 'wheres'), 1),
 (('wheres', 'my'), 2),
 (('my', 'man'), 4),
 (('me', 'some'), 3),
 (('aqua', 'vitae'), 1),
 (('me', 'old'), 1),
 (('shame', 'come'), 1),
 (('come', 'to'), 11),
 (('to', 'romeo'), 9),
 (('blisterd', 'be'), 1),
 (('for', 'such'), 4),
 (('such', 'a'), 17),
 (('a', 'wish'), 1),
 (('he', 'was'), 6),
 (('not', 'born'), 1),
 (('born', 'to'), 2),
 (('to', 'shame'), 1),
 (('upon', 'his'), 2),
 (('his', 'brow'), 1),
 (('is', 'ashamd'), 1),
 (('to', 'sit'), 1),
 (('throne', 'where'), 1),
 (('where', 'honour'), 1),
 (('honour', 'may'), 1),
 (('be', 'crownd'), 1),
 (('sole', 'monarch'), 1),
 (('monarch', 'of'), 1),
 (('universal', 'earth'), 1),
 (('o', 'what'), 3),
 (('what', 'a'), 8),
 (('a', 'beast'), 2),
 (('was', 'i'), 2),
 (

# Question 2

In [2]:
# for using RDD transformations
data = sc.textFile('Dataset/NCDB_1999_to_2014.csv')

In [3]:
data.glom().collect()

[['C_YEAR,C_MNTH,C_WDAY,C_HOUR,C_SEV,C_VEHS,C_CONF,C_RCFG,C_WTHR,C_RSUR,C_RALN,C_TRAF,V_ID,V_TYPE,V_YEAR,P_ID,P_SEX,P_AGE,P_PSN,P_ISEV,P_SAFE,P_USER',
  '1999,01,1,20,2,02,34,UU,1,5,3,03,01,06,1990,01,M,41,11,1,UU,1',
  '1999,01,1,20,2,02,34,UU,1,5,3,03,02,01,1987,01,M,19,11,1,UU,1',
  '1999,01,1,20,2,02,34,UU,1,5,3,03,02,01,1987,02,F,20,13,2,02,2',
  '1999,01,1,08,2,01,01,UU,5,3,6,18,01,01,1986,01,M,46,11,1,UU,1',
  '1999,01,1,08,2,01,01,UU,5,3,6,18,99,NN,NNNN,01,M,05,99,2,UU,3',
  '1999,01,1,17,2,03,QQ,QQ,1,2,1,01,01,01,1984,01,M,28,11,1,UU,1',
  '1999,01,1,17,2,03,QQ,QQ,1,2,1,01,02,01,1991,01,M,21,11,1,UU,1',
  '1999,01,1,17,2,03,QQ,QQ,1,2,1,01,02,01,1991,02,F,UU,13,2,UU,2',
  '1999,01,1,17,2,03,QQ,QQ,1,2,1,01,03,01,1992,01,M,UU,11,2,UU,1',
  '1999,01,1,15,2,01,04,UU,1,5,U,UU,01,01,1997,01,M,61,11,1,UU,1',
  '1999,01,1,15,2,01,04,UU,1,5,U,UU,01,01,1997,02,F,56,13,2,02,2',
  '1999,01,1,14,2,02,31,UU,3,4,2,UU,01,01,1993,01,F,34,11,1,UU,1',
  '1999,01,1,14,2,02,31,UU,3,4,2,UU,02,01,199

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('DFaccidents').getOrCreate()

In [5]:
# Read from csv, create table
df = spark.read.csv('Dataset/NCDB_1999_to_2014.csv', inferSchema=True, header=True)
df.createOrReplaceTempView('accidents')

In [6]:
df.printSchema()

root
 |-- C_YEAR: integer (nullable = true)
 |-- C_MNTH: string (nullable = true)
 |-- C_WDAY: string (nullable = true)
 |-- C_HOUR: string (nullable = true)
 |-- C_SEV: integer (nullable = true)
 |-- C_VEHS: string (nullable = true)
 |-- C_CONF: string (nullable = true)
 |-- C_RCFG: string (nullable = true)
 |-- C_WTHR: string (nullable = true)
 |-- C_RSUR: string (nullable = true)
 |-- C_RALN: string (nullable = true)
 |-- C_TRAF: string (nullable = true)
 |-- V_ID: string (nullable = true)
 |-- V_TYPE: string (nullable = true)
 |-- V_YEAR: string (nullable = true)
 |-- P_ID: string (nullable = true)
 |-- P_SEX: string (nullable = true)
 |-- P_AGE: string (nullable = true)
 |-- P_PSN: string (nullable = true)
 |-- P_ISEV: string (nullable = true)
 |-- P_SAFE: string (nullable = true)
 |-- P_USER: string (nullable = true)



#### the year with highest number of entries

Using RDD Transformation

In [4]:
yearlyAccidents = data.map(lambda x: x.split(","))\
                        .map(lambda x: (x[0], 1))\
                        .reduceByKey(lambda x,y: x+y)\
                        .filter(lambda x: x[0] !='C_YEAR')\
                        .sortByKey()

In [5]:
yearlyAccidents.collect()

[('1999', 413509),
 ('2000', 422075),
 ('2001', 409389),
 ('2002', 420008),
 ('2003', 407036),
 ('2004', 389050),
 ('2005', 386470),
 ('2006', 378523),
 ('2007', 368507),
 ('2008', 338268),
 ('2009', 330771),
 ('2010', 334555),
 ('2011', 325153),
 ('2012', 322421),
 ('2013', 317058),
 ('2014', 297612)]

In [6]:
yearlyAccidents.max(lambda x: x[1])

('2000', 422075)

Using DataFrame API

In [5]:
df.groupBy('C_YEAR').agg({"C_YEAR":"count"})\
.withColumnRenamed("count(C_YEAR)","Count")\
.orderBy("Count", ascending=False).take(1)

[Row(C_YEAR=2000, Count=422075)]

Using SQL

In [6]:
sqlCtx.sql("select C_YEAR, count(*) from accidents group by C_YEAR \
            order by count(*) desc limit 1").show()

+------+--------+
|C_YEAR|count(1)|
+------+--------+
|  2000|  422075|
+------+--------+



#### the year with lowest number of fatalities

Using RDD Transformation

In [26]:
yearlyAccidents.reduce(lambda x,y: (x[0],x[1]) if (x[1]<y[1]) else (y[0], y[1])) 

('2014', 297612)

Using DataFrame API

In [7]:
df.where(df.C_SEV==1).groupBy('C_YEAR').agg({"C_SEV":"count"})\
.orderBy('count(C_SEV)').take(1)

[Row(C_YEAR=2014, count(C_SEV)=4502)]

Using SQL

In [6]:
sqlCtx.sql("select C_YEAR, count(*) from accidents \
            where C_SEV=1 group by C_YEAR \
            order by count(*) limit 1").show()

+------+--------+
|C_YEAR|count(1)|
+------+--------+
|  2014|    4502|
+------+--------+



#### Part of the day with highest number of fatal accidents (dawn, morning, afternoon, evening, or night)
- a. 4am<=Dawn<6am
- b. 6am<=Morning<12pm
- c. 12 pm <= Afternoon < 5 pm
- d. 5pm<=Evening<10pm
- e. 10pm<=Night<4am.

Using RDD Transformation

In [19]:
# split by comma, filter out 'UU' and 'C_HOUR' from C_HOUR, filter for 1 in C_SEV (fatality), map by C_HOUR with value 1, reduce
hourlyAccidents = data.map(lambda x: x.split(","))\
                        .filter(lambda x: (x[3]!='UU' and x[3]!='C_HOUR'))\
                        .filter(lambda x: x[4]=='1')\
                        .map(lambda x: (x[3], 1))\
                        .reduceByKey(lambda x,y: x+y)\
                        .sortByKey()

In [20]:
def hourToPart(hour):
    """Converts a given hour to part of day"""
    if int(hour) >=0 and int(hour) <4:
        return 'Night'
    elif int(hour) >=4 and int(hour) <6:
        return 'Dawn'
    elif int(hour) >=6 and int(hour) <12:
        return 'Morning'
    elif int(hour) >=12 and int(hour) <17:
        return 'Afternoon'
    elif int(hour) >=17 and int(hour)<22:
        return 'Evening'
    elif int(hour) >=22 and int(hour)<=23:
        return 'Night'
    else:
        return 'Unknown'

In [21]:
# Calls hourToPart() to convert hour to part of day, and then reduce by key and adds up accidents
dayPartAccidents = hourlyAccidents.map(lambda x: (hourToPart(x[0]),x[1])).reduceByKey(lambda x,y: x+y)

In [23]:
dayPartAccidents.collect()

[('Evening', 25644),
 ('Afternoon', 27728),
 ('Morning', 21251),
 ('Night', 19311),
 ('Dawn', 3620)]

In [11]:
# Part of the day with highest fatalities
dayPartAccidents.max(lambda x: x[1])

('Afternoon', 27728)

Using DataFrame API

In [25]:
max(df.where((df.C_HOUR>=4)&(df.C_HOUR<6)&(df.C_SEV==1)).agg({"C_HOUR":"count"}).withColumnRenamed('count(C_HOUR)','Dawn').take(1),
df.where((df.C_HOUR>=6)&(df.C_HOUR<12)&(df.C_SEV==1)).agg({"C_HOUR":"count"}).withColumnRenamed('count(C_HOUR)','Morning').take(1),
df.where((df.C_HOUR>=12)&(df.C_HOUR<17)&(df.C_SEV==1)).agg({"C_HOUR":"count"}).withColumnRenamed('count(C_HOUR)','Afternoon').take(1),
df.where((df.C_HOUR>=17)&(df.C_HOUR<22)&(df.C_SEV==1)).agg({"C_HOUR":"count"}).withColumnRenamed('count(C_HOUR)','Evening').take(1),
df.where((((df.C_HOUR>=22)&(df.C_HOUR<=23))|((df.C_HOUR>=0)&(df.C_HOUR<4)))&(df.C_SEV==1)).agg({"C_HOUR":"count"}).withColumnRenamed('count(C_HOUR)','Night').take(1))

[Row(Afternoon=27728)]

Using SQL

In [79]:
sqlCtx.sql("select Part, sum(numAcc) from\
               (select C_HOUR, count(*) as numAcc, \
                  case when C_HOUR>=4 and C_HOUR<6 then 'Dawn' \
                      when C_HOUR>=6 and C_HOUR<12 then 'Morning' \
                      when C_HOUR>=12 and C_HOUR<17 then 'Afternoon' \
                      when C_HOUR>=17 and C_HOUR<22 then 'Evening' \
                      when (C_HOUR>=22 and C_HOUR<=23) or (C_HOUR<4 and C_HOUR>=0) then 'Night' \
                      else 'NA' end as Part \
                      from accidents where C_SEV=1 group by C_HOUR)\
            where Part <> 'NA' group by Part \
            order by sum(numAcc) desc limit 1").show()

+---------+-----------+
|     Part|sum(numAcc)|
+---------+-----------+
|Afternoon|      27728|
+---------+-----------+



#### Part of the day with lowest number of fatal accidents

Using RDD Transformation

In [12]:
dayPartAccidents.min(lambda x: x[1])

('Dawn', 3620)

Using DataFrame API

In [42]:
min(df.where((df.C_HOUR>=4)&(df.C_HOUR<6)&(df.C_SEV==1)).agg({"C_HOUR":"count"}).withColumnRenamed('count(C_HOUR)','Dawn').take(1),
df.where((df.C_HOUR>=6)&(df.C_HOUR<12)&(df.C_SEV==1)).agg({"C_HOUR":"count"}).withColumnRenamed('count(C_HOUR)','Morning').take(1),
df.where((df.C_HOUR>=12)&(df.C_HOUR<17)&(df.C_SEV==1)).agg({"C_HOUR":"count"}).withColumnRenamed('count(C_HOUR)','Afternoon').take(1),
df.where((df.C_HOUR>=17)&(df.C_HOUR<22)&(df.C_SEV==1)).agg({"C_HOUR":"count"}).withColumnRenamed('count(C_HOUR)','Evening').take(1),
df.where((((df.C_HOUR>=22)&(df.C_HOUR<=23))|((df.C_HOUR>=0)&(df.C_HOUR<4)))&(df.C_SEV==1)).agg({"C_HOUR":"count"}).withColumnRenamed('count(C_HOUR)','Night').take(1))

[Row(Dawn=3620)]

Using SQL

In [5]:
sqlCtx.sql("select Part, sum(numAcc) from\
               (select C_HOUR, count(*) as numAcc, \
                  case when C_HOUR>=4 and C_HOUR<6 then 'Dawn' \
                      when C_HOUR>=6 and C_HOUR<12 then 'Morning' \
                      when C_HOUR>=12 and C_HOUR<17 then 'Afternoon' \
                      when C_HOUR>=17 and C_HOUR<22 then 'Evening' \
                      when (C_HOUR>=22 and C_HOUR<=23) or (C_HOUR<4 and C_HOUR>=0) then 'Night' \
                      else 'NA' end as Part \
                      from accidents where C_SEV=1 group by C_HOUR)\
            where Part <> 'NA' group by Part \
            order by sum(numAcc) asc limit 1").show()

+----+-----------+
|Part|sum(numAcc)|
+----+-----------+
|Dawn|       3620|
+----+-----------+



#### Accidents between years 2007 and 2012 (inclusive) grouped by gender and age class.
- a. Baby Boomers >50
- b. <=35 Generation X <=50
- c. 18 <= Millennials <= 34
- d. Teens< 18

Using RDD Transformation

In [7]:
def filterForYear(year,minYear,maxYear):
    try:
        if int(year) >= int(minYear) and int(year) <= int(maxYear):
            return True
        else:
            return False
    except:
        return False

In [8]:
def ageToGen(age):
    try:    
        if int(age)>50:
            return 'BabyBoomer'
        elif int(age)>=35 and int(age)<=50:
            return 'GenerationX'
        elif int(age)>=18 and int(age)<=34:
            return 'Millennial'
        elif int(age)<18:
            return 'Teen'
    except:
        return 'Unknown'

In [24]:
data.map(lambda x: x.split(","))\
    .filter(lambda x: filterForYear(x[0], 2007, 2012))\
    .map(lambda x: (ageToGen(x[17]) + '-' + x[16], 1))\
    .reduceByKey(lambda x,y: x+y)\
    .sortByKey().collect()

[('BabyBoomer-F', 200359),
 ('BabyBoomer-M', 256375),
 ('BabyBoomer-N', 2),
 ('BabyBoomer-U', 3764),
 ('GenerationX-F', 213836),
 ('GenerationX-M', 272265),
 ('GenerationX-N', 1),
 ('GenerationX-U', 3746),
 ('Millennial-F', 297505),
 ('Millennial-M', 382034),
 ('Millennial-N', 7),
 ('Millennial-U', 4453),
 ('Teen-F', 112182),
 ('Teen-M', 118515),
 ('Teen-N', 12),
 ('Teen-U', 1025),
 ('Unknown-F', 23448),
 ('Unknown-M', 42658),
 ('Unknown-N', 5495),
 ('Unknown-U', 81993)]

Using DataFrame API

In [28]:
df.where((df.P_AGE>50)&(df.C_YEAR>=2007)&(df.C_YEAR<=2012)).groupBy('P_SEX').agg({'C_YEAR':'count'}).withColumnRenamed('count(C_YEAR)','BabyBoomers').show()
df.where((df.P_AGE<=50)&(df.P_AGE>=35)&(df.C_YEAR>=2007)&(df.C_YEAR<=2012)).groupBy('P_SEX').agg({'C_YEAR':'count'}).withColumnRenamed('count(C_YEAR)','GenerationX').show()
df.where((df.P_AGE<=34)&(df.P_AGE>=18)&(df.C_YEAR>=2007)&(df.C_YEAR<=2012)).groupBy('P_SEX').agg({'C_YEAR':'count'}).withColumnRenamed('count(C_YEAR)','Millennials').show()
df.where((df.P_AGE<=18)&(df.C_YEAR>=2007)&(df.C_YEAR<=2012)).groupBy('P_SEX').agg({'C_YEAR':'count'}).withColumnRenamed('count(C_YEAR)','Millennials').show()

+-----+-----------+
|P_SEX|BabyBoomers|
+-----+-----------+
|    F|     200359|
|    M|     256375|
|    U|       3764|
|    N|          2|
+-----+-----------+

+-----+-----------+
|P_SEX|GenerationX|
+-----+-----------+
|    F|     213836|
|    M|     272265|
|    U|       3746|
|    N|          1|
+-----+-----------+

+-----+-----------+
|P_SEX|Millennials|
+-----+-----------+
|    F|     297505|
|    M|     382034|
|    U|       4453|
|    N|          7|
+-----+-----------+

+-----+-----------+
|P_SEX|Millennials|
+-----+-----------+
|    F|     137268|
|    M|     149735|
|    U|       1399|
|    N|         17|
+-----+-----------+



Using SQL

In [27]:
sqlCtx.sql("select Generation, P_SEX, count(*) as number from \
                (select P_SEX, P_AGE, \
                    case when P_AGE>50 then 'BabyBoomers' \
                        when P_AGE<=50 and P_AGE>=35 then 'GenerationX' \
                        when P_AGE<=34 and P_AGE>=18 then 'Millennials' \
                        when P_AGE<=18 then 'Teens' \
                        else 'Unknown' end as Generation \
                from accidents where C_YEAR>=2007 and C_YEAR<=2012) \
           group by Generation, P_SEX order by Generation, P_SEX").show()

+-----------+-----+------+
| Generation|P_SEX|number|
+-----------+-----+------+
|BabyBoomers|    F|200359|
|BabyBoomers|    M|256375|
|BabyBoomers|    N|     2|
|BabyBoomers|    U|  3764|
|GenerationX|    F|213836|
|GenerationX|    M|272265|
|GenerationX|    N|     1|
|GenerationX|    U|  3746|
|Millennials|    F|297505|
|Millennials|    M|382034|
|Millennials|    N|     7|
|Millennials|    U|  4453|
|      Teens|    F|112182|
|      Teens|    M|118515|
|      Teens|    N|    12|
|      Teens|    U|  1025|
|    Unknown|    F| 23448|
|    Unknown|    M| 42658|
|    Unknown|    N|  5495|
|    Unknown|    U| 81993|
+-----------+-----+------+



# Question 3

Remove all non-numerical columns

In [36]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('DFaccidents').getOrCreate()
# Read from csv, create table
df = spark.read.csv('Dataset/NCDB_1999_to_2014.csv', inferSchema=True, header=True)
df.createOrReplaceTempView('accidents')

In [37]:
df.printSchema()

root
 |-- C_YEAR: integer (nullable = true)
 |-- C_MNTH: string (nullable = true)
 |-- C_WDAY: string (nullable = true)
 |-- C_HOUR: string (nullable = true)
 |-- C_SEV: integer (nullable = true)
 |-- C_VEHS: string (nullable = true)
 |-- C_CONF: string (nullable = true)
 |-- C_RCFG: string (nullable = true)
 |-- C_WTHR: string (nullable = true)
 |-- C_RSUR: string (nullable = true)
 |-- C_RALN: string (nullable = true)
 |-- C_TRAF: string (nullable = true)
 |-- V_ID: string (nullable = true)
 |-- V_TYPE: string (nullable = true)
 |-- V_YEAR: string (nullable = true)
 |-- P_ID: string (nullable = true)
 |-- P_SEX: string (nullable = true)
 |-- P_AGE: string (nullable = true)
 |-- P_PSN: string (nullable = true)
 |-- P_ISEV: string (nullable = true)
 |-- P_SAFE: string (nullable = true)
 |-- P_USER: string (nullable = true)



Remove non-numerical columns

In [38]:
df = df.drop('P_SEX').drop('P_ID')

Filter for C_SEV column

In [39]:
df = df.where((df.C_SEV==2)|(df.C_SEV==1))

Filter for P_ISEV column

In [40]:
df = df.where((df.P_ISEV==1)|(df.P_ISEV==2)|(df.P_ISEV==3))

Filter out rows contains X, N, Q, U in any of the columns

In [41]:
for column in df.columns:
    df = df.where(~df[column].contains('X')).where(~df[column].contains('N'))\
            .where(~df[column].contains('Q')).where(~df[column].contains('U'))

In [42]:
df.printSchema()

root
 |-- C_YEAR: integer (nullable = true)
 |-- C_MNTH: string (nullable = true)
 |-- C_WDAY: string (nullable = true)
 |-- C_HOUR: string (nullable = true)
 |-- C_SEV: integer (nullable = true)
 |-- C_VEHS: string (nullable = true)
 |-- C_CONF: string (nullable = true)
 |-- C_RCFG: string (nullable = true)
 |-- C_WTHR: string (nullable = true)
 |-- C_RSUR: string (nullable = true)
 |-- C_RALN: string (nullable = true)
 |-- C_TRAF: string (nullable = true)
 |-- V_ID: string (nullable = true)
 |-- V_TYPE: string (nullable = true)
 |-- V_YEAR: string (nullable = true)
 |-- P_AGE: string (nullable = true)
 |-- P_PSN: string (nullable = true)
 |-- P_ISEV: string (nullable = true)
 |-- P_SAFE: string (nullable = true)
 |-- P_USER: string (nullable = true)



In [43]:
df = df.select("C_SEV","P_AGE","P_USER","V_TYPE","V_YEAR","C_MNTH","C_WDAY","C_HOUR","C_VEHS","C_WTHR")

Normalize some columns:

In [44]:
# Import `DenseVector`
from pyspark.ml.linalg import DenseVector

# Define the `input_data` 
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Replace `df` with the new DataFrame
df = spark.createDataFrame(input_data, ["label", "features"])


# from pyspark.sql.types import FloatType
# from pyspark.sql.functions import udf

# for column in ['C_YEAR', 'C_MNTH', 'C_WDAY' , 'C_HOUR', 'C_VEHS', 'P_AGE', 'V_YEAR']:
#     maxVal = df.agg({column: "max"}).collect()[0][0]
#     minVal = df.agg({column: "min"}).collect()[0][0]
#     normalize = udf(lambda val: ((val - minVal)/(maxVal - minVal)), FloatType())
#     df = df.withColumn("norm_"+column, normalize(column))   # Normalize a column
#     df = df.drop(column)   # remove the original un-normalized column

In [45]:
# Import `StandardScaler` 
from pyspark.ml.feature import StandardScaler

# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the DataFrame to the scaler
scaler = standardScaler.fit(df)

# Transform the data in `df` with the scaler
scaled_df = scaler.transform(df)

# Inspect the result
scaled_df.take(2)

[Row(label=2, features=DenseVector([33.0, 1.0, 1.0, 1992.0, 1.0, 1.0, 9.0, 2.0, 1.0]), features_scaled=DenseVector([1.7787, 1.5222, 0.4135, 285.1251, 0.2924, 0.5159, 1.7521, 1.7851, 0.8919])),
 Row(label=2, features=DenseVector([70.0, 1.0, 1.0, 1992.0, 1.0, 1.0, 9.0, 2.0, 1.0]), features_scaled=DenseVector([3.773, 1.5222, 0.4135, 285.1251, 0.2924, 0.5159, 1.7521, 1.7851, 0.8919]))]

#### Prepare for machine learning

In [46]:
# Split the data into train and test sets
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1)

#### Linear Regression

In [47]:
# Import `LinearRegression`
from pyspark.ml.regression import LinearRegression
# Initialize `lr`
lr = LinearRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Fit the data to the model
linearModel = lr.fit(train_data)

In [48]:
# Generate predictions
predicted = linearModel.transform(test_data)
# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

In [50]:
# Coefficients for the model
linearModel.coefficients

DenseVector([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])

In [51]:
# Intercept for the model
linearModel.intercept

1.9836861629579312

As shown the linear regression model does not work well here:
- Large amount of non-fatality data vs fatality, the sample bias caused the linear model to output non-fatality all the time
- Linear model's lack of ability to fit between 1 - 0, and not ideal for binary classification

#### Logistic Regression

In [53]:
from pyspark.ml.classification import LogisticRegression
logr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10)
lrModel = logr.fit(train_data)

In [79]:
predicted = lrModel.transform(test_data)

In [80]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [81]:
evaluator = BinaryClassificationEvaluator()
print('Test Area Under ROC', evaluator.evaluate(predicted))

Test Area Under ROC 1.0


Logistic regression is better suited for binary regression due to its S-shape curve and ability to fit between two values in close proximity.

#### Decision Tree

In [62]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 3)
dtModel = dt.fit(train_data)

In [64]:
predicted = dtModel.transform(test_data)

In [78]:
evaluator = BinaryClassificationEvaluator()
print('Test Area Under ROC', evaluator.evaluate(predicted))

Test Area Under ROC 1.0


Decision tree is also good for binary regression and can accomodate non-linear decision boundaries.

#### Random Forest

In [82]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
rfModel = rf.fit(train_data)

In [83]:
predicted = rfModel.transform(test_data)

In [84]:
evaluator = BinaryClassificationEvaluator()
print('Test Area Under ROC', evaluator.evaluate(predicted))

Test Area Under ROC 1.0
