In [None]:
# Run the following commands to load the data
# wget https://police-incidents-dataset-ds.s3.amazonaws.com/Police_Department_Incident_Reports__2018_to_Present.csv

# Then put the data into the hadoop livy folder 
# hadoop fs -put Police_Department_Incident_Reports__2018_to_Present.csv /user/livy


In [None]:
spark


In [None]:
# Exploratory load: treat the first row as the header and let Spark infer the column types.
file1 = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("Police_Department_Incident_Reports__2018_to_Present.csv")
)

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType

In [None]:
# Explicit schema for the incident-report CSV.
# Every field is nullable. The date/time fields are declared as strings here;
# the cells further down convert them to timestamps with explicit patterns.
fileSchema = StructType([
    # --- when the incident happened / was reported ---
    StructField('Incident_DateTime', StringType(), True),
    StructField('Incident_Date', StringType(), True),
    StructField('Incident_Time', StringType(), True),
    StructField('Incident_Year', IntegerType(), True),
    StructField('Incident_DayOfWeek', StringType(), True),
    StructField('Report_DateTime', StringType(), True),
    # --- identifiers ---
    StructField('Row_Id', LongType(), True),
    StructField('Incident_Id', IntegerType(), True),
    StructField('Incident_Number', IntegerType(), True),
    StructField('Cad_Number', IntegerType(), True),
    # --- report / incident classification ---
    StructField('Report_Type_Code', StringType(), True),
    StructField('Report_Type_Description', StringType(), True),
    StructField('File_Online', BooleanType(), True),
    StructField('Incident_Code', IntegerType(), True),
    StructField('Incident_Category', StringType(), True),
    StructField('Incident_Subcategory', StringType(), True),
    StructField('Incident_Description', StringType(), True),
    StructField('Resolution', StringType(), True),
    # --- location ---
    StructField('Intersection', StringType(), True),
    StructField('CNN', DoubleType(), True),
    StructField('Police_District', StringType(), True),
    StructField('Analysis_Neighbourhood', StringType(), True),
    StructField('Supervisor_District', IntegerType(), True),
    StructField('Latitude', DoubleType(), True),
    # Was misspelled 'Longitide'; the schema supplies the column name, so the typo
    # would otherwise leak into every downstream reference to this column.
    StructField('Longitude', DoubleType(), True),
    StructField('Point', StringType(), True),
])

In [None]:
# Reload the CSV, this time applying the hand-written schema instead of inferring types.
file1 = (
    spark.read
    .schema(fileSchema)
    .option("header", True)
    .csv("Police_Department_Incident_Reports__2018_to_Present.csv")
)

In [None]:
file1.printSchema()

In [None]:
file1.show(5)

In [None]:
file1.columns

In [None]:
# Preview the incident id next to its category.
# The column is referenced with the exact casing declared in fileSchema ('Incident_Id'),
# so the lookup does not depend on Spark's case-insensitive column resolution.
file1.select('Incident_Id', 'Incident_Category').show(5)

In [None]:
file1.select('Incident_Category').distinct().show(truncate = True)

In [None]:
# Number of incidents per category, most frequent first (up to 52 rows, values not truncated).
(
    file1
    .groupBy('Incident_Category')
    .count()
    .orderBy("count", ascending = False)
    .show(52, False)
)

In [None]:
from pyspark.sql.functions import *

In [None]:
# Analyzing datetime columns in the data set

In [None]:
file1.select("Incident_DateTime").show(5, False)

In [None]:
# Convert the textual Incident_DateTime column into a real timestamp.
# The pattern is a 12-hour clock ('hh') followed by an AM/PM marker. The marker is
# written as a single 'a': Spark 3's datetime parser accepts only one 'a', while
# Spark 2 treats 'a' and 'aa' identically, so this form works on both.
pattern1 = 'yyyy/MM/dd hh:mm:ss a'
file2 = file1.withColumn(
    'Incident_DateTime',
    unix_timestamp(file1['Incident_DateTime'], pattern1).cast('timestamp'),
)

In [None]:
file2.printSchema()

In [None]:
file2.select(year('Incident_DateTime')).distinct().show(truncate = False)

In [None]:
file1.select('Incident_DateTime', 'Incident_Date','Incident_Time', 'Incident_Year', 'Report_DateTime').show(5, False)

In [None]:
# Convert every date/time text column of file1 into a timestamp column.
# Values that do not match their pattern become null after the cast.
pattern1 = 'yyyy/MM/dd hh:mm:ss a'  # 12-hour clock + AM/PM; single 'a' (Spark 3 rejects 'aa')
pattern2 = 'yyyy/MM/dd'
# 'HH' = hour of day 0-23. The previous 'hh' (1-12) has no AM/PM marker to pair with and
# cannot represent 13-23 under Spark 3's strict parser.
# NOTE(review): assumes Incident_Time is a 24-hour "HH:mm" string - confirm with
# file1.select('Incident_Time').show().
pattern3 = 'HH:mm'
pattern4 = 'yyyy'  # not used in this cell; kept in case other cells reference it

file2 = (
    file1
    .withColumn('Incident_DateTime',
                unix_timestamp(file1['Incident_DateTime'], pattern1).cast('timestamp'))
    .withColumn('Incident_Date',
                unix_timestamp(file1['Incident_Date'], pattern2).cast('timestamp'))
    .withColumn('Incident_Time',
                unix_timestamp(file1['Incident_Time'], pattern3).cast('timestamp'))
    .withColumn('Report_DateTime',
                unix_timestamp(file1['Report_DateTime'], pattern1).cast('timestamp'))
)

In [None]:
file2.printSchema()

In [None]:
####   Analysis 1 ##############
# Find the days of the week on which the most incidents happened


In [None]:

file2.select(dayofweek("Incident_DateTime")).show(7)

In [None]:
file2.select(date_format("Incident_DateTime",'E')).show(7)

In [None]:
# Derive the day-of-week text from Incident_DateTime and attach it to every record
# as a new 'dayOfTheWeek' column (file2 itself is left unchanged).
day_of_week_text = date_format("Incident_DateTime", 'E')
file3 = file2.withColumn('dayOfTheWeek', day_of_week_text)



In [None]:
# Count the incidents for each day of the week and sort the counts in descending
# order, so the day with the most incidents is the first row.
(
    file3
    .groupBy('dayOfTheWeek')
    .count()
    .orderBy('count', ascending = False)
    .show()
)


In [None]:
#################  Analysis 2    ####################

# What percentage of the incidents have been recorded online?



In [None]:
 file2.select("File_Online").show()


In [None]:
# Normalise File_Online: rows where it is True stay True, every other row
# (including null) becomes False.
filed_online = col("File_Online") == True
file3 = file2.withColumn("File_Online", when(filed_online, True).otherwise(False))
file3.select('File_Online').show()

In [None]:
# One row per File_Online value together with the number of incidents carrying it.
file4 = file3.groupBy('File_Online').count()
file4.show()

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Share of incidents per File_Online value.
# A window with an empty partitionBy() spans every row of file4, so the windowed sum
# is the grand total of all counts. Qualified F.sum / F.col are used so the cell does
# not depend on a star import having replaced Python's builtin sum().
all_rows = Window.partitionBy()
share_of_total = F.col('count') / F.sum('count').over(all_rows)

(
    file4
    .withColumn('colnew', share_of_total)            # fraction in [0, 1], as before
    .withColumn('percent', F.col('colnew') * 100)    # the percentage the analysis asks for
    .show()
)

In [None]:

############   Analysis 3 ###################

# Group by the numbers of incidents reported based on each Year

# Keep only the year of each incident, then count the rows per year.
# The year column's generated name is read back from the frame rather than typed out.
incident_years = file2.select(year('Incident_DateTime'))
year_column_name = incident_years.columns[0]
incidents_reporter_per_year = incident_years.groupBy(year_column_name).count()

incidents_reporter_per_year.show()

In [None]:
###### Running SQL queries in spark ################

In [None]:
### Creating a temporary view so file2 can be queried with spark.sql ######
# createOrReplaceTempView replaces registerTempTable, which has been deprecated since
# Spark 2.0. The view name is unchanged, and re-running the cell simply replaces the view.
file2.createOrReplaceTempView("police_report_data")
                        

In [None]:
spark.sql("select * from police_report_data").show(3)

In [None]:
## Number of incidents for each Incident_Category (up to 52 rows, values not truncated).
# count(Incident_Category) counts only non-null values of the column.
category_count_sql = "select Incident_Category , count(Incident_Category) from police_report_data group by  Incident_Category"
spark.sql(category_count_sql).show(52, False)


In [None]:
####   Analysis 1 (Spark SQL) ##############
# Find the days of the week on which the most incidents happened


In [None]:
# Add the day-of-week text column and expose the result to spark.sql as a temporary view.
# createOrReplaceTempView replaces registerTempTable, which has been deprecated since Spark 2.0.
file2_with_day = file2.withColumn('dayOfTheWeek', date_format("Incident_DateTime", 'E'))
file2_with_day.createOrReplaceTempView("police_report_data_with_day")

In [None]:
spark.sql('select dayOfTheWeek from police_report_data_with_day ').show(12, False)

In [None]:
# Incidents per day of the week, busiest day first.
day_count_sql = (
    'select dayOfTheWeek , count(dayOfTheWeek) '
    'from police_report_data_with_day '
    'group by dayOfTheWeek '
    'order by count(dayOfTheWeek) desc '
)
spark.sql(day_count_sql).show()

In [None]:
#################  Analysis 2 (Spark SQL)    ####################

# What percentage of the incidents have been recorded online?


In [None]:
# file2.select("File_Online").show()
spark.sql("select File_Online from police_report_data").show()

In [None]:
#file3 = file2.withColumn("File_Online" ,when(col("File_Online") == True , True).otherwise(False) )
#update users set name = '*' where name is null
# count(File_Online) counts only the non-null values, so count(*) - count(File_Online)
# is the number of rows with no online flag; both shares are scaled to percentages.
online_percent_sql = (
    "select  ((count(*) - count(File_Online))/count(*))*100 as offline_percent  , "
    "(100 - ((count(*) - count(File_Online))/count(*))*100) as online_percent from police_report_data "
)
spark.sql(online_percent_sql).show()

In [None]:
############   Analysis 3 ###################

# Group by the numbers of incidents reported based on each Year

# Number of incidents per calendar year of Incident_DateTime.
incidents_per_year_sql = (
    "select  year(Incident_DateTime) as year ,  count(*) as no_incidents from police_report_data group by "
    "year(Incident_DateTime)"
)
spark.sql(incidents_per_year_sql).show()

########### How many cases of Assault happened on particular month say in Jan 2020  ############

    

In [None]:
# How many Assault incidents happened in one particular month - here January 2020.
# The earlier query filtered on month 3 and had no category filter, so it counted every
# incident of March 2020 instead of answering the stated question.
# NOTE(review): assumes the category value is spelled exactly 'Assault' - confirm against
# the distinct Incident_Category values listed earlier in the notebook.
spark.sql("""
    select year(Incident_DateTime)  as year,
           month(Incident_DateTime) as month,
           count(*)                 as no_incidents
    from police_report_data
    where Incident_Category = 'Assault'
      and year(Incident_DateTime) = 2020
      and month(Incident_DateTime) = 1
    group by year(Incident_DateTime), month(Incident_DateTime)
""").show()




Extras ------------------------------------------------>

Let us look at the queries used in this video:


Analysis 1:

Find the days of the week on which maximum incidents have happened.

 

Our dataset already has a day-of-week column (Incident_DayOfWeek), but let us derive the day from the Incident_DateTime column for our query.

 

file2.select(dayofweek("Incident_DateTime")).show(7)
Since the data type of the Incident_DateTime column is timestamp, we can apply the dayofweek() function to it. It returns the day of the week as a number for each incident.

 

But for our analysis, we need dayofweek as string. For this, we use the following code:

file2.select(date_format("Incident_DateTime",'E')).show(7)
 

'E' here is used to get the string format day of the week from the column "Incident_DateTime". To understand this date_format method, you can use the link in the additional readings.

 

So, we know how we can find dayofweek in string format from timestamp "Incident_DateTime". Let us add a new column named "dayoftheweek" in our dataset. Since dataframes are immutable, we add this column to our 'file2' dataframe and store the result in a new dataframe 'file3'.

 

# Adding a new column in our dataframe,which add the day of the week in each record
file3 =file2.withColumn('dayOfTheWeek' , date_format("Incident_DateTime",'E'))
 

To run the analysis, we use the following query,

# Aggregating based on the day of the week -- this will get us the day of the week, on which maximum incidents happened 
file3.groupBy('dayOfTheWeek').count().orderBy('count', ascending = False).show()
 

Code Description:

We group by the dayOfTheWeek column and count the number of rows in each group. We then sort by the count column created by the query, in descending order, so that the day on which the most incidents happened appears first.

 

Analysis 2:

What percent of the incidents have been recorded online?

We are using file2 that contains DateTime columns in timestamp data type.

Let us first analyze this column "File_Online" using the following code.

file2.select("File_Online").show()
 

As discussed in the video, this is a boolean data type column that contains 'true' for cases reported online and 'null' for cases reported offline. We can find the percent of incidents recorded online with this column as well. But let us see how we can update this column values.

file3 = file2.withColumn("File_Online" ,when(col("File_Online") == True , True).otherwise(False) )
file3.select('File_Online').show()
 

Code Description:

Using withColumn(), we have specified that if the value of "File_Online" is True, it should remain True, but in any other case, it should be 'False'. Also, as dataframes are immutable, we have stored the result in a new dataframe 'file3'.

 

Now, let us start the analysis of incidents reported online. We first create a new dataframe from 'file3' that contains the "File_Online" column along with the count of incidents recorded online as well as offline. This new dataframe is stored in 'file4' using the following code.

file4 = file3.select("File_Online").groupBy('File_Online').count()

file4.show()
 

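Since we have to find the percent, we need the total number of incidents reported. To get it, we apply sum('count') over the window 'Window.partitionBy()'. A window with an empty partitionBy() spans all rows, so the sum is the overall total; dividing each group's count by this total gives that group's share of all incidents (multiply by 100 to express it as a percentage).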

from pyspark.sql.window import Window

file4.withColumn( 'colnew' ,col('count') / sum('count').over(Window.partitionBy())).show()

Analysis 3:

Find the number of incidents reported each year in our dataset.

# Group by the numbers of incidents reported based on each Year

incidents_reporter_per_year = file2.select(year('Incident_DateTime')).groupBy('year(Incident_DateTime)').count()

incidents_reporter_per_year.show()
 

 

Some of the other operations that are available on dataframes include:

Union of two Dataframes

unionDF = df1.unionAll(df2)
 

Filter based on the value

explodeDF.filter(explodeDF.firstName == "xiangrui").sort(explodeDF.lastName)



filterDF = explodeDF.filter((col("firstName") == "xiangrui") | (col("firstName") == "michael")).sort(asc("lastName"))
 

Treatment of null rows 

#Drop records with null value

dropNullDF = explodeDF.na.drop()
 

Aggregate function

salarySumDF = explodeDF.agg({"salary" : "sum"})
 

This segment was filled with code demos that included various queries on a police dataset. In the upcoming segments, we will understand how we can run the same queries using Spark SQL.