In [None]:
from pyspark import *;
from pyspark.sql import *;
from pyspark.sql.functions import *;
import pandas as pd;

In [None]:
# Start (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("Q5")
    .getOrCreate()
)

In [None]:
# Load the raw crime CSV: the first row supplies the column names and Spark
# infers each column's type from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("crime_data.csv")
)
df.printSchema()
df.show()

root
 |-- DR_NO: integer (nullable = true)
 |-- Date Rptd: string (nullable = true)
 |-- DATE OCC: string (nullable = true)
 |-- TIME OCC: integer (nullable = true)
 |-- AREA NAME: string (nullable = true)
 |-- Part 1-2: integer (nullable = true)
 |-- Crime code: string (nullable = true)
 |-- Crime Code Desc: string (nullable = true)
 |-- Vict Age: integer (nullable = true)
 |-- Vict Sex: string (nullable = true)
 |-- Premis Cd: integer (nullable = true)
 |-- Premis Desc: string (nullable = true)
 |-- Status Desc: string (nullable = true)
 |-- LAT: double (nullable = true)
 |-- LON: double (nullable = true)

+---------+--------------------+--------------------+--------+-----------+--------+----------+--------------------+--------+--------+---------+--------------------+------------+-------+---------+
|    DR_NO|           Date Rptd|            DATE OCC|TIME OCC|  AREA NAME|Part 1-2|Crime code|     Crime Code Desc|Vict Age|Vict Sex|Premis Cd|         Premis Desc| Status Desc|    LAT|   

In [None]:
# Count missing values per column.
# NULL is checked for every column, but NaN only for float/double columns:
# calling isnan() on other types forces an implicit cast to double, which is
# meaningless for strings and can raise on non-numeric text when ANSI SQL
# mode is enabled.
float_columns = {name for name, dtype in df.dtypes if dtype in ("float", "double")}


def _is_missing(name):
    """Return a boolean Column that is true where column `name` is NULL (or NaN for float columns)."""
    missing = col(name).isNull()
    if name in float_columns:
        missing = missing | isnan(col(name))
    return missing


df.select([count(when(_is_missing(c), c)).alias(c) for c in df.columns]).show()

+-----+---------+--------+--------+---------+--------+----------+---------------+--------+--------+---------+-----------+-----------+---+---+
|DR_NO|Date Rptd|DATE OCC|TIME OCC|AREA NAME|Part 1-2|Crime code|Crime Code Desc|Vict Age|Vict Sex|Premis Cd|Premis Desc|Status Desc|LAT|LON|
+-----+---------+--------+--------+---------+--------+----------+---------------+--------+--------+---------+-----------+-----------+---+---+
|    0|        0|       0|       0|        0|       0|         0|              0|      15|     168|        0|          1|          0|  0|  0|
+-----+---------+--------+--------+---------+--------+----------+---------------+--------+--------+---------+-----------+-----------+---+---+



In [None]:
# Replace missing victim fields with placeholders. The age placeholder (-1)
# lies outside the accepted range, so rows without an age are removed by the
# filter that follows.
df = df.na.fill({"Vict Sex": "Unknown", "Vict Age": -1})

# Keep only rows whose victim age is in (0, 120].
vict_age = col("Vict Age")
df = df.where((vict_age > 0) & (vict_age <= 120))
df.show()

+---------+--------------------+--------------------+--------+-----------+--------+----------+--------------------+--------+--------+---------+--------------------+------------+-------+---------+
|    DR_NO|           Date Rptd|            DATE OCC|TIME OCC|  AREA NAME|Part 1-2|Crime code|     Crime Code Desc|Vict Age|Vict Sex|Premis Cd|         Premis Desc| Status Desc|    LAT|      LON|
+---------+--------------------+--------------------+--------+-----------+--------+----------+--------------------+--------+--------+---------+--------------------+------------+-------+---------+
|200106753|    02-09-2020 00:00|    02-08-2020 00:00|    1800|    Central|       1|       330|BURGLARY FROM VEH...|      47|       M|      128|BUS STOP/LAYOVER ...| Invest Cont|34.0444|-118.2628|
|200320258|    11-11-2020 00:00|    11-04-2020 00:00|    1700|  Southwest|       1|       480|       BIKE - STOLEN|      19|       X|      502|MULTI-UNIT DWELLI...| Invest Cont| 34.021|-118.3002|
|200907217|    05-10

In [None]:
# Render TIME OCC as a zero-padded "HH:MM" string (e.g. 1800 -> "18:00"):
# left-pad the digits to width 4, then insert a colon between the two pairs.
time_digits = lpad(col("TIME OCC").cast("string"), 4, "0")
df = df.withColumn(
    "TIME OCC",
    regexp_replace(time_digits, r"(\d{2})(\d{2})", "$1:$2"),
)
df.show()

+---------+--------------------+--------------------+--------+-----------+--------+----------+--------------------+--------+--------+---------+--------------------+------------+-------+---------+
|    DR_NO|           Date Rptd|            DATE OCC|TIME OCC|  AREA NAME|Part 1-2|Crime code|     Crime Code Desc|Vict Age|Vict Sex|Premis Cd|         Premis Desc| Status Desc|    LAT|      LON|
+---------+--------------------+--------------------+--------+-----------+--------+----------+--------------------+--------+--------+---------+--------------------+------------+-------+---------+
|200106753|    02-09-2020 00:00|    02-08-2020 00:00|   18:00|    Central|       1|       330|BURGLARY FROM VEH...|      47|       M|      128|BUS STOP/LAYOVER ...| Invest Cont|34.0444|-118.2628|
|200320258|    11-11-2020 00:00|    11-04-2020 00:00|   17:00|  Southwest|       1|       480|       BIKE - STOLEN|      19|       X|      502|MULTI-UNIT DWELLI...| Invest Cont| 34.021|-118.3002|
|200907217|    05-10

In [None]:
# Remove backslashes, forward slashes and pipes from AREA NAME, then trim
# any surrounding whitespace.
area_clean = trim(regexp_replace(col("AREA NAME"), r"[\\/|]+", ""))
df = df.withColumn("AREA NAME", area_clean)
df.show()

+---------+--------------------+--------------------+--------+-----------+--------+----------+--------------------+--------+--------+---------+--------------------+------------+-------+---------+
|    DR_NO|           Date Rptd|            DATE OCC|TIME OCC|  AREA NAME|Part 1-2|Crime code|     Crime Code Desc|Vict Age|Vict Sex|Premis Cd|         Premis Desc| Status Desc|    LAT|      LON|
+---------+--------------------+--------------------+--------+-----------+--------+----------+--------------------+--------+--------+---------+--------------------+------------+-------+---------+
|200106753|    02-09-2020 00:00|    02-08-2020 00:00|   18:00|    Central|       1|       330|BURGLARY FROM VEH...|      47|       M|      128|BUS STOP/LAYOVER ...| Invest Cont|34.0444|-118.2628|
|200320258|    11-11-2020 00:00|    11-04-2020 00:00|   17:00|  Southwest|       1|       480|       BIKE - STOLEN|      19|       X|      502|MULTI-UNIT DWELLI...| Invest Cont| 34.021|-118.3002|
|200907217|    05-10

In [None]:
# Convert both date columns to "yyyy-MM-dd" strings:
#   1. keep only the date part (text before the first space),
#   2. normalise "/" separators to "-",
#   3. parse as MM-dd-yyyy and re-format.
for raw_name, clean_name in (("Date Rptd", "Date_Rptd"), ("DATE OCC", "Date_Occ")):
    date_part = split(raw_name, " ")[0]
    dashed = regexp_replace(date_part, "/", "-")
    df = df.withColumn(
        clean_name,
        date_format(to_date(dashed, "MM-dd-yyyy"), "yyyy-MM-dd"),
    )

# The original columns are superseded by the cleaned ones.
df = df.drop("DATE OCC", "Date Rptd")
df.show()

+---------+--------+-----------+--------+----------+--------------------+--------+--------+---------+--------------------+------------+-------+---------+----------+----------+
|    DR_NO|TIME OCC|  AREA NAME|Part 1-2|Crime code|     Crime Code Desc|Vict Age|Vict Sex|Premis Cd|         Premis Desc| Status Desc|    LAT|      LON| Date_Rptd|  Date_Occ|
+---------+--------+-----------+--------+----------+--------------------+--------+--------+---------+--------------------+------------+-------+---------+----------+----------+
|200106753|   18:00|    Central|       1|       330|BURGLARY FROM VEH...|      47|       M|      128|BUS STOP/LAYOVER ...| Invest Cont|34.0444|-118.2628|2020-02-09|2020-02-08|
|200320258|   17:00|  Southwest|       1|       480|       BIKE - STOLEN|      19|       X|      502|MULTI-UNIT DWELLI...| Invest Cont| 34.021|-118.3002|2020-11-11|2020-11-04|
|200907217|   20:37|   Van Nuys|       1|       343|SHOPLIFTING-GRAND...|      19|       M|      405|      CLOTHING STOR

In [None]:
# Keep only the first run of digits found in "Crime code"
# (regexp_extract yields an empty string when nothing matches).
first_digit_run = regexp_extract(col("Crime code"), r"(\d+)", 1)
df = df.withColumn("Crime code", first_digit_run)
df.show()

+---------+--------+-----------+--------+----------+--------------------+--------+--------+---------+--------------------+------------+-------+---------+----------+----------+
|    DR_NO|TIME OCC|  AREA NAME|Part 1-2|Crime code|     Crime Code Desc|Vict Age|Vict Sex|Premis Cd|         Premis Desc| Status Desc|    LAT|      LON| Date_Rptd|  Date_Occ|
+---------+--------+-----------+--------+----------+--------------------+--------+--------+---------+--------------------+------------+-------+---------+----------+----------+
|200106753|   18:00|    Central|       1|       330|BURGLARY FROM VEH...|      47|       M|      128|BUS STOP/LAYOVER ...| Invest Cont|34.0444|-118.2628|2020-02-09|2020-02-08|
|200320258|   17:00|  Southwest|       1|       480|       BIKE - STOLEN|      19|       X|      502|MULTI-UNIT DWELLI...| Invest Cont| 34.021|-118.3002|2020-11-11|2020-11-04|
|200907217|   20:37|   Van Nuys|       1|       343|SHOPLIFTING-GRAND...|      19|       M|      405|      CLOTHING STOR

In [None]:
# Derive the calendar year and month number from the report date.
df = (
    df.withColumn("Year", year("Date_Rptd"))
      .withColumn("Month", month("Date_Rptd"))
)
df.show()

+---------+--------+-----------+--------+----------+--------------------+--------+--------+---------+--------------------+------------+-------+---------+----------+----------+----+-----+
|    DR_NO|TIME OCC|  AREA NAME|Part 1-2|Crime code|     Crime Code Desc|Vict Age|Vict Sex|Premis Cd|         Premis Desc| Status Desc|    LAT|      LON| Date_Rptd|  Date_Occ|Year|Month|
+---------+--------+-----------+--------+----------+--------------------+--------+--------+---------+--------------------+------------+-------+---------+----------+----------+----+-----+
|200106753|   18:00|    Central|       1|       330|BURGLARY FROM VEH...|      47|       M|      128|BUS STOP/LAYOVER ...| Invest Cont|34.0444|-118.2628|2020-02-09|2020-02-08|2020|    2|
|200320258|   17:00|  Southwest|       1|       480|       BIKE - STOLEN|      19|       X|      502|MULTI-UNIT DWELLI...| Invest Cont| 34.021|-118.3002|2020-11-11|2020-11-04|2020|   11|
|200907217|   20:37|   Van Nuys|       1|       343|SHOPLIFTING-G

In [None]:
# Number of (cleaned) crime records per police area.
area_crime_counts = (
    df.groupBy(col("AREA NAME"))
      .agg(count("*").alias("count"))
)
area_crime_counts.show()

+-----------+-----+
|  AREA NAME|count|
+-----------+-----+
|  Hollywood|   61|
|     Harbor|   42|
|     Newton|   18|
|   Van Nuys|   53|
|West Valley|   49|
|   Wilshire|   64|
|    Central|   67|
| Hollenbeck|   46|
|  Southwest|   87|
|    Rampart|   46|
|    West LA|   42|
|77th Street|   81|
|  Northeast|   47|
+-----------+-----+



In [None]:
# Collect the cleaned Spark DataFrame to the driver as a pandas DataFrame and
# export it. index=False stops pandas from writing its row index as an extra
# unnamed first column in the CSV.
# NOTE: `df` is a pandas DataFrame from here on, so this cell cannot be re-run
# without re-running the cells above.
df = df.toPandas()
df.to_csv("cleaned_crime.csv", index=False)

In [None]:
# Stop the Spark session created at the top of the notebook.
spark.stop()