## Re-reading the data written by the last ipynb file

In [None]:
# Importing the required tools: the Spark session, column functions and types, and the regex module
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, isnan, udf, lit, lower, regexp_extract, count, array, coalesce
from pyspark.sql.types import StringType, IntegerType, BooleanType
import re

In [None]:
# Open the third Spark session of the pipeline (getOrCreate reuses a session that is already running)
session_builder = SparkSession.builder.appName("DataCleaning3")
spark = session_builder.getOrCreate()

25/04/13 18:24:17 WARN Utils: Your hostname, Huis-Surface-Laptop-3.local resolves to a loopback address: 127.0.0.1; using 192.168.1.211 instead (on interface en0)
25/04/13 18:24:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/13 18:24:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
# Read back the CSV output of the previous notebook; the first line of each file is the header row
df = spark.read.csv("Intermediate2", header=True)

## 9 B. Filling Missing Values for BOROUGH

In [None]:
# NYC borough names (lowercase) that find_borough searches for inside an address
boroughs = ['brooklyn', 'manhattan', 'queens', 'bronx', 'staten island']


def find_borough(address):
    """Return the upper-cased NYC borough named in ``address``, or None.

    The search is case-insensitive and matches whole words only, so a place
    such as "Bronxville" is not mistaken for "Bronx". When several borough
    names occur (for example a street named after another borough, as in
    "Brooklyn Queens Expressway, Queens"), the one that appears last in the
    string wins, because the city part of an address follows the street part.

    Args:
        address: Free-text address string, or None.

    Returns:
        The matching entry of ``boroughs`` in upper case, or None when
        ``address`` is None or names no borough.
    """
    if address is None:
        return None
    # One alternation over every borough name, anchored on word boundaries.
    pattern = r'\b(?:' + '|'.join(re.escape(name) for name in boroughs) + r')\b'
    hits = re.findall(pattern, address.lower())
    # The rightmost hit is the one closest to the city/state part of the address.
    return hits[-1].upper() if hits else None

find_borough_udf = udf(find_borough, StringType())

# Keep an existing BOROUGH value; only where it is null use the borough found in Location
borough_from_location = find_borough_udf(col("Location"))
df = df.withColumn("BOROUGH", coalesce(col("BOROUGH"), borough_from_location))

## 9 C. Filling Missing Values for LATITUDE and LONGITUDE

In [None]:
# Where LATITUDE / LONGITUDE is null, fall back to the value held in NLat / NLong
for target_column, fallback_column in (("LATITUDE", "NLat"), ("LONGITUDE", "NLong")):
    df = df.withColumn(target_column, coalesce(col(target_column), col(fallback_column)))

## 9 D. Extracting ZIP CODE from Location

In [None]:
# Now we define a function to find the zip code in the location field
def extract_zip_code(location):
    """Return the ZIP code found in ``location`` as an int, or None.

    A ZIP candidate is any standalone run of exactly five digits. When the
    text holds several candidates, the last one is returned: the ZIP sits
    near the end of an address, whereas a five-digit number at the start is
    a house number.

    Args:
        location: Free-text address string, or None.

    Returns:
        The last standalone five-digit number as an int, or None when
        ``location`` is None or contains no such number.
    """
    if location is None:
        return None
    candidates = re.findall(r'\b\d{5}\b', location)
    return int(candidates[-1]) if candidates else None

extract_zip_code_udf = udf(extract_zip_code, IntegerType())

# Rows whose ZIP CODE equals 0 take the ZIP parsed from Location; every other row keeps its value
zip_is_zero = col("ZIP CODE") == 0
zip_from_location = extract_zip_code_udf(col("Location"))
df = df.withColumn("ZIP CODE", when(zip_is_zero, zip_from_location).otherwise(col("ZIP CODE")))

## 5 B. Removing Duplicate Rows

In [None]:
# Window helpers used only by this cell (not part of the notebook's import cell)
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Columns that are ignored when scoring how complete a row is
columns_to_exclude = ['CRASH DATE', 'CRASH TIME', 'NeedToProcess', 'Addresses', 'NLat', 'NLong', 'Location']
valid_location_mask = col("LATITUDE").isNotNull() & col("LONGITUDE").isNotNull()

# Keep only rows with a coordinate, then count the non-null values among the scored columns.
# The loop variable is deliberately not called `col`, so it cannot be confused with pyspark's col().
columns_to_count = [name for name in df.columns if name not in columns_to_exclude]
df = df.filter(valid_location_mask).withColumn("non_null_count",
    sum([when(col(c).isNotNull(), 1).otherwise(0) for c in columns_to_count])
)

# Rows sharing the same timestamp and coordinates are duplicates; keep the most complete one.
# dropDuplicates() does not honour a preceding orderBy(), so the surviving row is chosen
# explicitly: rank rows inside each duplicate group by non_null_count and keep rank 1.
dedupe_keys = ["CRASH DATE & TIME", "LATITUDE", "LONGITUDE"]
completeness_window = (
    Window.partitionBy(*[col(key) for key in dedupe_keys])
    .orderBy(col("non_null_count").desc())
)
df = (
    df.withColumn("_dedupe_rank", row_number().over(completeness_window))
    .filter(col("_dedupe_rank") == 1)
    .drop("_dedupe_rank", "non_null_count")
)

## 10. Standardizing Vehicle Types

In [None]:
# Standard vehicle type -> the spellings that are folded into it
vehicle_mappings = {
    'sedan': ['4dsd', '2dsd', 'sedan'],
    'suv': ['suv', 'suburban'],
    'truck': ['pickup', 'flatbed'],
}


def clean_vehicle_type(value):
    """Normalise a vehicle type string.

    The value is lower-cased and stripped of surrounding whitespace. If the
    result is one of the spellings listed in ``vehicle_mappings`` the
    standard type is returned; otherwise the lower-cased, stripped value is
    returned as is. None is passed through unchanged.
    """
    if value is None:
        return None
    normalised = value.lower().strip()
    # First standard type whose spelling list contains the value, else the value itself.
    return next(
        (standard for standard, spellings in vehicle_mappings.items() if normalised in spellings),
        normalised,
    )

clean_vehicle_type_udf = udf(clean_vehicle_type, StringType())

# Run the cleaner over each of the five vehicle type columns in turn
vehicle_columns = [
    'VEHICLE TYPE CODE 1',
    'VEHICLE TYPE CODE 2',
    'VEHICLE TYPE CODE 3',
    'VEHICLE TYPE CODE 4',
    'VEHICLE TYPE CODE 5',
]
for col_name in vehicle_columns:
    cleaned_column = clean_vehicle_type_udf(col(col_name))
    df = df.withColumn(col_name, cleaned_column)

## 8 B. Dropping Rows Outside New York

In [None]:
# Bounding box used to keep only crashes located in New York
lat_min, lat_max = 40.4774, 40.9176
lon_min, lon_max = -74.2591, -73.7004

# Keep only rows whose coordinates fall inside the box (both bounds inclusive)
inside_latitude = col("LATITUDE").between(lat_min, lat_max)
inside_longitude = col("LONGITUDE").between(lon_min, lon_max)
df = df.filter(inside_latitude & inside_longitude)

## Saving the Cleaned Data

In [None]:
# Write the cleaned data to the "Intermediate3" folder as CSV with a header row, replacing any earlier output
df.write.csv("Intermediate3", mode="overwrite", header=True)

25/04/13 18:24:21 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


CodeCache: size=131072Kb used=26485Kb max_used=26875Kb free=104586Kb
 bounds [0x00000001091e0000, 0x000000010ac50000, 0x00000001111e0000]
 total_blobs=9733 nmethods=8801 adapters=842
 compilation: disabled (not enough contiguous free space left)


                                                                                