# Spark Hands On Lab - Python

#### Let's first download the data that we're going to analyze from GitHub. For the purposes of this lab, you'll just land the data on temporary storage. In practice, the data would reside on object storage or in a database.

#### The file contains vehicle accident data and has a size of 6.5 MB.

In [1]:
!wget "https://raw.githubusercontent.com/IBMDataScience/WOW2016/master/data/ACCIDENT2007-FullDataSet.csv"

--2016-11-22 14:19:37--  https://raw.githubusercontent.com/IBMDataScience/WOW2016/master/data/ACCIDENT2007-FullDataSet.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.48.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.48.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 6501490 (6.2M) [text/plain]
Saving to: 'ACCIDENT2007-FullDataSet.csv.1'


2016-11-22 14:19:38 (43.4 MB/s) - 'ACCIDENT2007-FullDataSet.csv.1' saved [6501490/6501490]
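#### The wget log above saved the file as 'ACCIDENT2007-FullDataSet.csv.1', which wget does when a file with the original name already exists, so the cells below read the earlier copy. If you would rather overwrite the file on every run, a minimal sketch using only the Python standard library could look like the following. The `download` helper is a hypothetical name, and a local file:// URL stands in for the GitHub URL so the sketch runs offline.

```python
import shutil
import tempfile
from pathlib import Path
from urllib.request import urlopen


def download(url, dest):
    """Stream url to dest, overwriting any existing file, and return the byte count."""
    dest = Path(dest)
    with urlopen(url) as response, dest.open("wb") as out:
        shutil.copyfileobj(response, out)
    return dest.stat().st_size


# Offline demonstration: a small local file stands in for the real data set.
workdir = Path(tempfile.mkdtemp())
source = workdir / "source.csv"
source.write_text("STATE,COUNTY\n1,73\n6,85\n")
target = workdir / "ACCIDENT2007-FullDataSet.csv"

first_size = download(source.as_uri(), target)
second_size = download(source.as_uri(), target)  # overwrites, no ".1" copy
files_written = sorted(p.name for p in workdir.iterdir())
print(files_written)
```

In the lab itself you would pass the raw.githubusercontent.com URL from the wget cell as `url`.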



#### Next we'll read the file into a Spark DataFrame. We specify that the file is in CSV format and that the first row of the file contains column names. We also ask Spark to infer the schema and assign data types. We'll then print out the first two rows.

#### Here's how you do it.

    df_data_1 = spark.read.format('csv').options(header='true', inferschema='true').load("ACCIDENT2007-FullDataSet.csv")
    df_data_1.take(2)    


In [8]:
df_data_1 = spark.read.format('csv').options(header='true', inferschema='true').load("ACCIDENT2007-FullDataSet.csv")
df_data_1.take(2)

[Row(STATE=1, COUNTY=73, MONTH=1, DAY=2, HOUR=23, MINUTE=15, VE_TOTAL=1, PERSONS=1, PEDS=0, NHS=0, ROAD_FNC=14, ROUTE=4, SP_JUR=0, HARM_EV=42, MAN_COLL=0, REL_JUNC=1, REL_ROAD=4, TRAF_FLO=1, NO_LANES=2, SP_LIMIT=40, ALIGNMNT=2, PROFILE=1, PAVE_TYP=2, SUR_COND=1, TRA_CONT=0, T_CONT_F=0, HIT_RUN=0, LGT_COND=2, WEATHER1=1, WEATHER2=0, C_M_ZONE=0, NOT_HOUR=23, NOT_MIN=16, ARR_HOUR=23, ARR_MIN=20, HOSP_HR=0, HOSP_MN=0, SCH_BUS=0, CF1=0, CF2=0, CF3=0, FATALS=1, DAY_WEEK=3, DRUNK_DR=0, ST_CASE=10001, CITY=0, MILEPT=0.0, YEAR=2007, TWAY_ID=u'1493', TWAY_ID2=u'00000000', RAIL=u'0000000', LATITUDE=33272102, LONGITUD=87010454, VE_FORMS=1, WEATHER=1),
 Row(STATE=1, COUNTY=19, MONTH=1, DAY=30, HOUR=13, MINUTE=5, VE_TOTAL=3, PERSONS=4, PEDS=0, NHS=0, ROAD_FNC=3, ROUTE=3, SP_JUR=0, HARM_EV=12, MAN_COLL=5, REL_JUNC=2, REL_ROAD=1, TRAF_FLO=1, NO_LANES=2, SP_LIMIT=40, ALIGNMNT=1, PROFILE=1, PAVE_TYP=2, SUR_COND=1, TRA_CONT=20, T_CONT_F=3, HIT_RUN=0, LGT_COND=1, WEATHER1=1, WEATHER2=0, C_M_ZONE=0, NOT_HO
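#### To get a feel for what "infer the schema" means, here is a rough sketch in plain Python. It is not Spark's actual algorithm. It simply tries each column as an integer, then as a floating-point number, and falls back to string. The `infer_type` and `infer_schema` helpers are hypothetical names, and the sample holds a few values taken from the outputs in this notebook.

```python
import csv
import io


def infer_type(values):
    """Return the narrowest of 'integer', 'double', 'string' that fits every value."""
    for name, cast in (("integer", int), ("double", float)):
        try:
            for value in values:
                cast(value)
            return name
        except ValueError:
            continue
    return "string"


def infer_schema(csv_text):
    """Read CSV text with a header row and infer one type per column."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    return {col: infer_type([row[col] for row in rows]) for col in rows[0]}


# Three columns and two rows, using values that appear in this notebook.
sample = "STATE,MILEPT,TWAY_ID\n1,0.0,1493\n1,0.0,SR-68\n"
schema = infer_schema(sample)
print(schema)
```

TWAY_ID ends up as a string because one value ("SR-68") is not numeric, which is consistent with the `u'1493'` string shown in the Row output above.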

#### Look at the inferred schema with

    df_data_1.printSchema()

In [14]:
df_data_1.printSchema()

root
 |-- STATE: integer (nullable = true)
 |-- COUNTY: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- HOUR: integer (nullable = true)
 |-- MINUTE: integer (nullable = true)
 |-- VE_TOTAL: integer (nullable = true)
 |-- PERSONS: integer (nullable = true)
 |-- PEDS: integer (nullable = true)
 |-- NHS: integer (nullable = true)
 |-- ROAD_FNC: integer (nullable = true)
 |-- ROUTE: integer (nullable = true)
 |-- SP_JUR: integer (nullable = true)
 |-- HARM_EV: integer (nullable = true)
 |-- MAN_COLL: integer (nullable = true)
 |-- REL_JUNC: integer (nullable = true)
 |-- REL_ROAD: integer (nullable = true)
 |-- TRAF_FLO: integer (nullable = true)
 |-- NO_LANES: integer (nullable = true)
 |-- SP_LIMIT: integer (nullable = true)
 |-- ALIGNMNT: integer (nullable = true)
 |-- PROFILE: integer (nullable = true)
 |-- PAVE_TYP: integer (nullable = true)
 |-- SUR_COND: integer (nullable = true)
 |-- TRA_CONT: integer (nullable = true)
 |-- T

#### Now that you have a DataFrame object, you can do analysis on it such as calculating the correlation between various variables.

#### You can see the correlation between drunk-driver involvement (DRUNK_DR) and the number of fatalities (FATALS) by performing a simple Pearson correlation on the two variables. The notebook also provides code assistance: press Tab after typing a dot to see the functions available on an object.


#### The code should be

    df_data_1.corr('DRUNK_DR','FATALS')

In [11]:
df_data_1.corr('DRUNK_DR','FATALS')

0.07637497535956148
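#### For reference, the Pearson coefficient is the covariance of the two variables divided by the product of their standard deviations. The sketch below computes it by hand in plain Python so you can see what corr is doing. The `pearson` helper is a hypothetical name and the input values are toy numbers, not rows from the accident file.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length numeric sequences."""
    if len(xs) != len(ys) or len(xs) < 2:
        raise ValueError("need two sequences of the same length (at least 2)")
    mean_x = sum(xs) / len(xs)
    mean_y = sum(ys) / len(ys)
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


# Hypothetical toy values, not taken from the accident file.
drunk_drivers = [0, 1, 0, 2, 1]
fatalities = [1, 1, 1, 2, 1]
r = pearson(drunk_drivers, fatalities)
print(round(r, 4))
```

A value near 0, like the 0.076 reported above, indicates a very weak linear relationship; values near 1 or -1 indicate a strong one.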

#### You can also import your favorite libraries to use within your notebook, for example, the popular pandas Python library.

#### Import the pandas library, use it to convert the Spark DataFrame to a pandas DataFrame, and use the head function to show the first 5 rows.

    import pandas as pd
    pd_fars = df_data_1.toPandas()
    pd_fars.head()

In [12]:
import pandas as pd
pd_fars = df_data_1.toPandas()
pd_fars.head()

Unnamed: 0,STATE,COUNTY,MONTH,DAY,HOUR,MINUTE,VE_TOTAL,PERSONS,PEDS,NHS,...,CITY,MILEPT,YEAR,TWAY_ID,TWAY_ID2,RAIL,LATITUDE,LONGITUD,VE_FORMS,WEATHER
0,1,73,1,2,23,15,1,1,0,0,...,0,0,2007,1493,00000000,0,33272102,87010454,1,1
1,1,19,1,30,13,5,3,4,0,0,...,630,0,2007,SR-68,270,0,34093815,85404107,3,1
2,1,9,1,14,12,13,2,5,0,0,...,2330,0,2007,US-SR53,00000000,0,33583258,86294685,2,1
3,1,15,1,1,15,0,2,2,0,1,...,0,2468,2007,US-SR1,7260,0,33511173,85545624,2,1
4,1,51,1,6,19,35,2,5,0,0,...,2063,0,2007,SR-14,I-65,0,32291980,86243217,2,1


#### You can also get summary statistics using the describe function. This can help you identify missing values and see the distribution of your attributes.

    pd_fars.describe()

In [15]:
pd_fars.describe()

Unnamed: 0,STATE,COUNTY,MONTH,DAY,HOUR,MINUTE,VE_TOTAL,PERSONS,PEDS,NHS,...,DAY_WEEK,DRUNK_DR,ST_CASE,CITY,MILEPT,YEAR,LATITUDE,LONGITUD,VE_FORMS,WEATHER
count,37248.0,37248.0,37248.0,37248.0,37248.0,37248.0,37248.0,37248.0,37248.0,37248.0,...,37248.0,37248.0,37248.0,37248.0,37248.0,37248,37248.0,37248.0,37248.0,37248.0
mean,27.340824,89.824071,6.613483,15.617644,13.255369,29.048432,1.536619,2.516162,0.157673,0.442977,...,4.119496,0.396182,274152.643551,1078.447353,15859.809609,2007,45552885.613322,221311200.0,1.50145,1.24208
std,16.235443,93.412301,3.349695,8.828305,10.705684,18.820477,1.00224,2.146113,0.409271,1.184362,...,2.125074,0.603205,162221.655608,1845.912977,36262.937756,0,22836901.140083,318461900.0,0.967869,0.948115
min,1.0,0.0,1.0,1.0,0.0,0.0,1.0,1.0,0.0,0.0,...,1.0,0.0,10001.0,0.0,0.0,2007,5324764.0,67165910.0,1.0,1.0
25%,12.0,31.0,4.0,8.0,6.0,14.0,1.0,1.0,0.0,0.0,...,2.0,0.0,122279.75,0.0,0.0,2007,33340799.75,82202030.0,1.0,1.0
50%,27.0,71.0,7.0,16.0,14.0,30.0,1.0,2.0,0.0,0.0,...,4.0,0.0,270253.5,0.0,59.0,2007,38120909.5,90105390.0,1.0,1.0
75%,42.0,113.0,9.0,23.0,19.0,45.0,2.0,3.0,0.0,1.0,...,6.0,1.0,420353.25,1690.0,471.0,2007,42264758.25,112052800.0,2.0,1.0
max,56.0,840.0,12.0,31.0,99.0,99.0,92.0,158.0,11.0,9.0,...,7.0,9.0,560137.0,9999.0,99999.0,2007,99999999.0,1000000000.0,92.0,9.0


#### Now we want to look at an individual state's worth of data. The Spark DataFrame object supports a filter operation that lets you filter the data based on a column of interest and a value to match. You can also select a subset of columns (not illustrated in this example).

#### To see a list of the states, the Fatality Analysis Reporting System (FARS) data dictionary provides a reference to the state codes here - https://crashstats.nhtsa.dot.gov/Api/Public/ViewPublication/812092 . Choose your state of interest and filter the data down to that state. Convert the results to a new pandas DataFrame and view the first 5 rows of data.

#### Here's a list of some of the state values you can use.

        12 - Florida      11 - DC          13 - Georgia         48 - Texas    39 - Ohio     36 - New York        34 - New Jersey
        53 - Washington   51 - Virginia    37 - North Carolina  40 - Oklahoma 32 - Nevada   45 - South Carolina  04 - Arizona

#### The code should look something like the following, with the code for your chosen state in place of 6 (California, used in this example).

    df_cal=df_data_1.filter(df_data_1['STATE']==6)
    pd_cal=df_cal.toPandas()
    pd_cal.head(5)


In [43]:
df_cal=df_data_1.filter(df_data_1['STATE']==6)
pd_cal=df_cal.toPandas()
pd_cal.head(5)

Unnamed: 0,STATE,COUNTY,MONTH,DAY,HOUR,MINUTE,VE_TOTAL,PERSONS,PEDS,NHS,...,CITY,MILEPT,YEAR,TWAY_ID,TWAY_ID2,RAIL,LATITUDE,LONGITUD,VE_FORMS,WEATHER
0,6,85,1,1,0,45,2,8,0,0,...,2000,75,2007,SR-17,99999999999999999999,0,37135766,121581847,2,1
1,6,37,1,1,2,8,1,2,1,0,...,1840,0,2007,AVENUE J,99999999999999999999,0,34411949,118220551,1,1
2,6,73,1,1,3,50,1,2,1,1,...,3260,215,2007,I-805,99999999999999999999,0,32472992,117085848,1,1
3,6,85,1,1,4,15,1,1,0,1,...,3340,35,2007,I-605 RAMP,99999999999999999999,0,37225578,121513692,1,1
4,6,65,1,1,8,55,2,4,0,1,...,750,167,2007,SR-86,AIRPORT BLVD,0,33383148,116105227,2,1


#### Spark SQL enables applications to run SQL queries programmatically and return the result as a DataFrame. Let's perform the same query we did above, but this time using SQL.

#### First you need to register the DataFrame as a temporary view

    df_data_1.createOrReplaceTempView("tempTable")

In [44]:
df_data_1.createOrReplaceTempView("tempTable")

#### Now run a similar query as above using SQL, this time selecting only a subset of the columns. Note that the query returns the same rows, but the SQL result contains only the columns that were selected.

In [46]:
sql = "select STATE, COUNTY, MONTH, DAY, HOUR, MINUTE, VE_TOTAL, PERSONS, PEDS, NHS from tempTable where STATE = 6"
df_cal2 = spark.sql(sql)
pd_cal2=df_cal2.toPandas()
pd_cal2.head(5)

Unnamed: 0,STATE,COUNTY,MONTH,DAY,HOUR,MINUTE,VE_TOTAL,PERSONS,PEDS,NHS
0,6,85,1,1,0,45,2,8,0,0
1,6,37,1,1,2,8,1,2,1,0
2,6,73,1,1,3,50,1,2,1,1
3,6,85,1,1,4,15,1,1,0,1
4,6,65,1,1,8,55,2,4,0,1
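#### If you want to convince yourself that the filter and the SQL WHERE clause pick the same rows, the sketch below reproduces the idea without Spark. It uses Python's built-in sqlite3 module as a stand-in for Spark SQL (an assumption made purely so the example runs anywhere) and three rows copied from the outputs above.

```python
import sqlite3

# Rows copied from the outputs above: one row with STATE = 1 and two with STATE = 6.
columns = ("STATE", "COUNTY", "MONTH", "DAY", "HOUR", "MINUTE")
rows = [
    (1, 73, 1, 2, 23, 15),
    (6, 85, 1, 1, 0, 45),
    (6, 37, 1, 1, 2, 8),
]

# Filter in plain Python, keeping only rows whose STATE matches.
state_index = columns.index("STATE")
filtered = [row for row in rows if row[state_index] == 6]

# The same condition as a SQL WHERE clause, with a narrower SELECT list.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tempTable (STATE, COUNTY, MONTH, DAY, HOUR, MINUTE)")
conn.executemany("INSERT INTO tempTable VALUES (?, ?, ?, ?, ?, ?)", rows)
sql_rows = conn.execute(
    "SELECT STATE, COUNTY FROM tempTable WHERE STATE = 6 ORDER BY rowid"
).fetchall()
conn.close()

# The same rows come back; the SQL result just carries fewer columns.
same_rows = [row[:2] for row in filtered] == sql_rows
print(same_rows)
```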


#### Let's now map out the occurrences for the state of interest. Note that the LATITUDE and LONGITUD values were inferred as integers. These need to be converted to floats. We can create new columns holding a modified version of these fields so that we can plot the individual occurrences on a map.

#### Use lambda (anonymous) functions to create lon and lat columns that hold the expected values.

    pd_cal['lat'] = pd_cal['LATITUDE'].map(lambda x: (x * 1.0) / 1000000)
    pd_cal['lon'] = pd_cal['LONGITUD'].map(lambda x: (x * -1.0) / 1000000)
    pd_cal.head(5)

In [48]:
pd_cal['lat'] = pd_cal['LATITUDE'].map(lambda x: (x * 1.0) / 1000000)
pd_cal['lon'] = pd_cal['LONGITUD'].map(lambda x: (x * -1.0) / 1000000)
pd_cal.head(5)

Unnamed: 0,STATE,COUNTY,MONTH,DAY,HOUR,MINUTE,VE_TOTAL,PERSONS,PEDS,NHS,...,YEAR,TWAY_ID,TWAY_ID2,RAIL,LATITUDE,LONGITUD,VE_FORMS,WEATHER,lat,lon
0,6,85,1,1,0,45,2,8,0,0,...,2007,SR-17,99999999999999999999,0,37135766,121581847,2,1,37.135766,-121.581847
1,6,37,1,1,2,8,1,2,1,0,...,2007,AVENUE J,99999999999999999999,0,34411949,118220551,1,1,34.411949,-118.220551
2,6,73,1,1,3,50,1,2,1,1,...,2007,I-805,99999999999999999999,0,32472992,117085848,1,1,32.472992,-117.085848
3,6,85,1,1,4,15,1,1,0,1,...,2007,I-605 RAMP,99999999999999999999,0,37225578,121513692,1,1,37.225578,-121.513692
4,6,65,1,1,8,55,2,4,0,1,...,2007,SR-86,AIRPORT BLVD,0,33383148,116105227,2,1,33.383148,-116.105227
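#### The two lambdas can also be written as one named function, which makes it easy to add a sanity check. The sketch below applies the same scaling by 1,000,000 and returns None when the result is outside the valid range. That range check is an addition made here, prompted by the maximum LATITUDE of 99999999 in the describe output. The `to_degrees` name and its parameters are hypothetical.

```python
def to_degrees(raw, limit, west=False):
    """Divide a raw integer coordinate by 1,000,000; negate it if west is True.

    Returns None when the scaled value exceeds limit (90 for latitude,
    180 for longitude), which is how placeholder codes show up.
    """
    value = raw / 1000000.0
    if value > limit:
        return None
    return -value if west else value


# Raw values from the first California row shown above.
lat = to_degrees(37135766, limit=90)
lon = to_degrees(121581847, limit=180, west=True)

# The maximum LATITUDE reported by describe() is not a valid latitude.
bad_lat = to_degrees(99999999, limit=90)
print(lat, lon, bad_lat)
```

With pandas you could apply it per column, for example `pd_cal['LATITUDE'].map(lambda x: to_degrees(x, 90))`.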


#### Now that we have the data we need, we can import the Brunel package and use it to show a graphical map of the occurrences. The Brunel Visualization Language is a high-level language developed by IBM and released as open source. Brunel describes visualizations in terms of composable actions, and drives a visualization engine (D3) that performs the actual rendering and interactivity.

#### Use the following code to display the map for your state using the lon and lat values and use PERSONS to display a color scale based on the number of individuals involved in the accident.

    import brunel
    %brunel map ('CA') + data('pd_cal') x(lon) y(lat) color(PERSONS) tooltip(VE_TOTAL)

In [55]:
import brunel
%brunel map ('CA') + data('pd_cal') x(lon) y(lat) color(PERSONS) tooltip(VE_TOTAL)

<IPython.core.display.Javascript object>

#### PixieDust is an open source Python helper library developed by IBM that works as an add-on to Jupyter notebooks to improve the user experience of working with data. PixieDust provides an easy way to visualize the data using various tables, charts, and maps.

To import the PixieDust package, you simply need to use an import statement
    
    from pixiedust.display import *

Once imported, you bring up the interactive display area by using the display function on your dataset. For example

    display(df_data_1)

The initial display is a table view of the dataframe.  

Switch views to the pie chart by selecting the middle charting drop down menu at the top left of the display area. This will display a pie chart of the count of accidents by state along with a percentage.  You can view and modify the options used for the display by selecting the paint brush icon at the bottom left of the display area (note this may be invisible until you hover near the area with your mouse).  If you change the value to city instead of state, you will see a busier graph.

You can switch to a different dataframe at any time by changing the value in the display parameter and rerunning the cell.

Change the display to use the California data: display(df_cal).

Select the histogram chart from the drop down and then modify the values to contain city.

In [51]:
from pixiedust.display import *
display(df_data_1)
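
The pie chart described above boils down to a count of rows per state plus each state's share of the total. The sketch below is not PixieDust code. It is a plain Python illustration of that aggregation, with a hypothetical `share_by_key` helper and made-up state codes.

```python
from collections import Counter


def share_by_key(values):
    """Return (key, count, percent) tuples, largest count first."""
    counts = Counter(values)
    total = sum(counts.values())
    return [(key, n, 100.0 * n / total) for key, n in counts.most_common()]


# Hypothetical STATE codes for eight accidents.
states = [6, 6, 48, 12, 6, 48, 12, 6]
slices = share_by_key(states)
for key, n, percent in slices:
    print(key, n, percent)
```

Grouping by city instead of state produces many more distinct keys, which is why that chart looks busier.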

#### At this point we are ready to start our first pass at building a predictive model. In this example we will use the statsmodels.formula.api package.

#### Use the following code to build a basic linear regression.

    import statsmodels.formula.api as sm

    result = sm.ols(formula="FATALS ~ VE_TOTAL + PERSONS + WEATHER + VE_FORMS", data=pd_cal).fit()
    print(result.params)

In [52]:
import statsmodels.formula.api as sm

result = sm.ols(formula="FATALS ~ VE_TOTAL + PERSONS + WEATHER + VE_FORMS", data=pd_cal).fit()
print(result.params)

Intercept    1.045752
VE_TOTAL     0.007279
PERSONS      0.076950
WEATHER      0.003083
VE_FORMS    -0.105668
dtype: float64
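
To read these coefficients, remember that a linear regression predicts the outcome as the intercept plus each coefficient multiplied by the matching column value. The sketch below works through that arithmetic for the first California row shown earlier (VE_TOTAL=2, PERSONS=8, WEATHER=1, VE_FORMS=2). The `predict_fatals` helper is a hypothetical name, and the coefficients are copied from the output above.

```python
# Coefficients copied from the result.params output above.
params = {
    "Intercept": 1.045752,
    "VE_TOTAL": 0.007279,
    "PERSONS": 0.076950,
    "WEATHER": 0.003083,
    "VE_FORMS": -0.105668,
}


def predict_fatals(row, params):
    """Intercept plus each coefficient times the matching column value."""
    return params["Intercept"] + sum(
        coef * row[name] for name, coef in params.items() if name != "Intercept"
    )


# Predictor values from the first California row shown earlier.
first_row = {"VE_TOTAL": 2, "PERSONS": 8, "WEATHER": 1, "VE_FORMS": 2}
predicted = predict_fatals(first_row, params)
print(round(predicted, 6))
```

Most of the prediction comes from the intercept and the PERSONS term; the other coefficients contribute very little.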


To see a summary of all the results, use

    print(result.summary())

As you can see from the results, this is not a good model: the R-squared of 0.094 means it explains less than 10% of the variance in FATALS. Select a different set of attributes to see if you can improve the R-squared value.
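
R-squared compares the model's squared errors with the squared errors you would get by always predicting the mean: it is 1 minus the ratio of the two. The sketch below computes it by hand in plain Python with a hypothetical `r_squared` helper. All values are toy numbers chosen for illustration, not output from the model above.

```python
def r_squared(actual, predicted):
    """R-squared = 1 - SS_res / SS_tot for paired actual and predicted values."""
    mean_actual = sum(actual) / len(actual)
    ss_res = sum((a - p) ** 2 for a, p in zip(actual, predicted))
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)
    return 1.0 - ss_res / ss_tot


# Hypothetical fatality counts and two sets of hypothetical predictions.
actual = [1, 1, 2, 1, 3]
close_fit = [1.1, 0.9, 1.9, 1.2, 2.9]
mean_only = [1.6] * 5  # always predicting the mean gives an R-squared of 0

good = r_squared(actual, close_fit)
baseline = r_squared(actual, mean_only)
print(round(good, 3), round(baseline, 3))
```

An R-squared of 0.094 therefore sits much closer to the mean-only baseline than to a close fit.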

In [53]:
print(result.summary())

                            OLS Regression Results                            
Dep. Variable:                 FATALS   R-squared:                       0.094
Model:                            OLS   Adj. R-squared:                  0.093
Method:                 Least Squares   F-statistic:                     92.02
Date:                Tue, 22 Nov 2016   Prob (F-statistic):           1.44e-74
Time:                        15:58:51   Log-Likelihood:                -1807.5
No. Observations:                3572   AIC:                             3625.
Df Residuals:                    3567   BIC:                             3656.
Df Model:                           4                                         
Covariance Type:            nonrobust                                         
                 coef    std err          t      P>|t|      [95.0% Conf. Int.]
------------------------------------------------------------------------------
Intercept      1.0458      0.014     75.105      0.0