MySpark - Data Warehouse with Medallion Architecture 🏗️

📌 About the Project

This is my personal Data Engineering portfolio project, developed to demonstrate practical skills in building ETL pipelines, dimensional modeling, and implementing modern Data Warehouses.

After several courses and studies on Spark and the Medallion architecture, I decided to build my own project from scratch, applying advanced concepts and my own implementation choices. A key differentiator of this project is the implementation of SCD Type 2 (Slowly Changing Dimensions) in Spark, a technique rarely seen in educational projects but essential in corporate environments for maintaining dimensional change history.

👨‍💻 Author

Rodrigo Ribeiro
Data Engineer
LinkedIn


🎯 Project Goals

  • Implement complete Medallion architecture (Bronze → Silver → Gold)
  • Build Data Warehouse with dimensional model (Star Schema)
  • Apply SCD Type 2 for historical change tracking
  • Process data with Apache Spark using PySpark
  • Orchestrate complete environment with Docker
  • Validate data quality across layers
  • Demonstrate Data Engineering best practices

πŸ›οΈ Project Architecture

Technologies Used

  • Apache Spark 3.3.1 - Distributed data processing
  • PySpark - Python API for Spark
  • MySQL 8.0 - Source database (OLTP)
  • Docker & Docker Compose - Containerization and orchestration
  • Jupyter Notebook - Exploratory analysis
  • Apache Airflow 2.9.1 - Pipeline orchestration
  • Parquet - Columnar storage format


Medallion Architecture

┌─────────────┐      ┌─────────────┐      ┌─────────────┐      ┌─────────────┐
│   MySQL     │ ───> │   Bronze    │ ───> │   Silver    │ ───> │    Gold     │
│   (OLTP)    │      │  Raw Data   │      │  Cleaned    │      │  Business   │
└─────────────┘      └─────────────┘      └─────────────┘      └─────────────┘
    Source             Ingestion            Validation          DW Modeling

🥉 Bronze Layer (Raw Data)

  • Purpose: Ingest raw data from MySQL without transformations
  • Format: Parquet
  • Characteristics:
    • Faithful copy of source data
    • Preservation of original data types
    • Foundation for future reprocessing
  • Tables: tbcategories, tbproducts, tbcustomers, tborders, tborderdetail
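As an illustration, a Bronze job such as bronze_tbcategories.py can be sketched roughly as follows. This is a hedged sketch, not the project's actual code: the database name, credentials, and output path are assumptions (the JDBC connector version matches the one used in the spark-submit commands below).

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bronze_tbcategories").getOrCreate()

# Read the source table over JDBC exactly as it is -- no transformations in Bronze
df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://mysql:3306/salesdb")  # hypothetical database name
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("dbtable", "tbcategories")
    .option("user", "root")                            # hypothetical credentials
    .option("password", "root")
    .load()
)

# Persist a faithful Parquet copy in the Bronze layer, preserving original types
df.write.mode("overwrite").parquet("/opt/spark/lakehouse/bronze/tbcategories")

spark.stop()
```

The other bronze_*.py jobs follow the same read-and-copy pattern, varying only the table name and output directory.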

🥈 Silver Layer (Cleaned & Validated)

  • Purpose: Cleansing, validation, and standardization of data
  • Format: Parquet
  • Characteristics:
    • Duplicate removal
    • Null value handling
    • Format standardization
    • Integrity validations
  • Tables: Same as Bronze, but refined

🥇 Gold Layer (Business Ready)

  • Purpose: Dimensional modeling for analytics and BI
  • Format: Parquet
  • Characteristics:
    • Star Schema model
    • Dimensions with SCD Type 2
    • Aggregated fact table
    • Optimized for analytical queries

📊 Dimensional Modeling (Star Schema)

Dimensions (SCD Type 2)

dim_categories

Product categories dimension with change history.

Columns:

  • category_sk - Surrogate Key
  • category_code - Business Key (original code)
  • category_description - Category description
  • valid_from - Start date of validity
  • valid_to - End date of validity (NULL = current)
  • is_current - Flag indicating current version

dim_products

Products dimension with tracking of changes in price, description, status, and category.

Columns:

  • product_sk - Surrogate Key
  • product_code - Business Key
  • product_description - Product description
  • sale_value - Sale price
  • is_active - Active/inactive status
  • category_code - FK to category
  • valid_from, valid_to, is_current - SCD2 control

Tracked changes:

  • Price changes
  • Description updates
  • Category modifications
  • Product activation/deactivation

dim_customers

Customers dimension with history of registration changes.

Columns:

  • customer_sk - Surrogate Key
  • customer_code - Business Key
  • customer_name - Customer name
  • customer_address - Address
  • customer_phone - Phone number
  • customer_email - Email
  • birth_date - Birth date
  • valid_from, valid_to, is_current - SCD2 control

Tracked changes:

  • Address updates
  • Phone/email changes
  • Registration updates

Fact Table

fact_orders

Consolidated sales fact table with aggregated metrics.

Columns:

  • order_code - Order code (Degenerate Dimension)
  • customer_code - FK to dim_customers
  • product_code - FK to dim_products
  • order_date - Order date/time
  • total_quantity - Total quantity sold
  • total_sales - Total value (quantity × unit_price)
  • line_count - Number of detail lines

Granularity: One row per product per order

Calculated metrics:

  • Total revenue per product/order
  • Total quantity sold
  • Transaction count

🔄 SCD Type 2 (Slowly Changing Dimensions)

Concept

SCD Type 2 maintains complete change history in dimensions, enabling accurate temporal analysis.

Practical example:

A product had its price changed:

product_sk | product_code | description      | sale_value | valid_from | valid_to   | is_current
-----------|--------------|------------------|------------|------------|------------|-----------
1          | 100          | Gaming Mouse     | 150.00     | 2024-01-01 | 2024-06-15 | false
42         | 100          | RGB Gaming Mouse | 180.00     | 2024-06-16 | NULL       | true

Benefits:

  • ✅ Historical price analysis
  • ✅ Registration change tracking
  • ✅ Complete audit trail
  • ✅ Point-in-time reporting

Implementation

SCD2 Algorithm:

  1. Detect new records and changes
  2. Expire old versions (set is_current = false, valid_to = current_date)
  3. Generate new surrogate key for updated versions
  4. Insert new versions with is_current = true
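To make the algorithm concrete, here is a deliberately simplified, single-machine illustration in plain Python, with lists of dicts standing in for DataFrames. The actual gold_dim_*_scd2.py jobs would express the same four steps as PySpark joins:

```python
from datetime import date

def apply_scd2(dimension, incoming, key, tracked, today=None):
    """Simplified SCD Type 2 merge over list-of-dicts 'tables' (illustrative only).

    Mutates expired rows in place and returns the dimension with new versions appended.
    """
    today = today or date.today().isoformat()
    next_sk = max((r["sk"] for r in dimension), default=0) + 1
    current = {r[key]: r for r in dimension if r["is_current"]}
    out = list(dimension)
    for row in incoming:
        live = current.get(row[key])
        changed = live is not None and any(live[c] != row[c] for c in tracked)
        if live is None or changed:        # step 1: new record or change detected
            if changed:                    # step 2: expire the old version
                live["is_current"] = False
                live["valid_to"] = today
            out.append({**row,             # steps 3-4: new surrogate key, insert current
                        "sk": next_sk,
                        "valid_from": today,
                        "valid_to": None,
                        "is_current": True})
            next_sk += 1
    return out
```

Step 1 is the comparison of tracked columns against the live version; steps 2-4 are the expire-and-insert pair that preserves history.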

πŸ“ Project Structure

myspark/
├── spark/
│   ├── docker-compose.yml
│   ├── Dockerfile
│   ├── jobs/
│   │   ├── test_mysql.py
│   │   ├── bronze_*.py              # Bronze ingestion
│   │   ├── silver_*.py              # Silver transformation
│   │   ├── gold_dim_*_scd2.py       # Gold dimensions with SCD2
│   │   ├── gold_fact_orders.py      # Fact table
│   │   ├── read_gold_*.py           # Read scripts
│   │   └── generic_query.py         # Generic query
│   ├── lakehouse/
│   │   ├── bronze/                  # Raw data
│   │   ├── silver/                  # Clean data
│   │   └── gold/                    # Dimensional model
│   └── work/                        # Jupyter notebooks
├── airflow/
│   ├── dags/
│   │   ├── dag_lakehouse_pipeline.py  # Full pipeline DAG
│   │   └── dag_test_tbcategories.py   # Test DAG (categories only)
│   ├── logs/
│   └── plugins/
└── mysql/
    ├── docker-compose.yml
    └── init/
        └── init.sql                 # Initialization script

🚀 Execution Guide

Prerequisites

  • Docker installed
  • Docker Compose installed
  • 8GB RAM available (recommended)

Initial Setup

1. Create Docker network (mandatory)

docker network create lakehouse_net

2. Create directory structure

Inside the spark/ folder, create:

mkdir -p lakehouse/bronze lakehouse/silver lakehouse/gold

Inside the airflow/ folder, create:

mkdir -p dags logs plugins

3. Start MySQL and Spark containers

cd mysql/
docker compose up -d

cd ../spark/
docker compose up -d

4. Get Jupyter Notebook token

docker logs spark_notebook

Look for:

[I ########] http://127.0.0.1:8888/lab?token=99999...

Access: http://localhost:8888 and use the token.


Execution Pipeline

MySQL Connection Test

docker exec -it spark_master spark-submit \
  --master local[*] \
  --packages com.mysql:mysql-connector-j:8.0.33 \
  /opt/spark/jobs/test_mysql.py

Common errors:

  • Missing network → check step 1
  • Credentials β†’ Check script configuration

Bronze Layer - Ingestion

# Categories
docker exec -it spark_master spark-submit \
  --master local[*] \
  --packages com.mysql:mysql-connector-j:8.0.33 \
  /opt/spark/jobs/bronze_tbcategories.py

# Products
docker exec -it spark_master spark-submit \
  --master local[*] \
  --packages com.mysql:mysql-connector-j:8.0.33 \
  /opt/spark/jobs/bronze_tbproducts.py

# Customers
docker exec -it spark_master spark-submit \
  --master local[*] \
  --packages com.mysql:mysql-connector-j:8.0.33 \
  /opt/spark/jobs/bronze_tbcustomers.py

# Orders
docker exec -it spark_master spark-submit \
  --master local[*] \
  --packages com.mysql:mysql-connector-j:8.0.33 \
  /opt/spark/jobs/bronze_tborders.py

# Order Details
docker exec -it spark_master spark-submit \
  --master local[*] \
  --packages com.mysql:mysql-connector-j:8.0.33 \
  /opt/spark/jobs/bronze_tborderdetail.py

Silver Layer - Cleansing and Validation

# Categories
docker exec -it spark_master spark-submit \
  --master local[*] \
  /opt/spark/jobs/silver_tbcategories.py

# Products
docker exec -it spark_master spark-submit \
  --master local[*] \
  /opt/spark/jobs/silver_tbproducts.py

# Customers
docker exec -it spark_master spark-submit \
  --master local[*] \
  /opt/spark/jobs/silver_tbcustomers.py

# Orders
docker exec -it spark_master spark-submit \
  --master local[*] \
  /opt/spark/jobs/silver_tborders.py

# Order Details
docker exec -it spark_master spark-submit \
  --master local[*] \
  /opt/spark/jobs/silver_tborderdetail.py

Gold Layer - Dimensional Modeling

Dimensions with SCD Type 2:

# Categories Dimension
docker exec -it spark_master spark-submit \
  --master local[*] \
  /opt/spark/jobs/gold_dim_categories_scd2.py

# Products Dimension
docker exec -it spark_master spark-submit \
  --master local[*] \
  /opt/spark/jobs/gold_dim_products_scd2.py

# Customers Dimension
docker exec -it spark_master spark-submit \
  --master local[*] \
  /opt/spark/jobs/gold_dim_customers_scd2.py

Fact Table:

# Sales Fact
docker exec -it spark_master spark-submit \
  --master local[*] \
  /opt/spark/jobs/gold_fact_orders.py

📖 Read and Analysis Scripts

# Read dimensions
docker exec -it spark_master spark-submit \
  --master local[*] \
  /opt/spark/jobs/read_gold_dim_categories.py

docker exec -it spark_master spark-submit \
  --master local[*] \
  /opt/spark/jobs/read_gold_dim_products.py

docker exec -it spark_master spark-submit \
  --master local[*] \
  /opt/spark/jobs/read_gold_dim_customers.py

# Read fact table with analysis
docker exec -it spark_master spark-submit \
  --master local[*] \
  /opt/spark/jobs/read_gold_fact_orders.py

# Customizable generic query
docker exec -it spark_master spark-submit \
  --master local[*] \
  /opt/spark/jobs/generic_query.py

πŸ” Data Validation

Spark vs MySQL Comparison

Execute equivalent SQL queries in MySQL to validate totals:

-- Example: Top 10 customers by revenue
SELECT 
    o.customer AS customer_code,
    COUNT(DISTINCT o.code) AS total_orders,
    SUM(od.quantity) AS total_items,
    ROUND(SUM(od.quantity * od.salesvalue), 2) AS total_revenue
FROM tborders o
INNER JOIN tborderdetail od ON o.code = od.orders
WHERE o.customer IS NOT NULL
GROUP BY o.customer
ORDER BY total_revenue DESC
LIMIT 10;

Validation checklist:

  • ✅ Revenue totals must be identical
  • ✅ Order counts must match
  • ✅ Top customers/products must be the same
  • ✅ Record counts must align

📊 Analysis Examples

Query 1: Sales by Customer, Product and Date

SELECT
    c.customer_name,
    p.product_description,
    DATE_FORMAT(f.order_date, 'dd-MM-yyyy') as date,
    SUM(f.total_quantity) as quantity,
    ROUND(SUM(f.total_sales), 2) as sales
FROM fact_orders f
INNER JOIN dim_customers c 
    ON f.customer_code = c.customer_code 
    AND c.is_current = true
INNER JOIN dim_products p 
    ON f.product_code = p.product_code 
    AND p.is_current = true
GROUP BY c.customer_name, p.product_description, DATE_FORMAT(f.order_date, 'dd-MM-yyyy')
ORDER BY date DESC, sales DESC

Query 2: Analysis by Category

SELECT
    cat.category_description,
    COUNT(DISTINCT f.order_code) as total_orders,
    ROUND(SUM(f.total_sales), 2) as revenue
FROM fact_orders f
INNER JOIN dim_products p 
    ON f.product_code = p.product_code 
    AND p.is_current = true
INNER JOIN dim_categories cat 
    ON p.category_code = cat.category_code 
    AND cat.is_current = true
GROUP BY cat.category_description
ORDER BY revenue DESC

🌀 Airflow Orchestration

Apache Airflow was added to the project to automate and orchestrate the full Medallion pipeline, replacing manual execution of each Spark job.

Setup

Airflow runs as part of the same docker-compose.yml alongside Spark, sharing the lakehouse_net network. It uses PostgreSQL as its metadata database and the LocalExecutor.

Services added:

  • postgres - Airflow metadata database
  • airflow-init - initializes the database and creates the admin user
  • airflow-webserver - UI available at http://localhost:8081
  • airflow-scheduler - monitors and triggers DAG runs

Access:

Service    | URL                   | Credentials
-----------|-----------------------|--------------
Airflow UI | http://localhost:8081 | admin / admin

DAGs

lakehouse_pipeline

Full pipeline executing all tables across all three layers in the correct dependency order. Triggered manually (schedule_interval=None).

Execution flow:

bronze_categories  → silver_categories  → gold_dim_categories ──┐
bronze_customers   → silver_customers   → gold_dim_customers  ──┤
bronze_products    → silver_products    → gold_dim_products   ──┼──► gold_fact_orders
bronze_orders      → silver_orders      ────────────────────────┤
bronze_orderdetail → silver_orderdetail ────────────────────────┘

Tasks within each layer run in parallel; gold_fact_orders only starts after all dimensions and the silver orders/orderdetail tasks are complete.

test_tbcategories

Simplified DAG for testing the pipeline with a single table (tbcategories), running bronze → silver → gold in sequence. Useful for validating the environment without triggering the full pipeline.

How Spark jobs are submitted

Airflow uses BashOperator to invoke spark-submit directly inside the spark_master container via docker exec:

docker exec spark_master spark-submit \
  --master local[*] \
  --packages com.mysql:mysql-connector-j:8.0.33 \
  /opt/spark/jobs/<job_file>.py

This approach avoids the need to install Spark inside the Airflow container.
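Put together, one branch of the DAG might look like the sketch below. The DAG id and task ids follow the flow described above, but the exact DAG code is an assumption:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Template for submitting a Spark job inside the spark_master container
SPARK_SUBMIT = (
    "docker exec spark_master spark-submit --master 'local[*]' "
    "--packages com.mysql:mysql-connector-j:8.0.33 /opt/spark/jobs/{job}"
)

with DAG(
    dag_id="lakehouse_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule=None,      # triggered manually
    catchup=False,
) as dag:
    bronze = BashOperator(
        task_id="bronze_categories",
        bash_command=SPARK_SUBMIT.format(job="bronze_tbcategories.py"),
    )
    silver = BashOperator(
        task_id="silver_categories",
        bash_command=SPARK_SUBMIT.format(job="silver_tbcategories.py"),
    )
    gold = BashOperator(
        task_id="gold_dim_categories",
        bash_command=SPARK_SUBMIT.format(job="gold_dim_categories_scd2.py"),
    )
    bronze >> silver >> gold   # one branch of the layer-by-layer flow
```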


🎓 Learning Outcomes and Applied Techniques

Data Engineering

  • ✅ Complete ETL pipeline (Extract, Transform, Load)
  • ✅ Layered data architecture (Medallion)
  • ✅ Dimensional modeling (Star Schema)
  • ✅ SCD Type 2 for historical dimensions
  • ✅ Surrogate Keys and Business Keys
  • ✅ Fact and dimension tables
  • ✅ Data quality validation
  • ✅ Pipeline orchestration with Apache Airflow

Apache Spark / PySpark

  • ✅ DataFrames API
  • ✅ Spark SQL
  • ✅ Transformations (select, filter, join, groupBy)
  • ✅ Complex aggregations
  • ✅ Window functions
  • ✅ Read/Write Parquet
  • ✅ JDBC connections
  • ✅ Performance optimization (cache, persist)

DevOps / Infrastructure

  • ✅ Docker containerization
  • ✅ Docker Compose orchestration
  • ✅ Network configuration
  • ✅ Volume management
  • ✅ Environment variables

🔧 Future Improvements

  • Data quality framework (Great Expectations)
  • Optimized partitioning by date
  • Automated testing (pytest)
  • CI/CD pipeline
  • BI Dashboard (Power BI / Metabase)
  • Data lineage documentation
  • Monitoring and alerts
  • Parquet compression and optimization
  • Delta Lake implementation

πŸ“ License

This project was developed for educational and personal portfolio purposes.


💬 Contact

Rodrigo Ribeiro
Data Engineer
LinkedIn: https://www.linkedin.com/in/rodrigo-ribeiro-pro/


⭐ If this project was useful to you, consider leaving a star!
