## Running the full ETL pipeline

Below are the concise steps a data analyst should follow to perform data-quality checks and then transform the cleaned data.  


1. **Prerequisite** – ensure a Python virtual environment is active and the project dependencies are installed:

   ```bash
   python -m venv .venv
   .venv\Scripts\activate       # Windows; on macOS/Linux: source .venv/bin/activate
   pip install -r requirements.txt
   ```

   This code was tested with Python 3.10.9.

   * In VS Code, set `jupyter.notebookFileRoot` (`vscode://settings/jupyter.notebookFileRoot`) to `${workspaceFolder}` so that notebook cells run from the project root.


2. **Data availability** – raw CSV files should reside in the `Raw/` folder; these files come from the HDB resale dataset.
   The schema defined in `resale_flat_schema.raw_resale_flat_schema` treats most columns as strings.


    The project should have this folder structure:

    **Project Name**
    ```
    ├── Data/         # Datasets (Cleaned, Failed, Hashed, Transformed)
    ├── Logs/         # Error logs and execution traces
    ├── Raw/          # Immutable original data (files downloaded directly from data.gov)
    └── src/          # Source code, scripts, and README.ipynb
    ```
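
A minimal sketch for checking this layout before running the notebook. It assumes the four top-level folder names shown above; `missing_folders` is a hypothetical helper and not part of the project code.

```python
from pathlib import Path

# Folder names taken from the layout above
EXPECTED_FOLDERS = ("Data", "Logs", "Raw", "src")


def missing_folders(project_root):
    """Return the expected top-level folders that do not exist under project_root."""
    root = Path(project_root)
    return [name for name in EXPECTED_FOLDERS if not (root / name).is_dir()]


if __name__ == "__main__":
    missing = missing_folders(Path.cwd())
    if missing:
        print(f"Missing folders: {', '.join(missing)}")
    else:
        print("Folder layout looks complete.")
```

Running it from the project root gives a quick signal that the working directory is the one the later cells expect.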





3. **Profile raw data** – execute the code cells below to load all raw files and generate an HTML profiling report.
   The dataset contains the columns listed in `config.json`, and its months span from 1990 onward.
   The profiler helps discover the actual values used in the categorical columns.

   *Assumptions:*
   * `month` column: the year and month in which the flat was resold.






First run `data_profiling_run` to identify the actual categorical values used for `town`, `flat_type`, and `flat_model`.


In [1]:
# Confirm the working directory before loading the config
import json
import os

print(os.getcwd())  # Should be the root directory of the project

f:\Projects\HousingETL


In [2]:

# Load the pipeline config and the data-quality rules
config_path = "src/config.json"
rules_path = "src/data_quality_rules.json"
with open(config_path, "r", encoding="utf-8") as f:
    config = json.load(f)
with open(rules_path, "r", encoding="utf-8") as f:
    rules = json.load(f)

# Define config variables
folder_paths = config["FolderPaths"]
raw_folder = folder_paths["RawFolderPath"]



In [3]:
from src.data_quality_check import data_profiling_run
data_profiling_run(reprofile=False)

2026-03-01 14:31:33,262 [INFO] (data_profiling_run) Loaded raw CSV files from f:\Projects\HousingETL\Raw


| month | town | flat_type | block | street_name | storey_range | floor_area_sqm | flat_model | lease_commence_date | remaining_lease | resale_price |
|---|---|---|---|---|---|---|---|---|---|---|
| str | str | str | str | str | str | str | str | str | str | str |
| 1990-01 | ANG MO KIO | 1 ROOM | 309 | ANG MO KIO AVE 1 | 10 TO 12 | 31 | IMPROVED | 1977 | 9000 | |
| 1990-01 | ANG MO KIO | 1 ROOM | 309 | ANG MO KIO AVE 1 | 04 TO 06 | 31 | IMPROVED | 1977 | 6000 | |
| 1990-01 | ANG MO KIO | 1 ROOM | 309 | ANG MO KIO AVE 1 | 10 TO 12 | 31 | IMPROVED | 1977 | 8000 | |
| 1990-01 | ANG MO KIO | 1 ROOM | 309 | ANG MO KIO AVE 1 | 07 TO 09 | 31 | IMPROVED | 1977 | 6000 | |
| 1990-01 | ANG MO KIO | 3 ROOM | 216 | ANG MO KIO AVE 1 | 04 TO 06 | 73 | NEW GENERATION | 1976 | 47200 | |
| … | … | … | … | … | … | … | … | … | … | … |
| 2026-02 | YISHUN | EXECUTIVE | 352 | YISHUN RING RD | 01 TO 03 | 146.00 | Maisonette | 1988 | 61 years 06 months | 875000 |
| 2026-02 | YISHUN | EXECUTIVE | 792 | YISHUN RING RD | 01 TO 03 | 142.00 | Apartment | 1987 | 60 years 07 months | 833000 |
| 2026-01 | YISHUN | EXECUTIVE | 643 | YISHUN ST 61 | 10 TO 12 | 142.00 | Apartment | 1987 | 60 years 09 months | 825000 |
| 2026-01 | YISHUN | EXECUTIVE | 643 | YISHUN ST 61 | 04 TO 06 | 146.00 | Maisonette | 1987 | 60 years 08 months | 788000 |


**From the profiling:**
* The categorical values saved in `data_quality_rules.json` are the most common values that appear in the profiling report.
* Categorical values are split between upper and lower case, so an additional step converts them all to upper case.
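
The case-splitting issue can be illustrated with a small sketch. It uses only the standard library rather than the project's profiler, `count_values` is a hypothetical helper, and the sample values are illustrative (modelled on the `flat_model` column, with a mixed-case `Improved` added to show the split).

```python
from collections import Counter


def count_values(values, normalize=False):
    """Count occurrences of each value, optionally upper-casing first."""
    if normalize:
        values = (v.upper() for v in values)
    return Counter(values)


# Illustrative flat_model values with mixed casing
flat_models = ["IMPROVED", "NEW GENERATION", "Maisonette", "Apartment", "Improved"]

print(count_values(flat_models))                  # "IMPROVED" and "Improved" counted separately
print(count_values(flat_models, normalize=True))  # both merged under "IMPROVED"
```

Counting after normalization is what makes "most common values" meaningful, because otherwise one category is spread across several spellings.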




4. **Update rules** – inspect `data_quality_rules.json` and adjust the `expected_values` lists for `flat_type`, `flat_model`, etc.
   Rules are applied after upper‑casing, which normalizes case variations.
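
As a rough sketch of how an `expected_values` rule might be applied after upper-casing: the rules layout below is an assumption (the real structure of `data_quality_rules.json` is not shown here), `passes_rules` is a hypothetical helper, and `Penthouse` is an illustrative value that is deliberately left out of the list.

```python
# Hypothetical rules layout; the real data_quality_rules.json may differ
rules = {
    "flat_type": {"expected_values": ["1 ROOM", "3 ROOM", "EXECUTIVE"]},
    "flat_model": {
        "expected_values": ["IMPROVED", "NEW GENERATION", "MAISONETTE", "APARTMENT"]
    },
}


def passes_rules(row, rules):
    """Return True if every ruled column holds an expected value after upper-casing."""
    for column, rule in rules.items():
        allowed = {value.upper() for value in rule["expected_values"]}
        if str(row.get(column, "")).upper() not in allowed:
            return False
    return True


rows = [
    {"flat_type": "EXECUTIVE", "flat_model": "Maisonette"},  # passes once upper-cased
    {"flat_type": "EXECUTIVE", "flat_model": "Penthouse"},   # not in expected_values
]
qualified = [r for r in rows if passes_rules(r, rules)]
unqualified = [r for r in rows if not passes_rules(r, rules)]
```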

5. **Run validation** – call `data_validation` from `data_quality_check` to clean the data:

In [4]:


from src.data_quality_check import data_validation, combine_datasets
from pathlib import Path

import polars as pl


raw = combine_datasets(raw_folder)
qualified, unqualified = data_validation(raw)


with pl.Config(set_tbl_hide_dataframe_shape=True, tbl_cols=-1):
    print(qualified.head(n=50))

2026-03-01 14:31:33,759 [INFO] (filter_null_values) Successfully converted and filtered out null values: 817822 rows with null values found
2026-03-01 14:31:33,762 [INFO] (filter_month_range) Filtered rows by month range: kept 37108 of 153701
2026-03-01 14:31:33,765 [INFO] (data_validation) Successfully converted all categorical values to upper case
2026-03-01 14:31:33,770 [INFO] (data_validation) Successfully filtered out categorical values based on rules: 31946 qualified, 5162 not qualified
2026-03-01 14:31:33,790 [INFO] (data_validation) Validation complete: 31543 qualified, 823786 not qualified


Columns and dtypes of `qualified` (row preview truncated): `month` (date), `town` (str), `flat_type` (str), `block` (str), `street_name` (str), `storey_range` (str), `floor_area_sqm` (i64), `flat_model` (str), `lease_commence_date` (date), `remaining_lease` (str), `resale_price` (i64), `created_datetime` (datetime[μs]), `composite_key` (str).


   *Behavior:*
   - Rows with missing key fields are removed.
   - Only months between Jan 2012 and Dec 2016 are kept (per `filter_month_range`).
   - Categorical values are upper‑cased and filtered using the rules. 
   - Numeric casts and lease calculations are performed.
   - Duplicate records based on composite key are split into qualified and failed sets.
   - Cleaned rows are written to `Data/Cleaned.csv`, failed ones to `Data/Failed.csv`.
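
Two of the behaviours above, the month window and the composite-key split, can be sketched in plain Python. This is not the project's `data_validation` implementation: the key columns are hypothetical, and keeping the first occurrence of a duplicate key is an assumption.

```python
from datetime import date

# Month window from the behaviour list above
START, END = date(2012, 1, 1), date(2016, 12, 1)


def parse_month(text):
    """Parse a 'YYYY-MM' string into a date on the first of that month."""
    year, month = text.split("-")
    return date(int(year), int(month), 1)


def validate(rows, key_columns):
    """Drop rows outside the month window, then split the rest by key uniqueness."""
    qualified, failed, seen = [], [], set()
    for row in rows:
        if not START <= parse_month(row["month"]) <= END:
            continue  # Out-of-range rows are removed early
        key = tuple(row[column] for column in key_columns)
        if key in seen:
            failed.append(row)  # Assumption: the first occurrence is kept
        else:
            seen.add(key)
            qualified.append(row)
    return qualified, failed
```

A call such as `validate(rows, ("month", "town", "block"))` returns the two lists, which would then map to `Data/Cleaned.csv` and `Data/Failed.csv`.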

6. **Transform cleaned data** – once you have `Cleaned.csv`, run the transformation logic with `transform_cleaned_data`.
   This function:
   - reads `Cleaned.csv` using the cleaned schema,
   - generates `block_num` (3‑digit, zero‑padded numeric part of `block`),
   - computes the average resale price by month and flat type,
   - joins the average back to every row,
   - builds a `resale_identifier` with the format
     `S{block_num}{last2(avg_price)}{month}{town[1:]}`,
   - detects duplicates and exports the failed records; the transformed results go to `Data/Transformed.csv`, or to the path set in `config.json`.
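
The identifier steps above can be sketched in plain Python. This is an illustration under stated assumptions, not the project's Polars implementation: `last2` is read as the last two digits of the rounded average, `month` is rendered as `YYYYMM`, and `town[1:]` is read as a Python slice that drops the first character. The helper names are hypothetical.

```python
import re
from collections import defaultdict


def block_num(block):
    """Return the numeric part of a block label, zero-padded to 3 digits."""
    digits = re.sub(r"\D", "", block)
    return digits.zfill(3)


def add_resale_identifier(rows):
    """Attach avg_price and resale_identifier to each row (in place) and return rows."""
    # Average resale price per (month, flat_type) group
    prices = defaultdict(list)
    for row in rows:
        prices[(row["month"], row["flat_type"])].append(row["resale_price"])
    averages = {key: sum(vals) / len(vals) for key, vals in prices.items()}

    for row in rows:
        # Join the group average back onto the row
        avg = averages[(row["month"], row["flat_type"])]
        row["avg_price"] = avg
        # Assumptions: last two digits of the rounded average, month as YYYYMM,
        # and town without its first character
        last2 = str(round(avg))[-2:].zfill(2)
        month = row["month"].replace("-", "")
        row["resale_identifier"] = (
            f"S{block_num(row['block'])}{last2}{month}{row['town'][1:]}"
        )
    return rows
```

Under these assumptions, a block such as `10A` becomes `010`, so the identifier keeps a fixed-width block segment.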

In [5]:
from src.data_transformation import transform_cleaned_data
transformed = transform_cleaned_data()

with pl.Config(set_tbl_hide_dataframe_shape=True, tbl_cols=-1):
    print(transformed.head(n=50))

2026-03-01 14:31:33,943 [INFO] (transform_cleaned_data) Cleaned data path resolved: Data/Cleaned.csv
2026-03-01 14:31:33,944 [INFO] (transform_cleaned_data) Reading cleaned CSV into DataFrame
2026-03-01 14:31:33,965 [INFO] (transform_cleaned_data) Loaded 31543 rows
2026-03-01 14:31:33,966 [INFO] (get_resale_identifier) Creating block_num column
2026-03-01 14:31:33,971 [INFO] (get_resale_identifier) Aggregating average resale_price by month and flat_type
2026-03-01 14:31:33,974 [INFO] (get_resale_identifier) Aggregation produced 130 groups
2026-03-01 14:31:33,975 [INFO] (get_resale_identifier) Joining averages back to original DataFrame
2026-03-01 14:31:33,978 [INFO] (get_resale_identifier) Joined DataFrame has 31543 rows
2026-03-01 14:31:33,979 [INFO] (get_resale_identifier) Generating resale_identifier column
2026-03-01 14:31:33,983 [INFO] (filter_resale_identifier) Sorting by resale_price and removing duplicates
2026-03-01 14:31:33,986 [INFO] (filter_resale_identifier) Dropping inter

f:\Projects\HousingETL


2026-03-01 14:31:34,180 [INFO] (transform_cleaned_data) Writing transformed data to CSV
2026-03-01 14:31:34,243 [INFO] (transform_cleaned_data) Writing hashed data to CSV


Columns and dtypes of `transformed` (row preview truncated): `month` (date), `town` (str), `flat_type` (str), `block` (str), `street_name` (str), `storey_range` (str), `floor_area_sqm` (i64), `flat_model` (str), `lease_commence_date` (date), `remaining_lease` (str), `resale_price` (i64), `created_datetime` (datetime[μs]), `composite_key` (str), `resale_identifier` (str), `hashed_resale_identifier` (str).

7. **Review outputs** – inspect the CSV files under `Data` and use the profiling report or additional analysis as needed.

### Notes & assumptions

* The `month` field is assumed to be parseable as a date; rows outside the specified range are removed early.
* The transformation uses Polars for performance and adds logging at each key step; logs appear on the console and in `housing_etl.log` if `LogsFolderName` is configured.
* Adjust the `data_quality_rules.json` and `config.json` as the dataset evolves.
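
A minimal sketch of a console-plus-file logger that produces lines in the same shape as the output shown earlier (`timestamp [LEVEL] (function) message`). The `build_logger` helper and its parameters are hypothetical; how the project actually reads `LogsFolderName` from `config.json` is not shown here.

```python
import logging
from pathlib import Path


def build_logger(name="housing_etl", logs_folder=None):
    """Create a logger that writes to the console and, optionally, to a file."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.handlers.clear()  # Avoid duplicate handlers when a notebook cell is re-run

    formatter = logging.Formatter(
        "%(asctime)s [%(levelname)s] (%(funcName)s) %(message)s"
    )

    console = logging.StreamHandler()
    console.setFormatter(formatter)
    logger.addHandler(console)

    # Only add the file handler when a logs folder is configured
    if logs_folder:
        folder = Path(logs_folder)
        folder.mkdir(parents=True, exist_ok=True)
        file_handler = logging.FileHandler(folder / "housing_etl.log", encoding="utf-8")
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    return logger
```

Clearing existing handlers first is a design choice for notebooks, where re-running a cell would otherwise print every log line twice.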


## System Design
![System design diagram](./System_Design.png)

### Assumptions:

#### Network assumptions
1. AWS Athena will be given access to the S3 instance in the VPC.
2. Tableau will be given IAM credentials that are valid for Athena.
3. "Tableau in AWS" refers to Tableau Cloud, which runs within AWS.



### References: 

1. Tableau Private Connect setup overview: https://help.tableau.com/current/online/en-us/private_connect_setup_overview.htm

## Mermaid code to generate the system diagram

```mermaid
flowchart LR

  %% Public Internet (only for source upload)
  subgraph INTERNET [Public Internet]
    SOURCE@{ img: "https://api.iconify.design/mdi/earth.svg", label: "Data Gov Site", pos: "b", w: 60, h: 60, constraint: "on"}

  end

  

  %% Enterprise VPC (Singapore Region)
  subgraph VPC [Enterprise VPC in Singapore]
    ATHENA__A@{ img: "https://api.iconify.design/logos/aws-athena.svg", label: "Athena", pos: "b", w: 60, h: 60, constraint: "on"}
    LAMBDA_A@{ img: "https://api.iconify.design/logos/aws-lambda.svg", label: "AWS Lambda", pos: "b", w: 60, h: 60, constraint: "on"}
    TABLEAU_A@{ img: "https://api.iconify.design/logos/tableau-icon.svg", label: "Tableau in AWS", pos: "b", w: 60, h: 60, constraint: "on"}

    %% Availability Zone A
    subgraph AZ_A [Availability Zone A]
      subgraph PRIVATE_SUBNET_A [Private Subnet A]
        S3_GW_A@{ img: "https://api.iconify.design/logos/aws-s3.svg", label: "S3 Primary", pos: "b", w: 60, h: 60, constraint: "on"}
        end
    end

    %% Availability Zone B
    subgraph AZ_B [Availability Zone B]
      subgraph PRIVATE_SUBNET_B [Private Subnet B]
        S3_GW_B@{ img: "https://api.iconify.design/logos/aws-s3.svg", label: "S3 Secondary", pos: "b", w: 60, h: 60, constraint: "on"}
      end
    end


  end

  %% Data Flow

  S3_GW_A -- "5. Replication" --> S3_GW_B

  LAMBDA_A -- "1. Trigger file upload" --> SOURCE
  SOURCE -- "2. Multipart file download into" --> S3_GW_A
  LAMBDA_A -- "3. Runs ETL Pipeline" --> S3_GW_A
  ATHENA__A -- "4. Query via Proxy" --> S3_GW_A
  TABLEAU_A -- "Will access via PrivateLink" --> ATHENA__A






  %% Styling
  classDef vpc fill:none,color:#0a0,stroke:#0a0,stroke-dasharray: 5 5, stroke-width: 2px
  class VPC vpc
  class INTERNET,PRIVATE_SUBNET_A,PRIVATE_SUBNET_B,AZ_A,AZ_B group
```