## AWS S3 setup

In [2]:
# AWS credentials are intentionally blank here; supply them at run time
# rather than committing real keys to the notebook.
ACCESS_KEY = ""
# Slashes in the secret key are URL-encoded so the key can be embedded in the s3a URI.
SECRET_KEY = "".replace("/", "%2F")
AWS_BUCKET_NAME = "nypd-motor-vehicle-collisions"
MOUNT_NAME = "nypd_shankar"

# Mount the bucket only when the mount point is not already present, so the
# cell can be re-run (e.g. Run All on an existing cluster) without failing.
mount_point = "/mnt/%s" % MOUNT_NAME
if not any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY, SECRET_KEY, AWS_BUCKET_NAME), mount_point)

View the contents of the mount point (the programmatic equivalent of the %fs ls command):

In [4]:
# Render the directory listing of the mount point /mnt/<MOUNT_NAME>.
display(dbutils.fs.ls("/mnt/%s" % MOUNT_NAME))

## Exploring the NY Motor Vehicle Collisions public data with Apache Spark

View the data with the %fs ls command

In [7]:
%fs ls /mnt/nypd_shankar/

The entry point into all functionality in Spark is the new SparkSession class:

In [9]:
# Evaluate the SparkSession object so the notebook shows its representation.
spark

Using the SparkSession, create a DataFrame from the CSV file by inferring the schema:

In [11]:
# Load the collisions CSV, treating the first line as a header and letting
# Spark infer the column types from the data.
nyMotorCollisionsDF = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('/mnt/nypd_shankar/NYPD_Motor_Vehicle_Collisions.csv')
)

In [12]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

In [13]:
# Explicit schema for the collisions CSV. Column names contain no space
# characters (underscores are used instead) to prevent errors when the data
# is written to Parquet later. Every column is nullable.
nypdColumns = [
    ('DATE', StringType),
    ('TIME', StringType),
    ('BOROUGH', StringType),
    ('ZIPCODE', IntegerType),
    ('LATITUDE', DoubleType),
    ('LONGITUDE', DoubleType),
    ('LOCATION', StringType),
    ('ON_STREET_NAME', StringType),
    ('CROSS_STREET_NAME', StringType),
    ('OFF_STREET_NAME', StringType),
    ('NUMBER_OF_PERSONS_INJURED', IntegerType),
    ('NUMBER_OF_PERSONS_KILLED', IntegerType),
    ('NUMBER_OF_PEDESTRIANS_INJURED', IntegerType),
    ('NUMBER_OF_PEDESTRIANS_KILLED', IntegerType),
    ('NUMBER_OF_CYCLIST_INJURED', IntegerType),
    ('NUMBER_OF_CYCLIST_KILLED', IntegerType),
    ('NUMBER_OF_MOTORIST_INJURED', IntegerType),
    ('NUMBER_OF_MOTORIST_KILLED', IntegerType),
    ('CONTRIBUTING_FACTOR_VEHICLE_1', StringType),
    ('CONTRIBUTING_FACTOR_VEHICLE_2', StringType),
    ('CONTRIBUTING_FACTOR_VEHICLE_3', StringType),
    ('CONTRIBUTING_FACTOR_VEHICLE_4', StringType),
    ('CONTRIBUTING_FACTOR_VEHICLE_5', StringType),
    ('UNIQUE_KEY', IntegerType),
    ('VEHICLE_TYPE_CODE_1', StringType),
    ('VEHICLE_TYPE_CODE_2', StringType),
    ('VEHICLE_TYPE_CODE_3', StringType),
    ('VEHICLE_TYPE_CODE_4', StringType),
    ('VEHICLE_TYPE_CODE_5', StringType),
]

nypdSchema = StructType(
    [StructField(colName, colType(), True) for colName, colType in nypdColumns]
)

In [14]:
# Re-read the CSV with the explicit schema. Notice that no job is run this
# time, since the column types are supplied instead of inferred.
nyMotorCollisionsDF = (
    spark.read
    .schema(nypdSchema)
    .option('header', True)
    .csv('/mnt/nypd_shankar/NYPD_Motor_Vehicle_Collisions.csv')
)

In [15]:
# Preview at most 10 rows of the collisions DataFrame.
display(nyMotorCollisionsDF.limit(10))

In [16]:
# Show the DataFrame's column names.
nyMotorCollisionsDF.columns

In [17]:
from pyspark.sql.functions import *

Let's use the unix_timestamp() function to convert the string into a timestamp:

In [19]:
# Pattern of the DATE strings in the CSV, and the target pattern
# (to_pattern1 is defined for reference; it is not used in this cell).
from_pattern1 = 'MM/dd/yyyy'
to_pattern1 = 'yyyy-MM-dd'

# Parse DATE into epoch seconds with the source pattern, then cast to a timestamp.
dateTsColumn = unix_timestamp(nyMotorCollisionsDF['DATE'], from_pattern1).cast("timestamp")

# Add the parsed column as DATE_TS and drop the original string column.
nyMotorCollisionsTsDF = nyMotorCollisionsDF.withColumn('DATE_TS', dateTsColumn).drop('DATE')

In [20]:
# Inspect the result: the DATE string column has been replaced by DATE_TS.
display(nyMotorCollisionsTsDF)

Finally, find which distinct years of data are in the CSV file:

In [22]:
# Print the distinct calendar years found in DATE_TS, in ascending order.
(
    nyMotorCollisionsTsDF
    .select(year('DATE_TS'))
    .distinct()
    .orderBy('year(DATE_TS)')
    .show()
)

Steps for storing data frames in cache

In [24]:
# Number of partitions of the DataFrame's underlying RDD.
nyMotorCollisionsTsDF.rdd.getNumPartitions()

In [25]:
# Repartition into 6 partitions and register the result as the temporary view
# "nyMotorCollisionVIEW" (replacing any existing view with that name).
nyMotorCollisionsTsDF.repartition(6).createOrReplaceTempView("nyMotorCollisionVIEW")

In [26]:
# Mark the view for caching; the cache is only materialized by a later action
# (see the .count() call in the next cell).
spark.catalog.cacheTable("nyMotorCollisionVIEW")

In [27]:
# Call .count() to materialize the cache: the action forces a full pass over
# the view so its data is actually loaded.
spark.table("nyMotorCollisionVIEW").count()

In [28]:
# DataFrame handle on the cached view, used by the cells below.
nyMotorCollisionDF = spark.table("nyMotorCollisionVIEW")

In [29]:
# Count the rows again, now served from the cached view.
# Author's observation: the full scan + count in memory takes < 1 second.
nyMotorCollisionDF.count()

In [30]:
# Check whether the view is currently marked as cached.
spark.catalog.isCached("nyMotorCollisionVIEW")

In [31]:
%fs ls /tmp/

In [32]:
# Save in Parquet file format for efficient later reads.
# mode('overwrite') replaces the output of a previous run; with the default
# save mode the write raises when the path already exists, which breaks
# re-running the notebook.
nyMotorCollisionDF.write.format('parquet').mode('overwrite').save('/tmp/nyMotorCollisionParqt/')

In [33]:
# List the Parquet output directory. The author observed 6 gz files here
# (the DataFrame was repartitioned into 6 partitions before being written).
%fs ls /tmp/nyMotorCollisionParqt/

In [34]:
# Load the Parquet copy back into a DataFrame
# (author's observation: the read took about half a second).
tempDF = spark.read.format('parquet').load('/tmp/nyMotorCollisionParqt/')

In [35]:
# Preview at most 2 rows of the DataFrame read back from Parquet.
display(tempDF.limit(2))

In [36]:
# Count the rows of the nyMotorCollisionVIEW view using SQL.
# Note: the %sql magic works only in Databricks notebooks.
%sql SELECT count(*) FROM nyMotorCollisionVIEW;

In [37]:
# Register the DataFrame as the temporary view "tempDF" so it can be queried
# with spark.sql(sqlQuery). createOrReplaceTempView supersedes the deprecated
# registerTempTable and matches how nyMotorCollisionVIEW is registered earlier
# in this notebook.
tempDF.createOrReplaceTempView("tempDF")

Show the number of accidents on each date, busiest dates first:

In [39]:
# Count rows per DATE_TS value, largest counts first, and bring the result
# into pandas for printing.
data_frame = spark.sql("SELECT DATE_TS, COUNT(*) AS COUNT FROM tempDF GROUP BY DATE_TS ORDER BY COUNT DESC")
print_frame = data_frame.toPandas()

# print(...) with a single argument behaves the same on Python 2 and Python 3;
# the bare `print x` statement is a syntax error on Python 3.
print(print_frame)

In [40]:
from collections import defaultdict
import json

# Map each borough to a list of collision counts, one entry appended per hour
# of the day (0-23).
# NOTE: an hour in which a borough has no matching rows appends nothing, so a
# borough's list can be shorter than 24 and is then not aligned to the hour.
dates_dict = defaultdict(list)
for x in range(0, 24):
    # Rows are selected by the hour prefix of TIME, e.g. '7:%' for hour 7.
    data_frame = spark.sql("SELECT BOROUGH,COUNT(*) AS timeCount FROM tempDF WHERE TIME LIKE '" + str(x) + ":%' GROUP BY BOROUGH")
    pd_frame = data_frame.toPandas()
    for index, row in pd_frame.iterrows():
        # int() keeps the value JSON-serializable whatever integer type pandas returns.
        dates_dict[row['BOROUGH']].append(int(row['timeCount']))

# Drop the bucket for rows without a borough. pop() with a default does not
# raise when no such bucket exists, unlike `del dates_dict[None]`.
dates_dict.pop(None, None)
print(json.dumps(dates_dict, indent=4))

In [41]:
# Despite its name, death_Dict is a list: entry i is the number of collisions
# in hour i (0-23) with at least one person killed. The query counts matching
# rows; it does not sum the fatalities.
death_Dict = []
for x in range(0, 24):
    data_frame = spark.sql("SELECT COUNT(*) AS NUMBER_OF_PERSONS_KILLED FROM tempDF WHERE TIME LIKE '" + str(x) + ":%' AND NUMBER_OF_PERSONS_KILLED > 0")
    pd_frame = data_frame.toPandas()
    # The ungrouped COUNT(*) yields a single row; tolist() gives plain Python ints.
    death_Dict.extend(pd_frame['NUMBER_OF_PERSONS_KILLED'].tolist())

# print(...) works on both Python 2 and Python 3.
print(death_Dict)

In [42]:
# Despite its name, person_Inj_Dict is a list: entry i is the number of
# collisions in hour i (0-23) with at least one person injured. The query
# counts matching rows; it does not sum the injuries.
person_Inj_Dict = []
for x in range(0, 24):
    data_frame = spark.sql("SELECT COUNT(*) AS NUMBER_OF_PERSONS_INJURED FROM tempDF WHERE TIME LIKE '" + str(x) + ":%' AND NUMBER_OF_PERSONS_INJURED > 0")
    pd_frame = data_frame.toPandas()
    # The ungrouped COUNT(*) yields a single row; tolist() gives plain Python ints.
    person_Inj_Dict.extend(pd_frame['NUMBER_OF_PERSONS_INJURED'].tolist())

# print(...) works on both Python 2 and Python 3.
print(person_Inj_Dict)

In [43]:
# Despite its name, pedes_Inj_Dict is a list: entry i is the number of
# collisions in hour i (0-23) with at least one pedestrian injured. The query
# counts matching rows; it does not sum the injuries.
pedes_Inj_Dict = []
for x in range(0, 24):
    data_frame = spark.sql("SELECT COUNT(*) AS NUMBER_OF_PEDESTRIANS_INJURED FROM tempDF WHERE TIME LIKE '" + str(x) + ":%' AND NUMBER_OF_PEDESTRIANS_INJURED > 0")
    pd_frame = data_frame.toPandas()
    # The ungrouped COUNT(*) yields a single row; tolist() gives plain Python ints.
    pedes_Inj_Dict.extend(pd_frame['NUMBER_OF_PEDESTRIANS_INJURED'].tolist())

# print(...) works on both Python 2 and Python 3.
print(pedes_Inj_Dict)

In [44]:
# Despite its name, pedes_Killed_Dict is a list: entry i is the number of
# collisions in hour i (0-23) with at least one pedestrian killed. The query
# counts matching rows; it does not sum the fatalities.
pedes_Killed_Dict = []
for x in range(0, 24):
    data_frame = spark.sql("SELECT COUNT(*) AS NUMBER_OF_PEDESTRIANS_KILLED FROM tempDF WHERE TIME LIKE '" + str(x) + ":%' AND NUMBER_OF_PEDESTRIANS_KILLED > 0")
    pd_frame = data_frame.toPandas()
    # The ungrouped COUNT(*) yields a single row; tolist() gives plain Python ints.
    pedes_Killed_Dict.extend(pd_frame['NUMBER_OF_PEDESTRIANS_KILLED'].tolist())

# print(...) works on both Python 2 and Python 3.
print(pedes_Killed_Dict)

In [45]:
# Despite its name, cyclist_killed_Dict is a list: entry i is the number of
# collisions in hour i (0-23) with at least one cyclist killed. The query
# counts matching rows; it does not sum the fatalities.
cyclist_killed_Dict = []
for x in range(0, 24):
    data_frame = spark.sql("SELECT COUNT(*) AS NUMBER_OF_CYCLIST_KILLED FROM tempDF WHERE TIME LIKE '" + str(x) + ":%' AND NUMBER_OF_CYCLIST_KILLED > 0")
    pd_frame = data_frame.toPandas()
    # The ungrouped COUNT(*) yields a single row; tolist() gives plain Python ints.
    cyclist_killed_Dict.extend(pd_frame['NUMBER_OF_CYCLIST_KILLED'].tolist())

# print(...) works on both Python 2 and Python 3.
print(cyclist_killed_Dict)

In [46]:
# Despite its name, cyclist_injured_Dict is a list: entry i is the number of
# collisions in hour i (0-23) with at least one cyclist injured. The query
# counts matching rows; it does not sum the injuries.
cyclist_injured_Dict = []
for x in range(0, 24):
    data_frame = spark.sql("SELECT COUNT(*) AS NUMBER_OF_CYCLIST_INJURED FROM tempDF WHERE TIME LIKE '" + str(x) + ":%' AND NUMBER_OF_CYCLIST_INJURED > 0")
    pd_frame = data_frame.toPandas()
    # The ungrouped COUNT(*) yields a single row; tolist() gives plain Python ints.
    cyclist_injured_Dict.extend(pd_frame['NUMBER_OF_CYCLIST_INJURED'].tolist())

# print(...) works on both Python 2 and Python 3.
print(cyclist_injured_Dict)

In [47]:
# Despite its name, motorist_injured_Dict is a list: entry i is the number of
# collisions in hour i (0-23) with at least one motorist injured. The query
# counts matching rows; it does not sum the injuries.
motorist_injured_Dict = []
for x in range(0, 24):
    data_frame = spark.sql("SELECT COUNT(*) AS NUMBER_OF_MOTORIST_INJURED FROM tempDF WHERE TIME LIKE '" + str(x) + ":%' AND NUMBER_OF_MOTORIST_INJURED > 0")
    pd_frame = data_frame.toPandas()
    # The ungrouped COUNT(*) yields a single row; tolist() gives plain Python ints.
    motorist_injured_Dict.extend(pd_frame['NUMBER_OF_MOTORIST_INJURED'].tolist())

# print(...) works on both Python 2 and Python 3.
print(motorist_injured_Dict)

In [48]:
# Despite its name, motorist_killed_Dict is a list: entry i is the number of
# collisions in hour i (0-23) with at least one motorist killed. The query
# counts matching rows; it does not sum the fatalities.
motorist_killed_Dict = []
for x in range(0, 24):
    data_frame = spark.sql("SELECT COUNT(*) AS NUMBER_OF_MOTORIST_KILLED FROM tempDF WHERE TIME LIKE '" + str(x) + ":%' AND NUMBER_OF_MOTORIST_KILLED > 0")
    pd_frame = data_frame.toPandas()
    # The ungrouped COUNT(*) yields a single row; tolist() gives plain Python ints.
    motorist_killed_Dict.extend(pd_frame['NUMBER_OF_MOTORIST_KILLED'].tolist())

# print(...) works on both Python 2 and Python 3.
print(motorist_killed_Dict)

In [49]:
# Rank the values of CONTRIBUTING_FACTOR_VEHICLE_1 by number of rows, most frequent first.
data_frame = spark.sql("SELECT CONTRIBUTING_FACTOR_VEHICLE_1,COUNT(*) as count from nyMotorCollisionVIEW group By CONTRIBUTING_FACTOR_VEHICLE_1 order by count desc")
df = data_frame.toPandas()
# print(...) works on both Python 2 and Python 3.
print(df)

In [50]:
# Rank the values of CONTRIBUTING_FACTOR_VEHICLE_2 by number of rows, most frequent first.
data_frame = spark.sql("SELECT CONTRIBUTING_FACTOR_VEHICLE_2,COUNT(*) as count from nyMotorCollisionVIEW group By CONTRIBUTING_FACTOR_VEHICLE_2 order by count desc")
df = data_frame.toPandas()
# print(...) works on both Python 2 and Python 3.
print(df)

In [51]:
# Rank the values of CONTRIBUTING_FACTOR_VEHICLE_3 by number of rows, most frequent first.
data_frame = spark.sql("SELECT CONTRIBUTING_FACTOR_VEHICLE_3,COUNT(*) as count from nyMotorCollisionVIEW group By CONTRIBUTING_FACTOR_VEHICLE_3 order by count desc")
df = data_frame.toPandas()
# print(...) works on both Python 2 and Python 3.
print(df)

In [52]:
# Rank the values of CONTRIBUTING_FACTOR_VEHICLE_4 by number of rows, most frequent first.
data_frame = spark.sql("SELECT CONTRIBUTING_FACTOR_VEHICLE_4,COUNT(*) as count from nyMotorCollisionVIEW group By CONTRIBUTING_FACTOR_VEHICLE_4 order by count desc")
df = data_frame.toPandas()
# print(...) works on both Python 2 and Python 3.
print(df)

In [53]:
# Rank the values of VEHICLE_TYPE_CODE_1 by number of rows, most frequent first.
data_frame = spark.sql("SELECT VEHICLE_TYPE_CODE_1,COUNT(*) as count from nyMotorCollisionVIEW group By VEHICLE_TYPE_CODE_1 order by count desc")
df = data_frame.toPandas()
# print(...) works on both Python 2 and Python 3.
print(df)

In [54]:
# Rank the values of VEHICLE_TYPE_CODE_2 by number of rows, most frequent first.
data_frame = spark.sql("SELECT VEHICLE_TYPE_CODE_2,COUNT(*) as count from nyMotorCollisionVIEW group By VEHICLE_TYPE_CODE_2 order by count desc")
df = data_frame.toPandas()
# print(...) works on both Python 2 and Python 3.
print(df)

In [55]:
# Rank the values of VEHICLE_TYPE_CODE_3 by number of rows, most frequent first.
data_frame = spark.sql("SELECT VEHICLE_TYPE_CODE_3,COUNT(*) as count from nyMotorCollisionVIEW group By VEHICLE_TYPE_CODE_3 order by count desc")
df = data_frame.toPandas()
# print(...) works on both Python 2 and Python 3.
print(df)

In [56]:
# Rank the values of VEHICLE_TYPE_CODE_4 by number of rows, most frequent first.
data_frame = spark.sql("SELECT VEHICLE_TYPE_CODE_4,COUNT(*) as count from nyMotorCollisionVIEW group By VEHICLE_TYPE_CODE_4 order by count desc")
df = data_frame.toPandas()
# print(...) works on both Python 2 and Python 3.
print(df)

In [57]:
# Rank the values of VEHICLE_TYPE_CODE_5 by number of rows, most frequent first.
data_frame = spark.sql("SELECT VEHICLE_TYPE_CODE_5,COUNT(*) as count from nyMotorCollisionVIEW group By VEHICLE_TYPE_CODE_5 order by count desc")
df = data_frame.toPandas()
# print(...) works on both Python 2 and Python 3.
print(df)

In [58]:
# Select every row and column of the view and convert the result to a pandas
# DataFrame.
# NOTE(review): toPandas() collects the whole result onto the driver, and `df`
# is not used by any later cell visible here - confirm this cell is still needed.
data_frame = spark.sql("SELECT * from nyMotorCollisionVIEW")
df = data_frame.toPandas();

In [59]:
# Locations where accidents occurred most: count the rows per coordinate pair.

location_Dict = {}

data_frame = spark.sql("select LATITUDE,LONGITUDE from nyMotorCollisionVIEW")
pd_frame = data_frame.toPandas()

# The key is str(latitude) immediately followed by str(longitude), with no
# separator between the two.
# NOTE(review): rows with missing coordinates presumably all share one key -
# confirm before reading the top entry as a real location.
for latitude, longitude in zip(pd_frame['LATITUDE'].values, pd_frame['LONGITUDE'].values):
    location_key = str(latitude) + str(longitude)
    location_Dict[location_key] = location_Dict.get(location_key, 0) + 1

In [60]:
# Pretty-print the per-location counts as JSON (one entry per distinct key in location_Dict).
print(json.dumps(location_Dict, indent=4))