In [1]:
import findspark as fs
fs.init()
fs.find()
from pyspark.sql import SparkSession as ss
from pyspark.context import SparkContext
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import functions as f
import re
from pyspark.sql.functions import coalesce, to_date, col, regexp_replace, split, when, lit, translate, current_date, year,length, approx_count_distinct, to_timestamp, substring

In [2]:
from pyspark.sql.functions import trim,lower,desc,asc

In [3]:
spark = ss.builder.master("local").appName('MyApp').getOrCreate()

# Read pedestrian-counting-system-sensor-locations.csv file

In [4]:
df_pd_cnt_loctn = spark.read.option("header",True).csv("pedestrian-counting-system-sensor-locations.csv")

In [5]:
# Print 5 rows of the dataframe created
df_pd_cnt_loctn.show(5,truncate = False)

+-----------+---------------------------------+-----------+-----------------+-------------------+-------------+------+-----------+-----------+------------+------------+--------------------------+
|location_id|Sensor_description               |sensor_name|Installation_date|Note               |location_type|Status|Direction_1|Direction_2|Latitude    |Longitude   |Location                  |
+-----------+---------------------------------+-----------+-----------------+-------------------+-------------+------+-----------+-----------+------------+------------+--------------------------+
|2          |Bourke Street Mall (South)       |Bou283_T   |2009-03-30       |null               |Outdoor      |A     |East       |West       |-37.81380668|144.96516718|-37.81380668, 144.96516718|
|4          |Town Hall (West)                 |Swa123_T   |2009-03-23       |null               |Outdoor      |A     |North      |South      |-37.81487988|144.9660878 |-37.81487988, 144.9660878 |
|6          |Flinder

In [6]:
df_pd_cnt_loctn.printSchema()

root
 |-- location_id: string (nullable = true)
 |-- Sensor_description: string (nullable = true)
 |-- sensor_name: string (nullable = true)
 |-- Installation_date: string (nullable = true)
 |-- Note: string (nullable = true)
 |-- location_type: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Direction_1: string (nullable = true)
 |-- Direction_2: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Location: string (nullable = true)



# summary of the DataFrame's statistics

In [7]:
df_pd_cnt_loctn.describe().show(truncate = False, vertical=True)

-RECORD 0------------------------------------------------------------
 summary            | count                                          
 location_id        | 108                                            
 Sensor_description | 108                                            
 sensor_name        | 108                                            
 Installation_date  | 107                                            
 Note               | 7                                              
 location_type      | 108                                            
 Status             | 108                                            
 Direction_1        | 100                                            
 Direction_2        | 100                                            
 Latitude           | 108                                            
 Longitude          | 108                                            
 Location           | 108                                            
-RECORD 1-----------

# Total Records

In [8]:
count = df_pd_cnt_loctn.count()

# print the total count of records
print(f"Total count of records: {count}")


Total count of records: 108


# Check Nullability on all columns

In [9]:
null_columns = [col for col in df_pd_cnt_loctn.columns if df_pd_cnt_loctn.filter(f"{col} is null").count() > 0]

# print the null columns
print(null_columns)

['Installation_date', 'Note', 'Direction_1', 'Direction_2']


# Check distinct values

In [10]:
unique_counts = {}
for col_name in df_pd_cnt_loctn.columns:
    unique_count = df_pd_cnt_loctn.select(col_name).distinct().count()
    unique_counts[col_name] = unique_count

# print the number of unique values for each column
for col_name, unique_count in unique_counts.items():
    print(f"Column '{col_name}' has {unique_count} unique value(s).")

Column 'location_id' has 108 unique value(s).
Column 'Sensor_description' has 106 unique value(s).
Column 'sensor_name' has 107 unique value(s).
Column 'Installation_date' has 76 unique value(s).
Column 'Note' has 8 unique value(s).
Column 'location_type' has 4 unique value(s).
Column 'Status' has 1 unique value(s).
Column 'Direction_1' has 4 unique value(s).
Column 'Direction_2' has 4 unique value(s).
Column 'Latitude' has 106 unique value(s).
Column 'Longitude' has 106 unique value(s).
Column 'Location' has 106 unique value(s).


# Create Temporary View on Data

In [11]:
df_pd_cnt_loctn.createOrReplaceTempView("pd_cnt_loctn_vw") 

In [12]:
# Running SQL on the view

stg_pd_cnt_loctn_vw_df=spark.sql("select * from pd_cnt_loctn_vw limit 10")

In [13]:
stg_pd_cnt_loctn_vw_df.show()

+-----------+--------------------+-----------+-----------------+--------------------+-------------+------+-----------+-----------+------------+------------+--------------------+
|location_id|  Sensor_description|sensor_name|Installation_date|                Note|location_type|Status|Direction_1|Direction_2|    Latitude|   Longitude|            Location|
+-----------+--------------------+-----------+-----------------+--------------------+-------------+------+-----------+-----------+------------+------------+--------------------+
|          2|Bourke Street Mal...|   Bou283_T|       2009-03-30|                null|      Outdoor|     A|       East|       West|-37.81380668|144.96516718|-37.81380668, 144...|
|          4|    Town Hall (West)|   Swa123_T|       2009-03-23|                null|      Outdoor|     A|      North|      South|-37.81487988| 144.9660878|-37.81487988, 144...|
|          6|Flinders Street S...|     FliS_T|       2009-03-25| Upgraded on 8/09/21|      Outdoor|     A|    

# Read pedestrian-counting-system-sensor-locations.csv

In [14]:
df_pd_cnt_mnthly_cnt = spark.read.option("header",True).csv("Pedestrian_Counting_System_Monthly_counts_per_hour_may_2009_to_14_dec_2022.csv")

In [15]:
# Print 5 rows of the dataframe created
df_pd_cnt_mnthly_cnt.show(5,truncate = False)

+-------+-----------------------------+----+--------+-----+------+----+---------+----------------------------+-------------+
|ID     |Date_Time                    |Year|Month   |Mdate|Day   |Time|Sensor_ID|Sensor_Name                 |Hourly_Counts|
+-------+-----------------------------+----+--------+-----+------+----+---------+----------------------------+-------------+
|2887628|November 01, 2019 05:00:00 PM|2019|November|1    |Friday|17  |34       |Flinders St-Spark La        |300          |
|2887629|November 01, 2019 05:00:00 PM|2019|November|1    |Friday|17  |39       |Alfred Place                |604          |
|2887630|November 01, 2019 05:00:00 PM|2019|November|1    |Friday|17  |37       |Lygon St (East)             |216          |
|2887631|November 01, 2019 05:00:00 PM|2019|November|1    |Friday|17  |40       |Lonsdale St-Spring St (West)|627          |
|2887632|November 01, 2019 05:00:00 PM|2019|November|1    |Friday|17  |36       |Queen St (West)             |774          |


In [16]:
df_pd_cnt_mnthly_cnt.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Date_Time: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- Mdate: string (nullable = true)
 |-- Day: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Sensor_ID: string (nullable = true)
 |-- Sensor_Name: string (nullable = true)
 |-- Hourly_Counts: string (nullable = true)



# summary of the DataFrame's statistics

In [17]:
#df_pd_cnt_mnthly_cnt.describe().show(truncate = False, vertical=True)

# Total Records

In [18]:
count = df_pd_cnt_mnthly_cnt.count()

# print the total count of records
print(f"Total count of records: {count}")


Total count of records: 4562230


# Check Nullability on all columns

In [19]:
null_columns = [col for col in df_pd_cnt_mnthly_cnt.columns if df_pd_cnt_mnthly_cnt.filter(f"{col} is null").count() > 0]

# print the null columns
print(null_columns)

[]


# Check distinct values

In [20]:
unique_counts = {}
for col_name in df_pd_cnt_mnthly_cnt.columns:
    unique_count = df_pd_cnt_mnthly_cnt.select(col_name).distinct().count()
    unique_counts[col_name] = unique_count

# print the number of unique values for each column
for col_name, unique_count in unique_counts.items():
    print(f"Column '{col_name}' has {unique_count} unique value(s).")

Column 'ID' has 4562230 unique value(s).
Column 'Date_Time' has 117673 unique value(s).
Column 'Year' has 14 unique value(s).
Column 'Month' has 12 unique value(s).
Column 'Mdate' has 31 unique value(s).
Column 'Day' has 7 unique value(s).
Column 'Time' has 24 unique value(s).
Column 'Sensor_ID' has 82 unique value(s).
Column 'Sensor_Name' has 94 unique value(s).
Column 'Hourly_Counts' has 6417 unique value(s).


# Create Temporary View on Data

In [21]:
df_pd_cnt_mnthly_cnt.createOrReplaceTempView("pd_cnt_mnthly_cnt_vw") 

In [22]:
# Running SQL on the view

stg_pd_cnt_mnthly_cnt_df=spark.sql("select * from pd_cnt_mnthly_cnt_vw limit 10")
stg_pd_cnt_mnthly_cnt_df.show(truncate = False)

+-------+-----------------------------+----+--------+-----+------+----+---------+-----------------------------+-------------+
|ID     |Date_Time                    |Year|Month   |Mdate|Day   |Time|Sensor_ID|Sensor_Name                  |Hourly_Counts|
+-------+-----------------------------+----+--------+-----+------+----+---------+-----------------------------+-------------+
|2887628|November 01, 2019 05:00:00 PM|2019|November|1    |Friday|17  |34       |Flinders St-Spark La         |300          |
|2887629|November 01, 2019 05:00:00 PM|2019|November|1    |Friday|17  |39       |Alfred Place                 |604          |
|2887630|November 01, 2019 05:00:00 PM|2019|November|1    |Friday|17  |37       |Lygon St (East)              |216          |
|2887631|November 01, 2019 05:00:00 PM|2019|November|1    |Friday|17  |40       |Lonsdale St-Spring St (West) |627          |
|2887632|November 01, 2019 05:00:00 PM|2019|November|1    |Friday|17  |36       |Queen St (West)              |774    

# Check if there are any sensors which are present in monthly_count list but not in location list

After looking at data we observed that sensor id is same as location id, hence comparing the records on these attributes

In [23]:
spark.sql("""select distinct(Sensor_ID),Sensor_Name from pd_cnt_mnthly_cnt_vw where Sensor_ID not in
            (select distinct(location_id) from pd_cnt_loctn_vw) """ ).show(truncate = False)

+---------+-------------------------------------+
|Sensor_ID|Sensor_Name                          |
+---------+-------------------------------------+
|38       |Flinders St-Swanston St (West)       |
|33       |Flinders St-Spring St (West)         |
|60       |Flinders la - Swanston St (West) Temp|
|32       |City Square                          |
|13       |Flagstaff Station                    |
|16       |Australia on Collins                 |
|22       |Flinders St-Elizabeth St (East)      |
|15       |State Library                        |
|34       |Flinders St-Spark La                 |
|60       |Flinders La - Swanston St (West) Temp|
+---------+-------------------------------------+



In [24]:
# Calculate the count of those sensors

In [25]:
spark.sql("""select count(distinct(Sensor_ID)) from pd_cnt_mnthly_cnt_vw where Sensor_ID not in
            (select distinct(location_id) from pd_cnt_loctn_vw) """ ).show()

+-------------------------+
|count(DISTINCT Sensor_ID)|
+-------------------------+
|                        9|
+-------------------------+



# Check if there are any sensors which are present in location list but not in monthly_count list

In [26]:
spark.sql("""select distinct(location_id),Sensor_description from pd_cnt_loctn_vw where location_id not in
            (select distinct(Sensor_ID) from pd_cnt_mnthly_cnt_vw) """ ).show(50,truncate = False)

+-----------+--------------------------------------------------------------------------------+
|location_id|Sensor_description                                                              |
+-----------+--------------------------------------------------------------------------------+
|94         |Fitzroy Garden- The Conservatory                                                |
|92         |Library at The Dock-South side                                                  |
|88         |Micro-Labs                                                                      |
|105        |Kathleen Syme Library Lib                                                       |
|91         |Library at The Dock-North side                                                  |
|99         |Town Hall Visitor Centre                                                        |
|122        |Birrarung Marr                                                                  |
|103        |Kensington Town Hall                 

In [27]:
# calculate count

In [28]:
spark.sql("""select count(distinct(location_id))from pd_cnt_loctn_vw where location_id not in
            (select distinct(Sensor_ID) from pd_cnt_mnthly_cnt_vw) """ ).show()

+---------------------------+
|count(DISTINCT location_id)|
+---------------------------+
|                         35|
+---------------------------+



# Check what location type of sensor is installed and its count

In [29]:
spark.sql("""select distinct(location_type),count(*) from pd_cnt_loctn_vw group by location_type """ ).show(truncate = False)

+-------------+--------+
|location_type|count(1)|
+-------------+--------+
|Indoor       |21      |
|Outdoor      |79      |
|Indoor Blix  |1       |
|Outdoor Blix |7       |
+-------------+--------+



# Check what location type of sensor information is available in monthly count


In [30]:
spark.sql("""select distinct(location_type),count(*) from pd_cnt_loctn_vw where location_id in
            (select distinct(Sensor_ID) from pd_cnt_mnthly_cnt_vw) group by location_type """ ).show(truncate = False)

+-------------+--------+
|location_type|count(1)|
+-------------+--------+
|Outdoor      |73      |
+-------------+--------+



# Top 10 (most pedestrians) locations by day


In [31]:
# Check count of sensor name on particular Day)
spark.sql("select Day,cast(sum(Hourly_Counts) as integer) as cnt  ,Sensor_Name from pd_cnt_mnthly_cnt_vw group by Day,Sensor_Name").show(truncate = False)

+--------+--------+-------------------------------------+
|Day     |cnt     |Sensor_Name                          |
+--------+--------+-------------------------------------+
|Friday  |2734124 |Lonsdale St-Spring St (West)         |
|Friday  |1143168 |QVM-Queen St (East)                  |
|Thursday|4974336 |Melbourne Central-Elizabeth St (East)|
|Saturday|512529  |Flinders St - ACMI                   |
|Sunday  |3433937 |Collins St (North)                   |
|Saturday|10583641|Flinders St-Elizabeth St (East)      |
|Monday  |2058517 |Bourke St Bridge                     |
|Thursday|60500   |Macaulay Rd-Bellair St               |
|Sunday  |3874672 |Chinatown-Lt Bourke St (South)       |
|Sunday  |169513  |Pelham St (South)                    |
|Friday  |890987  |Swanston St - City Square            |
|Sunday  |241271  |Flinders Ln -Degraves St (South)     |
|Friday  |18753194|Princes Bridge                       |
|Friday  |13313671|Spencer St-Collins St (North)        |
|Monday  |1825

In [32]:
# Top 10 Sensor name by Day
df_most_pd_sensr_nm = spark.sql("select * from (select *, row_number() over(partition by Day order by Counts desc) as \
rn from  (select Day, Sensor_ID, Sensor_Name,cast(sum(Hourly_Counts) as integer) as Counts from pd_cnt_mnthly_cnt_vw group by Day, \
Sensor_Name,Sensor_ID)as t) as t2 where t2.rn < 11 ")
df_most_pd_sensr_nm.show(truncate= False)

+---------+---------+---------------------------------+--------+---+
|Day      |Sensor_ID|Sensor_Name                      |Counts  |rn |
+---------+---------+---------------------------------+--------+---+
|Wednesday|4        |Town Hall (West)                 |20466895|1  |
|Wednesday|6        |Flinders Street Station Underpass|19366400|2  |
|Wednesday|3        |Melbourne Central                |17046228|3  |
|Wednesday|5        |Princes Bridge                   |15887703|4  |
|Wednesday|2        |Bourke Street Mall (South)       |15560927|5  |
|Wednesday|1        |Bourke Street Mall (North)       |15352588|6  |
|Wednesday|24       |Spencer St-Collins St (North)    |13272424|7  |
|Wednesday|22       |Flinders St-Elizabeth St (East)  |12031778|8  |
|Wednesday|15       |State Library                    |11000867|9  |
|Wednesday|13       |Flagstaff Station                |10553562|10 |
|Tuesday  |4        |Town Hall (West)                 |19567332|1  |
|Tuesday  |6        |Flinders Stre

In [33]:
# Joining location dataframe to find the location
df_pd_loctn = df_most_pd_sensr_nm.join(df_pd_cnt_loctn, [lower(trim(df_pd_cnt_loctn['location_id']))==lower(trim(df_most_pd_sensr_nm['Sensor_ID']))] ,'left')
df_pd_loctn.show()

+---------+---------+--------------------+--------+---+-----------+--------------------+-----------+-----------------+-------------------+-------------+------+-----------+-----------+------------+------------+--------------------+
|      Day|Sensor_ID|         Sensor_Name|  Counts| rn|location_id|  Sensor_description|sensor_name|Installation_date|               Note|location_type|Status|Direction_1|Direction_2|    Latitude|   Longitude|            Location|
+---------+---------+--------------------+--------+---+-----------+--------------------+-----------+-----------------+-------------------+-------------+------+-----------+-----------+------------+------------+--------------------+
|Wednesday|        4|    Town Hall (West)|20466895|  1|          4|    Town Hall (West)|   Swa123_T|       2009-03-23|               null|      Outdoor|     A|      North|      South|-37.81487988| 144.9660878|-37.81487988, 144...|
|Wednesday|        6|Flinders Street S...|19366400|  2|          6|Flinders 

In [34]:
# Selecting only few columns
df_pd_loctn.select('Day','Sensor_ID', df_most_pd_sensr_nm.Sensor_Name,'Location','Counts','rn').orderBy(asc('Day'),asc('rn')).show(truncate = False)

+------+---------+---------------------------------+--------------------------+--------+---+
|Day   |Sensor_ID|Sensor_Name                      |Location                  |Counts  |rn |
+------+---------+---------------------------------+--------------------------+--------+---+
|Friday|4        |Town Hall (West)                 |-37.81487988, 144.9660878 |24519747|1  |
|Friday|6        |Flinders Street Station Underpass|-37.81911705, 144.96558255|21619844|2  |
|Friday|3        |Melbourne Central                |-37.81101524, 144.96429485|20337683|3  |
|Friday|2        |Bourke Street Mall (South)       |-37.81380668, 144.96516718|19078538|4  |
|Friday|5        |Princes Bridge                   |-37.81874249, 144.96787656|18753194|5  |
|Friday|1        |Bourke Street Mall (North)       |-37.81349441, 144.96515323|18744041|6  |
|Friday|24       |Spencer St-Collins St (North)    |-37.81887963, 144.95449198|13313671|7  |
|Friday|22       |Flinders St-Elizabeth St (East)  |null              

# Verifying the Result

In [35]:
spark.sql("select 'Princes Bridge' as Sensor_Name, cast(sum(Hourly_Counts) as integer) as Counts from pd_cnt_mnthly_cnt_vw where Sensor_Name = 'Princes Bridge' and Day = 'Wednesday' ").show()

+--------------+--------+
|   Sensor_Name|  Counts|
+--------------+--------+
|Princes Bridge|15887703|
+--------------+--------+



# Top 10 (most pedestrians) locations by month


In [36]:
df_most_pd_sensr_nm_mnth =spark.sql("select * from (select *, row_number() over(partition by Month order by Counts desc) as \
rn from  (select Month,Sensor_ID,Sensor_Name,cast(sum(Hourly_Counts) as integer) as Counts from pd_cnt_mnthly_cnt_vw group by Month, \
Sensor_Name,Sensor_ID)as t) as t2 where t2.rn < 11 ")
df_most_pd_sensr_nm_mnth.show(25,truncate = False)

+--------+---------+---------------------------------+--------+---+
|Month   |Sensor_ID|Sensor_Name                      |Counts  |rn |
+--------+---------+---------------------------------+--------+---+
|July    |4        |Town Hall (West)                 |12555809|1  |
|July    |3        |Melbourne Central                |11224490|2  |
|July    |6        |Flinders Street Station Underpass|10808119|3  |
|July    |1        |Bourke Street Mall (North)       |9677482 |4  |
|July    |2        |Bourke Street Mall (South)       |9499308 |5  |
|July    |5        |Princes Bridge                   |9159764 |6  |
|July    |24       |Spencer St-Collins St (North)    |6588137 |7  |
|July    |15       |State Library                    |6434716 |8  |
|July    |35       |Southbank                        |5903894 |9  |
|July    |22       |Flinders St-Elizabeth St (East)  |5883997 |10 |
|November|4        |Town Hall (West)                 |12136661|1  |
|November|6        |Flinders Street Station Unde

In [37]:
# Joining location dataframe to find the location
df_pd_loctn_mnth = df_most_pd_sensr_nm_mnth.join(df_pd_cnt_loctn, [lower(trim(df_pd_cnt_loctn['location_id']))==lower(trim(df_most_pd_sensr_nm_mnth['Sensor_ID']))] ,'left')
df_pd_loctn_mnth.show()

+--------+---------+--------------------+--------+---+-----------+--------------------+-----------+-----------------+--------------------+-------------+------+-----------+-----------+------------+------------+--------------------+
|   Month|Sensor_ID|         Sensor_Name|  Counts| rn|location_id|  Sensor_description|sensor_name|Installation_date|                Note|location_type|Status|Direction_1|Direction_2|    Latitude|   Longitude|            Location|
+--------+---------+--------------------+--------+---+-----------+--------------------+-----------+-----------------+--------------------+-------------+------+-----------+-----------+------------+------------+--------------------+
|    July|        4|    Town Hall (West)|12555809|  1|          4|    Town Hall (West)|   Swa123_T|       2009-03-23|                null|      Outdoor|     A|      North|      South|-37.81487988| 144.9660878|-37.81487988, 144...|
|    July|        3|   Melbourne Central|11224490|  2|          3|   Melbour

In [38]:
# Selecting only few columns
df_pd_loctn_mnth.select('Month','Sensor_ID', df_most_pd_sensr_nm_mnth.Sensor_Name,'Location','Counts','rn').orderBy(asc('Month'),asc('rn')).show(truncate = False)

+------+---------+---------------------------------+--------------------------+--------+---+
|Month |Sensor_ID|Sensor_Name                      |Location                  |Counts  |rn |
+------+---------+---------------------------------+--------------------------+--------+---+
|April |4        |Town Hall (West)                 |-37.81487988, 144.9660878 |12463968|1  |
|April |3        |Melbourne Central                |-37.81101524, 144.96429485|10862976|2  |
|April |5        |Princes Bridge                   |-37.81874249, 144.96787656|10660422|3  |
|April |6        |Flinders Street Station Underpass|-37.81911705, 144.96558255|10135936|4  |
|April |1        |Bourke Street Mall (North)       |-37.81349441, 144.96515323|9194871 |5  |
|April |2        |Bourke Street Mall (South)       |-37.81380668, 144.96516718|8815472 |6  |
|April |22       |Flinders St-Elizabeth St (East)  |null                      |7561340 |7  |
|April |24       |Spencer St-Collins St (North)    |-37.81887963, 144.

# Verifying the Result

In [39]:
spark.sql("select 'Princes Bridge' as Sensor_Name, cast(sum(Hourly_Counts) as integer) as Counts from pd_cnt_mnthly_cnt_vw where Sensor_Name = 'Princes Bridge' and Month = 'July' ").show()

+--------------+-------+
|   Sensor_Name| Counts|
+--------------+-------+
|Princes Bridge|9159764|
+--------------+-------+



# Which location has shown most decline due to lockdowns in last 3 years


In [40]:
spark.sql("select * from pd_cnt_mnthly_cnt_vw").show()

+-------+--------------------+----+--------+-----+------+----+---------+--------------------+-------------+
|     ID|           Date_Time|Year|   Month|Mdate|   Day|Time|Sensor_ID|         Sensor_Name|Hourly_Counts|
+-------+--------------------+----+--------+-----+------+----+---------+--------------------+-------------+
|2887628|November 01, 2019...|2019|November|    1|Friday|  17|       34|Flinders St-Spark La|          300|
|2887629|November 01, 2019...|2019|November|    1|Friday|  17|       39|        Alfred Place|          604|
|2887630|November 01, 2019...|2019|November|    1|Friday|  17|       37|     Lygon St (East)|          216|
|2887631|November 01, 2019...|2019|November|    1|Friday|  17|       40|Lonsdale St-Sprin...|          627|
|2887632|November 01, 2019...|2019|November|    1|Friday|  17|       36|     Queen St (West)|          774|
|2887633|November 01, 2019...|2019|November|    1|Friday|  17|       29|St Kilda Rd-Alexa...|          644|
|2887634|November 01, 2019..

In [41]:
# Filtering the records on the latest year data-treating this as current year and store the result in current_year_count
df_current_year_count = spark.sql("""
    select Sensor_ID, Sensor_Name, sum(Hourly_Counts) AS current_count, Year as current_year from pd_cnt_mnthly_cnt_vw where Year in (select max(Year)
    FROM pd_cnt_mnthly_cnt_vw )  group by Sensor_Name, Year, Sensor_ID  
""")

In [42]:
df_current_year_count.show()

+---------+--------------------+-------------+------------+
|Sensor_ID|         Sensor_Name|current_count|current_year|
+---------+--------------------+-------------+------------+
|       48| QVM-Queen St (East)|    1174183.0|        2022|
|        5|      Princes Bridge|    6202149.0|        2022|
|       23|Spencer St-Collin...|    1558124.0|        2022|
|        7|      Birrarung Marr|     116120.0|        2022|
|       17|Collins Place (So...|    1709444.0|        2022|
|       47|Melbourne Central...|    5380759.0|        2022|
|       50|Faraday St-Lygon ...|    1527301.0|        2022|
|       57|    Bourke St Bridge|    2149226.0|        2022|
|       20|Chinatown-Lt Bour...|    1962723.0|        2022|
|       68|Flinders Ln - Deg...|    1176823.0|        2022|
|       19|Chinatown-Swansto...|    2640696.0|        2022|
|       85| Macaulay Rd (North)|     571210.0|        2022|
|       49|QVM-Therry St (So...|     935747.0|        2022|
|        8|         Webb Bridge|    1112

In [43]:
#Subtract 3 years from the current year - and store the result in three_years_ago_count
df_three_years_ago_count = spark.sql("""
     select Sensor_ID,Sensor_Name, sum(Hourly_Counts) AS old_count, Year as old_year from pd_cnt_mnthly_cnt_vw where Year in (select int(max(Year)-3)
    FROM pd_cnt_mnthly_cnt_vw )  group by Sensor_Name, Year, Sensor_ID  
""")

In [44]:
df_three_years_ago_count.show()

+---------+--------------------+-----------+--------+
|Sensor_ID|         Sensor_Name|  old_count|old_year|
+---------+--------------------+-----------+--------+
|        4|    Town Hall (West)|1.2684222E7|    2019|
|       31|     Lygon St (West)|  2113797.0|    2019|
|       49|QVM-Therry St (So...|  1463333.0|    2019|
|       29|St Kilda Rd-Alexa...|  3737463.0|    2019|
|        5|      Princes Bridge|1.0817578E7|    2019|
|       26|QV Market-Elizabe...|  5273517.0|    2019|
|       46|       Pelham St (S)|   338969.0|    2019|
|       36|     Queen St (West)|  2748477.0|    2019|
|       17|Collins Place (So...|  4034128.0|    2019|
|       37|     Lygon St (East)|  1262054.0|    2019|
|       19|Chinatown-Swansto...|  5186631.0|    2019|
|       45|Little Collins St...|  5116101.0|    2019|
|       57|    Bourke St Bridge|  7236113.0|    2019|
|       47|Melbourne Central...|  8429434.0|    2019|
|       22|Flinders St-Eliza...| 1.393414E7|    2019|
|       10|      Victoria Po

In [45]:
# Joined the current year and three years ago count 
df_joined_counts = df_current_year_count.join(df_three_years_ago_count, "Sensor_ID","inner")

In [46]:
#Print the dataframe
df_joined_counts = df_joined_counts.select(df_current_year_count.Sensor_ID,df_current_year_count.Sensor_Name,'current_year','current_count','old_count','old_year')
df_joined_counts.show(truncate = False)

+---------+-------------------------------------+------------+-------------+-----------+--------+
|Sensor_ID|Sensor_Name                          |current_year|current_count|old_count  |old_year|
+---------+-------------------------------------+------------+-------------+-----------+--------+
|51       |QVM-Franklin St (North)              |2022        |786825.0     |1160501.0  |2019    |
|7        |Birrarung Marr                       |2022        |116120.0     |2136024.0  |2019    |
|54       |Lincoln-Swanston(West)               |2022        |946127.0     |806012.0   |2019    |
|54       |Lincoln-Swanston(West)               |2022        |946127.0     |597230.0   |2019    |
|11       |Waterfront City                      |2022        |950956.0     |597321.0   |2019    |
|29       |St Kilda Rd-Alexandra Gardens        |2022        |1944588.0    |3737463.0  |2019    |
|42       |Grattan St-Swanston St (West)        |2022        |1145526.0    |2284048.0  |2019    |
|3        |Melbourne

In [47]:
# Create 2 new columns - Count_diff- which shows the difference between 2 counts and Percentage_change - which calculates the change in Percentage
df_joined_counts = df_joined_counts.withColumn("count_diff", df_joined_counts["current_count"] - df_joined_counts["old_count"])
df_joined_counts = df_joined_counts.withColumn("percent_change", (df_joined_counts["count_diff"]/df_joined_counts["old_count"]) * 100)

In [48]:
df_joined_counts.show(truncate = False)

+---------+-------------------------------------+------------+-------------+-----------+--------+----------+-------------------+
|Sensor_ID|Sensor_Name                          |current_year|current_count|old_count  |old_year|count_diff|percent_change     |
+---------+-------------------------------------+------------+-------------+-----------+--------+----------+-------------------+
|51       |QVM-Franklin St (North)              |2022        |786825.0     |1160501.0  |2019    |-373676.0 |-32.19954140496217 |
|7        |Birrarung Marr                       |2022        |116120.0     |2136024.0  |2019    |-2019904.0|-94.56373149365362 |
|54       |Lincoln-Swanston(West)               |2022        |946127.0     |806012.0   |2019    |140115.0  |17.383736222289496 |
|54       |Lincoln-Swanston(West)               |2022        |946127.0     |597230.0   |2019    |348897.0  |58.419201982485816 |
|11       |Waterfront City                      |2022        |950956.0     |597321.0   |2019    |

In [49]:
# Find the max declined Percentage
min_percent_change = df_joined_counts.agg({"percent_change": "min"}).collect()[0][0]
min_percent_change

-94.56373149365362

In [50]:
#Filter the record with the max declined percentage
df_record_with_max_change = df_joined_counts.filter(df_joined_counts.percent_change == min_percent_change)
df_record_with_max_change.show()

+---------+--------------+------------+-------------+---------+--------+----------+------------------+
|Sensor_ID|   Sensor_Name|current_year|current_count|old_count|old_year|count_diff|    percent_change|
+---------+--------------+------------+-------------+---------+--------+----------+------------------+
|        7|Birrarung Marr|        2022|     116120.0|2136024.0|    2019|-2019904.0|-94.56373149365362|
+---------+--------------+------------+-------------+---------+--------+----------+------------------+



In [51]:
# Joining location dataframe to find the location
df_pd_loctn_decline = df_record_with_max_change.join(df_pd_cnt_loctn, [lower(trim(df_pd_cnt_loctn['location_id']))==lower(trim(df_record_with_max_change['Sensor_ID']))] ,'left')
df_pd_loctn_decline = df_pd_loctn_decline.select('Sensor_ID',df_record_with_max_change.Sensor_Name,'Location','current_year','current_count','old_year','old_count','count_diff','percent_change')
df_pd_loctn_decline.show(truncate = False)

+---------+--------------+--------------------------+------------+-------------+--------+---------+----------+------------------+
|Sensor_ID|Sensor_Name   |Location                  |current_year|current_count|old_year|old_count|count_diff|percent_change    |
+---------+--------------+--------------------------+------------+-------------+--------+---------+----------+------------------+
|7        |Birrarung Marr|-37.81862929, 144.97169395|2022        |116120.0     |2019    |2136024.0|-2019904.0|-94.56373149365362|
+---------+--------------+--------------------------+------------+-------------+--------+---------+----------+------------------+



# Verifying the result

In [52]:
#Check the count for year 2022 and 2019
spark.sql("select Year,int(sum(Hourly_Counts)) from pd_cnt_mnthly_cnt_vw where Sensor_Name = 'Birrarung Marr' and Year in ('2022','2019') group by Year order by Year desc ").show()

+----+-----------------------------------------------+
|Year|CAST(sum(CAST(Hourly_Counts AS DOUBLE)) AS INT)|
+----+-----------------------------------------------+
|2022|                                         116120|
|2019|                                        2136024|
+----+-----------------------------------------------+



# Which location has most growth in last year

In [53]:
df_one_year_ago_count = spark.sql("""
     select Sensor_ID,Sensor_Name, sum(Hourly_Counts) AS old_count, Year as old_year 
     from pd_cnt_mnthly_cnt_vw where Year in (select int(max(Year)-1)
    FROM pd_cnt_mnthly_cnt_vw )  group by Sensor_Name, Year, Sensor_ID  
""")

In [54]:
df_one_year_ago_count.show()

+---------+--------------------+---------+--------+
|Sensor_ID|         Sensor_Name|old_count|old_year|
+---------+--------------------+---------+--------+
|       52|Elizabeth St-Lons...|1470418.0|    2021|
|       11|     Waterfront City| 374339.0|    2021|
|       64|Royal Pde - Gratt...|  51127.0|    2021|
|        6|Flinders Street S...|4614701.0|    2021|
|       54|Lincoln-Swanston(...| 501972.0|    2021|
|       58|Bourke St - Spenc...|2316080.0|    2021|
|       67|Flinders Ln - Deg...| 219014.0|    2021|
|       57|    Bourke St Bridge|1758182.0|    2021|
|       76|Macaulay Rd-Bella...| 387118.0|    2021|
|       35|           Southbank|5173296.0|    2021|
|       10|      Victoria Point| 309276.0|    2021|
|       75|Spring St - Flind...|  35927.0|    2021|
|        2|Bourke Street Mal...|3019402.0|    2021|
|       75|Spring St- Flinde...| 245445.0|    2021|
|       72|    Flinders St-ACMI|1308966.0|    2021|
|        1|Bourke Street Mal...|5080187.0|    2021|
|       51|Q

In [55]:
#Create a new data frame by joining with current year dataframe, 
# Later on create new columns Count difference and Percentage Change
df_joined_counts_1 = df_current_year_count.join(df_one_year_ago_count, "Sensor_ID")
df_joined_counts_1 = df_joined_counts_1.select(df_current_year_count.Sensor_ID,df_current_year_count.Sensor_Name,'current_year','current_count','old_count','old_year')
df_joined_counts_1 = df_joined_counts_1.withColumn("count_diff", df_joined_counts_1["current_count"] - df_joined_counts_1["old_count"])
df_joined_counts_1 = df_joined_counts_1.withColumn("percent_change", (df_joined_counts_1["count_diff"]/df_joined_counts_1["old_count"]) * 100)

In [56]:
df_joined_counts_1.show(truncate = False)

+---------+-------------------------------------+------------+-------------+---------+--------+----------+-------------------+
|Sensor_ID|Sensor_Name                          |current_year|current_count|old_count|old_year|count_diff|percent_change     |
+---------+-------------------------------------+------------+-------------+---------+--------+----------+-------------------+
|51       |QVM-Franklin St (North)              |2022        |786825.0     |697752.0 |2021    |89073.0   |12.765710452997627 |
|7        |Birrarung Marr                       |2022        |116120.0     |1272681.0|2021    |-1156561.0|-90.8759539900415  |
|54       |Lincoln-Swanston(West)               |2022        |946127.0     |501972.0 |2021    |444155.0  |88.48202688596176  |
|11       |Waterfront City                      |2022        |950956.0     |374339.0 |2021    |576617.0  |154.0360475397968  |
|29       |St Kilda Rd-Alexandra Gardens        |2022        |1944588.0    |1073213.0|2021    |871375.0  |81.19

In [57]:
# Find the max Percentage change in last year
max_percent_change = df_joined_counts_1.agg({"percent_change": "max"}).collect()[0][0]
max_percent_change

2233.360727061781

In [58]:
#Filter the record with the max percentage Change
record_with_max_change = df_joined_counts_1.filter(df_joined_counts_1.percent_change == max_percent_change)
record_with_max_change.show()

+---------+-----------------+------------+-------------+---------+--------+----------+-----------------+
|Sensor_ID|      Sensor_Name|current_year|current_count|old_count|old_year|count_diff|   percent_change|
+---------+-----------------+------------+-------------+---------+--------+----------+-----------------+
|       46|Pelham St (South)|        2022|     482679.0|  20686.0|    2021|  461993.0|2233.360727061781|
+---------+-----------------+------------+-------------+---------+--------+----------+-----------------+



In [59]:
# Joining location dataframe to find the location
df_pd_loctn_mx_chng = record_with_max_change.join(df_pd_cnt_loctn, [lower(trim(df_pd_cnt_loctn['location_id']))==lower(trim(record_with_max_change['Sensor_ID']))] ,'left')
df_pd_loctn_mx_chng = df_pd_loctn_mx_chng.select('Sensor_ID',record_with_max_change.Sensor_Name,'Location','current_year','current_count','old_year','old_count','count_diff','percent_change')
df_pd_loctn_mx_chng.show(truncate = False)

+---------+-----------------+-------------------------+------------+-------------+--------+---------+----------+-----------------+
|Sensor_ID|Sensor_Name      |Location                 |current_year|current_count|old_year|old_count|count_diff|percent_change   |
+---------+-----------------+-------------------------+------------+-------------+--------+---------+----------+-----------------+
|46       |Pelham St (South)|-37.80240719, 144.9615673|2022        |482679.0     |2021    |20686.0  |461993.0  |2233.360727061781|
+---------+-----------------+-------------------------+------------+-------------+--------+---------+----------+-----------------+



# Verifying the Result

In [60]:
#Check the count for year 2022 and 2021
spark.sql("select Year,int(sum(Hourly_Counts)) from pd_cnt_mnthly_cnt_vw where Sensor_Name = 'Pelham St (South)' and Year in ('2022','2021') group by Year order by Year desc ").show()

+----+-----------------------------------------------+
|Year|CAST(sum(CAST(Hourly_Counts AS DOUBLE)) AS INT)|
+----+-----------------------------------------------+
|2022|                                         482679|
|2021|                                          20686|
+----+-----------------------------------------------+

