# Data Vault 2.0 with PySpark – Banking Transactions Pipeline

# 1. Introduction



In modern analytics, businesses need scalable and flexible data architectures.  
**Data Vault 2.0** is a methodology for building **enterprise data warehouses** that are:  
- **Scalable** → Handles large volumes of data.  
- **Auditable** → Keeps history of changes.  
- **Flexible** → Easy to adapt to new requirements.  

Unlike a traditional **star schema** (facts and dimensions only), a Data Vault architecture separates the warehouse into three layers:  
1. **Raw Vault** → Stores raw, historical data in 3 types of tables:
   - **Hubs** → Unique business keys (e.g., Customer, Transaction, Location).  
   - **Links** → Relationships between hubs (e.g., Customer–Transaction).  
   - **Satellites** → Descriptive attributes with history (e.g., Customer profile, Transaction details).  

2. **Business Vault** → Adds **derived attributes** and **business logic** (e.g., Age calculation, Fraud flags).  

3. **Dimensional Model** → Final tables for **reporting and dashboards** in Power BI.  

We will:  
- Build a **Data Vault 2.0** model in PySpark.  
- Use **hash keys** (SHA-256) to generate unique IDs.  
- Detect changes using **hash diff**.  
- Export **dimensions and facts** for Power BI dashboards.

# 2. Environment Setup

We use **PySpark** as our processing engine.  

- **SparkSession** → The entry point for using Spark.  
- **Parquet** → Storage format for tables (efficient columnar storage).  
- **Modes**:  
  - `"overwrite"` → Deletes old data and replaces it.  
  - `"append"` → Keeps history and adds new records.  

In [13]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    sha2, concat_ws, col, to_date, lpad, when, length,
    split, lit, floor, months_between, current_date, year, month
)

# Initialize Spark session
spark = SparkSession.builder \
    .appName("DataVaultBanking") \
    .master("local[*]") \
    .getOrCreate()

# Define output paths
OUTPUT_BASE = "/kaggle/working/datalake/raw_vault"
BV_BASE = "/kaggle/working/datalake/business_vault"

os.makedirs(OUTPUT_BASE, exist_ok=True)
os.makedirs(BV_BASE, exist_ok=True)

spark.sparkContext.setLogLevel("ERROR")


# 3. Load Dataset


We use a **banking dataset** containing:

- **Customers** → ID, date of birth, gender, account balance.  
- **Transactions** → ID, date, time, amount (INR).  
- **Locations** → the customer's city (`CustLocation`).  

This dataset is the **source system**, and we will transform it into a Data Vault.  

In [14]:
# Load dataset
df = spark.read.csv("/kaggle/input/bank-customer-segmentation/bank_transactions.csv", header=True, inferSchema=True)

# Preview first records
df.show(5, truncate=False)






+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|TransactionID|CustomerID|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|
+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|T1           |C5841053  |10/1/94    |F         |JAMSHEDPUR  |17819.05          |2/8/16         |143207         |25.0                   |
|T2           |C2142763  |4/4/57     |M         |JHAJJAR     |2270.69           |2/8/16         |141858         |27999.0                |
|T3           |C4417068  |26/11/96   |F         |MUMBAI      |17874.44          |2/8/16         |142712         |459.0                  |
|T4           |C5342380  |14/9/73    |F         |MUMBAI      |866503.21         |2/8/16         |142714         |2060.0                 |
|T5           |C9031234  |24/3/88 

                                                                                

# 4. Raw Vault Layer Construction

The Raw Vault is the foundation of the Data Vault 2.0 methodology.  
It ensures traceability, flexibility, and historization of the data.  

It is built on three key components:  

### Hubs – Unique Business Keys  
- Represent the core business entities.  
- Each hub stores a unique list of business keys.  
- Examples in our case:  
  - Customer Hub → list of unique customers.  
  - Transaction Hub → list of unique transactions.  
  - (Optional) Location Hub → list of unique locations.  
- Technical implementation:  
  - Use SHA-256 hashing to generate the hub’s primary key (hash key).  
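  - Why SHA-256? → It is deterministic (the same business key always yields the same hash key), and its 256-bit output makes accidental collisions extremely unlikely, although no hash function can strictly guarantee uniqueness.  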
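
As a minimal sketch of what the hash-key step computes, the plain-Python snippet below hashes a business key with `hashlib`. It assumes the key is encoded as UTF-8 before hashing; the helper name `hash_key` is hypothetical and is not part of the pipeline.

```python
import hashlib


def hash_key(business_key: str) -> str:
    """Return the SHA-256 hex digest of a business key (hypothetical helper)."""
    return hashlib.sha256(business_key.encode("utf-8")).hexdigest()


hk_a = hash_key("C5841053")
hk_b = hash_key("C5841053")
hk_c = hash_key("C2142763")

print(len(hk_a))     # 64 hex characters, i.e. 256 bits
print(hk_a == hk_b)  # the same key always gives the same hash key
print(hk_a == hk_c)  # different keys give different hash keys
```

Because the hash depends only on the business key, any loader can recompute it independently without looking up a sequence or an existing table.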

---

### Links – Relationships Between Hubs  
- Represent associations between business entities.  
- Example:  
  - A link between Customer Hub and Transaction Hub → indicates which customer performed which transaction.  
- Technical implementation:  
  - The link’s hash key is created by concatenating the business keys of the participating hubs with a delimiter and hashing the result with SHA-256.  
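
The delimiter in the concatenation matters. The sketch below, in plain Python with a hypothetical `link_hash_key` helper and made-up keys, shows that two different key pairs can produce the same string when joined without a delimiter, and therefore the same hash.

```python
import hashlib


def link_hash_key(*business_keys: str, delimiter: str = "||") -> str:
    """Hash the delimited concatenation of several business keys (hypothetical helper)."""
    joined = delimiter.join(business_keys)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


# Two different key pairs (made-up values)
with_delim_1 = link_hash_key("C12", "T3")
with_delim_2 = link_hash_key("C1", "2T3")

# Without a delimiter both pairs collapse to the string "C12T3"
no_delim_1 = link_hash_key("C12", "T3", delimiter="")
no_delim_2 = link_hash_key("C1", "2T3", delimiter="")

print(with_delim_1 == with_delim_2)  # False: the delimiter keeps the pairs distinct
print(no_delim_1 == no_delim_2)      # True: the pairs are indistinguishable
```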

---

### Satellites – Descriptive Attributes  
- Store contextual and descriptive information about a hub or link.  
- Examples in our case:  
  - Customer Satellite → Date of Birth, Gender, Location, Account Balance.  
  - Transaction Satellite → Date, Time, Amount.  
- Technical implementation:  
  - Use a hash diff to detect attribute changes over time.  
  - Store additional metadata:  
    - load_date → when the record was inserted.  
    - record_source → origin of the record (CSV, API, system).  
- This guarantees historization and traceability of changes.  
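
The notebook computes a `hash_diff` for every satellite row but does not compare it against rows loaded earlier. As a rough sketch of how that comparison usually works, the plain-Python example below appends a new version only when the hash diff differs from the latest stored one. The function names, the in-memory list, and the sample values are all illustrative assumptions.

```python
import hashlib
from datetime import datetime, timezone


def hash_diff(attributes: dict) -> str:
    """Hash the descriptive attributes in a fixed (sorted) order."""
    payload = "||".join(str(attributes[name]) for name in sorted(attributes))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def load_satellite(satellite: list, key: str, attributes: dict, source: str) -> bool:
    """Append a new version only if the attributes changed. Returns True if a row was added."""
    new_diff = hash_diff(attributes)
    versions = [row for row in satellite if row["key"] == key]
    if versions and versions[-1]["hash_diff"] == new_diff:
        return False  # nothing changed since the last load
    satellite.append({
        "key": key,
        "hash_diff": new_diff,
        "load_date": datetime.now(timezone.utc),
        "record_source": source,
        **attributes,
    })
    return True


sat = []
first = load_satellite(sat, "C1", {"gender": "F", "balance": 100.0}, "csv")
repeat = load_satellite(sat, "C1", {"gender": "F", "balance": 100.0}, "csv")
changed = load_satellite(sat, "C1", {"gender": "F", "balance": 250.0}, "csv")

print(first, repeat, changed)  # True False True
print(len(sat))                # 2 versions stored for customer C1
```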

---

**Summary**:  
- Hubs = Who/What (unique identifiers).  
- Links = How they are related.  
- Satellites = Descriptive details and history.  


**Step 1 : Path variables**

In [15]:
# Input dataset 
INPUT_CSV = "/kaggle/input/bank-customer-segmentation"

# Staging Layer path
STAGING_PATH = "/kaggle/working/datalake/staging/bank_transactions"

# Raw Vault Layer path
OUTPUT_BASE = "/kaggle/working/datalake/raw_vault"

# Create directories if they do not exist
os.makedirs(STAGING_PATH, exist_ok=True)
os.makedirs(OUTPUT_BASE, exist_ok=True)

**Step 2 : Loading Raw Data**

In [16]:
df = spark.read.csv(INPUT_CSV, header=True, inferSchema=True)

print("Initial schema:")
df.printSchema()
df.show(18, truncate=False)




Initial schema:
root
 |-- TransactionID: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- CustomerDOB: string (nullable = true)
 |-- CustGender: string (nullable = true)
 |-- CustLocation: string (nullable = true)
 |-- CustAccountBalance: double (nullable = true)
 |-- TransactionDate: string (nullable = true)
 |-- TransactionTime: integer (nullable = true)
 |-- TransactionAmount (INR): double (nullable = true)

+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|TransactionID|CustomerID|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|
+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|T1           |C5841053  |10/1/94    |F         |JAMSHEDPUR  |17819.05          |2/8/16         |143207         |25.0                   |
|T2       

                                                                                

**Step 3 : Data Cleaning and Normalization**


Before creating the Data Vault structures, we need clean and consistent data.  
The following transformations are applied:

1. **Standardizing dates**:  
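   - `CustomerDOB` and `TransactionDate` arrive as `D/M/YY` strings and are parsed into Spark dates (`YYYY-MM-DD`).  
   - Two-digit years are expanded with a fixed century: `19` for `CustomerDOB` (e.g., "94" → "1994") and `20` for `TransactionDate` (e.g., "16" → "2016").  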

2. **Filtering invalid dates**:  
   - Customer DOB must be between `1900-01-01` and `2025-12-31`.  
   - Transaction dates must be between `2000-01-01` and `2025-12-31`.  
   - Rows whose dates could not be parsed are `null` and are dropped by the same filters.  

3. **Casting numeric types**:  
   - `CustAccountBalance` and `TransactionAmount` are cast to `double`.  
   - `TransactionTime` is stored as `string`.  

4. **Staging layer**:  
   The cleaned dataset is written to the **Staging Layer** (`STAGING_PATH`) in Parquet format for traceability.  
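
To make the date rule concrete, here is a small plain-Python sketch of the same logic: split on `/`, expand a two-digit year with a fixed century, and build a date. The function name `normalize_date` is hypothetical; the two sample values are taken from the first row of the dataset preview.

```python
from datetime import date


def normalize_date(raw: str, century: str) -> date:
    """Parse a D/M/YY or D/M/YYYY string, expanding two-digit years with `century`."""
    day, month, year = raw.split("/")
    if len(year) == 2:
        year = century + year
    return date(int(year), int(month), int(day))


dob = normalize_date("10/1/94", century="19")  # CustomerDOB uses the 19xx rule
tx = normalize_date("2/8/16", century="20")    # TransactionDate uses the 20xx rule

print(dob.isoformat())  # 1994-01-10
print(tx.isoformat())   # 2016-08-02
```

One consequence of the fixed century is that a customer born in 2005 would be read as born in 1905, so the rule is only safe when the data contains no such customers.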


In [17]:
# --- Standardize CustomerDOB ---
df = df.withColumn("dob_parts", split(col("CustomerDOB"), "/")) \
       .withColumn("dob_day", lpad(col("dob_parts")[0], 2, "0")) \
       .withColumn("dob_month", lpad(col("dob_parts")[1], 2, "0")) \
       .withColumn(
           "dob_year",
           when(length(col("dob_parts")[2]) == 2, concat_ws("", lit("19"), col("dob_parts")[2]))
           .otherwise(col("dob_parts")[2])
       ) \
       .withColumn("CustomerDOB", to_date(concat_ws("/", col("dob_day"), col("dob_month"), col("dob_year")), "dd/MM/yyyy")) \
       .drop("dob_parts", "dob_day", "dob_month", "dob_year")

# --- Standardize TransactionDate ---
df = df.withColumn("tx_parts", split(col("TransactionDate"), "/")) \
       .withColumn("tx_day", lpad(col("tx_parts")[0], 2, "0")) \
       .withColumn("tx_month", lpad(col("tx_parts")[1], 2, "0")) \
       .withColumn(
           "tx_year",
           when(length(col("tx_parts")[2]) == 2, concat_ws("", lit("20"), col("tx_parts")[2]))
           .otherwise(col("tx_parts")[2])
       ) \
       .withColumn("TransactionDate", to_date(concat_ws("/", col("tx_day"), col("tx_month"), col("tx_year")), "dd/MM/yyyy")) \
       .drop("tx_parts", "tx_day", "tx_month", "tx_year")

# --- Filter invalid dates ---
df = df.filter((col("CustomerDOB") >= "1900-01-01") & (col("CustomerDOB") <= "2025-12-31")) \
       .filter((col("TransactionDate") >= "2000-01-01") & (col("TransactionDate") <= "2025-12-31"))

# --- Convert numeric types ---
df = df.withColumn("CustAccountBalance", col("CustAccountBalance").cast("double")) \
       .withColumn("TransactionTime", col("TransactionTime").cast("string")) \
       .withColumn("TransactionAmount (INR)", col("TransactionAmount (INR)").cast("double"))

print("Schema after cleaning:")
df.printSchema()
df.show(18, truncate=False)

# Save cleaned data into Staging Layer
df.write.mode("overwrite").parquet(STAGING_PATH)


Schema after cleaning:
root
 |-- TransactionID: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- CustomerDOB: date (nullable = true)
 |-- CustGender: string (nullable = true)
 |-- CustLocation: string (nullable = true)
 |-- CustAccountBalance: double (nullable = true)
 |-- TransactionDate: date (nullable = true)
 |-- TransactionTime: string (nullable = true)
 |-- TransactionAmount (INR): double (nullable = true)

+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|TransactionID|CustomerID|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|
+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|T1           |C5841053  |1994-01-10 |F         |JAMSHEDPUR  |17819.05          |2016-08-02     |143207         |25.0                   |
|T2     

                                                                                

**Step 4 : Data Vault - Hubs**

Hubs contain the **unique list of business keys**.  
- **Customer Hub** → based on `CustomerID`.  
- **Transaction Hub** → based on `TransactionID`.  

We use **SHA-256 hashing** to generate the hub hash keys (`hk_customer_id`, `hk_transaction_id`).  
The hash is deterministic, so the same business key always maps to the same hash key, and collisions are extremely unlikely.  

In [20]:
# Hub Customer
hub_customer = df.select("CustomerID").dropDuplicates() \
                 .withColumn("hk_customer_id", sha2(col("CustomerID").cast("string"), 256))
hub_customer.write.mode("append").parquet(f"{OUTPUT_BASE}/hub_customer")

# Hub Transaction
hub_transaction = df.select("TransactionID").dropDuplicates() \
                    .withColumn("hk_transaction_id", sha2(col("TransactionID").cast("string"), 256))
hub_transaction.write.mode("append").parquet(f"{OUTPUT_BASE}/hub_transaction")
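
Because the hubs are written in `"append"` mode, running the cell twice would store every business key twice. A common remedy is to insert only keys that are not yet in the hub. The plain-Python sketch below illustrates the idea with an in-memory list; `new_hub_rows` and the sample keys are assumptions for illustration. In Spark, the equivalent is typically a `left_anti` join against the existing hub before the append.

```python
import hashlib


def hash_key(business_key: str) -> str:
    return hashlib.sha256(business_key.encode("utf-8")).hexdigest()


def new_hub_rows(existing_hub: list, incoming_keys: list) -> list:
    """Return hub rows only for business keys that the hub does not contain yet."""
    known = {row["hk"] for row in existing_hub}
    rows = []
    for key in sorted(set(incoming_keys)):  # de-duplicate the incoming batch
        hk = hash_key(key)
        if hk not in known:
            rows.append({"business_key": key, "hk": hk})
    return rows


hub = []
hub += new_hub_rows(hub, ["C1", "C2", "C1"])  # first load: two distinct keys
size_after_first_load = len(hub)
hub += new_hub_rows(hub, ["C2", "C3"])        # second load: only C3 is new
size_after_second_load = len(hub)

print(size_after_first_load, size_after_second_load)  # 2 3
```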


                                                                                

**Step 5 : Data Vault - Links**

Links capture the **relationships between business keys**.  
In our case:  
- **Customer ↔ Transaction** link → which customer performed which transaction.  

A hash key (`hk_link_cust_txn`) is generated by concatenating `CustomerID` and `TransactionID` with a `||` delimiter and hashing the result with SHA-256.  

In [21]:
link_cust_txn = df.select("CustomerID", "TransactionID").dropDuplicates() \
                  .withColumn(
                      "hk_link_cust_txn",
                      sha2(concat_ws("||",
                                     col("CustomerID").cast("string"),
                                     col("TransactionID").cast("string")), 256)
                  )
link_cust_txn.write.mode("append").parquet(f"{OUTPUT_BASE}/link_cust_transaction")


                                                                                

**Step 6 : Data Vault - Satellites**

Satellites store the **descriptive attributes** related to hubs and links.  
We create two satellites:

- **Customer Satellite** → attributes like DOB, gender, location, account balance.  
- **Transaction Satellite** → attributes like transaction date, time, and amount.  

Each satellite contains:  
- **hash_diff** → to detect changes in descriptive attributes.  
- **load_date** → timestamp of when the record was inserted.  
- **record_source** → source system of the record (e.g., `"bank_transactions_csv_v1"`).  


In [23]:
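# Import only what this cell needs; a wildcard import would shadow Python built-ins such as sum, min, and max
from pyspark.sql.functions import sha2, concat_ws, col, lit, current_timestamp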
RECORD_SOURCE = "bank_transactions_csv_v1"

# Satellite Customer
sat_customer = df.select(
    "CustomerID", "CustomerDOB", "CustGender", "CustLocation", "CustAccountBalance"
).dropDuplicates(["CustomerID"]) \
 .withColumn("hk_customer_id", sha2(col("CustomerID").cast("string"), 256)) \
 .withColumn("hash_diff",
             sha2(concat_ws("||",
                            col("CustomerDOB").cast("string"),
                            col("CustGender").cast("string"),
                            col("CustLocation").cast("string"),
                            col("CustAccountBalance").cast("string")), 256)) \
 .withColumn("load_date", current_timestamp()) \
 .withColumn("record_source", lit(RECORD_SOURCE))
sat_customer.write.mode("append").parquet(f"{OUTPUT_BASE}/sat_customer")

# Satellite Transaction
sat_transaction = df.select(
    "TransactionID", "TransactionDate", "TransactionTime", col("TransactionAmount (INR)").alias("TransactionAmount")
).dropDuplicates(["TransactionID"]) \
 .withColumn("hk_transaction_id", sha2(col("TransactionID").cast("string"), 256)) \
 .withColumn("hash_diff",
             sha2(concat_ws("||",
                            col("TransactionDate").cast("string"),
                            col("TransactionTime").cast("string"),
                            col("TransactionAmount").cast("string")), 256)) \
 .withColumn("load_date", current_timestamp()) \
 .withColumn("record_source", lit(RECORD_SOURCE))
sat_transaction.write.mode("append").parquet(f"{OUTPUT_BASE}/sat_transaction")
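
One caveat with building `hash_diff` from `concat_ws`: Spark's `concat_ws` skips null inputs, so two rows with a null in different columns can yield the same string. The plain-Python sketch below mimics that behaviour and compares it with a variant that substitutes a placeholder for nulls. The helper names, the `^^` placeholder, and the sample rows are assumptions. In PySpark the same effect is usually achieved by wrapping each column in `coalesce(col(...), lit(placeholder))`.

```python
import hashlib


def sha256_hex(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def concat_ws(sep: str, *values) -> str:
    """Mimic Spark's concat_ws: None (null) inputs are skipped."""
    return sep.join(str(v) for v in values if v is not None)


def concat_with_placeholder(sep: str, *values, placeholder: str = "^^") -> str:
    """Keep every position by replacing None with a placeholder."""
    return sep.join(placeholder if v is None else str(v) for v in values)


row_a = ("MUMBAI", None)  # first attribute set, second missing
row_b = (None, "MUMBAI")  # first attribute missing, second set

skipping_a = sha256_hex(concat_ws("||", *row_a))
skipping_b = sha256_hex(concat_ws("||", *row_b))
safe_a = sha256_hex(concat_with_placeholder("||", *row_a))
safe_b = sha256_hex(concat_with_placeholder("||", *row_b))

print(skipping_a == skipping_b)  # True: both rows reduce to "MUMBAI"
print(safe_a == safe_b)          # False: positions are preserved
```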


                                                                                

**Step 7 : Final Verification**

We query the created Data Vault components to confirm successful loading:  
- **Hub Customer**  
- **Hub Transaction**  
- **Link Customer ↔ Transaction**  
- **Satellite Customer**  
- **Satellite Transaction**  

Each table is read back from the Raw Vault directory and a sample of rows is displayed.  
This step ensures the Data Vault structures were built correctly and can be used for downstream layers (Business Vault, Information Marts).  


In [24]:
print("===== HUB CUSTOMER =====")
spark.read.parquet(f"{OUTPUT_BASE}/hub_customer").show(5)

print("===== HUB TRANSACTION =====")
spark.read.parquet(f"{OUTPUT_BASE}/hub_transaction").show(5)

print("===== LINK CUSTOMER - TRANSACTION =====")
spark.read.parquet(f"{OUTPUT_BASE}/link_cust_transaction").show(5)

print("===== SAT CUSTOMER =====")
spark.read.parquet(f"{OUTPUT_BASE}/sat_customer").show(5)

print("===== SAT TRANSACTION =====")
spark.read.parquet(f"{OUTPUT_BASE}/sat_transaction").show(5)


===== HUB CUSTOMER =====
+----------+--------------------+
|CustomerID|      hk_customer_id|
+----------+--------------------+
|  C5841053|0917d4201b7ffddc0...|
|  C3232257|fee13cfb2fd583541...|
|  C5310729|409cde3d1ee946fa8...|
|  C7014439|c188f31176370f1b9...|
|  C8393123|9bc9b87b3630be653...|
+----------+--------------------+
only showing top 5 rows

===== HUB TRANSACTION =====
+-------------+--------------------+
|TransactionID|   hk_transaction_id|
+-------------+--------------------+
|         T215|415c994cd9fb163e6...|
|         T744|897d9ef3605395681...|
|         T788|8fb51b4a9281c0df8...|
|        T1599|1da1a25cc6cb59678...|
|        T1668|34e83c1cf34e6befb...|
+-------------+--------------------+
only showing top 5 rows

===== LINK CUSTOMER - TRANSACTION =====
+----------+-------------+--------------------+
|CustomerID|TransactionID|    hk_link_cust_txn|
+----------+-------------+--------------------+
|  C8334633|          T16|5db0dad38783b72b8...|
|  C5529374|          T78|

# 5. Business Vault Layer Construction

The Business Vault is built on top of the Raw Vault.  
It enriches the data with **derived attributes, business rules, and metrics**.  
This makes it easier to prepare Information Marts for analytics and reporting.

We will create:
1. **Customer Profile Enrichment** (age, balance categories, senior flag)  
2. **Enriched Transactions** (categories, suspicious flag)  
3. **Monthly Aggregates** (total transactions, amounts, customers)  
4. **Customer-Level Aggregates** (number of transactions, total & avg amounts)  
5. **Customer Segmentation** (Premium, Active, Occasional, etc.)


**Step 1 : Import libraries**

In [25]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, year, month, current_date, months_between, floor, when, lit
import os

**Step 2 : Path Variables**

In [29]:
BV_BASE = "/kaggle/working/datalake/business_vault"
os.makedirs(BV_BASE, exist_ok=True)

**Step 3 : Customer Profile Enrichment**

We enrich the customer hub with additional attributes:
- **Customer Age**: calculated from date of birth  
- **Balance Category**: Low, Medium, High  
- **IsSenior**: flag if age ≥ 65  

This creates a **Customer Profile table** ready for segmentation.  


In [31]:
# Drop the satellite's hash key so the join does not produce a duplicate hk_customer_id column
sat_customer_clean = sat_customer.drop("hk_customer_id")

bv_customer_profile = hub_customer.join(
    sat_customer_clean,
    on="CustomerID",
    how="left"
)

# Derive customer age
bv_customer_profile = bv_customer_profile.withColumn(
    "CustomerAge",
    floor(months_between(current_date(), col("CustomerDOB")) / 12)
)

# Balance category
bv_customer_profile = bv_customer_profile.withColumn(
    "BalanceCategory",
    when(col("CustAccountBalance").isNull(), lit("Unknown"))
    .when(col("CustAccountBalance") < 1000, lit("Low"))
    .when((col("CustAccountBalance") >= 1000) & (col("CustAccountBalance") < 50000), lit("Medium"))
    .otherwise(lit("High"))
)

# Senior flag
bv_customer_profile = bv_customer_profile.withColumn(
    "IsSenior",
    when(col("CustomerAge") >= 65, lit(True)).otherwise(lit(False))
)

# Save enriched profile
BV_CUSTOMER_PATH = f"{BV_BASE}/bv_customer_profile"
bv_customer_profile.write.mode("overwrite").parquet(BV_CUSTOMER_PATH)
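
The enrichment rules can be checked by hand with a plain-Python sketch. `age_in_years` uses the usual "has the birthday passed yet" rule, which is an approximation of the `floor(months_between(...) / 12)` expression used in the pipeline, and the fixed reference day is an arbitrary assumption so that the result is reproducible. The balance thresholds are the ones used in the pipeline.

```python
from datetime import date


def age_in_years(dob: date, today: date) -> int:
    """Completed years between dob and today."""
    had_birthday = (today.month, today.day) >= (dob.month, dob.day)
    return today.year - dob.year - (0 if had_birthday else 1)


def balance_category(balance):
    """Same thresholds as the BalanceCategory column."""
    if balance is None:
        return "Unknown"
    if balance < 1000:
        return "Low"
    if balance < 50000:
        return "Medium"
    return "High"


reference_day = date(2025, 1, 1)  # assumed reference date, not the notebook's run date
age = age_in_years(date(1994, 1, 10), reference_day)
is_senior = age >= 65

print(age, is_senior)               # 30 False
print(balance_category(17819.05))   # Medium
print(balance_category(866503.21))  # High
print(balance_category(None))       # Unknown
```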


                                                                                

**Step 4 : Enriched Transactions**

We add business logic to transactions:
- **TransactionCategory**: Small, Medium, Large  
- **IsSuspicious**: transactions above 100,000 flagged as suspicious  

This table helps with fraud detection and transaction monitoring.  


In [32]:
# Drop the transaction hash key; the enriched table is keyed by TransactionID
sat_transaction_clean = sat_transaction.drop("hk_transaction_id")

tx_with_customer = sat_transaction_clean.join(
    link_cust_txn.select("TransactionID", "CustomerID"),
    on="TransactionID",
    how="left"
)

# Transaction category
tx_with_customer = tx_with_customer.withColumn(
    "TransactionCategory",
    when(col("TransactionAmount").isNull(), lit("Unknown"))
    .when(col("TransactionAmount") < 1000, lit("Small"))
    .when((col("TransactionAmount") >= 1000) & (col("TransactionAmount") < 10000), lit("Medium"))
    .otherwise(lit("Large"))
)

# Suspicious flag
tx_with_customer = tx_with_customer.withColumn(
    "IsSuspicious",
    when(col("TransactionAmount") > 100000, lit(True)).otherwise(lit(False))
)

# Save
BV_TX_PATH = f"{BV_BASE}/bv_transaction_enriched"
tx_with_customer.write.mode("overwrite").parquet(BV_TX_PATH)
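
The boundary behaviour of these two rules is easy to miss, so here is a plain-Python sketch of the same thresholds. The function names and the last three sample amounts are illustrative. Note that an amount of exactly 100,000 is not flagged, and a missing amount falls through to "not suspicious", mirroring the `otherwise(False)` branch.

```python
def transaction_category(amount):
    """Same thresholds as the TransactionCategory column."""
    if amount is None:
        return "Unknown"
    if amount < 1000:
        return "Small"
    if amount < 10000:
        return "Medium"
    return "Large"


def is_suspicious(amount) -> bool:
    """Strictly greater than 100,000; missing amounts are treated as not suspicious."""
    return amount is not None and amount > 100000


samples = [25.0, 2060.0, 27999.0, 100000.0, 150000.0, None]
results = [(transaction_category(a), is_suspicious(a)) for a in samples]

for amount, result in zip(samples, results):
    print(amount, result)
```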


                                                                                

**Step 5 : Monthly Aggregates**

We group transactions by year and month to compute:
- **TotalTransactions**  
- **TotalAmount**  
- **AvgAmount**  
- **ActiveCustomers** (unique customers per month)  

This table supports time-series analysis of banking activity. 

In [33]:
tx_for_agg = tx_with_customer.withColumn("tx_year", year(col("TransactionDate"))) \
                             .withColumn("tx_month", month(col("TransactionDate")))

bv_tx_monthly = tx_for_agg.groupBy("tx_year", "tx_month").agg(
    F.count("TransactionID").alias("TotalTransactions"),
    F.sum("TransactionAmount").alias("TotalAmount"),
    F.avg("TransactionAmount").alias("AvgAmount"),
    F.countDistinct("CustomerID").alias("ActiveCustomers")
).orderBy("tx_year", "tx_month")

BV_TX_MONTHLY_PATH = f"{BV_BASE}/bv_transaction_monthly"
bv_tx_monthly.write.mode("overwrite").parquet(BV_TX_MONTHLY_PATH)
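
For readers who want to verify the aggregation logic without Spark, the sketch below reproduces the four monthly measures in plain Python. The four transactions are invented sample rows, not values from the dataset.

```python
from collections import defaultdict
from datetime import date

# Invented sample rows for illustration only
transactions = [
    {"TransactionID": "T1", "CustomerID": "C1", "TransactionDate": date(2016, 8, 2), "TransactionAmount": 25.0},
    {"TransactionID": "T2", "CustomerID": "C2", "TransactionDate": date(2016, 8, 2), "TransactionAmount": 75.0},
    {"TransactionID": "T3", "CustomerID": "C1", "TransactionDate": date(2016, 8, 15), "TransactionAmount": 200.0},
    {"TransactionID": "T4", "CustomerID": "C3", "TransactionDate": date(2016, 9, 1), "TransactionAmount": 50.0},
]

# Group rows by (year, month)
groups = defaultdict(list)
for tx in transactions:
    period = (tx["TransactionDate"].year, tx["TransactionDate"].month)
    groups[period].append(tx)

# Compute the same measures as the monthly aggregate table
monthly = {}
for period in sorted(groups):
    rows = groups[period]
    amounts = [row["TransactionAmount"] for row in rows]
    monthly[period] = {
        "TotalTransactions": len(rows),
        "TotalAmount": sum(amounts),
        "AvgAmount": sum(amounts) / len(amounts),
        "ActiveCustomers": len({row["CustomerID"] for row in rows}),
    }

for period, measures in monthly.items():
    print(period, measures)
```

In August, customer `C1` appears twice but is counted once in `ActiveCustomers`, which is what `countDistinct` does in the Spark version.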


                                                                                

**Step 6 : Aggregates by Customer**

We compute customer-level aggregates:
- **NbTransactions**  
- **TotalTransactionAmount**  
- **AvgTransactionAmount**  

This prepares the ground for segmentation and customer behavior analysis.  


In [34]:
bv_tx_by_customer = tx_for_agg.groupBy("CustomerID").agg(
    F.count("TransactionID").alias("NbTransactions"),
    F.sum("TransactionAmount").alias("TotalTransactionAmount"),
    F.avg("TransactionAmount").alias("AvgTransactionAmount")
)

BV_TX_BY_CUST_PATH = f"{BV_BASE}/bv_transaction_by_customer"
bv_tx_by_customer.write.mode("overwrite").parquet(BV_TX_BY_CUST_PATH)


                                                                                

**Step 7 : Segmentation**

Finally, we classify customers into business segments:
- **Premium**: High balance customers  
- **Active**: Medium balance & ≥ 10 transactions  
- **Occasional**: Medium balance & < 10 transactions  
- **LowValue**: Low balance customers  
- **Unknown**: balance missing, or Medium balance with no transaction count  

This segmentation enables targeted marketing and personalized offers.  

In [35]:
bv_segment = bv_customer_profile.join(bv_tx_by_customer, on="CustomerID", how="left")

bv_segment = bv_segment.withColumn(
    "Segment",
    when(col("BalanceCategory") == "High", lit("Premium"))
    .when((col("BalanceCategory") == "Medium") & (col("NbTransactions") >= 10), lit("Active"))
    .when((col("BalanceCategory") == "Medium") & (col("NbTransactions") < 10), lit("Occasional"))
    .when(col("BalanceCategory") == "Low", lit("LowValue"))
    .otherwise(lit("Unknown"))
)

BV_SEGMENT_PATH = f"{BV_BASE}/bv_customer_segment"
bv_segment.write.mode("overwrite").parquet(BV_SEGMENT_PATH)
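
The order of the rules matters, and so does the handling of missing values. As a sketch, the plain-Python function below applies the same rules in the same order; the name `segment` is hypothetical. A Medium-balance customer with no transaction count matches neither the Active nor the Occasional rule and ends up as `Unknown`, just as a null comparison does in Spark.

```python
def segment(balance_category, nb_transactions):
    """Apply the segmentation rules in the same order as the pipeline."""
    if balance_category == "High":
        return "Premium"
    if balance_category == "Medium" and nb_transactions is not None:
        return "Active" if nb_transactions >= 10 else "Occasional"
    if balance_category == "Low":
        return "LowValue"
    return "Unknown"


print(segment("High", 2))       # Premium
print(segment("Medium", 10))    # Active
print(segment("Medium", 9))     # Occasional
print(segment("Low", 50))       # LowValue
print(segment("Medium", None))  # Unknown
```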


                                                                                

**Step 8 : Verification**

We load and preview the generated Business Vault tables to ensure the enrichment and aggregation logic was applied correctly.  


In [37]:
print("\n--- Business Vault Tables Check ---")
for path in [BV_CUSTOMER_PATH, BV_TX_PATH, BV_TX_MONTHLY_PATH, BV_TX_BY_CUST_PATH, BV_SEGMENT_PATH]:
    df_tmp = spark.read.parquet(path)
    print(f"\n{path}")
    df_tmp.show(5, truncate=False)



--- Business Vault Tables Check ---

/kaggle/working/datalake/business_vault/bv_customer_profile
+----------+----------------------------------------------------------------+-----------+----------+------------+------------------+----------------------------------------------------------------+--------------------------+------------------------+-----------+---------------+--------+
|CustomerID|hk_customer_id                                                  |CustomerDOB|CustGender|CustLocation|CustAccountBalance|hash_diff                                                       |load_date                 |record_source           |CustomerAge|BalanceCategory|IsSenior|
+----------+----------------------------------------------------------------+-----------+----------+------------+------------------+----------------------------------------------------------------+--------------------------+------------------------+-----------+---------------+--------+
|C1010031  |7a82878dee0e045083d355399d710

# 6. Presentation Layer Construction

The **Presentation Layer** organizes data into **facts and dimensions** 
following a **star schema** approach.  
This schema is designed for **analytics and reporting tools**.

We will create:
1. **Dimension tables**
   - `dim_customer` → enriched customer info with surrogate key  
   - `dim_temps` → date dimension for time analysis (*temps* is French for "time")  
   - `dim_location` → geographical location dimension  
2. **Fact table**
   - `fact_transaction` → central fact table linking all dimensions  

Finally, we **export data to Parquet and CSV** for BI tools.  


**Step 1 : Imports**

In [38]:
from pyspark.sql.functions import (
    col, year, quarter, month, dayofmonth, dayofweek, date_format,
    current_date, floor, months_between, monotonically_increasing_id
)
from pyspark.sql import functions as F
import os, shutil, glob

# Base path for the Presentation Layer
# (this reassigns OUTPUT_BASE, which previously pointed to the Raw Vault)
OUTPUT_BASE = "/kaggle/working/datalake/presentation_layer"
os.makedirs(OUTPUT_BASE, exist_ok=True)


**Step 2 : DIMENSIONS**

- **dim_customer**: contains customer information enriched with segmentation, 
  plus a **surrogate key (`customer_sk`)**.  
- **dim_temps**: date dimension for time-based analysis (year, quarter, month, weekday).  
- **dim_location**: unique list of customer locations with surrogate key.  


In [39]:
# --- Dim Customer ---
dim_customer = bv_segment.select(
    col("CustomerID").alias("customer_id_bk"),  # Business Key
    col("CustomerDOB").alias("date_of_birth"),
    col("CustGender").alias("gender"),
    col("CustLocation").alias("location"),
    col("CustAccountBalance").alias("account_balance"),
    col("CustomerAge").alias("age"),
    col("BalanceCategory").alias("balance_category"),
    col("IsSenior").alias("is_senior"),
    col("Segment").alias("segment")
).dropDuplicates(["customer_id_bk"]) \
 .withColumn("customer_sk", monotonically_increasing_id())  # Surrogate Key

dim_customer.write.mode("overwrite").parquet(f"{OUTPUT_BASE}/dim_customer")


# --- Dim Temps ---
dim_temps = tx_with_customer.select(col("TransactionDate").alias("date")) \
    .dropDuplicates(["date"]) \
    .withColumn("year", year(col("date"))) \
    .withColumn("quarter", quarter(col("date"))) \
    .withColumn("month", month(col("date"))) \
    .withColumn("day", dayofmonth(col("date"))) \
    .withColumn("day_of_week", dayofweek(col("date"))) \
    .withColumn("weekday_name", date_format(col("date"), "EEEE")) \
    .withColumn("date_sk", monotonically_increasing_id())

dim_temps.write.mode("overwrite").parquet(f"{OUTPUT_BASE}/dim_temps")


# --- Dim Location ---
dim_location = bv_customer_profile.select(
    col("CustLocation").alias("location")
).dropDuplicates(["location"]) \
 .withColumn("location_sk", monotonically_increasing_id())

dim_location.write.mode("overwrite").parquet(f"{OUTPUT_BASE}/dim_location")
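
A note on the surrogate keys: `monotonically_increasing_id()` returns values that are unique but not consecutive, and they depend on how the data is partitioned, so the same row may receive a different key on another run. If reproducible keys are needed, one option is to number the distinct business keys in sorted order. The plain-Python sketch below shows the idea; `assign_surrogate_keys` is a hypothetical helper, and in Spark a similar result is usually obtained with `row_number()` over an ordered window.

```python
def assign_surrogate_keys(business_keys, start: int = 1) -> dict:
    """Map each distinct business key to an integer, numbered in sorted key order."""
    distinct_sorted = sorted(set(business_keys))
    return {key: sk for sk, key in enumerate(distinct_sorted, start=start)}


locations = ["MUMBAI", "JAMSHEDPUR", "JHAJJAR", "MUMBAI"]

first_run = assign_surrogate_keys(locations)
second_run = assign_surrogate_keys(list(reversed(locations)))  # same data, different order

print(first_run)                # {'JAMSHEDPUR': 1, 'JHAJJAR': 2, 'MUMBAI': 3}
print(first_run == second_run)  # True: input order does not affect the keys
```

This numbering is reproducible only while the set of keys stays the same; adding a new key can shift the numbers, so an incremental load would need to preserve already assigned keys.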


                                                                                

**Step 3 : FACT TABLE**

The **fact_transaction** table is the central fact table, linked to all dimensions:  
- `transaction_id` (business key)  
- `customer_sk`, `date_sk`, `location_sk` (foreign keys to dimensions)  
- `amount`, `transaction_category`, `is_suspicious` (measures and attributes)  


In [40]:
# ========================================
# 3. FACT TABLE
# ========================================

fact_transaction = tx_with_customer \
    .join(dim_customer, tx_with_customer.CustomerID == dim_customer.customer_id_bk, "left") \
    .join(dim_temps, tx_with_customer.TransactionDate == dim_temps.date, "left") \
    .join(dim_location, dim_customer.location == dim_location.location, "left") \
    .select(
        col("TransactionID").alias("transaction_id"),
        col("customer_sk"),   # FK to dim_customer
        col("date_sk"),       # FK to dim_temps
        col("location_sk"),   # FK to dim_location
        col("TransactionTime").alias("time"),
        col("TransactionAmount").alias("amount"),
        col("TransactionCategory").alias("transaction_category"),
        col("IsSuspicious").alias("is_suspicious")
    ).dropDuplicates(["transaction_id"])

fact_transaction.write.mode("overwrite").parquet(f"{OUTPUT_BASE}/fact_transaction")
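
Since the fact table is built with left joins, a transaction whose customer, date, or location has no match keeps a null foreign key. A quick integrity check can list such rows before the export. The sketch below does this in plain Python on invented sample rows; `orphan_facts` is a hypothetical helper.

```python
def orphan_facts(facts: list, fk: str, dimension_keys: set) -> list:
    """Return the transaction IDs whose foreign key is missing or not in the dimension."""
    return [
        row["transaction_id"]
        for row in facts
        if row[fk] is None or row[fk] not in dimension_keys
    ]


# Invented sample data for illustration only
dim_customer_keys = {101, 102}
fact_rows = [
    {"transaction_id": "T1", "customer_sk": 101},   # valid reference
    {"transaction_id": "T2", "customer_sk": None},  # the left join found no customer
    {"transaction_id": "T3", "customer_sk": 999},   # key absent from the dimension
]

orphans = orphan_facts(fact_rows, "customer_sk", dim_customer_keys)
print(orphans)  # ['T2', 'T3']
```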


                                                                                

**Step 4 : EXPORT TO CSV**

Each table (dimensions and fact) is exported to **Parquet** (optimized storage) 
and **CSV** (for BI tools).  

The CSV files are stored in:  
`/kaggle/working/datalake/presentation_layer_csv/`  


In [41]:
EXPORT_PATH = "/kaggle/working/datalake/presentation_layer_csv"
os.makedirs(EXPORT_PATH, exist_ok=True)

def export_to_csv_named(df, filename, export_dir=EXPORT_PATH):
    temp_dir = os.path.join(export_dir, f"temp_{filename}")
    df.coalesce(1).write.mode("overwrite").option("header", "true").csv(temp_dir)
    csv_file = glob.glob(os.path.join(temp_dir, '*.csv'))[0]
    final_path = os.path.join(export_dir, f"{filename}.csv")
    shutil.move(csv_file, final_path)
    shutil.rmtree(temp_dir)
    print(f"✅ Export of {filename} complete: {final_path}")

# Export dimensions and fact
export_to_csv_named(dim_customer, "dim_customer")
export_to_csv_named(dim_temps, "dim_temps")
export_to_csv_named(dim_location, "dim_location")
export_to_csv_named(fact_transaction, "fact_transaction")


✅ Export of dim_customer complete: /kaggle/working/datalake/presentation_layer_csv/dim_customer.csv

✅ Export of dim_temps complete: /kaggle/working/datalake/presentation_layer_csv/dim_temps.csv

✅ Export of dim_location complete: /kaggle/working/datalake/presentation_layer_csv/dim_location.csv

✅ Export of fact_transaction complete: /kaggle/working/datalake/presentation_layer_csv/fact_transaction.csv


# 7. Power BI Dashboard

Once the **CSV exports** were generated from the Presentation Layer,  
I imported them into **Power BI** to build a dashboard.  

### Process:
1. Import the CSV files:
   - `dim_customer.csv`
   - `dim_temps.csv`
   - `dim_location.csv`
   - `fact_transaction.csv`

2. Define relationships between tables (Star Schema):
   - `fact_transaction.customer_sk` → `dim_customer.customer_sk`
   - `fact_transaction.date_sk` → `dim_temps.date_sk`
   - `fact_transaction.location_sk` → `dim_location.location_sk`

3. Build visuals:
   - Transaction trends over time
   - Customer segmentation by balance and transactions
   - Suspicious transaction detection
   - Geographic distribution of customers

This dashboard provides **business users** with easy-to-understand insights.


# 8. Conclusion

This project implemented a **complete Data Vault pipeline**:

1. **Raw Vault Layer**: captured data in a traceable, historical format.  
2. **Business Vault Layer**: enriched data with business rules and aggregates.  
3. **Presentation Layer**: structured data into facts and dimensions for BI.  
4. **Power BI Dashboard**: delivered actionable insights to end users.  

### Key Takeaways:
- **Data Vault methodology** ensures *scalability, auditability, and adaptability*.  
- The separation between **raw data** and **business logic** makes the model flexible.  
- With the **Presentation Layer**, business users can access clean data directly in BI tools.  

### Importance:
This approach demonstrates how **modern data warehousing** can transform raw transactions  
into **high-value analytics**, enabling better **decision-making** in banking and finance.  
