In [42]:
import pyspark
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import IntegerType,StringType,DoubleType
from pyspark.ml.stat import Correlation
from pyspark.sql.functions import split,udf,col,regexp_replace

## Creating a spark session

In [2]:
conf = pyspark.SparkConf().setMaster('local[*]') \
        .set('spark.executor.heartbeatInterval', 10000) \
        .set('spark.network.timeout', 10000) \
        .set('spark.core.connection.ack.wait.timeout', '3600')
spark = SparkSession \
        .builder \
        .appName('parking') \
        .config(conf=conf) \
        .getOrCreate()

## Loading dataset and overview

In [3]:
df = spark.read.csv('./parking.csv',header=True,inferSchema=True)

In [4]:
pd.DataFrame(df.dtypes,columns=['column Name', 'Data type'])

Unnamed: 0,column Name,Data type
0,Summons Number,bigint
1,Plate ID,string
2,Registration State,string
3,Plate Type,string
4,Issue Date,string
5,Violation Code,int
6,Vehicle Body Type,string
7,Vehicle Make,string
8,Issuing Agency,string
9,Street Code1,int


In [5]:
df.describe().toPandas()

Unnamed: 0,summary,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,...,Hydrant Violation,Double Parking Violation,Latitude,Longitude,Community Board,Community Council,Census Tract,BIN,BBL,NTA
0,count,9100278.0,9100276,9100278,9100278,9100278,9100278.0,8989410,9037103,9100278,...,1,1,1,1,1,1,1,1,1,1
1,mean,6486507134.673543,,99.0,999.0,,35.25493495912982,5.256613226452906,4555.626865671642,,...,,,,,,,,,,
2,stddev,2166447421.2002106,,0.0,9.85113404791972E-14,,20.50547511932227,23.127694456636892,12905.597273858535,,...,,,,,,,,,,
3,min,1001793950.0,!,99,999,01/01/1981,0.0,-,(UPS),A,...,"{""""id"""":135110655","name"""":""""Drug Maximum Out of Pocket - individu...","dataTypeName"""":""""text","fieldName"""":""""drug_maximum_out_of_pocket_indiv...","position"""":64","tableColumnId"""":16639867","width"""":700","format"""":{}","metadata"""":{}}","{""""id"""":135110656"
4,max,8006150291.0,],YT,WUG,12/31/2031,99.0,ZSR,`,X,...,"{""""id"""":135110655","name"""":""""Drug Maximum Out of Pocket - individu...","dataTypeName"""":""""text","fieldName"""":""""drug_maximum_out_of_pocket_indiv...","position"""":64","tableColumnId"""":16639867","width"""":700","format"""":{}","metadata"""":{}}","{""""id"""":135110656"


In [6]:
df.limit(2).toPandas()

Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Hydrant Violation,Double Parking Violation,Latitude,Longitude,Community Board,Community Council,Census Tract,BIN,BBL,NTA
0,1283294138,GBB9093,NY,PAS,08/04/2013,46,SUBN,AUDI,P,37250,...,,,,,,,,,,
1,1283294151,62416MB,NY,COM,08/04/2013,46,VAN,FORD,P,37290,...,,,,,,,,,,


## Pre-Processing, EDA

In [7]:
import datetime 
def day_finder(x):
    return datetime.datetime.strptime(x, '%m/%d/%Y').weekday()
day_udf = func.udf(lambda x: day_finder(x), IntegerType())

In [8]:
df = df.withColumnRenamed('Violation Time', 'Violation_Time')

In [60]:
df_new = df.withColumn('Month',func.split('Issue Date','/')[0]) \
            .withColumn('Year',func.split('Issue Date','/')[2]) \
            .withColumn('Day',day_udf(func.col('Issue Date')))  \
            .withColumn('Meridiem', \
                      func.when(func.isnan(df.Violation_Time) \
                               | func.col('Violation_Time').isNull()\
                               , func.lit(None))\
                      .otherwise(func.substring(df.Violation_Time,5,1))) \
            .withColumn('Time_Hour', \
                      func.when(func.isnan(df.Violation_Time) \
                               | func.col('Violation_Time').isNull()\
                               , func.lit(None))\
                      .otherwise(func.substring(df.Violation_Time,1,2)))

In [61]:
df_new = df_new.withColumn('Year',df_new['Year'].cast(IntegerType())) \
    .withColumn("Month",df_new["Month"].cast(DoubleType())) \
    .withColumn("Day",df_new["Day"].cast(DoubleType())) \
    .withColumn("Time_Hour",df_new["Time_Hour"].cast(DoubleType()))

In [11]:
df_new.groupBy("Year") \
  .count() \
  .withColumnRenamed('count', 'count') \
  .withColumn('percentage', (func.col('count') / df_new.count()) * 100 ) \
  .show()

+----+-------+--------------------+
|Year|  count|          percentage|
+----+-------+--------------------+
|2003|     39|4.285583363497247E-4|
|2007|     13|1.428527787832415...|
|2018|    181|0.001988950227674...|
|2044|      9|9.889807761916724E-5|
|2015|   1522| 0.01672476379293028|
|2032|      3|3.296602587305575E-5|
|2023|     31|3.406489340215760...|
|2006|     11|1.208754282012044...|
|2031|     93|0.001021946802064...|
|2013|4379109|   48.12060686497709|
|2014|4716512|  51.828218874192636|
|1973|     10|1.098867529101858...|
|2041|     39|4.285583363497247E-4|
|2019|    329|0.003615274170745...|
|2004|     77|8.461279974084309E-4|
|2030|     45|4.944903880958363E-4|
|2053|      1|1.098867529101858...|
|2012|    618|0.006791001329849484|
|2009|      9|9.889807761916724E-5|
|2016|    296|0.003252647886141...|
+----+-------+--------------------+
only showing top 20 rows



In [62]:
df_great = df_new.where(func.col('Year')>2014)
df_new = df_new.where(func.col('Year')<2015)
df_new = df_new.where(func.col('Year')>2012)

In [13]:
df_new.groupBy("Meridiem") \
  .count() \
  .withColumnRenamed('count', 'count') \
  .withColumn('percentage', (func.col('count') / df_new.count()) * 100 ) \
  .show()

+--------+-------+--------------------+
|Meridiem|  count|          percentage|
+--------+-------+--------------------+
|    null|   2054|0.022582295370486522|
|       A|4436275|   48.77374508018749|
|       P|4657277|  51.203507709918874|
|        |     15|1.649145231535043E-4|
+--------+-------+--------------------+



In [63]:
df_new = df_new.withColumn("Meridiem", \
       func.when(col("Meridiem")=="" ,None) \
          .otherwise(col("Meridiem"))) 

In [15]:
df_new.groupBy("Time_Hour") \
  .count() \
  .withColumnRenamed('count', 'count') \
  .withColumn('percentage', (func.col('count') / df_new.count()) * 100 ) \
  .show()

+---------+------+--------------------+
|Time_Hour| count|          percentage|
+---------+------+--------------------+
|      8.0|955730|  10.507583814233245|
|      0.0| 52530|   0.577530660083572|
|      7.0|518121|   5.696378510054453|
|     29.0|    10|1.099430154356695...|
|     44.0|     1|1.099430154356695...|
|     null|  2057|0.022615278275117223|
|      1.0|968418|  10.647079512218022|
|      0.6|     1|1.099430154356695...|
|      4.0|580011|   6.376815832585812|
|     85.0|     1|1.099430154356695...|
|     77.0|     1|1.099430154356695...|
|     11.0|964384|  10.602728499791274|
|      3.0|627837|   6.902629298208446|
|     53.0|     3|3.298290463070086E-5|
|     59.0|     6|6.596580926140172E-5|
|     28.0|    15|1.649145231535043E-4|
|      2.0|857338|   9.425832496758606|
|     10.0|823190|    9.05039908764888|
|     30.0|     9| 9.89487138921026E-5|
|     84.0|     2|2.198860308713390...|
+---------+------+--------------------+
only showing top 20 rows



In [64]:
df_new = df_new.withColumn("Time_Hour", func.when((func.col("Time_Hour") <= 0.0) |  \
                                                  (func.col("Time_Hour") > 12.0),1.0) \
                           .otherwise(df_new.Time_Hour))

In [65]:
df_new = df_new.withColumnRenamed('Days Parking In Effect    ','Days Parking In Effect')
df_new = df_new.withColumnRenamed('Community Council ','Community Council')

In [18]:
df_new.stat.corr('Violation Location','Violation Precinct')

1.0

In [23]:
df_new.groupBy("Violation_County") \
  .count() \
  .withColumnRenamed('count', 'count') \
  .withColumn('percentage', (func.col('count') / df_new.count()) * 100 ) \
  .show()

+----------------+-------+------------------+
|Violation_County|  count|        percentage|
+----------------+-------+------------------+
|               K|1977706| 23.57016688800254|
|               Q|1825154|21.752062428038034|
|              BX| 942939|11.237883484808272|
|               R|  99163|1.1818179542940133|
|              NY|3545755| 42.25806924485715|
+----------------+-------+------------------+



In [66]:
df_new = df_new.withColumn('Violation County', regexp_replace('Violation County', 'KINGS', 'K')) \
                .withColumn('Violation County', regexp_replace('Violation County','QUEEN', 'Q')) \
                .withColumn('Violation County', regexp_replace('Violation County', 'BRONX', 'BX')) \
                .withColumn('Violation County', regexp_replace('Violation County', 'RC', 'R'))   \
                .withColumn('Violation County', regexp_replace('Violation County', 'RICH','R'))  \
                .withColumn('Violation County', regexp_replace('Violation County', 'NYC', 'NY'))

In [67]:
df_new = df_new.dropna(how='any',subset=['Violation County'])

In [68]:
df_new = df_new.withColumnRenamed('Violation County', 'Violation_County')
df_new = df_new.filter(df_new.Violation_County != '103')

In [69]:
df_new=df_new.select([func.when(func.col(c)=="",None).otherwise(func.col(c)).alias(c) for c in df_new.columns])

In [46]:
df_new.groupBy("Month") \
  .count() \
  .withColumnRenamed('count', 'count') \
  .withColumn('percentage', (func.col('count') / df_new.count()) * 100 ) \
  .show()

+-----+------+------------------+
|Month| count|        percentage|
+-----+------+------------------+
|  8.0|758831|  9.04369674248339|
|  7.0|139850|1.6667228795822813|
|  1.0|668092| 7.962275452741404|
|  4.0|795846| 9.484839019120773|
| 11.0|790389| 9.419802860709044|
|  3.0|837324| 9.979171029126594|
|  2.0|604254|7.2014584689246455|
| 10.0|883243|10.526430577982786|
|  6.0|642754| 7.660298875531138|
|  5.0|840087|10.012100277008509|
|  9.0|750106| 8.939712780206984|
| 12.0|679941|  8.10349103658245|
+-----+------+------------------+



In [47]:
df_new.groupBy("Violation_County") \
  .count() \
  .withColumnRenamed('count', 'count') \
  .withColumn('percentage', (func.col('count') / df_new.count()) * 100 ) \
  .show()

+----------------+-------+------------------+
|Violation_County|  count|        percentage|
+----------------+-------+------------------+
|               K|1977706| 23.57016688800254|
|               Q|1825154|21.752062428038034|
|              BX| 942939|11.237883484808272|
|               R|  99163|1.1818179542940133|
|              NY|3545755| 42.25806924485715|
+----------------+-------+------------------+



In [50]:
df_new.groupBy("Vehicle Body Type") \
  .count() \
  .withColumnRenamed('count', 'count') \
  .withColumn('percentage', (func.col('count') / df_new.count()) * 100 ) \
  .show()

+-----------------+-----+--------------------+
|Vehicle Body Type|count|          percentage|
+-----------------+-----+--------------------+
|             ARMO|   60|7.150759583477789E-4|
|             FRER|    1|1.191793263912964...|
|              TRK|  484|0.005768279397338749|
|             MOTC|    2|2.383586527825929...|
|             BOAT| 2898| 0.03453816878819772|
|             WINN|    8|9.534346111303718E-5|
|             FREG|    8|9.534346111303718E-5|
|               RF|    6|7.150759583477788E-5|
|              UTI|   18|2.145227875043336...|
|             TNAC|    2|2.383586527825929...|
|             STAK| 5048| 0.06016172396232646|
|              RED|   14|1.668510569478150...|
|             ARTW|    1|1.191793263912964...|
|               PU|   17|2.026048548652040...|
|               NS|  122|0.001453987781973817|
|              NIS|   10|1.191793263912964...|
|             DODG|    2|2.383586527825929...|
|               SL|   15|1.787689895869447...|
|            

In [70]:
df_copy = df_new

In [71]:
len(df_copy.columns)

56

In [72]:
# Uneccessary columns
columns_to_drop = ['Summons Number','Plate ID','Issuer Code','Vehicle Expiration Date','House Number','Street Name','Date First Observed','Law Section','Sub Division','Vehicle Color','Vehicle Year','Feet From Curb','Violation Post Code','Violation Description','Violation Precinct']
# Columns that has more than 30% of missing value

columns_missing2 = ['Days Parking In Effect','From Hours In Effect','To Hours In Effect']
# columns that has more than 75% of missing values
columns_missing = ['Time First Observed','Intersecting Street','Violation Legal Code','Unregistered Vehicle?','Meter Number',"No Standing or Stopping Violation","Hydrant Violation","Double Parking Violation","Latitude","Longitude","Community Board","Community Council","Census Tract","BIN","BBL","NTA"]
# Columns that are no longer needed
columns_to_drop2 = ['Year','Violation_Time','Issue Date','Issuer Squad']
#

final_columns = columns_missing+columns_missing2+columns_to_drop+columns_to_drop2

In [73]:
df_new = df_new.drop(*final_columns)

In [74]:
len(df_copy.columns)

18

In [75]:
df_copy = df_copy.dropna(how='any')

In [77]:
df_new = df_new.select([func.col(col).alias(col.replace(' ', '_')) for col in df_copy.columns])