In [214]:
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as sf
from pyspark.conf import SparkConf

conf = SparkConf()
conf.set("spark.driver.extraClassPath", "./drivers/postgresql-42.7.2.jar")
conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

spark = SparkSession.builder.appName("Data Wrangling").config(conf=conf).getOrCreate()

In [215]:
df = spark.read.csv("./data/LACrimeData.csv", header=True, inferSchema=True)
df.show(n=3)

+---------+--------------------+--------------------+--------+----+---------+-----------+--------+------+--------------------+--------------+--------+--------+------------+---------+--------------------+--------------+-----------+------+------------+--------+--------+--------+--------+--------------------+------------+-------+---------+
|    DR_NO|           Date Rptd|            DATE OCC|TIME OCC|AREA|AREA NAME|Rpt Dist No|Part 1-2|Crm Cd|         Crm Cd Desc|       Mocodes|Vict Age|Vict Sex|Vict Descent|Premis Cd|         Premis Desc|Weapon Used Cd|Weapon Desc|Status| Status Desc|Crm Cd 1|Crm Cd 2|Crm Cd 3|Crm Cd 4|            LOCATION|Cross Street|    LAT|      LON|
+---------+--------------------+--------------------+--------+----+---------+-----------+--------+------+--------------------+--------------+--------+--------+------------+---------+--------------------+--------------+-----------+------+------------+--------+--------+--------+--------+--------------------+------------+--

In [216]:
df.printSchema()

root
 |-- DR_NO: integer (nullable = true)
 |-- Date Rptd: string (nullable = true)
 |-- DATE OCC: string (nullable = true)
 |-- TIME OCC: integer (nullable = true)
 |-- AREA: integer (nullable = true)
 |-- AREA NAME: string (nullable = true)
 |-- Rpt Dist No: integer (nullable = true)
 |-- Part 1-2: integer (nullable = true)
 |-- Crm Cd: integer (nullable = true)
 |-- Crm Cd Desc: string (nullable = true)
 |-- Mocodes: string (nullable = true)
 |-- Vict Age: integer (nullable = true)
 |-- Vict Sex: string (nullable = true)
 |-- Vict Descent: string (nullable = true)
 |-- Premis Cd: integer (nullable = true)
 |-- Premis Desc: string (nullable = true)
 |-- Weapon Used Cd: integer (nullable = true)
 |-- Weapon Desc: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Status Desc: string (nullable = true)
 |-- Crm Cd 1: integer (nullable = true)
 |-- Crm Cd 2: integer (nullable = true)
 |-- Crm Cd 3: integer (nullable = true)
 |-- Crm Cd 4: integer (nullable = true)
 |-- L

In [217]:
df.columns

['DR_NO',
 'Date Rptd',
 'DATE OCC',
 'TIME OCC',
 'AREA',
 'AREA NAME',
 'Rpt Dist No',
 'Part 1-2',
 'Crm Cd',
 'Crm Cd Desc',
 'Mocodes',
 'Vict Age',
 'Vict Sex',
 'Vict Descent',
 'Premis Cd',
 'Premis Desc',
 'Weapon Used Cd',
 'Weapon Desc',
 'Status',
 'Status Desc',
 'Crm Cd 1',
 'Crm Cd 2',
 'Crm Cd 3',
 'Crm Cd 4',
 'LOCATION',
 'Cross Street',
 'LAT',
 'LON']

In [218]:
# drop the column part 1-2
df = df.drop("Part 1-2", "Mocodes", "Rpt Dist No")

In [219]:
# change the time occ column to a string value

df = df.withColumn("TIME OCC", sf.col("TIME OCC").cast("string"))

In [220]:
df.show(n=5)

+---------+--------------------+--------------------+--------+----+----------+------+--------------------+--------+--------+------------+---------+--------------------+--------------+-----------+------+------------+--------+--------+--------+--------+--------------------+------------+-------+---------+
|    DR_NO|           Date Rptd|            DATE OCC|TIME OCC|AREA| AREA NAME|Crm Cd|         Crm Cd Desc|Vict Age|Vict Sex|Vict Descent|Premis Cd|         Premis Desc|Weapon Used Cd|Weapon Desc|Status| Status Desc|Crm Cd 1|Crm Cd 2|Crm Cd 3|Crm Cd 4|            LOCATION|Cross Street|    LAT|      LON|
+---------+--------------------+--------------------+--------+----+----------+------+--------------------+--------+--------+------------+---------+--------------------+--------------+-----------+------+------------+--------+--------+--------+--------+--------------------+------------+-------+---------+
|190326475|03/01/2020 12:00:...|03/01/2020 12:00:...|    2130|   7|  Wilshire|   510|   

In [221]:
# concatinate the colon in between the hour and minute

df = df.withColumn(
    "TIME OCC", 
    sf.col("TIME OCC").cast("string")
).withColumn(
    "TIME OCC", 
    sf.concat(
        sf.substring("TIME OCC", 0, 2),
        sf.lit(":"),
        sf.substring("TIME OCC", 3, 2),
    ),
)

In [222]:
# clean the times that they will fit into the database

df = df.withColumn(
    "TIME OCC",
    sf.to_timestamp("TIME OCC", "HH:mm")
).withColumn(
    "TIME OCC",
    sf.col("TIME OCC").cast("string")
).na.fill(value="1970-01-01 00:00:00", subset="TIME OCC").withColumn(
    "Date Rptd",
    sf.substring("Date Rptd", 0, 10)
).withColumn(
    "DATE AND TIME OCC",
    sf.concat(
        sf.substring("DATE OCC", 0, 10),
        sf.substring("TIME OCC", 11, 9)
    )
).drop("DATE OCC", "TIME OCC")

In [223]:
# fill the null values in the specified col with X according to the data dictionary
df = df.na.fill(value="X", subset="Vict Sex").na.fill(value="X", subset="Vict Descent")

In [224]:
df = df.withColumn(
    "location_name",
    sf.regexp_replace(
        sf.regexp_replace(df.LOCATION, r'\s+', " "),
        r'PLACE',
        "PL"
    ),
) \
.orderBy(sf.asc("location_name"))

In [225]:
w = Window.orderBy(df["location_name"])
df = df.withColumn("location_id_pk", sf.rank().over(w)).drop("LOCATION")

In [226]:
# connect to the database

properties = {
    "user": "postgres",
    "password": "sopro722!",
    "driver": "org.postgresql.Driver"
}

url = "jdbc:postgresql://localhost:5432/lacrime_db"


In [227]:
def insert_data(df, url: str, table: str, properties: dict[str]):
    df.repartition(4) \
    .write \
    .mode("append") \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable", table) \
    .option("user", properties["user"]) \
    .option("password", properties["password"]) \
    .save()

In [228]:
# insert weapon into the database

df_weapons = df.select(
    sf.col("Weapon Used Cd").alias("weapon_id_pk"),
    sf.col("Weapon Desc").alias("weapon")
) \
.filter(df["Weapon Used Cd"].isNotNull()) \
.dropDuplicates() \
.sort(sf.asc("weapon_id_pk"))

In [229]:
insert_data(df_weapons, url, "weapon", properties)
    
print("Saved to Database")

Saved to Database


In [230]:
df_area = df.select(
    sf.col("AREA").alias("area_id_pk"),
    sf.col("AREA NAME").alias("area_name")
) \
.filter(df["AREA"].isNotNull()) \
.dropDuplicates() \
.orderBy(sf.asc("area_id_pk"))

In [231]:
insert_data(df_area, url, "area", properties)

print("Saved to Database")

Saved to Database


In [232]:
# input the status data into the status table

df_status = df.select(
    sf.col("Status").alias("status_code"),
    sf.col("Status Desc").alias("status_desc")
) \
.na.drop(subset="status_code") \
.dropDuplicates() \
.orderBy(sf.asc("status_code")) 



In [233]:
df_status.show()

+-----------+------------+
|status_code| status_desc|
+-----------+------------+
|         AA|Adult Arrest|
|         AO| Adult Other|
|         CC|         UNK|
|         IC| Invest Cont|
|         JA|  Juv Arrest|
|         JO|   Juv Other|
+-----------+------------+



In [234]:
insert_data(df_status, url, "status", properties)

In [235]:
# input the status data into the status table

df_premisis = df.select(
    sf.col("Premis Cd").alias("premisis_id_pk"),
    sf.col("Premis Desc").alias("premisis_desc")
) \
.filter(df["Premis Cd"].isNotNull()) \
.dropDuplicates() \
.orderBy(sf.asc("premisis_id_pk"))

In [236]:
insert_data(df_premisis, url, "premisis", properties)

In [237]:
df_crime = df.select(
    sf.col("Crm Cd").alias("crime_id_pk"),
    sf.col("Crm Cd Desc").alias("crime_desc")
) \
.filter(df["Crm Cd"].isNotNull()) \
.dropDuplicates(subset=["crime_id_pk"]) \
.orderBy(sf.asc("crime_id_pk"))

In [238]:
insert_data(df_crime, url, "crime", properties)

In [239]:
df_location = df.select(
    sf.col("location_id_pk"),
    sf.col("location_name")
) \
.na.drop(subset="location_name") \
.dropDuplicates(subset=["location_id_pk"]) \
.orderBy(sf.asc("location_name"))


In [240]:
df_location.filter(df_location["location_name"].isNull()) \
.show()

+--------------+-------------+
|location_id_pk|location_name|
+--------------+-------------+
+--------------+-------------+



In [241]:
insert_data(df_location, url, "location", properties)

In [242]:
df_report = df.select(
    sf.col("DR_NO").alias("report_id_pk"),
    sf.to_date("Date Rptd", "MM/dd/yyyy").alias("date_reported"),
    sf.to_timestamp("DATE AND TIME OCC", "MM/dd/yyyy HH:mm:ss").alias("date_occured"),
    sf.col("Vict Age").alias("victim_age"),
    sf.col("Vict Sex").alias("victim_sex"),
    sf.col("Vict Descent").alias("victim_decent"),
    sf.col("AREA").alias("area_id_fk"),
    sf.col("location_id_pk").alias("location_id_fk"),
    sf.col("Premis Cd").alias("premisis_id_fk"),
    sf.col("Weapon Used Cd").alias("weapon_id_fk"),
    sf.col("Status").alias("status_code"),
    sf.col("LAT").alias("latitude"),
    sf.col("LON").alias("longitude"),
) \
.filter(df["DR_NO"].isNotNull()) \
.dropDuplicates(subset=["report_id_pk"]) \
.repartition(4)

In [243]:
df_report.write \
    .mode("append") \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable", "report") \
    .option("user", properties["user"]) \
    .option("password", properties["password"]) \
    .save()

In [244]:
df_crime_report = df.dropDuplicates(subset=["DR_NO"]) \
.withColumn('crime_id_fk', sf.explode(sf.array("Crm Cd 1", "Crm Cd 2", "Crm Cd 3", "Crm Cd 4"))) \
.select(
    sf.col("DR_NO").alias("report_id_fk"),
    sf.col("crime_id_fk")
) \
.na.drop(subset=["crime_id_fk"])

In [245]:
df_crime_report = df_crime_report.join(df_crime, df_crime_report["crime_id_fk"] == df_crime["crime_id_pk"], "left") \
.filter("crime_id_pk IS NOT NULL") \
.drop("crime_id_pk", "crime_desc")

In [246]:
insert_data(df_crime_report, url, "crime_report", properties)