In [2]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("nyc") \
    .getOrCreate()

In [9]:
tickets_2017 = spark.read.format("csv").option("header", "true").load("/user/claastraineryubdc_corestack/data/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv")
tickets_2017

DataFrame[Summons Number: string, Plate ID: string, Registration State: string, Issue Date: string, Violation Code: string, Vehicle Body Type: string, Vehicle Make: string, Violation Precinct: string, Issuer Precinct: string, Violation Time: string]

In [10]:
tickets_2017.printSchema()

root
 |-- Summons Number: string (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: string (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Violation Precinct: string (nullable = true)
 |-- Issuer Precinct: string (nullable = true)
 |-- Violation Time: string (nullable = true)



In [11]:
tickets_2017.show(5)

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|    5092469481| GZH7067|                NY|2016-07-10|             7|             SUBN|       TOYOT|                 0|              0|         0143A|
|    5092451658| GZH7067|                NY|2016-07-08|             7|             SUBN|       TOYOT|                 0|              0|         0400P|
|    4006265037| FZX9232|                NY|2016-08-23|             5|             SUBN|        FORD|                 0|              0|         0233P|
|    8478629828| 66623ME|                NY|2017-06-14|            47|             REFG|

In [12]:
tickets_2017.describe().show()

+-------+--------------------+--------+------------------+----------+------------------+------------------+------------------+------------------+-----------------+-----------------+
|summary|      Summons Number|Plate ID|Registration State|Issue Date|    Violation Code| Vehicle Body Type|      Vehicle Make|Violation Precinct|  Issuer Precinct|   Violation Time|
+-------+--------------------+--------+------------------+----------+------------------+------------------+------------------+------------------+-----------------+-----------------+
|  count|            10803028|10803028|          10803028|  10803028|          10803028|          10803028|          10803028|          10803028|         10803028|         10803028|
|   mean|  6.81744702906579E9|Infinity|              99.0|      null|34.599430455979565|3.9258887134586864| 6519.974025974026| 45.01216260848347|46.82931211508477|909.2857142857143|
| stddev|2.3202339623282275E9|     NaN|               0.0|      null|19.359868716323483|0.

In [13]:
# 1. Find the total number of tickets for the year.
tickets_2017.count()

10803028

In [16]:
# For using SQL, you need to create a temporary view
tickets_2017.createOrReplaceTempView("data_2017")

In [18]:
# 2. Find out the number of unique states from where the cars 
# that got parking tickets came from. (Hint: Use 'Registration State')
spark.sql("SELECT count(distinct `Registration State`) as count FROM data_2017").show()

+-----+
|count|
+-----+
|   67|
+-----+



In [19]:
# Arranging the dataframe based on number of entries
spark.sql("SELECT `Registration State`, count(*) as count \
        FROM data_2017 group by `Registration State` order by `count` desc").show()

+------------------+-------+
|Registration State|  count|
+------------------+-------+
|                NY|8481061|
|                NJ| 925965|
|                PA| 285419|
|                FL| 144556|
|                CT| 141088|
|                MA|  85547|
|                IN|  80749|
|                VA|  72626|
|                MD|  61800|
|                NC|  55806|
|                IL|  37329|
|                GA|  36852|
|                99|  36625|
|                TX|  36516|
|                AZ|  26426|
|                OH|  25302|
|                CA|  24260|
|                SC|  21836|
|                ME|  21574|
|                MN|  18227|
+------------------+-------+
only showing top 20 rows



In [21]:
#Replacing '99' by 'NY' in the dataframe
from pyspark.sql import functions as F
tickets_2017_new = tickets_2017.withColumn('Registration State',F.when(tickets_2017['Registration State']=='99','NY').otherwise(tickets_2017['Registration State']))

In [59]:
# The temporary view needs to be recreated as values have been updated in tickets_2017_new
tickets_2017_new.createOrReplaceTempView("data_2017")

In [23]:
spark.sql("SELECT `Registration State`, count(*) as count \
        FROM data_2017 group by `Registration State` order by `count` desc").show()

+------------------+-------+
|Registration State|  count|
+------------------+-------+
|                NY|8517686|
|                NJ| 925965|
|                PA| 285419|
|                FL| 144556|
|                CT| 141088|
|                MA|  85547|
|                IN|  80749|
|                VA|  72626|
|                MD|  61800|
|                NC|  55806|
|                IL|  37329|
|                GA|  36852|
|                TX|  36516|
|                AZ|  26426|
|                OH|  25302|
|                CA|  24260|
|                SC|  21836|
|                ME|  21574|
|                MN|  18227|
|                OK|  18165|
+------------------+-------+
only showing top 20 rows



In [25]:
spark.sql('SELECT count(distinct `Registration State`) as count \
                 FROM data_2017').show()

+-----+
|count|
+-----+
|   66|
+-----+



### AGGREGATION TASKS

In [26]:
spark.sql('SELECT `Violation Code`, count(*) as count \
                    FROM data_2017 group by `Violation Code` order by `count` desc limit 5').show()

+--------------+-------+
|Violation Code|  count|
+--------------+-------+
|            21|1528588|
|            36|1400614|
|            38|1062304|
|            14| 893498|
|            20| 618593|
+--------------+-------+



In [27]:
## Here, `Violation Code` will be replaced by the each of the following variables:
spark.sql('SELECT `Vehicle Body Type`, count(*) as count FROM data_2017 \
                    group by `Vehicle Body Type` order by `count` desc limit 5').show()

+-----------------+-------+
|Vehicle Body Type|  count|
+-----------------+-------+
|             SUBN|3719802|
|             4DSD|3082020|
|              VAN|1411970|
|             DELV| 687330|
|              SDN| 438191|
+-----------------+-------+



In [28]:
## `Vehicle Make`
spark.sql('SELECT `Vehicle Make`, count(*) as count FROM data_2017 \
                    group by `Vehicle Make` order by `count` desc limit 5').show()

+------------+-------+
|Vehicle Make|  count|
+------------+-------+
|        FORD|1280958|
|       TOYOT|1211451|
|       HONDA|1079238|
|       NISSA| 918590|
|       CHEVR| 714655|
+------------+-------+



In [29]:
## Here, `Violation Code` will be replaced by 'Violation Precinct' and 'Issuer Precinct'
spark.sql('SELECT `Violation Precinct`, count(*) as count FROM data_2017 \
                    group by `Violation Precinct` order by `count` desc limit 6').show()

+------------------+-------+
|Violation Precinct|  count|
+------------------+-------+
|                 0|2072400|
|                19| 535671|
|                14| 352450|
|                 1| 331810|
|                18| 306920|
|               114| 296514|
+------------------+-------+



In [30]:
spark.sql('SELECT `Issuer Precinct`, count(*) as count FROM data_2017\
                    group by `Issuer Precinct` order by `count` desc limit 6').show()

+---------------+-------+
|Issuer Precinct|  count|
+---------------+-------+
|              0|2388479|
|             19| 521513|
|             14| 344977|
|              1| 321170|
|             18| 296553|
|            114| 289950|
+---------------+-------+



In [32]:
spark.sql("SELECT `Issuer Precinct`, `Violation Code`, count(*) as count_tickets \
                    FROM data_2017 where `Issuer Precinct` = '19'\
                    group by `Issuer Precinct`, `Violation Code` \
                    order by `count_tickets` desc limit 5").show()

+---------------+--------------+-------------+
|Issuer Precinct|Violation Code|count_tickets|
+---------------+--------------+-------------+
|             19|            46|        86390|
|             19|            37|        72437|
|             19|            38|        72344|
|             19|            14|        57563|
|             19|            21|        54700|
+---------------+--------------+-------------+



In [33]:
spark.sql("SELECT `Issuer Precinct`, `Violation Code`, count(*) as count_tickets \
                    FROM data_2017 where `Issuer Precinct` = '14'\
                    group by `Issuer Precinct`, `Violation Code` \
                    order by `count_tickets` desc limit 5").show()

+---------------+--------------+-------------+
|Issuer Precinct|Violation Code|count_tickets|
+---------------+--------------+-------------+
|             14|            14|        73837|
|             14|            69|        58026|
|             14|            31|        39857|
|             14|            47|        30540|
|             14|            42|        20663|
+---------------+--------------+-------------+



In [34]:
spark.sql("SELECT `Issuer Precinct`, `Violation Code`, count(*) as count_tickets \
                    FROM data_2017 where `Issuer Precinct` = '1'\
                    group by `Issuer Precinct`, `Violation Code` \
                    order by `count_tickets` desc limit 5").show()

+---------------+--------------+-------------+
|Issuer Precinct|Violation Code|count_tickets|
+---------------+--------------+-------------+
|              1|            14|        73522|
|              1|            16|        38937|
|              1|            20|        27841|
|              1|            46|        22534|
|              1|            38|        16989|
+---------------+--------------+-------------+



In [35]:
## Check for missing values
spark.sql("select count(*) as count\
                 FROM data_2017 where `Violation Time` is Null").show()

+-----+
|count|
+-----+
|    0|
+-----+



In [36]:
tickets_2017_new.select('Violation Time').dropna().count()

10803028

In [None]:
# Check for the operation to be performed

In [39]:
spark.sql("select `Violation Time`, if(right(`Violation Time`, 1) == 'A' or left(`Violation Time`, 2) == '12',concat(substring(`Violation Time`, 1,2),\
                    ':', substring(`Violation Time`, 3,2)), concat(int(substring(`Violation Time`, 1,2) + 12),\
                    ':', substring(`Violation Time`, 3,2))) as `Violation Time 2`\
                    from data_2017 limit 50").show()

+--------------+----------------+
|Violation Time|Violation Time 2|
+--------------+----------------+
|         0143A|           01:43|
|         0400P|           16:00|
|         0233P|           14:33|
|         1120A|           11:20|
|         0555P|           17:55|
|         0852P|           20:52|
|         0215A|           02:15|
|         0758A|           07:58|
|         1005A|           10:05|
|         0845A|           08:45|
|         0015A|           00:15|
|         0707A|           07:07|
|         1022A|           10:22|
|         1150A|           11:50|
|         0525A|           05:25|
|         0645P|           18:45|
|         1122A|           11:22|
|         0256P|           14:56|
|         1232A|           12:32|
|         1034A|           10:34|
+--------------+----------------+
only showing top 20 rows



In [41]:
# Creating a separate df with the required fields for analysis
time_violation_analysis = spark.sql("select if(right(`Violation Time`, 1) == 'A' or left(`Violation Time`, 2) == '12',\
  concat(substring(`Violation Time`, 1,2),':', substring(`Violation Time`,3,2)),\
  concat(int(substring(`Violation Time`, 1,2) + 12),':', substring(`Violation Time`, 3,2)))\
  as `Violation Time`, `Violation Code` from data_2017")

In [43]:
time_violation_analysis.show()

+--------------+--------------+
|Violation Time|Violation Code|
+--------------+--------------+
|         01:43|             7|
|         16:00|             7|
|         14:33|             5|
|         11:20|            47|
|         17:55|            69|
|         20:52|             7|
|         02:15|            40|
|         07:58|            36|
|         10:05|            36|
|         08:45|             5|
|         00:15|            78|
|         07:07|            19|
|         10:22|            36|
|         11:50|            21|
|         05:25|            40|
|         18:45|            71|
|         11:22|             7|
|         14:56|            64|
|         12:32|            20|
|         10:34|            36|
+--------------+--------------+
only showing top 20 rows



In [44]:
# For using SQL, you need to create a temporary view
time_violation_analysis.createOrReplaceTempView('time_violation_data')

In [45]:
time_violation_analysis = spark.sql('''select case
                                       when int(substring(`Violation Time`,1,2)) between 00 and 03
                                       then '00:00-03:59'
                                       when int(substring(`Violation Time`,1,2)) between 04 and 07
                                       then '04:00-07:59'
                                       when int(substring(`Violation Time`,1,2)) between 08 and 11
                                       then '08:00-11:59'
                                       when int(substring(`Violation Time`,1,2)) between 12 and 15
                                       then '12:00-15:59'
                                       when int(substring(`Violation Time`,1,2)) between 16 and 19
                                       then '16:00-19:59'
                                       else '20:00-23:59'
                                       end as bins,  `Violation Time`, `Violation Code`
                                       from time_violation_data''')

In [46]:
time_violation_analysis.show()

+-----------+--------------+--------------+
|       bins|Violation Time|Violation Code|
+-----------+--------------+--------------+
|00:00-03:59|         01:43|             7|
|16:00-19:59|         16:00|             7|
|12:00-15:59|         14:33|             5|
|08:00-11:59|         11:20|            47|
|16:00-19:59|         17:55|            69|
|20:00-23:59|         20:52|             7|
|00:00-03:59|         02:15|            40|
|04:00-07:59|         07:58|            36|
|08:00-11:59|         10:05|            36|
|08:00-11:59|         08:45|             5|
|00:00-03:59|         00:15|            78|
|04:00-07:59|         07:07|            19|
|08:00-11:59|         10:22|            36|
|08:00-11:59|         11:50|            21|
|04:00-07:59|         05:25|            40|
|16:00-19:59|         18:45|            71|
|08:00-11:59|         11:22|             7|
|12:00-15:59|         14:56|            64|
|12:00-15:59|         12:32|            20|
|08:00-11:59|         10:34|    

In [47]:
# Updating the SQL view
time_violation_analysis.createOrReplaceTempView('time_violation_data')

In [48]:
spark.sql("SELECT bins, `Violation Code`, count(*) as `count`\
                    FROM time_violation_data where bins = '00:00-03:59'\
                    group by bins, `Violation Code`\
                    order by `count` desc limit 3").show()

+-----------+--------------+-----+
|       bins|Violation Code|count|
+-----------+--------------+-----+
|00:00-03:59|            21|73160|
|00:00-03:59|            40|45960|
|00:00-03:59|            14|29312|
+-----------+--------------+-----+



In [49]:
spark.sql("SELECT bins, `Violation Code`, count(*) as `count`\
                    FROM time_violation_data where bins = '04:00-07:59'\
                    group by bins, `Violation Code`\
                    order by `count` desc limit 3").show()

+-----------+--------------+------+
|       bins|Violation Code| count|
+-----------+--------------+------+
|04:00-07:59|            14|141276|
|04:00-07:59|            21|119469|
|04:00-07:59|            40|112186|
+-----------+--------------+------+



In [50]:
spark.sql("SELECT bins, `Violation Code`, count(*) as `count`\
                    FROM time_violation_data where bins = '08:00-11:59'\
                    group by bins, `Violation Code`\
                    order by `count` desc limit 3").show()

+-----------+--------------+-------+
|       bins|Violation Code|  count|
+-----------+--------------+-------+
|08:00-11:59|            21|1182689|
|08:00-11:59|            36| 751422|
|08:00-11:59|            38| 346518|
+-----------+--------------+-------+



In [51]:
spark.sql("SELECT bins, `Violation Code`, count(*) as `count`\
                    FROM time_violation_data where bins = '12:00-15:59'\
                    group by bins, `Violation Code`\
                    order by `count` desc limit 3").show()

+-----------+--------------+------+
|       bins|Violation Code| count|
+-----------+--------------+------+
|12:00-15:59|            36|588395|
|12:00-15:59|            38|462859|
|12:00-15:59|            37|337096|
+-----------+--------------+------+



In [52]:
spark.sql("SELECT bins, `Violation Code`, count(*) as `count`\
                    FROM time_violation_data where bins = '16:00-19:59'\
                    group by bins, `Violation Code`\
                    order by `count` desc limit 3").show()

+-----------+--------------+------+
|       bins|Violation Code| count|
+-----------+--------------+------+
|16:00-19:59|            38|203232|
|16:00-19:59|            37|145784|
|16:00-19:59|            14|144749|
+-----------+--------------+------+



In [53]:
spark.sql("SELECT bins, `Violation Code`, count(*) as `count`\
                    FROM time_violation_data where bins = '20:00-23:59'\
                    group by bins, `Violation Code`\
                    order by `count` desc limit 3").show()

+-----------+--------------+-----+
|       bins|Violation Code|count|
+-----------+--------------+-----+
|20:00-23:59|             7|65593|
|20:00-23:59|            38|47032|
|20:00-23:59|            14|44787|
+-----------+--------------+-----+



In [54]:
# Finding the 3 most commonly occurring violation codes
spark.sql("SELECT `Violation Code`, count(*) as `count`\
                    FROM time_violation_data\
                    group by `Violation Code`\
                    order by `count` desc limit 3").show()

+--------------+-------+
|Violation Code|  count|
+--------------+-------+
|            21|1528588|
|            36|1400614|
|            38|1062304|
+--------------+-------+



In [55]:
spark.sql("SELECT `Violation Code`, bins, count(*) as `count`\
                    FROM time_violation_data where `Violation Code` = '21'\
                    group by `Violation Code`, bins\
                    order by `count` desc limit 1").show()

+--------------+-----------+-------+
|Violation Code|       bins|  count|
+--------------+-----------+-------+
|            21|08:00-11:59|1182689|
+--------------+-----------+-------+



In [56]:
spark.sql("SELECT `Violation Code`, bins, count(*) as `count`\
                    FROM time_violation_data where `Violation Code` = '36'\
                    group by `Violation Code`, bins\
                    order by `count` desc limit 1").show()

+--------------+-----------+------+
|Violation Code|       bins| count|
+--------------+-----------+------+
|            36|08:00-11:59|751422|
+--------------+-----------+------+



In [57]:
spark.sql("SELECT `Violation Code`, bins, count(*) as `count`\
                    FROM time_violation_data where `Violation Code` = '38'\
                    group by `Violation Code`, bins\
                    order by `count` desc limit 1").show()

+--------------+-----------+------+
|Violation Code|       bins| count|
+--------------+-----------+------+
|            38|12:00-15:59|462859|
+--------------+-----------+------+



In [66]:
tickets_seasonality = spark.sql('''select `Violation Code`, `Issue Date`, case
                                  when month(to_date(`Issue Date`, 'yyyy-MM-dd')) between 03 and 05
                                  then 'spring'
                                  when month(to_date(`Issue Date`, 'yyyy-MM-dd')) between 06 and 08
                                  then 'summer'
                                  when month(to_date(`Issue Date`, 'yyyy-MM-dd')) between 09 and 11
                                  then 'autumn'
                                  when month(to_date(`Issue Date`, 'yyyy-MM-dd')) in (1,2,12)
                                  then 'winter'
                                  else 'unknown'
                                  end as season
                                  from data_2017''')

In [67]:
tickets_seasonality.show()

+--------------+----------+------+
|Violation Code|Issue Date|season|
+--------------+----------+------+
|             7|2016-07-10|summer|
|             7|2016-07-08|summer|
|             5|2016-08-23|summer|
|            47|2017-06-14|summer|
|            69|2016-11-21|autumn|
|             7|2017-06-13|summer|
|            40|2016-08-03|summer|
|            36|2016-12-21|winter|
|            36|2016-11-21|autumn|
|             5|2016-10-05|autumn|
|            78|2017-01-11|winter|
|            19|2016-09-27|autumn|
|            36|2016-10-27|autumn|
|            21|2016-09-30|autumn|
|            40|2017-02-04|winter|
|            71|2016-07-07|summer|
|             7|2016-09-24|autumn|
|            64|2017-01-26|winter|
|            20|2017-04-30|spring|
|            36|2017-02-03|winter|
+--------------+----------+------+
only showing top 20 rows



In [68]:
# For using SQL, you need to create a temporary view
tickets_seasonality.createOrReplaceTempView('seasonal_data')

In [69]:
spark.sql("select `season`, count(*) as no_of_tickets\
                    from seasonal_data\
                    group by `season`\
                    order by no_of_tickets desc").show()

+------+-------------+
|season|no_of_tickets|
+------+-------------+
|spring|      2880687|
|autumn|      2830802|
|summer|      2606208|
|winter|      2485331|
+------+-------------+



In [70]:
spark.sql("select `season`, count(*) as no_of_tickets\
                    from seasonal_data\
                    group by `season`\
                    order by no_of_tickets desc").show()

+------+-------------+
|season|no_of_tickets|
+------+-------------+
|spring|      2880687|
|autumn|      2830802|
|summer|      2606208|
|winter|      2485331|
+------+-------------+



In [71]:
spark.sql("select `season`, `Violation Code`, count(*) as no_of_tickets\
                    from seasonal_data where `season` = 'autumn' \
                    group by season, `Violation Code` order by no_of_tickets desc\
                    limit 3").show()

+------+--------------+-------------+
|season|Violation Code|no_of_tickets|
+------+--------------+-------------+
|autumn|            36|       456046|
|autumn|            21|       357479|
|autumn|            38|       283828|
+------+--------------+-------------+



In [72]:
spark.sql("select `season`, `Violation Code`, count(*) as no_of_tickets\
                    from seasonal_data where `season` = 'summer' \
                    group by season, `Violation Code` order by no_of_tickets desc\
                    limit 3").show()

+------+--------------+-------------+
|season|Violation Code|no_of_tickets|
+------+--------------+-------------+
|summer|            21|       405961|
|summer|            38|       247561|
|summer|            36|       240396|
+------+--------------+-------------+



In [73]:
spark.sql("select `season`, `Violation Code`, count(*) as no_of_tickets\
                    from seasonal_data where `season` = 'winter' \
                    group by season, `Violation Code` order by no_of_tickets desc\
                    limit 3").show()

+------+--------------+-------------+
|season|Violation Code|no_of_tickets|
+------+--------------+-------------+
|winter|            21|       362341|
|winter|            36|       359338|
|winter|            38|       259723|
+------+--------------+-------------+



In [74]:
## Total occurrences of the 3 most common violation codes
spark.sql("select `Violation Code`, count(*) as `no_of_tickets`\
                    from data_2017\
                    group by `Violation Code`\
                    order by `no_of_tickets` desc\
                    limit 3").show()

+--------------+-------------+
|Violation Code|no_of_tickets|
+--------------+-------------+
|            21|      1528588|
|            36|      1400614|
|            38|      1062304|
+--------------+-------------+



In [75]:
spark.sql('''select `Violation Code`, case
                    when `Violation Code` = 21
                    then 55 * count(*)
                    when `Violation Code` = 36
                    then 50* count(*)
                    when `Violation Code` = 38
                    then 50* count(*)
                    else '0'
                    end as `fine_amount`
                    from data_2017
                    group by `Violation Code`
                    order by `fine_amount` desc
                    limit 3''').show()

+--------------+-----------+
|Violation Code|fine_amount|
+--------------+-----------+
|            21|   84072340|
|            36|   70030700|
|            38|   53115200|
+--------------+-----------+



In [76]:
spark.stop()