In [7]:
#import relevant modules
import datetime
from collections import namedtuple
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

#Set up Spark Configuration ==> To be executed only once though, hence its in a commented block
'''conf = SparkConf().setAppName("EnglandAccidents")
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)'''

#set up the function to clean up some dirty data

def cleanFields(fields):
    accFields = fields
    
    counter = 0
    for index, field in enumerate(accFields):  #Use enumerate function to create a key value pair for index and field 
		#Clean up occurences of 1st and Second
		if "1st" in field:					   
			oldstr = field
			newstr = oldstr.replace(oldstr[:3], "first")  #Replace '1st' with "First" within the substring
			accFields[index] = newstr  					  #Set the new values
		if "2nd" in field:
			oldstr = field
			newstr = oldstr.replace(oldstr[:3], "Second")
			accFields[index] = newstr
		#Clean up occurences of parenthesis
		if "(" in field:							#If there is an open bracket, then there must be a closing one
			#print (field)
			for indx, i in enumerate(field):		#for each letter in the selected field (field Or column, dont be confused, both are used interchangeably
				if i == "(":
					bracketIndexOpen = indx         #Get the index of the Open parenthesis
			oldstr = field
			#print(oldstr)
			newstrOpenBrac = oldstr.replace(oldstr[bracketIndexOpen], "")  #the replaced version of open parenthesis
			#print (newstrOpenBrac)	
			for idx , i in enumerate(newstrOpenBrac):						#Loop through the replaced version so that we can get the updated index for close parenthesis
				latestCloseindex = idx	
				newstrCloseBrac = newstrOpenBrac.replace(newstrOpenBrac[latestCloseindex], "")  #Now remove close parenthesis 
			#print(newstrCloseBrac)
			accFields[index] = newstrCloseBrac
			#Clean out occurences of "-". Same Logic above applies. I know that there is only 1 occurence per field, if there were more, the logic would be different
		if "-" in field:							
			#print (field)
			for indx, i in enumerate(field):		
				if i == "-":
					hyphenIndex = indx 
			oldstr = field
			newstr = oldstr.replace(oldstr[hyphenIndex], "_")
			accFields[index] = newstr

#Columns extracted from our source CSV data 
accFields = ['Accident_Index', 'Location_Easting_OSGR', 'Location_Northing_OSGR', 'Longitude', 'Latitude', 'Police_Force', 'Accident_Severity', 'Number_of_Vehicles',\
'Number_of_Casualties', 'Date', 'Day_of_Week', 'Time', 'Local_Authority_(District)', 'Local_Authority_(Highway)', '1st_Road_Class', '1st_Road_Number', 'Road_Type', \
'Speed_limit', 'Junction_Detail', 'Junction_Control', '2nd_Road_Class', '2nd_Road_Number', 'Pedestrian_Crossing-Human_Control', 'Pedestrian_Crossing-Physical_Facilities',\
'Light_Conditions', 'Weather_Conditions', 'Road_Surface_Conditions', 'Special_Conditions_at_Site', 'Carriageway_Hazards', 'Urban_or_Rural_Area', \
'Did_Police_Officer_Attend_Scene_of_Accident', 'LSOA_of_Accident_Location']


#Clean up time
cleanFields(accFields)

In [8]:
#Load the raw data into an RDD, and separate each field based on ","
acc= sc.textFile('/Users/dare.olufunmilayo/Home_Dare\'sProjects/EnglandRoadAccidents/data/Accidents_2014.csv')
acc= acc.map(lambda x: x.split(','))

In [9]:
#Get a sense of what the columns look like. You can see we have some not so nice columns there. 
#For example... '2nd_Road_Class' should'nt start with a number
acc.take(1)

[[u'Accident_Index',
  u'Location_Easting_OSGR',
  u'Location_Northing_OSGR',
  u'Longitude',
  u'Latitude',
  u'Police_Force',
  u'Accident_Severity',
  u'Number_of_Vehicles',
  u'Number_of_Casualties',
  u'Date',
  u'Day_of_Week',
  u'Time',
  u'Local_Authority_(District)',
  u'Local_Authority_(Highway)',
  u'1st_Road_Class',
  u'1st_Road_Number',
  u'Road_Type',
  u'Speed_limit',
  u'Junction_Detail',
  u'Junction_Control',
  u'2nd_Road_Class',
  u'2nd_Road_Number',
  u'Pedestrian_Crossing-Human_Control',
  u'Pedestrian_Crossing-Physical_Facilities',
  u'Light_Conditions',
  u'Weather_Conditions',
  u'Road_Surface_Conditions',
  u'Special_Conditions_at_Site',
  u'Carriageway_Hazards',
  u'Urban_or_Rural_Area',
  u'Did_Police_Officer_Attend_Scene_of_Accident',
  u'LSOA_of_Accident_Location']]

In [10]:
#Map the fields with a Class on the fly using namedtuple
accNamedTuples = namedtuple('Accidents', accFields)

#DATE_FMT = "%d/%m/%Y"  
#TIME_FMT = "%H%M"

#Define a function that will help us to map each field to a column within the RDD (I still need to work out how to convert to time based types)
def parse(row):
	#row[9] = datetime.datetime.strptime(row[9], DATE_FMT).date()   #Date
	#row[11] = datetime.datetime.strptime(row[11], TIME_FMT).time() #Time
	return accNamedTuples(*row[:32])

#Map data with columns and filter out the header
parsedAccData= acc.map(parse)

#Optionally you may want to filter out the header
accData = parsedAccData.filter(lambda x: 'Date' not in x)

In [11]:
#Take a look at our new column structure
accData.take(1)

[Accidents(Accident_Index=u'201401BS70001', Location_Easting_OSGR=u'524600', Location_Northing_OSGR=u'179020', Longitude=u'-0.206443', Latitude=u'51.496345', Police_Force=u'1', Accident_Severity=u'3', Number_of_Vehicles=u'2', Number_of_Casualties=u'1', Date=u'09/01/2014', Day_of_Week=u'5', Time=u'13:21', Local_Authority_District=u'12', Local_Authority_Highway=u'E09000020', first_Road_Class=u'3', first_Road_Number=u'315', Road_Type=u'6', Speed_limit=u'30', Junction_Detail=u'0', Junction_Control=u'-1', Second_Road_Class=u'-1', Second_Road_Number=u'0', Pedestrian_Crossing_Human_Control=u'0', Pedestrian_Crossing_Physical_Facilities=u'0', Light_Conditions=u'1', Weather_Conditions=u'2', Road_Surface_Conditions=u'2', Special_Conditions_at_Site=u'0', Carriageway_Hazards=u'0', Urban_or_Rural_Area=u'1', Did_Police_Officer_Attend_Scene_of_Accident=u'2', LSOA_of_Accident_Location=u'E01002814')]

In [312]:
#YOu can pick out columns that you want from the data
accDow = parsedAccData.map(lambda x: (x.Date, x.Day_of_Week))
#Print out the new columns
accDow.take(5)

[(u'Date', u'Day_of_Week'),
 (u'09/01/2014', u'5'),
 (u'20/01/2014', u'2'),
 (u'21/01/2014', u'3'),
 (u'15/01/2014', u'4')]

In [None]:
# Going on from here are areas to explore  

#The day of the week with the highest rate of accidents

# Relationship between accidents and road conditions
# Show the trend for and


In [306]:
#We need another RDD to be able to lookup what the numbers representing DOW mean
ISO_dow =sc.textFile('/Users/dare.olufunmilayo/Home_Dare\'sProjects/EnglandRoadAccidents/data/ISO_dow.csv')
dow =sc.textFile('/Users/dare.olufunmilayo/Home_Dare\'sProjects/EnglandRoadAccidents/data/dow.csv')
lookupDow = dow.map(lambda x: x.split(',')).collectAsMap()
lookupDow['6']


u' Sunday'

In [310]:
#This Data is using the ISO Format
lookupISO_dow = ISO_dow.map(lambda x : x.split(',')).collectAsMap()
lookupISO_dow['7']


u' Sunday'

In [313]:
#Lets not forget what our Pair RDD look like
accDow.take(5)

[(u'Date', u'Day_of_Week'),
 (u'09/01/2014', u'5'),
 (u'20/01/2014', u'2'),
 (u'21/01/2014', u'3'),
 (u'15/01/2014', u'4')]

In [314]:
#We will now use our lookup RDD to get the days and assign number 1 to each
accDow = accDow.filter(lambda x : 'Date' not in x).map(lambda x: (lookupISO_dow[x[1]], 1 ))
accDow.take(4)


[(u' Friday', 1), (u' Tuesday', 1), (u' Wednesday', 1), (u' Thursday', 1)]

In [337]:
import seaborn as sns
import pandas as pd
import matplotlib
%matplotlib inline
#Lets sum up all the values, and sort in descending order to get the highest accident rate by Day of the Week
aggregate_data = accDow.reduceByKey(lambda x, y: x + y).sortBy(lambda x: -x[1]).persist()
aggregate_data.collect()

[(u' Saturday', 23960),
 (u' Wednesday', 22318),
 (u' Thursday', 22210),
 (u' Friday', 21780),
 (u' Tuesday', 21093),
 (u' Sunday', 19021),
 (u' Monday', 15940)]

In [121]:
sns.plt?
#dir(sns)

fig, axes = sns.plt.subplots(3,3)

axes[0, 0].set_title('Days_Of_Week_Accidents')

fig

#Taken from http://stackoverflow.com/questions/26171230/matplotlib-seaborn-barplot-strings-in-x-axis   
#axes.factorplot("age", "jobs", col="industry", row="city", data=df_city,
               #margin_titles=True, size=3, aspect=.8, palette=["darkred"])
    
axes[0,0].plot([1,4,5,4,5,6,4,6,], label = 'DOW Accidents')

#.axes[0,1].clear()
fig

In [None]:
#sns.factorplot("DOW", "# of Accidents", col="DOW", row="Accidents", data=aggAcc,
 #              margin_titles=True, size=3, aspect=.8, palette=["darkred"])
    

In [281]:
#Now that we have this information, we should automate the task to take input for different years and load the data into HIVE


In [395]:
#Next is to analyze the average accidents by speed limit
accSpeedLimit = parsedAccData.map(lambda x: (x.Speed_limit)).filter(lambda x: 'Speed_limit' not in x)

In [396]:
accSpeedLimit.take(5)

[u'30', u'30', u'30', u'30', u'30']

In [397]:
#The number of accidents in the year
accSpeedLimit.count()

146322

In [439]:
#Road accidents with the highest number based on Speed Limit
#Next is to analyze the average accidents by speed limit. 
#sumCountPerLimit = accSpeedLimit.aggregate
accSpeedLimit_SumCount = accSpeedLimit.map(lambda x: (x, 1)).combineByKey(lambda x:(x, 1),                 #Combiner function  
                                          (lambda acc, x: (acc[0] + x, acc[1] + 1)),                          #Merge Function 
                                          (lambda acc1, acc2 :(acc1[0] + acc2[0], acc1[1] + acc2[1])))           #MergeCombiners Function

In [469]:
#accSpeedLimit_SumCount = accSpeedLimit_SumCount.collect()
#accSpeedLimit_SumCount


accCountPerSpeedLimit = accSpeedLimit.map(lambda x: (x , 1)).reduceByKey(lambda x , y: x + y)

accCountPerSpeedLimit = accCountPerSpeedLimit.collect()
accCount = accSpeedLimit.count()



In [504]:
#Need some tidying up here, but the output gives us the average accidents happening per speed limit.
avgAccPerSpeedLimit = []
accCount = float(accCount)
for i in accCountPerSpeedLimit:
    avg = (i[1]/accCount) *100
    avgAccPerSpeedLimit.append(avg)
    #out = 'Average Accidents is { :.2f}'.format(avgAccPerSpeedLimit)
print(avgAccPerSpeedLimit)


[13.67941936277525, 2.3359440138871803, 8.212708956957943, 64.92325145911073, 3.9618102540971285, 6.886865953171773]


In [442]:
#Will use this Soon
accSpeedLimit_SumCount = accSpeedLimit.combineByKey(lambda x:(x, 1),                 #Combiner function  
                                          (lambda acc, x: (acc[0] + x, acc[1] + 1)),                          #Merge Function 
                                          (lambda acc1, acc2 :(acc1[0] + acc2[0], acc1[1] + acc2[1]))) 