In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=8cc1544c410210d1d6ff27b3d7c00be0f51dcb31e631de55bf3f8bb9a11757f1
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("NYC Case Study").master("local").getOrCreate()
sc = spark.sparkContext

In [3]:
tickets_2017 = spark.read.format("csv").option("header", "true").load('/content/Parking_Violations_Issued_-_Fiscal_Year_2017.csv')

In [4]:
tickets_2017 = tickets_2017.select("Summons Number","Plate ID","Registration State","Issue Date","Violation Code","Vehicle Body Type","Vehicle Make","Violation Precinct","Issuer Precinct","Violation Time")


In [5]:
tickets_2017.printSchema()

root
 |-- Summons Number: string (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: string (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Violation Precinct: string (nullable = true)
 |-- Issuer Precinct: string (nullable = true)
 |-- Violation Time: string (nullable = true)



In [6]:
tickets_2017.show(5)

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|    5092469481| GZH7067|                NY|07/10/2016|             7|             SUBN|       TOYOT|                 0|              0|         0143A|
|    5092451658| GZH7067|                NY|07/08/2016|             7|             SUBN|       TOYOT|                 0|              0|         0400P|
|    4006265037| FZX9232|                NY|08/23/2016|             5|             SUBN|        FORD|                 0|              0|         0233P|
|    8478629828| 66623ME|                NY|06/14/2017|            47|             REFG|

In [7]:
tickets_2017.describe().show()

+-------+--------------------+-----------------+------------------+----------+------------------+------------------+------------+------------------+-----------------+--------------+
|summary|      Summons Number|         Plate ID|Registration State|Issue Date|    Violation Code| Vehicle Body Type|Vehicle Make|Violation Precinct|  Issuer Precinct|Violation Time|
+-------+--------------------+-----------------+------------------+----------+------------------+------------------+------------+------------------+-----------------+--------------+
|  count|               48771|            48771|             48771|     48771|             48771|             48594|       48445|             48771|            48771|         48771|
|   mean| 6.858202851162699E9|1888038.675804529|              99.0|      NULL|34.832318385926065|3.9476439790575917|        NULL| 45.49937462836522|47.00225543868282|          NULL|
| stddev|2.3047056755701222E9|3616944.469201554|               0.0|      NULL|19.386938046

In [8]:
# 1. Find the total number of tickets for the year.
tickets_2017.count()

48771

In [9]:
# For using SQL, you need to create a temporary view
tickets_2017.createOrReplaceTempView("data_2017")

In [10]:
# 2. Find out the number of unique states from where the cars
# that got parking tickets came from. (Hint: Use 'Registration State')
spark.sql("SELECT count(distinct `Registration State`) as count FROM data_2017").show()

+-----+
|count|
+-----+
|   59|
+-----+



In [11]:
# Arranging the dataframe based on number of entries
spark.sql("SELECT `Registration State`, count(*) as count \
        FROM data_2017 group by `Registration State` order by `count` desc").show()

+------------------+-----+
|Registration State|count|
+------------------+-----+
|                NY|38175|
|                NJ| 4214|
|                PA| 1320|
|                CT|  658|
|                FL|  657|
|                MA|  395|
|                IN|  390|
|                VA|  347|
|                MD|  279|
|                NC|  235|
|                TX|  179|
|                99|  160|
|                GA|  160|
|                IL|  156|
|                AZ|  121|
|                CA|  114|
|                OH|  113|
|                ME|  108|
|                SC|   99|
|                MN|   92|
+------------------+-----+
only showing top 20 rows



In [12]:
#Replacing '99' by 'NY' in the dataframe
from pyspark.sql import functions as F
tickets_2017_new = tickets_2017.withColumn('Registration State',F.when(tickets_2017['Registration State']=='99','NY').otherwise(tickets_2017['Registration State']))

In [13]:
# The temporary view needs to be recreated as values have been updated in tickets_2017_new
tickets_2017_new.createOrReplaceTempView("data_2017")

In [14]:
spark.sql("SELECT `Registration State`, count(*) as count \
        FROM data_2017 group by `Registration State` order by `count` desc").show()

+------------------+-----+
|Registration State|count|
+------------------+-----+
|                NY|38335|
|                NJ| 4214|
|                PA| 1320|
|                CT|  658|
|                FL|  657|
|                MA|  395|
|                IN|  390|
|                VA|  347|
|                MD|  279|
|                NC|  235|
|                TX|  179|
|                GA|  160|
|                IL|  156|
|                AZ|  121|
|                CA|  114|
|                OH|  113|
|                ME|  108|
|                SC|   99|
|                MN|   92|
|                DE|   83|
+------------------+-----+
only showing top 20 rows



In [15]:
spark.sql('SELECT count(distinct `Registration State`) as count \
                 FROM data_2017').show()

+-----+
|count|
+-----+
|   58|
+-----+



### AGGREGATION TASKS

In [16]:
spark.sql('SELECT `Violation Code`, count(*) as count \
                    FROM data_2017 group by `Violation Code` order by `count` desc limit 5').show()

+--------------+-----+
|Violation Code|count|
+--------------+-----+
|            21| 6878|
|            36| 6169|
|            38| 4901|
|            14| 4120|
|            20| 2815|
+--------------+-----+



In [17]:
## Here, `Violation Code` will be replaced by the each of the following variables:
spark.sql('SELECT `Vehicle Body Type`, count(*) as count FROM data_2017 \
                    group by `Vehicle Body Type` order by `count` desc limit 5').show()

+-----------------+-----+
|Vehicle Body Type|count|
+-----------------+-----+
|             SUBN|16820|
|             4DSD|14073|
|              VAN| 6409|
|             DELV| 3080|
|              SDN| 1973|
+-----------------+-----+



In [18]:
## `Vehicle Make`
spark.sql('SELECT `Vehicle Make`, count(*) as count FROM data_2017 \
                    group by `Vehicle Make` order by `count` desc limit 5').show()

+------------+-----+
|Vehicle Make|count|
+------------+-----+
|        FORD| 5633|
|       TOYOT| 5441|
|       HONDA| 4871|
|       NISSA| 4268|
|       CHEVR| 3222|
+------------+-----+



In [19]:
## Here, `Violation Code` will be replaced by 'Violation Precinct' and 'Issuer Precinct'
spark.sql('SELECT `Violation Precinct`, count(*) as count FROM data_2017 \
                    group by `Violation Precinct` order by `count` desc limit 6').show()

+------------------+-----+
|Violation Precinct|count|
+------------------+-----+
|                 0| 8965|
|                19| 2373|
|                14| 1608|
|                 1| 1478|
|                18| 1400|
|               114| 1393|
+------------------+-----+



In [20]:
spark.sql('SELECT `Issuer Precinct`, count(*) as count FROM data_2017\
                    group by `Issuer Precinct` order by `count` desc limit 6').show()

+---------------+-----+
|Issuer Precinct|count|
+---------------+-----+
|              0|10394|
|             19| 2331|
|             14| 1572|
|              1| 1442|
|            114| 1371|
|             18| 1350|
+---------------+-----+



In [21]:
spark.sql("SELECT `Issuer Precinct`, `Violation Code`, count(*) as count_tickets \
                    FROM data_2017 where `Issuer Precinct` = '19'\
                    group by `Issuer Precinct`, `Violation Code` \
                    order by `count_tickets` desc limit 5").show()

+---------------+--------------+-------------+
|Issuer Precinct|Violation Code|count_tickets|
+---------------+--------------+-------------+
|             19|            46|          386|
|             19|            38|          320|
|             19|            37|          294|
|             19|            14|          271|
|             19|            21|          236|
+---------------+--------------+-------------+



In [22]:
spark.sql("SELECT `Issuer Precinct`, `Violation Code`, count(*) as count_tickets \
                    FROM data_2017 where `Issuer Precinct` = '14'\
                    group by `Issuer Precinct`, `Violation Code` \
                    order by `count_tickets` desc limit 5").show()

+---------------+--------------+-------------+
|Issuer Precinct|Violation Code|count_tickets|
+---------------+--------------+-------------+
|             14|            14|          335|
|             14|            69|          274|
|             14|            31|          173|
|             14|            47|          123|
|             14|            42|           97|
+---------------+--------------+-------------+



In [23]:
spark.sql("SELECT `Issuer Precinct`, `Violation Code`, count(*) as count_tickets \
                    FROM data_2017 where `Issuer Precinct` = '1'\
                    group by `Issuer Precinct`, `Violation Code` \
                    order by `count_tickets` desc limit 5").show()

+---------------+--------------+-------------+
|Issuer Precinct|Violation Code|count_tickets|
+---------------+--------------+-------------+
|              1|            14|          334|
|              1|            16|          175|
|              1|            20|          135|
|              1|            46|          103|
|              1|            38|           76|
+---------------+--------------+-------------+



In [24]:
## Check for missing values
spark.sql("select count(*) as count\
                 FROM data_2017 where `Violation Time` is Null").show()

+-----+
|count|
+-----+
|    0|
+-----+



In [25]:
tickets_2017_new.select('Violation Time').dropna().count()

48771

In [26]:
# Check for the operation to be performed

In [27]:
spark.sql("select `Violation Time`, if(right(`Violation Time`, 1) == 'A' or left(`Violation Time`, 2) == '12',concat(substring(`Violation Time`, 1,2),\
                    ':', substring(`Violation Time`, 3,2)), concat(int(substring(`Violation Time`, 1,2) + 12),\
                    ':', substring(`Violation Time`, 3,2))) as `Violation Time 2`\
                    from data_2017 limit 50").show()

+--------------+----------------+
|Violation Time|Violation Time 2|
+--------------+----------------+
|         0143A|           01:43|
|         0400P|           16:00|
|         0233P|           14:33|
|         1120A|           11:20|
|         0555P|           17:55|
|         0852P|           20:52|
|         0215A|           02:15|
|         0758A|           07:58|
|         1005A|           10:05|
|         0845A|           08:45|
|         0015A|           00:15|
|         0707A|           07:07|
|         1022A|           10:22|
|         1150A|           11:50|
|         0525A|           05:25|
|         0645P|           18:45|
|         1122A|           11:22|
|         0256P|           14:56|
|         1232A|           12:32|
|         1034A|           10:34|
+--------------+----------------+
only showing top 20 rows



In [28]:
# Creating a separate df with the required fields for analysis
time_violation_analysis = spark.sql("select if(right(`Violation Time`, 1) == 'A' or left(`Violation Time`, 2) == '12',\
  concat(substring(`Violation Time`, 1,2),':', substring(`Violation Time`,3,2)),\
  concat(int(substring(`Violation Time`, 1,2) + 12),':', substring(`Violation Time`, 3,2)))\
  as `Violation Time`, `Violation Code` from data_2017")

In [29]:
time_violation_analysis.show()

+--------------+--------------+
|Violation Time|Violation Code|
+--------------+--------------+
|         01:43|             7|
|         16:00|             7|
|         14:33|             5|
|         11:20|            47|
|         17:55|            69|
|         20:52|             7|
|         02:15|            40|
|         07:58|            36|
|         10:05|            36|
|         08:45|             5|
|         00:15|            78|
|         07:07|            19|
|         10:22|            36|
|         11:50|            21|
|         05:25|            40|
|         18:45|            71|
|         11:22|             7|
|         14:56|            64|
|         12:32|            20|
|         10:34|            36|
+--------------+--------------+
only showing top 20 rows



In [30]:
# For using SQL, you need to create a temporary view
time_violation_analysis.createOrReplaceTempView('time_violation_data')

In [31]:
time_violation_analysis = spark.sql('''select case
                                       when int(substring(`Violation Time`,1,2)) between 00 and 03
                                       then '00:00-03:59'
                                       when int(substring(`Violation Time`,1,2)) between 04 and 07
                                       then '04:00-07:59'
                                       when int(substring(`Violation Time`,1,2)) between 08 and 11
                                       then '08:00-11:59'
                                       when int(substring(`Violation Time`,1,2)) between 12 and 15
                                       then '12:00-15:59'
                                       when int(substring(`Violation Time`,1,2)) between 16 and 19
                                       then '16:00-19:59'
                                       else '20:00-23:59'
                                       end as bins,  `Violation Time`, `Violation Code`
                                       from time_violation_data''')

In [32]:
time_violation_analysis.show()

+-----------+--------------+--------------+
|       bins|Violation Time|Violation Code|
+-----------+--------------+--------------+
|00:00-03:59|         01:43|             7|
|16:00-19:59|         16:00|             7|
|12:00-15:59|         14:33|             5|
|08:00-11:59|         11:20|            47|
|16:00-19:59|         17:55|            69|
|20:00-23:59|         20:52|             7|
|00:00-03:59|         02:15|            40|
|04:00-07:59|         07:58|            36|
|08:00-11:59|         10:05|            36|
|08:00-11:59|         08:45|             5|
|00:00-03:59|         00:15|            78|
|04:00-07:59|         07:07|            19|
|08:00-11:59|         10:22|            36|
|08:00-11:59|         11:50|            21|
|04:00-07:59|         05:25|            40|
|16:00-19:59|         18:45|            71|
|08:00-11:59|         11:22|             7|
|12:00-15:59|         14:56|            64|
|12:00-15:59|         12:32|            20|
|08:00-11:59|         10:34|    

In [33]:
# Updating the SQL view
time_violation_analysis.createOrReplaceTempView('time_violation_data')

In [34]:
spark.sql("SELECT bins, `Violation Code`, count(*) as `count`\
                    FROM time_violation_data where bins = '00:00-03:59'\
                    group by bins, `Violation Code`\
                    order by `count` desc limit 3").show()

+-----------+--------------+-----+
|       bins|Violation Code|count|
+-----------+--------------+-----+
|00:00-03:59|            21|  370|
|00:00-03:59|            40|  238|
|00:00-03:59|            14|  133|
+-----------+--------------+-----+



In [35]:
spark.sql("SELECT bins, `Violation Code`, count(*) as `count`\
                    FROM time_violation_data where bins = '04:00-07:59'\
                    group by bins, `Violation Code`\
                    order by `count` desc limit 3").show()

+-----------+--------------+-----+
|       bins|Violation Code|count|
+-----------+--------------+-----+
|04:00-07:59|            14|  622|
|04:00-07:59|            21|  544|
|04:00-07:59|            40|  491|
+-----------+--------------+-----+



In [36]:
spark.sql("SELECT bins, `Violation Code`, count(*) as `count`\
                    FROM time_violation_data where bins = '08:00-11:59'\
                    group by bins, `Violation Code`\
                    order by `count` desc limit 3").show()

+-----------+--------------+-----+
|       bins|Violation Code|count|
+-----------+--------------+-----+
|08:00-11:59|            21| 5289|
|08:00-11:59|            36| 3309|
|08:00-11:59|            38| 1607|
+-----------+--------------+-----+



In [37]:
spark.sql("SELECT bins, `Violation Code`, count(*) as `count`\
                    FROM time_violation_data where bins = '12:00-15:59'\
                    group by bins, `Violation Code`\
                    order by `count` desc limit 3").show()

+-----------+--------------+-----+
|       bins|Violation Code|count|
+-----------+--------------+-----+
|12:00-15:59|            36| 2595|
|12:00-15:59|            38| 2115|
|12:00-15:59|            37| 1562|
+-----------+--------------+-----+



In [38]:
spark.sql("SELECT bins, `Violation Code`, count(*) as `count`\
                    FROM time_violation_data where bins = '16:00-19:59'\
                    group by bins, `Violation Code`\
                    order by `count` desc limit 3").show()

+-----------+--------------+-----+
|       bins|Violation Code|count|
+-----------+--------------+-----+
|16:00-19:59|            38|  937|
|16:00-19:59|            37|  683|
|16:00-19:59|            14|  654|
+-----------+--------------+-----+



In [39]:
spark.sql("SELECT bins, `Violation Code`, count(*) as `count`\
                    FROM time_violation_data where bins = '20:00-23:59'\
                    group by bins, `Violation Code`\
                    order by `count` desc limit 3").show()

+-----------+--------------+-----+
|       bins|Violation Code|count|
+-----------+--------------+-----+
|20:00-23:59|             7|  252|
|20:00-23:59|            38|  230|
|20:00-23:59|            14|  226|
+-----------+--------------+-----+



In [40]:
# Finding the 3 most commonly occurring violation codes
spark.sql("SELECT `Violation Code`, count(*) as `count`\
                    FROM time_violation_data\
                    group by `Violation Code`\
                    order by `count` desc limit 3").show()

+--------------+-----+
|Violation Code|count|
+--------------+-----+
|            21| 6878|
|            36| 6169|
|            38| 4901|
+--------------+-----+



In [41]:
spark.sql("SELECT `Violation Code`, bins, count(*) as `count`\
                    FROM time_violation_data where `Violation Code` = '21'\
                    group by `Violation Code`, bins\
                    order by `count` desc limit 1").show()

+--------------+-----------+-----+
|Violation Code|       bins|count|
+--------------+-----------+-----+
|            21|08:00-11:59| 5289|
+--------------+-----------+-----+



In [42]:
spark.sql("SELECT `Violation Code`, bins, count(*) as `count`\
                    FROM time_violation_data where `Violation Code` = '36'\
                    group by `Violation Code`, bins\
                    order by `count` desc limit 1").show()

+--------------+-----------+-----+
|Violation Code|       bins|count|
+--------------+-----------+-----+
|            36|08:00-11:59| 3309|
+--------------+-----------+-----+



In [43]:
spark.sql("SELECT `Violation Code`, bins, count(*) as `count`\
                    FROM time_violation_data where `Violation Code` = '38'\
                    group by `Violation Code`, bins\
                    order by `count` desc limit 1").show()

+--------------+-----------+-----+
|Violation Code|       bins|count|
+--------------+-----------+-----+
|            38|12:00-15:59| 2115|
+--------------+-----------+-----+



In [44]:
tickets_seasonality = spark.sql('''select `Violation Code`, `Issue Date`, case
                                  when month(to_date(`Issue Date`, 'yyyy-MM-dd')) between 03 and 05
                                  then 'spring'
                                  when month(to_date(`Issue Date`, 'yyyy-MM-dd')) between 06 and 08
                                  then 'summer'
                                  when month(to_date(`Issue Date`, 'yyyy-MM-dd')) between 09 and 11
                                  then 'autumn'
                                  when month(to_date(`Issue Date`, 'yyyy-MM-dd')) in (1,2,12)
                                  then 'winter'
                                  else 'unknown'
                                  end as season
                                  from data_2017''')

In [45]:
tickets_seasonality.show()

+--------------+----------+-------+
|Violation Code|Issue Date| season|
+--------------+----------+-------+
|             7|07/10/2016|unknown|
|             7|07/08/2016|unknown|
|             5|08/23/2016|unknown|
|            47|06/14/2017|unknown|
|            69|11/21/2016|unknown|
|             7|06/13/2017|unknown|
|            40|08/03/2016|unknown|
|            36|12/21/2016|unknown|
|            36|11/21/2016|unknown|
|             5|10/05/2016|unknown|
|            78|01/11/2017|unknown|
|            19|09/27/2016|unknown|
|            36|10/27/2016|unknown|
|            21|09/30/2016|unknown|
|            40|02/04/2017|unknown|
|            71|07/07/2016|unknown|
|             7|09/24/2016|unknown|
|            64|01/26/2017|unknown|
|            20|04/30/2017|unknown|
|            36|02/03/2017|unknown|
+--------------+----------+-------+
only showing top 20 rows



In [46]:
# For using SQL, you need to create a temporary view
tickets_seasonality.createOrReplaceTempView('seasonal_data')

In [47]:
spark.sql("select `season`, count(*) as no_of_tickets\
                    from seasonal_data\
                    group by `season`\
                    order by no_of_tickets desc").show()

+-------+-------------+
| season|no_of_tickets|
+-------+-------------+
|unknown|        48771|
+-------+-------------+



In [48]:
spark.sql("select `season`, count(*) as no_of_tickets\
                    from seasonal_data\
                    group by `season`\
                    order by no_of_tickets desc").show()

+-------+-------------+
| season|no_of_tickets|
+-------+-------------+
|unknown|        48771|
+-------+-------------+



In [49]:
spark.sql("select `season`, `Violation Code`, count(*) as no_of_tickets\
                    from seasonal_data where `season` = 'autumn' \
                    group by season, `Violation Code` order by no_of_tickets desc\
                    limit 3").show()

+------+--------------+-------------+
|season|Violation Code|no_of_tickets|
+------+--------------+-------------+
+------+--------------+-------------+



In [50]:
spark.sql("select `season`, `Violation Code`, count(*) as no_of_tickets\
                    from seasonal_data where `season` = 'summer' \
                    group by season, `Violation Code` order by no_of_tickets desc\
                    limit 3").show()

+------+--------------+-------------+
|season|Violation Code|no_of_tickets|
+------+--------------+-------------+
+------+--------------+-------------+



In [51]:
spark.sql("select `season`, `Violation Code`, count(*) as no_of_tickets\
                    from seasonal_data where `season` = 'winter' \
                    group by season, `Violation Code` order by no_of_tickets desc\
                    limit 3").show()

+------+--------------+-------------+
|season|Violation Code|no_of_tickets|
+------+--------------+-------------+
+------+--------------+-------------+



In [52]:
## Total occurrences of the 3 most common violation codes
spark.sql("select `Violation Code`, count(*) as `no_of_tickets`\
                    from data_2017\
                    group by `Violation Code`\
                    order by `no_of_tickets` desc\
                    limit 3").show()

+--------------+-------------+
|Violation Code|no_of_tickets|
+--------------+-------------+
|            21|         6878|
|            36|         6169|
|            38|         4901|
+--------------+-------------+



In [53]:
spark.sql('''select `Violation Code`, case
                    when `Violation Code` = 21
                    then 55 * count(*)
                    when `Violation Code` = 36
                    then 50* count(*)
                    when `Violation Code` = 38
                    then 50* count(*)
                    else '0'
                    end as `fine_amount`
                    from data_2017
                    group by `Violation Code`
                    order by `fine_amount` desc
                    limit 3''').show()

+--------------+-----------+
|Violation Code|fine_amount|
+--------------+-----------+
|            21|     378290|
|            36|     308450|
|            38|     245050|
+--------------+-----------+



In [54]:
spark.stop()