# Contact
- Candidate Name: **Tony Nguyen**
- Company: **ARQ**

#High level approach & architecture

Before I can present a solution and architecture design, I have to understand the business use cases, time and budget constraints, and the business requirement circumstances.  Because this data volume is relatively small, if business users just require ad-hoc report, I can either use report authoring software i.e Power BI or Tableau to build reports directly from  csv file or ingest the csv file into the database i.e Spark or Databricks, then query the database from Power BI or Tableau.  For a strageic solution design, data should be ingested into a Enterprise Data Platform/Warehousing whose architecture consists of 3 different zones: Raw (AKA Bronze), Conform (AKA Silver), Presentation (AKA Gold or Enrich) zones. Our reports should query from Presentation zone only.

For the scope of this exercise, I presume  that the business use case requires adhoc reports, limited time and budget constraints. Therefore, I ingested the csv file into a Databricks table and provided the SQL queries for reports to meet the requirements of this excercise.  In addition, I also provide Python scripts (PySpark) to prove my compentency in Pyspark. 

#What tests have you run and what QA issues did you encounter?

**Unit Tests, SIT, Functional tests**

If I have to implement data warehouse for the dataset, these tests are suggested but not limited to:

- Data accuracy i.e. correct calcuations, correct business rules, NULL values
- Data completeness i.e. verify counts between source and target tables based on the logic rules.
- Data integrity i.e. SCD2, NULL surrogate keys, orphan foerign keys etc
- Integration Tests: everything works together from end-to-end


***Source Data Quality***

1. Verify source data quality i.e. no Senor_id  have more than 1 Sensor_Name 
2. Verify the Date_Time column agrees with year, month, mday columns
3. Verify missing data points i.e. sensor_id and date have no count



# Verify data source table structure

I have imported the csv file from  Source Data Pedestrian Counting System – 2009 to Present (counts per hour)  https://data.melbourne.vic.gov.au/explore/dataset/pedestrian-counting-system-sensor-locations/information/ into a Databricks table named **pedestrian_counting_hourly**. This table will be considered data source table which will be transformed as per requirements. Below this the table structure

In [0]:
%sql
select
  *
from   pedestrian_counting_hourly
limit  10;

ID,Date_Time,Year,Month,Mdate,Day,Time,Sensor_ID,Sensor_Name,Hourly_Counts
2887628,"November 01, 2019 05:00:00 PM",2019,November,1,Friday,17,34,Flinders St-Spark La,300
2887629,"November 01, 2019 05:00:00 PM",2019,November,1,Friday,17,39,Alfred Place,604
2887630,"November 01, 2019 05:00:00 PM",2019,November,1,Friday,17,37,Lygon St (East),216
2887631,"November 01, 2019 05:00:00 PM",2019,November,1,Friday,17,40,Lonsdale St-Spring St (West),627
2887632,"November 01, 2019 05:00:00 PM",2019,November,1,Friday,17,36,Queen St (West),774
2887633,"November 01, 2019 05:00:00 PM",2019,November,1,Friday,17,29,St Kilda Rd-Alexandra Gardens,644
2887634,"November 01, 2019 05:00:00 PM",2019,November,1,Friday,17,42,Grattan St-Swanston St (West),453
2887635,"November 01, 2019 05:00:00 PM",2019,November,1,Friday,17,43,Monash Rd-Swanston St (West),387
2887636,"November 01, 2019 05:00:00 PM",2019,November,1,Friday,17,44,Tin Alley-Swanston St (West),27
2887637,"November 01, 2019 05:00:00 PM",2019,November,1,Friday,17,35,Southbank,2691


#	Top 10 (most pedestrians) locations by day
The query below shows top 10 locations by each day.  The output is sorted by count_date and location’s daily_count.  I decided to introduce a new column i.e. count_date whose data type is date and derives from the existing date_time column, instead of combination of year, month and mday columns. This is because a single column i.e.  count_date is easy for report developers to render charts in the reports/ presentation layer.

Below are explaination for the query I have wrote:

1. Calculate the daily count for each location
2. Rank the count for each day using Windows function
3. Show all records whose ranking <= 10
4. Poissibly, add ORDER BY count_date but not required

In [0]:
%sql
with pedestrian_count_daily  (
 select
          to_date(date_time, "MMMM dd, yyyy hh:mm:ss a") as count_date,
          sensor_id,
          sensor_name,
          sum(hourly_counts) as daily_count
        from
          pedestrian_counting_hourly
        group by
          to_date(date_time, "MMMM dd, yyyy hh:mm:ss a"),
          sensor_id,
          sensor_name
), pedestrian_top_count_daily (
   select
      *,
      row_number() over (
        partition by count_date
        order by
          daily_count desc
      ) as ranking
    from pedestrian_count_daily  
)      
select
  count_date,
  sensor_id,
  sensor_name,
  daily_count
from pedestrian_top_count_daily
where ranking <= 10
order by count_date desc;

count_date,sensor_id,sensor_name,daily_count
2022-10-31,41,Flinders La-Swanston St (West),41206
2022-10-31,4,Town Hall (West),35015
2022-10-31,5,Princes Bridge,28340
2022-10-31,1,Bourke Street Mall (North),25176
2022-10-31,84,Elizabeth St - Flinders St (East) - New footpath,24036
2022-10-31,3,Melbourne Central,22860
2022-10-31,66,State Library - New,21295
2022-10-31,35,Southbank,21090
2022-10-31,47,Melbourne Central-Elizabeth St (East),19100
2022-10-31,28,The Arts Centre,17537


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

df =  spark.read.table("pedestrian_counting_hourly")
df_pedestrian_count_daily =( df.withColumn("count_date", to_date("date_time", "MMMM dd, yyyy hh:mm:ss a")  )
      .groupBy("count_date","sensor_id","sensor_name")
      .sum("hourly_counts")
      .withColumnRenamed("sum(hourly_counts)","daily_count")      
)
windowSpec  = Window.partitionBy("count_date").orderBy(desc("daily_count"))
df_pedestrian_top_count_daily = df_pedestrian_count_daily.withColumn("ranking",row_number().over(windowSpec))
df_final= (df_pedestrian_top_count_daily
        .where("ranking <=10")
        .orderBy(desc("count_date"),desc("daily_count"))
        .select("count_date",  "sensor_id",  "sensor_name",  "daily_count")
)
df_final.show()

+----------+---------+--------------------+-----------+
|count_date|sensor_id|         sensor_name|daily_count|
+----------+---------+--------------------+-----------+
|2022-10-31|       41|Flinders La-Swans...|      41206|
|2022-10-31|        4|    Town Hall (West)|      35015|
|2022-10-31|        5|      Princes Bridge|      28340|
|2022-10-31|        1|Bourke Street Mal...|      25176|
|2022-10-31|       84|Elizabeth St - Fl...|      24036|
|2022-10-31|        3|   Melbourne Central|      22860|
|2022-10-31|       66| State Library - New|      21295|
|2022-10-31|       35|           Southbank|      21090|
|2022-10-31|       47|Melbourne Central...|      19100|
|2022-10-31|       28|     The Arts Centre|      17537|
|2022-10-30|       41|Flinders La-Swans...|      34557|
|2022-10-30|       35|           Southbank|      33084|
|2022-10-30|        4|    Town Hall (West)|      30841|
|2022-10-30|        1|Bourke Street Mal...|      24260|
|2022-10-30|        3|   Melbourne Central|     

#### Unit tests for "Top 10 (most pedestrians) locations by day"
We count pedestrian for a particular date i.e. 2022-10-31 and compare with the result above.  Daily count of the test query match the main query above.

In [0]:
%sql
select
  to_date(date_time, "MMMM dd, yyyy hh:mm:ss a") as count_date,
  sensor_id,
  sensor_name,
  sum(hourly_counts) as daily_count
from
  pedestrian_counting_hourly
where
  to_date(date_time, "MMMM dd, yyyy hh:mm:ss a") = '2022-10-31'
group by
  to_date(date_time, "MMMM dd, yyyy hh:mm:ss a"),
  sensor_id,
  sensor_name
order by
  daily_count desc
limit
  10;

count_date,sensor_id,sensor_name,daily_count
2022-10-31,41,Flinders La-Swanston St (West),41206
2022-10-31,4,Town Hall (West),35015
2022-10-31,5,Princes Bridge,28340
2022-10-31,1,Bourke Street Mall (North),25176
2022-10-31,84,Elizabeth St - Flinders St (East) - New footpath,24036
2022-10-31,3,Melbourne Central,22860
2022-10-31,66,State Library - New,21295
2022-10-31,35,Southbank,21090
2022-10-31,47,Melbourne Central-Elizabeth St (East),19100
2022-10-31,28,The Arts Centre,17537


# Top 10 (most pedestrians) locations by month

Below are explaination for the query I have wrote:

1. Calculate the monthly count for each location
2. Rank the count for each month using Windows function 
3. Show all records whose ranking <= 10
4. Poissibly, add ORDER BY month but not required

In [0]:
%sql
with pedestrian_count_monthly  (
 select
          date_format(to_date(date_time, "MMMM dd, yyyy hh:mm:ss a"),"yyyy-MM")  as month,
          sensor_id,
          sensor_name,
          sum(hourly_counts) as monthly_count
        from
          pedestrian_counting_hourly
        group by
          date_format(to_date(date_time, "MMMM dd, yyyy hh:mm:ss a"),"yyyy-MM") ,
          sensor_id,
          sensor_name
), pedestrian_top_count_monthly (
   select
      *,
      row_number() over (
        partition by month
        order by
          monthly_count desc
      ) as ranking
    from pedestrian_count_monthly
)      
select
  month,
  sensor_id,
  sensor_name,
  monthly_count
from pedestrian_top_count_monthly
where ranking <= 10
order by  month desc;



month,sensor_id,sensor_name,monthly_count
2022-10,41,Flinders La-Swanston St (West),1179043
2022-10,4,Town Hall (West),969871
2022-10,35,Southbank,953917
2022-10,84,Elizabeth St - Flinders St (East) - New footpath,764000
2022-10,3,Melbourne Central,712606
2022-10,25,Melbourne Convention Exhibition Centre,700074
2022-10,6,Flinders Street Station Underpass,693675
2022-10,1,Bourke Street Mall (North),686410
2022-10,66,State Library - New,684893
2022-10,47,Melbourne Central-Elizabeth St (East),620583


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

df =  spark.read.table("pedestrian_counting_hourly")
df_pedestrian_count_monthly =( df.withColumn("month", date_format(to_date("Date_Time", "MMMM dd, yyyy hh:mm:ss a"),"yyyy-MM"))
      .groupBy("month","sensor_id","sensor_name")
      .sum("hourly_counts")
      .withColumnRenamed("sum(hourly_counts)","monthly_count")      
)
windowSpec  = Window.partitionBy("month").orderBy(desc("monthly_count"))
df_pedestrian_top_count_monthly = df_pedestrian_count_monthly.withColumn("ranking",row_number().over(windowSpec))
df_final= (df_pedestrian_top_count_monthly
        .where("ranking <=10")
        .orderBy(desc("month"),desc("monthly_count"))
        .select("month",  "sensor_id",  "sensor_name",  "monthly_count")
)
df_final.show()

+-------+---------+--------------------+-------------+
|  month|sensor_id|         sensor_name|monthly_count|
+-------+---------+--------------------+-------------+
|2022-10|       41|Flinders La-Swans...|      1179043|
|2022-10|        4|    Town Hall (West)|       969871|
|2022-10|       35|           Southbank|       953917|
|2022-10|       84|Elizabeth St - Fl...|       764000|
|2022-10|        3|   Melbourne Central|       712606|
|2022-10|       25|Melbourne Convent...|       700074|
|2022-10|        6|Flinders Street S...|       693675|
|2022-10|        1|Bourke Street Mal...|       686410|
|2022-10|       66| State Library - New|       684893|
|2022-10|       47|Melbourne Central...|       620583|
|2022-09|       41|Flinders La-Swans...|      1139026|
|2022-09|       35|           Southbank|       959285|
|2022-09|        3|   Melbourne Central|       769524|
|2022-09|       84|Elizabeth St - Fl...|       730788|
|2022-09|       66| State Library - New|       697878|
|2022-09| 

#### Unit tests for "Top 10 (most pedestrians) locations by month"
We count pedestrian for a particular date i.e. 2022-10 and compare with the result above. Monthly count of the test query match the main query above.

In [0]:
%sql
select
  date_format(to_date(date_time, "MMMM dd, yyyy hh:mm:ss a"),"yyyy-MM") as month,
  sensor_id,
  sensor_name,
  sum(hourly_counts) as monthly_count
from
  pedestrian_counting_hourly
where
 date_format(to_date(date_time, "MMMM dd, yyyy hh:mm:ss a"),"yyyy-MM")  = '2022-10'
group by
 date_format(to_date(date_time, "MMMM dd, yyyy hh:mm:ss a"),"yyyy-MM") ,
  sensor_id,
  sensor_name
order by
  monthly_count desc
limit 10;


month,sensor_id,sensor_name,monthly_count
2022-10,41,Flinders La-Swanston St (West),1179043
2022-10,4,Town Hall (West),969871
2022-10,35,Southbank,953917
2022-10,84,Elizabeth St - Flinders St (East) - New footpath,764000
2022-10,3,Melbourne Central,712606
2022-10,25,Melbourne Convention Exhibition Centre,700074
2022-10,6,Flinders Street Station Underpass,693675
2022-10,1,Bourke Street Mall (North),686410
2022-10,66,State Library - New,684893
2022-10,47,Melbourne Central-Elizabeth St (East),620583


#	Which location has shown most decline due to lockdowns in last 3 years?

I implemented the following steps to show  most decline due to lockdowns in last 3 years:

1. Calculate yearly count of pedestrians for each location
2. Use  **LAG(year_count,3)** get the last 3 year count for each location
3. Calculate the difference between current year and last 3 year i.e.   year_count - previous_3_year_count
4. Add a filter i.e.  **year_count - previous_3_year_count  < 0**  to show only location and year with decline. If no location with declined count, this query just returns no row. I also add  **previous_3_year_count>0** to check the sensor's data availability in the last 3 years. **It does not make sense to compare the a sensor was just installed at location recently or less than 3 years**.
5. Add ORDER BY year DESC to show the last year's count in the result on the top and ORDDER BY  **year_count - previous_3_year_count** to show the most declined count  on the top of the result.
6. Add **LIMIT 1** to show only location of the most  decline count  in last 3 years


**Result:**  The Sensor 4 - Town Hall (West) has data availability in last 3 years and has most delinced count in 2022

In [0]:
%sql
with pedestrian_count_yearly as (
SELECT   
        sensor_id,
        sensor_name,
        Date_format(To_date(date_time, "MMMM dd, yyyy hh:mm:ss a"), "yyyy" ) AS year,
        Sum(hourly_counts) AS year_count
FROM     pedestrian_counting_hourly
GROUP BY 
        sensor_id,
        sensor_name, 
        Date_format(To_date(date_time, "MMMM dd, yyyy hh:mm:ss a"), "yyyy" )
),
pedestrian_count_last_3_years as (
SELECT   *,                       
        LAG(year_count,3) OVER (PARTITION BY sensor_id ORDER BY year ) AS previous_3_year_count -- Last 3 year count for each sensor_id
        
FROM     pedestrian_count_yearly )
select *,
       year_count - previous_3_year_count  as 3_year_decline_count
FROM   pedestrian_count_last_3_years                                  
WHERE  year_count - previous_3_year_count  < 0 -- must be have delince, if no location with declined count, query just returns no row.
       AND previous_3_year_count>0
ORDER BY  year desc,-- last year on the top
      (year_count - previous_3_year_count)   -- most declient count
limit 1  -- just return a location with the most delince count 


sensor_id,sensor_name,year,year_count,previous_3_year_count,3_year_decline_count
4,Town Hall (West),2022,3938330,12684222,-8745892


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.column import *
from pyspark.sql.window import Window

df =  spark.read.table("pedestrian_counting_hourly")
df_pedestrian_count_yearly =(df.withColumn("year",  date_format(to_date("date_time", "MMMM dd, yyyy hh:mm:ss a"), "yyyy" ) )
                            .groupBy("year","sensor_id","sensor_name")
                            .sum("hourly_counts")
                            .withColumnRenamed("sum(hourly_counts)","year_count")      
                            
)

windowSpec  = Window.partitionBy("sensor_id").orderBy("year")
df_pedestrian_count_last_3_years = df_pedestrian_count_yearly.withColumn("previous_3_year_count",lag("year_count",3).over(windowSpec))
df_final= (df_pedestrian_count_last_3_years.withColumn("3_year_decline_count",  col("year_count") - col("previous_3_year_count")  )
        .where("3_year_decline_count <0 and previous_3_year_count > 0")
        .orderBy(desc("year"),"3_year_decline_count")        
       
        )
df_final.show(1)


+----+---------+----------------+----------+---------------------+--------------------+
|year|sensor_id|     sensor_name|year_count|previous_3_year_count|3_year_decline_count|
+----+---------+----------------+----------+---------------------+--------------------+
|2022|        4|Town Hall (West)|   3938330|             12684222|            -8745892|
+----+---------+----------------+----------+---------------------+--------------------+
only showing top 1 row



### Unit Tests

Verify if the LAG() returns the correct data

In [0]:
%sql
with pedestrian_count_yearly as (
SELECT   
        sensor_id,
        sensor_name,
        Date_format(To_date(date_time, "MMMM dd, yyyy hh:mm:ss a"), "yyyy" ) AS year,
        Sum(hourly_counts) AS year_count
FROM     pedestrian_counting_hourly
GROUP BY 
        sensor_id,
        sensor_name, 
        Date_format(To_date(date_time, "MMMM dd, yyyy hh:mm:ss a"), "yyyy" )
),
pedestrian_count_last_3_years as (
SELECT   *,                       
        LAG(year_count,3) OVER (PARTITION BY sensor_id ORDER BY year ) AS previous_3_year_count -- Last 3 year count for each sensor_id
        
FROM     pedestrian_count_yearly )
select *,
       year_count - previous_3_year_count 
FROM   pedestrian_count_last_3_years                                  
WHERE  sensor_id=4
ORDER BY year DESC

sensor_id,sensor_name,year,year_count,previous_3_year_count,(year_count - previous_3_year_count)
4,Town Hall (West),2022,3938330,12684222.0,-8745892.0
4,Town Hall (West),2021,6155040,12901919.0,-6746879.0
4,Town Hall (West),2020,5260684,7470453.0,-2209769.0
4,Town Hall (West),2019,12684222,9918382.0,2765840.0
4,Town Hall (West),2018,12901919,13627947.0,-726028.0
4,Town Hall (West),2017,7470453,13180843.0,-5710390.0
4,Town Hall (West),2016,9918382,12983375.0,-3064993.0
4,Town Hall (West),2015,13627947,13811639.0,-183692.0
4,Town Hall (West),2014,13180843,13033407.0,147436.0
4,Town Hall (West),2013,12983375,12731642.0,251733.0


## Which location has most growth in last year?
I implemented the following steps to show most growth in last year:

1. Calculate yearly count of pedestrians for each location
2. Use **LAG(year_count,1)** get the last year count for each location
3. Calculate the difference between current year and last year i.e. year_count - previous_year_count
4. Add a filter i.e. year_count - previous_3_year_count > 0 to show only location and year with count growth. If no location with count growth, this query just returns no row. I also add previous_year_count>0 to check the sensor's data availability for the last year. It does not make sense to compare the a sensor was just installed at location recently or less than 1 year.
5. Add **ORDER BY year DESC** to show the last year's count in the result on the top and **ORDER BY year_count - previous_year_count DESC** to show the most declined count on the top of the result.
6. Add **LIMIT 1** to show only location of the most count growth  for last year.

Result: The Sensor 4 - Town Hall (West) has data availability in last 3 years and has most delinced count in 2022

In [0]:
%sql
with pedestrian_count_yearly as (
SELECT   
        sensor_id,
        sensor_name,
        Date_format(To_date(date_time, "MMMM dd, yyyy hh:mm:ss a"), "yyyy" ) AS year,
        Sum(hourly_counts) AS year_count
FROM     pedestrian_counting_hourly
GROUP BY 
        sensor_id,
        sensor_name, 
        Date_format(To_date(date_time, "MMMM dd, yyyy hh:mm:ss a"), "yyyy" )
),
pedestrian_count_last_year as (
SELECT   *,                       
        LAG(year_count,1) OVER (PARTITION BY sensor_id ORDER BY year ) AS previous_year_count -- Last 3 year count for each sensor_id
        
FROM     pedestrian_count_yearly )
select *,
       year_count - previous_year_count  as increase_count
FROM   pedestrian_count_last_year                                  
WHERE  year_count - previous_year_count  > 0 -- must be have increase, if no location with increased count, query just returns no row.
       AND previous_year_count > 0
ORDER BY  year desc,-- last year on the top
      (year_count - previous_year_count) DESC  -- most declient count
limit 1  -- just return a location with the most delince count 

sensor_id,sensor_name,year,year_count,previous_year_count,increase_count
3,Melbourne Central,2022,6897406,3327672,3569734


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.column import *
from pyspark.sql.window import Window

df =  spark.read.table("pedestrian_counting_hourly")
df_pedestrian_count_yearly =(df.withColumn("year",  date_format(to_date("date_time", "MMMM dd, yyyy hh:mm:ss a"), "yyyy" ) )
                            .groupBy("year","sensor_id","sensor_name")
                            .sum("hourly_counts")
                            .withColumnRenamed("sum(hourly_counts)","year_count")      
                            
)

windowSpec  = Window.partitionBy("sensor_id").orderBy("year")
df_pedestrian_count_last_year = df_pedestrian_count_yearly.withColumn("previous_year_count",lag("year_count",1).over(windowSpec))
df_final= (df_pedestrian_count_last_year.withColumn("increase_count",  col("year_count") - col("previous_year_count")  )
        .where("increase_count >0 and previous_year_count > 0")
        .orderBy(desc("year"),desc("increase_count"))        
       
        )
df_final.show(1)

+----+---------+-----------------+----------+-------------------+--------------+
|year|sensor_id|      sensor_name|year_count|previous_year_count|increase_count|
+----+---------+-----------------+----------+-------------------+--------------+
|2022|        3|Melbourne Central|   6897406|            3327672|       3569734|
+----+---------+-----------------+----------+-------------------+--------------+
only showing top 1 row



### Unit Tests

Verify if the LAG() returns the correct data

In [0]:
%sql
with pedestrian_count_yearly as (
SELECT   
        sensor_id,
        sensor_name,
        Date_format(To_date(date_time, "MMMM dd, yyyy hh:mm:ss a"), "yyyy" ) AS year,
        Sum(hourly_counts) AS year_count
FROM     pedestrian_counting_hourly
GROUP BY 
        sensor_id,
        sensor_name, 
        Date_format(To_date(date_time, "MMMM dd, yyyy hh:mm:ss a"), "yyyy" )
),
pedestrian_count_last_year as (
SELECT   *,                       
        LAG(year_count,1) OVER (PARTITION BY sensor_id ORDER BY year ) AS previous_year_count -- Last 3 year count for each sensor_id
        
FROM     pedestrian_count_yearly )
select *,
       year_count - previous_year_count as increased_count
FROM   pedestrian_count_last_year                                  
WHERE  sensor_id=3
ORDER BY year DESC

sensor_id,sensor_name,year,year_count,previous_year_count,increased_count
3,Melbourne Central,2022,6897406,3327672.0,3569734.0
3,Melbourne Central,2021,3327672,4142439.0,-814767.0
3,Melbourne Central,2020,4142439,9776584.0,-5634145.0
3,Melbourne Central,2019,9776584,10147696.0,-371112.0
3,Melbourne Central,2018,10147696,10623720.0,-476024.0
3,Melbourne Central,2017,10623720,8661012.0,1962708.0
3,Melbourne Central,2016,8661012,11244232.0,-2583220.0
3,Melbourne Central,2015,11244232,11499357.0,-255125.0
3,Melbourne Central,2014,11499357,10999364.0,499993.0
3,Melbourne Central,2013,10999364,10583007.0,416357.0
