## SF crime data analysis and modeling

In [2]:
from csv import reader
from pyspark.sql import Row 
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pandas as pd
import numpy as np
import seaborn as sb
import matplotlib.pyplot as plt
from ggplot import *
import warnings
from pyspark.sql.functions import col

import os
os.environ["PYSPARK_PYTHON"] = "python2"


In [3]:
# read data from the data storage
# please upload your data into databricks community at first. 
crime_data_lines = sc.textFile('/FileStore/tables/sf_data.csv')
#prepare data 
df_crimes = crime_data_lines.map(lambda line: [x.strip('"') for x in next(reader([line]))])
#get header
header = df_crimes.first()
header

#remove the first line of data
crimes = df_crimes.filter(lambda x: x != header)

#get the first line of data
display(crimes.take(5))

#get the total number of data 
crimes.count()

_1,_2,_3,_4,_5,_6,_7,_8,_9,_10,_11,_12,_13
150060275,NON-CRIMINAL,LOST PROPERTY,Monday,01/19/2015,14:00,MISSION,NONE,18TH ST / VALENCIA ST,-122.42158168137,37.7617007179518,"(37.7617007179518, -122.42158168137)",15006027571000
150098210,ROBBERY,"ROBBERY, BODILY FORCE",Sunday,02/01/2015,15:45,TENDERLOIN,NONE,300 Block of LEAVENWORTH ST,-122.414406029855,37.7841907151119,"(37.7841907151119, -122.414406029855)",15009821003074
150098210,ASSAULT,AGGRAVATED ASSAULT WITH BODILY FORCE,Sunday,02/01/2015,15:45,TENDERLOIN,NONE,300 Block of LEAVENWORTH ST,-122.414406029855,37.7841907151119,"(37.7841907151119, -122.414406029855)",15009821004014
150098210,SECONDARY CODES,DOMESTIC VIOLENCE,Sunday,02/01/2015,15:45,TENDERLOIN,NONE,300 Block of LEAVENWORTH ST,-122.414406029855,37.7841907151119,"(37.7841907151119, -122.414406029855)",15009821015200
150098226,VANDALISM,"MALICIOUS MISCHIEF, VANDALISM OF VEHICLES",Tuesday,01/27/2015,19:00,NORTHERN,NONE,LOMBARD ST / LAGUNA ST,-122.431118543788,37.8004687042875,"(37.8004687042875, -122.431118543788)",15009822628160


### Solove  big data issues via Spark
approach 1: use RDD (not recommend)  
approach 2: use Dataframe, register the RDD to a dataframe (recommend for DE)  
approach 3: use SQL (recomend for data analysis or DS)

In [5]:

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("crime analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df_opt1 = spark.read.format("csv").option("header", "true").load("/FileStore/tables/sf_data.csv")
display(df_opt1)
df_opt1.createOrReplaceTempView("sf_crime")

IncidntNum,Category,Descript,DayOfWeek,Date,Time,PdDistrict,Resolution,Address,X,Y,Location,PdId
150060275,NON-CRIMINAL,LOST PROPERTY,Monday,01/19/2015,14:00,MISSION,NONE,18TH ST / VALENCIA ST,-122.42158168137,37.7617007179518,"(37.7617007179518, -122.42158168137)",15006027571000
150098210,ROBBERY,"ROBBERY, BODILY FORCE",Sunday,02/01/2015,15:45,TENDERLOIN,NONE,300 Block of LEAVENWORTH ST,-122.414406029855,37.7841907151119,"(37.7841907151119, -122.414406029855)",15009821003074
150098210,ASSAULT,AGGRAVATED ASSAULT WITH BODILY FORCE,Sunday,02/01/2015,15:45,TENDERLOIN,NONE,300 Block of LEAVENWORTH ST,-122.414406029855,37.7841907151119,"(37.7841907151119, -122.414406029855)",15009821004014
150098210,SECONDARY CODES,DOMESTIC VIOLENCE,Sunday,02/01/2015,15:45,TENDERLOIN,NONE,300 Block of LEAVENWORTH ST,-122.414406029855,37.7841907151119,"(37.7841907151119, -122.414406029855)",15009821015200
150098226,VANDALISM,"MALICIOUS MISCHIEF, VANDALISM OF VEHICLES",Tuesday,01/27/2015,19:00,NORTHERN,NONE,LOMBARD ST / LAGUNA ST,-122.431118543788,37.8004687042875,"(37.8004687042875, -122.431118543788)",15009822628160
150098232,NON-CRIMINAL,AIDED CASE -PROPERTY FOR DESTRUCTION,Sunday,02/01/2015,16:21,RICHMOND,NONE,400 Block of LOCUST ST,-122.451781767894,37.7870853907529,"(37.7870853907529, -122.451781767894)",15009823251041
150098248,SECONDARY CODES,DOMESTIC VIOLENCE,Saturday,01/31/2015,21:00,BAYVIEW,NONE,700 Block of KIRKWOOD AV,-122.374019331833,37.729203356539,"(37.729203356539, -122.374019331833)",15009824815200
150098248,VANDALISM,"MALICIOUS MISCHIEF, VANDALISM",Saturday,01/31/2015,21:00,BAYVIEW,NONE,700 Block of KIRKWOOD AV,-122.374019331833,37.729203356539,"(37.729203356539, -122.374019331833)",15009824828150
150098254,BURGLARY,"BURGLARY OF STORE, UNLAWFUL ENTRY",Saturday,01/31/2015,16:09,CENTRAL,NONE,200 Block of STOCKTON ST,-122.40656817787,37.7878092959561,"(37.7878092959561, -122.40656817787)",15009825405053
150098260,LARCENY/THEFT,PETTY THEFT SHOPLIFTING,Saturday,01/31/2015,17:00,CENTRAL,NONE,800 Block of GEARY ST,-122.417295322526,37.7862578545865,"(37.7862578545865, -122.417295322526)",15009826006362


In [6]:

from pyspark.sql import Row

def createRow(keys, values):
  assert len(keys) == len(values)
  mapped = dict(zip(keys, values))
  return Row(**mapped)

rdd_rows = crimes.map(lambda x: createRow(header, x))

df_opt2 = spark.createDataFrame(rdd_rows)
df_opt2.createOrReplaceTempView("sf_crime")
display(df_opt2)

Address,Category,Date,DayOfWeek,Descript,IncidntNum,Location,PdDistrict,PdId,Resolution,Time,X,Y
18TH ST / VALENCIA ST,NON-CRIMINAL,01/19/2015,Monday,LOST PROPERTY,150060275,"(37.7617007179518, -122.42158168137)",MISSION,15006027571000,NONE,14:00,-122.42158168137,37.7617007179518
300 Block of LEAVENWORTH ST,ROBBERY,02/01/2015,Sunday,"ROBBERY, BODILY FORCE",150098210,"(37.7841907151119, -122.414406029855)",TENDERLOIN,15009821003074,NONE,15:45,-122.414406029855,37.7841907151119
300 Block of LEAVENWORTH ST,ASSAULT,02/01/2015,Sunday,AGGRAVATED ASSAULT WITH BODILY FORCE,150098210,"(37.7841907151119, -122.414406029855)",TENDERLOIN,15009821004014,NONE,15:45,-122.414406029855,37.7841907151119
300 Block of LEAVENWORTH ST,SECONDARY CODES,02/01/2015,Sunday,DOMESTIC VIOLENCE,150098210,"(37.7841907151119, -122.414406029855)",TENDERLOIN,15009821015200,NONE,15:45,-122.414406029855,37.7841907151119
LOMBARD ST / LAGUNA ST,VANDALISM,01/27/2015,Tuesday,"MALICIOUS MISCHIEF, VANDALISM OF VEHICLES",150098226,"(37.8004687042875, -122.431118543788)",NORTHERN,15009822628160,NONE,19:00,-122.431118543788,37.8004687042875
400 Block of LOCUST ST,NON-CRIMINAL,02/01/2015,Sunday,AIDED CASE -PROPERTY FOR DESTRUCTION,150098232,"(37.7870853907529, -122.451781767894)",RICHMOND,15009823251041,NONE,16:21,-122.451781767894,37.7870853907529
700 Block of KIRKWOOD AV,SECONDARY CODES,01/31/2015,Saturday,DOMESTIC VIOLENCE,150098248,"(37.729203356539, -122.374019331833)",BAYVIEW,15009824815200,NONE,21:00,-122.374019331833,37.729203356539
700 Block of KIRKWOOD AV,VANDALISM,01/31/2015,Saturday,"MALICIOUS MISCHIEF, VANDALISM",150098248,"(37.729203356539, -122.374019331833)",BAYVIEW,15009824828150,NONE,21:00,-122.374019331833,37.729203356539
200 Block of STOCKTON ST,BURGLARY,01/31/2015,Saturday,"BURGLARY OF STORE, UNLAWFUL ENTRY",150098254,"(37.7878092959561, -122.40656817787)",CENTRAL,15009825405053,NONE,16:09,-122.40656817787,37.7878092959561
800 Block of GEARY ST,LARCENY/THEFT,01/31/2015,Saturday,PETTY THEFT SHOPLIFTING,150098260,"(37.7862578545865, -122.417295322526)",CENTRAL,15009826006362,NONE,17:00,-122.417295322526,37.7862578545865


In [7]:

df_opt3 = crimes.toDF(['IncidntNum', 'Category', 'Descript', 'DayOfWeek', 'Date', 'Time', 'PdDistrict', 'Resolution', 'Address', 'X', 'Y', 'Location', 'PdId'])
display(df_opt3)
df_opt3.createOrReplaceTempView("sf_crime")


IncidntNum,Category,Descript,DayOfWeek,Date,Time,PdDistrict,Resolution,Address,X,Y,Location,PdId
150060275,NON-CRIMINAL,LOST PROPERTY,Monday,01/19/2015,14:00,MISSION,NONE,18TH ST / VALENCIA ST,-122.42158168137,37.7617007179518,"(37.7617007179518, -122.42158168137)",15006027571000
150098210,ROBBERY,"ROBBERY, BODILY FORCE",Sunday,02/01/2015,15:45,TENDERLOIN,NONE,300 Block of LEAVENWORTH ST,-122.414406029855,37.7841907151119,"(37.7841907151119, -122.414406029855)",15009821003074
150098210,ASSAULT,AGGRAVATED ASSAULT WITH BODILY FORCE,Sunday,02/01/2015,15:45,TENDERLOIN,NONE,300 Block of LEAVENWORTH ST,-122.414406029855,37.7841907151119,"(37.7841907151119, -122.414406029855)",15009821004014
150098210,SECONDARY CODES,DOMESTIC VIOLENCE,Sunday,02/01/2015,15:45,TENDERLOIN,NONE,300 Block of LEAVENWORTH ST,-122.414406029855,37.7841907151119,"(37.7841907151119, -122.414406029855)",15009821015200
150098226,VANDALISM,"MALICIOUS MISCHIEF, VANDALISM OF VEHICLES",Tuesday,01/27/2015,19:00,NORTHERN,NONE,LOMBARD ST / LAGUNA ST,-122.431118543788,37.8004687042875,"(37.8004687042875, -122.431118543788)",15009822628160
150098232,NON-CRIMINAL,AIDED CASE -PROPERTY FOR DESTRUCTION,Sunday,02/01/2015,16:21,RICHMOND,NONE,400 Block of LOCUST ST,-122.451781767894,37.7870853907529,"(37.7870853907529, -122.451781767894)",15009823251041
150098248,SECONDARY CODES,DOMESTIC VIOLENCE,Saturday,01/31/2015,21:00,BAYVIEW,NONE,700 Block of KIRKWOOD AV,-122.374019331833,37.729203356539,"(37.729203356539, -122.374019331833)",15009824815200
150098248,VANDALISM,"MALICIOUS MISCHIEF, VANDALISM",Saturday,01/31/2015,21:00,BAYVIEW,NONE,700 Block of KIRKWOOD AV,-122.374019331833,37.729203356539,"(37.729203356539, -122.374019331833)",15009824828150
150098254,BURGLARY,"BURGLARY OF STORE, UNLAWFUL ENTRY",Saturday,01/31/2015,16:09,CENTRAL,NONE,200 Block of STOCKTON ST,-122.40656817787,37.7878092959561,"(37.7878092959561, -122.40656817787)",15009825405053
150098260,LARCENY/THEFT,PETTY THEFT SHOPLIFTING,Saturday,01/31/2015,17:00,CENTRAL,NONE,800 Block of GEARY ST,-122.417295322526,37.7862578545865,"(37.7862578545865, -122.417295322526)",15009826006362


First, overview the data and features. Find out the data type. For example, we can see from table:the Date and Time columns' data are 'String'. Therefore, we need to transform our data type to Time Stamp and Date type.

#### Q1 question (OLAP): 
#####Write a Spark program that counts the number of crimes for different category.

Below are some example codes to demonstrate the way to use Spark RDD, DF, and SQL to work with big data. You can follow this example to finish other questions.

In [10]:

catorgory_set_rdd = crimes.map(lambda item: (item[1],1))
from operator import add
result = sorted(catorgory_set_rdd.reduceByKey(add).collect(), key = lambda item: -item[1])
display(result)

_1,_2
LARCENY/THEFT,480448
OTHER OFFENSES,309358
NON-CRIMINAL,238323
ASSAULT,194694
VEHICLE THEFT,126602
DRUG/NARCOTIC,119628
VANDALISM,116059
WARRANTS,101379
BURGLARY,91543
SUSPICIOUS OCC,80444


In [11]:
q1_result = df_opt1.groupBy('category').count().orderBy('count', ascending=False)
display(q1_result)

category,count
LARCENY/THEFT,480448
OTHER OFFENSES,309358
NON-CRIMINAL,238323
ASSAULT,194694
VEHICLE THEFT,126602
DRUG/NARCOTIC,119628
VANDALISM,116059
WARRANTS,101379
BURGLARY,91543
SUSPICIOUS OCC,80444


In [12]:
#Spark SQL based
crimeCategory = spark.sql("SELECT  category, COUNT(*) AS Count FROM sf_crime GROUP BY category ORDER BY Count DESC")
display(crimeCategory)

category,Count
LARCENY/THEFT,480448
OTHER OFFENSES,309358
NON-CRIMINAL,238323
ASSAULT,194694
VEHICLE THEFT,126602
DRUG/NARCOTIC,119628
VANDALISM,116059
WARRANTS,101379
BURGLARY,91543
SUSPICIOUS OCC,80444


In [13]:
# important hints: 
## first step: spark df or sql to compute the statisitc result 
## second step: export your result to a pandas dataframe. 

crimes_pd_df = crimeCategory.toPandas()


# Spark does not support this function, please refer https://matplotlib.org/ for visuliation. You need to use display to show the figure in the databricks community. 

display(crimes_pd_df)

category,Count
LARCENY/THEFT,480448
OTHER OFFENSES,309358
NON-CRIMINAL,238323
ASSAULT,194694
VEHICLE THEFT,126602
DRUG/NARCOTIC,119628
VANDALISM,116059
WARRANTS,101379
BURGLARY,91543
SUSPICIOUS OCC,80444


###we can see the top-5 frequency crimes are:
1.Larcency/theft 

2.Other offenses

3.Non-criminal

4.Assault

5.vehicle theft

#### Q2 question (OLAP)
Counts the number of crimes for different district, and visualize your results

In [16]:
crimeDistrict = spark.sql("SELECT  PdDistrict, COUNT(*) AS Count FROM sf_crime GROUP BY PdDistrict ORDER BY Count DESC")
display(crimeDistrict)

PdDistrict,Count
SOUTHERN,399785
MISSION,300076
NORTHERN,272713
CENTRAL,226255
BAYVIEW,221000
INGLESIDE,194180
TENDERLOIN,191746
TARAVAL,166971
PARK,125479
RICHMOND,116818


Southern and Mission distric are most dangerous, there are more than 300k crimes happened in past 15 years.

Compared to other districts, Park and RichMond are public security environmental.

From map, we can see Northern districs are safer than Southern ones.

#### Q3 question (OLAP)
Count the number of crimes each "Sunday" at "SF downtown". 
hints: SF downtown is defiend  via the range of spatial location. Thus, you need to write your own UDF function to filter data which are located inside certain spatial range. You can follow the example here: https://changhsinlee.com/pyspark-udf/

In [19]:
PdDistrict = spark.sql("SELECT distinct PdDistrict FROM sf_crime")
display(PdDistrict)

PdDistrict
MISSION
BAYVIEW
CENTRAL
TARAVAL
TENDERLOIN
INGLESIDE
PARK
SOUTHERN
RICHMOND
NORTHERN


In [20]:
##sql
crimecount = spark.sql("SELECT  PdDistrict, DayOfWeek, COUNT(*) AS Count FROM sf_crime WHERE PdDistrict in ('MISSION','CENTRAL','TARAVAL','TENDERLOIN','RICHMOND','PARK') and DayOfWeek='Sunday' GROUP BY PdDistrict,DayOfWeek ORDER BY Count DESC")
display(crimecount)

PdDistrict,DayOfWeek,Count
MISSION,Sunday,40093
CENTRAL,Sunday,32096
TENDERLOIN,Sunday,24344
TARAVAL,Sunday,21308
PARK,Sunday,16776
RICHMOND,Sunday,15750


In [21]:
crimecount_pd=crimecount.toPandas()
display(crimecount)

PdDistrict,DayOfWeek,Count
MISSION,Sunday,40093
CENTRAL,Sunday,32096
TENDERLOIN,Sunday,24344
TARAVAL,Sunday,21308
PARK,Sunday,16776
RICHMOND,Sunday,15750


I count the total crime numbers of all districts on Sunday through 15 years. And got the same conclusion above, Mission's crime rate is the highest.

#### Q4 question (OLAP)
Analysis the number of crime in each month of 2015, 2016, 2017, 2018. Then, give your insights for the output results. What is the business impact for your result?

In [24]:
from datetime import datetime
func =  udf (lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())
df = df_opt3.withColumn('new_date', func(col('Date')))
df.createOrReplaceTempView('sf_crime1')
display(df)

IncidntNum,Category,Descript,DayOfWeek,Date,Time,PdDistrict,Resolution,Address,X,Y,Location,PdId,new_date
150060275,NON-CRIMINAL,LOST PROPERTY,Monday,01/19/2015,14:00,MISSION,NONE,18TH ST / VALENCIA ST,-122.42158168137,37.7617007179518,"(37.7617007179518, -122.42158168137)",15006027571000,2015-01-19
150098210,ROBBERY,"ROBBERY, BODILY FORCE",Sunday,02/01/2015,15:45,TENDERLOIN,NONE,300 Block of LEAVENWORTH ST,-122.414406029855,37.7841907151119,"(37.7841907151119, -122.414406029855)",15009821003074,2015-02-01
150098210,ASSAULT,AGGRAVATED ASSAULT WITH BODILY FORCE,Sunday,02/01/2015,15:45,TENDERLOIN,NONE,300 Block of LEAVENWORTH ST,-122.414406029855,37.7841907151119,"(37.7841907151119, -122.414406029855)",15009821004014,2015-02-01
150098210,SECONDARY CODES,DOMESTIC VIOLENCE,Sunday,02/01/2015,15:45,TENDERLOIN,NONE,300 Block of LEAVENWORTH ST,-122.414406029855,37.7841907151119,"(37.7841907151119, -122.414406029855)",15009821015200,2015-02-01
150098226,VANDALISM,"MALICIOUS MISCHIEF, VANDALISM OF VEHICLES",Tuesday,01/27/2015,19:00,NORTHERN,NONE,LOMBARD ST / LAGUNA ST,-122.431118543788,37.8004687042875,"(37.8004687042875, -122.431118543788)",15009822628160,2015-01-27
150098232,NON-CRIMINAL,AIDED CASE -PROPERTY FOR DESTRUCTION,Sunday,02/01/2015,16:21,RICHMOND,NONE,400 Block of LOCUST ST,-122.451781767894,37.7870853907529,"(37.7870853907529, -122.451781767894)",15009823251041,2015-02-01
150098248,SECONDARY CODES,DOMESTIC VIOLENCE,Saturday,01/31/2015,21:00,BAYVIEW,NONE,700 Block of KIRKWOOD AV,-122.374019331833,37.729203356539,"(37.729203356539, -122.374019331833)",15009824815200,2015-01-31
150098248,VANDALISM,"MALICIOUS MISCHIEF, VANDALISM",Saturday,01/31/2015,21:00,BAYVIEW,NONE,700 Block of KIRKWOOD AV,-122.374019331833,37.729203356539,"(37.729203356539, -122.374019331833)",15009824828150,2015-01-31
150098254,BURGLARY,"BURGLARY OF STORE, UNLAWFUL ENTRY",Saturday,01/31/2015,16:09,CENTRAL,NONE,200 Block of STOCKTON ST,-122.40656817787,37.7878092959561,"(37.7878092959561, -122.40656817787)",15009825405053,2015-01-31
150098260,LARCENY/THEFT,PETTY THEFT SHOPLIFTING,Saturday,01/31/2015,17:00,CENTRAL,NONE,800 Block of GEARY ST,-122.417295322526,37.7862578545865,"(37.7862578545865, -122.417295322526)",15009826006362,2015-01-31


In [25]:
##sql

Monthcrime = spark.sql("SELECT YEAR(new_date) as year, Month(new_date) as month, Count(*) from sf_crime1 group by YEAR(new_date), Month(new_date) order by 1,2")
display(Monthcrime)

year,month,count(1)
2003,1,12956
2003,2,11924
2003,3,12998
2003,4,12741
2003,5,12605
2003,6,11934
2003,7,12379
2003,8,12893
2003,9,12798
2003,10,13050


#### Q5 question (OLAP)
Analysis the number of crime w.r.t the hour in certian day like 2015/12/15, 2016/12/15, 2017/12/15, 2018/10/15. Then, give your travel suggestion to visit SF.

In [27]:
func1 =  udf (lambda x: datetime.strptime(x, '%H:%M'), TimestampType())
df2 = df_opt3.withColumn('new_time', func1(col('Time')))
df2.createOrReplaceTempView('sf_crime2')
display(df2)

IncidntNum,Category,Descript,DayOfWeek,Date,Time,PdDistrict,Resolution,Address,X,Y,Location,PdId,new_time
150060275,NON-CRIMINAL,LOST PROPERTY,Monday,01/19/2015,14:00,MISSION,NONE,18TH ST / VALENCIA ST,-122.42158168137,37.7617007179518,"(37.7617007179518, -122.42158168137)",15006027571000,1900-01-01T14:00:00.000+0000
150098210,ROBBERY,"ROBBERY, BODILY FORCE",Sunday,02/01/2015,15:45,TENDERLOIN,NONE,300 Block of LEAVENWORTH ST,-122.414406029855,37.7841907151119,"(37.7841907151119, -122.414406029855)",15009821003074,1900-01-01T15:45:00.000+0000
150098210,ASSAULT,AGGRAVATED ASSAULT WITH BODILY FORCE,Sunday,02/01/2015,15:45,TENDERLOIN,NONE,300 Block of LEAVENWORTH ST,-122.414406029855,37.7841907151119,"(37.7841907151119, -122.414406029855)",15009821004014,1900-01-01T15:45:00.000+0000
150098210,SECONDARY CODES,DOMESTIC VIOLENCE,Sunday,02/01/2015,15:45,TENDERLOIN,NONE,300 Block of LEAVENWORTH ST,-122.414406029855,37.7841907151119,"(37.7841907151119, -122.414406029855)",15009821015200,1900-01-01T15:45:00.000+0000
150098226,VANDALISM,"MALICIOUS MISCHIEF, VANDALISM OF VEHICLES",Tuesday,01/27/2015,19:00,NORTHERN,NONE,LOMBARD ST / LAGUNA ST,-122.431118543788,37.8004687042875,"(37.8004687042875, -122.431118543788)",15009822628160,1900-01-01T19:00:00.000+0000
150098232,NON-CRIMINAL,AIDED CASE -PROPERTY FOR DESTRUCTION,Sunday,02/01/2015,16:21,RICHMOND,NONE,400 Block of LOCUST ST,-122.451781767894,37.7870853907529,"(37.7870853907529, -122.451781767894)",15009823251041,1900-01-01T16:21:00.000+0000
150098248,SECONDARY CODES,DOMESTIC VIOLENCE,Saturday,01/31/2015,21:00,BAYVIEW,NONE,700 Block of KIRKWOOD AV,-122.374019331833,37.729203356539,"(37.729203356539, -122.374019331833)",15009824815200,1900-01-01T21:00:00.000+0000
150098248,VANDALISM,"MALICIOUS MISCHIEF, VANDALISM",Saturday,01/31/2015,21:00,BAYVIEW,NONE,700 Block of KIRKWOOD AV,-122.374019331833,37.729203356539,"(37.729203356539, -122.374019331833)",15009824828150,1900-01-01T21:00:00.000+0000
150098254,BURGLARY,"BURGLARY OF STORE, UNLAWFUL ENTRY",Saturday,01/31/2015,16:09,CENTRAL,NONE,200 Block of STOCKTON ST,-122.40656817787,37.7878092959561,"(37.7878092959561, -122.40656817787)",15009825405053,1900-01-01T16:09:00.000+0000
150098260,LARCENY/THEFT,PETTY THEFT SHOPLIFTING,Saturday,01/31/2015,17:00,CENTRAL,NONE,800 Block of GEARY ST,-122.417295322526,37.7862578545865,"(37.7862578545865, -122.417295322526)",15009826006362,1900-01-01T17:00:00.000+0000


In [28]:

hourcrime = spark.sql("SELECT Date, Hour(new_time) as hour, Count(*) from sf_crime2 where Date in ('7/11/2015','7/12/2009','03/07/2008') group by Date, Hour(new_time) order by 1,2") 
display(hourcrime)

Date,hour,count(1)
03/07/2008,0,24
03/07/2008,1,6
03/07/2008,2,11
03/07/2008,3,6
03/07/2008,4,6
03/07/2008,6,5
03/07/2008,7,18
03/07/2008,8,19
03/07/2008,9,13
03/07/2008,10,11


#### Q6 question (OLAP)
(1) Step1: Find out the top-3 danger disrict  
(2) Step2: find out the crime event w.r.t category and time (hour) from the result of step 1  
(3) give your advice to distribute the police based on your analysis results.

In [30]:
topdanger=spark.sql('select PdDistrict, count(*) as ct, rank() over (order by Count(*) DESC) as xrank from sf_crime2 group by PdDistrict')
topdanger.createOrReplaceTempView('topdanger')
top_danger=spark.sql('select PdDistrict, ct, xrank from topdanger where xrank<=3')
display(top_danger)

PdDistrict,ct,xrank
SOUTHERN,399785,1
MISSION,300076,2
NORTHERN,272713,3


In [31]:
top_3danger=spark.sql('select PdDistrict, new_time, Hour(new_time) as hour, ct, rank() over (order by ct DESC) as xrank from (select PdDistrict, new_time, count(*) as ct from sf_crime2  group by PdDistrict, new_time )')
top_3danger.createOrReplaceTempView('top_3danger')
top3danger=spark.sql('select PdDistrict, hour, sum(ct) from top_3danger where PdDistrict in ("SOUTHERN","MISSION","NORTHERN") group by 1,2 order by PdDistrict, hour')
display(top3danger)

PdDistrict,hour,sum(ct)
MISSION,0,16797
MISSION,1,11125
MISSION,2,8930
MISSION,3,5599
MISSION,4,3920
MISSION,5,3089
MISSION,6,4791
MISSION,7,7449
MISSION,8,10647
MISSION,9,11453


From above analysis:

1.we can find out the crime always happend at 0:00, 12:00, 18:00 for all districts

2.Southern crime rate is highest, therefore we need distribute more police power at Southern

3.bewtween 2:00-7:00, the crime rate is low.

####For different category of crime, find the percentage of resolution. Based on the output, give your hints to adjust the policy.

In [34]:
resolution=spark.sql('select PdDistrict, Resolution, count(*) as CT from sf_crime1 group by 1,2 order by 1,2')
resolution.createOrReplaceTempView('resolution')

display(resolution)

PdDistrict,Resolution,CT
,NONE,1
BAYVIEW,"ARREST, BOOKED",50188
BAYVIEW,"ARREST, CITED",19424
BAYVIEW,CLEARED-CONTACT JUVENILE FOR MORE INFO,135
BAYVIEW,COMPLAINANT REFUSES TO PROSECUTE,1412
BAYVIEW,DISTRICT ATTORNEY REFUSES TO PROSECUTE,1273
BAYVIEW,EXCEPTIONAL CLEARANCE,366
BAYVIEW,JUVENILE ADMONISHED,376
BAYVIEW,JUVENILE BOOKED,2169
BAYVIEW,JUVENILE CITED,717


In [35]:
resolution2=spark.sql('select PdDistrict, solved, unsolved, solved/(solved+unsolved) as percentageofsolved from (select PdDistrict, sum(case when Resolution in ("NONE","UNFOUNDED") then CT else 0 end) as unsolved, sum(case when PdDistrict not in ("NONE","UNFOUNDED") then CT else 0 end) as solved from resolution group by PdDistrict) order by percentageofsolved DESC')
display(resolution2)

PdDistrict,solved,unsolved,percentageofsolved
TENDERLOIN,191746,72380,0.725964123183632
MISSION,300076,169981,0.6383821536537059
BAYVIEW,221000,135833,0.6193373370736451
SOUTHERN,399785,253803,0.6116773869777291
PARK,125479,83526,0.60036362766441
INGLESIDE,194180,130562,0.5979516046584673
NORTHERN,272713,191480,0.5874991652179158
TARAVAL,166971,119701,0.5824461405369202
CENTRAL,226255,167196,0.5750525478395022
RICHMOND,116818,88836,0.5680317426356891


#### Q8 question (Apply Spark ML clustering for spatial data analysis)
Extra: visualize the spatial distribution of crimes and run a kmeans clustering algorithm (please use Spark ML kmeans)  
You can refer Spark ML Kmeans a example: https://spark.apache.org/docs/latest/ml-clustering.html#k-means

In [37]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator 
from pyspark.ml.feature import VectorAssembler
import matplotlib.pyplot as plt
import matplotlib.cm as cm
#loads data
df8=df_opt1.select(['IncidntNum','X','Y'])
Feature_Col=['X','Y']
for col in df8.columns:
  if col in Feature_Col:
    df8=df8.withColumn(col,df8[col].cast('float'))
vecAssembler=VectorAssembler(inputCols=Feature_Col,outputCol="features")
df_kmeans=vecAssembler.transform(df8).select('IncidntNum','features')


#optimization
cost=np.zeros(20)
for k in range(2,20):
  kmeans=KMeans().setK(k).setSeed(1).setFeaturesCol("features")
  model=kmeans.fit(df_kmeans.sample(False,0.1,seed=42))
  cost[k]=model.computeCost(df_kmeans)
fig,ax=plt.subplots(1,1,figsize=(8,6))
ax.plot(range(2,20),cost[2:20])
ax.set_xlabel('k')
ax.set_ylabel('cost')
display()


### Conclusion. 
 
1.Performed Big data Analysis(OLAP) based on Apache Spark RDD, Spark SQL and Dataframe

2.Explored  more than 2 million crime data to visualize the crime map over time and districts

When hearing news about crimes happened in SF, I am interested in treating the crime from data perspective. I want to find out some useful information to predict the future crimes and eventually help Police to prevent crimes. Also, how to distribute the Police power and optimize the resources is also my goal.

First, I input the data in three different ways: Sqpark RDD, Dataframe and Spark SQL. Then, I mainly used Spark SQL as tool in the following OLAP. SF Southern and MISSION crimes numbers are highest. The crime always happend at 0:00,12:00,18:00. Thus, if you travle to SF, you need take care at these time points. Also, between 2:00-7:00, the crime happend less (may the offender go to sleep), Police can revocate some power and have a rest during the period. One interesting thing is, Southern and Million percentage of crimes solved is very high, ranked 4 and 2 respectively. Other districts can learn from these districts.

### Optional part: Time series analysis
This part is not based on Spark, and only based on Pandas Time Series package.   
Note: I am not familiar with time series model, please refer the ARIMA model introduced by other teacher.   
process:  
1.visualize time series  
2.plot ACF and find optimal parameter  
3.Train ARIMA  
4.Prediction 

Refer:   
https://zhuanlan.zhihu.com/p/35282988  
https://zhuanlan.zhihu.com/p/35128342  
https://www.statsmodels.org/dev/examples/notebooks/generated/tsa_arma_0.html  
https://www.howtoing.com/a-guide-to-time-series-forecasting-with-arima-in-python-3  
https://www.joinquant.com/post/9576?tag=algorithm  
https://blog.csdn.net/u012052268/article/details/79452244