In [22]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType, StructType, StringType, TimestampType, StructField, IntegerType
from pyspark.sql.functions import UserDefinedFunction, col, when, to_timestamp, to_utc_timestamp

1. Specify the schema for the crime data set. (https://sparkbyexamples.com/pyspark/pyspark-structtype-and-structfield/)

In [23]:
crime_schema = StructType([
    StructField('X', FloatType()),
    StructField('Y', FloatType()),
    StructField('RowID', IntegerType()),
    StructField('CrimeDateTime', StringType()),
    StructField('CrimeCode', StringType()),
    StructField('Location', StringType()),
    StructField('Description', StringType()),
    StructField('Inside_Outside', StringType()),
    StructField('Weapon', StringType()),
    StructField('Post', StringType()),
    StructField('District', StringType()),
    StructField('Neighborhood', StringType()),
    StructField('Latitude', FloatType()),
    StructField('Longitude', FloatType()),
    StructField('Geolocation', FloatType()),
    StructField('Premise', StringType()),
    StructField('VRIName', StringType()),
    StructField('Total_Incidents', StringType())])

2. Read the file using the schema definition

In [24]:
spark = SparkSession.builder.getOrCreate()
df = spark.read.options(header = 'True').schema(crime_schema).csv('Part1_Crime_data.csv')
name = 'CrimeDateTime'
udf = UserDefinedFunction(lambda x: x[:-3].replace('/', '-'), StringType())
#df1 = df.select(*[udf(column).alias(name) if column == name else column for column in df.columns])
# I'd like to do this, but I get this error:
# Python in worker has different version 3.9 than that in driver 3.8, PySpark cannot run with different minor versions.
# Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.
# When I fix that, it doesn't let me change the variable type to timestamp (attempting the solutions here: https://sparkbyexamples.com/pyspark/pyspark-cast-column-type/)
# Additionally, it makes my cached dataframe throw another error:
#An error occurred while calling o2171.count.
#: org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 73.0 failed 1 times, most recent failure: Lost task 10.0 in stage 73.0 (TID 314) (DESKTOP-DVH32FF executor driver): java.net.SocketException: Connection reset
# I'm not really sure where to go from here, everything else works

3. Cache the DataFrame (https://sparkbyexamples.com/spark/spark-dataframe-cache-and-persist-explained/)

In [25]:
df1 = df.cache()

4. Show the count of the rows

In [26]:
df1.count()

350294

5. Print the schema

In [27]:
df1.printSchema()

root
 |-- X: float (nullable = true)
 |-- Y: float (nullable = true)
 |-- RowID: integer (nullable = true)
 |-- CrimeDateTime: string (nullable = true)
 |-- CrimeCode: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Inside_Outside: string (nullable = true)
 |-- Weapon: string (nullable = true)
 |-- Post: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Latitude: float (nullable = true)
 |-- Longitude: float (nullable = true)
 |-- Geolocation: float (nullable = true)
 |-- Premise: string (nullable = true)
 |-- VRIName: string (nullable = true)
 |-- Total_Incidents: string (nullable = true)



6. Display first 5 rows

In [28]:
df1.show(5)

+---------+---------+-----+--------------------+---------+--------------------+-----------------+--------------+------+----+---------+--------------+--------+---------+-----------+-------+-------+---------------+
|        X|        Y|RowID|       CrimeDateTime|CrimeCode|            Location|      Description|Inside_Outside|Weapon|Post| District|  Neighborhood|Latitude|Longitude|Geolocation|Premise|VRIName|Total_Incidents|
+---------+---------+-----+--------------------+---------+--------------------+-----------------+--------------+------+----+---------+--------------+--------+---------+-----------+-------+-------+---------------+
|1421661.4| 593584.5|    1|2021/09/24 08:00:...|       6D|500 SAINT PAUL ST...|LARCENY FROM AUTO|          null|    NA| 124|  CENTRAL|  MOUNT VERNON| 39.2959| -76.6137|       null|   null|   null|              1|
|1428629.5|592267.25|    2|2021/09/23 02:00:...|       6D|   0 N WASHINGTON ST|LARCENY FROM AUTO|          null|    NA| 212|SOUTHEAST|BUTCHER'S HILL

1. What are distinct crime codes?

In [29]:
df1.createOrReplaceTempView('crime_codes')

query = """
SELECT DISTINCT CrimeCode
FROM crime_codes
"""
output = spark.sql(query)
output.show(100)

+---------+
|CrimeCode|
+---------+
|       3P|
|       3K|
|      3BJ|
|       1A|
|       3M|
|       5F|
|       4B|
|       3B|
|       7A|
|      3NF|
|      3EF|
|       5D|
|       3N|
|       6K|
|      3LO|
|      3AF|
|       7B|
|      3GO|
|     3AJF|
|      8GV|
|      8AO|
|       7C|
|      3AK|
|       6L|
|      3GK|
|      3EO|
|      3JO|
|       3F|
|       1K|
|       8H|
|      8CV|
|      8DO|
|       4C|
|       5A|
|       6C|
|       3D|
|      3NK|
|       6H|
|      3LK|
|     3AJK|
|      3CO|
|       3L|
|       4E|
|      8BV|
|       6D|
|       2A|
|       3C|
|       8I|
|      3NO|
|      3JF|
|      3LF|
|       3E|
|       8J|
|       1O|
|      8BO|
|      3CK|
|       2B|
|      3JK|
|       5B|
|       4A|
|      8GO|
|      8EV|
|      3CF|
|      8EO|
|       6G|
|       6A|
|       9S|
|      3EK|
|      8FV|
|      3GF|
|      8CO|
|       3G|
|       3H|
|       4D|
|      8FO|
|       6J|
|       6F|
|       6E|
|       3J|
|       5C|
|   

2. Count the number of crimes by the crime codes and order by the resulting counts in descending order

In [30]:
df1.createOrReplaceTempView('crime_code_count')

query = """
SELECT CrimeCode, COUNT(*) as count
FROM crime_code_count
GROUP BY CrimeCode
ORDER BY count DESC
"""
output = spark.sql(query)
output.show(100)

+---------+-----+
|CrimeCode|count|
+---------+-----+
|       4E|60450|
|       6D|45203|
|       7A|29006|
|       5A|28377|
|       6J|19500|
|       6G|17326|
|       6E|17262|
|       6C|16543|
|       4C|16106|
|      3AF|11195|
|       5D|10804|
|       4A|10220|
|       4B| 9302|
|       3B| 7712|
|       4D| 4938|
|       9S| 4680|
|       5B| 4154|
|      3CF| 2803|
|       5C| 2787|
|       6F| 2645|
|     3AJF| 2419|
|       2A| 2293|
|      3AK| 2160|
|       6B| 2145|
|       3K| 1997|
|       1A| 1969|
|      3AO| 1772|
|       7C| 1639|
|       5F| 1240|
|       3D| 1096|
|      3JF| 1088|
|       5E| 1007|
|       6L|  988|
|       8H|  668|
|      3BJ|  598|
|       6A|  523|
|       3P|  503|
|      3CK|  478|
|      3GF|  461|
|      3CO|  423|
|      3JO|  351|
|      3JK|  307|
|      3NF|  256|
|      8AO|  243|
|       3H|  209|
|       2B|  198|
|       8J|  186|
|     3AJO|  173|
|       7B|  143|
|       1K|  143|
|       6H|  135|
|       1O|  132|
|      3EF

3. Which neighborhood had most crimes?

In [31]:
df1.createOrReplaceTempView('nb_most_crimes')

query = """
SELECT Neighborhood, COUNT(*) as count
FROM nb_most_crimes
GROUP BY Neighborhood
ORDER BY count
"""
output = spark.sql(query)
output.show(1, truncate = False)

+-----------------------+-----+
|Neighborhood           |count|
+-----------------------+-----+
|DUNDALK MARINE TERMINAL|5    |
+-----------------------+-----+
only showing top 1 row



4. Which month of the year had most crimes?

In [32]:
df1.createOrReplaceTempView('most_crimes')

# I presume this would work if my other part was doing what I wanted it to do, but I haven't been able to change the data type
# or change the format without it messing up the rest of the program

query = """
SELECT MONTH(CrimeDateTime) as m, COUNT(*) as count
FROM most_crimes
GROUP BY m
ORDER BY count DESC
"""
output = spark.sql(query)
output.show(12, truncate = False)

+----+------+
|m   |count |
+----+------+
|null|350294|
+----+------+



5. What weapons were used? 

In [33]:
df1.createOrReplaceTempView('weapons_used')

query = """
SELECT Weapon
FROM weapons_used
WHERE Weapon is not null and Weapon != 'NA'
GROUP BY Weapon
"""
output = spark.sql(query)
output.show(10, truncate = False)

+-------+
|Weapon |
+-------+
|HANDS  |
|KNIFE  |
|OTHER  |
|FIRE   |
|FIREARM|
+-------+



6. Which weapon was used the most? 

In [34]:
df1.createOrReplaceTempView('most_used_weapon')

query = """
SELECT Weapon, COUNT(*) as count
FROM most_used_weapon
WHERE Weapon is not null and Weapon != 'NA'
GROUP BY Weapon
ORDER BY count DESC
"""
output = spark.sql(query)
output.show(1, truncate = False)

+-------+-----+
|Weapon |count|
+-------+-----+
|FIREARM|35293|
+-------+-----+
only showing top 1 row

