# Data Engineering Pipeline: Social Media Metrics

This notebook explains the **end-to-end data engineering pipeline** for turning raw unstructured PDF reports into queryable analytical data for business analysts.

---

## Pipeline Overview

```
SOURCE  →  INGESTION  →  RAW STORAGE  →  TRANSFORMATION  →  MODELLING  →  SERVING
(PDFs)      (Extract)     (Postgres)      (Clean/Shape)     (Analytical)   (Analysts)
```

| Stage | Purpose |
|-------|---------|
| **Source** | Raw PDF files containing social media metrics (unstructured) |
| **Ingestion** | Extract text/metrics from PDFs and load into the system |
| **Raw Storage** | Persist raw extracted data in Postgres (bronze layer) |
| **Transformation** | Clean, validate, and reshape data for analytics |
| **Modelling** | Build analyst-friendly dimensional/fact models |
| **Serving** | Serve data to end users via Postgres queries / BI tools |

### Setup & Dependencies

Run the cell below to check that the core packages import correctly (uncomment the `pip install` line first if any are missing). The notebook uses:
- `sqlalchemy` — database connectivity (Postgres or SQLite)
- `pandas` — data transformation
- `pdfplumber` — optional, for real PDF extraction (falls back to sample data if missing)

In [None]:
# OPTIONAL: Install dependencies (uncomment and run once if needed)
# !pip install sqlalchemy pandas pdfplumber

# Verify core imports
import sqlalchemy
import pandas as pd
print("Setup OK: sqlalchemy, pandas ready")

---

## Stage 1: SOURCE — Raw Unstructured PDFs

**What it is:** The origin of our data — PDF reports (often monthly/quarterly) containing social media metrics such as:
- Impressions, reach, engagement (likes, comments, shares)
- Follower growth, click-through rates
- Platform-specific KPIs (Facebook, Instagram, LinkedIn, etc.)

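**Why it's challenging:** PDFs are built for display, not data exchange. Text is stored as positioned characters, tables are typically just text fragments and drawn lines, and charts are vector graphics or embedded images, so there are no rows or fields to query. We must extract and structure the content first.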

In [None]:
# =============================================================================
# STAGE 1: SOURCE — Representing raw PDF files
# =============================================================================
# In practice, these PDFs live in a folder, S3 bucket, or shared drive.
# We simulate the source by listing files we intend to ingest.
# =============================================================================

from pathlib import Path

# Example: directory containing monthly social media PDF reports
SOURCE_DIR = Path("data/raw_reports")  # e.g. ./data/raw_reports/2024-01_report.pdf

# Each PDF file is a SOURCE document — unstructured, not queryable yet
def list_source_pdfs(directory: Path) -> list[Path]:
    """
    List all PDF files in the source directory.
    These represent the raw, unstructured input to our pipeline.
    """
    if not directory.exists():
        return []
    return sorted(directory.glob("*.pdf"))

# In production: SOURCE = S3 paths, Azure Blob, or file share
source_files = list_source_pdfs(SOURCE_DIR)
print(f"Source: {len(source_files)} PDF file(s) identified for ingestion")

---

## Stage 2: INGESTION — Extract and Load

**What it is:** The process of reading PDFs, extracting text and tabular data, and preparing it for storage.

**Key tasks:**
- Open each PDF
- Extract text, tables, and metadata (e.g. report date, platform name)
- Normalise into a consistent structure (e.g. rows with platform, metric_name, value, report_date)
- Load into Raw Storage

**Tools:** Libraries like `PyMuPDF` (fitz), `pdfplumber`, or `tabula-py` for table extraction.
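Table extraction does not always succeed: a table drawn without ruling lines may come back from `extract_tables()` empty even though `page.extract_text()` still returns its rows as plain lines. A minimal sketch of a line-based fallback, assuming (hypothetically) that each metric row reads `<Platform> <Metric> <Value>` with single-word names; `ROW_PATTERN` and `parse_metric_lines` are illustrative names, and real reports will need their own patterns.

```python
import re
from typing import Any

# Hypothetical row layout: "<Platform> <Metric> <Value>", e.g. "Instagram Impressions 15,000"
ROW_PATTERN = re.compile(r"^(?P<platform>[A-Za-z]+)\s+(?P<metric_name>[A-Za-z_]+)\s+(?P<value>[\d,.]+)$")


def parse_metric_lines(page_text: str) -> list[dict[str, Any]]:
    """Turn plain page text (e.g. from page.extract_text()) into metric records.

    Lines that do not match ROW_PATTERN (titles, header rows, footers) are skipped.
    """
    records = []
    for line in page_text.splitlines():
        match = ROW_PATTERN.match(line.strip())
        if match:
            records.append(match.groupdict())
    return records


sample_text = """Monthly Social Report - January 2024
Platform Metric Value
Instagram Impressions 15,000
LinkedIn Impressions 8,000
Page 1 of 3"""

for record in parse_metric_lines(sample_text):
    print(record)
```

Records produced this way have the same keys as the table-based ones, so they could flow into the same raw-storage load.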

In [None]:
# =============================================================================
# STAGE 2: INGESTION — Extract content from PDFs
# =============================================================================
# We extract text/tables and convert to a structured format (records)
# that can be written to Postgres. This is the "bronze" or "raw" ingest.
# =============================================================================

# Optional: pip install pdfplumber  (used below for table extraction)

from datetime import datetime, timezone
from typing import Any


def _utc_now_iso() -> str:
    """Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)."""
    return datetime.now(timezone.utc).isoformat()


def sample_records(source_name: str) -> list[dict[str, Any]]:
    """Demo records, used when pdfplumber is missing or no PDFs are available."""
    now = _utc_now_iso()
    return [
        {"platform": "Instagram", "metric_name": "impressions", "value": "15000", "source_file": source_name, "ingested_at": now},
        {"platform": "Instagram", "metric_name": "engagement", "value": "1200", "source_file": source_name, "ingested_at": now},
        {"platform": "LinkedIn", "metric_name": "impressions", "value": "8000", "source_file": source_name, "ingested_at": now},
    ]


def extract_metrics_from_pdf(pdf_path: Path) -> list[dict[str, Any]]:
    """
    INGESTION: Parse a PDF and extract social media metrics as structured records.
    
    In a real implementation:
    - Use pdfplumber or PyMuPDF to extract tables
    - Map table columns to: platform, metric_name, value, report_date
    - Return a list of dicts, one per metric row
    """
    try:
        import pdfplumber
    except ImportError:
        # Fallback: simulate extracted data for demonstration
        return sample_records(pdf_path.name)

    records = []
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            for table in page.extract_tables() or []:
                # Assume a header row followed by data rows, e.g.
                # [['Platform', 'Metric', 'Value'], ['Instagram', 'Impressions', '15000'], ...]
                if len(table) < 2:
                    continue
                headers = [str(h or "").strip().lower() for h in table[0]]
                for row in table[1:]:
                    if not row:
                        continue
                    record = dict(zip(headers, [str(c or "").strip() for c in row]))
                    # Lineage metadata; report_date is derived from source_file later, in Transformation
                    record["source_file"] = pdf_path.name
                    record["ingested_at"] = _utc_now_iso()
                    records.append(record)
    return records


# Extract and collect records from every source PDF
all_raw_records = []
for pdf_file in source_files:
    all_raw_records.extend(extract_metrics_from_pdf(pdf_file))

# If no PDFs exist (or none yielded rows), use sample records for demo purposes.
# Calling extract_metrics_from_pdf on a non-existent file would fail once pdfplumber is installed.
if not all_raw_records:
    all_raw_records = sample_records("sample_report.pdf")

print(f"Ingestion complete: {len(all_raw_records)} raw records extracted")

---

## Stage 3: RAW STORAGE — Postgres (Bronze Layer)

**What it is:** A durable storage layer that holds the **raw ingested data** exactly as extracted — no cleansing or aggregation yet.

**Why Postgres?**
- ACID compliance, reliable storage
- SQL query capability for downstream stages
- Well-supported by BI tools and analysts

**Schema design:** A simple table that stores each extracted record with metadata (source file, ingestion timestamp). This is the "bronze" or raw layer — we keep it as-is for replay and debugging.
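Because the bronze table is kept for replay, it helps if re-running ingestion over the same PDF does not pile up duplicate rows. One common approach, sketched here under assumptions, is to store a hash of each record's canonical JSON in a `UNIQUE` column and let the database skip repeats. The `record_hash` column, the `raw_social_metrics_demo` table and the in-memory SQLite database are illustrative only; the notebook's own `raw_social_metrics` table has no such column.

```python
import hashlib
import json
import sqlite3


def record_hash(record: dict) -> str:
    """Stable SHA-256 of a record; sort_keys makes the hash independent of key order."""
    canonical = json.dumps(record, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def load_idempotent(conn: sqlite3.Connection, source_file: str, payloads: list[dict]) -> int:
    """Insert payloads, skipping any whose hash is already stored; returns the number of new rows."""
    before = conn.total_changes
    conn.executemany(
        "INSERT OR IGNORE INTO raw_social_metrics_demo (source_file, record_hash, raw_json) VALUES (?, ?, ?)",
        [(source_file, record_hash({"source_file": source_file, **p}), json.dumps(p)) for p in payloads],
    )
    conn.commit()
    return conn.total_changes - before


# Hypothetical demo table with a UNIQUE hash column (not part of the notebook's schema)
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE raw_social_metrics_demo (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        source_file TEXT,
        record_hash TEXT UNIQUE,
        raw_json TEXT
    )
""")

rows = [{"platform": "Instagram", "metric_name": "impressions", "value": "15000"}]
print(load_idempotent(conn, "2024-01_report.pdf", rows))  # 1: first load inserts the row
print(load_idempotent(conn, "2024-01_report.pdf", rows))  # 0: replay is a no-op
```

In Postgres the equivalent of `INSERT OR IGNORE` is `INSERT ... ON CONFLICT (record_hash) DO NOTHING`. Hashing the payload together with `source_file` keeps identical rows from different reports distinct, but it also collapses genuinely repeated rows within one report, which may or may not be what you want. The ingestion timestamp is deliberately left out of the hash, otherwise every replay would look new.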

In [None]:
# =============================================================================
# STAGE 3: RAW STORAGE — Persist ingested data in Postgres
# =============================================================================
# We create a table for the raw/bronze layer and insert ingested records.
# Connection: the default is a local SQLite file; set USE_SQLITE=false and POSTGRES_URL to use Postgres.
# =============================================================================

import os
import json
from datetime import datetime, timezone
from pathlib import Path

from sqlalchemy import create_engine, text

# Use Postgres if available; otherwise SQLite for local demo
POSTGRES_URL = os.getenv("POSTGRES_URL", "postgresql://localhost:5432/social_metrics")
USE_SQLITE = os.getenv("USE_SQLITE", "true").lower() == "true"  # Default: SQLite for portability

if USE_SQLITE:
    DB_URL = "sqlite:///data/social_metrics_raw.db"
else:
    DB_URL = POSTGRES_URL


def create_raw_storage_schema(engine):
    """
    RAW STORAGE schema: one table to store all raw ingested records.
    The extracted record is preserved as JSON (JSONB in Postgres, TEXT in SQLite) for flexibility.
    """
    if engine.dialect.name == "postgresql":
        id_col, json_type = "SERIAL PRIMARY KEY", "JSONB"
    else:
        id_col, json_type = "INTEGER PRIMARY KEY AUTOINCREMENT", "TEXT"
    with engine.begin() as conn:  # begin() commits on success and rolls back on error
        conn.execute(text(f"""
            CREATE TABLE IF NOT EXISTS raw_social_metrics (
                id {id_col},
                source_file VARCHAR(255),
                ingested_at TIMESTAMP,
                raw_json {json_type}
            )
        """))


def load_into_raw_storage(engine, records: list[dict]) -> int:
    """
    INGESTION → RAW STORAGE: insert all extracted records in one batch.
    """
    rows = [
        {
            "sf": r.get("source_file", "unknown"),
            "ia": r.get("ingested_at", datetime.now(timezone.utc).isoformat()),
            # Everything except the lineage columns goes into the JSON payload
            "rj": json.dumps({k: v for k, v in r.items() if k not in ("source_file", "ingested_at")}),
        }
        for r in records
    ]
    if not rows:
        return 0
    with engine.begin() as conn:
        # Passing a list of parameter dicts makes SQLAlchemy use executemany
        conn.execute(text("""
            INSERT INTO raw_social_metrics (source_file, ingested_at, raw_json)
            VALUES (:sf, :ia, :rj)
        """), rows)
    return len(rows)


# Create the engine (create_engine already maintains a connection pool)
Path("data").mkdir(exist_ok=True)  # the SQLite file lives under ./data
engine = create_engine(DB_URL)
print(f"Raw storage: {DB_URL}")

In [None]:
# =============================================================================
# Apply the RAW STORAGE schema (create_raw_storage_schema picks Postgres or SQLite DDL)
# =============================================================================

create_raw_storage_schema(engine)

loaded = load_into_raw_storage(engine, all_raw_records)
print(f"Loaded {loaded} records into raw storage")

---

## Stage 4: TRANSFORMATION — Clean and Reshape

**What it is:** Apply business rules to clean and standardise the raw data.

**Typical steps:**
- Normalise column names (e.g. `metric` → `metric_name`)
- Parse strings to proper types (ints, floats, dates)
- Handle missing values and duplicates
- Derive report_date from filename or embedded metadata
- Standardise platform names (e.g. "IG" → "Instagram")

**Output:** A cleaned dataset ready for modelling (silver layer).
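Of the steps listed, "parse strings to proper types" is usually where PDF data fights back: values are extracted as text such as `15,000`, `1.2K` or `3.4%`, and `pd.to_numeric(..., errors="coerce")` on its own turns those into `NaN`. A minimal parser, assuming K/M/B suffixes mean thousands/millions/billions and that percentages should stay on a 0-100 scale; `parse_metric_value` and its pattern are illustrative, not part of the notebook.

```python
import re
from typing import Optional

import pandas as pd

# Assumed suffix meanings; adjust to the conventions used in your reports
SUFFIX_MULTIPLIERS = {"": 1, "K": 1_000, "M": 1_000_000, "B": 1_000_000_000}
VALUE_PATTERN = re.compile(r"^(-?\d+(?:\.\d+)?)\s*([KMB]?)\s*%?$", re.IGNORECASE)


def parse_metric_value(raw: object) -> Optional[float]:
    """Parse text like '15,000', '1.2K' or '3.4%' into a float; None if unparseable."""
    if raw is None:
        return None
    cleaned = str(raw).strip().replace(",", "")  # drop thousands separators
    match = VALUE_PATTERN.match(cleaned)
    if not match:
        return None
    number, suffix = match.groups()
    return float(number) * SUFFIX_MULTIPLIERS[suffix.upper()]


values = pd.Series(["15,000", "1.2K", "3.4%", "n/a", "2M"])
print(values.map(parse_metric_value).tolist())
```

Inside `transform_raw_to_silver`, something like `raw_df[value_col].map(parse_metric_value)` could stand in for the plain `pd.to_numeric` call. Whether a percentage should be stored as `3.4` or `0.034` is a modelling decision worth recording alongside the metric.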

In [None]:
# =============================================================================
# STAGE 4: TRANSFORMATION — Clean and standardise raw data
# =============================================================================
# Read from raw storage, apply cleaning rules, produce silver-level data.
# =============================================================================

import json
import re
from typing import Optional

import pandas as pd
from sqlalchemy import text


def extract_report_date(source_file: str) -> Optional[str]:
    """Derive report date from filename (e.g. 2024-01_report.pdf -> 2024-01-01); None if no date is found."""
    match = re.search(r"(\d{4})[-_]?(\d{2})", source_file or "")
    if match:
        return f"{match.group(1)}-{match.group(2)}-01"
    return None


def transform_raw_to_silver(raw_df: pd.DataFrame) -> pd.DataFrame:
    """
    TRANSFORMATION: Clean and standardise raw social media metrics.
    - Normalise column names
    - Parse numeric values
    - Standardise platform and metric names
    - Add report_date
    """
    # Expand raw_json: a JSON string in SQLite, usually already a dict when read from Postgres JSONB
    if "raw_json" in raw_df.columns:
        expanded = raw_df["raw_json"].apply(
            lambda x: json.loads(x) if isinstance(x, str) else (x or {})
        )
        payload = pd.json_normalize(expanded.tolist())
        payload.index = raw_df.index  # align payload rows with their raw rows
        raw_df = pd.concat([raw_df.drop(columns=["raw_json"]), payload], axis=1)
    
    # Normalise column names (e.g. "Metric Name" -> "metric_name")
    raw_df = raw_df.rename(columns=lambda c: str(c).strip().lower().replace(" ", "_"))
    
    # Map to canonical columns
    platform_col = next((c for c in ["platform", "channel", "social_platform"] if c in raw_df.columns), None)
    metric_col = next((c for c in ["metric_name", "metric", "kpi"] if c in raw_df.columns), None)
    value_col = next((c for c in ["value", "count", "metric_value"] if c in raw_df.columns), None)
    if value_col is None:
        raise ValueError(f"No value column found; got {list(raw_df.columns)}")
    
    # Build on raw_df's index so scalar defaults such as "unknown" fill every row
    df = pd.DataFrame(index=raw_df.index)
    df["platform"] = raw_df[platform_col].fillna("unknown").astype(str).str.strip() if platform_col else "unknown"
    df["metric_name"] = raw_df[metric_col].fillna("unknown").astype(str).str.strip().str.lower() if metric_col else "unknown"
    # Remove thousands separators ("15,000") before parsing; unparseable values become NaN
    df["value"] = pd.to_numeric(
        raw_df[value_col].astype(str).str.replace(",", "", regex=False).str.strip(),
        errors="coerce",
    )
    df["report_date"] = raw_df["source_file"].apply(extract_report_date)
    df["source_file"] = raw_df["source_file"]
    
    # Standardise platform names case-insensitively (e.g. "IG" -> "Instagram"); unmapped names are kept
    platform_map = {"ig": "Instagram", "fb": "Facebook", "li": "LinkedIn", "tw": "Twitter", "x": "X"}
    df["platform"] = df["platform"].str.lower().map(platform_map).fillna(df["platform"])
    
    # Drop rows whose value could not be parsed (genuine zeros are kept), then exact duplicates
    return df.dropna(subset=["value"]).drop_duplicates()


# Read from raw storage and transform
with engine.connect() as conn:
    raw_df = pd.read_sql(text("SELECT * FROM raw_social_metrics"), conn)

silver_df = transform_raw_to_silver(raw_df)
print(f"Transformation complete: {len(silver_df)} cleaned records")
silver_df.head(10)
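The transformation cell drops unparseable rows without reporting how many, and nothing yet checks the modelling grain of one row per (platform, metric, report_date). A lightweight, plain-pandas stand-in for a validation framework such as Great Expectations (this is not that library's API) could look like the sketch below; `check_silver_quality`, `REQUIRED_COLUMNS` and the rules themselves are hypothetical. Run on `silver_df`, it would, for example, flag the demo rows from `sample_report.pdf`, whose filename carries no date.

```python
import pandas as pd

# Hypothetical rules for the silver layer; extend as the reports require
REQUIRED_COLUMNS = ["platform", "metric_name", "value", "report_date", "source_file"]


def check_silver_quality(df: pd.DataFrame) -> list[str]:
    """Return human-readable data-quality failures (an empty list means all checks passed)."""
    missing = [c for c in REQUIRED_COLUMNS if c not in df.columns]
    if missing:
        return [f"missing columns: {missing}"]
    problems = []
    if df.empty:
        problems.append("no rows")
    if df["value"].isna().any():
        problems.append(f"{int(df['value'].isna().sum())} row(s) with unparsed value")
    if df["report_date"].isna().any():
        problems.append(f"{int(df['report_date'].isna().sum())} row(s) without report_date")
    # The fact grain: at most one row per (platform, metric_name, report_date)
    dupes = int(df.duplicated(subset=["platform", "metric_name", "report_date"]).sum())
    if dupes:
        problems.append(f"{dupes} duplicate (platform, metric_name, report_date) row(s)")
    return problems


demo = pd.DataFrame({
    "platform": ["Instagram", "Instagram", "LinkedIn"],
    "metric_name": ["impressions", "impressions", "impressions"],
    "value": [15000.0, 15000.0, None],
    "report_date": ["2024-01-01", "2024-01-01", None],
    "source_file": ["2024-01_report.pdf"] * 3,
})
for problem in check_silver_quality(demo):
    print("FAIL:", problem)
```

Whether a failure should stop the pipeline or only log a warning is a policy choice; an orchestrator task could simply raise when the returned list is non-empty.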

---

## Stage 5: MODELLING — Analyst-Friendly Schema

**What it is:** Design tables/views that make it **easy for analysts** to run common queries.

**Modelling choices for social media metrics:**
- **Fact table:** One row per (platform, metric, report_date) with value
- **Dimension tables:** Platforms, metrics, dates for filtering
- **Aggregations:** Pre-computed totals, growth rates, benchmarks

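**Goal:** Analysts can write simple SQL like `SELECT platform, SUM(value) FROM metrics WHERE metric_name = 'impressions' GROUP BY platform` without wrestling with raw JSON or inconsistent column names. (Filtering to one metric matters: summing impressions and engagement together produces a meaningless total.)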
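Pre-computed aggregations such as growth rates are easiest to reason about on the fact grain described above (one row per platform, metric and report date). A sketch in pandas, assuming monthly report dates: sort each platform/metric series by date and divide each value by the previous report's value for the same series. The column names mirror the silver DataFrame; the figures are invented for illustration.

```python
import pandas as pd

# Illustrative facts at the (platform, metric_name, report_date) grain
facts = pd.DataFrame({
    "platform": ["Instagram"] * 3 + ["LinkedIn"] * 2,
    "metric_name": ["impressions"] * 5,
    "report_date": pd.to_datetime(["2024-01-01", "2024-02-01", "2024-03-01", "2024-01-01", "2024-02-01"]),
    "value": [1000.0, 1200.0, 900.0, 500.0, 550.0],
})

# Order each (platform, metric) series by date, then compare with the previous report in that series
facts = facts.sort_values(["platform", "metric_name", "report_date"])
previous = facts.groupby(["platform", "metric_name"])["value"].shift(1)
facts["growth_rate"] = facts["value"] / previous - 1  # NaN for the first report of each series
print(facts)
```

Grouping before shifting is what keeps LinkedIn's January value from being compared with Instagram's March value. The same calculation can be pushed into SQL with the `LAG()` window function, which both Postgres and SQLite (3.25+) support.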

In [None]:
# =============================================================================
# STAGE 5: MODELLING — Build analyst-friendly fact and dimension tables
# =============================================================================
# We persist the silver data into a star schema: fact_metrics + dim_platform, dim_metric
# =============================================================================

from sqlalchemy import text


def create_modelled_schema(engine):
    """
    MODELLING: Create fact and dimension tables for easy analyst querying.
    - fact_social_metrics: (platform_id, metric_id, report_date, value)
    - dim_platform: platform lookup
    - dim_metric: metric lookup
    """
    # Auto-incrementing surrogate keys need dialect-specific DDL
    pk = "SERIAL PRIMARY KEY" if engine.dialect.name == "postgresql" else "INTEGER PRIMARY KEY AUTOINCREMENT"
    with engine.begin() as conn:
        conn.execute(text(f"""
            CREATE TABLE IF NOT EXISTS dim_platform (
                platform_id {pk},
                platform_name VARCHAR(100) UNIQUE
            )
        """))
        conn.execute(text(f"""
            CREATE TABLE IF NOT EXISTS dim_metric (
                metric_id {pk},
                metric_name VARCHAR(100) UNIQUE
            )
        """))
        # DOUBLE PRECISION rather than REAL: Postgres REAL is 4-byte, with only ~6 significant digits
        conn.execute(text(f"""
            CREATE TABLE IF NOT EXISTS fact_social_metrics (
                id {pk},
                platform_id INTEGER,
                metric_id INTEGER,
                report_date DATE,
                value DOUBLE PRECISION,
                FOREIGN KEY (platform_id) REFERENCES dim_platform(platform_id),
                FOREIGN KEY (metric_id) REFERENCES dim_metric(metric_id)
            )
        """))


def load_modelled_data(engine, silver_df: pd.DataFrame):
    """
    Load transformed (silver) data into the modelled star schema.
    Full refresh: the fact table is cleared first, so re-running the notebook does not double-count.
    """
    platforms = silver_df["platform"].dropna().unique().tolist()
    metrics = silver_df["metric_name"].dropna().unique().tolist()
    
    with engine.begin() as conn:
        # ON CONFLICT ... DO NOTHING works in Postgres and in SQLite 3.24+
        for p in platforms:
            conn.execute(text("INSERT INTO dim_platform (platform_name) VALUES (:p) ON CONFLICT (platform_name) DO NOTHING"), {"p": p})
        for m in metrics:
            conn.execute(text("INSERT INTO dim_metric (metric_name) VALUES (:m) ON CONFLICT (metric_name) DO NOTHING"), {"m": m})
        platform_ids = pd.read_sql(text("SELECT platform_id, platform_name FROM dim_platform"), conn)
        metric_ids = pd.read_sql(text("SELECT metric_id, metric_name FROM dim_metric"), conn)
    
    # Attach surrogate keys to the silver rows
    facts = (
        silver_df.merge(platform_ids, left_on="platform", right_on="platform_name", how="left")
                 .merge(metric_ids, on="metric_name", how="left")
    )
    # Demo placeholder: rows whose filename carried no date get a fixed date; in production, reject or flag them
    facts["report_date"] = facts["report_date"].fillna("2024-01-01")
    rows = [
        {"pid": int(r.platform_id), "mid": int(r.metric_id), "rd": r.report_date, "v": float(r.value)}
        for r in facts.itertuples(index=False)
    ]
    
    with engine.begin() as conn:
        conn.execute(text("DELETE FROM fact_social_metrics"))
        if rows:
            conn.execute(text("""
                INSERT INTO fact_social_metrics (platform_id, metric_id, report_date, value)
                VALUES (:pid, :mid, :rd, :v)
            """), rows)


create_modelled_schema(engine)
load_modelled_data(engine, silver_df)
print("Modelling complete: fact_social_metrics + dim_platform + dim_metric populated")

---

## Stage 6: SERVING — Query Data for Analysts

**What it is:** The final layer where **end users (analysts)** consume data. They connect BI tools (Tableau, Power BI, Metabase) or run SQL directly against Postgres.

**Serving from Postgres:**
- The modelled tables (fact + dimensions) are the serving layer
- Analysts write simple, readable SQL
- Views can pre-join dimensions for convenience
- Access control and query performance are managed at this layer

**Example analyst queries:**
- Total impressions by platform
- Engagement trend over time
- Top-performing platform by metric

In [None]:
# =============================================================================
# STAGE 6: SERVING — Example analyst queries against the modelled data
# =============================================================================
# These are the kinds of queries analysts run from Postgres / BI tools.
# =============================================================================

# --- Query 1: Total impressions by platform (one metric only; summing different metrics is meaningless) ---
query_by_platform = """
SELECT p.platform_name, SUM(f.value) AS total_value
FROM fact_social_metrics f
JOIN dim_platform p ON f.platform_id = p.platform_id
JOIN dim_metric m ON f.metric_id = m.metric_id
WHERE LOWER(m.metric_name) = 'impressions'
GROUP BY p.platform_name
ORDER BY total_value DESC
"""

# --- Query 2: Metrics by report date (trend over time) ---
query_trend = """
SELECT f.report_date, p.platform_name, m.metric_name, SUM(f.value) AS total_value
FROM fact_social_metrics f
JOIN dim_platform p ON f.platform_id = p.platform_id
JOIN dim_metric m ON f.metric_id = m.metric_id
GROUP BY f.report_date, p.platform_name, m.metric_name
ORDER BY f.report_date, p.platform_name
"""

# --- Query 3: Simple flat view for ad-hoc analysis ---
query_flat = """
SELECT p.platform_name, m.metric_name, f.report_date, f.value
FROM fact_social_metrics f
JOIN dim_platform p ON f.platform_id = p.platform_id
JOIN dim_metric m ON f.metric_id = m.metric_id
ORDER BY f.report_date, p.platform_name, m.metric_name
"""

# Execute and display results
from sqlalchemy import text

with engine.connect() as conn:
    df_platform = pd.read_sql(text(query_by_platform), conn)
    print("\n--- SERVING: Total by platform ---")
    print(df_platform.to_string(index=False))
    
    df_trend = pd.read_sql(text(query_trend), conn)
    print("\n--- SERVING: Trend by date/platform/metric ---")
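    print(df_trend.to_string(index=False))

    df_flat = pd.read_sql(text(query_flat), conn)
    print("\n--- SERVING: Flat view for ad-hoc analysis ---")
    print(df_flat.to_string(index=False))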

---

## Summary: End-to-End Flow

| Stage | Artifact | Purpose |
|-------|----------|---------|
| **Source** | PDF files | Raw, unstructured input |
| **Ingestion** | Extracted records | Structured extraction from PDFs |
| **Raw Storage** | `raw_social_metrics` | Postgres bronze layer — immutable raw data |
| **Transformation** | Silver DataFrame | Cleaned, validated, standardised |
| **Modelling** | `fact_social_metrics`, `dim_*` | Star schema for analytics |
| **Serving** | SQL queries / BI tools | Analysts query Postgres directly |

**Production considerations:**
- **Orchestration:** Use Airflow, Dagster, or Prefect to schedule ingestion and transformation
- **Incremental loads:** Track `ingested_at` or `report_date` to avoid re-processing
- **Data quality:** Add validation checks (e.g. Great Expectations) in transformation
- **Serving:** Use views, materialised views, or a separate analytics schema for read-heavy workloads
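For the incremental-load point, the simplest way to track what has already been processed is to compare source filenames against those in the raw table before extracting anything. A sketch, assuming one PDF per reporting period and that a report is never re-issued under the same name (a re-issued file would need a content hash or modification time instead); `files_to_ingest` is a hypothetical helper, and the in-memory SQLite table is a cut-down stand-in for `raw_social_metrics`.

```python
import sqlite3
from pathlib import Path


def files_to_ingest(conn: sqlite3.Connection, candidates: list[Path]) -> list[Path]:
    """Return only the PDFs whose filename has not yet been loaded into raw storage."""
    already_loaded = {row[0] for row in conn.execute("SELECT DISTINCT source_file FROM raw_social_metrics")}
    return [p for p in candidates if p.name not in already_loaded]


# Stand-in raw table holding one previously ingested report
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE raw_social_metrics (id INTEGER PRIMARY KEY, source_file TEXT, raw_json TEXT)")
conn.execute("INSERT INTO raw_social_metrics (source_file, raw_json) VALUES ('2024-01_report.pdf', '{}')")

candidates = [Path("data/raw_reports/2024-01_report.pdf"), Path("data/raw_reports/2024-02_report.pdf")]
print([p.name for p in files_to_ingest(conn, candidates)])  # prints ['2024-02_report.pdf']
```

In the notebook, the same `SELECT DISTINCT source_file` query could run through the SQLAlchemy `engine` with `text()`, and its result would filter `source_files` before the ingestion loop.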