In [89]:
from pyspark.sql import SparkSession
# Start (or reuse) the SparkSession that every later cell runs against.
spark = (
    SparkSession.builder
    .appName("Chicago_crime_analysis")
    .getOrCreate()
)

In [90]:
from pyspark.sql.types import  (StructType, 
                                StructField, 
                                DateType, 
                                BooleanType,
                                DoubleType,
                                IntegerType,
                                StringType,
                               TimestampType)
# Explicit schema for the crimes CSV, so Spark does not have to infer types
# over the whole file. With a user-supplied schema and header=True, Spark skips
# the header line and assigns fields by position, so the order below must
# follow the CSV's column order. Every field is nullable.
crimes_schema = StructType([
    StructField("ID", StringType(), True),
    StructField("CaseNumber", StringType(), True),
    # Raw timestamp text; it is parsed with an explicit format further down.
    StructField("Date", StringType(), True),
    StructField("Block", StringType(), True),
    StructField("IUCR", StringType(), True),
    StructField("PrimaryType", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("LocationDescription", StringType(), True),
    StructField("Arrest", BooleanType(), True),
    StructField("Domestic", BooleanType(), True),
    # Area codes stay strings so zero-padded values such as "001" are preserved.
    StructField("Beat", StringType(), True),
    StructField("District", StringType(), True),
    StructField("Ward", StringType(), True),
    StructField("CommunityArea", StringType(), True),
    StructField("FBICode", StringType(), True),
    StructField("XCoordinate", DoubleType(), True),
    StructField("YCoordinate", DoubleType(), True),
    StructField("Year", IntegerType(), True),
    # Kept as raw text, like Date, rather than DateType. Spark's default CSV
    # dateFormat is yyyy-MM-dd. NOTE(review): "Updated On" appears to use the
    # same 12-hour 'MM/dd/yyyy hh:mm:ss AM' layout as Date, which that default
    # cannot parse. Confirm against the CSV and parse it explicitly if needed.
    StructField("UpdatedOn", StringType(), True),
    StructField("Latitude", DoubleType(), True),
    StructField("Longitude", DoubleType(), True),
    StructField("Location", StringType(), True),
])

In [94]:
# Load the raw CSV with the explicit schema. header=True makes Spark skip the
# first line instead of reading it as a data row.
crimes = (
    spark.read
    .schema(crimes_schema)
    .option("header", True)
    .csv("Crimes_-_2001_to_Present.csv")
)

In [95]:
# Drop only rows in which every column is null; partially-null rows are kept.
crimes = crimes.na.drop(how="all")

In [96]:
# Total number of rows in the crimes DataFrame.
total_crimes = crimes.count()
total_crimes

7248100

In [97]:
# Column names of the DataFrame, in schema order.
[field.name for field in crimes.schema.fields]

['ID',
 'CaseNumber',
 'Date',
 'Block',
 'IUCR',
 'PrimaryType',
 'Description',
 'LocationDescription',
 'Arrest',
 'Domestic',
 'Beat',
 'District',
 'Ward',
 'CommunityArea',
 'FBICode',
 'XCoordinate',
 'YCoordinate',
 'Year',
 'UpdatedOn',
 'Latitude',
 'Longitude',
 'Location']

In [98]:
from datetime import datetime
from pyspark.sql.functions import col,udf
# strptime layout of the raw `Date` strings (12-hour clock with AM/PM marker).
DATE_FORMAT = '%m/%d/%Y %I:%M:%S %p'


def _parse_crime_date(value):
    """Parse one raw `Date` string into a naive datetime, passing nulls through.

    `Date` is a nullable column, and datetime.strptime raises TypeError on None.
    Without this guard, a single null Date would fail the whole Spark job when
    an action finally evaluates the UDF. Strings that do not match DATE_FORMAT
    still raise ValueError, so malformed data is surfaced rather than hidden.
    """
    if value is None:
        return None
    return datetime.strptime(value, DATE_FORMAT)


myfunc = udf(_parse_crime_date, TimestampType())
# Replace the string `Date` column with a parsed `Date_time` timestamp column.
df = crimes.withColumn('Date_time', myfunc(col('Date'))).drop("Date")

In [99]:
# Number of distinct PrimaryType values (a null value would count as one).
crimes.select("PrimaryType").dropDuplicates().count()

36

## Answer 2 (Find the number of ‘NARCOTICS’ cases filed in 2015)

In [101]:
# Narrow to NARCOTICS cases first, then keep those recorded with Year 2015.
narcotics = crimes.filter(crimes["PrimaryType"] == "NARCOTICS")
narcotics.filter(narcotics["Year"] == 2015).count()

23939

## Answer 1 (Find the number of crimes under each FBI code)

In [102]:
# Crime count per FBI code, most frequent first (up to 50 rows shown).
fbi_code_counts = crimes.groupBy("FBICode").count()
fbi_code_counts.sort("count", ascending=False).show(n=50)

+-------+-------+
|FBICode|  count|
+-------+-------+
|     06|1529047|
|    08B|1133311|
|     14| 825438|
|     26| 725477|
|     18| 692034|
|     05| 407570|
|    08A| 349208|
|     07| 333905|
|     03| 272457|
|     11| 257900|
|    04B| 195621|
|    04A| 119948|
|     15|  86355|
|     16|  69429|
|     24|  54109|
|     10|  39741|
|     02|  35328|
|     17|  35307|
|     20|  29367|
|     19|  14886|
|     22|  14452|
|     09|  12026|
|    01A|  10741|
|     13|   2892|
|     12|   1495|
|    01B|     56|
+-------+-------+



In [27]:
# Number of distinct FBICode values (a null code would count as one).
# Kept live so the displayed result is reproduced on Restart & Run All.
crimes.select("FBICode").distinct().count()

26

## Answer 4 (Find the number of crimes that happened each year)

In [103]:
# Crimes per year, busiest year first (up to 50 rows shown).
year_counts = crimes.groupBy("Year").count()
year_counts.orderBy(year_counts["count"].desc()).show(n=50)

+----+------+
|Year| count|
+----+------+
|2002|486765|
|2001|485785|
|2003|475961|
|2004|469398|
|2005|453742|
|2006|448140|
|2007|437046|
|2008|427110|
|2009|392775|
|2010|370407|
|2011|351880|
|2012|336151|
|2013|307312|
|2014|275574|
|2016|269487|
|2017|268696|
|2018|268260|
|2015|264465|
|2019|260476|
|2020|198670|
+----+------+



## Answer 5 (Find the number of crimes in each calendar month, all years combined)

In [104]:
from pyspark.sql.functions import year, month, dayofmonth

In [106]:
# Replace the parsed timestamp with its month number (1-12). `df` is the frame
# that carries the parsed `Date_time` column. The column keeps the name
# `Date_time` because the monthly grouping cell groups by that name.
df4 = df.withColumn('Date_time', month(df.Date_time))


In [107]:
# Crimes per calendar month (all years pooled), in month-number order.
monthly_counts = df4.groupBy("Date_time").count()
monthly_counts.orderBy("Date_time").show()

+---------+------+
|Date_time| count|
+---------+------+
|        1|568511|
|        2|500670|
|        3|594770|
|        4|593189|
|        5|644535|
|        6|641622|
|        7|675190|
|        8|669029|
|        9|626116|
|       10|633044|
|       11|569807|
|       12|531617|
+---------+------+



## Answer 6 (Where do most crimes take place?)

In [108]:
# One (LocationDescription, count) row per location, collected to the driver.
location_groups = crimes.groupBy("LocationDescription").count()
crime_location = location_groups.collect()

In [109]:
# Keep only the location name from each collected row.
location = [row["LocationDescription"] for row in crime_location]

In [110]:
# Number of location groups (a null description forms one group of its own).
distinct_location_count = len(location)
distinct_location_count

215

In [111]:
# The single most common crime location.
location_counts = crimes.groupBy("LocationDescription").count()
location_counts.orderBy(location_counts["count"].desc()).show(n=1)

+-------------------+-------+
|LocationDescription|  count|
+-------------------+-------+
|             STREET|1882911|
+-------------------+-------+
only showing top 1 row



In [None]:
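# Duplicate of the Answer 2 query (NARCOTICS cases in 2015); kept for reference only and not executed.
# crimes.where((crimes["PrimaryType"] == "NARCOTICS") & (crimes["Year"] == 2015)).count()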

## Answer 3 (preliminary: total number of crimes in each district)

In [114]:
# Total crimes per district (unsorted; only the first 20 groups are shown).
district_totals = crimes.groupBy("District").count()
district_totals.show(n=20)

+--------+------+
|District| count|
+--------+------+
|     009|356984|
|     012|355198|
|     024|216761|
|    null|    47|
|     031|   205|
|     015|314537|
|     006|421366|
|     019|323249|
|     020|126556|
|     011|467402|
|     025|415294|
|     005|321937|
|     003|367741|
|     016|239909|
|     018|322428|
|     008|489631|
|     022|237390|
|     001|287945|
|     014|283256|
|     010|312477|
+--------+------+
only showing top 20 rows



## Answer 3 (Find the number of theft-related arrests in each district)

In [123]:
# Theft arrests per district, most arrests first (up to 50 rows shown).
# `Arrest` is declared BooleanType in the schema, so it is used directly as the
# predicate. The previous comparison against the string "TRUE" relied on an
# implicit string-to-boolean cast. Rows with a null Arrest are excluded either way.
theft_arrests = crimes.where((crimes["PrimaryType"] == "THEFT") & crimes["Arrest"])
theft_arrests.groupBy("District").count().orderBy('count', ascending=False).show(n=50)

+--------+-----+
|District|count|
+--------+-----+
|     001|21774|
|     018|17503|
|     025|13651|
|     008|12962|
|     006|12275|
|     019|11188|
|     012| 9153|
|     009| 8509|
|     002| 7894|
|     017| 6818|
|     014| 6364|
|     004| 6117|
|     003| 5772|
|     024| 5579|
|     011| 4973|
|     007| 4648|
|     016| 4308|
|     010| 4301|
|     020| 4166|
|     022| 4149|
|     005| 4106|
|     015| 2986|
+--------+-----+



In [122]:
from pyspark.sql import SQLContext

In [None]:
# SQL version of the theft-arrests-per-district query.
# The DataFrame has to be registered as a view before SQL can reference it.
# SparkSession.sql is used because no SQLContext instance exists in this notebook.
# Literals use plain ASCII quotes, column names match crimes_schema, and the
# boolean Arrest column is used directly as the condition.
crimes.createOrReplaceTempView("Crimes")
result = spark.sql(
    "SELECT District, COUNT(*) AS `count` "
    "FROM Crimes "
    "WHERE PrimaryType = 'THEFT' AND Arrest "
    "GROUP BY District "
    "ORDER BY `count` DESC"
)
result.show(n=50)