## City crime data analysis and modeling

This notebook is a template for city crime data analysis using Spark SQL. It uses San Francisco (SF) crime data as an example, which can be downloaded from https://data.sfgov.org/Public-Safety/Police-Department-Incident-Reports-Historical-2003/tmnf-yvry.

The running environment is databricks community edition, which is free to use.

**Note**: Download a small sample first (one month, e.g. 2018-10) for debugging, then download the data from 2013 to 2018 for testing and analysis.

Crime data for other cities can be downloaded from the corresponding gov.org/Public-Safety page and analyzed by following this notebook template; the data format is almost the same.

In [3]:
from csv import reader
from pyspark.sql import Row 
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pandas as pd
import numpy as np
import seaborn as sb
import matplotlib.pyplot as plt
#from ggplot import *
import warnings

import os
# Tell PySpark which Python executable to launch (PYSPARK_PYTHON is read by Spark).
os.environ["PYSPARK_PYTHON"] = "python3"


In [4]:
# Data acquisition.
# The fetch-and-move steps below are kept commented out because both CSV files
# are already present under dbfs:/data/; uncomment them on a fresh workspace.
import urllib.request
# urllib.request.urlretrieve("https://data.sfgov.org/api/views/tmnf-yvry/rows.csv?accessType=DOWNLOAD", "/tmp/sf_03_18.csv")
# dbutils.fs.mv("file:/tmp/sf_03_18.csv", "dbfs:/data/sf_03_18.csv")
# urllib.request.urlretrieve("https://data.sfgov.org/api/views/wg3w-h783/rows.csv?accessType=DOWNLOAD", "/tmp/sf_18_pr.csv")
# dbutils.fs.mv("file:/tmp/sf_18_pr.csv", "dbfs:/data/sf_18_pr.csv")

# Show what is stored in the data folder so the reader can confirm both files exist.
dbfs_listing = dbutils.fs.ls("dbfs:/data/")
display(dbfs_listing)

path,name,size
dbfs:/data/sf_03_18.csv,sf_03_18.csv,550859692
dbfs:/data/sf_18_pr.csv,sf_18_pr.csv,120467260


In [5]:
# DBFS locations of the two CSV exports; later cells read from these paths.
prefix = "dbfs:/data/"
data_path1 = "{0}{1}".format(prefix, "sf_03_18.csv")
data_path2 = "{0}{1}".format(prefix, "sf_18_pr.csv")
print(data_path1)

In [6]:
# Read both CSV files as RDDs of parsed rows.
# Please upload your data into Databricks Community Edition first.
# NOTE(review): sc.textFile splits on newlines, so a quoted field containing a
# line break would be split across records; the DataFrame reader used in the
# next cell is the safer path for analysis.

def _parse_csv_line(line):
  """Split one raw CSV line into a list of fields, stripping surrounding double quotes."""
  return [field.strip('"') for field in next(reader([line]))]

# --- first file ---
crime_data_lines1 = sc.textFile(data_path1)
df_crimes1 = crime_data_lines1.map(_parse_csv_line)
# The first record is the header row.
header1 = df_crimes1.first()
print(header1)
# Drop the header row. The original compared against an undefined name
# `header`, which raises NameError as soon as an action runs on the RDD.
crimes1 = df_crimes1.filter(lambda row: row != header1)

# --- second file ---
crime_data_lines2 = sc.textFile(data_path2)
df_crimes2 = crime_data_lines2.map(_parse_csv_line)
header2 = df_crimes2.first()
print(header2)
crimes2 = df_crimes2.filter(lambda row: row != header2)

# Peek at the first rows of data:
#display(crimes1.take(3))

# Total number of records across both files:
#crimes1.count()+crimes2.count()


In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import hour, date_format, to_date

# Obtain (or reuse) the Spark session.
_session_builder = SparkSession.builder.appName("crime analysis")
_session_builder = _session_builder.config("spark.some.config.option", "some-value")
spark = _session_builder.getOrCreate()


def _read_csv_with_header(path):
  """Load a CSV file as a DataFrame, taking column names from its first row."""
  return spark.read.format("csv").option("header", "true").load(path)


# ---- First file (read from data_path1) ----
# Cast coordinates to double, parse the date, extract the hour, then drop the
# columns that are not used by the analysis.
# NOTE(review): the pattern "MM/dd/yy" relies on lenient parsing if the file
# stores four-digit years -- confirm against the data and the Spark version.
_opt1_typed = (
    _read_csv_with_header(data_path1)
    .withColumn('X', col('X').cast('double'))
    .withColumn('Y', col('Y').cast('double'))
    .withColumn("day", to_date(col("Date"), "MM/dd/yy"))
    .withColumn('hour', hour(col('Time')))
)
_opt1_unused = ['IncidntNum', 'Descript', 'Address', 'PdId', 'Time', 'Date']
df_opt1 = _opt1_typed.drop(*_opt1_unused).withColumnRenamed('day', 'Date')

# ---- Second file (read from data_path2) ----
# Remove unused columns first, then derive X/Y/Date/hour so that they line up
# with the column names of the first DataFrame.
_opt2_unused = [
    'Report Datetime', 'Row ID', 'Incident ID', 'Incident Number', 'CAD Number', 'Report Type Code',
    'Report Type Description', 'Filed Online', 'Incident Code', 'Incident Subcategory', 'Incident Description',
    'Intersection', 'CNN', 'Analysis Neighborhood', 'Supervisor District',
]
_opt2_source_cols = ['Longitude', 'Latitude', 'Incident Date', 'Incident Time', 'Incident Datetime']
_opt2_typed = (
    _read_csv_with_header(data_path2)
    .drop(*_opt2_unused)
    .withColumn('X', col('Longitude').cast('double'))
    .withColumn('Y', col('Latitude').cast('double'))
    .withColumn("Date", to_date(col("Incident Date"), "yyyy/MM/dd"))
    .withColumn('hour', hour(col('Incident Time')))
    .drop(*_opt2_source_cols)
)

# Rename the remaining columns to the names used for the first DataFrame.
_opt2_renames = [
    ('Incident Day of Week', 'DayOfWeek'),
    ('Incident Year', 'year'),
    ('Incident Category', 'Category'),
    ('Police District', 'District'),
]
df_opt2 = _opt2_typed
for _old_name, _new_name in _opt2_renames:
  df_opt2 = df_opt2.withColumnRenamed(_old_name, _new_name)

display(df_opt1)
display(df_opt2)

# Expose both DataFrames to Spark SQL.
df_opt1.createOrReplaceTempView("sf_crime1")
df_opt2.createOrReplaceTempView("sf_crime2")

year,DayOfWeek,Category,Resolution,District,point,SF Find Neighborhoods,Current Police Districts,Current Supervisor Districts,Analysis Neighborhoods,HSOC Zones as of 2018-06-05,OWED Public Spaces,Central Market/Tenderloin Boundary Polygon - Updated,Parks Alliance CPSI (27+TL sites),ESNCAG - Boundary File,"Areas of Vulnerability, 2016",X,Y,Date,hour
2019,Wednesday,Offences Against The Family And Children,Open or Active,Taraval,"(37.76256939715695, -122.49962745519909)",39.0,10.0,7.0,35.0,,,,,,1.0,-122.49962745519908,37.76256939715695,2019-05-01,1
2019,Saturday,Non-Criminal,Open or Active,Southern,"(37.7805353858225, -122.40816079455212)",32.0,1.0,10.0,34.0,1.0,,1.0,,,2.0,-122.40816079455212,37.7805353858225,2019-06-22,7
2019,Monday,Missing Person,Open or Active,Bayview,"(37.72159985216247, -122.39074534279013)",88.0,2.0,9.0,1.0,,,,,,2.0,-122.39074534279013,37.72159985216247,2019-06-03,16
2018,Friday,Offences Against The Family And Children,Cite or Arrest Adult,Central,"(37.794859532228344, -122.40487561154785)",104.0,6.0,3.0,6.0,,18.0,,,,2.0,-122.40487561154784,37.79485953222834,2018-11-16,16
2019,Monday,Assault,Open or Active,Northern,"(37.79771621229674, -122.43055896140594)",15.0,4.0,6.0,13.0,,,,,,1.0,-122.43055896140594,37.79771621229674,2019-05-27,2
2018,Wednesday,Burglary,Open or Active,Central,"(37.78582921318811, -122.40148983641853)",32.0,1.0,10.0,8.0,,,,,,1.0,-122.40148983641852,37.78582921318811,2018-11-07,3
2019,Thursday,Larceny Theft,Open or Active,Richmond,"(37.782488031626265, -122.4458205129012)",13.0,8.0,6.0,18.0,,,,,,1.0,-122.4458205129012,37.782488031626265,2019-08-15,12
2019,Wednesday,Other Miscellaneous,Open or Active,Tenderloin,"(37.780005867994824, -122.41193341005338)",32.0,5.0,10.0,34.0,1.0,,1.0,,,2.0,-122.41193341005338,37.78000586799482,2019-08-07,7
2018,Thursday,Non-Criminal,Cite or Arrest Adult,Northern,"(37.78080176962032, -122.43726005920409)",97.0,4.0,11.0,39.0,,,,,,2.0,-122.43726005920408,37.78080176962032,2018-11-15,15
2019,Saturday,Traffic Violation Arrest,Cite or Arrest Adult,Central,"(37.806780111468534, -122.4195772441978)",99.0,6.0,6.0,32.0,,,,,,1.0,-122.4195772441978,37.80678011146853,2019-06-29,16


##### Count the number of crimes for each category.

In [9]:
# DataFrame-API version: incidents per category, most frequent first.
# Only df_opt1 (the first file) is counted here.
_by_category = df_opt1.groupBy('category')
q1_result = _by_category.count().sort('count', ascending=False)
display(q1_result)

category,count
LARCENY/THEFT,480448
OTHER OFFENSES,309358
NON-CRIMINAL,238323
ASSAULT,194694
VEHICLE THEFT,126602
DRUG/NARCOTIC,119628
VANDALISM,116059
WARRANTS,101379
BURGLARY,91543
SUSPICIOUS OCC,80444


In [10]:
# Spark SQL version: stack the category column of both views, then count.
# The two files spell categories differently (e.g. "LARCENY/THEFT" vs
# "Larceny Theft" in the output), so such labels are counted as separate groups.
_union_sql = "SELECT sf_crime1.category FROM sf_crime1 UNION ALL SELECT sf_crime2.category FROM sf_crime2"
_count_sql = "SELECT category, COUNT(*) as Count FROM PRE GROUP BY 1 ORDER BY 2 DESC"

pre_crimeCategory = spark.sql(_union_sql)
pre_crimeCategory.createOrReplaceTempView("PRE")

crimeCategory = spark.sql(_count_sql)
display(crimeCategory)

category,Count
LARCENY/THEFT,480448
OTHER OFFENSES,309358
NON-CRIMINAL,238323
ASSAULT,194694
VEHICLE THEFT,126602
DRUG/NARCOTIC,119628
VANDALISM,116059
Larceny Theft,106145
WARRANTS,101379
BURGLARY,91543


##### Count the number of crimes for each district.

In [12]:
from pyspark.sql.functions import year

# One bar chart per year, laid out on a 2x2 grid.
fig, axes = plt.subplots(nrows=2,ncols=2,figsize=(14,14))

# Stack (District, Year) pairs from both views.
pre_district = spark.sql("SELECT sf_crime1.PdDistrict as District, year(sf_crime1.Date) as Year FROM sf_crime1 UNION ALL SELECT sf_crime2.District as District, sf_crime2.Year FROM sf_crime2")
pre_district.createOrReplaceTempView("district")

# Incidents per (District, Year); rows containing nulls are discarded.
district = spark.sql("SELECT District, Year, COUNT(*) AS Count FROM district GROUP BY 1, 2 ORDER BY 3 DESC").na.drop()
district.createOrReplaceTempView("district_15_18")

years_plot = range(2015,2019)
q2 = []
for i, plot_year in enumerate(years_plot):
  panel = axes[i//2][i%2]
  year_counts = spark.sql("SELECT District, Count FROM district_15_18 where Year = {}".format(plot_year)).toPandas()
  q2.append(year_counts)
  sb.barplot(x='District', y='Count', data=year_counts, ax=panel)
  panel.set_xticklabels(year_counts['District'], rotation=45, fontsize=7)
  panel.set_ylabel('Number of Crimes')
  panel.set_title('Number of Crimes in {0}'.format(plot_year))
display()


##### Count the number of crimes each Sunday in "SF downtown".
"SF downtown" is defined as a circular area centered on downtown SF with a fixed radius.

Note: A UDF (`find_dis`) is defined to help filter the records located inside "SF downtown".

In [14]:
#Possible Solution to Q3

from pyspark.sql.functions import year
import matplotlib.dates as mdates

# Tick formatter that shows only the month number on the date axis.
monthsFmt = mdates.DateFormatter('%m')

a = [-122.407847, 37.793069] # center of SF downtown, in the same order as (X, Y)

def find_dis(x, y):
  """Return the squared distance between point (x, y) and the downtown center `a`.

  Returns None when either coordinate is missing, so the UDF does not raise
  on rows whose X/Y are null.
  """
  if x is None or y is None:
    return None
  return (float(x)-a[0])**2 + (float(y)-a[1])**2

# Registered so the distance can also be called from SQL as find_dis(X, Y).
# The query below applies the same squared-distance test inline instead.
spark.udf.register("find_dis", find_dis, FloatType())

fig, axes = plt.subplots(nrows=2,ncols=2,figsize=(14,14))

q3 = []
years = range(2015,2019)

for i, plot_year in enumerate(years):
  # Sunday incidents per date inside the downtown circle, for one year.
  # 0.00038058181 is the squared-distance threshold the query compares against.
  crimeSunDT = spark.sql("""
  SELECT DayOfWeek, Date, Year, Count(*) AS Cases
  FROM
  (SELECT DayOfWeek, Date, year(Date) AS Year, X, Y
   FROM sf_crime1
   UNION ALL
   SELECT DayOfWeek, Date, Year, X, Y
   FROM sf_crime2
   )
  WHERE DayOfWeek = 'Sunday' and (X+122.407847)*(X+122.407847)+(Y-37.793069)*(Y-37.793069)<=0.00038058181 and Year = {0}
  GROUP BY 1,2,3  ORDER BY 2,4""".format(plot_year))
  q3.append(crimeSunDT.toPandas())
  panel = axes[i//2][i%2]
  panel.plot(q3[i]['Date'],q3[i]['Cases'],'--.')
  panel.xaxis.set_major_formatter(monthsFmt)
  panel.set_xlabel('Month')
  panel.set_ylabel('Number of Crimes on Sun DT')
  panel.set_title('{0}'.format(plot_year))
fig.suptitle('Sunday Crimes in SF Downtown by Year')
display()

##### Analyze the number of crimes in each month of 2015, 2016, 2017, and 2018.

In [16]:
from pyspark.sql.functions import year, month

# (Year, Month) of every incident in the first view, 2015-2017.
df4_1 = spark.sql("""SELECT year(Date) AS Year, month(Date) AS Month
                   FROM sf_crime1
                   WHERE year(Date) >= 2015 and year(Date) <= 2017
                   """)
# (Year, Month) of every incident in the second view, limited to 2015-2018.
# The lower bound keeps any older incident dates out of a plot that is
# titled 2015-2018; it is a no-op if the view has no such rows.
df4_2 = spark.sql("""SELECT year(Date) AS Year, month(Date) AS Month
                   FROM sf_crime2
                   WHERE year(Date) >= 2015 and year(Date) <= 2018
                   """)

# Stack both sources, count incidents per (Year, Month), and bring the small
# aggregated result to pandas.
df4_union = df4_1.union(df4_2)
df4_counts = df4_union.groupBy('Year', 'Month').count().orderBy('Year','Month').dropna().toPandas()

# One column per year, one row per month -> one line per year in the plot.
df4 = df4_counts.pivot(index = 'Month', columns = 'Year', values = 'count')
ax = df4.plot(kind = 'line')
ax.set_title('Number of Crime in Each Month from 2015-2018')
ax.set_xlabel('Month')
ax.set_ylabel('Number of Crimes')
display()


##### Analyze the number of crimes by hour on specific days such as 2015/12/15, 2016/12/15, and 2017/12/15.

In [18]:
def hourly_counts(view, day, label):
  """Return per-hour incident counts for one calendar day.

  Args:
    view: name of the temp view to query (e.g. 'sf_crime1').
    day: date literal in 'yyyy-MM-dd' form compared against the Date column.
    label: name given to the count column.

  Returns:
    A pandas DataFrame indexed by `hour` with columns `Date` and `label`.
  """
  query = """SELECT Date, hour, COUNT(*) AS `{label}`
                FROM {view}
                WHERE Date = '{day}'
                GROUP BY 1, 2 ORDER BY 2
                """.format(label=label, view=view, day=day)
  return spark.sql(query).toPandas().set_index('hour')

df5_1 = hourly_counts('sf_crime1', '2015-12-15', '2015')
df5_2 = hourly_counts('sf_crime1', '2016-12-15', '2016')
df5_3 = hourly_counts('sf_crime1', '2017-12-15', '2017')
df5_4 = hourly_counts('sf_crime2', '2018-10-15', '2018')

# Align the four days on the hour index. The days may not cover the same set
# of hours, so sort the combined index to keep the bars in hour order.
df5 = pd.concat([df5_1, df5_2, df5_3, df5_4], axis=1).sort_index()

# Plot only the count columns (df5 also carries one Date column per day).
count_columns = ['2015', '2016', '2017', '2018']
fg = df5[count_columns].plot(kind='bar', figsize =(16,10))
fg.set_title('Number of Crime in day 12/15(2015-2017) and 10/15(2018)')
fg.set_xlabel('Hour of day')
fg.set_ylabel('Number of Crimes')
display(fg)

##### Find the top-3 most dangerous districts.
##### Find the crime events by category and time (hour) in the top-3 most dangerous districts.

In [20]:
# Incidents per district over 2015-2017 (first view) plus 2018 (second view),
# sorted by count; keep the three largest.
# NOTE(review): grouping is on the raw District strings -- confirm both views
# spell district names identically, otherwise one district is split in two.
df6_1 = spark.sql("""SELECT District, COUNT(*) AS Count
                    FROM(
                    SELECT PdDistrict as District, year(Date) as Year
                    FROM sf_crime1
                    WHERE year(Date)>=2015 and year(Date)<=2017
                    UNION ALL
                    SELECT District, year as Year
                    FROM sf_crime2
                    WHERE year = 2018)
                    GROUP BY 1 ORDER BY 2 DESC
                    """).toPandas().head(3)

# Plot Count against District directly. `use_index` is a boolean option, so
# passing a column name to it did not select the x axis.
fg_1 = df6_1.plot(kind='bar', x='District', y='Count', figsize =(16,10))
fg_1.set_xticklabels(df6_1['District'], rotation=45, fontsize=7)
fg_1.set_ylabel('Number of Crimes')
fg_1.set_title('Top-3 Districts by Number of Crimes (2015-2018)')
display(fg_1)

In [21]:
# 3 districts x 24 hours = 72 pie charts, laid out on an 18 x 4 grid.
fig6_2,ax6_2 = plt.subplots(nrows = 18, ncols = 4, figsize=(60,60))
# NOTE(review): district names are hard-coded -- confirm they match the top-3
# result of the previous cell and the spelling used in both views.
districts = ['Southern', 'Northern','Mission']

for i in range(3):
  # Incidents per (Category, hour) for one district.
  df6_2 = spark.sql("""SELECT Category, hour, COUNT(*) AS Count
           FROM(
           SELECT PdDistrict as District, year(Date) as Year, Category, hour
           FROM sf_crime1
           WHERE year(Date)>=2015 and year(Date)<=2017
           UNION ALL
           SELECT District, year as Year,Category, hour
           FROM sf_crime2
           WHERE year = 2018)
           WHERE District = "{0}"
           GROUP BY 1,2 ORDER BY 2 DESC
           """.format(districts[i])).dropna().toPandas()
  # Rows = category, columns = hour. A category absent in some hour becomes 0
  # instead of NaN so it cannot poison the sums or the pie values.
  df6_2 = df6_2.pivot(index = 'Category', columns = 'hour', values = 'Count').fillna(0)
  # Convert each hour column to shares that sum to 1.
  df6_2 = df6_2.div(df6_2.sum(axis = 0), axis=1)
  for k in df6_2.columns:
    # Top five categories for THIS hour. The column must be selected with k
    # (the hour), not i (the district index), or every pie repeats hour 0-2.
    top_share = df6_2[k].sort_values(ascending = False)[:5]
    labels = top_share.index.tolist()
    labels.append('other')
    # Remainder goes to 'other'; clamp at 0 to absorb float rounding, since
    # pie() rejects negative wedge sizes.
    values = np.append(top_share.values, max(0.0, 1-top_share.values.sum()))
    slot = i*24 + int(k)
    panel = ax6_2[slot//4][slot%4]
    panel.pie(values, labels=labels, autopct='%1.1f%%', shadow = False, startangle = 90)
    panel.axis('equal')
    panel.set_title('Hour {0} at {1}'.format(k, districts[i]))
fig6_2.suptitle('Percentage of Category in Top3 Danger District')
display()

##### For each category of crime, find the percentage of resolved incidents.

In [23]:
# Incidents per (Category, isResolved). A Resolution of 'NONE' is treated as
# unresolved; every other value counts as resolved.
df7 = spark.sql("""SELECT Category, COUNT(*) AS Count,
                  CASE Resolution
                    WHEN 'NONE' THEN 'False'
                    ELSE 'True'
                  END AS isResolved
                  FROM sf_crime1
                  GROUP BY 1, 3 ORDER BY 1""")
df7.createOrReplaceTempView("df7_Table")

# Total incidents per Category (the denominator).
df7_all = spark.sql("""SELECT Category, COUNT(*) AS Count
                  FROM sf_crime1
                  GROUP BY 1 ORDER BY 1""")
df7_all.createOrReplaceTempView("df7_all_Table")

# Attach the per-category total to each (Category, isResolved) row.
df7_join = spark.sql("""
  SELECT df7_Table.Category, df7_Table.isResolved, df7_Table.Count, df7_all_Table.Count as all
  FROM df7_Table 
  LEFT JOIN df7_all_Table ON df7_Table.Category = df7_all_Table.Category
""")
df7_join.createOrReplaceTempView("df7_join_Table")

# Resolved percentage per category. Unresolved rows yield NULL and are
# removed by dropna(), leaving one row per category with resolved incidents.
df7_percent = spark.sql("""SELECT Category,
                  CASE isResolved
                    WHEN 'True' THEN Count/all*100
                  END AS Percentage
                  FROM df7_join_Table
                  ORDER BY 2 DESC""").dropna().toPandas().set_index('Category')

# Horizontal bar chart on a fresh axes. The original started from
# df7_percent.plot(), which drew a line plot underneath the bars, and then
# overwrote the numeric x tick labels with per-category percentages.
fig7, ax = plt.subplots(figsize=(12, 12))
labels = df7_percent.index.tolist()
values = df7_percent['Percentage'].values
positions = np.arange(len(labels))
ax.barh(positions, values)
# Tick positions must be set before their labels.
ax.set_yticks(positions)
ax.set_yticklabels(labels, fontsize=5)
# Highest percentage at the top.
ax.invert_yaxis()
ax.set_xlabel('Resolved incidents (%)')
ax.set_title('Percentage of Resolved Incidents by Category')
display()

