# Top 10 locations by day


####  Import packages and start the session


In [1]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 2
%extra_py_files s3://pedestrian-analysis-working-bucket/glue-job-scripts/util.py

import sys, io, zipfile, pandas as pd, util
from datetime import datetime

from pyspark.sql.functions import sum, col, rank, desc, lit
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

from pyspark.context import SparkContext
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
)

from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job


# Reuse the active SparkContext if one exists; otherwise create one.
sc = SparkContext.getOrCreate()
# Wrap it in a GlueContext, which later cells use to read Glue Data Catalog tables.
glueContext = GlueContext(sc)
# SparkSession exposed by the GlueContext.
spark = glueContext.spark_session
# Glue Job handle bound to this context; not referenced again in the visible cells.
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.0 
Current idle_timeout is 2800 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 2
Extra py files to be included:
s3://pedestrian-analysis-working-bucket/glue-job-scripts/util.py
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::632753217422:role/pedestrians-analysis-gluejob-role
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 2
Session ID: f9c32cea-fa21-4803-92b3-dc049ee08b88
Job Type: glueetl
Applying the following 

####  Create output glue table if it doesn't already exist
##### The results of this notebook will be loaded into this table


In [2]:
# Destination of the report: S3 bucket, Glue database and output table.
BUCKET_NAME = "pedestrian-analysis-working-bucket"
DATABASE_NAME = "pedestrian_analysis_report"
OUTPUT_TABLE_NAME = "report_top_10_locations_by_day"

# Output columns as (name, Spark type) pairs; every column is declared nullable.
schema_fields = [
    ("date", StringType()),
    ("rank", IntegerType()),
    ("sensor_id", IntegerType()),
    ("location_name", StringType()),
    ("daily_count", IntegerType()),
]
schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in schema_fields]
)

# S3 prefix backing the table; the upload cell at the end writes to the same path.
s3_path = "s3://{}/report/{}/".format(BUCKET_NAME, OUTPUT_TABLE_NAME)
# Register the table through the project helper (its output below reports
# creating the table when it is not found in the Glue Data Catalog).
util.create_glue_catalog_table(DATABASE_NAME, OUTPUT_TABLE_NAME, schema, s3_path)

Table pedestrian_analysis_report.report_top_10_locations_by_day not found in the Glue Data Catalog. Creating table...
Table pedestrian_analysis_report.report_top_10_locations_by_day created in the Glue Data Catalog


####  Load sensor_counts


In [3]:
# Read the raw sensor counts from the Glue Data Catalog as a DynamicFrame ...
sensor_count_dyf = glueContext.create_dynamic_frame.from_catalog(
    table_name="sensor_counts",
    database="pedestrian_analysis_raw",
)
# ... and convert it to a Spark DataFrame for the transformations that follow.
sensor_count_df = sensor_count_dyf.toDF()

# Preview the first 10 rows without truncating long values.
sensor_count_df.show(10, truncate=False)

+-------+-------------------+---------+--------------------------------------+------------+
|id     |date_time          |sensor_id|sensor_name                           |hourly_count|
+-------+-------------------+---------+--------------------------------------+------------+
|2902119|2019-11-12T22:00:00|11       |Waterfront City                       |27          |
|763567 |2014-01-24T10:00:00|25       |Melbourne Convention Exhibition Centre|436         |
|1028480|2015-01-06T16:00:00|23       |Spencer St-Collins St (South)         |760         |
|331173 |2011-07-08T12:00:00|10       |Victoria Point                        |569         |
|430567 |2012-03-03T19:00:00|1        |Bourke Street Mall (North)            |847         |
|631323 |2013-06-14T14:00:00|4        |Town Hall (West)                      |3219        |
|573323 |2013-01-30T13:00:00|16       |Australia on Collins                  |2931        |
|408282 |2012-01-12T04:00:00|18       |Collins Place (North)                 |1 

####  Load sensor_reference_data


In [4]:
# Read the sensor reference table from the Glue Data Catalog as a DynamicFrame ...
sensor_reference_dyf = glueContext.create_dynamic_frame.from_catalog(
    table_name="sensor_reference_data",
    database="pedestrian_analysis_raw",
)
# ... and convert it to a Spark DataFrame so it can be joined later.
sensor_reference_df = sensor_reference_dyf.toDF()

# Preview the first 10 rows (long values are truncated by default).
sensor_reference_df.show(10)

+-----------+-----------+-----------------+------------+--------------------+-----------+-------------+------------+----+--------------------+-----------+------+
|direction_1|direction_2|installation_date|    latitude|            location|location_id|location_type|   longitude|note|  sensor_description|sensor_name|status|
+-----------+-----------+-----------------+------------+--------------------+-----------+-------------+------------+----+--------------------+-----------+------+
|         In|        Out|       2021-11-05|-37.80680012|{lon -> 144.96001...|         82|       Indoor|144.96001735|null|510 Elizabeth Str...|   Eli510_T|     A|
|         In|        Out|       2022-07-28|-37.81330972|{lon -> 144.96687...|         88|       Indoor|144.96687704|null|          Micro-Labs|   MicLab_T|     A|
|       East|       West|       2019-09-25|-37.80996494|{lon -> 144.96216...|         62|      Outdoor|144.96216521|null| La Trobe St (North)|   Lat224_T|     A|
|         In|        Out|   

####  'date_time' is currently a full ISO 8601-formatted string
####  This cell converts 'date_time' to a timestamp and then derives a separate 'date' column from it


In [5]:
# Parse the ISO-formatted 'date_time' string into a timestamp (replacing the
# string column), then derive a 'date' column from the parsed timestamp.
date_time_as_timestamp = col("date_time").cast("timestamp")
sensor_count_df = (
    sensor_count_df
    .withColumn("date_time", date_time_as_timestamp)
    .withColumn("date", col("date_time").cast("date"))
)

# Preview the first 10 rows without truncating long values.
sensor_count_df.show(10, truncate=False)

+-------+-------------------+---------+--------------------------------------+------------+----------+
|id     |date_time          |sensor_id|sensor_name                           |hourly_count|date      |
+-------+-------------------+---------+--------------------------------------+------------+----------+
|2902119|2019-11-12 22:00:00|11       |Waterfront City                       |27          |2019-11-12|
|763567 |2014-01-24 10:00:00|25       |Melbourne Convention Exhibition Centre|436         |2014-01-24|
|1028480|2015-01-06 16:00:00|23       |Spencer St-Collins St (South)         |760         |2015-01-06|
|331173 |2011-07-08 12:00:00|10       |Victoria Point                        |569         |2011-07-08|
|430567 |2012-03-03 19:00:00|1        |Bourke Street Mall (North)            |847         |2012-03-03|
|631323 |2013-06-14 14:00:00|4        |Town Hall (West)                      |3219        |2013-06-14|
|573323 |2013-01-30 13:00:00|16       |Australia on Collins              

#### Group by 'date' and 'sensor_id' and sum 'hourly_count' per group to get 'daily_count'

In [6]:
# `sum` here is pyspark.sql.functions.sum (imported in the first cell), not the
# Python builtin: it totals 'hourly_count' within each group.
daily_total = sum("hourly_count").alias("daily_count")

# One output row per (date, sensor_id) pair.
grouped_sensor_count_df = sensor_count_df.groupBy("date", "sensor_id").agg(daily_total)

# Preview the first 10 rows without truncating long values.
grouped_sensor_count_df.show(10, truncate=False)

+----------+---------+-----------+
|date      |sensor_id|daily_count|
+----------+---------+-----------+
|2014-01-24|25       |8768       |
|2011-07-08|10       |7037       |
|2013-01-30|16       |18984      |
|2014-05-16|17       |14861      |
|2015-03-06|12       |3669       |
|2014-03-12|25       |10503      |
|2013-01-28|4        |27588      |
|2014-07-06|18       |2251       |
|2009-09-18|12       |4797       |
|2009-12-24|16       |16926      |
+----------+---------+-----------+
only showing top 10 rows


#### Add a new column 'rank' that numbers the rows within each 'date' partition in descending order of daily_count

In [7]:
# Rank sensors within each date from highest to lowest daily_count.
# row_number() assigns 1..n with no gaps or repeats, so "rank <= 10" yields at
# most ten rows per date. Ordering by daily_count alone leaves the order of
# tied sensors undefined, which can change the ranks (and which sensor makes
# the cut at rank 10) from one run to the next; sensor_id is therefore used as
# a secondary sort key to make the ranking deterministic.
window_spec = Window.partitionBy("date") \
    .orderBy(desc("daily_count"), col("sensor_id"))
ranked_sensor_count_df = grouped_sensor_count_df \
    .withColumn("rank", row_number().over(window_spec))

# Preview the first 10 rows without truncating long values.
ranked_sensor_count_df.show(10, truncate=False)

+----------+---------+-----------+----+
|date      |sensor_id|daily_count|rank|
+----------+---------+-----------+----+
|2009-05-04|4        |34983      |1   |
|2009-05-04|1        |26481      |2   |
|2009-05-04|6        |25887      |3   |
|2009-05-04|13       |25829      |4   |
|2009-05-04|5        |19720      |5   |
|2009-05-04|2        |19452      |6   |
|2009-05-04|15       |18130      |7   |
|2009-05-04|16       |18044      |8   |
|2009-05-04|17       |13657      |9   |
|2009-05-04|18       |11656      |10  |
+----------+---------+-----------+----+
only showing top 10 rows


#### Filter the rows where rank <= 10 to get the top 10 sensor_ids for each day

In [8]:
# Keep ranks 1-10 for every date, then sort newest date first and by
# ascending rank within each date.
is_top_10 = col("rank") <= 10
top_10_sensors_by_day_df = (
    ranked_sensor_count_df
    .where(is_top_10)
    .orderBy(desc("date"), col("rank"))
)

# Preview the first 10 rows.
top_10_sensors_by_day_df.show(10)

+----------+---------+-----------+----+
|      date|sensor_id|daily_count|rank|
+----------+---------+-----------+----+
|2022-10-31|       41|      41206|   1|
|2022-10-31|        4|      35015|   2|
|2022-10-31|        5|      28340|   3|
|2022-10-31|        1|      25176|   4|
|2022-10-31|       84|      24036|   5|
|2022-10-31|        3|      22860|   6|
|2022-10-31|       66|      21295|   7|
|2022-10-31|       35|      21090|   8|
|2022-10-31|       47|      19100|   9|
|2022-10-31|       28|      17537|  10|
+----------+---------+-----------+----+
only showing top 10 rows


#### Left join the reference data in to obtain the correct sensor_description

In [9]:
# Match each ranked row's sensor_id to the reference table's location_id.
sensor_matches_location = col("sensor_id") == col("location_id")

# Left join: every ranked row is kept; reference columns are null when no
# reference row matches the sensor.
top_10_sensors_by_day_df = top_10_sensors_by_day_df.join(
    sensor_reference_df,
    on=sensor_matches_location,
    how="left",
)




#### Select and format final report

In [10]:
# Final report columns, cast to the types declared in the output table schema;
# the reference table's sensor_description is exposed as location_name.
final_report_columns = [
    col("date").cast(StringType()),
    col("rank").cast(IntegerType()),
    col("sensor_id").cast(IntegerType()),
    col("sensor_description").alias("location_name"),
    col("daily_count").cast(IntegerType()).alias("daily_count"),
]
top_10_sensors_by_day_df = top_10_sensors_by_day_df.select(*final_report_columns)

# Preview the first 100 rows without truncating long values.
top_10_sensors_by_day_df.show(100, truncate=False)

+----------+----+---------+---------------------------------+-----------+
|date      |rank|sensor_id|location_name                    |daily_count|
+----------+----+---------+---------------------------------+-----------+
|2009-05-04|1   |4        |Town Hall (West)                 |34983      |
|2009-05-04|2   |1        |Bourke Street Mall (North)       |26481      |
|2009-05-04|3   |6        |Flinders Street Station Underpass|25887      |
|2009-05-04|4   |13       |null                             |25829      |
|2009-05-04|5   |5        |Princes Bridge                   |19720      |
|2009-05-04|6   |2        |Bourke Street Mall (South)       |19452      |
|2009-05-04|7   |15       |null                             |18130      |
|2009-05-04|8   |16       |null                             |18044      |
|2009-05-04|9   |17       |Collins Place (South)            |13657      |
|2009-05-04|10  |18       |Collins Place (North)            |11656      |
|2009-05-11|1   |4        |Town Hall (

#### Upload to S3

In [11]:
# Hand the final report DataFrame to the project's util helper, targeting
# s3_path (the S3 location defined alongside the output table above).
# NOTE(review): the write format and overwrite/append behaviour live in util.py,
# which is not visible here — confirm before re-running against existing data.
util.upload_to_s3(glueContext, top_10_sensors_by_day_df, s3_path)

Successfully Uploaded to s3 in path: s3://pedestrian-analysis-working-bucket/report/report_top_10_locations_by_day/
