# BDA Practice Lab 04 — Relational + Streaming

> Author : Badr TAJINI - Big Data Analytics - ESIEE 2025-2026

This lab combines **Chapter 7** (Analyzing Relational Data) and **Chapter 8** (Real‑Time Analytics) practicals.

## 0. Bootstrap

In [1]:
import sys
import platform
import shutil
from pathlib import Path

from pyspark.sql import SparkSession
import pyspark

spark = (
    SparkSession.builder
    .appName('BDA-Lab04')
    .config('spark.sql.session.timeZone', 'UTC')
    .config('spark.sql.shuffle.partitions', '4')
    .getOrCreate()
)

spark.sparkContext.setLogLevel('WARN')

print(f'Spark version: {spark.version}')
print(f'PySpark version: {pyspark.__version__}')
print(f'Python version: {sys.version.split()[0]}')
print(f'Session timezone: {spark.conf.get("spark.sql.session.timeZone")}')
print(f'Shuffle partitions: {spark.conf.get("spark.sql.shuffle.partitions")}')

BASE_DIR = Path.cwd()
DATA_ROOT = BASE_DIR / 'data'
OUTPUT_ROOT = BASE_DIR / 'outputs'
PROOF_ROOT = BASE_DIR / 'proof'
CHECKPOINT_ROOT = BASE_DIR / 'checkpoints'

for directory in (DATA_ROOT, OUTPUT_ROOT, PROOF_ROOT, CHECKPOINT_ROOT):
    directory.mkdir(parents=True, exist_ok=True)

print(f'Workspace ready at {BASE_DIR}')



Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/10/14 22:25:21 WARN Utils: Your hostname, btj-XPS-13-9380, resolves to a loopback address: 127.0.1.1; using 172.16.10.167 instead (on interface wlp2s0)
25/10/14 22:25:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/10/14 22:25:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark version: 4.0.1
PySpark version: 4.0.1
Python version: 3.13.5


Session timezone: UTC
Shuffle partitions: 4
Workspace ready at /home/btj/bda-website/BDA/labs-final/lab4-practice


## 1. Data Acquisition

In [2]:

from datetime import datetime, timedelta
from pathlib import Path

from pyspark.sql import Row, types as T

TPC_H_TEXT_DIR = DATA_ROOT / 'tpch' / 'TPC-H-0.1-TXT'
TPC_H_PARQUET_DIR = DATA_ROOT / 'tpch' / 'TPC-H-0.1-PARQUET'
TAXI_DIR = DATA_ROOT / 'taxi-data'

TPC_H_TEXT_DIR.mkdir(parents=True, exist_ok=True)
TPC_H_PARQUET_DIR.mkdir(parents=True, exist_ok=True)
TAXI_DIR.mkdir(parents=True, exist_ok=True)

TPCH_DEFINITIONS = {
    'nation': {
        'columns': [('n_nationkey', int), ('n_name', str), ('n_regionkey', int), ('n_comment', str)],
        'rows': [
            {'n_nationkey': 1, 'n_name': 'UNITED STATES', 'n_regionkey': 1, 'n_comment': 'USA'},
            {'n_nationkey': 2, 'n_name': 'CANADA', 'n_regionkey': 1, 'n_comment': 'CAN'},
            {'n_nationkey': 3, 'n_name': 'BRAZIL', 'n_regionkey': 2, 'n_comment': 'BRA'},
        ],
    },
    'customer': {
        'columns': [('c_custkey', int), ('c_name', str), ('c_nationkey', int), ('c_comment', str)],
        'rows': [
            {'c_custkey': 1, 'c_name': 'Customer#1', 'c_nationkey': 1, 'c_comment': 'USA customer'},
            {'c_custkey': 2, 'c_name': 'Customer#2', 'c_nationkey': 2, 'c_comment': 'Canada customer'},
            {'c_custkey': 3, 'c_name': 'Customer#3', 'c_nationkey': 1, 'c_comment': 'USA repeat'},
        ],
    },
    'orders': {
        'columns': [
            ('o_orderkey', int),
            ('o_custkey', int),
            ('o_orderstatus', str),
            ('o_totalprice', float),
            ('o_orderdate', str),
            ('o_clerk', str),
            ('o_shippriority', int),
            ('o_comment', str),
        ],
        'rows': [
            {'o_orderkey': 1, 'o_custkey': 1, 'o_orderstatus': 'O', 'o_totalprice': 1000.0, 'o_orderdate': '1995-03-01', 'o_clerk': 'Clerk#000000001', 'o_shippriority': 0, 'o_comment': 'first order'},
            {'o_orderkey': 2, 'o_custkey': 2, 'o_orderstatus': 'O', 'o_totalprice': 800.0, 'o_orderdate': '1995-03-05', 'o_clerk': 'Clerk#000000002', 'o_shippriority': 0, 'o_comment': 'canada order'},
            {'o_orderkey': 3, 'o_custkey': 1, 'o_orderstatus': 'F', 'o_totalprice': 1200.0, 'o_orderdate': '1995-04-10', 'o_clerk': 'Clerk#000000003', 'o_shippriority': 0, 'o_comment': 'usa april'},
            {'o_orderkey': 4, 'o_custkey': 2, 'o_orderstatus': 'F', 'o_totalprice': 650.0, 'o_orderdate': '1995-05-01', 'o_clerk': 'Clerk#000000004', 'o_shippriority': 0, 'o_comment': 'canada may'},
        ],
    },
    'part': {
        'columns': [('p_partkey', int), ('p_name', str), ('p_mfgr', str), ('p_brand', str), ('p_type', str)],
        'rows': [
            {'p_partkey': 1, 'p_name': 'Part#1', 'p_mfgr': 'MFGR#1', 'p_brand': 'Brand#1', 'p_type': 'SMALL'},
            {'p_partkey': 2, 'p_name': 'Part#2', 'p_mfgr': 'MFGR#2', 'p_brand': 'Brand#2', 'p_type': 'MEDIUM'},
        ],
    },
    'supplier': {
        'columns': [('s_suppkey', int), ('s_name', str), ('s_nationkey', int), ('s_comment', str)],
        'rows': [
            {'s_suppkey': 1, 's_name': 'Supplier#1', 's_nationkey': 1, 's_comment': 'usa supplier'},
            {'s_suppkey': 2, 's_name': 'Supplier#2', 's_nationkey': 2, 's_comment': 'can supplier'},
        ],
    },
    'lineitem': {
        'columns': [
            ('l_orderkey', int),
            ('l_partkey', int),
            ('l_suppkey', int),
            ('l_linenumber', int),
            ('l_quantity', float),
            ('l_extendedprice', float),
            ('l_discount', float),
            ('l_tax', float),
            ('l_returnflag', str),
            ('l_linestatus', str),
            ('l_shipdate', str),
            ('l_commitdate', str),
            ('l_receiptdate', str),
            ('l_shipinstruct', str),
            ('l_shipmode', str),
            ('l_comment', str),
        ],
        'rows': [
            {'l_orderkey': 1, 'l_partkey': 1, 'l_suppkey': 1, 'l_linenumber': 1, 'l_quantity': 3.0, 'l_extendedprice': 300.0, 'l_discount': 0.0, 'l_tax': 0.0, 'l_returnflag': 'N', 'l_linestatus': 'O', 'l_shipdate': '1995-03-15', 'l_commitdate': '1995-03-13', 'l_receiptdate': '1995-03-20', 'l_shipinstruct': 'DELIVER IN PERSON', 'l_shipmode': 'AIR', 'l_comment': 'usa order'},
            {'l_orderkey': 1, 'l_partkey': 2, 'l_suppkey': 2, 'l_linenumber': 2, 'l_quantity': 2.0, 'l_extendedprice': 200.0, 'l_discount': 0.0, 'l_tax': 0.0, 'l_returnflag': 'N', 'l_linestatus': 'O', 'l_shipdate': '1995-03-15', 'l_commitdate': '1995-03-13', 'l_receiptdate': '1995-03-21', 'l_shipinstruct': 'DELIVER IN PERSON', 'l_shipmode': 'AIR', 'l_comment': 'mixed suppliers'},
            {'l_orderkey': 2, 'l_partkey': 1, 'l_suppkey': 1, 'l_linenumber': 1, 'l_quantity': 5.0, 'l_extendedprice': 500.0, 'l_discount': 0.05, 'l_tax': 0.0, 'l_returnflag': 'N', 'l_linestatus': 'O', 'l_shipdate': '1995-03-15', 'l_commitdate': '1995-03-14', 'l_receiptdate': '1995-03-22', 'l_shipinstruct': 'TAKE BACK RETURN', 'l_shipmode': 'MAIL', 'l_comment': 'canada'},
            {'l_orderkey': 3, 'l_partkey': 2, 'l_suppkey': 2, 'l_linenumber': 1, 'l_quantity': 1.0, 'l_extendedprice': 150.0, 'l_discount': 0.0, 'l_tax': 0.0, 'l_returnflag': 'R', 'l_linestatus': 'F', 'l_shipdate': '1995-04-12', 'l_commitdate': '1995-04-10', 'l_receiptdate': '1995-04-18', 'l_shipinstruct': 'DELIVER IN PERSON', 'l_shipmode': 'TRUCK', 'l_comment': 'usa april'},
            {'l_orderkey': 4, 'l_partkey': 1, 'l_suppkey': 1, 'l_linenumber': 1, 'l_quantity': 4.0, 'l_extendedprice': 420.0, 'l_discount': 0.0, 'l_tax': 0.0, 'l_returnflag': 'R', 'l_linestatus': 'F', 'l_shipdate': '1995-05-20', 'l_commitdate': '1995-05-18', 'l_receiptdate': '1995-05-25', 'l_shipinstruct': 'COLLECT COD', 'l_shipmode': 'SHIP', 'l_comment': 'canada may'},
        ],
    },
}

def write_pipe_table(table: str, definition: dict) -> None:
    path = TPC_H_TEXT_DIR / f'{table}.tbl'
    columns = [col for col, _ in definition['columns']]
    with path.open('w', encoding='utf-8') as handle:
        for row in definition['rows']:
            values = ['' if row.get(col) is None else str(row.get(col)) for col in columns]
            handle.write('|'.join(values) + '|\n')

def write_parquet_table(table: str, definition: dict) -> None:
    columns = definition['columns']
    schema = T.StructType([
        T.StructField(col, T.IntegerType() if caster is int else T.DoubleType() if caster is float else T.StringType(), True)
        for col, caster in columns
    ])
    data_rows = []
    for row in definition['rows']:
        values = []
        for col, caster in columns:
            value = row.get(col)
            if value is None:
                values.append(None)
            elif caster is int:
                values.append(int(value))
            elif caster is float:
                values.append(float(value))
            else:
                values.append(str(value))
        data_rows.append(values)
    df = spark.createDataFrame(data_rows, schema=schema)
    target = TPC_H_PARQUET_DIR / table
    df.write.mode('overwrite').parquet(str(target))

for table, definition in TPCH_DEFINITIONS.items():
    text_path = TPC_H_TEXT_DIR / f'{table}.tbl'
    parquet_path = TPC_H_PARQUET_DIR / table
    if not text_path.exists():
        write_pipe_table(table, definition)
    if not parquet_path.exists():
        write_parquet_table(table, definition)

if not any(TAXI_DIR.glob('*.csv')):
    rows = [
        {
            'tpep_pickup_datetime': datetime(2021, 1, 1, 8, 5) + timedelta(minutes=i * 7),
            'tpep_dropoff_datetime': datetime(2021, 1, 1, 8, 15) + timedelta(minutes=i * 7),
            'passenger_count': 1 + (i % 3),
            'trip_distance': 2.5 + i * 0.2,
            'dropoff_longitude': -74.0135 + (i % 2) * 0.004,
            'dropoff_latitude': 40.7135 + (i % 3) * 0.003,
        }
        for i in range(12)
    ]
    header = 'tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,dropoff_longitude,dropoff_latitude\n'
    for batch_idx in range(3):
        file_path = TAXI_DIR / f'part-2021-01-01-{batch_idx:04d}.csv'
        with file_path.open('w', encoding='utf-8') as handle:
            handle.write(header)
            for row in rows[batch_idx * 4:(batch_idx + 1) * 4]:
                handle.write(
                    f"{row['tpep_pickup_datetime']:%Y-%m-%d %H:%M:%S},{row['tpep_dropoff_datetime']:%Y-%m-%d %H:%M:%S},{row['passenger_count']},{row['trip_distance']:.2f},{row['dropoff_longitude']:.6f},{row['dropoff_latitude']:.6f}\n"
                )

print(f'TPC-H text tables: {sorted(p.name for p in TPC_H_TEXT_DIR.glob("*.tbl"))}')
print(f'TPC-H parquet tables: {sorted(p.name for p in TPC_H_PARQUET_DIR.iterdir())}')
print(f'Taxi samples: {sorted(p.name for p in TAXI_DIR.glob("*.csv"))}')


[Stage 0:>                                                          (0 + 8) / 8]



                                                                                

TPC-H text tables: ['customer.tbl', 'lineitem.tbl', 'nation.tbl', 'orders.tbl', 'part.tbl', 'supplier.tbl']
TPC-H parquet tables: ['customer', 'lineitem', 'nation', 'orders', 'part', 'supplier']
Taxi samples: ['part-2021-01-01-0000.csv', 'part-2021-01-01-0001.csv', 'part-2021-01-01-0002.csv']


## 2. Helpers

In [3]:
from typing import Dict, Iterable, List, Tuple

from pyspark.sql import Row, DataFrame, functions as F, types as T

TPCH_TABLE_SCHEMAS = {
    'nation': T.StructType([
        T.StructField('n_nationkey', T.IntegerType(), False),
        T.StructField('n_name', T.StringType(), False),
        T.StructField('n_regionkey', T.IntegerType(), True),
        T.StructField('n_comment', T.StringType(), True),
    ]),
    'customer': T.StructType([
        T.StructField('c_custkey', T.IntegerType(), False),
        T.StructField('c_name', T.StringType(), True),
        T.StructField('c_nationkey', T.IntegerType(), False),
        T.StructField('c_comment', T.StringType(), True),
    ]),
    'orders': T.StructType([
        T.StructField('o_orderkey', T.IntegerType(), False),
        T.StructField('o_custkey', T.IntegerType(), False),
        T.StructField('o_orderstatus', T.StringType(), True),
        T.StructField('o_totalprice', T.DoubleType(), True),
        T.StructField('o_orderdate', T.StringType(), True),
        T.StructField('o_clerk', T.StringType(), True),
        T.StructField('o_shippriority', T.IntegerType(), True),
        T.StructField('o_comment', T.StringType(), True),
    ]),
    'part': T.StructType([
        T.StructField('p_partkey', T.IntegerType(), False),
        T.StructField('p_name', T.StringType(), False),
        T.StructField('p_mfgr', T.StringType(), True),
        T.StructField('p_brand', T.StringType(), True),
        T.StructField('p_type', T.StringType(), True),
    ]),
    'supplier': T.StructType([
        T.StructField('s_suppkey', T.IntegerType(), False),
        T.StructField('s_name', T.StringType(), False),
        T.StructField('s_nationkey', T.IntegerType(), False),
        T.StructField('s_comment', T.StringType(), True),
    ]),
    'lineitem': T.StructType([
        T.StructField('l_orderkey', T.IntegerType(), False),
        T.StructField('l_partkey', T.IntegerType(), False),
        T.StructField('l_suppkey', T.IntegerType(), False),
        T.StructField('l_linenumber', T.IntegerType(), False),
        T.StructField('l_quantity', T.DoubleType(), True),
        T.StructField('l_extendedprice', T.DoubleType(), True),
        T.StructField('l_discount', T.DoubleType(), True),
        T.StructField('l_tax', T.DoubleType(), True),
        T.StructField('l_returnflag', T.StringType(), True),
        T.StructField('l_linestatus', T.StringType(), True),
        T.StructField('l_shipdate', T.StringType(), False),
        T.StructField('l_commitdate', T.StringType(), True),
        T.StructField('l_receiptdate', T.StringType(), True),
        T.StructField('l_shipinstruct', T.StringType(), True),
        T.StructField('l_shipmode', T.StringType(), True),
        T.StructField('l_comment', T.StringType(), True),
    ]),
}

TPCH_TYPE_PARSERS = {
    T.IntegerType(): int,
    T.DoubleType(): float,
    T.StringType(): str,
}

BOUNDING_BOXES = {
    'goldman': {'lon_min': -74.0145, 'lon_max': -74.0115, 'lat_min': 40.7125, 'lat_max': 40.7155},
    'citigroup': {'lon_min': -74.0095, 'lon_max': -74.0055, 'lat_min': 40.7190, 'lat_max': 40.7225},
}

TAXI_SCHEMA = T.StructType([
    T.StructField('tpep_pickup_datetime', T.TimestampType(), True),
    T.StructField('tpep_dropoff_datetime', T.TimestampType(), True),
    T.StructField('passenger_count', T.IntegerType(), True),
    T.StructField('trip_distance', T.DoubleType(), True),
    T.StructField('dropoff_longitude', T.DoubleType(), True),
    T.StructField('dropoff_latitude', T.DoubleType(), True),
])

def _parse_pipe_line(table: str, line: str) -> Row:
    if not line.strip():
        return None
    parts = line.split('|')
    schema = TPCH_TABLE_SCHEMAS[table]
    values = {}
    for idx, field in enumerate(schema):
        if idx >= len(parts):
            values[field.name] = None
            continue
        raw_value = parts[idx]
        if raw_value == '':
            values[field.name] = None
            continue
        caster = TPCH_TYPE_PARSERS.get(type(field.dataType), str)
        values[field.name] = caster(raw_value)
    return Row(**values)

def load_tpch_text(table: str):
    path = TPC_H_TEXT_DIR / f'{table}.tbl'
    if not path.exists():
        raise FileNotFoundError(f'Missing text table: {path}')
    rdd = spark.sparkContext.textFile(str(path))
    return rdd.map(lambda line: _parse_pipe_line(table, line)).filter(lambda row: row is not None)

def load_tpch_parquet(table: str) -> DataFrame:
    path = TPC_H_PARQUET_DIR / table
    if not path.exists():
        raise FileNotFoundError(f'Missing parquet table: {path}')
    return spark.read.schema(TPCH_TABLE_SCHEMAS[table]).parquet(str(path))

def ensure_dir(path: Path) -> Path:
    path.parent.mkdir(parents=True, exist_ok=True)
    return path

def write_csv(df: DataFrame, path: Path, header: bool = True) -> None:
    ensure_dir(path)
    df.coalesce(1).write.mode('overwrite').option('header', str(header).lower()).csv(str(path))

def within_bbox(lon: float, lat: float, bbox: Dict[str, float]) -> bool:
    if lon is None or lat is None:
        return False
    return bbox['lon_min'] <= lon <= bbox['lon_max'] and bbox['lat_min'] <= lat <= bbox['lat_max']

## Part A — Relational (RDD-first; DF allowed for contrast)

### A1. Q1 — shipped items on date

In [4]:

from pyspark.sql import functions as F

TARGET_DATE = '1995-03-15'

q1_results = []
for fmt in ('text', 'parquet'):
    if fmt == 'text':
        lineitem_rdd = load_tpch_text('lineitem')
        count = lineitem_rdd.filter(lambda row: row.l_shipdate == TARGET_DATE).count()
    else:
        lineitem_df = load_tpch_parquet('lineitem')
        count = lineitem_df.filter(F.col('l_shipdate') == TARGET_DATE).count()
    q1_results.append((fmt, TARGET_DATE, count))

output_path = OUTPUT_ROOT / 'q1.txt'
with ensure_dir(output_path).open('w', encoding='utf-8') as handle:
    for fmt, ship_date, count in q1_results:
        handle.write(f"{fmt}	{ship_date}	{count}\n")

print('Q1 shipped-item counts:')
for fmt, ship_date, count in q1_results:
    print(f"  {fmt}: {count} items shipped on {ship_date}")


Q1 shipped-item counts:
  text: 3 items shipped on 1995-03-15
  parquet: 3 items shipped on 1995-03-15


### A2. Q2 — clerks by order key (reduce-side join)

In [5]:

from operator import add

TARGET_DATE = '1995-03-15'

q2_outputs = {}

orders_rdd = load_tpch_text('orders').map(lambda row: (row.o_orderkey, row.o_clerk))
lineitem_rdd = load_tpch_text('lineitem').filter(lambda row: row.l_shipdate == TARGET_DATE).map(lambda row: (row.l_orderkey, 1))
joined_rdd = orders_rdd.cogroup(lineitem_rdd).filter(lambda kv: len(kv[1][1]) > 0)
q2_text = (
    joined_rdd
    .flatMap(lambda kv: [(kv[0], clerk) for clerk in kv[1][0]])
    .sortBy(lambda kv: kv[0])
    .take(20)
)
q2_outputs['text'] = q2_text

orders_df = load_tpch_parquet('orders').select('o_orderkey', 'o_clerk')
lineitem_df = load_tpch_parquet('lineitem').filter(F.col('l_shipdate') == TARGET_DATE).select('l_orderkey')
joined_df = orders_df.join(lineitem_df, orders_df.o_orderkey == lineitem_df.l_orderkey, 'inner')
q2_parquet = [
    (int(row.o_orderkey), row.o_clerk)
    for row in joined_df.orderBy('o_orderkey').select('o_orderkey', 'o_clerk').limit(20).collect()
]
q2_outputs['parquet'] = q2_parquet

output_path = OUTPUT_ROOT / 'q2.txt'
with ensure_dir(output_path).open('w', encoding='utf-8') as handle:
    for fmt, rows in q2_outputs.items():
        handle.write(f"[{fmt}]\n")
        for orderkey, clerk in rows:
            handle.write(f"{orderkey}	{clerk}\n")

print(f"Q2 reduce-side join results saved to {output_path}")


Q2 reduce-side join results saved to /home/btj/bda-website/BDA/labs-final/lab4-practice/outputs/q2.txt


### A3. Q3 — part & supplier (broadcast)

In [6]:

from pyspark.sql import functions as F

TARGET_DATE = '1995-03-15'

part_map = {row.p_partkey: row.p_name for row in load_tpch_text('part').collect()}
supplier_map = {row.s_suppkey: row.s_name for row in load_tpch_text('supplier').collect()}
part_bc = spark.sparkContext.broadcast(part_map)
supp_bc = spark.sparkContext.broadcast(supplier_map)

lineitems_rdd = load_tpch_text('lineitem').filter(lambda row: row.l_shipdate == TARGET_DATE)
q3_text = (
    lineitems_rdd
    .map(lambda row: (row.l_orderkey, part_bc.value.get(row.l_partkey), supp_bc.value.get(row.l_suppkey)))
    .filter(lambda tpl: tpl[1] is not None and tpl[2] is not None)
    .sortBy(lambda tpl: (tpl[0], tpl[1], tpl[2]))
    .take(20)
)

part_df = load_tpch_parquet('part').select('p_partkey', 'p_name')
supplier_df = load_tpch_parquet('supplier').select('s_suppkey', 's_name')
line_df = load_tpch_parquet('lineitem').filter(F.col('l_shipdate') == TARGET_DATE)
q3_parquet_df = (
    line_df
    .join(F.broadcast(part_df), line_df.l_partkey == part_df.p_partkey, 'inner')
    .join(F.broadcast(supplier_df), line_df.l_suppkey == supplier_df.s_suppkey, 'inner')
    .select('l_orderkey', 'p_name', 's_name')
    .orderBy('l_orderkey', 'p_name')
    .limit(20)
)
q3_parquet = [(int(row.l_orderkey), row.p_name, row.s_name) for row in q3_parquet_df.collect()]

output_path = OUTPUT_ROOT / 'q3.txt'
with ensure_dir(output_path).open('w', encoding='utf-8') as handle:
    handle.write('[text]\n')
    for orderkey, part_name, supp_name in q3_text:
        handle.write(f"{orderkey}	{part_name}	{supp_name}\n")
    handle.write('[parquet]\n')
    for orderkey, part_name, supp_name in q3_parquet:
        handle.write(f"{orderkey}	{part_name}	{supp_name}\n")

print(f"Q3 broadcast join sample written to {output_path}")


Q3 broadcast join sample written to /home/btj/bda-website/BDA/labs-final/lab4-practice/outputs/q3.txt


### A4. Q4 — shipped to each nation (mixed joins)

In [7]:
from operator import add

TARGET_DATE = '1995-03-15'

customer_map = {row.c_custkey: row.c_nationkey for row in load_tpch_text('customer').collect()}
nation_map = {row.n_nationkey: row.n_name for row in load_tpch_text('nation').collect()}
customer_bc = spark.sparkContext.broadcast(customer_map)
nation_bc = spark.sparkContext.broadcast(nation_map)

lineitem_filtered = load_tpch_text('lineitem').filter(lambda row: row.l_shipdate == TARGET_DATE)
orders_rdd = load_tpch_text('orders').map(lambda row: (row.o_orderkey, row.o_custkey))

line_by_order = lineitem_filtered.map(lambda row: (row.l_orderkey, 1))
joined = line_by_order.join(orders_rdd)

nation_counts = (
    joined
    .map(lambda kv: (customer_bc.value.get(kv[1][1]), kv[1][0]))
    .filter(lambda tpl: tpl[0] is not None)
    .map(lambda tpl: ((tpl[0], nation_bc.value.get(tpl[0])), tpl[1]))
    .filter(lambda tpl: tpl[0][1] is not None)
    .reduceByKey(add)
    .map(lambda tpl: (tpl[0][0], tpl[0][1], tpl[1]))
    .sortBy(lambda tpl: (-tpl[2], tpl[1]))
    .collect()
)

nations_df = spark.createDataFrame(nation_counts, schema=['nationkey', 'nation', 'count'])
output_path = OUTPUT_ROOT / 'q4'
write_csv(nations_df, output_path)

print(f'Q4 shipments per nation written to {output_path}')

Q4 shipments per nation written to /home/btj/bda-website/BDA/labs-final/lab4-practice/outputs/q4


### A5. Q5 — monthly US vs Canada volumes

In [8]:
monthly_df = (
    load_tpch_parquet('lineitem')
    .join(load_tpch_parquet('orders'), F.col('l_orderkey') == F.col('o_orderkey'), 'inner')
    .join(load_tpch_parquet('customer'), F.col('o_custkey') == F.col('c_custkey'), 'inner')
    .join(load_tpch_parquet('nation'), F.col('c_nationkey') == F.col('n_nationkey'), 'inner')
)

monthly_df = monthly_df.withColumn('ship_month', F.date_format(F.to_date('l_shipdate'), 'yyyy-MM'))
result_df = (
    monthly_df
    .filter(F.col('n_name').isin('UNITED STATES', 'CANADA'))
    .groupBy('n_nationkey', 'n_name', 'ship_month')
    .agg(F.count('*').alias('shipment_count'))
    .orderBy('n_name', 'ship_month')
)

output_path = OUTPUT_ROOT / 'q5'
write_csv(result_df, output_path)
print(f'Q5 monthly volume metrics written to {output_path}')
result_df.show(truncate=False)

Q5 monthly volume metrics written to /home/btj/bda-website/BDA/labs-final/lab4-practice/outputs/q5


+-----------+-------------+----------+--------------+
|n_nationkey|n_name       |ship_month|shipment_count|
+-----------+-------------+----------+--------------+
|2          |CANADA       |1995-03   |1             |
|2          |CANADA       |1995-05   |1             |
|1          |UNITED STATES|1995-03   |2             |
|1          |UNITED STATES|1995-04   |1             |
+-----------+-------------+----------+--------------+



## Part B — Streaming (Structured Streaming)

### B1. HourlyTripCount

In [9]:

from pyspark.sql import functions as F
from pyspark.sql import DataFrame

hourly_checkpoint = CHECKPOINT_ROOT / 'hourly_trip_count'
hourly_output = OUTPUT_ROOT / 'hourly_trip_count'
shutil.rmtree(hourly_checkpoint, ignore_errors=True)
shutil.rmtree(hourly_output, ignore_errors=True)
hourly_checkpoint.mkdir(parents=True, exist_ok=True)
hourly_output.mkdir(parents=True, exist_ok=True)

hourly_stream = (
    spark.readStream
    .schema(TAXI_SCHEMA)
    .option('header', True)
    .csv(str(TAXI_DIR))
)

hourly_agg = (
    hourly_stream
    .withColumn('pickup_hour', F.date_trunc('hour', F.col('tpep_pickup_datetime')))
    .groupBy('pickup_hour')
    .agg(F.count('*').alias('trip_count'))
)

def write_hourly(batch_df: DataFrame, epoch_id: int) -> None:
    batch_df.orderBy('pickup_hour').write.mode('overwrite').parquet(str(hourly_output))

hourly_query = (
    hourly_agg.writeStream
    .outputMode('update')
    .trigger(once=True)
    .option('checkpointLocation', str(hourly_checkpoint))
    .foreachBatch(write_hourly)
    .start()
)

hourly_query.awaitTermination()
print(f'Hourly trip counts stored under {hourly_output}')


25/10/14 22:25:53 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


Hourly trip counts stored under /home/btj/bda-website/BDA/labs-final/lab4-practice/outputs/hourly_trip_count


### B2. RegionTripCount (goldman, citigroup)

In [10]:

from pyspark.sql import functions as F
from pyspark.sql import DataFrame

region_checkpoint = CHECKPOINT_ROOT / 'region_trip_count'
region_output = OUTPUT_ROOT / 'region_trip_count'
shutil.rmtree(region_checkpoint, ignore_errors=True)
shutil.rmtree(region_output, ignore_errors=True)
region_checkpoint.mkdir(parents=True, exist_ok=True)
region_output.mkdir(parents=True, exist_ok=True)

region_stream = (
    spark.readStream
    .schema(TAXI_SCHEMA)
    .option('header', True)
    .csv(str(TAXI_DIR))
)

bbox_goldman = BOUNDING_BOXES['goldman']
bbox_citigroup = BOUNDING_BOXES['citigroup']

region_enriched = region_stream.withColumn(
    'region',
    F.when(
        (F.col('dropoff_longitude') >= bbox_goldman['lon_min']) &
        (F.col('dropoff_longitude') <= bbox_goldman['lon_max']) &
        (F.col('dropoff_latitude') >= bbox_goldman['lat_min']) &
        (F.col('dropoff_latitude') <= bbox_goldman['lat_max']),
        F.lit('goldman')
    ).when(
        (F.col('dropoff_longitude') >= bbox_citigroup['lon_min']) &
        (F.col('dropoff_longitude') <= bbox_citigroup['lon_max']) &
        (F.col('dropoff_latitude') >= bbox_citigroup['lat_min']) &
        (F.col('dropoff_latitude') <= bbox_citigroup['lat_max']),
        F.lit('citigroup')
    )
)

region_counts = (
    region_enriched
    .filter(F.col('region').isNotNull())
    .withColumn('dropoff_hour', F.date_trunc('hour', F.col('tpep_dropoff_datetime')))
    .groupBy('region', 'dropoff_hour')
    .agg(F.count('*').alias('trip_count'))
)

def write_region(batch_df: DataFrame, epoch_id: int) -> None:
    batch_df.orderBy('dropoff_hour', 'region').write.mode('overwrite').parquet(str(region_output))

region_query = (
    region_counts.writeStream
    .outputMode('update')
    .trigger(once=True)
    .option('checkpointLocation', str(region_checkpoint))
    .foreachBatch(write_region)
    .start()
)

region_query.awaitTermination()
print(f'Region trip counts stored under {region_output}')


25/10/14 22:25:56 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


Region trip counts stored under /home/btj/bda-website/BDA/labs-final/lab4-practice/outputs/region_trip_count


### B3. TrendingArrivals (10-minute windows + state)

In [11]:
from pyspark.sql import SparkSession, functions as F, types as T, DataFrame


def build_spark(app_name: str) -> SparkSession:
    return (
        SparkSession.builder
        .appName(app_name)
        .config('spark.sql.shuffle.partitions', '8')
        .getOrCreate()
    )


def run(input_dir: str, checkpoint: str, output: str) -> None:
    spark = build_spark('BDA-Streaming')
    schema = T.StructType([
        T.StructField('pickup_ts', T.TimestampType(), True),
        T.StructField('dropoff_ts', T.TimestampType(), True),
        T.StructField('lon', T.DoubleType(), True),
        T.StructField('lat', T.DoubleType(), True),
        T.StructField('vendor', T.StringType(), True),
    ])

    sdf = (
        spark.readStream
        .schema(schema)
        .option('header', True)
        .csv(input_dir)
    )

    hourly = (
        sdf
        .withWatermark('dropoff_ts', '5 minutes')
        .groupBy(F.window('dropoff_ts', '1 hour'))
        .agg(F.count('*').alias('trip_count'))
    )

    def _write_batch(batch_df: DataFrame, epoch_id: int) -> None:
        batch_df.orderBy('window').write.mode('overwrite').parquet(output)

    query = (
        hourly.writeStream
        .outputMode('update')
        .option('checkpointLocation', checkpoint)
        .trigger(once=True)
        .foreachBatch(_write_batch)
        .start()
    )

    query.awaitTermination()


## Evidence & Reproducibility

In [12]:
import os
import subprocess

java_output = subprocess.check_output(['java', '-version'], stderr=subprocess.STDOUT).decode('utf-8').splitlines()[0]
conf_items = sorted(spark.sparkContext.getConf().getAll())

env_lines = [
    '# Environment Summary',
    '',
    f'- Python: {sys.version.split()[0]}',
    f'- Spark: {spark.version}',
    f'- PySpark: {pyspark.__version__}',
    f'- Java: {java_output}',
    f'- OS: {platform.platform()}',
    '',
    '## Spark Configuration',
]

env_lines.extend(f'- {key} = {value}' for key, value in conf_items)

env_path = BASE_DIR / 'ENV.md'
env_path.write_text(os.linesep.join(env_lines) + os.linesep)
print(f'Environment details saved to {env_path}')

if 'q1_results' in globals():
    print()
    print('Q1 counts (format, date, count):')
    for record in q1_results:
        print('  ', record)

if 'q2_outputs' in globals():
    print()
    print('Q2 sample rows (text vs parquet):')
    for fmt, records in q2_outputs.items():
        print(f'[{fmt}] -> {records}')

plan_path = PROOF_ROOT / 'plan_ppr.txt'
if plan_path.exists():
    print(f'Plan evidence available at {plan_path}')

print()
print('Generated output artifacts:')
for path_entry in sorted(OUTPUT_ROOT.glob('**/*')):
    if path_entry.is_file():
        print(f'  {path_entry.relative_to(BASE_DIR)}')


Environment details saved to /home/btj/bda-website/BDA/labs-final/lab4-practice/ENV.md

Q1 counts (format, date, count):
   ('text', '1995-03-15', 3)
   ('parquet', '1995-03-15', 3)

Q2 sample rows (text vs parquet):
[text] -> [('1', 'Clerk#000000001'), ('2', 'Clerk#000000002')]
[parquet] -> [(1, 'Clerk#000000001'), (1, 'Clerk#000000001'), (2, 'Clerk#000000002')]

Generated output artifacts:
  outputs/hourly_trip_count/._SUCCESS.crc
  outputs/hourly_trip_count/.part-00000-c57884a5-35e0-479f-8064-995eda95d465-c000.snappy.parquet.crc
  outputs/hourly_trip_count/.part-00001-c57884a5-35e0-479f-8064-995eda95d465-c000.snappy.parquet.crc
  outputs/hourly_trip_count/_SUCCESS
  outputs/hourly_trip_count/part-00000-c57884a5-35e0-479f-8064-995eda95d465-c000.snappy.parquet
  outputs/hourly_trip_count/part-00001-c57884a5-35e0-479f-8064-995eda95d465-c000.snappy.parquet
  outputs/q1.txt
  outputs/q2.txt
  outputs/q3.txt
  outputs/q4/._SUCCESS.crc
  outputs/q4/.part-00000-a21c0c0b-ed73-4429-a2ef-304a8