In [1]:
from pyspark import SparkContext, SparkConf
cf = SparkConf()
cf.set("spark.submit.deployMode","client")
sc = SparkContext.getOrCreate(cf)
from pyspark.sql import SparkSession
spark = SparkSession \
	    .builder \
	    .appName("Python Spark SQL basic example") \
	    .config("spark.some.config.option", "some-value") \
	    .getOrCreate()
                            

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/19 22:36:31 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
23/04/19 22:36:31 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
23/04/19 22:36:31 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
23/04/19 22:36:32 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


In [2]:
parking_df = spark.read.csv(path='/shared/CS-GY-6513/parking-violations/parking-violations-header.csv',header=True)

                                                                                

In [3]:
parking_df.count()

                                                                                

1014017

In [4]:
parking_df.select('summons_number').distinct().count()

                                                                                

1014017

In [5]:
# Register the DataFrame as a SQL temporary view
parking_df.createOrReplaceTempView("parking")
spark.sql("SELECT count(DISTINCT plate_type) FROM parking").show()


+--------------------------+
|count(DISTINCT plate_type)|
+--------------------------+
|                        75|
+--------------------------+



In [6]:
# List the distinct plate types in the table
spark.sql("SELECT DISTINCT plate_type FROM parking").show()

+----------+
|plate_type|
+----------+
|       CCK|
|       CLG|
|       SOS|
|       SPC|
|       SUP|
|       NYA|
|       OMO|
|       LMB|
|       APP|
|       RGL|
|       FAR|
|       CHC|
|       STA|
|       BOT|
|       COM|
|       RGC|
|       TRC|
|       AMB|
|       HAM|
|       MCD|
+----------+
only showing top 20 rows



In [7]:
# List each distinct plate type and its frequency in the table
# e.g.
#    plate_type count
#    PAS        740554
#    COM        190147

spark.sql("SELECT plate_type, count(plate_type) AS count FROM parking GROUP BY plate_type").show()


+----------+------+
|plate_type| count|
+----------+------+
|       CLG|     2|
|       NYA|     3|
|       SPC|    28|
|       SOS|    15|
|       OMO|     2|
|       LMB|     2|
|       SUP|     8|
|       CCK|     2|
|       APP|  1952|
|       RGL|   524|
|       CHC|    35|
|       BOT|     2|
|       STA|    33|
|       FAR|     2|
|       COM|190147|
|       TRC|  2784|
|       RGC|    25|
|       AMB|    17|
|       HAM|    19|
|       NYS|     5|
+----------+------+
only showing top 20 rows



In [8]:
# order the results of the previous query by count, the most frequent should appear first
spark.sql("SELECT DISTINCT plate_type, count(plate_type) AS count FROM parking GROUP BY plate_type ORDER BY count DESC").show()

+----------+------+
|plate_type| count|
+----------+------+
|       PAS|740554|
|       COM|190147|
|       OMT| 35480|
|       OMS|  9032|
|       SRF|  8341|
|       IRP|  5291|
|       999|  4467|
|       TRC|  2784|
|       OMR|  2158|
|       APP|  1952|
|       MOT|  1851|
|       ORG|  1591|
|       CMB|  1368|
|       MED|  1211|
|       OML|  1181|
|       PSD|   900|
|       SPO|   823|
|       SCL|   700|
|       TOW|   611|
|       RGL|   524|
+----------+------+
only showing top 20 rows



In [9]:
# change plate_type 999 to null
from pyspark.sql import functions as F
parking_df2 = parking_df.withColumn('plate_type', F.when(parking_df['plate_type']=='999', 'null').otherwise(parking_df['plate_type']))
parking_df2.groupBy('plate_type').count().orderBy('count', ascending=False).show(26)


+----------+------+
|plate_type| count|
+----------+------+
|       PAS|740554|
|       COM|190147|
|       OMT| 35480|
|       OMS|  9032|
|       SRF|  8341|
|       IRP|  5291|
|      null|  4467|
|       TRC|  2784|
|       OMR|  2158|
|       APP|  1952|
|       MOT|  1851|
|       ORG|  1591|
|       CMB|  1368|
|       MED|  1211|
|       OML|  1181|
|       PSD|   900|
|       SPO|   823|
|       SCL|   700|
|       TOW|   611|
|       RGL|   524|
|       VAS|   427|
|       SRN|   348|
|       DLR|   333|
|       TRA|   318|
|       ITP|   283|
|       TRL|   223|
+----------+------+
only showing top 26 rows



In [10]:
# remove all rows where plate_type=999  

parking_df2 = parking_df.filter(parking_df['plate_type'] != '999')
parking_df2.groupBy('plate_type').count().orderBy('count', ascending=False).show(26)

+----------+------+
|plate_type| count|
+----------+------+
|       PAS|740554|
|       COM|190147|
|       OMT| 35480|
|       OMS|  9032|
|       SRF|  8341|
|       IRP|  5291|
|       TRC|  2784|
|       OMR|  2158|
|       APP|  1952|
|       MOT|  1851|
|       ORG|  1591|
|       CMB|  1368|
|       MED|  1211|
|       OML|  1181|
|       PSD|   900|
|       SPO|   823|
|       SCL|   700|
|       TOW|   611|
|       RGL|   524|
|       VAS|   427|
|       SRN|   348|
|       DLR|   333|
|       TRA|   318|
|       ITP|   283|
|       TRL|   223|
|       CMH|   203|
+----------+------+
only showing top 26 rows



In [11]:
# Suppose we are interested in analyzing violations based on what county they occur.
# We might want to exclude rows that have a blank entry in the violation_county column.
# How many rows have Blank Entries in violation_county?
parking_df.filter( parking_df["violation_county"].isNull()).count()

192974

In [12]:
# Create a new dataframe without any black entries in violation_count
parking_df4 = parking_df.filter(parking_df['violation_county'].isNotNull())
parking_df4.count()

821043

In [13]:
# Let's clean the plate_ids
# “Clustering” helps detect entries in a column that are close together (and thus represent the same value

plate_id_rdd = parking_df.select('plate_id').rdd.flatMap(list)
plate_id_rdd.take(50)

                                                                                

['GBH2444',
 'GKZ2313',
 'N346594',
 'GDP2624',
 '42555JU',
 '62636MD',
 'DPE3045',
 'FMW7832',
 'DSD2130',
 '65111MB',
 'GMZ3750',
 '44884',
 'XZ876G',
 'PIKINE',
 'GMU4296',
 'GEJ8235',
 '74452JW',
 '42972JW',
 '66951',
 '63400JM',
 'GGS5172',
 '51329A',
 '49216KA',
 '31695JZ',
 '79638KA',
 '88720MB',
 'ERP5344',
 'FWM1758',
 '14307LV',
 'EWT1353',
 '65566PA',
 'FPF5158',
 '24393MC',
 '24393MG',
 'FXR1798',
 'FWH9893',
 '88629JH',
 '1510332',
 'DJE1615',
 '2208656',
 'GXC7520',
 'GRC4443',
 'T639084C',
 'GARFR5',
 'GTJ6780',
 '401ZGU',
 'ZWF21Z',
 'S274036',
 'PF090W',
 'GLR6718']

"Key Collision" methods are based on the idea of creating a key value that contains only 
 the most valuable or meaningful part of the string and groups together different strings based 
 on the fact that their key is the same (hence the name "key collision").
Fingerprinting Method:
note that the order of these operations (the last 3 lines) is significant.
remove leading and trailing whitespace	
change all characters to their lowercase representation
remove all punctuation and control characters
normalize extended western characters to their ASCII representation (for example "gödel" → "godel")
split the string into whitespace-separated tokens
sort the tokens and remove duplicates
join the tokens back together


In [14]:
import string, unicodedata
def fingerprint(value):
    key = unicodedata.normalize('NFKD', value).encode('ascii','ignore').decode()
    key = set(key.strip().lower().translate(str.maketrans('','',string.punctuation)).split())
    key = ' '.join(sorted(list(key)))
    return (key, value)

In [15]:
plate_id_rdd.distinct().map(fingerprint).first()

                                                                                

('gkz2313', 'GKZ2313')

In [16]:
plate_id_rdd.distinct().map(fingerprint).take(5)

                                                                                

[('gkz2313', 'GKZ2313'),
 ('44884', '44884'),
 ('pikine', 'PIKINE'),
 ('gmu4296', 'GMU4296'),
 ('gtd7184', 'GTD7184')]

In [17]:
# apply the fingerprint function to all plate_id values, and group the ones
# that have the same key
# mapValues(list) - makes the grouped values into a list
plate_id_rdd.distinct(). \
	map(fingerprint). \
	groupByKey(). \
	filter(lambda x: len(x[1])>1). \
	mapValues(list). \
	collect()


                                                                                

[('2970cp', ['2970.CP', '2970CP']),
 ('ap717y', ['AP717Y', 'AP7!17Y']),
 ('xbgv20', ['XBGV20', 'XBG.V20']),
 ('47879mg', ['47879MG', '47879MG.']),
 ('849rzb', ['849RZB', '849RZB.']),
 ('12224mg', ['12224MG', '1222]4MG']),
 ('jhd0328', ['JHD0328', 'JHD0328.']),
 ('ab73725', ['AB.73725', 'AB73725']),
 ('88cs02', ['88CS02', '88C.S02']),
 ('l21687', ['L21687', 'L.21687']),
 ('na', ['NA', 'N/A']),
 ('64582md', ['64582MD.', '64582MD']),
 ('6786cx', ['6786.CX', '6786CX']),
 ('zxf293', ['ZXF293+', 'ZXF293']),
 ('u57afu', ['U57AFU', 'U57.AFU']),
 ('jnp981', ['JNP981', 'JNP981&']),
 ('41690ja', ['41690JA', '41690JA+']),
 ('xt549k', ['XT.549K', 'XT549K']),
 ('zgk7779', ['ZGK7779', 'ZGK.7779']),
 ('l08275', ['L08(275', 'L08275']),
 ('hkv4504', ['HKV4504', 'HKV!4504']),
 ('7', ['7!', '7']),
 ('ete3059', ['ETE3059+', 'ETE3059']),
 ('l21741', ['L.21741', 'L21741']),
 ('jcw0303', ['JCW0303`', 'JCW0303']),
 ('k90404', ['K90404', 'K.90404']),
 ('aj511c', ['AJ511C', 'AJ.511C']),
 ('hcv1327', ['HCV1327', 

In [18]:
plate_id_rdd.distinct(). \
	map(fingerprint). \
	groupByKey(). \
	filter(lambda x: len(x[1])>1).take(5)


                                                                                

[('2970cp', <pyspark.resultiterable.ResultIterable at 0x7f5765003580>),
 ('ap717y', <pyspark.resultiterable.ResultIterable at 0x7f57650035e0>),
 ('xbgv20', <pyspark.resultiterable.ResultIterable at 0x7f5765003640>),
 ('849rzb', <pyspark.resultiterable.ResultIterable at 0x7f5765003dc0>),
 ('ab73725', <pyspark.resultiterable.ResultIterable at 0x7f5765003e50>)]

In [19]:
# To determine a cluster should be merged or not, you can look more closely at the data.
# check the other attributes to determine if the two ids correspond to the same entity
parking_df.where((parking_df.plate_id == '2970CP') | \
	(parking_df.plate_id == '2970.CP')). \
	show()


+--------------+----------+--------------+----------------+---------------------+------------------+------------------+--------------+-------------------+------------+-----------+--------------+---------------+--------------+--------+----------+------------------+---------------+-----------------+-------------+------------+------------+
|summons_number|issue_date|violation_code|violation_county|violation_description|violation_location|violation_precinct|violation_time|time_first_observed|meter_number|issuer_code|issuer_command|issuer_precinct|issuing_agency|plate_id|plate_type|registration_state|    street_name|vehicle_body_type|vehicle_color|vehicle_make|vehicle_year|
+--------------+----------+--------------+----------------+---------------------+------------------+------------------+--------------+-------------------+------------+-----------+--------------+---------------+--------------+--------+----------+------------------+---------------+-----------------+-------------+----------

In [20]:
# clustering on the street_name column.
street_name_rdd = parking_df.select('street_name').distinct().rdd.flatMap(list).filter(lambda x: x!=None)

In [21]:
street_name_rdd.take(3)

['Mount Hope Pl', 'Carver Loop', 'Rombouts Ave']

In [22]:
# apply the fingerprint to all street addresses
#[('court square', 'COURT SQUARE'),
# ('31 e st', 'E 31 ST'),
# ('island randalls', 'RANDALLS ISLAND'),
# ('45th e street', 'E 45TH STREET'),
# ('ave sheridan', 'SHERIDAN AVE')]

In [23]:
street_name_rdd.distinct().map(fingerprint).first()

('235 nb st w', 'N/B W 235 ST')

In [24]:
street_name_rdd.distinct(). \
	map(fingerprint). \
	groupByKey(). \
	filter(lambda x: len(x[1])>1). \
	mapValues(list). \
	collect()


[('plains rd white', ['White Plains Rd', 'WHITE PLAINS RD']),
 ('ave riverdale', ['Riverdale Ave', 'RIVERDALE AVE']),
 ('eldert ln', ['ELDERT LN', 'Eldert Ln']),
 ('ave chittenden', ['CHITTENDEN AVE', 'Chittenden Ave']),
 ('5 avenue', ['5 AVENUE', 'AVENUE 5']),
 ('184th st', ['184th St', '184TH ST']),
 ('ave ryder', ['Ryder Ave', 'RYDER AVE']),
 ('ave foothill', ['Foothill Ave', 'FOOTHILL AVE']),
 ('ave lott', ['Lott Ave', 'LOTT AVE']),
 ('marine way', ['Marine Way', 'MARINE WAY']),
 ('astor pl', ['Astor Pl', 'ASTOR PL']),
 ('37th ave', ['37TH AVE', '37th Ave']),
 ('clawson st', ['Clawson St', 'CLAWSON ST']),
 ('ave shepard', ['SHEPARD AVE', 'Shepard Ave']),
 ('gramercy park', ['Gramercy Park', 'GRAMERCY PARK']),
 ('degraw st', ['Degraw St', 'DEGRAW ST']),
 ('ave glenmore', ['Glenmore Ave', 'GLENMORE AVE']),
 ('ave neptune', ['Neptune Ave', 'NEPTUNE  AVE', 'NEPTUNE AVE']),
 ('hartman ln', ['HARTMAN LN', 'Hartman Ln']),
 ('7 arr jfkia terminal', ['TERMINAL 7 JFKIA ARR', 'JFKIA TERMINAL 

In [25]:
# count street_name after clustering
street_name_rdd.count()

22893

In [26]:
# filter the result by only taking the keys
street_name_rdd.distinct(). \
	map(fingerprint). \
	groupByKey(). \
	filter(lambda x: len(x[1])>1).take(5)


[('127th st', <pyspark.resultiterable.ResultIterable at 0x7f5764efa700>),
 ('5 avenue', <pyspark.resultiterable.ResultIterable at 0x7f5764efa790>),
 ('197th e st', <pyspark.resultiterable.ResultIterable at 0x7f5764efa820>),
 ('mitchell pl', <pyspark.resultiterable.ResultIterable at 0x7f5764efa8e0>),
 ('cumberland st', <pyspark.resultiterable.ResultIterable at 0x7f5764efa940>)]

In [27]:
# To determine a cluster should be merged or not, look more closely at the data.
# check the other attributes to determine if the two ids correspond to the same entity
parking_df.where((parking_df.street_name == 'DYER AVE') | \
	(parking_df.street_name == 'Dyer Ave')). \
	show()

+--------------+----------+--------------+----------------+---------------------+------------------+------------------+--------------+-------------------+------------+-----------+--------------+---------------+--------------+--------+----------+------------------+-----------+-----------------+-------------+------------+------------+
|summons_number|issue_date|violation_code|violation_county|violation_description|violation_location|violation_precinct|violation_time|time_first_observed|meter_number|issuer_code|issuer_command|issuer_precinct|issuing_agency|plate_id|plate_type|registration_state|street_name|vehicle_body_type|vehicle_color|vehicle_make|vehicle_year|
+--------------+----------+--------------+----------------+---------------------+------------------+------------------+--------------+-------------------+------------+-----------+--------------+---------------+--------------+--------+----------+------------------+-----------+-----------------+-------------+------------+---------

In [28]:
# count result after clustering
street_name_rdd.distinct(). \
	map(fingerprint). \
	groupByKey(). \
	filter(lambda x: len(x[1])>1).count()

2756

# 1. Entity Resolution

In [29]:
# entity resolution
# make text consistent
# make everything lowercase and to filter out punctuation
stname_normalized = street_name_rdd.map(\
                                        lambda x: ' '.join(str(x).strip().lower().translate(str.maketrans('', '', string.punctuation)).split()))

In [30]:
stname_normalized.take(10)

['court square',
 'e 31 st',
 'randalls island',
 'e 45th street',
 'sheridan ave',
 'nb 87 st',
 'roebling ave',
 'w 110 st',
 'co 129 st',
 'fernside pl']

In [31]:
# apply clustering again to normalized street name data
stname_normalized.map(fingerprint).first()

('court square', 'court square')

In [32]:
stname_normalized.map(fingerprint).take(10)

[('hope mount pl', 'mount hope pl'),
 ('carver loop', 'carver loop'),
 ('ave rombouts', 'rombouts ave'),
 ('ave monticello', 'monticello ave'),
 ('242nd st', '242nd st'),
 ('bessemer st', 'bessemer st'),
 ('56th rd', '56th rd'),
 ('astor pl', 'astor pl'),
 ('hamilton ter', 'hamilton ter'),
 ('rd victor', 'victor rd')]

In [33]:
stname_normalized.distinct(). \
	map(fingerprint). \
	groupByKey(). \
	filter(lambda x: len(x[1])>1). \
	mapValues(list). \
	collect()

[('5 avenue', ['5 avenue', 'avenue 5']),
 ('ave madison nb', ['madison ave nb', 'nb madison ave']),
 ('7 arr jfkia terminal', ['terminal 7 jfkia arr', 'jfkia terminal 7 arr']),
 ('james st', ['james st', 'st james', 'st james st']),
 ('concourse grand sb', ['grand concourse sb', 'sb grand concourse']),
 ('blvd nb woodhaven', ['woodhaven blvd nb', 'nb woodhaven blvd']),
 ('st w41', ['w41 st', 'w41 st st']),
 ('ave h', ['ave h', 'h ave']),
 ('nicholas st', ['nicholas st', 'st nicholas']),
 ('14 ave co ne', ['ne co 14 ave', 'co ne 14 ave']),
 ('church st', ['st church st', 'church st']),
 ('101 e st', ['e 101 st', 'e 101 st st']),
 ('25 86 ave io st', ['25 ave io 86 st', '86 st io 25 ave']),
 ('main ny st', ['main st ny ny', 'main st ny']),
 ('51 e st', ['e 51 st st', 'e 51 st']),
 ('91 st west', ['west 91 st', 'west 91 st st']),
 ('81 st w', ['w 81 st st', 'w 81 st']),
 ('co marys st', ['co st marys st', 'co st marys']),
 ('blvd linden wb', ['wb linden blvd', 'linden blvd wb']),
 ('7 ave

In [34]:
# filter the result by only taking the keys
stname_normalized.distinct(). \
	map(fingerprint). \
	groupByKey(). \
	filter(lambda x: len(x[1])>1).take(5)

[('7 arr jfkia terminal',
  <pyspark.resultiterable.ResultIterable at 0x7f577864efa0>),
 ('5 avenue', <pyspark.resultiterable.ResultIterable at 0x7f5642fc89d0>),
 ('ave madison nb', <pyspark.resultiterable.ResultIterable at 0x7f5764f14280>),
 ('concourse grand sb',
  <pyspark.resultiterable.ResultIterable at 0x7f5764f147f0>),
 ('james st', <pyspark.resultiterable.ResultIterable at 0x7f5764f14880>)]

In [35]:
# count stname after clustering
stname_normalized.distinct(). \
	map(fingerprint). \
	groupByKey(). \
	filter(lambda x: len(x[1])>1).count()

48

In [36]:
# To determine a cluster should be merged or not, look more closely at the data.
# check the other attributes to determine if the two ids correspond to the same entity
parking_df.where((parking_df.street_name == '71 ST ST') | \
	(parking_df.street_name == '71 ST')). \
	show()

+--------------+----------+--------------+----------------+---------------------+------------------+------------------+--------------+-------------------+------------+-----------+--------------+---------------+--------------+--------+----------+------------------+-----------+-----------------+-------------+------------+------------+
|summons_number|issue_date|violation_code|violation_county|violation_description|violation_location|violation_precinct|violation_time|time_first_observed|meter_number|issuer_code|issuer_command|issuer_precinct|issuing_agency|plate_id|plate_type|registration_state|street_name|vehicle_body_type|vehicle_color|vehicle_make|vehicle_year|
+--------------+----------+--------------+----------------+---------------------+------------------+------------------+--------------+-------------------+------------+-----------+--------------+---------------+--------------+--------+----------+------------------+-----------+-----------------+-------------+------------+---------

In [37]:
parking_df.where(parking_df.street_name == '71 ST ST').count()

4

In [38]:
parking_df.where(parking_df.street_name == '71 ST').count()

20

In [39]:
# we can combine them together since it looks like the 71 ST ST is the typo since they match with county and violation_locations
# I chose to replace 71 ST ST with 71 ST as 71 ST ST only appeared 4 times
parking_df = parking_df.withColumn('street_name', F.when(parking_df['street_name']=='71 ST ST', '71 ST').otherwise(parking_df['street_name']))
parking_df.where(parking_df.street_name == '71 ST ST').count()     

0

In [40]:
# vehicle make data
vehicle_make_rdd = parking_df.select('vehicle_make').distinct().rdd.flatMap(list).filter(lambda x: x!=None)

In [41]:
vehicle_make_rdd.take(10)

['BOAT',
 'PETER',
 'YAMAH',
 'MARCU',
 'ACURA',
 'STE',
 'KENWO',
 'SUMMI',
 'WINN',
 'MO/V']

# 2. Outlier

In [42]:
# Outliers in vehicle_make

In [43]:
# check how many distinct vehicle_make in the table
spark.sql("SELECT count(DISTINCT vehicle_make) FROM parking").show()

+----------------------------+
|count(DISTINCT vehicle_make)|
+----------------------------+
|                        1146|
+----------------------------+



In [44]:
# check how many non null distinct vehicle_make in the table
spark.sql("SELECT count(DISTINCT vehicle_make) FROM parking WHERE plate_type IS NOT NULL").show()

+----------------------------+
|count(DISTINCT vehicle_make)|
+----------------------------+
|                        1146|
+----------------------------+



In [45]:
# list some vehicle_make
spark.sql("SELECT vehicle_make FROM parking").show(5)

+------------+
|vehicle_make|
+------------+
|       HONDA|
|        JEEP|
|        FORD|
|       DODGE|
|       CHEVR|
+------------+
only showing top 5 rows



In [46]:
# Clearly, all the vehicle_make is not null, check any invalid vehicle_make
spark.sql("SELECT count(vehicle_make) FROM parking WHERE vehicle_make like '%9%'").show()

+-------------------+
|count(vehicle_make)|
+-------------------+
|                  1|
+-------------------+



In [47]:
# check if intereting vehicle make with number 9
spark.sql("SELECT summons_number, issue_date, violation_code, violation_county, vehicle_make FROM parking WHERE vehicle_make like '%9%'").show()

+--------------+----------+--------------+----------------+------------+
|summons_number|issue_date|violation_code|violation_county|vehicle_make|
+--------------+----------+--------------+----------------+------------+
|    1405530509|2016-03-03|            79|              NY|          95|
+--------------+----------+--------------+----------------+------------+



In [48]:
# check if any intereting vehicle make
spark.sql("SELECT summons_number, issue_date, violation_code, violation_county, vehicle_make FROM parking WHERE vehicle_make like '%8%'").show()

+--------------+----------+--------------+----------------+------------+
|summons_number|issue_date|violation_code|violation_county|vehicle_make|
+--------------+----------+--------------+----------------+------------+
|    1409097596|2016-03-17|            66|              BX|        8115|
+--------------+----------+--------------+----------------+------------+



In [49]:
# check any intereting vehicle make
spark.sql("SELECT summons_number, issue_date, violation_code, violation_county, vehicle_make FROM parking WHERE vehicle_make REGEXP '^[0-9]+$'").show()

+--------------+----------+--------------+----------------+------------+
|summons_number|issue_date|violation_code|violation_county|vehicle_make|
+--------------+----------+--------------+----------------+------------+
|    1405530509|2016-03-03|            79|              NY|          95|
|    1406835997|2016-03-06|            98|               K|           1|
|    1409097596|2016-03-17|            66|              BX|        8115|
|    1405561907|2016-03-23|            46|              NY|          12|
|    4005641325|2016-03-21|             5|            null|         201|
+--------------+----------+--------------+----------------+------------+



In [50]:
# check total number of vehicle make that only consists of numbers
spark.sql("SELECT count(*) FROM parking WHERE vehicle_make REGEXP '^[0-9]+$'").show()

+--------+
|count(1)|
+--------+
|       5|
+--------+



In [51]:
# only 5 violation records have pure numberic vehicle make. We want to delete them from our database
# create a new data frame with no numberic vehicle make
from pyspark.sql.functions import col
parking_df5 = parking_df.filter(~col("vehicle_make").rlike("^[0-9]+$"))

In [52]:
parking_df5.count()

1008096

In [53]:
# after cleaning, check again
parking_df5.filter(col("vehicle_make").rlike("^[0-9]+$")).count()

0

In [54]:
# create a vew with parking_df5
parking_df5.createOrReplaceTempView("parking_cleaned")

In [55]:
parking_df5.groupBy('vehicle_make').count().orderBy('count', ascending=False).show(26)

+------------+------+
|vehicle_make| count|
+------------+------+
|        FORD|124710|
|       TOYOT|110717|
|       HONDA| 97505|
|       NISSA| 81869|
|       CHEVR| 70458|
|       FRUEH| 40756|
|       ME/BE| 35403|
|       DODGE| 34626|
|         BMW| 33989|
|        JEEP| 29899|
|       INTER| 27209|
|         GMC| 26677|
|       HYUND| 25912|
|       LEXUS| 23694|
|       ACURA| 18617|
|       CHRYS| 18071|
|       VOLKS| 17231|
|       INFIN| 15960|
|       SUBAR| 12535|
|        AUDI| 12152|
|       NS/OT| 11702|
|       ISUZU| 10989|
|       MITSU|  9955|
|        HINO|  9803|
|       MAZDA|  9728|
|         KIA|  9469|
+------------+------+
only showing top 26 rows



# 3. uniqueness

In [57]:
# show any repeated summons_number in the original database
spark.sql("SELECT summons_number, count(summons_number) AS count FROM parking GROUP BY summons_number ORDER BY count DESC").show()

+--------------+-----+
|summons_number|count|
+--------------+-----+
|    1398889878|    1|
|    1398890285|    1|
|    1382344934|    1|
|    1399191962|    1|
|    1399275999|    1|
|    1388230318|    1|
|    1399310215|    1|
|    1399361193|    1|
|    1388885270|    1|
|    1119098361|    1|
|    1398805774|    1|
|    1375895953|    1|
|    1401135250|    1|
|    1399622936|    1|
|    1400539936|    1|
|    1400543228|    1|
|    1399919192|    1|
|    1400213812|    1|
|    1402141671|    1|
|    1403014760|    1|
+--------------+-----+
only showing top 20 rows



In [58]:
# check if there are any repeated summons_number
spark.sql("SELECT summons_number, count(*) AS count FROM parking GROUP BY summons_number HAVING count > 1").show()

                                                                                

+--------------+-----+
|summons_number|count|
+--------------+-----+
+--------------+-----+



In [59]:
# Clearly, the summons_number is unique

# 4. functional dependency

In [60]:
# functional dependency:
# summons_number --> summons_number, issue_date, violation_code, violation_county, violation_description, violation_location, violation_precinct, violation_time, time_first_observed, meter_number, issuer_code, issuer_command, issuer_precinct, issuing_agency, plate_id, plate_type, registration_state, street_name, vehicle_body_type, vehicle_color, vehicle_make, vehicle_year
# summons_number can uniquely determine the other attributes
spark.sql("SELECT count(DISTINCT summons_number) AS unique_num, count(*) FROM parking").show()
# the functional dependency is valid and the summons_number is the primary key

+----------+--------+
|unique_num|count(1)|
+----------+--------+
|   1014017| 1014017|
+----------+--------+



In [68]:
# It is also possible that violation_location --> violation_precinct
spark.sql("SELECT violation_county, violation_location, violation_precinct, issuer_precinct FROM parking").show()

+----------------+------------------+------------------+---------------+
|violation_county|violation_location|violation_precinct|issuer_precinct|
+----------------+------------------+------------------+---------------+
|              NY|                 1|                 1|              1|
|              BX|                45|                45|             43|
|              NY|                34|                34|              0|
|               K|                67|                67|             67|
|              NY|                14|                14|            161|
|              NY|                14|                14|             14|
|              NY|                36|                36|              0|
|              BX|                47|                47|              0|
|               K|                70|                70|             70|
|              NY|                10|                10|            420|
|              BX|                43|              

In [70]:
# check if there exists any violation record that has different violation_location than violation_precinct
spark.sql("SELECT summons_number, violation_location, violation_precinct FROM parking WHERE violation_location != violation_precinct").show()

+--------------+------------------+------------------+
|summons_number|violation_location|violation_precinct|
+--------------+------------------+------------------+
|    1399962218|                -1|                 0|
|    1405544685|                -1|                 0|
|    1408162891|                -1|                 0|
|    4617489101|                -1|                 0|
|    6020498920|                -1|                 0|
|    7495892237|                -1|                 0|
|    7604084588|                -1|                 0|
|    1399378934|                -1|                 0|
|    1404295392|                -1|                 0|
|    1405539288|                -1|                 0|
|    1405544170|                -1|                 0|
|    1405547870|                -1|                 0|
|    1407025302|                -1|                 0|
|    4005586958|                -1|                 0|
|    4616846996|                -1|                 0|
|    46171

In [71]:
spark.sql("SELECT count(violation_location) FROM parking WHERE violation_location == 1").show()
spark.sql("SELECT summons_number, violation_location, violation_precinct FROM parking WHERE violation_location != violation_precinct").show()spark.sql("SELECT count(violation_precinct) FROM parking WHERE violation_precinct == 1").show()

+-------------------------+
|count(violation_location)|
+-------------------------+
|                    29953|
+-------------------------+

+-------------------------+
|count(violation_precinct)|
+-------------------------+
|                    29953|
+-------------------------+



In [74]:
# Show that either violation_location = violation_precinct or violation_location = -1 and violation_precinct = 0
# thus, violation_location --> violation_precinct
spark.sql("SELECT summons_number, violation_location, violation_precinct FROM parking WHERE violation_location != violation_precinct AND violation_location == -1 AND violation_precinct != 0").show()

+--------------+------------------+------------------+
|summons_number|violation_location|violation_precinct|
+--------------+------------------+------------------+
+--------------+------------------+------------------+



# 5. value constraints

In [61]:
# value constraints
# we want to check if vehicle year is between 1886(first car built in history) and 2023(current year)
spark.sql("SELECT vehicle_year, count(vehicle_year) AS count FROM parking GROUP BY vehicle_year HAVING vehicle_year > 2023 OR vehicle_year < 1886 ORDER BY count desc").show()

+------------+------+
|vehicle_year| count|
+------------+------+
|           0|223110|
|        2066|    28|
|        2069|    13|
|        2062|    11|
|        2068|    11|
|        2067|     9|
|        2065|     7|
|        2025|     6|
|        2063|     6|
|        2027|     5|
|        2064|     4|
|        2061|     4|
|        2047|     2|
|        2052|     2|
|        2045|     2|
|        2058|     2|
|        2059|     2|
|        2029|     2|
|        2031|     2|
|        2032|     1|
+------------+------+
only showing top 20 rows



In [62]:
# Clearly, there are many invalid data of vehicle_year
# For vehicle year > 2023, we force them to be 2023. For vehicle year = 0, change the vehicle year to null
# change plate_type 999 to null
parking_df6 = parking_df.withColumn('vehicle_year', F.when(parking_df['vehicle_year']=='0', 'null').otherwise(parking_df['vehicle_year']))
parking_df6 = parking_df6.withColumn('vehicle_year', F.when(parking_df6['vehicle_year'] > 2023, 2023).otherwise(parking_df6['vehicle_year']))
parking_df6.groupBy('vehicle_year').count().orderBy('count', ascending=False).show(26)


+------------+------+
|vehicle_year| count|
+------------+------+
|        null|223110|
|        2015|110455|
|        2014| 83167|
|        2013| 69772|
|        2012| 50065|
|        2007| 45898|
|        2011| 42989|
|        2016| 42562|
|        2006| 42195|
|        2008| 39694|
|        2005| 37570|
|        2004| 34845|
|        2010| 34208|
|        2009| 31723|
|        2003| 29119|
|        2002| 24699|
|        2001| 20716|
|        1999| 12380|
|        1998|  9505|
|        1997|  8540|
|        1996|  4814|
|        1995|  4061|
|        1990|  1799|
|        1994|  1698|
|        1988|  1529|
|        1992|  1442|
+------------+------+
only showing top 26 rows



In [63]:
# check if there are still vehicle_year > 2023
parking_df6.filter(col("vehicle_year") > 2023).show()

+--------------+----------+--------------+----------------+---------------------+------------------+------------------+--------------+-------------------+------------+-----------+--------------+---------------+--------------+--------+----------+------------------+-----------+-----------------+-------------+------------+------------+
|summons_number|issue_date|violation_code|violation_county|violation_description|violation_location|violation_precinct|violation_time|time_first_observed|meter_number|issuer_code|issuer_command|issuer_precinct|issuing_agency|plate_id|plate_type|registration_state|street_name|vehicle_body_type|vehicle_color|vehicle_make|vehicle_year|
+--------------+----------+--------------+----------------+---------------------+------------------+------------------+--------------+-------------------+------------+-----------+--------------+---------------+--------------+--------+----------+------------------+-----------+-----------------+-------------+------------+---------

# 6. meta data

In [64]:
# meta data
# I found that there are 64 registration_state. However, there are only 50 states in the US
spark.sql("SELECT count(DISTINCT registration_state) FROM parking").show()

+----------------------------------+
|count(DISTINCT registration_state)|
+----------------------------------+
|                                64|
+----------------------------------+



In [65]:
# find out all the US states and set the registration_state to null if they are not in states array
states = ['AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA', 'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MD', 'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ', 'NM', 'NY', 'NC', 'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC', 'SD', 'TN', 'TX', 'UT', 'VT', 'VA', 'WA', 'WV', 'WI', 'WY']
parking_df7 = parking_df.withColumn('registration_state', F.when(parking_df['registration_state'].isin(states), parking_df['registration_state']).otherwise('null'))


In [66]:
# 50 states + null should be 51.
parking_df7.select(col("registration_state")).distinct().count()

51

In [67]:
parking_df7.groupBy('registration_state').count().orderBy('count', ascending=False).show()

+------------------+------+
|registration_state| count|
+------------------+------+
|                NY|794106|
|                NJ| 93049|
|                PA| 25434|
|                CT| 13168|
|                FL| 12440|
|                MA|  8720|
|                IN|  8092|
|                VA|  6628|
|                MD|  5232|
|                NC|  5082|
|              null|  5079|
|                IL|  3265|
|                GA|  3188|
|                TX|  2907|
|                OH|  2233|
|                ME|  2092|
|                AZ|  1992|
|                CA|  1991|
|                SC|  1985|
|                OK|  1857|
+------------------+------+
only showing top 20 rows

