In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [2]:
spark

In [3]:
import pyspark.sql.functions as f
from pyspark.sql.types import (StructType,
                              StructField,
                              StringType, 
                              IntegerType,
                              TimestampType,
                              DoubleType,
                              BooleanType)
from pyspark.sql.functions import unix_timestamp

In [4]:
!head Part1_Crime_data.csv

﻿X,Y,RowID,CrimeDateTime,CrimeCode,Location,Description,Inside_Outside,Weapon,Post,District,Neighborhood,Latitude,Longitude,GeoLocation,Premise,VRIName,Total_Incidents,Shape
1420074.13302107,594160.602354662,1,2022/02/26 04:00:00+00,4E,200 W MONUMENT ST,COMMON ASSAULT,,NA,124,CENTRAL,MOUNT VERNON,39.2975,-76.6193,"(39.2975,-76.6193)",,,1,
1411374.22509631,589791.383964529,2,2022/02/26 01:26:30+00,9S,2100 FREDERICK AVE,SHOOTING,Outside,FIREARM,835,SOUTHWEST,CARROLLTON RIDGE,39.2856,-76.6501,"(39.2856,-76.6501)",COMMON BUSINESS,Tri-District,1,
1411401.16887136,582761.775193539,3,2022/02/26 10:22:00+00,4E,2000 GRINNALDS AVE,COMMON ASSAULT,,NA,831,SOUTHWEST,MORRELL PARK,39.2663,-76.6501,"(39.2663,-76.6501)",,,1,
1422663.14365552,597595.359444412,4,2022/02/26 01:11:00+00,7A,400 E OLIVER ST,AUTO THEFT,,NA,313,EASTERN,GREENMOUNT WEST,39.3069,-76.6101,"(39.3069,-76.6101)",,,1,
1440496.49709374,607767.144994879,5,2022/02/26 12:30:00+00,4B,5300 PLAINFIELD AVE,AGG. ASSAULT,,KNIFE,444,NORTHEAST,FR

In [5]:
crimeschema = StructType()\
      .add("X", DoubleType(), True)\
      .add("Y", DoubleType(), True)\
      .add("ROWID", IntegerType(), True)\
      .add("CrimeDateTime", StringType(), False)\
      .add("CrimeCode", StringType(), True)\
      .add("Location", StringType(), True)\
      .add("Description", StringType(), True)\
      .add("Inside_Outside", StringType(), True)\
      .add("Weapon", StringType(), True)\
      .add("Post", StringType(), True)\
      .add("District", IntegerType(), True)\
      .add("Neighborhood", StringType(), True)\
      .add("Latitude", DoubleType(), True)\
      .add("Longitude", DoubleType(), True)\
      .add("GeoLocation", StringType(), True)\
      .add("Premise", StringType(), True)\
      .add("VRIName", DoubleType(), True)\
      .add("Total_Incidents", IntegerType(), True)

In [6]:
df_crimeschema = spark.read.format("csv")\
      .option("header", True)\
      .schema(crimeschema)\
      .load("Part1_Crime_data.csv")
df_crimeschema = df_crimeschema.withColumn('CrimeDateTime', df_crimeschema['CrimeDateTime'].substr(1,19))
df_crimeschema = df_crimeschema.withColumn('CrimeDateTime', unix_timestamp('CrimeDateTime', 'yyyy/MM/dd HH:mm:ss')\
                                           .cast(TimestampType()))

In [7]:
df_crimeschema.unpersist()
df_crimeschema.printSchema()
df_crimeschema.show(5)

root
 |-- X: double (nullable = true)
 |-- Y: double (nullable = true)
 |-- ROWID: integer (nullable = true)
 |-- CrimeDateTime: timestamp (nullable = true)
 |-- CrimeCode: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Inside_Outside: string (nullable = true)
 |-- Weapon: string (nullable = true)
 |-- Post: string (nullable = true)
 |-- District: integer (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- GeoLocation: string (nullable = true)
 |-- Premise: string (nullable = true)
 |-- VRIName: double (nullable = true)
 |-- Total_Incidents: integer (nullable = true)

+----------------+----------------+-----+-------------------+---------+-------------------+--------------+--------------+-------+----+--------+----------------+--------+---------+------------------+---------------+-------+---------------+
|               X|      

In [8]:
df_crimeschema.count()

513158

In [9]:
print(len(df_crimeschema.columns))

18


# 1. What are distinct crime codes?

In [10]:
Crime_Code_list = df_crimeschema.select('CrimeCode').distinct().rdd.map(lambda r: r[0]).collect()
Crime_Code_list

['3P',
 '3K',
 '3BJ',
 '1A',
 '3M',
 '5F',
 '4B',
 '3B',
 '7A',
 '3NF',
 '3EF',
 '3N',
 '5D',
 '3LO',
 '3AF',
 '7B',
 '3GO',
 '3AJF',
 '8AO',
 '7C',
 '3AK',
 '3GK',
 '6L',
 '3EO',
 '3JO',
 '3F',
 '1K',
 '8H',
 '8CV',
 '4C',
 '5A',
 '6C',
 '3NK',
 '3D',
 '6H',
 '3LK',
 '3AJK',
 '3CO',
 '3L',
 '4E',
 '6D',
 '2A',
 '3C',
 '8I',
 '3NO',
 '3JF',
 '3E',
 '3LF',
 '1O',
 '8J',
 '3CK',
 '8BO',
 '2B',
 '3JK',
 '5B',
 '4A',
 '8GO',
 '8EV',
 '3CF',
 '8EO',
 '6G',
 '6A',
 '9S',
 '3EK',
 '3GF',
 '3H',
 '4D',
 '8FO',
 '6J',
 '6F',
 '6E',
 '3J',
 '5C',
 '5G',
 '8AV',
 '5E',
 '3AJO',
 '3AO',
 '6B',
 '6K',
 '8GV',
 '8BV',
 '8FV',
 '8CO',
 '8DO']

# 2. Count the number of crimes by the crime codes and order by the resulting counts in descending order

In [11]:
df_crimeschema.createOrReplaceTempView('Crimes')

query = """
SELECT CrimeCode, COUNT(*) as count
FROM Crimes
GROUP BY CrimeCode
ORDER BY count DESC
"""
output = spark.sql(query)
output.show(len(Crime_Code_list))

+---------+-----+
|CrimeCode|count|
+---------+-----+
|       4E|91650|
|       6D|68427|
|       5A|43928|
|       7A|40274|
|       6J|27636|
|       6G|26858|
|       6E|24300|
|       6C|23227|
|       4C|22438|
|       5D|14971|
|      3AF|14739|
|       4B|14460|
|       4A|13226|
|       3B|10737|
|       4D| 7232|
|       5B| 6475|
|       9S| 5443|
|       6F| 5081|
|       5C| 4917|
|       6B| 4106|
|      3CF| 3771|
|       2A| 3168|
|     3AJF| 3018|
|      3AK| 2982|
|       3K| 2833|
|       7C| 2725|
|       1A| 2624|
|      3AO| 2473|
|       5F| 2182|
|      3JF| 1815|
|       3D| 1432|
|       5E| 1419|
|       6L| 1205|
|       8H| 1051|
|       3P|  873|
|      3GF|  789|
|       6A|  781|
|      3BJ|  754|
|      3CK|  591|
|      3CO|  516|
|      3NF|  441|
|      3JK|  423|
|      8AO|  413|
|      3JO|  413|
|       2B|  355|
|       7B|  287|
|       8J|  279|
|       3H|  261|
|       1K|  230|
|     3AJO|  224|
|       6H|  222|
|       3J|  210|
|       1O

# 3. Which neighborhood had most crimes?

In [12]:
df_crimeschema.createOrReplaceTempView('Neighborhoods')

query_2 = """
SELECT Neighborhood, COUNT(*) as count
FROM Neighborhoods
GROUP BY Neighborhood
ORDER BY count DESC

"""
output_1 = spark.sql(query_2)
output_1.show(1)

+------------+-----+
|Neighborhood|count|
+------------+-----+
|    DOWNTOWN|17799|
+------------+-----+
only showing top 1 row



# 4. Which month of the year had most crimes?

In [13]:
from pyspark.sql.functions import to_timestamp,date_format
from pyspark.sql.functions import col
df_new = df_crimeschema.withColumn("CrimeDateTime",to_timestamp(col("CrimeDateTime"))).withColumn("Month", date_format(col("CrimeDateTime"), "M"))
df_new = df_new.where(col("Month").isNotNull()).groupBy("Month").count().orderBy("count").sort(col('count').desc())
df_new = df_new.show(1)

+-----+-----+
|Month|count|
+-----+-----+
|    8|46327|
+-----+-----+
only showing top 1 row



# 5. What weapons were used?

In [14]:
weapons = df_crimeschema.select('Weapon').filter(col('Weapon') != 'NA').dropna().distinct().rdd.map(lambda w: w[0]).collect()
weapons

['HANDS', 'KNIFE', 'OTHER', 'FIRE', 'FIREARM']

# 6. Which weapon was used the most? 

In [15]:
df_crimeschema.createOrReplaceTempView('Weapons_1')

query_3 = """
SELECT Weapons_1.Weapon, COUNT(*) as count 
FROM Weapons_1 
WHERE Weapon !='NA'
GROUP BY Weapon
ORDER BY count DESC

"""
output_3 = spark.sql(query_3)
output_3.show(1)

+-------+-----+
| Weapon|count|
+-------+-----+
|FIREARM|46139|
+-------+-----+
only showing top 1 row

