In [2]:
import os
import numpy as np
import pandas as pd
import shutil
import pyspark


In [3]:
import pyspark.sql.functions as psf
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession 
from pyspark.sql import SparkSession, SQLContext, Window
from pyspark.sql.functions import when, count, col, sum, regexp_replace
from pyspark import SparkContext
from pyspark.sql.types import IntegerType

In [15]:
sparkObj = pyspark.sql.SparkSession.builder.appName("Assignment2-NYC").getOrCreate()
sparkObj.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [16]:
sparkObj

In [17]:

nyParkingData = sparkObj.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load('Parking_Violations_Issued_-_Fiscal_Year_2023_20231115.csv')



In [18]:
nyParkingData.show(n=2, truncate=False, vertical=True)

-RECORD 0---------------------------------------
 Summons Number                    | 1484697303 
 Plate ID                          | JER1863    
 Registration State                | NY         
 Plate Type                        | PAS        
 Issue Date                        | 06/10/2022 
 Violation Code                    | 67         
 Vehicle Body Type                 | SDN        
 Vehicle Make                      | TOYOT      
 Issuing Agency                    | P          
 Street Code1                      | 34330      
 Street Code2                      | 179        
 Street Code3                      | 0          
 Vehicle Expiration Date           | 20221210   
 Violation Location                | 10         
 Violation Precinct                | 10         
 Issuer Precinct                   | 1          
 Issuer Code                       | 160195     
 Issuer Command                    | 0001       
 Issuer Squad                      | 0000       
 Violation Time     

In [19]:
print("Total Rows: " , nyParkingData.count())
print("Total Columns: " , len(nyParkingData.columns))

Total Rows:  21563502
Total Columns:  43


In [20]:
nyParkingData.printSchema()

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: integer (nullable = true)
 |-- Street Code2: integer (nullable = true)
 |-- Street Code3: integer (nullable = true)
 |-- Vehicle Expiration Date: integer (nullable = true)
 |-- Violation Location: integer (nullable = true)
 |-- Violation Precinct: integer (nullable = true)
 |-- Issuer Precinct: integer (nullable = true)
 |-- Issuer Code: integer (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Time First Observed: string (nullable = true)
 |-- Violation Coun

In [21]:
nyParkingData.selectExpr("*").show(1, vertical=True)


-RECORD 0---------------------------------------
 Summons Number                    | 1484697303 
 Plate ID                          | JER1863    
 Registration State                | NY         
 Plate Type                        | PAS        
 Issue Date                        | 06/10/2022 
 Violation Code                    | 67         
 Vehicle Body Type                 | SDN        
 Vehicle Make                      | TOYOT      
 Issuing Agency                    | P          
 Street Code1                      | 34330      
 Street Code2                      | 179        
 Street Code3                      | 0          
 Vehicle Expiration Date           | 20221210   
 Violation Location                | 10         
 Violation Precinct                | 10         
 Issuer Precinct                   | 1          
 Issuer Code                       | 160195     
 Issuer Command                    | 0001       
 Issuer Squad                      | 0000       
 Violation Time     

## Null Values in the dataset

In [22]:
from pyspark.sql.functions import isnan, when, count, col
nyParkingData.select([count(when(col(c).isNull(), c)).alias(c) for c in nyParkingData.columns]).show(vertical=True)


-RECORD 0-------------------------------------
 Summons Number                    | 0        
 Plate ID                          | 2        
 Registration State                | 0        
 Plate Type                        | 0        
 Issue Date                        | 0        
 Violation Code                    | 0        
 Vehicle Body Type                 | 51924    
 Vehicle Make                      | 19151    
 Issuing Agency                    | 0        
 Street Code1                      | 0        
 Street Code2                      | 0        
 Street Code3                      | 0        
 Vehicle Expiration Date           | 0        
 Violation Location                | 9838473  
 Violation Precinct                | 0        
 Issuer Precinct                   | 0        
 Issuer Code                       | 0        
 Issuer Command                    | 9829279  
 Issuer Squad                      | 10569424 
 Violation Time                    | 254      
 Time First O

# Pre-processing and Handling Null Values

In [23]:
nyParkingData = nyParkingData.dropna(subset=['Violation Time'])

In [24]:
nyParkingData = nyParkingData.dropna(subset=['Vehicle Body Type'])


In [25]:
nyParkingData = nyParkingData.dropna(subset=['Violation Location'])
nyParkingData = nyParkingData.dropna(subset=['Vehicle Color'])

In [26]:
nyParkingData.select([count(when(col(c).isNull(), c)).alias(c) for c in nyParkingData.columns]).show(vertical=True)


-RECORD 0-------------------------------------
 Summons Number                    | 0        
 Plate ID                          | 1        
 Registration State                | 0        
 Plate Type                        | 0        
 Issue Date                        | 0        
 Violation Code                    | 0        
 Vehicle Body Type                 | 0        
 Vehicle Make                      | 9373     
 Issuing Agency                    | 0        
 Street Code1                      | 0        
 Street Code2                      | 0        
 Street Code3                      | 0        
 Vehicle Expiration Date           | 0        
 Violation Location                | 0        
 Violation Precinct                | 0        
 Issuer Precinct                   | 0        
 Issuer Code                       | 0        
 Issuer Command                    | 0        
 Issuer Squad                      | 737132   
 Violation Time                    | 0        
 Time First O

## Create View for NYC Parking Data

In [27]:
nyParkingData = nyParkingData.withColumn('Issue Year', psf.year(psf.to_date(nyParkingData['Issue Date'], 'MM/dd/yyyy')))
nyParkingData.createOrReplaceTempView("nyParkingDataView")


## Question 1: When are tickets most likely to be issued? 

In [28]:
sparkObj.sql("SELECT `Violation Time`, COUNT(*) AS Ticket_Frequency FROM nyParkingDataView GROUP BY `Violation Time` ORDER BY Ticket_Frequency DESC").show()


+--------------+----------------+
|Violation Time|Ticket_Frequency|
+--------------+----------------+
|         0836A|           35300|
|         0838A|           33070|
|         0839A|           33052|
|         0840A|           32776|
|         0906A|           32194|
|         0837A|           31797|
|         0841A|           31670|
|         0842A|           31067|
|         0843A|           30451|
|         0908A|           30307|
|         0844A|           29968|
|         0845A|           29823|
|         0910A|           29692|
|         0909A|           29676|
|         0907A|           29566|
|         1140A|           29395|
|         0806A|           28904|
|         0846A|           28830|
|         1142A|           28670|
|         1141A|           28614|
+--------------+----------------+
only showing top 20 rows



## Answer 1:
* I have sorted this according to the frequency in descending order and fetched violation time. Hence, at 0836A i.e 8:36 AM, we have maximum violators that is 35300

## Question 2: What are the most common years and types of cars to be ticketed? 

In [29]:

sparkObj.sql("SELECT `Vehicle Body Type` as `Type of Car`,`Issue Year`, COUNT(*) AS `Violation_Count` FROM nyParkingDataView WHERE (`Vehicle Year` > 0) GROUP BY `Vehicle Body Type`, `Issue Year` ORDER BY `Violation_Count` DESC").show(10)


+-----------+----------+---------------+
|Type of Car|Issue Year|Violation_Count|
+-----------+----------+---------------+
|       SUBN|      2023|        2248534|
|       SUBN|      2022|        1493740|
|       4DSD|      2023|        1237369|
|       4DSD|      2022|         887320|
|        VAN|      2023|         712154|
|        VAN|      2022|         489204|
|       PICK|      2023|         159446|
|       DELV|      2023|         136793|
|       PICK|      2022|         106445|
|       DELV|      2022|          98201|
+-----------+----------+---------------+
only showing top 10 rows



## Answer 2:
* Most Common Year is 2023
* And Type of Car is SUBN

## Question 3: Where are tickets most commonly issued

In [30]:
sparkObj.sql("SELECT `Violation Location`, COUNT(*) AS No_of_tickets  FROM nyParkingDataView GROUP BY `Violation Location` ORDER BY count(*) DESC").show(15)

+------------------+-------------+
|Violation Location|No_of_tickets|
+------------------+-------------+
|                19|       543760|
|                13|       444043|
|                 6|       422414|
|               114|       414982|
|                14|       367244|
|                18|       319307|
|                 9|       296111|
|                 1|       287019|
|               109|       270077|
|               115|       247344|
|                20|       231725|
|               108|       228404|
|                70|       217372|
|                84|       212563|
|                10|       206185|
+------------------+-------------+
only showing top 15 rows



## Answer 3: 
* Location with most tickets issued: 19

## Question 4: Which color of the vehicle is most likely to get a ticket ?

In [31]:
sparkObj.sql("SELECT `Vehicle Color`, COUNT(*) as Ticket_Count FROM nyParkingDataView GROUP BY `Vehicle Color` ORDER BY COUNT(*) DESC").show(15)

+-------------+------------+
|Vehicle Color|Ticket_Count|
+-------------+------------+
|           WH|     2227522|
|           GY|     2034985|
|           BK|     1734455|
|        WHITE|     1251084|
|        BLACK|      783294|
|           BL|      709865|
|         GREY|      584562|
|           RD|      401003|
|         BLUE|      281506|
|        SILVE|      273305|
|        BROWN|      269821|
|          RED|      200192|
|           GR|      144667|
|           TN|       79292|
|        OTHER|       73179|
+-------------+------------+
only showing top 15 rows



## Answer 4: 
* Vehicle Color: WH
* Ticket Count for this Vehicle Color: 2227522

In [32]:
sparkObj.stop()

## Based on a K-Means algorithm, please try to answer the following question:

##  Question 5: Given a Black vehicle parking illegally at 34510, 10030, 34050 (street codes). What is the probability that it will get an ticket?

In [4]:
sparkObj = SparkSession.builder \
.appName('Assignment2_NYC-Kmeans') \
.master('local[*]') \
.config('spark.sql.execution.arrow.pyspark.enabled', True) \
.config('spark.sql.session.timeZone', 'UTC') \
.config('spark.driver.memory','32G') \
.config('spark.ui.showConsoleProgress', True) \
.config('spark.sql.repl.eagerEval.enabled', True) \
.getOrCreate()

In [5]:
sparkObj

### Reading the Data

In [6]:


nyParkingData = sparkObj.read.format("csv").option("header", "true").option("inferSchema","true").load('Parking_Violations_Issued_-_Fiscal_Year_2023_20231115.csv').select('Street Code1', 'Street Code2', 'Street Code3', 'Vehicle Color')

nyParkingData = nyParkingData.select(nyParkingData['Street Code1'].cast('float'), nyParkingData['Street Code2'].cast('float'), nyParkingData['Street Code3'].cast('float'), nyParkingData['Vehicle Color'])



In [7]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
import pyspark.sql.functions as psf
import numpy as np

In [8]:
def vectorizeStreetCodes(inputData):
#Vectorize street codes using Spark VectorAssembler.
    return VectorAssembler(inputCols=["Street Code1", "Street Code2", "Street Code3"], outputCol="Street_codes").transform(inputData)

def initializeKmeans(inputData):
#Initialize and fit K-Means clustering.
    kmeans = KMeans(k=4, featuresCol="Street_codes")
    kmeansFitData = kmeans.fit(inputData.select('Street_codes')) 
    clusterCenters = np.array(kmeansFitData.clusterCenters()).astype(float)
    return kmeansFitData.transform(inputData).cache(),clusterCenters

In [9]:
def calculateBlackProbability(inputData, blackColor):

    blackProb = inputData.groupBy('prediction').agg(
        psf.sum(psf.when(psf.col('Vehicle Color').isin(blackColor), 1)).alias('Count'),
        psf.count('Vehicle Color').alias('Total_Cars')
    ).orderBy('prediction')

    return blackProb.select(
        'prediction',
        'Count',
        'Total_Cars',
        (psf.col('Count') / psf.col('Total_Cars')).alias('Probability')
    )




In [10]:
def findClosestCluster(streetsData, clusterCenters):
# Find the cluster with the closest center to specified streets.
    
    closestDistance = float("inf")
    clusterCentreID = 0

    for index in range(len(clusterCenters)):
        newDist = np.sum(np.power((np.array(streetsData) - clusterCenters[index]), 2))
        if newDist < closestDistance:
            closestDistance = newDist
            clusterCentreID = index

    return clusterCentreID


In [11]:

def printClusterProb(clusterCentreID, prob):
# Print the probability for the specified cluster
    print('Cluster ID for given Street Code (34510, 10030, 34050):', clusterCentreID)
    print("-----------------------------------------------------------------------")
    print("Probability for that Specific Cluster:")
    prob.filter(psf.col('prediction') == clusterCentreID).show()


In [12]:

def calculatePrintProbability(inputData, blackColor, streetCode):
# Main function to calculate and print probability.
    inputData = vectorizeStreetCodes(inputData)
    kmeansFitData, clusterCenters= initializeKmeans(inputData)
    
    prob = calculateBlackProbability(kmeansFitData, blackColor)

    clusterCentreID = findClosestCluster(streetCode, clusterCenters)

    # Print the probability for the specified cluster
    printClusterProb(clusterCentreID, prob)

In [13]:
blackColor=['BLK.', 'Black', 'BC', 'BLAC', 'BK/', 'BLK', 'BK.', 'BCK', 'BK', 'B LAC']
streetCode=[34510, 10030, 34050]
calculatePrintProbability(nyParkingData,blackColor,streetCode )


Cluster ID for given Street Code (34510, 10030, 34050): 0
-----------------------------------------------------------------------
Probability for that Specific Cluster:
+----------+------+----------+-------------------+
|prediction| Count|Total_Cars|        Probability|
+----------+------+----------+-------------------+
|         0|856969|   5863372|0.14615634143629297|
+----------+------+----------+-------------------+



In [14]:
sparkObj.stop()