# Importing data
We first need to import the data from the San Francisco Open Data website.

The dataset can be found at https://data.sfgov.org/Public-Safety/SFPD-Incidents-from-1-January-2003/tmnf-yvry#

In [2]:
import base64
import urllib
import json

# JSON endpoint on the San Francisco Open Data site (data.sfgov.org).
url = "https://data.sfgov.org/resource/cuks-n6tp.json"

# `urllib.urlopen` only exists on Python 2; on Python 3 the function lives in
# `urllib.request`. Resolve whichever is available so this cell runs on both.
try:
    from urllib.request import urlopen  # Python 3
except ImportError:
    from urllib import urlopen  # Python 2

from contextlib import closing

# `closing` releases the HTTP connection even if reading or JSON parsing
# fails (the Python 2 response object is not a context manager by itself).
with closing(urlopen(url)) as response:
    # The body is decoded as UTF-8 text and parsed into Python objects.
    raw_data = json.loads(response.read().decode('utf-8'))

In [3]:
from pyspark.sql.types import *

# Build a DataFrame from the parsed JSON records; Spark infers the schema.
raw_df = sqlContext.createDataFrame(raw_data)

# Remove the ':@computed_region_*' columns, which the analysis below never uses.
dropped_df = (raw_df.drop(':@computed_region_bh8s_q3mv')
            .drop(':@computed_region_fyvs_ahh9')
            .drop(':@computed_region_p5aj_wyqh')
            .drop(':@computed_region_rxqg_mtj9')
            .drop(':@computed_region_yftq_j783'))

# Give the numeric and date columns proper types.
# NOTE(review): x / y appear to be longitude / latitude decimals, so an
# IntegerType cast would truncate every coordinate to whole degrees; they are
# cast to DoubleType instead. pdid values appear to be wider than 32 bits,
# where an IntegerType cast yields null, so LongType is used. Confirm both
# against the dataset's column documentation.
df = (dropped_df
      .withColumn('x', dropped_df['x'].cast(DoubleType()))
      .withColumn('y', dropped_df['y'].cast(DoubleType()))
      .withColumn('pdid', dropped_df['pdid'].cast(LongType()))
      .withColumn('date', dropped_df['date'].cast(DateType())))

In [4]:
# Render the typed DataFrame with the notebook's display helper for a quick visual check.
display(df)

In [5]:
# Print each column's name and type to confirm the casts above were applied.
df.printSchema()

# Analysis
Now that we have imported the data into a Spark DataFrame, we can analyze it.

In [7]:
# Row count per category, most frequent category first.
category_counts = df.select('category').groupBy('category').count()
category_counts.orderBy('count', ascending=False).show()

In [8]:
# Row count per date, listed in ascending date order.
date_counts = df.select('date').groupBy('date').count()
date_counts.orderBy('date').show()

In [9]:
# Expose the DataFrame, split into 4 partitions, as the temp view "df_view"
# so that the SQL cells below can query it, and mark that view for caching.
repartitioned_df = df.repartition(4)
repartitioned_df.createOrReplaceTempView("df_view")
spark.catalog.cacheTable("df_view")

# Write the view's rows out as Parquet, replacing output from any earlier run.
view_df = spark.table("df_view")
view_df.write.mode('overwrite').parquet('/tmp/crimeParquet')

In [10]:
# List the files in the Parquet output directory (same listing as `%fs ls`).
display(dbutils.fs.ls('/tmp/crimeParquet/'))

In [11]:
# Load the Parquet output back into a DataFrame and render it, to check
# that the data written above can be read again.
tempDF = spark.read.format('parquet').load('/tmp/crimeParquet')
display(tempDF)

In [12]:
%sql
-- Row count per incident description, most frequent first.
-- GROUP BY already returns exactly one row per descript value, so no
-- DISTINCT is needed on top of it.
SELECT descript, COUNT(*)
FROM df_view
GROUP BY descript
ORDER BY 2 DESC

In [13]:
%sql
-- Row count for every (district, description) pair, largest count first.
SELECT
  PDDISTRICT,
  DESCRIPT,
  COUNT(*)
FROM df_view
GROUP BY
  PDDISTRICT,
  DESCRIPT
ORDER BY COUNT(*) DESC