## Exploring the City of Saint Louis public crime data with Apache Spark 2.3

The Saint Louis OpenData project contains hundreds of datasets for the city of Saint Louis. Open government data has the potential to increase the quality of life for residents, make government services more efficient, improve public decisions, and even create new local businesses and services.

![Gateway Arch](https://images.unsplash.com/photo-1514893011-72dfa15bd29c?ixlib=rb-0.3.5&ixid=eyJhcHBfaWQiOjEyMDd9&s=8c2d25b2fcf02c87b1e9022f43affd05&auto=format&fit=crop&w=1491&q=80)

We start by mounting the Amazon S3 storage to the notebook.

In [5]:
ACCESS_KEY = "XXXXXXXXXXXXXXX"
SECRET_KEY = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
ENCODED_SECRET_KEY = SECRET_KEY.replace("/", "%2F")
AWS_BUCKET_NAME = "xxxxx-xxxx"
MOUNT_NAME = "input"

dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME), "/mnt/%s" % MOUNT_NAME)
display(dbutils.fs.ls("/mnt/%s" % MOUNT_NAME))
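
Re-running the mount cell above fails once the bucket is already mounted. A minimal sketch of a guard, assuming the `dbutils` object that Databricks provides in notebooks; the helper names `build_s3a_source` and `mount_if_needed` are hypothetical and not part of any Databricks API:

```python
def build_s3a_source(access_key, secret_key, bucket):
    """Build the s3a:// source URI the same way the cell above does."""
    # "/" in the secret key must be URL-encoded, as in the original cell.
    encoded_secret_key = secret_key.replace("/", "%2F")
    return "s3a://%s:%s@%s" % (access_key, encoded_secret_key, bucket)


def mount_if_needed(dbutils, source, mount_point):
    """Mount `source` at `mount_point` unless that mount point already exists.

    Returns True if a new mount was created, False if it was already there.
    """
    existing = [m.mountPoint for m in dbutils.fs.mounts()]
    if mount_point in existing:
        return False  # already mounted, nothing to do
    dbutils.fs.mount(source, mount_point)
    return True
```

In the notebook this could be called as `mount_if_needed(dbutils, build_s3a_source(ACCESS_KEY, SECRET_KEY, AWS_BUCKET_NAME), "/mnt/%s" % MOUNT_NAME)`.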

List all mount points:

In [7]:
display(dbutils.fs.mounts())

The 2018 crime data for the city of Saint Louis is uploaded to S3 in CSV format and read from Databricks for analysis. You can list the files with the `%fs ls` command:

In [9]:
%fs ls /mnt/input/input

Note: I combined 12 CSV files into one file to get a full year of data. I downloaded them directly from this link: http://www.slmpd.org/Crimereports.shtml

I'm using "spark" as an entry point into all functionality in Spark 2.3.

In [12]:
spark

Using the SparkSession, create a DataFrame from the CSV file by inferring the schema.

In [14]:
crimeDataDF = spark.read.csv('/mnt/input/input/2018stlcrimedata.csv', header=True, inferSchema=True)

In [15]:
crimeDataDF.count()

Display the data using the Databricks `display` function.

In [17]:
display(crimeDataDF)

Notice that the above cell takes ~2 seconds to run because it is inferring the schema by sampling the file and reading through it.

Inferring the schema works for ad-hoc analysis against smaller datasets. But when working with terabytes of data, it's better to provide an **explicit, pre-defined schema**, so there is no inference cost:

In [19]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DecimalType

Note that we remove all space characters from the column names to prevent errors when writing to Parquet later:

In [21]:
crimeSchema = StructType(
                     [
                     StructField('Complaint', StringType(), True),
                     StructField('CodedMonth', StringType(), True),
                     StructField('DateOccur', StringType(), True),
                     StructField('FlagCrime', StringType(), True),                  
                     StructField('FlagUnfounded', StringType(), True),       
                     StructField('FlagAdministrative', StringType(), True),       
                     StructField('Count', IntegerType(), True),       
                     StructField('FlagCleanup', StringType(), True),       
                     StructField('Crime', IntegerType(), True),       
                     StructField('District', IntegerType(), True),       
                     StructField('Description', StringType(), True),       
                     StructField('ILEADSAddress', IntegerType(), True),                  
                     StructField('ILEADSStreet', StringType(), True),       
                     StructField('Neighborhood', IntegerType(), True),       
                     StructField('LocationName', StringType(), True),       
                     StructField('LocationComment', StringType(), True),       
                     StructField('CADAddress', IntegerType(), True),       
                     StructField('CADStreet', StringType(), True),       
                     StructField('XCoord', StringType(), True),                 
                     StructField('YCoord', StringType(), True)
                     ]
                     )
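
The note above about stripping spaces from column names can also be applied to a DataFrame whose names came from the CSV header (for example the inferred-schema `crimeDataDF`). A minimal sketch, assuming the character set that Spark's Parquet writer rejects in column names (`" ,;{}()\n\t="`); the helper names and the sample header names in the usage note are hypothetical:

```python
import re


def sanitize_column_name(name):
    """Strip the characters that Spark's Parquet writer rejects in column names."""
    # Covers space, comma, semicolon, braces, parentheses, newline, tab, equals.
    return re.sub(r"[ ,;{}()\n\t=]", "", name)


def sanitize_columns(df):
    """Return a DataFrame with every column renamed via sanitize_column_name."""
    # DataFrame.toDF(*cols) returns a new DataFrame with the given column names.
    return df.toDF(*[sanitize_column_name(c) for c in df.columns])
```

For instance, a header such as `Coded Month` would become `CodedMonth`, and `sanitize_columns(crimeDataDF)` would return a renamed copy while leaving the original DataFrame untouched.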

In [22]:
crimeDataSDF = spark.read.csv('/mnt/input/input/2018stlcrimedata.csv', header=True, schema=crimeSchema)

In [23]:
display(crimeDataSDF)

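The CSV file contains null records, so we drop every row whose `Complaint` is null. DataFrames are immutable, so the result must be assigned back to the variable:

In [25]:
crimeDataSDF = crimeDataSDF.na.drop(subset=["Complaint"])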

In [26]:
crimeDataSDF.count()

Look at the first 5 records in the DataFrame:

In [28]:
crimeDataSDF.show(5)

Print just the column names in the DataFrame:

In [30]:
crimeDataSDF.columns

Count the total number of rows in the DataFrame (and see how long a full scan from remote disk/S3 takes):

In [32]:
crimeDataSDF.count()

There are over 46 thousand rows in the DataFrame, and a full read takes ~2 seconds.

###  **Analysis with PySpark DataFrames and Spark SQL API**

Create a temp view so the data can be queried with `spark.sql`:

In [36]:
crimeDataSDF.createOrReplaceTempView("crimesql")

**Q-1) How many different types of calls were made to the Police Department?**

In [38]:
crimeDataSDF.select('Description').distinct().show(35, False)

In [39]:
display(sqlContext.sql("SELECT Description FROM crimesql GROUP BY description"), limit=35)

The queries above show the different types of calls made to the police department.

**Q-2) How many incidents of each call type were there?**

In [42]:
display(crimeDataSDF.select('Description').groupBy('Description').count().orderBy('Count', ascending=False))

In [43]:
display(sqlContext.sql("SELECT Description, count(*) as count FROM crimesql GROUP BY description ORDER BY count desc"))

It seems that the Saint Louis City Police Department is called for leaving crime scene far more than for any other call type. Note that the above command took about 3 seconds to execute. In an upcoming section, we'll cache the data in memory to make queries like this considerably faster.
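
To make explicit what the `groupBy(...).count().orderBy(..., ascending=False)` queries above compute, here is a minimal plain-Python sketch of the same aggregation. The description values are made-up placeholders, not rows from the dataset:

```python
from collections import Counter

# Hypothetical sample of the Description column (not real data).
descriptions = ["LARCENY", "BURGLARY", "LARCENY", "ASSAULT", "LARCENY", "BURGLARY"]

# groupBy('Description').count() -> one tally per distinct value.
counts = Counter(descriptions)

# orderBy('count', ascending=False) -> largest tallies first.
ranked = counts.most_common()

for description, count in ranked:
    print(description, count)
```

Spark performs the same tally, but distributes it across partitions and merges the partial counts.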

**Q-3) What is the most dangerous month in Saint Louis city?**

In [46]:
display(crimeDataSDF.select('CodedMonth').groupBy('CodedMonth').count().orderBy('Count',ascending=False))

In [47]:
display(sqlContext.sql("select CodedMonth, count(*) as count from crimesql group by CodedMonth ORDER BY count DESC"))

It seems that August 2018 was the most dangerous month in Saint Louis city, with June coming in second.

### ![Spark Logo Tiny](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/logo_spark_tiny.png) **Doing Date/Time Analysis**

**Q-4) How many service calls were logged on July 4th?**

Notice that the date/time column (`DateOccur`) is currently interpreted as a string rather than as a date or time object:

In [52]:
crimeDataSDF.printSchema()

Let's use the unix_timestamp() function to convert the string into a timestamp:

https://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/api/python/pyspark.sql.html?highlight=spark#pyspark.sql.functions.from_unixtime

In [54]:
from pyspark.sql.functions import *

In [55]:
# Note that PySpark uses the Java Simple Date Format patterns

from_pattern1 = 'MM/dd/yyyy'
to_pattern1 = 'MM/dd/yyyy'

from_pattern2 = 'MM/dd/yyyy HH:mm'
to_pattern2 = 'MM/dd/yyyy HH:mm'


crimeDataSTsDF = crimeDataSDF \
  .withColumn('DateOccurTS', unix_timestamp(crimeDataSDF['DateOccur'], from_pattern2).cast("timestamp")) \
  .drop('DateOccur')
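
Before running the conversion on the full DataFrame, the pattern can be sanity-checked on a single value in plain Python. This is a sketch only: `%m/%d/%Y %H:%M` is the `strptime` counterpart of the Java pattern `MM/dd/yyyy HH:mm`, the helper name `parse_date_occur` is hypothetical, and the sample string in the usage note is made up rather than taken from the dataset:

```python
from datetime import datetime

# strptime equivalent of the Java pattern 'MM/dd/yyyy HH:mm' used above.
PY_FORMAT = "%m/%d/%Y %H:%M"


def parse_date_occur(value):
    """Parse a DateOccur-style string, returning None when it does not match.

    Returning None mirrors unix_timestamp(), which yields null for
    strings that do not fit the pattern.
    """
    try:
        return datetime.strptime(value, PY_FORMAT)
    except ValueError:
        return None
```

For example, `parse_date_occur("07/04/2018 13:45")` gives a `datetime` for July 4, 2018 at 13:45, while a value in another layout such as `"2018-07-04"` gives `None`. Edge-case behavior of Python's parser and Java's may differ, so treat this as a quick check rather than an exact emulation.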

In [56]:
crimeDataSTsDF.printSchema()

In [57]:
crimeDataSTsDF.createOrReplaceTempView("crimesql_ts")

Notice that the formatting of the timestamps is now different:

Note that July 4th is the 185th day of 2018.
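
The day number can be verified with Python's standard library. A short sketch (the helper name `day_of_year` is only for illustration); like Spark's `dayofyear`, the result is 1-based:

```python
from datetime import date


def day_of_year(d):
    """Return the 1-based ordinal of `d` within its year (Jan 1 is day 1)."""
    return d.timetuple().tm_yday


print(day_of_year(date(2018, 7, 4)))  # 185
```

In a leap year the same calendar date shifts by one (July 4, 2020 is day 186), which is why the filter also pins the year.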

Filter the DataFrame down to the year 2018 and the day of year equal to 185:

In [60]:
crimeDataSTsDF.filter(year('DateOccurTs') == '2018').filter(dayofyear('DateOccurTs') == 185).groupBy(dayofyear('DateOccurTs')).count().orderBy(dayofyear('DateOccurTs')).show()

In [61]:
display(sqlContext.sql("select count(*) as count FROM crimesql_ts WHERE cast(DateOccurTs as date)='2018-07-04'"))

There were 133 calls made on July 4, 2018.

**Q-5) How many service calls were logged in the first week of January 2018?**

Note that we can narrow down to the 2018 year and look at the first 7 days.

In [65]:
crimeDataSTsDF.filter(year('DateOccurTs') == '2018').filter(dayofyear('DateOccurTs') >= 1).filter(dayofyear('DateOccurTs') <= 7).groupBy(dayofyear('DateOccurTs')).count().orderBy(dayofyear('DateOccurTs')).show()

In [66]:
display(sqlContext.sql("SELECT cast(DateOccurTs as date) as Date, count(*) as count FROM crimesql_ts Where cast(DateOccurTs as date) between '2018-01-01' and '2018-01-07' Group BY cast(DateOccurTs as date) ORDER BY Date "))

Visualize the results in a bar graph:

In [68]:
display(crimeDataSTsDF.filter(year('DateOccurTs') == '2018').filter(dayofyear('DateOccurTs') >= 1).filter(dayofyear('DateOccurTs') <= 7).groupBy(dayofyear('DateOccurTs')).count().orderBy(dayofyear('DateOccurTs')))

### ![Spark Logo Tiny](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/logo_spark_tiny.png) **Memory, Caching and write to Parquet**

The DataFrame currently consists of 2 partitions:

In [71]:
crimeDataSTsDF.rdd.getNumPartitions()

We repartition to 3 so that the data is divided evenly among the 3 slots on Databricks Community Edition.

In [73]:
crimeDataSTsDF.repartition(3).createOrReplaceTempView("crimeDataVIEW")

In [74]:
spark.catalog.cacheTable("crimeDataVIEW")

In [75]:
# Call .count() to materialize the cache
spark.table("crimeDataVIEW").count()

In [76]:
crimeDataDF = spark.table("crimeDataVIEW")

Once the data is cached, a full table scan takes about 1/10 of a second, versus ~2 seconds when reading from Amazon S3 as before.

In [78]:
crimeDataDF.count()
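
The timing comparison above can be reproduced with a small helper instead of reading the numbers off the notebook UI. A minimal sketch; `timed` is a hypothetical helper, not a Spark or Databricks function:

```python
import time


def timed(action):
    """Run a zero-argument callable and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = action()
    elapsed = time.perf_counter() - start
    return result, elapsed
```

In the notebook, `rows, seconds = timed(crimeDataDF.count)` would time the cached scan, and `timed(crimeDataSDF.count)` the uncached read from S3. Actual timings will vary from run to run.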

In [79]:
spark.catalog.isCached("crimeDataVIEW")

The 3 partitions are now cached in memory, matching the 3 slots on Databricks.

![6 Partitions](http://curriculum-release.s3-website-us-west-2.amazonaws.com/sf_open_data_meetup/df_6_parts.png)

We can check the Spark UI to see the 3 partitions in memory:

Now that our data has the correct date/time types for each column and is correctly partitioned, let's write it out as a Parquet file for future loading:

In [84]:
%fs ls /mnt/input/input

In [85]:
crimeDataDF.write.format('parquet').save('dbfs:/mnt/input/input/data')

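Now the directory should contain 3 compressed Parquet part files (one for each partition). Spark 2.x compresses Parquet with Snappy by default, so expect `.snappy.parquet` file names rather than `.gz`: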

In [87]:
%fs ls dbfs:/mnt/input/input/data

Here's how you can easily read the Parquet files from S3 in the future:

In [89]:
tempDF = spark.read.parquet('dbfs:/mnt/input/input/data/')

In [90]:
display(tempDF.limit(2))

The possibilities are endless with these datasets.

This notebook was inspired by Sameer Farooqui at Databricks.