In [1]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
from pyspark.sql.functions import to_date, to_timestamp, col, lit, concat, split, when
import sys 
from pyspark.sql import SparkSession
import pandas as pd

In [2]:
spark = SparkSession.builder \
    .master("spark://10.129.6.139:7077") \
    .appName("Preprocess") \
    .getOrCreate()

In [3]:
# The code in this cell is taken from https://stackoverflow.com/questions/37513355/converting-pandas-dataframe-into-spark-dataframe-error
from pyspark.sql.types import *

# Auxiliar functions
def equivalent_type(f):
    if f == 'datetime64[ns]': return TimestampType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return FloatType()
    else: return StringType()

def define_structure(string, format_type):
    try: typo = equivalent_type(format_type)
    except: typo = StringType()
    return StructField(string, typo)

# Given pandas dataframe, it will return a spark's dataframe.
def pandas_to_spark(pandas_df):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types): 
          struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    return spark.createDataFrame(pandas_df, p_schema)

In [4]:
# using spark.read.csv doesn't work due to "Description" column, because it has content on some rows that has commas
# -> Spark can't read the file right, but Pandas can, so file is first read into Pandas and converted to spark
df_pd = pd.read_csv("/mnt/bdpa21-group9-pvc/Data/Traffic_Violations.csv", low_memory=False)
df = pandas_to_spark(df_pd)

In [5]:
del df_pd

In [6]:
#Removing redundant features
df = df.drop('Agency',
 'SubAgency',
 'Description',
 'Location',
 'Latitude',
 'Longitude',
 'Accident',
 'Commercial License',
 'State',
 'Make',
 'Model',
 'Charge',
 'Article',
 'Driver City',
 'Driver State',
 'DL State',
 'Arrest Type',
 'Geolocation')

In [7]:
df.printSchema()

root
 |-- Date Of Stop: string (nullable = true)
 |-- Time Of Stop: string (nullable = true)
 |-- Belts: string (nullable = true)
 |-- Personal Injury: string (nullable = true)
 |-- Property Damage: string (nullable = true)
 |-- Fatal: string (nullable = true)
 |-- HAZMAT: string (nullable = true)
 |-- Commercial Vehicle: string (nullable = true)
 |-- Alcohol: string (nullable = true)
 |-- Work Zone: string (nullable = true)
 |-- VehicleType: string (nullable = true)
 |-- Year: float (nullable = true)
 |-- Color: string (nullable = true)
 |-- Violation Type: string (nullable = true)
 |-- Contributed To Accident: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Gender: string (nullable = true)



In [8]:
df.show(1,vertical=True)

-RECORD 0----------------------------------
 Date Of Stop            | 09/24/2013      
 Time Of Stop            | 17:11:00        
 Belts                   | No              
 Personal Injury         | No              
 Property Damage         | No              
 Fatal                   | No              
 HAZMAT                  | No              
 Commercial Vehicle      | No              
 Alcohol                 | No              
 Work Zone               | No              
 VehicleType             | 02 - Automobile 
 Year                    | 2008.0          
 Color                   | BLACK           
 Violation Type          | Citation        
 Contributed To Accident | No              
 Race                    | BLACK           
 Gender                  | M               
only showing top 1 row



In [9]:
df = df.withColumn("Date Of Stop", to_date(col("Date Of Stop"), "MM/dd/yyyy")) # Date to standard form

In [10]:
dtime = concat(col("Date Of Stop"), lit(" "), col("Time Of Stop"))
df = df.withColumn("Datetime Of Stop", to_timestamp(dtime)) # Date and time combined to one Datetime column

In [11]:
df = df.withColumn("VehicleType", split(col("VehicleType"), " ")[2]) # Shortening vehicle type category names

In [12]:
binvars =[
 'Belts',
 'Personal Injury',
 'Property Damage',
 'Fatal',
 'HAZMAT',
 'Commercial Vehicle',
 'Alcohol',
 'Work Zone',
 'Contributed To Accident'
]

def to_binary(df, columnName):
    df = df.withColumn(columnName, when(col(columnName) == "Yes", 1).otherwise(0))
    return df
    
for v in binvars:
    df = to_binary(df, v) # All binary variables mapped so that Yes=1, No=0

In [13]:
# Undefined values turned into NULLs
def replace_with_null(column, value):
    return when(column != value, column).otherwise(lit(None))

df = df.withColumn("Gender", replace_with_null(col("Gender"), "U"))
df = df.withColumn("VehicleType", replace_with_null(col("VehicleType"), "Unknown"))

In [14]:
# Filtering out illogical vehicle manufacturing years
df = df.withColumn("Year", when((col("Year") > 1960) & (col("Year") < 2017), col("Year")).otherwise(lit(None)))

In [15]:
# Gender to binary, 1 = M, 0 = F
df = df.withColumn("Gender", when(col("Gender") == "M", 1).when(col("Gender") == "F", 0))

In [17]:
df.groupby("Gender").count().show()

+------+------+
|Gender| count|
+------+------+
|  null|  1143|
|     1|681330|
|     0|336161|
+------+------+



In [16]:
df = df.drop('Date Of Stop', 'Time Of Stop') # Deleting date and time columns as they have been combined into one
df = df.withColumnRenamed("Datetime Of Stop", "Datetime") # Renaming datetime column

In [17]:
df.printSchema()

root
 |-- Belts: integer (nullable = false)
 |-- Personal Injury: integer (nullable = false)
 |-- Property Damage: integer (nullable = false)
 |-- Fatal: integer (nullable = false)
 |-- HAZMAT: integer (nullable = false)
 |-- Commercial Vehicle: integer (nullable = false)
 |-- Alcohol: integer (nullable = false)
 |-- Work Zone: integer (nullable = false)
 |-- VehicleType: string (nullable = true)
 |-- Year: float (nullable = true)
 |-- Color: string (nullable = true)
 |-- Violation Type: string (nullable = true)
 |-- Contributed To Accident: integer (nullable = false)
 |-- Race: string (nullable = true)
 |-- Gender: integer (nullable = true)
 |-- Datetime: timestamp (nullable = true)



In [18]:
df.show(1,vertical=True)

-RECORD 0--------------------------------------
 Belts                   | 0                   
 Personal Injury         | 0                   
 Property Damage         | 0                   
 Fatal                   | 0                   
 HAZMAT                  | 0                   
 Commercial Vehicle      | 0                   
 Alcohol                 | 0                   
 Work Zone               | 0                   
 VehicleType             | Automobile          
 Year                    | 2008.0              
 Color                   | BLACK               
 Violation Type          | Citation            
 Contributed To Accident | 0                   
 Race                    | BLACK               
 Gender                  | 1                   
 Datetime                | 2013-09-24 17:11:00 
only showing top 1 row



In [19]:
df.write.csv("/mnt/bdpa21-group9-pvc/Data/Traffic_Violations_preprocessed.csv", header=True, mode="Overwrite")

In [20]:
spark.stop()