In [0]:
%run ./Connection

Adls_raw_path : abfs://bronze@nycdbda.dfs.core.windows.net/
Adls_silver_path : abfs://silver@nycdbda.dfs.core.windows.net/


In [0]:
def _read_fiscal_year(year):
    """Load one fiscal year's raw parking-violations CSV from the bronze path.

    Columns are read as strings (header only, no schema inference), and null
    values in those string columns are replaced with ''.
    """
    source_path = f"{Adls_raw_path}/Parking_Violations_Issued_Fiscal_Year_{year}.csv"
    return spark.read.csv(source_path, header='True').fillna('')


# One raw file per fiscal year, FY2020 through FY2024, read in ascending order.
data1, data2, data3, data4, data5 = [_read_fiscal_year(year) for year in range(2020, 2025)]

In [0]:
# Stack the yearly frames into one. union() matches columns by position, not by name.
data = data1
for yearly_frame in (data2, data3, data4, data5):
    data = data.union(yearly_frame)

In [0]:
# CSV headers contain spaces, which are awkward to reference in Spark SQL; swap them for underscores.
underscored_names = [name.replace(" ", "_") for name in data.columns]
nyc = data.toDF(*underscored_names)
# Expose the renamed frame to Spark SQL under the view name "data".
nyc.createOrReplaceTempView("data")

In [0]:
# Keep only the columns used downstream and drop exact duplicate rows.
# Built from the `nyc` DataFrame directly. The previous cell registers its temp view as "data",
# so a query against a view named "nyc" only works if such a view is left over from an earlier session.
# Issue_Date: remove '/' and keep the first 8 characters, leaving an MMddyyyy string
# (presumably from MM/dd/yyyy source text; the next cell parses it with 'MMddyyyy').
nyc_columns = nyc.selectExpr(
    "Summons_Number",
    "Plate_ID",
    "Registration_State",
    "Plate_Type",
    "LEFT(replace(Issue_Date, '/', ''), 8) AS Issue_Date",
    "Violation_Code",
    "Vehicle_Body_Type",
    "Vehicle_Make",
    "Issuer_Precinct",
    "Issuer_Code",
    "Violation_Time",
    "Violation_County",
).distinct()

In [0]:
# Convert the Issue_Date column from an MMddyyyy string to DateType.
# NOTE(review): unparseable values presumably become null; confirm under the cluster's
# spark.sql.legacy.timeParserPolicy setting.
# Explicit names instead of `from pyspark.sql.functions import *`, which shadows Python builtins
# such as sum, max, min, round and abs. col, regexp_replace and when are imported for later cells.
from pyspark.sql.functions import col, regexp_replace, to_date, when

nyc = nyc_columns.withColumn("Issue_Date", to_date("Issue_Date", "MMddyyyy"))

In [0]:
# Re-register the date-typed frame as the "data" view so the next cell can filter it in SQL.
nyc.createOrReplaceTempView(name="data")

In [0]:
# Keep violations issued within the analysis window; both bounds are inclusive.
# The bounds are named so the window can be changed in one place.
ISSUE_DATE_START = "2019-06-01"
ISSUE_DATE_END = "2024-01-15"

nyc = spark.sql(f"""
    SELECT * FROM data
    WHERE Issue_Date >= '{ISSUE_DATE_START}' AND Issue_Date <= '{ISSUE_DATE_END}'
""")

In [0]:
# Row count after restricting Issue_Date to the analysis window.
nyc.count()

27507503

In [0]:
# Preview rows of the date-filtered frame.
nyc.show()

+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+---------------+-----------+--------------+----------------+
|Summons_Number|Plate_ID|Registration_State|Plate_Type|Issue_Date|Violation_Code|Vehicle_Body_Type|Vehicle_Make|Issuer_Precinct|Issuer_Code|Violation_Time|Violation_County|
+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+---------------+-----------+--------------+----------------+
|    1447620008|  83ST36|                NY|       MOT|2019-06-01|            24|                 |       TRIUM|            835|        317|         0445P|               Q|
|    1447619997|  JFVI75|                FL|       PAS|2019-06-01|            24|              VAN|        CHRY|            835|        317|         1115P|               Q|
|    1447619961| IAMBEAN|                NY|       SRF|2019-06-01|            24|             SUBN|        JEEP|            835|       

In [0]:
# Report the percentage of null values in each column, using one count plus one aggregation pass.
# The per-column filter().count() together with nyc.count() inside the loop rescanned the full
# dataset twice per column. String columns were filled with '' at load time, so nulls can
# presumably only come from later derivations such as the Issue_Date parse.
from pyspark.sql.functions import col, count, when

total_rows = nyc.count()
# count() skips nulls, and when() without otherwise() is null unless the condition holds,
# so each aggregate counts exactly the null cells of its column.
null_counts = nyc.select(
    [count(when(col(c).isNull(), 1)).alias(c) for c in nyc.columns]
).first()

for column_name in nyc.columns:
    # Guard against an empty frame instead of raising ZeroDivisionError.
    null_pct = (null_counts[column_name] / total_rows) * 100 if total_rows else 0.0
    print(column_name, (20 - len(column_name)) * "-", null_pct)

Summons_Number ------ 0.0
Plate_ID ------------ 0.0
Registration_State -- 0.0
Plate_Type ---------- 0.0
Issue_Date ---------- 0.0
Violation_Code ------ 0.0
Vehicle_Body_Type --- 0.0
Vehicle_Make -------- 0.0
Issuer_Precinct ----- 0.0
Issuer_Code --------- 0.0
Violation_Time ------ 0.0
Violation_County ---- 0.0


In [0]:
# Remove any row that still has a null in some column (dropna defaults to how='any').
nyc = nyc.dropna()

In [0]:
# Row count after dropping nulls. It matches the earlier count, presumably because string
# nulls were replaced with '' when the CSVs were loaded.
nyc.count()

27507503

In [0]:
# Replace the Registration_State value '99' with 'NY', the state with the most violations.
# Use an exact match rather than regexp_replace, which would rewrite any '99' substring
# inside a longer value. Nulls stay null.
from pyspark.sql.functions import col, when

nyc = nyc.withColumn(
    "Registration_State",
    when(col("Registration_State") == "99", "NY").otherwise(col("Registration_State")),
)

In [0]:
# Drop rows whose Plate_ID is the placeholder 'BLANKPLATE'.
nyc = nyc.where(nyc["Plate_ID"] != 'BLANKPLATE')

In [0]:
# Drop rows whose Plate_ID is the placeholder '999'.
nyc = nyc.where(nyc["Plate_ID"] != '999')

In [0]:
# Only violation codes 1-99 are valid, so keep exactly that range (not just "!= 0").
# Violation_Code is a string column, so cast it explicitly. Blank or non-numeric codes
# presumably cast to null (non-ANSI mode) and are therefore dropped as well.
nyc = nyc.filter(nyc.Violation_Code.cast("int").between(1, 99))

In [0]:
# Issuer_Precinct 0 is an invalid entry, so drop those rows.
nyc = nyc.where(nyc["Issuer_Precinct"] != 0)

In [0]:
from pyspark.sql.functions import col, when

# Violation_Time is a 5-character string such as "0445P": two hour digits, two minute digits, then A/P.
nyc = (
    nyc
    .withColumn("violation_hour", col("Violation_Time").substr(1, 2).cast("int"))
    .withColumn("violation_minutes", col("Violation_Time").substr(3, 2).cast("int"))
    .withColumn("violation_ampm", col("Violation_Time").substr(5, 1))
)

# Convert 12-hour clock values to 0-23: 12xxA (midnight) becomes 0, and P hours below 12 gain 12.
# Only hours < 12 are shifted, so a malformed value like "1630P" stays 16 instead of becoming 28.
nyc = nyc.withColumn(
    "violation_hour",
    when((col("violation_hour") == 12) & (col("violation_ampm") == "A"), 0)
    .when((col("violation_hour") < 12) & (col("violation_ampm") == "P"), col("violation_hour") + 12)
    .otherwise(col("violation_hour")),
)

# The raw time string and the A/P marker are no longer needed once hour and minutes are extracted.
nyc = nyc.drop("Violation_Time", "violation_ampm")

nyc.printSchema()

root
 |-- Summons_Number: string (nullable = false)
 |-- Plate_ID: string (nullable = false)
 |-- Registration_State: string (nullable = false)
 |-- Plate_Type: string (nullable = false)
 |-- Issue_Date: date (nullable = true)
 |-- Violation_Code: string (nullable = false)
 |-- Vehicle_Body_Type: string (nullable = false)
 |-- Vehicle_Make: string (nullable = false)
 |-- Issuer_Precinct: string (nullable = false)
 |-- Issuer_Code: string (nullable = false)
 |-- Violation_County: string (nullable = false)
 |-- violation_hour: integer (nullable = true)
 |-- violation_minutes: integer (nullable = true)



In [0]:
# Row count after the plate/code/precinct filters and the violation-time derivation.
nyc.count()

15559573

In [0]:
# Preview the cleaned frame before it is written to the silver layer.
nyc.show()

+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+---------------+-----------+----------------+--------------+-----------------+
|Summons_Number|Plate_ID|Registration_State|Plate_Type|Issue_Date|Violation_Code|Vehicle_Body_Type|Vehicle_Make|Issuer_Precinct|Issuer_Code|Violation_County|violation_hour|violation_minutes|
+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+---------------+-----------+----------------+--------------+-----------------+
|    1455214498| HMW4232|                NY|       PAS|2019-06-03|            20|              SDN|       SUBAR|              1|     160474|              NY|            18|               38|
|    1446885938| 71639MM|                NY|       COM|2019-06-04|            14|              VAN|        KERB|              1|     160491|              NY|            15|               28|
|    1455244922| DAX8354|                NY| 

In [0]:
# Persist the cleaned data as a Delta table in the silver container, replacing both the data
# and the schema of any previous run.
silver_output_path = f"{Adls_silver_path}/finalnyc/nycparking/track2/"
(
    nyc.write
    .format("delta")
    .mode('overWrite')
    .option('overwriteSchema', 'true')
    .save(silver_output_path)
)