In [1]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["SPARK_HOME"] = "/usr/lib/spark"

In [2]:
import datetime
import tempfile
import pandas as pd

In [4]:
gcs_bucket  = 'crime-data-group8'

In [5]:
output_path = f"gs://{gcs_bucket}/US-chicago/2020To-Present/test/chicago.csv"
SPARK_MASTER = 'spark://10.128.0.3:7077'

In [13]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql.functions import col, avg ,lit ,to_timestamp ,struct ,from_json, date_format ,to_json ,coalesce, when
from pyspark.sql.types import (StructType, StructField, StringType, 
                              IntegerType, DoubleType, BooleanType, TimestampType)

import seaborn as sns
spark = SparkSession.builder \
    .appName("GCS Integration") \
    .config("spark.jars.packages", 
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5") \
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "test-456807-0581efb4fbc8.json") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .config("spark.sql.streaming.checkpointLocation", f"gs://{gcs_bucket}/checkpoints") \
    .config("spark.sql.adaptive.enabled", "false") \
    .getOrCreate()

In [5]:
# Read CSV file from GCS bucket
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(f"gs://{gcs_bucket}/Chicago.csv")

# Display first few rows to verify the data
df.show(5)

                                                                                

+--------+-----------+--------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|      ID|Case Number|                Date|               Block|IUCR|      Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|
+--------+-----------+--------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|13829197|   JJ246652|05/08/2025 12:00:...|     040XX W LAKE ST|1310|   CRIMINAL DAMAGE|         TO PROPERTY|         CTA STATION|

In [30]:
df.count()

                                                                                

1259460

In [31]:
df.dtypes

[('ID', 'int'),
 ('Case Number', 'string'),
 ('Date', 'string'),
 ('Block', 'string'),
 ('IUCR', 'string'),
 ('Primary Type', 'string'),
 ('Description', 'string'),
 ('Location Description', 'string'),
 ('Arrest', 'boolean'),
 ('Domestic', 'boolean'),
 ('Beat', 'int'),
 ('District', 'int'),
 ('Ward', 'int'),
 ('Community Area', 'int'),
 ('FBI Code', 'string'),
 ('X Coordinate', 'int'),
 ('Y Coordinate', 'int'),
 ('Year', 'int'),
 ('Updated On', 'string'),
 ('Latitude', 'double'),
 ('Longitude', 'double'),
 ('Location', 'string')]

In [6]:
# Convert Date and Updated On columns to timestamp
df = df.withColumn("Date", 
    to_timestamp(col("Date"), "MM/dd/yyyy hh:mm:ss aa")) \
    .withColumn("Updated On", 
    to_timestamp(col("Updated On"), "MM/dd/yyyy hh:mm:ss aa"))

# Verify the conversion
print("Schema after conversion:")
df.printSchema()

# Show sample data to verify conversion
print("\nSample data with converted timestamps:")
df.select("Date", "Updated On").show(5, truncate=False)

Schema after conversion:
root
 |-- ID: integer (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: boolean (nullable = true)
 |-- Domestic: boolean (nullable = true)
 |-- Beat: integer (nullable = true)
 |-- District: integer (nullable = true)
 |-- Ward: integer (nullable = true)
 |-- Community Area: integer (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: integer (nullable = true)
 |-- Y Coordinate: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Updated On: timestamp (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Location: string (nullable = true)


Sample data with converted timestamps:
+-------------------+----

In [47]:
# Filter records with the correct timestamp
filtered_df = df.filter(col("Updated On") <= "2025-05-14T01:00:00.000")

# Show the counts
print(f"Records before filtering: {df.count()}")
print(f"Records after filtering: {filtered_df.count()}")

# Check the highest dates to verify the filtering
df.orderBy(col("Updated On").desc()).select("Updated On").show(5)



Records before filtering: 1259460


                                                                                

Records after filtering: 1257967




+-------------------+
|         Updated On|
+-------------------+
|2025-05-15 15:42:05|
|2025-05-15 15:42:05|
|2025-05-15 15:42:05|
|2025-05-15 15:42:05|
|2025-05-15 15:42:05|
+-------------------+
only showing top 5 rows



                                                                                

In [7]:
df_kafka = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "chicago_crimes_main1") \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .load()

In [None]:
schema = StructType([
    StructField("id", StringType(), True),
    StructField("case_number", StringType(), True),
    StructField("date", StringType(), True),
    StructField("block", StringType(), True),
    StructField("iucr", StringType(), True),
    StructField("primary_type", StringType(), True),
    StructField("description", StringType(), True),
    StructField("location_description", StringType(), True),
    StructField("arrest", BooleanType(), True),
    StructField("domestic", BooleanType(), True),
    StructField("beat", StringType(), True),
    StructField("district", StringType(), True),
    StructField("ward", StringType(), True),
    StructField("community_area", StringType(), True),
    StructField("fbi_code", StringType(), True),
    StructField("x_coordinate", StringType(), True),  # Changed to StringType
    StructField("y_coordinate", StringType(), True),  # Changed to StringType
    StructField("year", StringType(), True),         # Changed to StringType
    StructField("updated_on", StringType(), True),
    StructField("latitude", StringType(), True),
    StructField("longitude", StringType(), True),
    StructField("location", StructType([
        StructField("latitude", StringType(), True),
        StructField("longitude", StringType(), True),
        StructField("human_address", StringType(), True)
    ]), True)
])

In [9]:
streaming_df = df_kafka \
    .selectExpr("CAST(value AS STRING) as raw_json") \
    .select(from_json(col("raw_json"), schema).alias("data")) \
    .select("data.*") \
    .withColumnRenamed("id", "ID") \
    .withColumnRenamed("case_number", "Case Number") \
    .withColumnRenamed("date", "Date") \
    .withColumnRenamed("block", "Block") \
    .withColumnRenamed("iucr", "IUCR") \
    .withColumnRenamed("primary_type", "Primary Type") \
    .withColumnRenamed("description", "Description") \
    .withColumnRenamed("location_description", "Location Description") \
    .withColumnRenamed("community_area", "Community Area") \
    .withColumnRenamed("fbi_code", "FBI Code") \
    .withColumnRenamed("x_coordinate", "X Coordinate") \
    .withColumnRenamed("y_coordinate", "Y Coordinate") \
    .withColumnRenamed("updated_on", "Updated On") \
    .withColumn("Updated On", to_timestamp("Updated On")) \
    .withColumn("Date", to_timestamp("Date")) \
    .withColumn("latitude", col("latitude").cast("double")) \
    .withColumn("longitude", col("longitude").cast("double"))

In [None]:
# Format the streaming data before writing
formatted_streaming_df = streaming_df.select(
    col("ID"),
    col("Case Number"),
    date_format(col("Date"), "yyyy-MM-dd HH:mm:ss").alias("Date"),
    col("Block"),
    col("IUCR"),
    col("Primary Type"),
    col("Description"),
    col("Location Description"),
    col("arrest"),
    col("domestic"),
    col("beat"),
    col("district"),
    col("ward"),
    col("Community Area"),
    col("FBI Code"),
    # Handle coordinates without casting since they're already strings
    col("X Coordinate"),
    col("Y Coordinate"),
    col("year"),
    date_format(col("Updated On"), "yyyy-MM-dd HH:mm:ss").alias("Updated On"),
    col("latitude"),
    col("longitude"),
    # Handle location with proper struct type
    when(col("location").isNotNull(), 
        to_json(struct(
            coalesce(col("location.latitude"), col("latitude")).alias("latitude"),
            coalesce(col("location.longitude"), col("longitude")).alias("longitude"),
            coalesce(col("location.human_address"), 
                    lit("{\"address\": \"\", \"city\": \"\", \"state\": \"\", \"zip\": \"\"}"))
                .alias("human_address")
        ))
    ).alias("location")
)



In [11]:
def write_batch(batch_df, batch_id):
    """
    Write batch data to existing CSV file in GCS bucket
    """
    # Format the data consistently with existing CSV
    batch_to_write = batch_df.select(
        "ID", "Case Number", "Date", "Block", "IUCR",
        "Primary Type", "Description", "Location Description",
        "arrest", "domestic", "beat", "district", "ward",
        "Community Area", "FBI Code", "X Coordinate", "Y Coordinate",
        "year", "Updated On", "latitude", "longitude"
    )
    
    # Use a directory path instead of direct file path
    output_dir = f"gs://{gcs_bucket}/US-chicago/2020To-Present/test/streaming_output"
    
    try:
        # Write to GCS bucket with unique batch ID to avoid conflicts
        batch_to_write.coalesce(1) \
            .write \
            .format("csv") \
            .mode("append") \
            .option("header", "false") \
            .option("compression", "none") \
            .save(output_dir)
    except Exception as e:
        print(f"Error writing batch {batch_id}: {str(e)}")
        raise e

In [12]:
query = formatted_streaming_df \
    .writeStream \
    .foreachBatch(write_batch) \
    .option("checkpointLocation", f"gs://{gcs_bucket}/checkpoints") \
    .trigger(processingTime="1 minute") \
    .start()
query.awaitTermination()

25/05/19 03:56:00 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
ERROR:root:KeyboardInterrupt while sending command.                             
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

                                                                                

In [16]:
df_kafka = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", 'chicago_crimes') \
    .option("startingOffsets", "earliest") \
    .load()

In [17]:
df_raw = df_kafka.select(col("value").cast("string").alias("raw_json"))
df_raw.show(5)

25/05/22 09:52:30 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


+--------------------+
|            raw_json|
+--------------------+
|{"id": "13831323"...|
|{"id": "13831657"...|
|{"id": "13831985"...|
|{"id": "13832376"...|
|{"id": "13833431"...|
+--------------------+
only showing top 5 rows



In [18]:
# Display one complete JSON record
df_raw.select("raw_json").limit(5).show(5, False)

25/05/22 09:52:33 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|raw_json                                                                                                                                                                                                                                                                 

In [10]:
print("Raw JSON Schema:")
df_raw.printSchema()
# print("\nParsed Schema:")
# df_parsed.printSchema()

Raw JSON Schema:
root
 |-- raw_json: string (nullable = true)



In [21]:
schema = StructType([
    StructField("id", StringType(), True),
    StructField("case_number", StringType(), True),
    StructField("date", TimestampType(), True),
    StructField("block", StringType(), True),
    StructField("iucr", StringType(), True),
    StructField("primary_type", StringType(), True),
    StructField("description", StringType(), True),
    StructField("location_description", StringType(), True),
    StructField("arrest", BooleanType(), True),
    StructField("domestic", BooleanType(), True),
    StructField("beat", StringType(), True),
    StructField("district", StringType(), True),
    StructField("ward", StringType(), True),
    StructField("community_area", StringType(), True),
    StructField("fbi_code", StringType(), True),
    StructField("x_coordinate", StringType(), True),
    StructField("y_coordinate", StringType(), True),
    StructField("year", StringType(), True),
    StructField("updated_on", TimestampType(), True),
    StructField("latitude", StringType(), True),
    StructField("longitude", StringType(), True),
    StructField("location", StructType([
        StructField("latitude", StringType(), True),
        StructField("longitude", StringType(), True),
        StructField("human_address", StringType(), True)
    ]), True)
])

# Parse JSON with updated schema
df_parsed = df_kafka \
    .selectExpr("CAST(value AS STRING) as raw_json") \
    .select(from_json(col("raw_json"), schema).alias("data")) \
    .select("data.*") \
    .withColumnRenamed("id", "ID") \
    .withColumnRenamed("case_number", "Case Number") \
    .withColumnRenamed("date", "Date") \
    .withColumnRenamed("block", "Block") \
    .withColumnRenamed("iucr", "IUCR") \
    .withColumnRenamed("primary_type", "Primary Type") \
    .withColumnRenamed("description", "Description") \
    .withColumnRenamed("location_description", "Location Description") \
    .withColumnRenamed("community_area", "Community Area") \
    .withColumnRenamed("fbi_code", "FBI Code") \
    .withColumnRenamed("x_coordinate", "X Coordinate") \
    .withColumnRenamed("y_coordinate", "Y Coordinate") \
    .withColumnRenamed("updated_on", "Updated On") \
    .withColumn("latitude", col("latitude").cast("double")) \
    .withColumn("longitude", col("longitude").cast("double")) \
    

# Verify the parsing worked
df_parsed.show(5)

25/05/22 09:59:09 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+-------------------+------------+-------------+--------------------+
|      ID|Case Number|               Date|               Block|IUCR|       Primary Type|         Description|Location Description|arrest|domestic|beat|district|ward|Community Area|FBI Code|X Coordinate|Y Coordinate|year|         Updated On|    latitude|    longitude|            location|
+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+-------------------+------------+-------------+--------------------+
|13831323|   JJ249309|2025-05-10 09:41:00|   072XX N DAMEN AVE|0460|            BATTERY|              SIMPLE|           APARTMENT|  t

In [39]:
# First rename columns in df_parsed to match df's column names
df_parsed_aligned = df_parsed \
    .withColumnRenamed("id", "ID") \
    .withColumnRenamed("case_number", "Case Number") \
    .withColumnRenamed("date", "Date") \
    .withColumnRenamed("block", "Block") \
    .withColumnRenamed("iucr", "IUCR") \
    .withColumnRenamed("primary_type", "Primary Type") \
    .withColumnRenamed("description", "Description") \
    .withColumnRenamed("location_description", "Location Description") \
    .withColumnRenamed("community_area", "Community Area") \
    .withColumnRenamed("fbi_code", "FBI Code") \
    .withColumnRenamed("x_coordinate", "X Coordinate") \
    .withColumnRenamed("y_coordinate", "Y Coordinate") \
    .withColumnRenamed("updated_on", "Updated On")

# Combine the DataFrames using unionByName
combined_df = df1.unionByName(df_parsed_aligned)

# Show the count before and after combining
print(f"Original df count: {df.count()}")
print(f"Parsed df count: {df_parsed.count()}")
print(f"Combined df count: {combined_df.count()}")

# Verify the combined data
combined_df.show(5)



Original df count: 1259460


25/05/18 09:31:44 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
25/05/18 09:31:49 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


Parsed df count: 245389


25/05/18 09:31:54 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


Combined df count: 1504849
+--------+-----------+--------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|      ID|Case Number|                Date|               Block|IUCR|      Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|
+--------+-----------+--------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|13829197|   JJ246652|05/08/2025 12:00:...|     040XX W LAKE ST|1310|   CRIMINAL DAMAGE|         TO PRO

In [48]:
output_path = f"gs://{gcs_bucket}/US-chicago/2020To-Present"
filtered_df.coalesce(1) \
    .write \
    .format("csv") \
    .mode("overwrite") \
    .option("header", "true") \
    .option("compression", "none") \
    .save(output_path)

                                                                                

In [49]:
filtered_df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: boolean (nullable = true)
 |-- Domestic: boolean (nullable = true)
 |-- Beat: integer (nullable = true)
 |-- District: integer (nullable = true)
 |-- Ward: integer (nullable = true)
 |-- Community Area: integer (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: integer (nullable = true)
 |-- Y Coordinate: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Updated On: timestamp (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Location: string (nullable = true)



In [None]:
## append file to bucket

# output_path = f"gs://{gcs_bucket}/Chicago.csv"
# df_parsed_aligned.coalesce(1) \
#     .write \
#     .format("csv") \
#     .mode("append") \
#     .option("header", "false") \
#     .option("compression", "none") \
#     .save(output_path)

In [5]:
df_kafka = spark.read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "chicago_crimes_test1") \
        .option("startingOffsets", "earliest") \
        .load()

In [19]:
schema = StructType([
    StructField("id", StringType(), True),
    StructField("case_number", StringType(), True),
    StructField("date", StringType(), True),
    StructField("block", StringType(), True),
    StructField("iucr", StringType(), True),
    StructField("primary_type", StringType(), True),
    StructField("description", StringType(), True),
    StructField("location_description", StringType(), True),
    StructField("arrest", BooleanType(), True),
    StructField("domestic", BooleanType(), True),
    StructField("beat", StringType(), True),
    StructField("district", StringType(), True),
    StructField("ward", StringType(), True),
    StructField("community_area", StringType(), True),
    StructField("fbi_code", StringType(), True),
    StructField("x_coordinate", StringType(), True),  # Changed to StringType
    StructField("y_coordinate", StringType(), True),  # Changed to StringType
    StructField("year", StringType(), True),         # Changed to StringType
    StructField("updated_on", StringType(), True),
    StructField("latitude", StringType(), True),
    StructField("longitude", StringType(), True),
    StructField("location", StructType([
        StructField("latitude", StringType(), True),
        StructField("longitude", StringType(), True),
        StructField("human_address", StringType(), True)
    ]), True)
])

In [27]:
df_transformed = df_kafka \
    .selectExpr("CAST(value AS STRING) as raw_json") \
    .select(from_json(col("raw_json"), schema).alias("data")) \
    .select("data.*") \
    .withColumnRenamed("id", "ID") \
    .withColumnRenamed("case_number", "Case Number") \
    .withColumnRenamed("date", "Date") \
    .withColumnRenamed("block", "Block") \
    .withColumnRenamed("iucr", "IUCR") \
    .withColumnRenamed("primary_type", "Primary Type") \
    .withColumnRenamed("description", "Description") \
    .withColumnRenamed("location_description", "Location Description") \
    .withColumnRenamed("community_area", "Community Area") \
    .withColumnRenamed("fbi_code", "FBI Code") \
    .withColumnRenamed("x_coordinate", "X Coordinate") \
    .withColumnRenamed("y_coordinate", "Y Coordinate") \
    .withColumnRenamed("updated_on", "Updated On") \
    .withColumn("Updated On", to_timestamp("Updated On")) \
    .withColumn("Date", to_timestamp("Date")) \
    .withColumn("latitude", col("latitude").cast("double")) \
    .withColumn("longitude", col("longitude").cast("double"))

In [31]:
formatted_df = df_transformed.select(
    col("ID"),
    col("Case Number"),
    date_format(col("Date"), "yyyy-MM-dd HH:mm:ss").alias("Date"),
    col("Block"),
    col("IUCR"),
    col("Primary Type"),
    col("Description"),
    col("Location Description"),
    col("arrest"),
    col("domestic"),
    col("beat"),
    col("district"),
    col("ward"),
    col("Community Area"),
    col("FBI Code"),
    # Handle coordinates without casting since they're already strings
    col("X Coordinate"),
    col("Y Coordinate"),
    col("year"),
    date_format(col("Updated On"), "yyyy-MM-dd HH:mm:ss").alias("Updated On"),
    col("latitude"),
    col("longitude"),
    # Handle location with proper struct type
    when(col("location").isNotNull(), 
        to_json(struct(
            coalesce(col("location.latitude"), col("latitude")).alias("latitude"),
            coalesce(col("location.longitude"), col("longitude")).alias("longitude"),
            coalesce(col("location.human_address"), 
                    lit("{\"address\": \"\", \"city\": \"\", \"state\": \"\", \"zip\": \"\"}"))
                .alias("human_address")
        ))
    ).alias("location")
)

In [32]:
formatted_df.show()

25/05/19 15:44:33 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+-------------------+------------+-------------+--------------------+
|      ID|Case Number|               Date|               Block|IUCR|       Primary Type|         Description|Location Description|arrest|domestic|beat|district|ward|Community Area|FBI Code|X Coordinate|Y Coordinate|year|         Updated On|    latitude|    longitude|            location|
+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+-------------------+------------+-------------+--------------------+
|13831737|   JJ249165|2025-05-09 16:09:00|   0000X E Grand Ave|1150| DECEPTIVE PRACTICE|   CREDIT CARD FRAUD|    DEPARTMENT STORE| fa