In [1]:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
from pyspark.sql.window import Window
import time

In [2]:
# Build (or reuse) a local Spark session from an explicit SparkConf.
from pyspark.conf import SparkConf

spark_settings = [
    ('spark.rdd.compress', 'True'),
    ('spark.master', 'local[*]'),
]
config = SparkConf().setAll(spark_settings)

spark = (
    SparkSession.builder
    .appName('App')
    .config(conf=config)
    # Optional, currently disabled: .config("spark.sql.broadcastTimeout", "36000")
    .getOrCreate()
)

# Show the effective Spark configuration for reference.
print(spark.sparkContext.getConf().getAll())

[('spark.driver.extraJavaOptions', '-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED'), ('spark.driver.host', 'DESKTOP-ILS2AFI'), ('spark.driver.port', '60111'), ('spark.app.name', 'App'), ('spark.app.startTime', '1665028996165'), ('spark.sql.warehouse.dir', 'file:/D:/DEcamp/DeemoSpark/spark-warehouse'), ('spark.executor.

# Load data

In [3]:
# Sensor-locations CSV: every column (latitude/longitude included) is read as a
# nullable string, in exactly this order.
sensor_columns = [
    'sensor_id', 'sensor_description', 'sensor_name', 'installation_date',
    'status', 'note', 'direction_1', 'direction_2',
    'latitude', 'longitude', 'location',
]
sensor_schema = StructType([StructField(name, StringType(), True) for name in sensor_columns])

sensor_df = spark.read.csv(
    'Pedestrian_Counting_System_-_Sensor_Locations.csv',
    header=True,
    schema=sensor_schema,
)

sensor_df.show(3)

+---------+--------------------+-----------+-----------------+------+--------------------+-----------+-----------+------------+------------+--------------------+
|sensor_id|  sensor_description|sensor_name|installation_date|status|                note|direction_1|direction_2|    latitude|   longitude|            location|
+---------+--------------------+-----------+-----------------+------+--------------------+-----------+-----------+------------+------------+--------------------+
|       16|Australia on Collins|   Col270_T|       2009/03/30|     R|Device moved to l...|       null|       null|-37.81573422|144.96521044|(-37.81573422, 14...|
|       50|Faraday St-Lygon ...|   Lyg309_T|       2017/11/30|     A|                null|      South|      North|-37.79808191|144.96721014|(-37.79808191, 14...|
|       73|Bourke St - Spenc...|   Bou655_T|       2020/10/02|     I|                null|       East|       West|-37.81695684|144.95415373|(-37.81695684, 14...|
+---------+-----------------

In [4]:
# Hourly-counts CSV schema as (column name, Spark type) pairs; all nullable.
count_fields = [
    ('ID', StringType()),
    ('Date_Time', StringType()),
    ('Year', IntegerType()),
    ('Month', StringType()),
    ('Mdate', IntegerType()),
    ('Day', StringType()),
    ('Time', IntegerType()),
    ('Sensor_ID', StringType()),
    ('Sensor_Name', StringType()),
    ('Hourly_Counts', IntegerType()),
]
count_schema = StructType([StructField(name, dtype, True) for name, dtype in count_fields])

raw_count_df = spark.read.csv(
    'Pedestrian_Counting_System_-_Monthly__counts_per_hour_.csv',
    header=True,
    schema=count_schema,
)
count_df = (
    raw_count_df
    # Parse text matching 'MMMM dd, yyyy hh:mm:ss a' (12-hour clock + AM/PM) into a timestamp.
    .withColumn('Date_Time', to_timestamp('Date_Time', 'MMMM dd, yyyy hh:mm:ss a'))
    # Full month name -> two-digit month string (e.g. "11"); stays a string column.
    .withColumn("Month", from_unixtime(unix_timestamp(col("Month"), 'MMMM'), 'MM'))
)
count_df.show(3)

+-------+-------------------+----+-----+-----+------+----+---------+--------------------+-------------+
|     ID|          Date_Time|Year|Month|Mdate|   Day|Time|Sensor_ID|         Sensor_Name|Hourly_Counts|
+-------+-------------------+----+-----+-----+------+----+---------+--------------------+-------------+
|2887628|2019-11-01 17:00:00|2019|   11|    1|Friday|  17|       34|Flinders St-Spark La|          300|
|2887629|2019-11-01 17:00:00|2019|   11|    1|Friday|  17|       39|        Alfred Place|          604|
|2887630|2019-11-01 17:00:00|2019|   11|    1|Friday|  17|       37|     Lygon St (East)|          216|
+-------+-------------------+----+-----+-----+------+----+---------+--------------------+-------------+
only showing top 3 rows



# Top 10 locations by daily pedestrian count

In [5]:
# Wall-clock start (seconds since the epoch) for the "top 10 by day" section.
start_time: float = time.time()

In [6]:
# Total pedestrians per sensor per calendar day. The summed column keeps the
# name 'Hourly_Counts' so downstream cells and outputs are unchanged, but here
# it holds a daily total.
sumPedestrians = (
    count_df
    .groupBy('Year', 'Month', 'Mdate', 'Sensor_ID')
    .agg(sum('Hourly_Counts').alias('Hourly_Counts'))
)
# No global .sort() here: the window below partitions the data by day anyway,
# and the final orderBy fixes the display order, so a pre-sort is wasted work.

# Rank sensors within each day by daily total. Sensor_ID is a tie-breaker so
# row_number() is deterministic when sensors tie around 10th place.
dayWindow = (
    Window.partitionBy('Year', 'Month', 'Mdate')
    .orderBy(col('Hourly_Counts').desc(), col('Sensor_ID').asc())
)
top10PerDay = (
    sumPedestrians
    .withColumn('rank', row_number().over(dayWindow))
    .filter(col('rank') <= 10)
    # Ordering by rank matches "Hourly_Counts desc" but is stable under ties.
    .orderBy(col('Year').asc(), col('Month').asc(), col('Mdate').asc(), col('rank').asc())
)
top10PerDay.show(3)

+----+-----+-----+---------+-------------+----+
|Year|Month|Mdate|Sensor_ID|Hourly_Counts|rank|
+----+-----+-----+---------+-------------+----+
|2009|   05|    1|        4|        45185|   1|
|2009|   05|    1|        1|        36869|   2|
|2009|   05|    1|        6|        29015|   3|
+----+-----+-----+---------+-------------+----+
only showing top 3 rows



In [7]:
# Wall-clock seconds since start_time; this also counts any idle time between running the cells.
print(f"--- {time.time() - start_time} seconds ---")

--- 22.203291177749634 seconds ---


In [8]:
# Attach sensor metadata to each daily top-10 row. The left join keeps rows
# whose sensor is missing from sensor_df; sensor_df's copy of the key is dropped.
daily_top10 = top10PerDay.drop('rank')
fullTop10PerDay = (
    daily_top10
    .join(sensor_df, top10PerDay.Sensor_ID == sensor_df.sensor_id, 'left')
    .drop(sensor_df['sensor_id'])
)
# Optional export (disabled):
# fullTop10PerDay.write.mode("overwrite").csv("SparkTop10PerDay")
fullTop10PerDay.show(3)

+----+-----+-----+---------+-------------+--------------------+-----------+-----------------+------+-------------------+-----------+-----------+------------+------------+--------------------+
|Year|Month|Mdate|Sensor_ID|Hourly_Counts|  sensor_description|sensor_name|installation_date|status|               note|direction_1|direction_2|    latitude|   longitude|            location|
+----+-----+-----+---------+-------------+--------------------+-----------+-----------------+------+-------------------+-----------+-----------+------------+------------+--------------------+
|2009|   05|    1|        4|        45185|    Town Hall (West)|   Swa123_T|       2009/03/23|     A|               null|      South|      North|-37.81487988| 144.9660878|(-37.81487988, 14...|
|2009|   05|    1|        1|        36869|Bourke Street Mal...|   Bou292_T|       2009/03/24|     A|               null|       East|       West| -37.8134944|144.96515324|(-37.8134944, 144...|
|2009|   05|    1|        6|        2901

In [9]:
# Cumulative wall-clock seconds since this section's start_time: covers the
# top-10 and join cells plus any idle time between running them.
print(f"--- {time.time() - start_time} seconds ---")

--- 37.887107372283936 seconds ---


# Top 10 locations by monthly pedestrian count

In [10]:
# Wall-clock start (seconds since the epoch) for the "top 10 by month" section.
start_time: float = time.time()

In [11]:
# Total pedestrians per sensor per calendar month. The summed column keeps the
# name 'Hourly_Counts' for consistency with the outputs below, but here it holds
# a monthly total.
sumPedestriansMonth = (
    count_df
    .groupBy('Year', 'Month', 'Sensor_ID')
    .agg(sum('Hourly_Counts').alias('Hourly_Counts'))
)
# No global .sort() here: the window partitions by month and the final orderBy
# fixes the display order, so a pre-sort is wasted work.

# Rank sensors within each month. Sensor_ID breaks ties so row_number() is
# deterministic across runs.
monthWindow = (
    Window.partitionBy('Year', 'Month')
    .orderBy(col('Hourly_Counts').desc(), col('Sensor_ID').asc())
)
top10PerMonth = (
    sumPedestriansMonth
    .withColumn('rank', row_number().over(monthWindow))
    .filter(col('rank') <= 10)
    # Ordering by rank matches "Hourly_Counts desc" but is stable under ties.
    .orderBy(col('Year').asc(), col('Month').asc(), col('rank').asc())
)
top10PerMonth.show(3)

+----+-----+---------+-------------+----+
|Year|Month|Sensor_ID|Hourly_Counts|rank|
+----+-----+---------+-------------+----+
|2009|   05|        4|      1095125|   1|
|2009|   05|        1|       842470|   2|
|2009|   05|        6|       729966|   3|
+----+-----+---------+-------------+----+
only showing top 3 rows



In [13]:
# Attach sensor metadata to each monthly top-10 row. The left join keeps rows
# whose sensor is missing from sensor_df; sensor_df's copy of the key is dropped.
fullTop10PerMonth = (
    top10PerMonth
    .drop('rank')
    .join(sensor_df, top10PerMonth.Sensor_ID == sensor_df.sensor_id, 'left')
    .drop(sensor_df['sensor_id'])
)
# Optional export (disabled) of the monthly result to its own folder:
# fullTop10PerMonth.write.mode("overwrite").csv("SparkTop10PerMonth")
fullTop10PerMonth.show(3)

+----+-----+---------+-------------+--------------------+-----------+-----------------+------+-------------------+-----------+-----------+------------+------------+--------------------+
|Year|Month|Sensor_ID|Hourly_Counts|  sensor_description|sensor_name|installation_date|status|               note|direction_1|direction_2|    latitude|   longitude|            location|
+----+-----+---------+-------------+--------------------+-----------+-----------------+------+-------------------+-----------+-----------+------------+------------+--------------------+
|2009|   05|        4|      1095125|    Town Hall (West)|   Swa123_T|       2009/03/23|     A|               null|      South|      North|-37.81487988| 144.9660878|(-37.81487988, 14...|
|2009|   05|        1|       842470|Bourke Street Mal...|   Bou292_T|       2009/03/24|     A|               null|       East|       West| -37.8134944|144.96515324|(-37.8134944, 144...|
|2009|   05|        6|       729966|Flinders Street S...|     FliS_T| 

In [14]:
# Cumulative wall-clock seconds since this section's start_time (includes idle time between cells).
print(f"--- {time.time() - start_time} seconds ---")

--- 20.541664123535156 seconds ---


# Which location has shown the largest decline due to lockdowns over the last 2 years?

In [15]:
# Wall-clock start (seconds since the epoch) for the "most decline" section.
start_time: float = time.time()

In [16]:
# Total pedestrian count per sensor per year, restricted to 2020-2022.
LAST_YEARS = (2020, 2021, 2022)
countLast3year_df = (
    count_df
    .select('Year', 'Sensor_ID', 'Hourly_Counts')
    .filter(col('Year').isin(*LAST_YEARS))
    .groupBy('Year', 'Sensor_ID')
    .agg(sum('Hourly_Counts').alias('Hourly_Counts'))
)

In [17]:
# One (Sensor_ID, Y<year>) frame per year, taken from the yearly totals above.
def _totals_for_year(year):
    """Return Sensor_ID and that year's total, with the total renamed to 'Y<year>'."""
    return (
        countLast3year_df
        .filter(countLast3year_df.Year == year)
        .select('Sensor_ID', 'Hourly_Counts')
        .withColumnRenamed('Hourly_Counts', 'Y%d' % year)
    )


year2020 = _totals_for_year(2020)
year2021 = _totals_for_year(2021)
year2022 = _totals_for_year(2022)

In [18]:
# Per-sensor change between two years, computed as earlier total minus later
# total: positive = decline, negative = growth. The inner join keeps only sensors
# present in both years.
def _change_between(earlier, later, earlier_col, later_col):
    """Return (Sensor_ID, Hourly_Counts = earlier_col - later_col) for sensors in both frames.

    Joining on the column name yields a single Sensor_ID column, which avoids
    resolving `a.Sensor_ID == b.Sensor_ID` across two frames that share the
    same parent (countLast3year_df).
    """
    return (
        earlier.join(later, on='Sensor_ID', how='inner')
        .select('Sensor_ID', (col(earlier_col) - col(later_col)).alias('Hourly_Counts'))
    )


year2020to2022 = _change_between(year2020, year2022, 'Y2020', 'Y2022')
year2020to2021 = _change_between(year2020, year2021, 'Y2020', 'Y2021')
year2021to2022 = _change_between(year2021, year2022, 'Y2021', 'Y2022')

In [19]:
# Sensors counted in 2020 and 2021 but not in 2022: rows of year2020to2021 with
# no matching Sensor_ID in year2020to2022 (which only has sensors seen in both 2020 and 2022).
justIn2020_2021 = year2020to2021.join(
    year2020to2022,
    on=year2020to2021.Sensor_ID == year2020to2022.Sensor_ID,
    how="leftanti",
)
justIn2020_2021.show()

+---------+-------------+
|Sensor_ID|Hourly_Counts|
+---------+-------------+
|       22|      1507594|
|       34|       148705|
|       15|      -712874|
+---------+-------------+



In [20]:
# Sensors counted in 2021 and 2022 but not in 2020: rows of year2021to2022 with
# no matching Sensor_ID in year2020to2022 (which only has sensors seen in both 2020 and 2022).
justIn2021_2022 = year2021to2022.join(
    year2020to2022,
    on=year2021to2022.Sensor_ID == year2020to2022.Sensor_ID,
    how="leftanti",
)
justIn2021_2022.show()

+---------+-------------+
|Sensor_ID|Hourly_Counts|
+---------+-------------+
|       77|       -28753|
|       75|         2197|
|       79|     -1673189|
|       76|       102432|
|       72|      -136961|
|       78|       -67258|
+---------+-------------+



In [21]:
# One change value per sensor, built from three pieces:
#   - 2020->2022 for sensors seen in both of those years,
#   - 2020->2021 for sensors missing in 2022,
#   - 2021->2022 for sensors missing in 2020.
# A positive Hourly_Counts means fewer pedestrians in the later year.
# NOTE(review): if 2022 is only partially covered (the session start time in the
# config dump suggests an Oct 2022 run), yearly totals overstate declines; consider
# normalizing by days of data.
ChangeFrom2020To2022 = (
    year2020to2022
    .union(justIn2020_2021)
    .union(justIn2021_2022)
)
ChangeFrom2020To2022.createOrReplaceTempView("DF")

# Sensor(s) with the largest decline; all ties are kept.
decline_query = "select* from DF where Hourly_Counts == (select max(Hourly_Counts) from DF)"
mostDeclide_df = spark.sql(decline_query)

In [23]:
# Attach sensor metadata to the most-declined sensor(s). The result goes to a
# new name rather than overwriting mostDeclide_df, so re-running this cell does
# not join the sensor columns a second time.
mostDeclineInfo_df = (
    mostDeclide_df
    .join(sensor_df, mostDeclide_df.Sensor_ID == sensor_df.sensor_id, 'left')
    .drop(sensor_df['sensor_id'])
)
mostDeclineInfo_df.show(3)

+---------+-------------+--------------------+-----------+-----------------+------+----+-----------+-----------+------------+------------+--------------------+
|Sensor_ID|Hourly_Counts|  sensor_description|sensor_name|installation_date|status|note|direction_1|direction_2|    latitude|   longitude|            location|
+---------+-------------+--------------------+-----------+-----------------+------+----+-----------+-----------+------------+------------+--------------------+
|       55|      3815915|Elizabeth St-La T...|   Eli380_T|       2018/07/19|     I|null|      South|      North|-37.80988941|144.96134331|(-37.80988941, 14...|
+---------+-------------+--------------------+-----------+-----------------+------+----+-----------+-----------+------------+------------+--------------------+



In [24]:
# Cumulative wall-clock seconds since this section's start_time (includes idle time between cells).
print(f"--- {time.time() - start_time} seconds ---")

--- 42.59197425842285 seconds ---


# Which location has shown the most growth over the last year?

In [25]:
# Wall-clock start (seconds since the epoch) for the "most growth" section.
start_time: float = time.time()

In [26]:
# year2021to2022 holds (2021 total - 2022 total), so the most negative value is
# the largest increase from 2021 to 2022. All ties are kept.
year2021to2022.createOrReplaceTempView("Y2021to2022")
growth_query = (
    "select * from Y2021to2022 "
    "    where Hourly_Counts == (select min(Hourly_Counts) from Y2021to2022)"
)
growth_rows = spark.sql(growth_query)

# Attach sensor metadata; sensor_df's copy of the key is dropped.
mostGrowth_df = (
    growth_rows
    .join(sensor_df, growth_rows.Sensor_ID == sensor_df.sensor_id, 'left')
    .drop(sensor_df['sensor_id'])
)
mostGrowth_df.show(3)

+---------+-------------+------------------+-----------+-----------------+------+----+-----------+-----------+------------+------------+--------------------+
|Sensor_ID|Hourly_Counts|sensor_description|sensor_name|installation_date|status|note|direction_1|direction_2|    latitude|   longitude|            location|
+---------+-------------+------------------+-----------+-----------------+------+----+-----------+-----------+------------+------------+--------------------+
|        3|     -2087604| Melbourne Central|   Swa295_T|       2009/03/25|     A|null|      South|      North|-37.81101523|144.96429485|(-37.81101523, 14...|
+---------+-------------+------------------+-----------+-----------------+------+----+-----------+-----------+------------+------------+--------------------+



In [27]:
# Wall-clock seconds since this section's start_time (includes idle time between cells).
print(f"--- {time.time() - start_time} seconds ---")

--- 5.9329869747161865 seconds ---
