In [1]:
import os

import pyspark
from pyspark.sql import SparkSession

import pandas as pd

In [2]:
from pyspark.sql import types
from pyspark.sql import functions as F

In [3]:
# Local Spark session using all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

# Raw collisions CSV. Set the COLLISIONS_INPUT_PATH environment variable to read
# a different file instead of editing this cell; the default is resolved
# relative to the kernel's working directory.
INPUT_PATH = os.environ.get("COLLISIONS_INPUT_PATH", "../../1_ingestion/data/dataframe.csv")

# inferSchema needs an extra pass over the file and still leaves several count
# columns typed as string for this data (see the schema printed below), which is
# why the cleaning step casts every column explicitly.
spark_df = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load(INPUT_PATH)
)

In [4]:
# Inspect the schema inferred from the CSV. Some count columns (e.g. NUMBER OF
# PERSONS INJURED) come back as string, so they are cast explicitly later on.
spark_df.printSchema()

root
 |-- CRASH DATE: string (nullable = true)
 |-- CRASH TIME: string (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- ZIP CODE: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- ON STREET NAME: string (nullable = true)
 |-- CROSS STREET NAME: string (nullable = true)
 |-- OFF STREET NAME: string (nullable = true)
 |-- NUMBER OF PERSONS INJURED: string (nullable = true)
 |-- NUMBER OF PERSONS KILLED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS INJURED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS KILLED: integer (nullable = true)
 |-- NUMBER OF CYCLIST INJURED: integer (nullable = true)
 |-- NUMBER OF CYCLIST KILLED: string (nullable = true)
 |-- NUMBER OF MOTORIST INJURED: string (nullable = true)
 |-- NUMBER OF MOTORIST KILLED: integer (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 1: string (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 2: strin

In [5]:
# Columns the cleaning step does not use. Only vehicle 1's contributing factor
# and vehicle type are kept. Note: drop() silently ignores names that are not in
# the schema, so a typo here would go unnoticed.
columns_to_drop = [
    # location detail
    'LATITUDE', 'LONGITUDE', 'LOCATION',
    # secondary street names
    'CROSS STREET NAME', 'OFF STREET NAME',
    # vehicles 2-5
    'CONTRIBUTING FACTOR VEHICLE 2', 'CONTRIBUTING FACTOR VEHICLE 3',
    'CONTRIBUTING FACTOR VEHICLE 4', 'CONTRIBUTING FACTOR VEHICLE 5',
    'VEHICLE TYPE CODE 2', 'VEHICLE TYPE CODE 3',
    'VEHICLE TYPE CODE 4', 'VEHICLE TYPE CODE 5',
]

spark_df = spark_df.drop(*columns_to_drop)

In [6]:
# Preview the first rows after dropping unused columns (long values truncated).
spark_df.show(n=20, truncate=True)

+----------+----------+---------+--------+--------------------+-------------------------+------------------------+-----------------------------+----------------------------+-------------------------+------------------------+--------------------------+-------------------------+-----------------------------+------------+--------------------+
|CRASH DATE|CRASH TIME|  BOROUGH|ZIP CODE|      ON STREET NAME|NUMBER OF PERSONS INJURED|NUMBER OF PERSONS KILLED|NUMBER OF PEDESTRIANS INJURED|NUMBER OF PEDESTRIANS KILLED|NUMBER OF CYCLIST INJURED|NUMBER OF CYCLIST KILLED|NUMBER OF MOTORIST INJURED|NUMBER OF MOTORIST KILLED|CONTRIBUTING FACTOR VEHICLE 1|COLLISION_ID| VEHICLE TYPE CODE 1|
+----------+----------+---------+--------+--------------------+-------------------------+------------------------+-----------------------------+----------------------------+-------------------------+------------------------+--------------------------+-------------------------+-----------------------------+---------

In [7]:
# Row count before any rows are filtered out.
raw_row_count = spark_df.count()
raw_row_count

2076253

In [8]:
# Remaining column names, in schema order.
list(spark_df.columns)

['CRASH DATE',
 'CRASH TIME',
 'BOROUGH',
 'ZIP CODE',
 'ON STREET NAME',
 'NUMBER OF PERSONS INJURED',
 'NUMBER OF PERSONS KILLED',
 'NUMBER OF PEDESTRIANS INJURED',
 'NUMBER OF PEDESTRIANS KILLED',
 'NUMBER OF CYCLIST INJURED',
 'NUMBER OF CYCLIST KILLED',
 'NUMBER OF MOTORIST INJURED',
 'NUMBER OF MOTORIST KILLED',
 'CONTRIBUTING FACTOR VEHICLE 1',
 'COLLISION_ID',
 'VEHICLE TYPE CODE 1']

In [9]:
# Output settings. BUCKET and FILE_NAME are not read anywhere in this notebook;
# presumably a later upload step uses them — verify before removing.
BUCKET = 'raw-bucket-911'
FILE_NAME = 'proccessed_collisions'
# Parquet output directory. Set COLLISIONS_OUTPUT_PATH instead of editing this
# user-specific default.
OUTPUT_PATH = os.environ.get("COLLISIONS_OUTPUT_PATH", "C:/Users/adity/Downloads/dataframe")

# Shorter names for the verbose columns, keyed by their snake_cased source name.
# Columns not listed keep their snake_cased name.
COLUMN_RENAMES = {
    'on_street_name': 'street_name',
    'number_of_persons_injured': 'persons_injured',
    'number_of_persons_killed': 'persons_killed',
    'number_of_pedestrians_injured': 'pedestrians_injured',
    'number_of_pedestrians_killed': 'pedestrians_killed',
    'number_of_cyclist_injured': 'cyclists_injured',
    'number_of_cyclist_killed': 'cyclists_killed',
    'number_of_motorist_injured': 'motorists_injured',
    'number_of_motorist_killed': 'motorists_killed',
    'contributing_factor_vehicle_1': 'contributing_factor',
    'vehicle_type_code_1': 'vehicle_type',
}

# Target types, keyed by final column name.
STRING_COLUMNS = ['borough', 'street_name', 'contributing_factor', 'vehicle_type']
INT_COLUMNS = [
    'zip_code',
    'persons_injured', 'persons_killed',
    'pedestrians_injured', 'pedestrians_killed',
    'cyclists_injured', 'cyclists_killed',
    'motorists_injured', 'motorists_killed',
    'collision_id',
]

# Snake-case every header ('CRASH DATE' -> 'crash_date') and apply COLUMN_RENAMES
# in a single toDF call, instead of one withColumnRenamed projection per column.
snake_names = [name.lower().replace(' ', '_') for name in spark_df.columns]
renamed_df = spark_df.toDF(*[COLUMN_RENAMES.get(name, name) for name in snake_names])

# crash_date is parsed from MM/dd/yyyy text. crash_time is a time-of-day string;
# date_format implicitly casts it to a timestamp and re-renders it as HH:mm:ss.
crash_date_col = F.to_date(F.col('crash_date'), 'MM/dd/yyyy')
crash_time_col = F.date_format(F.col('crash_time'), 'HH:mm:ss')
# Full event time, built from the parsed date and the normalised time.
timestamp_col = F.to_timestamp(
    F.concat(crash_date_col, F.lit(' '), crash_time_col), 'yyyy-MM-dd HH:mm:ss'
)

# One expression per output column, keeping the existing column order. A
# 'timestamp' column left over from an earlier run of this cell is skipped here
# because it is rebuilt (as the last column) below.
column_exprs = {name: F.col(name) for name in renamed_df.columns if name != 'timestamp'}
column_exprs.update({name: F.col(name).cast('string') for name in STRING_COLUMNS})
column_exprs.update({name: F.col(name).cast('int') for name in INT_COLUMNS})
column_exprs['crash_date'] = crash_date_col
column_exprs['crash_time'] = crash_time_col

# A single select replaces the long withColumn chain (each withColumn call adds a
# projection to the plan). na.drop() then removes every row that has a null in
# any column, including values that failed to parse or cast above.
spark_df = (
    renamed_df
    .select(
        *[expr.alias(name) for name, expr in column_exprs.items()],
        timestamp_col.alias('timestamp'),
    )
    .na.drop()
)

In [10]:
# Preview the cleaned, renamed and typed rows.
spark_df.show(n=20, truncate=True)

+----------+----------+-------------+--------+--------------------+---------------+--------------+-------------------+------------------+----------------+---------------+-----------------+----------------+--------------------+------------+--------------------+-------------------+
|crash_date|crash_time|      borough|zip_code|         street_name|persons_injured|persons_killed|pedestrians_injured|pedestrians_killed|cyclists_injured|cyclists_killed|motorists_injured|motorists_killed| contributing_factor|collision_id|        vehicle_type|          timestamp|
+----------+----------+-------------+--------+--------------------+---------------+--------------+-------------------+------------------+----------------+---------------+-----------------+----------------+--------------------+------------+--------------------+-------------------+
|2021-12-14|  14:58:00|    MANHATTAN|   10017|            3 AVENUE|              0|             0|                  0|                 0|               0|   

In [11]:
# Rows remaining after na.drop() removed any row containing a null.
clean_row_count = spark_df.count()
clean_row_count

1107159

In [12]:
# Confirm the cleaned schema: crash_date as date, counts and ids as integer,
# plus the derived timestamp column at the end.
spark_df.printSchema()

root
 |-- crash_date: date (nullable = true)
 |-- crash_time: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- zip_code: integer (nullable = true)
 |-- street_name: string (nullable = true)
 |-- persons_injured: integer (nullable = true)
 |-- persons_killed: integer (nullable = true)
 |-- pedestrians_injured: integer (nullable = true)
 |-- pedestrians_killed: integer (nullable = true)
 |-- cyclists_injured: integer (nullable = true)
 |-- cyclists_killed: integer (nullable = true)
 |-- motorists_injured: integer (nullable = true)
 |-- motorists_killed: integer (nullable = true)
 |-- contributing_factor: string (nullable = true)
 |-- collision_id: integer (nullable = true)
 |-- vehicle_type: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [13]:
# Shuffle into a single partition so one task writes the output, replacing any
# previous run's files at OUTPUT_PATH.
(
    spark_df
    .repartition(1)
    .write
    .mode('overwrite')
    .parquet(OUTPUT_PATH)
)

In [14]:
# Read the written output back to check the round trip.
df = spark.read.format('parquet').load(OUTPUT_PATH)

In [15]:
# Preview the rows read back from Parquet.
df.show(n=20, truncate=True)

+----------+----------+-------------+--------+--------------------+---------------+--------------+-------------------+------------------+----------------+---------------+-----------------+----------------+--------------------+------------+--------------------+-------------------+
|crash_date|crash_time|      borough|zip_code|         street_name|persons_injured|persons_killed|pedestrians_injured|pedestrians_killed|cyclists_injured|cyclists_killed|motorists_injured|motorists_killed| contributing_factor|collision_id|        vehicle_type|          timestamp|
+----------+----------+-------------+--------+--------------------+---------------+--------------+-------------------+------------------+----------------+---------------+-----------------+----------------+--------------------+------------+--------------------+-------------------+
|2021-12-14|  14:58:00|    MANHATTAN|   10017|            3 AVENUE|              0|             0|                  0|                 0|               0|   

In [16]:
# Should equal the cleaned row count shown earlier.
parquet_row_count = df.count()
parquet_row_count

1107159

In [17]:
# The schema read back from Parquet should match the cleaned schema written above.
df.printSchema()

root
 |-- crash_date: date (nullable = true)
 |-- crash_time: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- zip_code: integer (nullable = true)
 |-- street_name: string (nullable = true)
 |-- persons_injured: integer (nullable = true)
 |-- persons_killed: integer (nullable = true)
 |-- pedestrians_injured: integer (nullable = true)
 |-- pedestrians_killed: integer (nullable = true)
 |-- cyclists_injured: integer (nullable = true)
 |-- cyclists_killed: integer (nullable = true)
 |-- motorists_injured: integer (nullable = true)
 |-- motorists_killed: integer (nullable = true)
 |-- contributing_factor: string (nullable = true)
 |-- collision_id: integer (nullable = true)
 |-- vehicle_type: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [18]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()