In [1]:
# Cell: Imports and setup
import os
import json
import time
import logging
import sqlite3
from datetime import datetime
from typing import List, Dict, Any, Optional

import pandas as pd
import numpy as np

# Optional: requests; if unavailable, pip install in notebook: !pip install requests
try:
    import requests
except ImportError:
    requests = None

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
)

# Create a local "cloud bucket" directory
DATA_DIR = "data_bucket"
os.makedirs(DATA_DIR, exist_ok=True)




In [2]:
# Cell: Create sample SQLite database with a customers table
db_path = os.path.join(DATA_DIR, "sample.db")
conn = sqlite3.connect(db_path)
cur = conn.cursor()

cur.execute("""
CREATE TABLE IF NOT EXISTS customers (
    customer_id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    email TEXT NOT NULL,
    country TEXT,
    created_at TEXT NOT NULL
)
""")

# Insert demo data (idempotent: avoid duplicating on re-run)
cur.execute("SELECT COUNT(*) FROM customers")
count = cur.fetchone()[0]
if count == 0:
    customers = [
        (1, "Aditi Sharma", "aditi@example.com", "IN", datetime.utcnow().isoformat()),
        (2, "Rohan Patel", "rohan@example.com", "IN", datetime.utcnow().isoformat()),
        (3, "Maya Singh", "maya@example.com", "US", datetime.utcnow().isoformat()),
    ]
    cur.executemany("""
        INSERT INTO customers (customer_id, name, email, country, created_at)
        VALUES (?, ?, ?, ?, ?)
    """, customers)
    conn.commit()

conn.close()
logging.info("SQLite demo database prepared at %s", db_path)


2025-12-12 14:30:29,042 | INFO | SQLite demo database prepared at data_bucket/sample.db


In [3]:
# Cell: Create sample CSV file for transactions
csv_path = os.path.join(DATA_DIR, "transactions.csv")

if not os.path.exists(csv_path):
    df_tx = pd.DataFrame({
        "transaction_id": [101, 102, 103, 104],
        "customer_id": [1, 2, 2, 3],
        "amount": [499.0, 999.5, 299.9, 120.0],
        "currency": ["INR", "INR", "INR", "USD"],
        "timestamp": [datetime.utcnow().isoformat() for _ in range(4)]
    })
    df_tx.to_csv(csv_path, index=False)

logging.info("CSV demo file prepared at %s", csv_path)


2025-12-12 14:31:31,446 | INFO | CSV demo file prepared at data_bucket/transactions.csv


In [4]:
# Cell: Ingest from SQLite
def ingest_from_sqlite(db_file: str, query: str) -> pd.DataFrame:
    logging.info("Ingesting from SQLite: %s", db_file)
    with sqlite3.connect(db_file) as conn:
        df = pd.read_sql_query(query, conn)
    logging.info("Loaded %d rows from SQL", len(df))
    return df

df_customers = ingest_from_sqlite(db_path, "SELECT * FROM customers")
df_customers


2025-12-12 14:32:47,555 | INFO | Ingesting from SQLite: data_bucket/sample.db
2025-12-12 14:32:47,560 | INFO | Loaded 3 rows from SQL


Unnamed: 0,customer_id,name,email,country,created_at
0,1,Aditi Sharma,aditi@example.com,IN,2025-12-12T09:00:29.041389
1,2,Rohan Patel,rohan@example.com,IN,2025-12-12T09:00:29.041396
2,3,Maya Singh,maya@example.com,US,2025-12-12T09:00:29.041398


In [5]:
# Cell: REST API ingestion with fallback example
def ingest_from_rest(url: str, timeout: float = 5.0) -> pd.DataFrame:
    """
    Attempts to pull data from a REST API and returns a DataFrame.
    Falls back to local static JSON if requests is unavailable or call fails.
    """
    logging.info("Ingesting from REST API: %s", url)
    fallback_data = [
        {"post_id": 1, "customer_id": 1, "title": "Hello World", "source": "fallback"},
        {"post_id": 2, "customer_id": 2, "title": "Data pipelines FTW", "source": "fallback"},
        {"post_id": 3, "customer_id": 3, "title": "Streaming matters", "source": "fallback"},
    ]

    if requests is None:
        logging.warning("requests not available, using fallback data.")
        return pd.DataFrame(fallback_data)

    try:
        resp = requests.get(url, timeout=timeout)
        resp.raise_for_status()
        data = resp.json()
        # Normalize to a DataFrame
        df = pd.json_normalize(data)
        logging.info("Loaded %d records from REST API", len(df))
        return df
    except Exception as e:
        logging.warning("REST API ingestion failed (%s). Using fallback.", e)
        return pd.DataFrame(fallback_data)

# Try a public placeholder API (works if you have internet). Else uses fallback.
api_url = "https://jsonplaceholder.typicode.com/posts"
df_posts = ingest_from_rest(api_url)
df_posts.head()


2025-12-12 14:35:00,924 | INFO | Ingesting from REST API: https://jsonplaceholder.typicode.com/posts
2025-12-12 14:35:01,798 | INFO | Loaded 100 records from REST API


Unnamed: 0,userId,id,title,body
0,1,1,sunt aut facere repellat provident occaecati e...,quia et suscipit\nsuscipit recusandae consequu...
1,1,2,qui est esse,est rerum tempore vitae\nsequi sint nihil repr...
2,1,3,ea molestias quasi exercitationem repellat qui...,et iusto sed quo iure\nvoluptatem occaecati om...
3,1,4,eum et est occaecati,ullam et saepe reiciendis voluptatem adipisci\...
4,1,5,nesciunt quas odio,repudiandae veniam quaerat sunt sed\nalias aut...


In [6]:
# Cell: Ingest from CSV
def ingest_from_csv(file_path: str) -> pd.DataFrame:
    logging.info("Ingesting from CSV: %s", file_path)
    df = pd.read_csv(file_path)
    logging.info("Loaded %d rows from CSV", len(df))
    return df

df_transactions = ingest_from_csv(csv_path)
df_transactions


2025-12-12 14:36:22,987 | INFO | Ingesting from CSV: data_bucket/transactions.csv
2025-12-12 14:36:22,994 | INFO | Loaded 4 rows from CSV


Unnamed: 0,transaction_id,customer_id,amount,currency,timestamp
0,101,1,499.0,INR,2025-12-12T09:01:31.436673
1,102,2,999.5,INR,2025-12-12T09:01:31.436678
2,103,2,299.9,INR,2025-12-12T09:01:31.436679
3,104,3,120.0,USD,2025-12-12T09:01:31.436681


In [10]:
# Cell: Simulate a streaming source using a generator
def event_stream(num_events: int = 10, sleep_secs: float = 0.0) -> Dict[str, Any]:
    """
    Yields synthetic clickstream events for demonstration.
    """
    for i in range(num_events):
        event = {
            "event_id": i + 1,
            "customer_id": np.random.choice([1, 2, 3]),
            "action": np.random.choice(["view", "add_to_cart", "purchase"]),
            "ts": datetime.utcnow().isoformat()
        }
        if sleep_secs:
            time.sleep(sleep_secs)
        yield event

def ingest_stream_to_df(num_events: int = 10, batch_size: int = 5) -> pd.DataFrame:
    logging.info("Starting streaming ingestion: %d events, batch_size=%d", num_events, batch_size)
    buffer: List[Dict[str, Any]] = []
    frames: List[pd.DataFrame] = []

    for evt in event_stream(num_events=num_events, sleep_secs=0.0):
        buffer.append(evt)
        if len(buffer) >= batch_size:
            frames.append(pd.DataFrame(buffer))
            logging.info("Committed mini-batch of %d events", len(buffer))
            buffer.clear()

    # Flush remaining
    if buffer:
        frames.append(pd.DataFrame(buffer))
        logging.info("Committed final mini-batch of %d events", len(buffer))

    df_stream = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
    logging.info("Total streamed events ingested: %d", len(df_stream))
    return df_stream

df_stream = ingest_stream_to_df(num_events=20, batch_size=7)
df_stream.head(10)


2025-12-12 14:38:49,270 | INFO | Starting streaming ingestion: 20 events, batch_size=7
2025-12-12 14:38:49,273 | INFO | Committed mini-batch of 7 events
2025-12-12 14:38:49,275 | INFO | Committed mini-batch of 7 events
2025-12-12 14:38:49,277 | INFO | Committed final mini-batch of 6 events
2025-12-12 14:38:49,278 | INFO | Total streamed events ingested: 20


Unnamed: 0,event_id,customer_id,action,ts
0,1,3,view,2025-12-12T09:08:49.272533
1,2,3,purchase,2025-12-12T09:08:49.272579
2,3,2,view,2025-12-12T09:08:49.272608
3,4,3,add_to_cart,2025-12-12T09:08:49.272667
4,5,1,add_to_cart,2025-12-12T09:08:49.272695
5,6,1,view,2025-12-12T09:08:49.272721
6,7,3,add_to_cart,2025-12-12T09:08:49.272746
7,8,3,view,2025-12-12T09:08:49.274461
8,9,3,view,2025-12-12T09:08:49.274525
9,10,3,add_to_cart,2025-12-12T09:08:49.274548


In [11]:
# Cell: Simulate cloud storage ingestion by scanning a local bucket folder for JSON files
# Create a couple of JSON files to simulate object uploads
cloud_obj1 = os.path.join(DATA_DIR, "cloud_obj_1.json")
cloud_obj2 = os.path.join(DATA_DIR, "cloud_obj_2.json")

for i, p in enumerate([cloud_obj1, cloud_obj2], start=1):
    if not os.path.exists(p):
        with open(p, "w") as f:
            json.dump({"object_id": i, "customer_id": i, "tag": f"obj_{i}"}, f)

def ingest_from_cloud_bucket(bucket_dir: str, ext: str = ".json") -> pd.DataFrame:
    logging.info("Scanning bucket: %s", bucket_dir)
    rows = []
    for fname in os.listdir(bucket_dir):
        if fname.endswith(ext):
            full = os.path.join(bucket_dir, fname)
            try:
                with open(full, "r") as f:
                    data = json.load(f)
                rows.append(data)
            except Exception as e:
                logging.warning("Failed to read %s: %s", full, e)

    df = pd.DataFrame(rows)
    logging.info("Loaded %d objects from bucket", len(df))
    return df

df_bucket = ingest_from_cloud_bucket(DATA_DIR)
df_bucket


2025-12-12 14:39:41,998 | INFO | Scanning bucket: data_bucket
2025-12-12 14:39:42,001 | INFO | Loaded 2 objects from bucket


Unnamed: 0,object_id,customer_id,tag
0,2,2,obj_2
1,1,1,obj_1


In [12]:
# Cell: Simple validation functions
def validate_non_empty(df: pd.DataFrame, name: str) -> bool:
    if df is None or df.empty:
        logging.error("Validation failed: %s is empty.", name)
        return False
    logging.info("Validation passed: %s has %d rows.", name, len(df))
    return True

def validate_required_columns(df: pd.DataFrame, required_cols: List[str], name: str) -> bool:
    missing = [c for c in required_cols if c not in df.columns]
    if missing:
        logging.error("Validation failed: %s missing columns: %s", name, missing)
        return False
    logging.info("Validation passed: %s contains required columns.", name)
    return True

# Apply validations
assert validate_non_empty(df_customers, "customers")
assert validate_required_columns(df_customers, ["customer_id", "email", "name"], "customers")

assert validate_non_empty(df_transactions, "transactions")
assert validate_required_columns(df_transactions, ["transaction_id", "customer_id", "amount"], "transactions")

assert validate_non_empty(df_stream, "stream")
assert validate_required_columns(df_stream, ["event_id", "customer_id", "action"], "stream")

assert validate_non_empty(df_bucket, "bucket")
assert validate_required_columns(df_bucket, ["object_id", "customer_id", "tag"], "bucket")

# For REST data, we can't guarantee schema; validate non-empty only
assert validate_non_empty(df_posts, "posts")


2025-12-12 14:40:44,315 | INFO | Validation passed: customers has 3 rows.
2025-12-12 14:40:44,317 | INFO | Validation passed: customers contains required columns.
2025-12-12 14:40:44,318 | INFO | Validation passed: transactions has 4 rows.
2025-12-12 14:40:44,318 | INFO | Validation passed: transactions contains required columns.
2025-12-12 14:40:44,318 | INFO | Validation passed: stream has 20 rows.
2025-12-12 14:40:44,319 | INFO | Validation passed: stream contains required columns.
2025-12-12 14:40:44,319 | INFO | Validation passed: bucket has 2 rows.
2025-12-12 14:40:44,320 | INFO | Validation passed: bucket contains required columns.
2025-12-12 14:40:44,320 | INFO | Validation passed: posts has 100 rows.


In [13]:
# Cell: Normalize REST posts to a minimal schema for merging
# Aim to have 'customer_id' present for join; if not present, map a subset
if "userId" in df_posts.columns and "id" in df_posts.columns and "title" in df_posts.columns:
    df_posts_norm = df_posts.rename(columns={"userId": "customer_id", "id": "post_id"})[
        ["post_id", "customer_id", "title"]
    ].copy()
else:
    # Use fallback or inferred schema
    cols = [c for c in ["post_id", "customer_id", "title"] if c in df_posts.columns]
    df_posts_norm = df_posts[cols].copy()

df_posts_norm["source"] = "posts"

# Create a unified customer dimension
df_customers_dim = df_customers.copy()

# Join transactions with customers
df_tx_joined = df_transactions.merge(df_customers_dim, on="customer_id", how="left", suffixes=("", "_cust"))

# Join stream with customers
df_stream_joined = df_stream.merge(df_customers_dim, on="customer_id", how="left")

# Join bucket objects with customers
df_bucket_joined = df_bucket.merge(df_customers_dim, on="customer_id", how="left")

# For posts, join if possible
if "customer_id" in df_posts_norm.columns:
    df_posts_joined = df_posts_norm.merge(df_customers_dim, on="customer_id", how="left")
else:
    df_posts_joined = df_posts_norm.copy()

# Add a dataset label for union
df_tx_joined["dataset"] = "transactions"
df_stream_joined["dataset"] = "stream"
df_bucket_joined["dataset"] = "bucket_objects"
df_posts_joined["dataset"] = "posts"

# Select common unionable columns
union_cols = sorted(set(
    ["customer_id", "name", "email", "country", "dataset", "timestamp", "amount", "action", "title", "tag"]
))

def select_cols(df: pd.DataFrame, cols: List[str]) -> pd.DataFrame:
    existing = [c for c in cols if c in df.columns]
    return df[existing].copy()

df_union = pd.concat([
    select_cols(df_tx_joined, union_cols),
    select_cols(df_stream_joined, union_cols),
    select_cols(df_bucket_joined, union_cols),
    select_cols(df_posts_joined, union_cols)
], ignore_index=True)

logging.info("Unified DataFrame rows: %d", len(df_union))
df_union.sample(min(10, len(df_union)))


2025-12-12 14:44:20,985 | INFO | Unified DataFrame rows: 126


Unnamed: 0,amount,country,customer_id,dataset,email,name,timestamp,action,tag,title
119,,,10,posts,,,,,,qui qui voluptates illo iste minima
42,,IN,2,posts,rohan@example.com,Rohan Patel,,,,fugit voluptas sed molestias voluptatem provident
78,,,6,posts,,,,,,ut quo aut ducimus alias
26,,IN,1,posts,aditi@example.com,Aditi Sharma,,,,sunt aut facere repellat provident occaecati e...
96,,,8,posts,,,,,,et iusto veniam et illum aut fuga
30,,IN,1,posts,aditi@example.com,Aditi Sharma,,,,nesciunt quas odio
74,,,5,posts,,,,,,laborum non sunt aut ut assumenda perspiciatis...
76,,,6,posts,,,,,,soluta aliquam aperiam consequatur illo quis v...
1,999.5,IN,2,transactions,rohan@example.com,Rohan Patel,2025-12-12T09:01:31.436678,,,
70,,,5,posts,,,,,,ut numquam possimus omnis eius suscipit laudan...


In [14]:
# Cell: Basic observability metrics
metrics = {
    "rows_customers": len(df_customers),
    "rows_transactions": len(df_transactions),
    "rows_stream": len(df_stream),
    "rows_bucket": len(df_bucket),
    "rows_posts": len(df_posts),
    "rows_union": len(df_union),
    "nulls_by_column_union": df_union.isna().sum().to_dict()
}
logging.info("Metrics: %s", metrics)
metrics


2025-12-12 14:44:59,577 | INFO | Metrics: {'rows_customers': 3, 'rows_transactions': 4, 'rows_stream': 20, 'rows_bucket': 2, 'rows_posts': 100, 'rows_union': 126, 'nulls_by_column_union': {'amount': 122, 'country': 70, 'customer_id': 0, 'dataset': 0, 'email': 70, 'name': 70, 'timestamp': 122, 'action': 106, 'tag': 124, 'title': 26}}


{'rows_customers': 3,
 'rows_transactions': 4,
 'rows_stream': 20,
 'rows_bucket': 2,
 'rows_posts': 100,
 'rows_union': 126,
 'nulls_by_column_union': {'amount': 122,
  'country': 70,
  'customer_id': 0,
  'dataset': 0,
  'email': 70,
  'name': 70,
  'timestamp': 122,
  'action': 106,
  'tag': 124,
  'title': 26}}