In [2]:
!pip install kafka-python



# Kafka producer & Consumer 
The Kafka Producer in the cell below ingests the input csv file and serializes it into json format, writing it to the topic "mytopic" in kafka.
We will only read a few lines from the input csv for demonstration purposes.
The Kafka Consumer then reads the topic "mytopic" and outputs a json format version of the input dataset.

In [3]:
from kafka import KafkaProducer
import csv
import json

producer = KafkaProducer(bootstrap_servers="localhost:9092", value_serializer=lambda v: json.dumps(v).encode('utf-8'))
i = 0 # We will be reading only about 10 rows for demonstration of the pipeline.
with open("./dataset.csv") as f:
    reader = csv.DictReader(f)
    for row in reader:
        i = i + 1
        producer.send(topic="mytopic", value=row)
        producer.flush()
        if i == 10:
            break

In [4]:
from kafka import KafkaConsumer

consumer = KafkaConsumer("mytopic", bootstrap_servers="localhost:9092", auto_offset_reset='earliest', consumer_timeout_ms=10000) # Timeout is needed for the consumer to not execute endlessly.
data = []
for message in consumer:
    data.append(json.loads(message.value))
consumer.close()

with open("output_dataset.json", "w+") as final:
    json.dump(data, final)

# Pyspark ETL
We will load the Kafka consumer output dataset, perform various transformations to partially normalize data to be compatible with a star schema data warehouse, and then load into a mysql instance running locally.

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ProiectIBD').config("spark.jars.packages", "mysql:mysql-connector-java:8.0.33").getOrCreate() # We inject the mysql connector dependency dynamically from Maven.

In [6]:
df = spark.read.json("output_dataset.json")
df.show(5)
df.printSchema()

+---------+-----------+-----------+----------+---------------+-----------------+-----+----------+----------+------------------+--------------------+-------------------+---------------+-----+--------------------+--------------+--------+----------+----------+
|AGE_GROUP|ARREST_BORO|ARREST_DATE|ARREST_KEY|ARREST_PRECINCT|JURISDICTION_CODE|KY_CD|LAW_CAT_CD|  LAW_CODE|          Latitude|             Lon_Lat|          Longitude|      OFNS_DESC|PD_CD|             PD_DESC|     PERP_RACE|PERP_SEX|X_COORD_CD|Y_COORD_CD|
+---------+-----------+-----------+----------+---------------+-----------------+-----+----------+----------+------------------+--------------------+-------------------+---------------+-----+--------------------+--------------+--------+----------+----------+
|    45-64|          M| 11/22/2021| 236791704|             28|                0|     |         M|PL 2225001|40.799008797000056|POINT (-73.952408...| -73.95240854099995|               |  581|                    |         BLACK|

## We'll first generate all 6 dimension tables, and finally the one fact table by joining the input dataframe with the generated dimension tables to determine their codes.

In [12]:
import pyspark.sql.functions as F

df_pd = df.select(F.col("PD_CD"), F.col("PD_DESC")).distinct()
df_pd = df_pd.withColumn("pd_id", F.monotonically_increasing_id())
df_pd.show()

+-----+--------------------+-----+
|PD_CD|             PD_DESC|pd_id|
+-----+--------------------+-----+
|  729|FORGERY,ETC.,UNCL...|    0|
|     |                    |    1|
|  263|         ARSON 2,3,4|    2|
|  579|                    |    3|
|  581|                    |    4|
|  177|        SEXUAL ABUSE|    5|
|  153|              RAPE 3|    6|
|  681|CHILD, ENDANGERIN...|    7|
|  511|CONTROLLED SUBSTA...|    8|
+-----+--------------------+-----+



In [13]:
df_ky = df.select(F.col("KY_CD")).distinct()
df_ky = df_ky.withColumn("ky_id", F.monotonically_increasing_id())
df_ky.show()

+-----+-----+
|KY_CD|ky_id|
+-----+-----+
|  113|    0|
|  104|    1|
|  233|    2|
|  114|    3|
|  116|    4|
|  235|    5|
|     |    6|
+-----+-----+



In [15]:
df_law = df.select(F.col("LAW_CODE"), F.col("LAW_CAT_CD").alias("category")).distinct()
df_law = df_law.withColumn("law_id", F.monotonically_increasing_id())
df_law.show()

+----------+--------+------+
|  LAW_CODE|category|law_id|
+----------+--------+------+
|PL 2601001|       M|     0|
|PL 2225001|       M|     1|
|PL 1302502|       F|     2|
|PL 2203400|       F|     3|
|PL 2200300|       M|     4|
|PL 2224001|       F|     5|
|PL 1306503|       F|     6|
|PL 1302503|       F|     7|
|PL 1702500|       F|     8|
|PL 1501001|       F|     9|
+----------+--------+------+



In [17]:
df_location_info = df.select(F.col("ARREST_PRECINCT"), F.col("ARREST_BORO")).distinct()
df_location_info = df_location_info.withColumn("location_id", F.monotonically_increasing_id())
df_location_info.show()

+---------------+-----------+-----------+
|ARREST_PRECINCT|ARREST_BORO|location_id|
+---------------+-----------+-----------+
|             67|          K|          0|
|             28|          M|          1|
|             41|          B|          2|
|            113|          Q|          3|
|            115|          Q|          4|
|             25|          M|          5|
|             14|          M|          6|
|             27|          Q|          7|
+---------------+-----------+-----------+



In [18]:
df_gps = df.select(F.col("Longitude"), F.col("Latitude")).distinct()
df_gps = df_gps.withColumn("gps_id", F.monotonically_increasing_id())
df_gps.show()

+-------------------+------------------+------+
|          Longitude|          Latitude|gps_id|
+-------------------+------------------+------+
| -73.95240854099995|40.799008797000056|     0|
|                   |                  |     1|
| -73.87622400099998| 40.77205649600006|     2|
| -73.87833183299993|40.804012949000025|     3|
|-73.941109285999971|40.800694331000045|     4|
| -73.77919852099996| 40.69166001700007|     5|
| -73.77604736799998| 40.67970040800003|     6|
|-73.991212110999982|40.757839003000072|     7|
| -73.95033556299995|40.648650085000035|     8|
| -73.89529641399997|40.816391847000034|     9|
+-------------------+------------------+------+



In [19]:
df_map = df.select(F.col("X_COORD_CD"), F.col("Y_COORD_CD")).distinct()
df_map = df_map.withColumn("map_id", F.monotonically_increasing_id())
df_map.show()

+----------+----------+------+
|X_COORD_CD|Y_COORD_CD|map_id|
+----------+----------+------+
|    997427|    230378|     0|
|          |          |     1|
|   1017934|    232221|     2|
|   1000555|    230994|     3|
|    986685|    215375|     4|
|   1046367|    186986|     5|
|   1013232|    236725|     6|
|   1018534|    220579|     7|
|   1045482|    191341|     8|
|  998032.0|  175598.0|     9|
+----------+----------+------+



In [20]:
df_perp_info = df.select(F.col("AGE_GROUP"), F.col("PERP_SEX"), F.col("PERP_RACE")).distinct()
df_perp_info = df_perp_info.withColumn("perp_id", F.monotonically_increasing_id())
df_perp_info.show()

+---------+--------+--------------+-------+
|AGE_GROUP|PERP_SEX|     PERP_RACE|perp_id|
+---------+--------+--------------+-------+
|    25-44|       M|WHITE HISPANIC|      0|
|    25-44|       M|         BLACK|      1|
|    25-44|       M|       UNKNOWN|      2|
|    45-64|       M|         BLACK|      3|
|    18-24|       M|         BLACK|      4|
+---------+--------+--------------+-------+



## Generate the big fact table

In [33]:
df_arrests = df.select(F.col("ARREST_DATE"), F.col("JURISDICTION_CODE"), F.col("PD_CD").alias("temp_pd_cd"), F.col("PD_DESC").alias("temp_pd_desc"), F.col("KY_CD").alias("ky_cd_temp"),
                       F.col("LAW_CODE").alias("law_code_temp"), F.col("LAW_CAT_CD").alias("law_cat_cd_temp"), F.col("ARREST_PRECINCT").alias("arrest_precinct_temp"), F.col("ARREST_BORO").alias("arrest_boro_temp"),
                       F.col("Longitude").alias("long_temp"), F.col("Latitude").alias("lat_temp"),
                       F.col("X_COORD_CD").alias("x_temp"), F.col("Y_COORD_CD").alias("y_temp"),
                       F.col("AGE_GROUP").alias("temp_age_group"), F.col("PERP_SEX").alias("temp_sex"), F.col("PERP_RACE").alias("temp_race")
                      )
temp_cols = ["temp_pd_cd", "temp_pd_desc", "ky_cd_temp", "law_code_temp", "law_cat_cd_temp", "arrest_precinct_temp", "arrest_boro_temp", "long_temp", "lat_temp", "x_temp", "y_temp", "temp_age_group", "temp_sex", "temp_race"]

temp_cols.remove("temp_pd_cd")
temp_cols.remove("temp_pd_desc")
df_arrests = df_arrests.join(df_pd, (df_arrests["temp_pd_cd"] == df_pd["PD_CD"]) & (df_arrests["temp_pd_desc"] == df_pd["PD_DESC"])).select("ARREST_DATE", "JURISDICTION_CODE", "pd_id", *temp_cols)

temp_cols.remove("ky_cd_temp")
df_arrests = df_arrests.join(df_ky, (df_arrests["ky_cd_temp"] == df_ky["KY_CD"])).select("ARREST_DATE", "JURISDICTION_CODE", "pd_id", "ky_id", *temp_cols)

temp_cols.remove("law_code_temp")
temp_cols.remove("law_cat_cd_temp")
df_arrests = df_arrests.join(df_law, ( (df_arrests["law_code_temp"] == df_law["LAW_CODE"]) & (df_arrests["law_cat_cd_temp"] == df_law["category"]))).select("ARREST_DATE", "JURISDICTION_CODE", "pd_id", "ky_id", "law_id", *temp_cols)

temp_cols.remove("arrest_precinct_temp")
temp_cols.remove("arrest_boro_temp")
df_arrests = df_arrests.join(df_location_info, ( (df_arrests["arrest_precinct_temp"] == df_location_info["ARREST_PRECINCT"] ) & (df_arrests["arrest_boro_temp"] == df_location_info["ARREST_BORO"]) ))\
.select("ARREST_DATE", "JURISDICTION_CODE", "pd_id", "ky_id", "law_id", "location_id", *temp_cols)

temp_cols.remove("long_temp")
temp_cols.remove("lat_temp")
df_arrests = df_arrests.join(df_gps, (df_arrests["long_temp"] == df_gps["Longitude"]) & (df_arrests["lat_temp"] == df_gps["Latitude"]))\
.select("ARREST_DATE", "JURISDICTION_CODE", "pd_id", "ky_id", "law_id", "location_id", "gps_id", *temp_cols)

temp_cols.remove("x_temp")
temp_cols.remove("y_temp")
df_arrests = df_arrests.join(df_map, (df_arrests["x_temp"] == df_map["X_COORD_CD"]) & (df_arrests["y_temp"] == df_map["Y_COORD_CD"]))\
.select("ARREST_DATE", "JURISDICTION_CODE", "pd_id", "ky_id", "law_id", "location_id", "gps_id", "map_id", *temp_cols)

temp_cols.remove("temp_age_group")
temp_cols.remove("temp_sex")
temp_cols.remove("temp_race")
df_arrests = df_arrests.join(df_perp_info, (df_arrests["temp_age_group"] == df_perp_info["AGE_GROUP"]) & (df_arrests["temp_sex"] == df_perp_info["PERP_SEX"]) & (df_arrests["temp_race"] == df_perp_info["PERP_RACE"]))\
.select("ARREST_DATE", "JURISDICTION_CODE", "pd_id", "ky_id", "law_id", "location_id", "gps_id", "map_id", "perp_id")

df_arrests.show()

+-----------+-----------------+-----+-----+------+-----------+------+------+-------+
|ARREST_DATE|JURISDICTION_CODE|pd_id|ky_id|law_id|location_id|gps_id|map_id|perp_id|
+-----------+-----------------+-----+-----+------+-----------+------+------+-------+
| 11/22/2021|                0|    4|    6|     1|          1|     0|     0|      3|
| 12/04/2021|                0|    6|    1|     2|          2|     9|     6|      0|
| 11/09/2021|                0|    7|    2|     0|          3|     6|     5|      1|
| 06/18/2007|                1|    8|    5|     4|          7|     1|     1|      4|
| 01/26/2019|                0|    5|    4|     6|          5|     4|     3|      3|
| 02/06/2019|                0|    1|    6|     3|          6|     7|     4|      2|
| 12/03/2021|                0|    3|    6|     5|          4|     2|     7|      1|
| 11/10/2021|               72|    2|    3|     9|          2|     3|     2|      0|
| 12/28/2021|                0|    0|    0|     8|          3|   

## Write the resulting dimension dataframes as tables to mysql first, and the fact table last

In [4]:
df_pd.write.format("jdbc").option("driver","com.mysql.cj.jdbc.Driver").option("url", "jdbc:mysql://localhost:3306/IBD").option("dbtable", "PD").option("user", "root").option("password", "123").mode("overwrite").save()
df_ky.write.format("jdbc").option("driver","com.mysql.cj.jdbc.Driver").option("url", "jdbc:mysql://localhost:3306/IBD").option("dbtable", "KY").option("user", "root").option("password", "123").mode("overwrite").save()
df_law.write.format("jdbc").option("driver","com.mysql.cj.jdbc.Driver").option("url", "jdbc:mysql://localhost:3306/IBD").option("dbtable", "LAW").option("user", "root").option("password", "123").mode("overwrite").save()
df_location_info.write.format("jdbc").option("driver","com.mysql.cj.jdbc.Driver").option("url", "jdbc:mysql://localhost:3306/IBD").option("dbtable", "LOCATION_INFO").option("user", "root").option("password", "123").mode("overwrite").save()
df_gps.write.format("jdbc").option("driver","com.mysql.cj.jdbc.Driver").option("url", "jdbc:mysql://localhost:3306/IBD").option("dbtable", "GPS_COORDS").option("user", "root").option("password", "123").mode("overwrite").save()
df_map.write.format("jdbc").option("driver","com.mysql.cj.jdbc.Driver").option("url", "jdbc:mysql://localhost:3306/IBD").option("dbtable", "MAP_COORDS").option("user", "root").option("password", "123").mode("overwrite").save()
df_perp_info.write.format("jdbc").option("driver","com.mysql.cj.jdbc.Driver").option("url", "jdbc:mysql://localhost:3306/IBD").option("dbtable", "PERP_INFO").option("user", "root").option("password", "123").mode("overwrite").save()

df_arrests.write.format("jdbc").option("driver","com.mysql.cj.jdbc.Driver").option("url", "jdbc:mysql://localhost:3306/IBD").option("dbtable", "ARRESTS").option("user", "root").option("password", "123").mode("overwrite").save()
