# Prototyping - Preprocessing - Dataflow
### Initialize PySpark Session

In [None]:
import os

# Use the same Python interpreter for the PySpark driver and the workers.
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3'

# findspark.init() puts the local Spark installation's pyspark on sys.path,
# so it must run BEFORE pyspark is imported.
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("CSV to Parquet")
    # Interpret date/time patterns with the legacy (Spark 2.x) parser.
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)
# Raise the number of fields printed in plan/debug strings (the schema is wide).
spark.conf.set("spark.sql.debug.maxToStringFields", 1000)

### Get all CSV files from HDFS

In [None]:
def getRawCsvDataFiles(hdfs_glob="/parkingviolations/rawdata/*"):
    """Return the HDFS paths of all raw CSV data files.

    Runs ``hdfs dfs -ls -d <hdfs_glob>`` and extracts the path column of every
    listing line.

    Args:
        hdfs_glob: HDFS path or glob to list. It is passed to ``hdfs`` verbatim,
            so HDFS (not the local shell) expands the glob.

    Returns:
        List of HDFS paths, in the order ``hdfs dfs -ls`` prints them.

    Raises:
        subprocess.CalledProcessError: if ``hdfs`` exits non-zero (for example
            when the glob matches nothing). ``stderr`` is available on the error.
    """
    import subprocess

    # Argument list instead of a shell string: no local glob expansion, no shell
    # injection, and stderr stays separate so error messages are never parsed
    # as file names.
    result = subprocess.run(
        ["hdfs", "dfs", "-ls", "-d", hdfs_glob],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=True,
    )

    csv_files = []
    for line in result.stdout.decode().splitlines():
        # Listing line: permissions, replication, owner, group, size, date, time, path.
        # maxsplit=7 keeps paths that contain spaces intact; shorter lines
        # (e.g. a "Found N items" header) are skipped.
        fields = line.split(None, 7)
        if len(fields) == 8:
            csv_files.append(fields[7].strip())
    return csv_files

### Define Parquet schema

In [1]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
def getSchema():
    """Return the explicit Spark schema for the raw parking-violation CSV files.

    Field order must match the CSV column order: with an explicit schema the CSV
    reader assigns fields by position. All fields are nullable (StructField default).

    ``summons_number`` uses LongType because NYC summons numbers are 10-digit
    values that can exceed 2,147,483,647 (the IntegerType maximum); with
    IntegerType such values fail to parse and are read as null.
    """
    # LongType is not in this cell's import line, so import it locally.
    from pyspark.sql.types import LongType

    return StructType([
        StructField("summons_number", LongType()),
        StructField("plate_id", StringType()),
        StructField("registration_state", StringType()),
        StructField("plate_type", StringType()),
        StructField("issue_date", DateType()),
        StructField("violation_code", IntegerType()),
        StructField("vehicle_body_type", StringType()),
        StructField("vehicle_make", StringType()),
        StructField("issuing_agency", StringType()),
        StructField("street_code1", IntegerType()),
        StructField("street_code2", IntegerType()),
        StructField("street_code3", IntegerType()),
        StructField("vehicle_expiration_date", IntegerType()),
        StructField("violation_location", StringType()),
        StructField("violation_precinct", IntegerType()),
        StructField("issuer_precinct", IntegerType()),
        StructField("issuer_code", IntegerType()),
        StructField("issuer_command", StringType()),
        StructField("issuer_squad", StringType()),
        StructField("violation_time", StringType()),
        StructField("time_first_observed", StringType()),
        StructField("violation_county", StringType()),
        StructField("violation_in_front_of_or_opposite", StringType()),
        StructField("house_number", StringType()),
        StructField("street_name", StringType()),
        StructField("intersecting_street", StringType()),
        StructField("date_first_observed", IntegerType()),
        StructField("law_section", IntegerType()),
        StructField("sub_division", StringType()),
        StructField("violation_legal_code", StringType()),
        StructField("days_parking_in_effect", StringType()),
        StructField("from_hours_in_effect", StringType()),
        StructField("to_hours_in_effect", StringType()),
        StructField("vehicle_color", StringType()),
        StructField("unregistered_vehicle", StringType()),
        StructField("vehicle_year", IntegerType()),
        StructField("meter_number", StringType()),
        StructField("feet_from_curb", IntegerType()),
        StructField("violation_post_code", StringType()),
        StructField("no_standing_or_stopping_violation", StringType()),
        StructField("hydrant_violation", StringType()),
        StructField("double_parking_violation", StringType())
    ])

### Get Further Violation Information

[Codes-Mapping.xlsx](https://data.cityofnewyork.us/api/views/pvqr-7yc4/files/7875fa68-3a29-4825-9dfb-63ef30576f9e?download=true&filename=ParkingViolationCodes_January2020.xlsx) contains the violation descriptions and the fine amounts for the areas `manhattan_96th_st_below` and `all_other_areas`. 

Since it is difficult to determine the exact area in which each violation occurred, we use the mean of the fine amounts for these two areas.

The difference in fine amounts between these areas is minimal, so we consider this approach to be an acceptable approximation.  

In [None]:
import pandas as pd

def get_violation_codes_df(path="../1_Data/Codes-Mapping.xlsx"):
    """Load the violation-code lookup table as a Spark DataFrame.

    Reads the Excel mapping with pandas (skipping the first sheet row), names its
    columns and adds ``fine_amount`` as the row-wise mean of the two
    area-specific fines. pandas' ``mean`` skips NaN, so a row with only one fine
    uses that value.

    Args:
        path: location of the Codes-Mapping Excel file. Defaults to the
            notebook-relative path used so far.

    Returns:
        Spark DataFrame with the columns violation_code, violation_description,
        manhattan_96th_st_below, all_other_areas and fine_amount.
    """
    pandas_df = pd.read_excel(path, skiprows=1)
    # The sheet is expected to have exactly these four columns, in this order;
    # the assignment raises if the column count differs.
    pandas_df.columns = ['violation_code', 'violation_description', 'manhattan_96th_st_below', 'all_other_areas']

    pandas_df['fine_amount'] = pandas_df[['manhattan_96th_st_below', 'all_other_areas']].mean(axis=1)

    return spark.createDataFrame(pandas_df)

## Combine violation time and issue date into one timestamp

The column `violation_time` has the format `HHmm` followed by a suffix, e.g. `1059A` or `1145P`.
The suffix `A` stands for AM and `P` stands for PM.

We convert this to a 24-hour time and combine it with `issue_date` into a string of the clearer format `yyyy-MM-dd HHmm`. This string is then parsed into a timestamp and saved in the new column `issue_datetime`.

About 510,000 entries don't have any violation time. This is about 0.4% of all entries.
In cases where the violation time is missing or malformed, we set the timestamp to the known date with the time set to midnight.
Since this deviation is minor, we accept it.

In [None]:
from pyspark.sql.functions import col, to_timestamp, lit, when, concat, regexp_extract, lpad

# A valid 12-hour clock time such as "1059A" / "1145P": hour 01-12, minute 00-59,
# followed by an A (AM) or P (PM) suffix in either case.
pattern = r'^(0[1-9]|1[0-2])([0-5][0-9])[APap]$'

def clean_and_combine_timestamp(df):
    """Add an ``issue_datetime`` timestamp built from ``issue_date`` and ``violation_time``.

    ``violation_time`` is a 12-hour clock string like ``1059A`` or ``1145P``.
    Missing (null) or malformed values are replaced by ``0000A``, so their
    timestamp falls on midnight of ``issue_date``. Rows whose ``issue_date`` is
    null get a null ``issue_datetime``.

    Args:
        df: DataFrame with an ``issue_date`` (date) and a ``violation_time``
            (string) column.

    Returns:
        ``df`` plus the timestamp column ``issue_datetime``. The temporary
        helper columns are dropped again.
    """
    raw_time = col("violation_time")

    # Replace missing or broken data with 0000A (midnight).
    # regexp_extract returns null for a null input, so nulls need an explicit check.
    df = df.withColumn(
        "cleaned_violation_time",
        when(
            raw_time.isNull() | (regexp_extract(raw_time, pattern, 0) == ""),
            lit("0000A")
        ).otherwise(raw_time)
    )

    cleaned = col("cleaned_violation_time")
    hour_12 = cleaned.substr(1, 2).cast("int")
    is_pm = cleaned.substr(-1, 1).isin("P", "p")

    # 12-hour -> 24-hour clock: 12A -> 00, 01A-11A -> 01-11, 12P -> 12, 01P-11P -> 13-23.
    # The 0000A placeholder maps to 00.
    hour_24 = when(is_pm, hour_12 % 12 + 12).otherwise(hour_12 % 12)

    # Build "yyyy-MM-dd HHmm"; lpad guarantees a two-digit hour.
    df = df.withColumn(
        "formatted_violation_time",
        concat(
            col("issue_date").cast("string"),
            lit(" "),
            lpad(hour_24.cast("string"), 2, "0"),
            cleaned.substr(3, 2)
        )
    )

    # Additional column.
    df = df.withColumn(
        "issue_datetime",
        to_timestamp(col("formatted_violation_time"), "yyyy-MM-dd HHmm")
    )

    # Cleanup.
    df = df.drop("cleaned_violation_time", "formatted_violation_time")

    return df

### Transform CSV to dataframe

In [None]:
from pyspark.sql.functions import col, to_timestamp, lit, when, concat, regexp_extract

def transform_csv_to_df(csv_file):
    """Load one raw parking-violation CSV file into a cleaned DataFrame.

    Args:
        csv_file: path of the CSV file (as returned by ``getRawCsvDataFiles``).

    Returns:
        DataFrame with the columns of ``getSchema()`` plus ``issue_datetime``.
    """
    schema = getSchema()

    # Read csv with schema. Date patterns are case-sensitive: "dd" is day-of-month
    # and "yyyy" the calendar year. Uppercase "DD"/"YYYY" would mean day-of-year
    # and week-based year and yield wrong issue dates.
    df_raw = spark.read.csv(csv_file, header=True, schema=schema, dateFormat="MM/dd/yyyy")

    # Rename columns to underscore case. With an explicit schema the names should
    # already come from getSchema() (snake_case), so this is a safeguard.
    df = df_raw.select([col(name).alias(name.lower().replace(' ', '_')) for name in df_raw.columns])

    return clean_and_combine_timestamp(df)

### Combine all dataframes into one

In [None]:
# One cleaned DataFrame per raw CSV file found on HDFS, in listing order.
my_dfs = [transform_csv_to_df(path) for path in getRawCsvDataFiles()]

### Join Further Violation Information

Here we use the foreign key `violation_code` to join the fields `violation_description` and `fine_amount` onto the dataframe with the parking violations.

Doing this join once during preprocessing speeds up further data processing: the descriptions and fine amounts are stored directly in the Parquet output, so later queries do not need an expensive join operation.

In [None]:
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.functions import broadcast

meta_df = get_violation_codes_df()

# reduce() on an empty list fails with an opaque TypeError; fail clearly instead.
if not my_dfs:
    raise ValueError("No raw CSV files found - nothing to combine.")

# Union by column name rather than position, so a column-order difference
# between inputs can never silently shift values into the wrong column.
df = reduce(DataFrame.unionByName, my_dfs)

# Left join keeps violations whose code is missing from the lookup table
# (their lookup columns are null). The lookup table presumably has one row per
# violation code, i.e. it is small, so broadcast it instead of shuffling the
# large violations DataFrame.
df = df.join(broadcast(meta_df), on="violation_code", how="left")

### Write Parquet to HDFS

- 1 Spark executor = 4 cores
- 4 machines x 4 executors x 4 cores = 64 cores → 64 partitions

We partition the whole Parquet output into 64 partitions. This way, every core gets its own partition.

Partitioning the Parquet output into 64 partitions lets each core process its own partition independently. This improves parallel processing efficiency and resource utilization across the cluster: each core works on a specific subset of the data, which speeds up processing. Note that `repartition(64)` itself performs one full shuffle at write time. When the Parquet data is read again, Spark determines the number of input partitions from the file sizes (`spark.sql.files.maxPartitionBytes`), so the one-partition-per-core mapping is approximate.

In [None]:
# mode("overwrite") replaces any previous output, which makes the IPython-only
# `!hdfs dfs -rm -r` shell escape unnecessary. 64 partitions = 4 machines x
# 4 executors x 4 cores, i.e. one output partition per core.
df.repartition(64).write.mode("overwrite").parquet("/parkingviolations/raw_all.parquet")

## Stop Spark

In [None]:
# Stop the SparkSession created at the top of the notebook.
spark.stop()