In [5]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [7]:
import pyspark
import pandas as pd
from pyspark.sql.functions import split, explode, col, lower, sort_array
import pyspark.sql.functions as F
from google.colab import drive
drive.mount('/content/drive')
import warnings
warnings.filterwarnings("ignore")

Mounted at /content/drive


In [8]:
df = pd.read_csv('/content/drive/MyDrive/datasets/parking2017.csv') #read the csv by panad

In [9]:
pd.set_option("display.max_rows", None) #to see all the value counts

In [10]:
for i in df.columns:
    print(df[i].value_counts())

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



[1;30;43mStreaming output truncated to the last 5000 lines.[0m
East End Ave              210
E 174th St                210
114th St                  210
Guernsey St               210
College Ave               209
W 93rd St                 209
Bainbridge St             209
Watson Ave                208
Midwood St                208
Old Slip                  208
Wilson Ave                208
46th Rd                   208
30th Rd                   207
Argyle Rd                 207
Fenimore St               207
Hooper St                 207
Burling St                206
N 11th St                 206
Dupont St                 206
Intervale Ave             206
Suffolk St                205
169th St                  205
Fox St                    205
W 136th St                205
117th St                  205
Van Wyck Expy             204
W 134th St                204
Manor Ave                 203
W 95th St                 203
Castleton Ave             202
Pleasant Ave              202
Kings

In [11]:
spark = pyspark.sql.SparkSession.builder.config("spark.executor.memory", "16g").config("spark.driver.memory", "16g").getOrCreate() #init pyspark

In [13]:
dfs = spark.read.csv('/content/drive/MyDrive/datasets/parking2017.csv', sep=',', inferSchema=True, header=True, multiLine=True) #read the csv by spark

In [14]:
dfs.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: integer (nullable = true)
 |-- Street Code2: integer (nullable = true)
 |-- Street Code3: integer (nullable = true)
 |-- Vehicle Expiration Date: integer (nullable = true)
 |-- Violation Location: integer (nullable = true)
 |-- Violation Precinct: integer (nullable = true)
 |-- Issuer Precinct: integer (nullable = true)
 |-- Issuer Code: integer (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Violation County: string (nul

In [15]:
#dropping the index column and dropping the columns that contained a single value
drop_cols = ['_c0','Issuing Agency', 'Law Section', 'Days Parking In Effect    ', 'Feet From Curb']

dfs = dfs.drop(*drop_cols)

In [16]:
renameing =(col.replace(' ', '_') for col in dfs.columns) #rename the column to remove the space
dfs = dfs.toDF(*renameing)

In [17]:
#remaping the colors by shortcut
dfs = dfs.replace({'WHITE': 'WH', 'BLACK':'BK', 'GREY':'GY', 'BLUE':'BL', 'RED':'RD','BROWN':'BR','GREEN':'GR', 'GOLD':'GL','YELLO':'YW', 'TAN':'TN', 'ORANG':'OR', 'PURPL':'PR'},subset=['Vehicle_Color'])

In [18]:
#limit the year from 1950 to 2018
dfs = dfs.filter("Vehicle_Year > 1950 and Vehicle_Year <= 2018 ")

In [19]:
Vehicle_Make_del = dfs.groupby('Vehicle_Make').count().orderBy('count', ascending=True).filter("count < 100") #value count for Vehicle_Make

In [20]:
Vehicle_Make_del = Vehicle_Make_del.select(Vehicle_Make_del.Vehicle_Make).rdd.flatMap(lambda x: x).collect() #convert to list of values

In [21]:
dfs = dfs.filter(~(dfs.Vehicle_Make).isin(Vehicle_Make_del))  #dropped the values with less than 100 examples

In [22]:
dfs = dfs.filter(~(dfs.Violation_In_Front_Of_Or_Opposite).isin(['I','R'])) #I and R values with not enough examples so we dropped the I and R

In [23]:
Vehicle_Body_Type_del = dfs.groupby('Vehicle_Body_Type').count().orderBy('count', ascending=True).filter("count < 100")  #value count for Vehicle_Body_Type

In [24]:
Vehicle_Body_Type_del = Vehicle_Body_Type_del.select(Vehicle_Body_Type_del.Vehicle_Body_Type).rdd.flatMap(lambda x: x).collect() #convert to list of values

In [25]:
dfs = dfs.filter(~(dfs.Vehicle_Body_Type).isin(Vehicle_Body_Type_del)) #dropped the values with less than 100 examples

In [26]:
dfs = dfs.withColumn('Vehicle_Expiration_year', col('Vehicle_Expiration_Date').substr(0, 4)) #convert Vehicle_Expiration_Date to year
dfs = dfs.drop('Vehicle_Expiration_Date')

In [27]:
dfs.show()

+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+------------+------------+------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+----------------+---------------------------------+------------+------------+-------------------+------------+--------------------+------------------+-------------+------------+-------------------+---------------------+-----------------------+
|Summons_Number|Plate_ID|Registration_State|Plate_Type|Issue_Date|Violation_Code|Vehicle_Body_Type|Vehicle_Make|Street_Code1|Street_Code2|Street_Code3|Violation_Location|Violation_Precinct|Issuer_Precinct|Issuer_Code|Issuer_Command|Issuer_Squad|Violation_Time|Violation_County|Violation_In_Front_Of_Or_Opposite|House_Number| Street_Name|Date_First_Observed|Sub_Division|From_Hours_In_Effect|To_Hours_In_Effect|Vehicle_Color|Vehicle_Year|Violation_Post_Code|Violation_Description|Vehicle_Expiratio

In [28]:
print('Dataframe shape:', (dfs.count(), len(dfs.columns)))

Dataframe shape: (822802, 31)


In [29]:
dfP = dfs.toPandas()

In [30]:
dfP.to_csv('afterCleaning.csv')