# Overview of this Notebook and the approach

## Toolings used
* Python v3.11
  * Pandas
  * psycopg2
  * sqlalchemy
* Jupyter Notebook
* PostgreSQL v16


## The schema of the simple DWH (Data Warehouse) in PostgreSQL
* `ods`: It stands for Operational Data Store, the very first landing area of the flat CSV files in the DWH
* `int`: An intermediate layer for storing tables during the transformation process
* `dim`: A delicate schema for Dimension tables. See [Dimensional modeling](https://en.wikipedia.org/wiki/Dimensional_modeling)
* `fct`: A delicate schema for Fact tables.
* `sum`: A delicate schema for Summary tables which contain pre-computed & pre-aggregated business metrics, consuming from tables in the `dim` & `fct` schemas only.

### Data Ingestion
It was done with Python ( Pandas, psycopg2, and sqlalchemy) and ingesting the 2 given CSV files (i.e. `purchases` & `delivery_radius_log`) into the `ods` schema in the PostgreSQL DWH.

### Data Transformation
The goal of the Data Transformation is usually creating tables (Data Marts) in the `dim`, `fct`, and `sum` schemas.

For simple Data Marts (e.g. the Dimension Table of `delivery_areas`), it would directly consume from `ods`. If the transformation is complex, `int` is used to store the intermediate transformed tables.

In addition, if there is a lot more data than the given task here, potentially an extra layer between the `ods`  and the `int` schema can be added to centralize all the cleaning logic. It can be named as [the "Staging layer"](https://medium.com/data-panda/dbt-models-staging-layer-55f0f2ddc5e4) like dbt Labs, or just the "Data Layer"/"Merge Layer" from a more old school practice.


# Setup Code

In [1]:
# Importing all packages used
import pandas as pd
import psycopg2 
from sqlalchemy import create_engine 

## Setting up a PostgreSQL DB  in localhost

In [2]:
# Generating a connection STRING for using a test service account `dbuser`
from sqlalchemy import URL

url_object = URL.create(
    "postgresql+psycopg2",
    username="dbuser",
    password="1",  # plain (unescaped) text
    host="localhost",
    database="postgres",
)

url_object

postgresql+psycopg2://dbuser:***@localhost/postgres

In [3]:
# Establish the connection to the PostgreSQL instance, and create a test table there to confirm the connection works as expected
db = create_engine(url_object) 
conn = db.connect() 
conn1 = psycopg2.connect( 
  database="postgres", 
  user='dbuser',  
  password='1',  
  host='localhost',  
  port= '5432'
) 
  
conn1.autocommit = True
cursor = conn1.cursor() 

## Creating ODS tables in the local PostgreSQL server

### Importing delivery_radius_log

In [4]:
# Importing the CSV file delivery_radius_log as a dataframe first
df_delivery_radius_log = pd.read_csv('data/delivery_radius_log.csv')
df_delivery_radius_log.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1316 entries, 0 to 1315
Data columns (total 3 columns):
 #   Column                   Non-Null Count  Dtype 
---  ------                   --------------  ----- 
 0   DELIVERY_AREA_ID         1316 non-null   object
 1   DELIVERY_RADIUS_METERS   1316 non-null   int64 
 2   EVENT_STARTED_TIMESTAMP  1316 non-null   object
dtypes: int64(1), object(2)
memory usage: 31.0+ KB


In [5]:
# exmaining the DataFrame
df_delivery_radius_log.head()

Unnamed: 0,DELIVERY_AREA_ID,DELIVERY_RADIUS_METERS,EVENT_STARTED_TIMESTAMP
0,5db02e5d401d690c836b9ead,3000,2022-06-14T08:26:20.923854Z
1,5db02e5d401d690c836b9ead,7000,2022-06-14T08:49:01.186365Z
2,5db02e5d401d690c836b9ead,3000,2022-06-18T07:43:57.662294Z
3,5db02e5d401d690c836b9ead,7000,2022-06-18T08:00:45.227506Z
4,5d78a7e552dfabd5251dab7b,4000,2022-06-18T08:05:29.093983Z


In [6]:
# Ingesting the file into the PostgreSQL DB instance after checking it is okay
df_delivery_radius_log.to_sql(
    name = 'delivery_radius_log',
    schema = 'ods',
    con=db,
    if_exists = 'replace',
    index = False
)

316

### Importing purchases 

In [7]:
# Importing the CSV file delivery_radius_log as a dataframe first
df_purchases = pd.read_csv('data/purchases.csv')
df_purchases.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 177895 entries, 0 to 177894
Data columns (total 6 columns):
 #   Column                                 Non-Null Count   Dtype  
---  ------                                 --------------   -----  
 0   PURCHASE_ID                            177895 non-null  object 
 1   TIME_RECEIVED                          177895 non-null  object 
 2   TIME_DELIVERED                         177895 non-null  object 
 3   END_AMOUNT_WITH_VAT_EUR                177895 non-null  float64
 4   DROPOFF_DISTANCE_STRAIGHT_LINE_METRES  177895 non-null  int64  
 5   DELIVERY_AREA_ID                       177895 non-null  object 
dtypes: float64(1), int64(1), object(4)
memory usage: 8.1+ MB


In [8]:
# Exmining the DataFrame example data
df_purchases.head()

Unnamed: 0,PURCHASE_ID,TIME_RECEIVED,TIME_DELIVERED,END_AMOUNT_WITH_VAT_EUR,DROPOFF_DISTANCE_STRAIGHT_LINE_METRES,DELIVERY_AREA_ID
0,5f85beff7762a1539ad6faf1,2022-10-13T14:51:43.048Z,2022-10-13T15:18:35.265Z,17.87,735,5d78a7e552dfabd5251dab7b
1,5f85c08dddf0c9826389f3cd,2022-10-13T14:58:21.078Z,2022-10-13T15:28:09.194Z,17.75,436,5cc1b60b034adf90cd8f14dd
2,5f85bc2cf49ddea98955ce5f,2022-10-13T14:39:40.153Z,2022-10-13T15:05:15.058Z,25.8,867,5cc1b60b034adf90cd8f14dd
3,5f855dbf5a93deaf2be5b872,2022-10-13T07:56:47.003Z,2022-10-13T09:05:14.37Z,15.7,252,5db02e5d401d690c836b9ead
4,5f85be8a8876393ee141ed82,2022-10-13T14:49:46.693Z,2022-10-13T15:14:31.299Z,18.8,857,5db02e5d401d690c836b9ead


In [9]:
# Ingesting the DataFrame into the PostgreSQL DB instance
df_purchases.to_sql(
    name = 'purchases',
    schema = 'ods',
    con=db,
    if_exists = 'replace',
    index = False
)


895

# Task 1
In the first task you’ll work with the delivery radius log dataset. Given this delivery radius change log, we would like you to detect at any given time what is a temporary reduction (or increase) of the delivery radius and what is the "default" (more permanent) delivery radius. For this exercise, you can assume that the default radius at any given time is a radius that has lasted for at least 24 hours uninterrupted.

We would like you to produce a dataset(s) and answer the following:
* What are all the default delivery radiuses for the delivery areas during the timeframe
provided? Keep in mind that each area can have multiple default radiuses in the given
dataset.
* How many hours of radius reductions with respect to the the default radiuses have we
had during the timeframe provided for each delivery area?

Please give answers in numerical values to the above questions.

## Task 1 Interpretation

Given the `delivery_radius_log` file is basically Delivery Radius changing events (both of delivery radius expansion & reduction), and the defination of the Default Delivery Radius, this task needs to be solved by 2 steps:
1. Build a CTE that would only include records lasting longer than 24 hours (i.e. a table containing only changelog of Default Delivery Radius changes); Then,
2. Depends on the CTE from 1, calculate the temporary radius reductions since the latest change of the Default Delivery Radius (and until the next change of the Delivery Radius).



In [37]:
sql_fct_delivery_areas_default_radius_events = '''
WITH ods_delivery_radius_log AS (
	-- PostgreSQL syntax issue, need to use `"` to specify the exact column name
	SELECT "DELIVERY_AREA_ID" AS delivery_area_id
		, "EVENT_STARTED_TIMESTAMP"::TIMESTAMP AS event_started_timestamp
		, "DELIVERY_RADIUS_METERS" AS delivery_radius_meters
	FROM ods.delivery_radius_log 
), delivery_radius_log AS (
    SELECT *
        , LAG(delivery_radius_meters) OVER (PARTITION BY delivery_area_id ORDER BY event_started_timestamp) AS previous_delivery_radius_meters
        , LAG(event_started_timestamp) OVER (PARTITION BY delivery_area_id ORDER BY event_started_timestamp) AS previous_event_started_timestamp 
        , LEAD(event_started_timestamp) OVER (PARTITION BY delivery_area_id ORDER BY event_started_timestamp) AS next_event_started_timestamp 
    FROM ods_delivery_radius_log
), fct_delivery_radius_log AS (
    SELECT *
      , EXTRACT('epoch' FROM (next_event_started_timestamp - event_started_timestamp))/3600 AS delivery_radius_duration_hours
    FROM delivery_radius_log
), main_query AS (
	SELECT *
		, (delivery_radius_duration_hours >= 24) AS is_default_delivery_radius
	FROM fct_delivery_radius_log
)
SELECT delivery_area_id
	, event_started_timestamp AS current_default_radius_event_started_timestamp
	, COALESCE(LAG(event_started_timestamp) OVER (PARTITION BY delivery_area_id ORDER BY event_started_timestamp), '1900-01-01 00:00:00') AS previous_default_radius_event_started_timestamp 
    , COALESCE(LEAD(event_started_timestamp) OVER (PARTITION BY delivery_area_id ORDER BY event_started_timestamp), CURRENT_TIMESTAMP) AS next_default_radius_event_started_timestamp 
	, delivery_radius_meters AS default_delivery_radius_meters
    , delivery_radius_duration_hours
FROM main_query
WHERE is_default_delivery_radius
ORDER BY delivery_area_id, event_started_timestamp
'''

df_fct_delivery_areas_default_radius_events = pd.read_sql(sql_fct_delivery_areas_default_radius_events, con = db)

df_fct_delivery_areas_default_radius_events.to_sql(
    name = 'delivery_areas_default_radius_events',
    schema = 'fct',
    con=db,
    if_exists = 'replace',
    index = False
)

df_fct_delivery_areas_default_radius_events.head()

Unnamed: 0,delivery_area_id,current_default_radius_event_started_timestamp,previous_default_radius_event_started_timestamp,next_default_radius_event_started_timestamp,default_delivery_radius_meters,delivery_radius_duration_hours
0,5cc1b60b034adf90cd8f14dd,2021-12-01 12:30:09.405860,1900-01-01 00:00:00.000000,2021-12-02 13:27:00.815321+01:00,6500,24.769979
1,5cc1b60b034adf90cd8f14dd,2021-12-02 13:27:00.815321,2021-12-01 12:30:09.405860,2021-12-05 16:11:52.970808+01:00,6500,74.431627
2,5cc1b60b034adf90cd8f14dd,2021-12-05 16:11:52.970808,2021-12-02 13:27:00.815321,2021-12-08 14:07:17.814469+01:00,6500,69.759161
3,5cc1b60b034adf90cd8f14dd,2021-12-08 14:07:17.814469,2021-12-05 16:11:52.970808,2021-12-10 13:28:13.172008+01:00,6500,47.199525
4,5cc1b60b034adf90cd8f14dd,2021-12-10 13:28:13.172008,2021-12-08 14:07:17.814469,2021-12-11 14:34:38.885918+01:00,6500,24.872071


In [38]:
sql_fct_delivery_radius_events = '''
WITH delivery_radius_log AS (
    SELECT *
        , LAG(delivery_radius_meters) OVER (PARTITION BY delivery_area_id ORDER BY event_started_timestamp) AS previous_delivery_radius_meters
        , COALESCE(LAG(event_started_timestamp) OVER (PARTITION BY delivery_area_id ORDER BY event_started_timestamp), '1900-01-01 00:00:00') AS previous_event_started_timestamp 
        , COALESCE(LEAD(event_started_timestamp) OVER (PARTITION BY delivery_area_id ORDER BY event_started_timestamp), CURRENT_TIMESTAMP) AS next_event_started_timestamp 
    FROM (
        -- PostgreSQL syntax issue, need to use `"` to specify the exact column name
        SELECT "DELIVERY_AREA_ID" AS delivery_area_id
            , "EVENT_STARTED_TIMESTAMP"::TIMESTAMP AS event_started_timestamp
            , "DELIVERY_RADIUS_METERS" AS delivery_radius_meters
        FROM ods.delivery_radius_log 
    )
), fct_delivery_radius_log AS (
    SELECT *
      , EXTRACT('epoch' FROM (next_event_started_timestamp - event_started_timestamp))/3600 AS delivery_radius_duration_hours
    FROM delivery_radius_log
), main_query AS (
	SELECT events.*
		, CASE
			WHEN events.delivery_radius_duration_hours >= 24
				THEN events.delivery_radius_meters
			ELSE default_events.default_delivery_radius_meters
		  END AS default_delivery_radius_meters
	FROM fct_delivery_radius_log events
	LEFT JOIN fct.delivery_areas_default_radius_events default_events ON events.delivery_area_id = default_events.delivery_area_id
		AND events.event_started_timestamp > default_events.current_default_radius_event_started_timestamp 
		AND events.event_started_timestamp <= default_events.next_default_radius_event_started_timestamp
)
SELECT delivery_area_id
	, event_started_timestamp
	, delivery_radius_duration_hours
	, default_delivery_radius_meters
	, delivery_radius_meters
	, (default_delivery_radius_meters > delivery_radius_meters) AS is_reduction
FROM main_query
ORDER BY delivery_area_id, event_started_timestamp
'''

df_fct_delivery_radius_events = pd.read_sql(sql_fct_delivery_radius_events, con = db)

df_fct_delivery_radius_events.to_sql(
    name = 'delivery_radius_events',
    schema = 'fct',
    con=db,
    if_exists = 'replace',
    index = False
)

df_fct_delivery_radius_events.head()

Unnamed: 0,delivery_area_id,event_started_timestamp,delivery_radius_duration_hours,default_delivery_radius_meters,delivery_radius_meters,is_reduction
0,5cc1b60b034adf90cd8f14dd,2021-12-01 12:12:41.947087,0.290961,,3500,
1,5cc1b60b034adf90cd8f14dd,2021-12-01 12:30:09.405860,24.769979,6500.0,6500,False
2,5cc1b60b034adf90cd8f14dd,2021-12-02 13:16:21.329693,0.177635,6500.0,3500,True
3,5cc1b60b034adf90cd8f14dd,2021-12-02 13:27:00.815321,74.431627,6500.0,6500,False
4,5cc1b60b034adf90cd8f14dd,2021-12-05 15:52:54.673552,0.316194,6500.0,3500,True


* The output table `delivery_areas_default_radius_events` is a changelog of per Delivery Areas (`delivery_area_id`) with their default Delivery Radius (i.e. `default_delivery_radius_meters`) over time.
  * Given the dynamic nature of the Delivery Area radius, it is better to deliver a Fact table instead of a Dimension table (or Summary table).
* A simple query can be ran on top of `fct.delivery_radius_events` to calculate the hours of radius reductions with respect to the default radiuses, e.g.
```sql
SELECT delivery_area_id
	, event_started_timestamp
	, delivery_radius_duration_hours
FROM fct.delivery_radius_events
WHERE is_reduction
```

* Important Note: The current query is not perfect per se and could use some refactoring for when the dataset grows bigger in the future. If it is on proper DWH solutions (e.g. Google BigQuery, Snowflake etc.), using Windows Function with IGNORE NULLS would be a lot more scalable way to do so compared to JOINs.

In [45]:
sql_radius_reduction = '''
SELECT delivery_area_id
	, event_started_timestamp
	, delivery_radius_duration_hours
FROM fct.delivery_radius_events
WHERE is_reduction
'''

df_fct_delivery_radius_events = pd.read_sql(sql_fct_delivery_radius_events, con = db)

df_fct_delivery_radius_events.head()

Unnamed: 0,delivery_area_id,event_started_timestamp,delivery_radius_duration_hours,default_delivery_radius_meters,delivery_radius_meters,is_reduction,previous_event_started_timestamp,next_event_started_timestamp
0,5cc1b60b034adf90cd8f14dd,2021-12-01 12:12:41.947087,0.290961,,3500,,1900-01-01 00:00:00.000000,2021-12-01 12:30:09.405860+01:00
1,5cc1b60b034adf90cd8f14dd,2021-12-01 12:30:09.405860,24.769979,6500.0,6500,False,2021-12-01 12:12:41.947087,2021-12-02 13:16:21.329693+01:00
2,5cc1b60b034adf90cd8f14dd,2021-12-02 13:16:21.329693,0.177635,6500.0,3500,True,2021-12-01 12:30:09.405860,2021-12-02 13:27:00.815321+01:00
3,5cc1b60b034adf90cd8f14dd,2021-12-02 13:27:00.815321,74.431627,6500.0,6500,False,2021-12-02 13:16:21.329693,2021-12-05 15:52:54.673552+01:00
4,5cc1b60b034adf90cd8f14dd,2021-12-05 15:52:54.673552,0.316194,6500.0,3500,True,2021-12-02 13:27:00.815321,2021-12-05 16:11:52.970808+01:00


# Task 2

Now that we know the default delivery radiuses and times when the delivery radius was reduced, we would like you to create a derived dataset aggregated to hourly level that can be used to analyze delivery radius reductions and purchases in the areas for any hour in 2022. Build the dataset so that anyone could query the data without writing further joins or calculations and would be able to answer the following questions with a simple SELECT statement:
* How many purchases and how much revenue (End Amount With VAT Eur) do we produce during the hour?
* How long do the deviations (reductions) from default radius last during the hour? How many times have we modified the radius during the hour?
* How do these hourly values compare to the previous week for each area? This is just a simple week-over-week percentage difference for each of the above-mentioned four measures.

We want to emphasize that all three questions should be answered with the same aggregated dataset, meaning for instance that even the week-over-week differences are pre-calculated. Please note that for this task it is enough to only create the dataset and you are not expected to answer these questions. We only wish to see the code which creates this table and a sample of a few rows from the resulting dataset.

## Task 2 Interpretation

From the requirement of Task 2, it is obvious that a Summary table needs to be built. The spec of this Summary table would be:
1. Aggregated to the **hourly level**. Along with the Delivery Area information, it means the combined key of the `base_hour` + `delivery_area_id` columns should be there as the basic dimensions.
2. As for Metrics, these columns would have to be there as well:
    * Sales related: Purchases & Revenue (End Amount with Vat Eur). They
    * Delivery Radius related: Reduction duration & the frequency of delivery radius changing events (both expansion & reduction)
      * Also, it is assumed that all changes in the Delivery Radius are to be included (regardless if they are manually modifying events or automatically triggered)
3. On top of all 4 Metrics mentioned in point 2, WoW (Week-over-week) difference in percentage (%) would also need to be derived from them
    * It is also assumed that the WoW comparison window would be the same Day of the Week **and** same Hour of the Day, e.g. the number of Purchases from this Monday 1800 - 1900 would only be compared to the number of Purchases from last Monday 1800 - 1900.


From the technical perspective, here is the outline of the steps to take to build the Summary table
1. Preparation steps
    1. Generate a Fact table of Purchases
    2. Generate a Dimension table of all Delivery Areas
2. Development of intermediate tables
    1. Build the bases:
        1. Base Hours: Build an intermediate table containing all hours from 1 Jan 2022 till 31 Dec 2022
        2. Base Hours with all Delivery Areas: Get all possible combinations of Base Hours & Delivery Areas with a CROSS JOIN
    2. Calculate the aggregated Sales Metrics (i.e. purchases and revenue) per hour per Delivery Area
    3. Calculate the aggregated Delivery Radius Metrics
        1. Calculate the aggregated Reduction duration per hour per Delivery Area
        2. Calculate the number of delivery radius changing events per hour per Delivery Area
3. Putting everything from step 2 together, by:
    * Using the base table from 2A as the base, then
    * LEFT JOIN the aggregated table from 2B to fetch purchases & revenue
    * LEFT JOIN the aggregated table from 2C to fetch reduction duration & revenue number of delivery radius changing events
4. Based on the table from step 3, calculate the WoW percentage difference
    * COALESCE function would need to be used to replace NULLs with 0 if no metrics are found for that particular window





## Prep work

### Generate a Fact table of Purchases

The structure of the ODS table of `purchases` isn't good enough and need some enrichment with new columns, and they will be vital for later transformation. This step is to build a proper Fact table based on it and enforce the data type per column.

In [11]:
sql_fct_purchases = '''
SELECT "PURCHASE_ID" AS purchase_id
	, "TIME_RECEIVED"::TIMESTAMP AS time_received
	, "TIME_DELIVERED"::TIMESTAMP AS time_delivered
	, "END_AMOUNT_WITH_VAT_EUR" AS end_amount_with_vat_eur
	, "DROPOFF_DISTANCE_STRAIGHT_LINE_METRES" AS dropoff_distance_straight_line_metres
	, "DELIVERY_AREA_ID" AS delivery_area_id
FROM ods.purchases
ORDER BY time_received

'''

df_fct_purchases = pd.read_sql(sql_fct_purchases, con = db)

df_fct_purchases.to_sql(
    name = 'purchases',
    schema = 'fct',
    con=db,
    if_exists = 'replace',
    index = False
)

df_fct_purchases.head()

Unnamed: 0,purchase_id,time_received,time_delivered,end_amount_with_vat_eur,dropoff_distance_straight_line_metres,delivery_area_id
0,5e0bf2fdc3cded7e3ee098ae,2022-01-01 01:16:45.557,2022-01-02 09:36:24.341,28.9,362,5db02e5d401d690c836b9ead
1,5e0c53153040aacc8319ebd7,2022-01-01 08:06:45.992,2022-01-01 08:25:41.983,30.05,454,5db02e5d401d690c836b9ead
2,5e0c55e571bf4d9b33e0a3c0,2022-01-01 08:18:45.382,2022-01-01 08:44:36.997,18.7,1288,5db02e5d401d690c836b9ead
3,5e0c56163040aacc8319ef65,2022-01-01 08:19:34.633,2022-01-01 08:51:02.019,32.0,2044,5cc1b60b034adf90cd8f14dd
4,5e0c577d93a422bd1b6dc02d,2022-01-01 08:25:33.776,2022-01-01 08:56:42.016,39.2,2414,5cc1b60b034adf90cd8f14dd


### Generate a Dimension table for all Delivery Areas

Although the Fact table of Default Delivery Radius is there already, it is still useful to have a Dimension table having `delivery_area_id` as the Primary Key.

In [12]:
sql_dim_delivery_areas = '''
SELECT DISTINCT "DELIVERY_AREA_ID" AS delivery_area_id
FROM ods.delivery_radius_log 
;
'''

df_dim_delivery_areas = pd.read_sql(sql_dim_delivery_areas, con = db)

df_dim_delivery_areas.to_sql(
    name = 'delivery_areas',
    schema = 'dim',
    con=db,
    if_exists = 'replace',
    index = False
)

df_dim_delivery_areas.head()

Unnamed: 0,delivery_area_id
0,5cc1b60b034adf90cd8f14dd
1,5db02e5d401d690c836b9ead
2,5d78a7e552dfabd5251dab7b


## Development of queries for the required dataset in Task 2

### Generate an intermedate table for Base Hours

It needs to be on a hourly basis as it is the very base of the required Summary table.

In [13]:
sql_base_hours = '''
SELECT GENERATE_SERIES(date '2022-01-01', date '2022-12-31', '1 hour') AS base_hour
;
'''

df_base_hours = pd.read_sql(sql_base_hours, con = db)

df_base_hours.to_sql(
    name = 'base_hours',
    schema = 'int',
    con=db,
    if_exists = 'replace',
    index = False
)

df_base_hours.head()

Unnamed: 0,base_hour
0,2022-01-01 00:00:00+01:00
1,2022-01-01 01:00:00+01:00
2,2022-01-01 02:00:00+01:00
3,2022-01-01 03:00:00+01:00
4,2022-01-01 04:00:00+01:00


### Generate an intermedate table for Base Hours with all Delivery Areas

In [14]:
# Develop the Summary of Sales
sql_base_hours_with_delivery_areas = '''
WITH base_hours AS (
    SELECT generate_series(date '2022-01-01', date '2022-12-31', '1 hour') AS base_hour
)
SELECT base_hours.base_hour
    , delivery_areas.delivery_area_id
FROM base_hours
CROSS JOIN dim.delivery_areas
;
'''

df_base_hours_with_delivery_areas = pd.read_sql(sql_base_hours_with_delivery_areas, con = db)

df_base_hours_with_delivery_areas.to_sql(
    name = 'base_hours_with_delivery_areas',
    schema = 'int',
    con=db,
    if_exists = 'replace',
    index = False
)

df_base_hours_with_delivery_areas.head()

Unnamed: 0,base_hour,delivery_area_id
0,2022-01-01 00:00:00+01:00,5cc1b60b034adf90cd8f14dd
1,2022-01-01 01:00:00+01:00,5cc1b60b034adf90cd8f14dd
2,2022-01-01 02:00:00+01:00,5cc1b60b034adf90cd8f14dd
3,2022-01-01 03:00:00+01:00,5cc1b60b034adf90cd8f14dd
4,2022-01-01 04:00:00+01:00,5cc1b60b034adf90cd8f14dd


### Generate an intermedate table for Purchaes Summary

In [15]:
# Develop the Summary of Sales
sql_sum_purchases = '''
SELECT delivery_area_id
    , DATE_TRUNC('HOUR', time_received) AS base_hour
    , COUNT(*) AS nb_purchases
    , SUM(end_amount_with_vat_eur) AS end_amount_with_vat_eur
FROM fct.purchases
GROUP BY 1,2
ORDER BY 1,2
;
'''

df_sum_purchases = pd.read_sql(sql_sum_purchases, con = db)

df_sum_purchases.to_sql(
    name = 'sum_purchases',
    schema = 'int',
    con=db,
    if_exists = 'replace',
    index = False
)

df_sum_purchases.head()

Unnamed: 0,delivery_area_id,base_hour,nb_purchases,end_amount_with_vat_eur
0,5cc1b60b034adf90cd8f14dd,2022-01-01 08:00:00,3,104.65
1,5cc1b60b034adf90cd8f14dd,2022-01-01 09:00:00,4,129.3
2,5cc1b60b034adf90cd8f14dd,2022-01-01 10:00:00,22,626.7
3,5cc1b60b034adf90cd8f14dd,2022-01-01 11:00:00,15,515.0
4,5cc1b60b034adf90cd8f14dd,2022-01-01 12:00:00,19,573.75


### Generate an intermedate table for Delivery Radius Reduction summary

In [48]:
sql_sum_delivery_radius_reduction = '''
WITH delivery_radius_events AS (
    SELECT *
        , DATE_TRUNC('hour', DATE_ADD(base_hour, INTERVAL '1 HOUR')) AS next_base_hour
    FROM (
        SELECT delivery_area_id
            , DATE_TRUNC('hour', event_started_timestamp) AS base_hour
			, previous_event_started_timestamp
            , event_started_timestamp
            , next_event_started_timestamp
        FROM fct.delivery_radius_events
		WHERE is_reduction
    ) 
), delivery_radius_events_with_delta_hours AS (
	SELECT delivery_area_id
		, base_hour
		, next_base_hour
		, previous_event_started_timestamp
		, event_started_timestamp
		, next_event_started_timestamp
	    /*
	    When a Radius Reduction event occur, there are 4 possible scenarios:
	      Scenario 1. event starts & ends within the same hour
	      Scenario 2: event starts in the current base hour, then ends in another base hour later
		  Scenario 3: event starts in the current base hour or from a previous base hour, then ends in the current base hour
	      Scenario 4: event from in a previous base hour, then ends in the future base hour (i.e. full hour closure)
	    Where, Scenario 4 needs some special transofmration and hence won't be handled in this CTE
	    */
		, CASE
			-- Scenario 1: event starts & ends within the same hour
			WHEN base_hour = DATE_TRUNC('hour', event_started_timestamp)
					AND base_hour = DATE_TRUNC('hour', next_event_started_timestamp)
				THEN EXTRACT('epoch' FROM (next_event_started_timestamp - event_started_timestamp))/3600
			-- Scenario 2: event starts in the current base hour, then ends in another base hour later
			WHEN base_hour = DATE_TRUNC('hour', event_started_timestamp)
					AND next_event_started_timestamp > next_base_hour
				THEN EXTRACT('epoch' FROM (next_base_hour - event_started_timestamp))/3600
			-- Scenario 3: event from in a previous base hour, then ends in the current base hour
			WHEN previous_event_started_timestamp < base_hour
					AND base_hour = DATE_TRUNC('hour', event_started_timestamp)
				THEN EXTRACT('epoch' FROM (LEAST(next_event_started_timestamp, next_base_hour) - event_started_timestamp))/3600
			ELSE 0
		END AS delta_hours
	FROM delivery_radius_events
), delivery_radius_events_with_scenarios_1_2_3 AS (
	SELECT delivery_area_id
		, base_hour
		, SUM(delta_hours) AS delta_hours
	FROM delivery_radius_events_with_delta_hours
	GROUP BY 1,2
	ORDER BY delivery_area_id, base_hour
), delivery_radius_events_with_scenario_4 AS (
	SELECT base.delivery_area_id
		, base.base_hour
		, 1 AS delta_hours
	FROM int.base_hours_with_delivery_areas base
	INNER JOIN delivery_radius_events_with_delta_hours events ON base.delivery_area_id = events.delivery_area_id
		AND base.base_hour BETWEEN events.event_started_timestamp AND events.next_event_started_timestamp
	    -- The following 2 lines are to filter double JOINs 
		AND events.previous_event_started_timestamp < base.base_hour
		AND events.next_event_started_timestamp > DATE_ADD(base.base_hour, INTERVAL '1 hour')
	ORDER BY base.delivery_area_id, base.base_hour
), delivery_radius_events_union_all_agg AS (
	SELECT delivery_area_id
		, base_hour
		, SUM(delta_hours) AS delta_hours
	FROM (
		SELECT *
		FROM delivery_radius_events_with_scenarios_1_2_3

		UNION ALL

		SELECT *
		FROM delivery_radius_events_with_scenario_4
	)
	GROUP BY 1,2
	ORDER BY delivery_area_id, base_hour
)
SELECT base.delivery_area_id
	, base.base_hour
	, COALESCE(agg.delta_hours, 0) AS delta_hours_radius_reduction
FROM int.base_hours_with_delivery_areas base
LEFT JOIN delivery_radius_events_union_all_agg agg ON base.base_hour = agg.base_hour
	AND base.delivery_area_id = agg.delivery_area_id
ORDER BY base.delivery_area_id, base.base_hour
'''

df_sum_delivery_radius_reduction = pd.read_sql(sql_sum_delivery_radius_reduction, con = db)

df_sum_delivery_radius_reduction.to_sql(
    name = 'delivery_radius_reduction',
    schema = 'int',
    con=db,
    if_exists = 'replace',
    index = False
)

df_sum_delivery_radius_reduction.head()

Unnamed: 0,delivery_area_id,base_hour,delta_hours_radius_reduction
0,5cc1b60b034adf90cd8f14dd,2022-01-01 00:00:00,0.0
1,5cc1b60b034adf90cd8f14dd,2022-01-01 01:00:00,0.0
2,5cc1b60b034adf90cd8f14dd,2022-01-01 02:00:00,0.0
3,5cc1b60b034adf90cd8f14dd,2022-01-01 03:00:00,0.0
4,5cc1b60b034adf90cd8f14dd,2022-01-01 04:00:00,0.0


### Generate an intermedate table for Delivery Radius Modifciation events

In [50]:
sql_delivery_radius_modifications = '''
SELECT DATE_TRUNC('hour', event_started_timestamp) AS base_hour
	, delivery_area_id
	, COUNT(*) AS nb_delivery_radius_modified
FROM fct.delivery_radius_events
GROUP BY 1,2
ORDER BY 1,2
'''

df_delivery_radius_modifications = pd.read_sql(sql_delivery_radius_modifications, con = db)
df_delivery_radius_modifications.to_sql(
    name = 'delivery_radius_modifications',
    schema = 'int',
    con=db,
    if_exists = 'replace',
    index = False
)

df_delivery_radius_modifications.head()

Unnamed: 0,base_hour,delivery_area_id,nb_delivery_radius_modified
0,2021-12-01 12:00:00,5cc1b60b034adf90cd8f14dd,2
1,2021-12-02 13:00:00,5cc1b60b034adf90cd8f14dd,2
2,2021-12-02 14:00:00,5d78a7e552dfabd5251dab7b,2
3,2021-12-03 17:00:00,5db02e5d401d690c836b9ead,3
4,2021-12-05 14:00:00,5db02e5d401d690c836b9ead,2


## Putting everything together

Assumption: For the WoW (Week-over-week) comparison, it is assumed to compare for the same Delivery Area, same hour in the day, and the same day of week. E.g. the number of Purchases from this Monday 1800 - 1900 would only be compared to the number of Purchases from last Monday 1800 - 1900.

In [51]:
sql_sum_purchases_delivery_radius_reduction = '''
WITH base_hours_with_metrics AS (
	SELECT base.base_hour
		, EXTRACT('hour' FROM base.base_hour) AS hour_of_day
		, EXTRACT('dow' FROM base.base_hour) AS day_of_week
		, base.delivery_area_id
		, COALESCE(sum_purchases.nb_purchases, 0) AS nb_purchases
		, COALESCE(sum_purchases.end_amount_with_vat_eur, 0) AS end_amount_with_vat_eur
		, COALESCE(delivery_radius_reduction.delta_hours_radius_reduction, 0) AS delta_hours_radius_reduction
		, COALESCE(delivery_radius_modifications.nb_delivery_radius_modified, 0) AS nb_delivery_radius_modified
	FROM int.base_hours_with_delivery_areas base
	LEFT JOIN int.sum_purchases ON base.base_hour = sum_purchases.base_hour
		AND base.delivery_area_id = sum_purchases.delivery_area_id
	LEFT JOIN int.delivery_radius_reduction ON base.base_hour = delivery_radius_reduction.base_hour
		AND base.delivery_area_id = delivery_radius_reduction.delivery_area_id
	LEFT JOIN int.delivery_radius_modifications ON base.base_hour = delivery_radius_modifications.base_hour
		AND base.delivery_area_id = delivery_radius_modifications.delivery_area_id
	ORDER BY base.base_hour, base.delivery_area_id
), base_hours_with_metrics_and_metrics_from_last_week AS (
	SELECT *
		, LAG(nb_purchases) OVER (PARTITION BY delivery_area_id, hour_of_day, day_of_week ORDER BY base_hour) AS nb_purchases_last_week
		, LAG(end_amount_with_vat_eur) OVER (delivery_area_dow_hour_window) AS end_amount_with_vat_eur_last_week
		, LAG(delta_hours_radius_reduction) OVER (delivery_area_dow_hour_window) AS delta_hours_radius_reduction_last_week
		, LAG(nb_delivery_radius_modified) OVER (delivery_area_dow_hour_window) AS nb_delivery_radius_modified_last_week
	FROM base_hours_with_metrics
	WINDOW delivery_area_dow_hour_window AS (
		PARTITION BY delivery_area_id, hour_of_day, day_of_week 
		ORDER BY base_hour
	)
	ORDER BY base_hour, delivery_area_id
)
SELECT *
  -- To avoid Division by Zero error. Once again, if it is on BigQuery/Snowflake, we could just use Safe Divide functions like `DIV0`
    , CASE 
        WHEN nb_purchases_last_week != 0 
                AND nb_purchases_last_week IS NOT NULL
            -- Small workaround with the data type CASTing issues in PostgreSQL
            THEN (nb_purchases::FLOAT - nb_purchases_last_week)/ nb_purchases_last_week
        ELSE 0 
      END AS nb_purchases_wow_perc
    , CASE 
        WHEN end_amount_with_vat_eur_last_week != 0 
            -- Small workaround with the data type CASTing issues in PostgreSQL
            THEN (end_amount_with_vat_eur::FLOAT - end_amount_with_vat_eur_last_week)/ end_amount_with_vat_eur_last_week
        ELSE 0 
      END AS end_amount_with_vat_eur_wow_perc
    , CASE 
        WHEN delta_hours_radius_reduction_last_week != 0 
                AND delta_hours_radius_reduction_last_week IS NOT NULL
            -- Small workaround with the data type CASTing issues in PostgreSQL
            THEN (delta_hours_radius_reduction::FLOAT - delta_hours_radius_reduction_last_week)/ delta_hours_radius_reduction_last_week
        ELSE 0 
      END AS delta_hours_radius_reduction_wow_perc
    , CASE 
        WHEN nb_delivery_radius_modified_last_week != 0
                AND nb_delivery_radius_modified_last_week IS NOT NULL
            -- Small workaround with the data type CASTing issues in PostgreSQL
            THEN (nb_delivery_radius_modified::FLOAT - nb_delivery_radius_modified_last_week)/ nb_delivery_radius_modified_last_week
        ELSE 0 
      END AS nb_delivery_radius_modified_wow_perc
FROM base_hours_with_metrics_and_metrics_from_last_week
'''

df_sum_purchases_delivery_radius_reduction = pd.read_sql(sql_sum_purchases_delivery_radius_reduction, con = db)
df_sum_purchases_delivery_radius_reduction.to_sql(
    name = 'purchases_delivery_radius_reduction',
    schema = 'sum',
    con=db,
    if_exists = 'replace',
    index = False
)

df_sum_purchases_delivery_radius_reduction.head()

Unnamed: 0,base_hour,hour_of_day,day_of_week,delivery_area_id,nb_purchases,end_amount_with_vat_eur,delta_hours_radius_reduction,nb_delivery_radius_modified,nb_purchases_last_week,end_amount_with_vat_eur_last_week,delta_hours_radius_reduction_last_week,nb_delivery_radius_modified_last_week,nb_purchases_wow_perc,end_amount_with_vat_eur_wow_perc,delta_hours_radius_reduction_wow_perc,nb_delivery_radius_modified_wow_perc
0,2022-01-01 00:00:00,0.0,6.0,5cc1b60b034adf90cd8f14dd,0,0.0,0.0,0,,,,,0.0,0.0,0.0,0.0
1,2022-01-01 00:00:00,0.0,6.0,5d78a7e552dfabd5251dab7b,0,0.0,0.0,0,,,,,0.0,0.0,0.0,0.0
2,2022-01-01 00:00:00,0.0,6.0,5db02e5d401d690c836b9ead,0,0.0,0.0,0,,,,,0.0,0.0,0.0,0.0
3,2022-01-01 01:00:00,1.0,6.0,5cc1b60b034adf90cd8f14dd,0,0.0,0.0,0,,,,,0.0,0.0,0.0,0.0
4,2022-01-01 01:00:00,1.0,6.0,5d78a7e552dfabd5251dab7b,0,0.0,0.0,0,,,,,0.0,0.0,0.0,0.0


# Solution Clarifications

Thus, in addition to solving the tasks, please also answer the following questions to clarify your solution:
* What assumptions about the data have you made to produce the dataset?
* Why did you decide to go with this particular approach and what could be the pros and cons of applying it?
* How could the solution be improved if given more time and data?
* What strategy would you use for updating the dataset from task 2? Consider how often the default radius should be calculated, do we need to truncate the table before updating etc. Please assume that upstream data (i.e. purchases & delivery_radius_log) is streamed to the tables, so changes arrive near real time.

## What assumptions about the data have you made to produce the dataset?

* It is assumed that the organization is adopting Dimensional Modelling (i.e. Kimball model) for data modeling practice (i.e. instead of Data Mesh, Inmon model, or Data Vault etc.).
* It is also assumed that the option of requesting to add the primary key per event (i.e. `event_id`) is off the table for now, and we as the data team have to come up with the Surrogate Key ourselves instead of requesting the upstream Tech team to enrich it.
  * This assumption is made due to time consideration. Once the business logic is validated and it is okay, it could be treated as a Tech Debt and move upstream in the future's refactoring effort.



For further code-specific assumptions, please see the above inline comments in the code directly.

## Why did you decide to go with this particular approach and what could be the pros and cons of applying it?

As mentioned at the very beginning of this notebook, different layers of data are being introduced as a simple Data Warehouse architecture:
1. `ods` schema as the Operational Data Store, the very first landing area of the ingested data (e.g. flat CSV files)
2. `int` schema as the temp storage of the intermediate transformed data
3. `dim`, `fct`, and `sum` schemas for different types of Data Marts (i.e. Dimension, Fact, and Summary)

A couple of rationale of this architecture:
* The data from the 2 flat files needs to be cleaned at the 1st step of entering the Data Warehouse; AND,
* The cleaned data needs to be transformed and the same transformation should only happen once (i.e. NO duplicated code or data)
* Having a Data Mart type-specific schema so enforce the modeling thinking while designing & consuming from the tables

### Pros
The pros of doing so mostly reaping the benefits of Dimensional Modelling as the most popular data modeling practice in the market:
1. The easiest to understand (by business stakeholders); AND,
2. Rather flexible to adapt to unexpected business development (e.g. introduction of new Entities, new business models, etc.);
    * For comparison, the Inmon model for example would take a lot more planning ahead of implementation and may become a luxury for many companies with the given business dynamic in the modern days.
3. Also, having different layers of data is essential to avoid spaghetti code - which is vital for long-term maintenance of the code

Nevertheless, when the organization scales up, solely by Dimension modeling itself won't be enough to get the job done properly - it may need additional enhancement from Data Mesh (and, depending on the industry & business needs, certain elements from Data Vault should be borrowed as well). And to further enhance the implementation of Data Mesh, Data Contracts could also be considered as well.

### Cons
* Building the Data Warehouse structure requires extra investment of time & effort in short term
* The Dimensional Modelling requires both the data team and stakeholders to have a good understanding of the Primary Key. Or else, it may lead to query performance issues or inaccurately calculated KPIs
* Stakeholders may need further education & promotion to know how to properly use tables in `sum` schema, e.g. provided example queries
* Changes in source data themselves are not tracked by default (only the latest state of the data will be kept). That is rather the strong suit of Data Vault, not the Kimball model's


As for the Tech Debt part (especially the query performance-related engineering decisions), it is a calculated risk to take. When it comes to analytics, the speed of delivery is also vital - especially for time-sensitive requests. Once it is delivered, a disciplined BI/data team should always spare a certain portion of capacity (e.g. 5%) for incremental refactoring to ensure the quality of code in the long run.

For further code-specific details, please see above inline comments in the code directly.

## How could the solution be improved if given more time and data?

One of the biggest constraints of the current solution is the database (the SQL dialect - to be very specific). The currently selected solution of the database is PostgreSQL v16, and the only reason for selecting it is the nature of open-source and free. 

Given the current size of the data (i.e. the 2 CSV  files), it is okay-ish to handle the data transformation with the above approach (e.g. using JOINs instead of Windows Function, processing the data without the required Primary Key and the `updated_at` timestamp, etc.). When the volume of the data increases as the business grows, the performance of the above queries in PostgreSQL is likely to degrade, and cost could also increase faster than expected.

In addition, the usability of the produced datasets could be further enhanced for more use cases beyond the requirement of the assignment itself, e.g. 
* Include TIMESTAMPS & DATEs in the local timezone as well as the timezone topic is traditionally fairly important to the operation of this business (since the provided TIMESTAMPs are all in UTC)
* Enrich the dataset with more Dimensions, e.g. "T-Dimensions" (ISO week, Month, Quarter, Year, etc.), segmentation of the Venue & the Customers, etc.
* Collect more business use cases and enrich the same dataset with more metrics, e.g. Delivery Time (time interval between Order Time Received & Time Delivered), Distance calculation (on top of the Straight Line method, there are also other mathematical ways to calculate the distances, e.g. [Euclidean distance](https://simple.wikipedia.org/wiki/Euclidean_distance)) --> this could be more fitted to new business use cases, for example improving delivery performance in new countries that has very different urban design

In addition, the data still looks like more "downstream" in the picture. It may be an indicator of the team mentality (both data team and tech teams) - seeing data as a by-product of the applications/business or seeing data itself as a Product. If the awareness of seeing data as a Product is in place, the Primary Key of the events should be directly available in the data source instead of having to generate the surrogate key downstream.

## What strategy would you use for updating the dataset from task 2?  Consider how often the default radius should be calculated, do we need to truncate the table before updating etc. Please assume that upstream data (i.e. purchases & delivery_radius_log) is streamed to the tables, so changes arrive near real time.

This is a very good question, and it deserves a detail explanation here.

Essentially, the nature of the Task 2 dataset is a Summary table containing pre-computed & pre-aggregated Metrics from both Sales (Purchases & Revenue) and Delivery Radius Events (Radius Reduction & Radius Modifications).

### Elements to be involved 

#### The combined key of the Summary table
In the summary table sum.purchases_delivery_radius_reduction, the columns being used as combine key are:

* `base_hour`
* `delivery_area_id`

These would be very vital when it comes to refreshing the table.

#### Partitioning & Clustering
Before actually implementing the dataset, there are MUST-HAVEs to be put in place for the sake of query performance & cost:
* Partitioning (DATE/TIMESTAMP Partitioning in BigQuery, Micro-Partitions in Snowflake)
* Clustering

These 2 elements are extremely important when it comes to high-volume & high-velocity data (e.g. tracking vents, or the `delivery_radius_log` in this assignment). Proper Partitioning would help the data warehouse skip irrelevant rows of records which leads to good performance, Clustering would help speed up identifying the relevant records to process. 

### Factors to consider
There are a couple of factors that should be considered regarding deciding the Data Updating Strategy:
* Freshness of the upstream data (i.e. `purchases` & `delivery_radius_log`) - and it is known to be near real-time
* Business Requirement: Even if it is technically possible, it doesn't mean that it **has to** be delivered on near-realtime as well. At the end of the day, it depends on the business use-case.
  * For example, if it is just used for daily reporting/dashboarding, then just a daily refresh would be enough;
  * On the other hand, if someone from the business team is actively looking at those numbers literally a few times every hour AND makes decisions based on it, then **maybe** it makes sense to refresh on an hourly basis.
    * This sort of business use case needs a more proper tech solution instead of depending on an analytical dataset, with the rationale of the resources required to maintain the SLA and the stability of the pipeline since high-velocity data pipelines tend to be more volatile than slower ones.
* The Business Definitions of the metrics:
  * The Sales related Metrics (Purchases & Revenue) are rather straightforward, as they often just anchor on the "base TIMESTAMP" (e.g. `time_received`) unless there is a special requirement from the business logic
  * Yet, the trick lies with the event-based data interval calculation - namely Delivery Radius Reduction. It is tricky as the given definition of the Default Delivery Radius solely depends on the Delivery Radius sticking around for more than 24 hours.
    * A better scenario would be whatever application or micro-service generating the change of Default Radius events available somewhere else (e.g. another Kafka topic) instead of populating everything in just 1 Kafka topic and ask the Data/BI team to do the transformation.
      * The rationale behind is the principle of "Shift Left" - doing so at the beginning would just cost 1 Euro, doing so in the middle would cost 10 Euro, and doing it at the end could cost 100+ Euro.


#### Proposed Strategy
Based on the factors mentioned above, here would be the spec of the Proposed update strategy:
* Materialization: Incremental
  * The TIMESTAMPs from the upstream tables (`purchases` & `delivery_radius_log`) would be required here to implement incremental filtering
    * it is also equally important to plan for backfills in the future. Ideally speaking, there should be a condition statement (i.e. `IF`) to check if the current run is an Incremental one. If not, then the TIMESTAMP filter should be removed from the SQL statement to allow the data warehouse to process the whole table
  * The combined key of `base_hour` + `delivery_area_id` in the Summary table would leverage to identify which exact record contains new Metrics from the run and hence should be updated as well
* Interval: Hourly/Daily (depends on the business use case)
  * It is mostly based on the required Dimension being on an hourly basis. Although it is technically possible to make it even more frequent, the cost of the additional volatility and potential confusion for the stakeholders (e.g. very low numbers of the latest record as the hour hasn't been completed yet)
* Regular backfills & Re-clustering (e.g. every quarter/half-year, or even on an annual basis)
  * While the Incremental load is good for performance & cost, if there are anomalies of the upstream events (e.g. the events arrive late but with a much older TIMESTAMP value), those records will never be covered by the incremental load
  * While a table has records being inserted only for a certain period, the clustering would slowly be messy over time. It might be beneficial to have it re-clustered once a while 

***Notes***
* It is not recommended that the pattern of truncating the table first due to resilience consideration.
  * Depending on which data warehouse is being used (e.g. Snowflake), there is a chance truncating operations would cause problems. Since truncating and inserting are different queries, if there is a service disruption and somehow makes one of the queries fail, it would result in duplication or unexpected missing records (there is also a risk of having the whole table empty if human error is in the mix)
* Last but not least, the use of Incremental Load is a 2-edged sword. It brings the benefit of performance & cost, while it also introduces extra complexity. It is vital to ensure the foundation & tech debt are in a good state to implement this.


#### Additional considerations

##### Missing columns from the given `delivery_radius_log`

As a starter, there are 2 very critical fields/columns missing from the `delivery_radius_log`:
1. A Primary Key (e.g. `event_id`)
    * A Surrogate Key can still be derived by using both of `delivery_area_id` & `event_started_timestamp`. It is okay in most cases, **but** in certain corner cases (e.g. duplicated events from the upstream application/micro-service, Kafka events replay), even the Surrogate Key would be duplicated.
3. TIMESTAMPs indicating when is the event being ingested into the data warehouse (e.g. `created_at`, `updated_at`, or `ingested_at`)

Imagine if these 2 columns exist in `delivery_radius_log`, the schema would look like this:
* `event_id`
* `delivery_area_id`
* `delivery_radius_meters`
* `event_started_timestamp`
* `ingested_at`

then, it would be possible to leverage the strength of the data warehouse (e.g. Snowflake, BigQuery) to do something like this:

```SQL
SELECT * EXCEPT (_row_number)
FROM (
    SELECT *
        , ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY ingested_at DESC) AS _row_number
    FROM delivery_radius_log
)
WHERE _row_number = 1
```

If this is in place, the data pipeline would still be fairly resilient even if there are duplications from the streaming events (e.g. replaying in Kafka).