#### **Records of Denver, Colorado Police Pedestrian Stops and Vehicle Stops**

In [1]:
# List the files under /FileStore/tables/ using the `%fs` line magic.
# NOTE(review): `%fs` is not a standard IPython magic -- the recorded output of
# this cell is "UsageError: Line magic function `%fs` not found." when the
# notebook is run in an environment that does not provide it.
%fs ls /FileStore/tables/

UsageError: Line magic function `%fs` not found.


In [None]:
# Show the object bound to `spark`. It is not created anywhere in this
# notebook -- presumably the SparkSession injected by the runtime (confirm).
spark

In [None]:
# First load: take column names from the header row and let Spark infer the
# column types from the data.
# Note: this name differs only by its leading capital from `denverCrimeDataDF`,
# the explicitly-typed frame that the later cells work with.
DenverCrimeDataDF = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('/FileStore/tables/DenverCrimeData.csv')
)


In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

In [None]:
# Explicit schema for the Denver crime CSV; every column is nullable.
# Column names deliberately contain no space characters, to prevent errors
# when writing to Parquet later.
CrimeSchema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('MASTER_INCIDENT_NUMBER', StringType()),
        ('PRIORITY_DESCRIPTION', StringType()),
        ('PROBLEM', StringType()),
        ('ADDRESS', StringType()),
        ('CALL_CLASS', IntegerType()),
        ('TIME_PHONEPICKUP', StringType()),
        ('CALL_DISPOSITION', StringType()),
        ('GEO_X', DoubleType()),
        ('GEO_Y', DoubleType()),
        ('GEO_LON', DoubleType()),
        ('GEO_LAT', DoubleType()),
        ('DISTRICT_ID', IntegerType()),
        ('PRECINCT_ID', IntegerType()),
        ('NEIGHBORHOOD_NAME', StringType()),
    )
])

In [None]:
# Second load, this time with the explicit CrimeSchema instead of inference.
# Notice that no job is run this time.
denverCrimeDataDF = (
    spark.read
    .schema(CrimeSchema)
    .option("header", True)
    .csv('/FileStore/tables/DenverCrimeData.csv')
)

In [None]:
# Print the first five rows of the typed frame as a plain-text table.
denverCrimeDataDF.limit(5).show()

In [None]:
# List the column names of the typed frame.
denverCrimeDataDF.columns

In [None]:
# Total number of rows in the typed frame.
denverCrimeDataDF.count()

**Summary statistics**

In [None]:
# Render the summary-statistics frame produced by describe() for the typed data.
display(denverCrimeDataDF.describe())

**Create a CSV file with all the incidents in the most eventful neighborhood**

In [None]:
# Count incidents per neighborhood, keep only the single largest group, and
# register that one-row result as the temp view "sfpdDF" for the next cell.
(
    denverCrimeDataDF
    .select('NEIGHBORHOOD_NAME')
    .groupBy('NEIGHBORHOOD_NAME')
    .count()
    .orderBy("count", ascending=False)
    .limit(1)
    .createOrReplaceTempView("sfpdDF")
)

In [None]:
from pyspark.sql.functions import col

# The temp view "sfpdDF" (registered by the preceding cell) holds the single
# neighborhood with the most incidents; pull its name back to the driver.
top10ResSQL = spark.sql("SELECT NEIGHBORHOOD_NAME from sfpdDF ")
topNeighborhoodRows = top10ResSQL.collect()
if not topNeighborhoodRows:
    # Fail with a clear message rather than an IndexError when the view is empty.
    raise ValueError("Temp view sfpdDF returned no rows; is the input data empty?")
topNeighborhoodName = topNeighborhoodRows[0][0]

# Keep every incident recorded in that neighborhood.
# NOTE(review): if the largest group is the NULL neighborhood, this equality
# filter matches no rows -- confirm whether NULLs should be excluded upstream.
TopNeighDataDF = denverCrimeDataDF.where(col("NEIGHBORHOOD_NAME") == topNeighborhoodName)

# Export as CSV. Spark writes a directory of part files at the given path;
# coalesce(1) reduces the frame to one partition before writing.
# - mode("overwrite"): the default mode raises if the path already exists,
#   which made this cell fail on every re-run of the notebook.
# - header=True: keep the column names in the exported file.
(
    TopNeighDataDF
    .coalesce(1)
    .write
    .format("csv")
    .option("header", True)
    .mode("overwrite")
    .save('/FileStore/TopNeighDetails/TopNeighDataDetails.csv')
)

**Top 10 incident neighborhoods by count**

In [None]:
# Ten neighborhoods with the most incidents.
# Note that .count() on the grouped data is actually a transformation here.
display(
    denverCrimeDataDF
    .select('NEIGHBORHOOD_NAME')
    .groupBy('NEIGHBORHOOD_NAME')
    .count()
    .orderBy("count", ascending=False)
    .limit(10)
)


**Trend over time by number of incidents**

In [None]:
# Print the schema of the typed frame (column names and types).
denverCrimeDataDF.printSchema()

In [None]:
from pyspark.sql.functions import *

In [None]:
# Add TIME_PHONEPICKUP_TS: the TIME_PHONEPICKUP string parsed as a timestamp.
# to_timestamp() already returns a timestamp column, so no extra cast is needed.
# NOTE(review): the 'dd-MM-yy' pattern has no time-of-day part -- confirm it
# matches the TIME_PHONEPICKUP values actually present in the CSV.
denverCrimeDataTsDF = denverCrimeDataDF.withColumn(
    'TIME_PHONEPICKUP_TS',
    to_timestamp(denverCrimeDataDF['TIME_PHONEPICKUP'], 'dd-MM-yy'),
)

# Ten largest (problem, year) groups by number of incidents.
display(
    denverCrimeDataTsDF
    .select('PROBLEM', 'TIME_PHONEPICKUP_TS')
    .groupBy('PROBLEM', year('TIME_PHONEPICKUP_TS'))
    .count()
    .orderBy("count", ascending=False)
    .limit(10)
)

In [None]:
# Print the schema of the frame that carries the added TIME_PHONEPICKUP_TS column.
denverCrimeDataTsDF.printSchema()

**Most common resolutions**

In [None]:

# Ten most frequent call dispositions.
display(
    denverCrimeDataDF
    .select('CALL_DISPOSITION')
    .groupBy('CALL_DISPOSITION')
    .count()
    .orderBy("count", ascending=False)
    .limit(10)
)


## **Additional insights**

**The number of problems by category**

In [None]:

# Ten most frequent problem categories.
display(
    denverCrimeDataDF
    .select('PROBLEM')
    .groupBy('PROBLEM')
    .count()
    .orderBy("count", ascending=False)
    .limit(10)
)


**The number of problems by category and the respective resolutions**

In [None]:
# Ten most frequent (call disposition, problem) combinations.
display(
    denverCrimeDataDF
    .select('CALL_DISPOSITION', 'PROBLEM')
    .groupBy('CALL_DISPOSITION', 'PROBLEM')
    .count()
    .orderBy("count", ascending=False)
    .limit(10)
)


**Priority Description categorised by problem**

In [None]:

# Ten most frequent (priority description, problem) combinations.
display(
    denverCrimeDataDF
    .select('PRIORITY_DESCRIPTION', 'PROBLEM')
    .groupBy('PRIORITY_DESCRIPTION', 'PROBLEM')
    .count()
    .orderBy("count", ascending=False)
    .limit(10)
)

**Number of incidents that occurred in each month to date**

In [None]:
# Incident count per calendar month of the pickup timestamp, listed in month order.
# NOTE(review): the cap of 13 rows presumably allows for twelve months plus a
# NULL group from unparsed timestamps -- confirm.
display(
    denverCrimeDataTsDF
    .select('TIME_PHONEPICKUP_TS')
    .groupBy(month('TIME_PHONEPICKUP_TS'))
    .count()
    .orderBy(month('TIME_PHONEPICKUP_TS'))
    .limit(13)
)