In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Spark configuration: run the driver in client deploy mode.
cf = SparkConf().set("spark.submit.deployMode", "client")
# Reuse the running SparkContext if one exists, otherwise start one with `cf`.
sc = SparkContext.getOrCreate(cf)

# SparkSession entry point for the DataFrame and SQL APIs.
# NOTE(review): "spark.some.config.option" looks like a placeholder copied from
# the Spark docs; confirm whether it is needed.
# NOTE(review): the recorded Py4JError ("PythonUtils.isEncryptionEnabled does not
# exist in the JVM") typically points to a pyspark package version that does not
# match the cluster's Spark installation. Check the environment.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)
                            

Py4JError: org.apache.spark.api.python.PythonUtils.isEncryptionEnabled does not exist in the JVM

In [3]:
# Load the parking-violations CSV (the file has a header row). No schema is
# inferred, so every column is read as a string.
# Columns: summons_number, issue_date, violation_code, violation_county,
#   violation_description, violation_location, violation_precinct,
#   violation_time, time_first_observed, meter_number, issuer_code,
#   issuer_command, issuer_precinct, issuing_agency, plate_id, plate_type,
#   registration_state, street_name, vehicle_body_type, vehicle_color,
#   vehicle_make, vehicle_year
parking_df = (
    spark.read
    .option("header", True)
    .csv('/shared/CS-GY-6513/parking-violations/parking-violations-header.csv')
)

                                                                                

In [4]:
# Peek at the first five rows to see what the data looks like.
parking_df.head(5)

[Row(summons_number='1307964308', issue_date='2016-03-07', violation_code='14', violation_county='NY', violation_description=None, violation_location='1', violation_precinct='1', violation_time='1040P', time_first_observed=None, meter_number='-', issuer_code='160307', issuer_command='0001', issuer_precinct='1', issuing_agency='K', plate_id='GBH2444', plate_type='PAS', registration_state='NY', street_name='N/S WARREN ST', vehicle_body_type='SDN', vehicle_color='BLACK', vehicle_make='HONDA', vehicle_year='2008'),
 Row(summons_number='1362655727', issue_date='2016-03-02', violation_code='98', violation_county='BX', violation_description=None, violation_location='45', violation_precinct='45', violation_time='0910P', time_first_observed=None, meter_number='-', issuer_code='945115', issuer_command='0043', issuer_precinct='43', issuing_agency='X', plate_id='GKZ2313', plate_type='PAS', registration_state='NY', street_name='PHILPS', vehicle_body_type='SUBN', vehicle_color='WHITE', vehicle_make=

In [7]:
# Register the DataFrame as the temporary view "parking" so it can also be
# queried through the Spark SQL interface.
parking_df.createOrReplaceTempView(name="parking")

In [8]:
# Query the temp view with SQL and show the first five plate types.
spark.sql(sqlQuery="SELECT plate_type FROM parking").show(n=5)

+----------+
|plate_type|
+----------+
|       PAS|
|       PAS|
|       COM|
|       PAS|
|       COM|
+----------+
only showing top 5 rows



In [9]:
# Checking for duplicates.
# summons_number should act as a key: every value must be unique, otherwise
# there is a key-constraint violation. Start with the total number of rows.
parking_df.select('summons_number').count()

                                                                                

1014017

In [10]:
# Are the keys unique? The number of distinct summons_number values equals the
# row count above only if they are. (A NULL summons_number would be counted as
# one distinct value here.)
parking_df.select('summons_number').dropDuplicates().count()

                                                                                

1014017

In [11]:
# Let's examine other attributes.
# Viewing the range of values in a column: the distinct plate types.
# The resulting DataFrame is kept as `plate_type` because a later cell reuses it.
plate_type = spark.sql(sqlQuery="SELECT DISTINCT plate_type FROM parking")
plate_type.show(n=20)



+----------+
|plate_type|
+----------+
|       CCK|
|       CLG|
|       NYA|
|       SOS|
|       SPC|
|       SUP|
|       OMO|
|       LMB|
|       APP|
|       RGL|
|       CHC|
|       BOT|
|       FAR|
|       STA|
|       COM|
|       RGC|
|       TRC|
|       AMB|
|       HAM|
|       NYS|
+----------+
only showing top 20 rows



In [12]:
# How many distinct plate types are there?
# count(DISTINCT ...) ignores NULLs. parking_df.select('plate_type').distinct().count()
# would also count NULL as a value, so the two can differ by one.
spark.sql(sqlQuery="SELECT count(DISTINCT plate_type) FROM parking").show(n=20)

+--------------------------+
|count(DISTINCT plate_type)|
+--------------------------+
|                        75|
+--------------------------+



In [13]:
plate_type.head(26)  # first 26 distinct plate types as Row objects

[Row(plate_type='CCK'),
 Row(plate_type='CLG'),
 Row(plate_type='NYA'),
 Row(plate_type='SOS'),
 Row(plate_type='SPC'),
 Row(plate_type='SUP'),
 Row(plate_type='OMO'),
 Row(plate_type='LMB'),
 Row(plate_type='APP'),
 Row(plate_type='RGL'),
 Row(plate_type='CHC'),
 Row(plate_type='BOT'),
 Row(plate_type='FAR'),
 Row(plate_type='STA'),
 Row(plate_type='COM'),
 Row(plate_type='RGC'),
 Row(plate_type='TRC'),
 Row(plate_type='AMB'),
 Row(plate_type='HAM'),
 Row(plate_type='NYS'),
 Row(plate_type='BOB'),
 Row(plate_type='MCD'),
 Row(plate_type='CMH'),
 Row(plate_type='ORG'),
 Row(plate_type='IRP'),
 Row(plate_type='999')]

In [14]:
# Frequency distribution of plate types: each plate_type value with its
# row count, most frequent first.
spark.sql(
    "SELECT plate_type,count(*) AS COUNT from parking "
    "group by plate_type "
    "order BY COUNT DESC"
).show()

# Note how missing values are represented in this dataset (look at the
# '999' plate type)!

+----------+------+
|plate_type| COUNT|
+----------+------+
|       PAS|740554|
|       COM|190147|
|       OMT| 35480|
|       OMS|  9032|
|       SRF|  8341|
|       IRP|  5291|
|       999|  4467|
|       TRC|  2784|
|       OMR|  2158|
|       APP|  1952|
|       MOT|  1851|
|       ORG|  1591|
|       CMB|  1368|
|       MED|  1211|
|       OML|  1181|
|       PSD|   900|
|       SPO|   823|
|       SCL|   700|
|       TOW|   611|
|       RGL|   524|
+----------+------+
only showing top 20 rows



In [15]:
# Change the sentinel plate_type '999' to a real SQL NULL.
from pyspark.sql import functions as F

# Build a new DataFrame; parking_df itself is left unchanged.
# The replacement must be a NULL literal, not the string 'null'. A string
# prints the same in .show(), but isNull()/dropna() would not match it and it
# would be counted as an ordinary plate type.
# NOTE: any rows whose plate_type was already NULL now fall into the same
# null group as the former '999' rows.
parking_df2 = parking_df.withColumn(
    'plate_type',
    F.when(parking_df['plate_type'] == '999', F.lit(None).cast('string'))
     .otherwise(parking_df['plate_type']),
)
parking_df2.groupBy('plate_type').count().orderBy('count', ascending=False).show(26)


+----------+------+
|plate_type| count|
+----------+------+
|       PAS|740554|
|       COM|190147|
|       OMT| 35480|
|       OMS|  9032|
|       SRF|  8341|
|       IRP|  5291|
|      null|  4467|
|       TRC|  2784|
|       OMR|  2158|
|       APP|  1952|
|       MOT|  1851|
|       ORG|  1591|
|       CMB|  1368|
|       MED|  1211|
|       OML|  1181|
|       PSD|   900|
|       SPO|   823|
|       SCL|   700|
|       TOW|   611|
|       RGL|   524|
|       VAS|   427|
|       SRN|   348|
|       DLR|   333|
|       TRA|   318|
|       ITP|   283|
|       TRL|   223|
+----------+------+
only showing top 26 rows





In [16]:
# If your analysis requires a plate_type, you may want to remove every row
# whose plate_type is the sentinel '999' or NULL.
# The isNotNull() check only makes explicit what `!= '999'` already does:
# under SQL three-valued logic, NULL != '999' is NULL, so such rows are dropped.
parking_df3 = parking_df.where(
    parking_df['plate_type'].isNotNull() & (parking_df['plate_type'] != '999')
)
parking_df3.groupBy('plate_type').count().sort('count', ascending=False).show(n=26)
parking_df3.select('plate_type').dropDuplicates().count()



+----------+------+
|plate_type| count|
+----------+------+
|       PAS|740554|
|       COM|190147|
|       OMT| 35480|
|       OMS|  9032|
|       SRF|  8341|
|       IRP|  5291|
|       TRC|  2784|
|       OMR|  2158|
|       APP|  1952|
|       MOT|  1851|
|       ORG|  1591|
|       CMB|  1368|
|       MED|  1211|
|       OML|  1181|
|       PSD|   900|
|       SPO|   823|
|       SCL|   700|
|       TOW|   611|
|       RGL|   524|
|       VAS|   427|
|       SRN|   348|
|       DLR|   333|
|       TRA|   318|
|       ITP|   283|
|       TRL|   223|
|       CMH|   203|
+----------+------+
only showing top 26 rows



74

In [17]:
# Filtering rows with blank entries.
# First count the rows whose violation_county is NULL; the next cell excludes them.
parking_df.where(parking_df.violation_county.isNull()).count()

192974

In [18]:
# Keep only the rows that have a violation_county value.
parking_df4 = parking_df.dropna(subset=['violation_county'])
parking_df4.count()


821043

In [19]:
# "Clustering" helps detect entries in a column that are close together (and
# thus probably represent the same value).

# RDD of the raw plate_id values, one per row (unwrapped from the Row objects).
plate_id_rdd = parking_df.select('plate_id').rdd.map(lambda row: row[0])
print('Number of rows:' + str(plate_id_rdd.count()))
plate_id_rdd.take(10)

# The same plate number may be written in different ways, e.g.
#   ('xbgv20', ['XBGV20', 'XBG.V20'])
#   ('ap717y', ['AP7!17Y', 'AP717Y'])
# This is a problem if you want to find out, e.g., how many violations there
# are for each plate number.



Number of rows:1014017


                                                                                

['GBH2444',
 'GKZ2313',
 'N346594',
 'GDP2624',
 '42555JU',
 '62636MD',
 'DPE3045',
 'FMW7832',
 'DSD2130',
 '65111MB']

In [20]:
# Inspect all rows recorded under the plate id 'XBG.V20'.
# Use `=` for an exact match. LIKE without wildcards only works by accident,
# and any `_` or `%` in a plate id would be treated as a wildcard.
spark.sql("SELECT * from parking where plate_id = 'XBG.V20'").show()

+--------------+----------+--------------+----------------+---------------------+------------------+------------------+--------------+-------------------+------------+-----------+--------------+---------------+--------------+--------+----------+------------------+-----------+-----------------+-------------+------------+------------+
|summons_number|issue_date|violation_code|violation_county|violation_description|violation_location|violation_precinct|violation_time|time_first_observed|meter_number|issuer_code|issuer_command|issuer_precinct|issuing_agency|plate_id|plate_type|registration_state|street_name|vehicle_body_type|vehicle_color|vehicle_make|vehicle_year|
+--------------+----------+--------------+----------------+---------------------+------------------+------------------+--------------+-------------------+------------+-----------+--------------+---------------+--------------+--------+----------+------------------+-----------+-----------------+-------------+------------+---------

In [21]:
import string, unicodedata
def fingerprint(value):
    """Compute an OpenRefine-style "fingerprint" clustering key for a string.

    The key is built by folding the text to ASCII (NFKD normalization,
    dropping characters that have no ASCII form), lowercasing, deleting ASCII
    punctuation, splitting on whitespace, de-duplicating and sorting the
    tokens, and joining them with single spaces. Values that differ only in
    case, punctuation, token order, repeated tokens or diacritics therefore
    share a key, e.g. 'XBG.V20' and 'XBGV20' both map to 'xbgv20'.

    Args:
        value: the string to fingerprint, or None (how Spark hands a SQL
            NULL to Python).

    Returns:
        A ``(key, value)`` tuple, ready to feed into ``groupByKey``. For a
        ``None`` input the key is ``None`` instead of raising TypeError, so
        NULL entries form their own group rather than failing the whole job.
    """
    if value is None:
        return (None, value)
    ascii_text = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore').decode()
    # split() with no argument already discards leading/trailing whitespace.
    cleaned = ascii_text.lower().translate(str.maketrans('', '', string.punctuation))
    key = ' '.join(sorted(set(cleaned.split())))
    return (key, value)

In [22]:
# Try the fingerprint function on a plate id containing punctuation.
fingerprint(value='XBG.V20')


('xbgv20', 'XBG.V20')

In [24]:
# Group the distinct plate ids by fingerprint and keep only the keys that
# collect two or more different spellings.
(
    plate_id_rdd
    .distinct()
    .map(fingerprint)                    # -> (key, original plate id)
    .groupByKey()                        # key -> all spellings with that key
    .filter(lambda kv: len(kv[1]) > 1)   # only keys with several spellings
    .mapValues(list)                     # materialize for display
    .collect()
)



                                                                                

[('xbgv20', ['XBGV20', 'XBG.V20']),
 ('ap717y', ['AP7!17Y', 'AP717Y']),
 ('2970cp', ['2970.CP', '2970CP']),
 ('849rzb', ['849RZB.', '849RZB']),
 ('ab73725', ['AB.73725', 'AB73725']),
 ('47879mg', ['47879MG', '47879MG.']),
 ('12224mg', ['12224MG', '1222]4MG']),
 ('jhd0328', ['JHD0328.', 'JHD0328']),
 ('88cs02', ['88CS02', '88C.S02']),
 ('64582md', ['64582MD.', '64582MD']),
 ('l21687', ['L21687', 'L.21687']),
 ('na', ['N/A', 'NA']),
 ('u57afu', ['U57AFU', 'U57.AFU']),
 ('zxf293', ['ZXF293', 'ZXF293+']),
 ('6786cx', ['6786.CX', '6786CX']),
 ('zgk7779', ['ZGK7779', 'ZGK.7779']),
 ('jnp981', ['JNP981', 'JNP981&']),
 ('41690ja', ['41690JA', '41690JA+']),
 ('xt549k', ['XT.549K', 'XT549K']),
 ('hkv4504', ['HKV4504', 'HKV!4504']),
 ('7', ['7!', '7']),
 ('l08275', ['L08(275', 'L08275']),
 ('ete3059', ['ETE3059+', 'ETE3059']),
 ('l21741', ['L.21741', 'L21741']),
 ('jcw0303', ['JCW0303', 'JCW0303`']),
 ('aj511c', ['AJ511C', 'AJ.511C']),
 ('hcv1327', ['HCV1327', 'HCV!1327']),
 ('k90404', ['K90404',

In [25]:
# To decide whether a cluster should be merged, look more closely at the data:
# compare the other attributes of the rows carrying each spelling to see
# whether the two ids refer to the same vehicle.
parking_df.where(parking_df.plate_id.isin('2970CP', '2970.CP')).show()


+--------------+----------+--------------+----------------+---------------------+------------------+------------------+--------------+-------------------+------------+-----------+--------------+---------------+--------------+--------+----------+------------------+---------------+-----------------+-------------+------------+------------+
|summons_number|issue_date|violation_code|violation_county|violation_description|violation_location|violation_precinct|violation_time|time_first_observed|meter_number|issuer_code|issuer_command|issuer_precinct|issuing_agency|plate_id|plate_type|registration_state|    street_name|vehicle_body_type|vehicle_color|vehicle_make|vehicle_year|
+--------------+----------+--------------+----------------+---------------------+------------------+------------------+--------------+-------------------+------------+-----------+--------------+---------------+--------------+--------+----------+------------------+---------------+-----------------+-------------+----------



In [26]:
parking_df.where(parking_df.plate_id.isin('L21687', 'L.21687')).show()

# Looking at other attributes such as vehicle_body_type (DELV vs Van) and
# vehicle_make (HINO vs NS/OT), these do not seem to be the same vehicle
# (so this may not be an error).



+--------------+----------+--------------+----------------+---------------------+------------------+------------------+--------------+-------------------+------------+-----------+--------------+---------------+--------------+--------+----------+------------------+-----------+-----------------+-------------+------------+------------+
|summons_number|issue_date|violation_code|violation_county|violation_description|violation_location|violation_precinct|violation_time|time_first_observed|meter_number|issuer_code|issuer_command|issuer_precinct|issuing_agency|plate_id|plate_type|registration_state|street_name|vehicle_body_type|vehicle_color|vehicle_make|vehicle_year|
+--------------+----------+--------------+----------------+---------------------+------------------+------------------+--------------+-------------------+------------+-----------+--------------+---------------+--------------+--------+----------+------------------+-----------+-----------------+-------------+------------+---------

In [27]:
# Making text consistent: clustering on the street_name column.
# Distinct street names as plain strings, with NULLs removed.
street_name_rdd = (
    parking_df.select('street_name')
    .distinct()
    .rdd.map(lambda row: row[0])
    .filter(lambda name: name is not None)
)
street_name_rdd.count()

# lots of distinct street names

22893

In [49]:
street_name_rdd.take(num=5)  # a few raw street names

['COURT SQUARE', 'E 31 ST', 'RANDALLS ISLAND', 'E 45TH STREET', 'SHERIDAN AVE']

In [48]:
# Use the fingerprint function to find street names that are spelled
# differently but normalize to the same key.
stname_clusters = (
    street_name_rdd
    .map(fingerprint)                    # -> (key, original street name)
    .groupByKey()                        # key -> all spellings with that key
    .filter(lambda kv: len(kv[1]) > 1)   # only keys that merge 2+ spellings
    .mapValues(list)                     # materialize for display
)

# Number of fingerprint keys that merge two or more distinct spellings
# (2,756 in the recorded run). This is NOT the number of distinct street names
# after normalization; that figure is computed with stname_normalized further
# down.
stname_clusters.count()


2756

In [29]:
stname_clusters.take(num=5)  # sample clusters: (key, [spellings])



[('fernside pl', ['FERNSIDE PL', 'Fernside Pl']),
 ('astor pl', ['Astor Pl', 'ASTOR PL']),
 ('bedford blvd park', ['Bedford Park Blvd', 'BEDFORD PARK BLVD']),
 ('44th e st', ['E 44th St', 'E 44TH ST']),
 ('223rd pl', ['223rd Pl', '223RD PL'])]

In [72]:
# Now normalize the distinct street names, keeping only the fingerprint key.
stname_normalized = street_name_rdd.map(lambda name: fingerprint(name)[0])
normalized_total = stname_normalized.distinct().count()
print("Number of normalized streets:" + str(normalized_total))


Number of normalized streets:19949


In [73]:
stname_normalized.take(num=5)  # a few normalized keys

['court square', '31 e st', 'island randalls', '45th e street', 'ave sheridan']

In [74]:
# Keep the normalized street names that contain the substring "st".
# This is a plain substring test, so it also matches words such as "street",
# "east" and "west".
stname_normalized.filter(lambda name: "st" in name).take(5)


['31 e st', '45th e street', '87 nb st', '110 st w', '129 co st']

In [None]:
# Try clustering the vehicle_color column. The fingerprint method is not very effective for this column;
# you could try a nearest-neighbor (kNN) clustering method instead
# (see https://github.com/OpenRefine/OpenRefine/wiki/Clustering-In-Depth ).
# Remember that different similarity functions suit different kinds of data.