<a href="https://colab.research.google.com/github/gabrielkmbo/nanoquery/blob/main/David_Stutz_Gabriel_Bo_CS_145_Project_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Collaborators

1.   David Stutz
2.   Gabriel Bo


# Setup

In [None]:
import pandas as pd
import os
import uuid
import argparse
import time
import psutil
import heapq
import pyarrow as pa
import pyarrow.parquet as pq
import random
import string
import numpy as np
from typing import List, Optional
import shutil

# Section 0: Generate Test Data

This section has already been implemented for you.

In [None]:
import gc


def generate_songs_chunk(start, size, string_length=100):
    """
    Build one chunk of the Songs table covering song_id in [start, start + size).

    Columns: song_id, title ("Song_<id>"), then extra_col_1..extra_col_10, each a
    cyclic shift (np.roll) of one shared array of random 'a'/'b' padding strings.
    """
    song_ids = range(start, start + size)
    titles = [f"Song_{sid}" for sid in song_ids]
    padding = generate_base_strings(size, string_length)

    columns = {"song_id": song_ids, "title": titles}
    for shift in range(1, 11):
        columns[f"extra_col_{shift}"] = np.roll(padding, shift=shift)
    return pd.DataFrame(columns)


def generate_users_chunk(start, size, string_length=100):
    """
    Build one chunk of the Users table covering user_id in [start, start + size).

    age is deterministic: 18 + (user_id % 60), i.e. values 18..77. The ten
    extra_col_* columns are cyclic shifts of one shared padding-string array.
    """
    user_ids = range(start, start + size)
    ages = [18 + uid % 60 for uid in user_ids]
    padding = generate_base_strings(size, string_length)

    columns = {"user_id": user_ids, "age": ages}
    for shift in range(1, 11):
        columns[f"extra_col_{shift}"] = np.roll(padding, shift=shift)
    return pd.DataFrame(columns)


def generate_listens_chunk(start, size, num_users, num_songs, string_length=16):
    """
    Build one chunk of the Listens table covering listen_id in [start, start + size).

    user_id / song_id are drawn uniformly from [0, num_users) / [0, num_songs)
    using NumPy's global RNG (valid foreign keys as long as Users/Songs ids are
    0..N-1, as generate_test_data produces). The draw order (user_id, song_id,
    then padding strings) determines the RNG stream and must stay fixed.
    """
    listen_ids = range(start, start + size)
    user_ids = np.random.randint(0, num_users, size=size)
    song_ids = np.random.randint(0, num_songs, size=size)
    padding = generate_base_strings(size, string_length)

    columns = {"listen_id": listen_ids, "user_id": user_ids, "song_id": song_ids}
    columns.update(
        (f"extra_col_{shift}", np.roll(padding, shift=shift)) for shift in range(1, 11)
    )
    return pd.DataFrame(columns)


def generate_base_strings(num_records, string_length):
    """
    Return a NumPy array of `num_records` random strings, each `string_length`
    characters drawn uniformly from {'a', 'b'} via NumPy's global RNG.
    """
    alphabet = np.array(["a", "b"])
    picks = np.random.randint(0, alphabet.size, size=(num_records, string_length))
    letters = alphabet[picks]
    return np.array(["".join(row) for row in letters])


def _write_parquet_streamed(
    filename,
    total_rows,
    make_chunk_fn,
    chunk_size=250_000,
    compression="snappy",
):
    """
    Stream DataFrame chunks to a single Parquet file with one ParquetWriter.
    - schema_df: optional small DataFrame to lock schema; if None we'll infer from the first chunk.
    """
    written = 0

    first_chunk = make_chunk_fn(0, min(chunk_size, total_rows))
    first_table = pa.Table.from_pandas(first_chunk, preserve_index=False)
    writer = pq.ParquetWriter(filename, first_table.schema, compression=compression)
    writer.write_table(first_table)

    written += len(first_chunk)
    del first_chunk
    gc.collect()

    while written < total_rows:
        take = min(chunk_size, total_rows - written)
        chunk_df = make_chunk_fn(written, take)
        writer.write_table(pa.Table.from_pandas(chunk_df, preserve_index=False))
        written += take
        del chunk_df
        gc.collect()

    writer.close()


def generate_test_data(target_size="100MB"):
    """
    Write songs_<size>.parquet, users_<size>.parquet and listens_<size>.parquet.

    Row counts (the 1GB dataset is exactly 10x the 100MB one):
        100MB: Songs 10K, Users 50K, Listens 1M
        1GB:   Songs 100K, Users 500K, Listens 10M
    Rough compressed on-disk targets are 5% / 20% / 75% of the total.

    Table contents (see the generate_*_chunk helpers):
        - Songs:   song_id, title, extra_col_1..10 (100-char strings)
        - Users:   user_id, age = 18 + (user_id % 60) (deterministic, 18..77),
                   extra_col_1..10 (100-char strings)
        - Listens: listen_id, user_id, song_id, extra_col_1..10 (16-char strings)

    Listens foreign keys are drawn from [0, num_users) and [0, num_songs), which
    are exactly the id ranges written for Users and Songs, so every key is valid.
    Chunk sizes are the same for both targets.
    """
    assert target_size in ["100MB", "1GB"]
    scale = 1 if target_size == "100MB" else 10
    num_songs = 10_000 * scale
    num_users = 50_000 * scale
    num_listens = 1_000_000 * scale

    def listens_chunk_fn(start, size):
        return generate_listens_chunk(start, size, num_users, num_songs)

    jobs = (
        ("Writing Songs", f"songs_{target_size}.parquet", num_songs,
         lambda start, size: generate_songs_chunk(start, size), 10_000),
        ("Writing Users", f"users_{target_size}.parquet", num_users,
         lambda start, size: generate_users_chunk(start, size), 50_000),
        ("Writing Listens", f"listens_{target_size}.parquet", num_listens,
         listens_chunk_fn, 1_000_000),
    )
    for message, filename, total_rows, chunk_fn, rows_per_chunk in jobs:
        print(message)
        _write_parquet_streamed(
            filename=filename,
            total_rows=total_rows,
            make_chunk_fn=chunk_fn,
            chunk_size=rows_per_chunk,
        )

    print("Done!")

In [None]:
# Seed both RNGs for reproducible data. The chunk generators draw only from
# NumPy's global RNG (np.random.randint), which random.seed() does not affect,
# so seeding `random` alone produced different data on every run.
random.seed(0)
np.random.seed(0)

generate_test_data('100MB')
generate_test_data('1GB')

Writing Songs
Writing Users
Writing Listens
Done!
Writing Songs
Writing Users
Writing Listens
Done!


# Section 1: Parquet-based Columnar Storage

Implement Parquet-based storage for the tables
- For simplicity, store all data for a table in a single Parquet file and use a single DataFrame object as a buffer

In [None]:
# ---- Section 1: Columnar storage wrapper ----
import os, time, shutil
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np

class ColumnarDbFile:
    """
    Stores one table as a single Parquet file on disk.

    The file path is ``{file_dir}/{file_pfx}_{table_name}`` (deliberately with no
    ``.parquet`` suffix). All reads and writes go through pandas with the pyarrow
    engine; writes use snappy compression and drop the DataFrame index.
    """

    def __init__(self, table_name, file_dir='data', file_pfx=''):
        self.table_name = table_name
        self.file_dir = file_dir
        self.file_pfx = file_pfx
        # Make sure the storage directory exists before any write.
        os.makedirs(file_dir, exist_ok=True)
        self.base_file_name = "{}/{}_{}".format(file_dir, file_pfx, table_name)

    def build_table(self, data: pd.DataFrame):
        """Write `data` as this table's Parquet file, replacing any existing file."""
        data.to_parquet(
            self.base_file_name,
            engine='pyarrow',
            compression='snappy',
            index=False,
        )

    def retrieve_data(self, columns=None) -> pd.DataFrame:
        """Load the table; passing `columns` reads only that subset (column pruning)."""
        return pd.read_parquet(self.base_file_name, columns=columns)

    def append_data(self, data: pd.DataFrame):
        """
        Append rows by reading the whole file, concatenating, and rewriting it.

        This rewrites the entire file on every call; it creates the file if it
        does not exist yet.
        """
        if not os.path.exists(self.base_file_name):
            self.build_table(data)
            return
        current = pd.read_parquet(self.base_file_name)
        merged = pd.concat([current, data], ignore_index=True)
        merged.to_parquet(
            self.base_file_name,
            engine='pyarrow',
            compression='snappy',
            index=False,
        )

In [None]:
print("Building tables...")

# Rebuild the storage directory from scratch so no stale table files survive.
if os.path.exists('data'):
    shutil.rmtree('data')

tables = {
    name: ColumnarDbFile(name, file_dir='data')
    for name in ('Songs', 'Users', 'Listens')
}

# Load the generated datasets at the chosen scale.
size = "100MB"
songs_data = pd.read_parquet(f'songs_{size}.parquet')
users_data = pd.read_parquet(f'users_{size}.parquet')
listens_data = pd.read_parquet(f'listens_{size}.parquet')

# Persist each dataset as its columnar table.
tables['Songs'].build_table(songs_data)
tables['Users'].build_table(users_data)
tables['Listens'].build_table(listens_data)
print("Tables built successfully.")

Building tables...
Tables built successfully.


In [None]:
# Retrieve only song_id and title from the Songs table (column pruning: the
# ten extra_col_* padding columns are never read from disk).
tables['Songs'].retrieve_data(columns = ['song_id', 'title'])

Unnamed: 0,song_id,title
0,0,Song_0
1,1,Song_1
2,2,Song_2
3,3,Song_3
4,4,Song_4
...,...,...
9995,9995,Song_9995
9996,9996,Song_9996
9997,9997,Song_9997
9998,9998,Song_9998


In [None]:
# Column-pruned read of Listens: only the id and the two foreign-key columns.
tables['Listens'].retrieve_data(columns = ['listen_id', 'user_id', 'song_id'])

Unnamed: 0,listen_id,user_id,song_id
0,0,49450,1236
1,1,8633,1469
2,2,26212,9056
3,3,19056,5492
4,4,47188,5025
...,...,...,...
999995,999995,3482,6129
999996,999996,31303,2233
999997,999997,37658,6696
999998,999998,4447,8871


In [None]:
# --- CSV exports (so analyzer can compare Parquet vs CSV). Uses ONLY files from Section 0. ---
def ensure_csv_exports_for_100mb():
    """
    Write a CSV copy of each 100MB Parquet dataset (songs, users, listens)
    next to it, skipping any CSV that already exists. Used only so the
    analyzer can compare Parquet against a row-oriented format.
    """
    for stem in ("songs_100MB", "users_100MB", "listens_100MB"):
        pq_path = f"{stem}.parquet"
        csv_path = f"{stem}.csv"
        if os.path.exists(csv_path):
            continue
        print(f"Writing CSV for analysis: {csv_path}")
        pd.read_parquet(pq_path).to_csv(csv_path, index=False)

ensure_csv_exports_for_100mb()

Analyze and report on:
- Space efficiency compared to row storage
  - e.g. Compare file sizes on disk: How much disk space does Parquet use vs. a row storage format like CSV?
- Compression ratios achieved with Parquet
  - e.g. Compare Parquet’s uncompressed encoded size (reported in its metadata) to its compressed on-disk size to compute compression ratios.
  - You could also report the memory expansion factor: how much larger the dataset becomes when loaded into a `pd.DataFrame` compared to the compressed file size.
- Read/write performance characteristics
  - e.g. Read performance: How long does it take to read all columns from Parquet vs. CSV?
  - e.g. Columnar advantage: How long does it take to read selective columns from Parquet vs. reading all columns?
  - e.g. Write performance: How long does it take to write data to Parquet vs. CSV?

In [None]:
# ---- Analyzer: space, compression, read/write perf, memory expansion (fixed) ----
import time
import psutil
import pyarrow.parquet as pq

def _parquet_uncompressed_bytes(pq_file: pq.ParquetFile) -> int:
    """
    Return the sum of every row group's ``total_byte_size`` from the file
    metadata, used as a proxy for the uncompressed logical data size.
    A file with no row groups yields 0.
    """
    metadata = pq_file.metadata
    return sum(
        metadata.row_group(idx).total_byte_size
        for idx in range(metadata.num_row_groups)
    )

def _timeit(fn, *args, **kwargs):
    t0 = time.time()
    out = fn(*args, **kwargs)
    return out, time.time() - t0

def analyze():
    """
    Compare Parquet against CSV for the 100MB Songs / Users / Listens datasets.

    For each table it measures:
      - on-disk sizes of the Parquet and CSV files,
      - the Parquet compression ratio (row-group uncompressed bytes / file size),
      - full-read time for CSV and Parquet, plus a Parquet read of 3 columns,
      - CSV and Parquet write times (to temporary files that are always removed),
      - in-memory pandas size and its expansion factor vs. the Parquet file.

    Expects the Section 0 Parquet files and the CSV exports written by
    ensure_csv_exports_for_100mb() to exist in the working directory.

    Returns:
        dict: table name -> {section name: {metric: value}}. Note that
        "parquet_vs_csv_ratio" is csv_size / parquet_size, i.e. how many times
        larger the CSV file is.
    """
    results = {}
    datasets = {
        "Songs":   ("songs_100MB.parquet",   "songs_100MB.csv"),
        "Users":   ("users_100MB.parquet",   "users_100MB.csv"),
        "Listens": ("listens_100MB.parquet", "listens_100MB.csv"),
    }

    for name, (parquet_file, csv_file) in datasets.items():
        print(f"\n--- Analyzing {name} ---")

        # ---------- Space Efficiency ----------
        parquet_size = os.path.getsize(parquet_file)
        csv_size     = os.path.getsize(csv_file)

        # ---------- Parquet Compression Ratio ----------
        pf = pq.ParquetFile(parquet_file)
        uncompressed_total = _parquet_uncompressed_bytes(pf)
        compression_ratio  = (uncompressed_total / parquet_size) if parquet_size > 0 else float("inf")

        # ---------- Read Performance ----------
        # Only the CSV read *time* is needed; release the frame immediately so it
        # is not held alongside the Parquet frame (several hundred MB for Listens).
        df_csv, csv_full_read_time = _timeit(pd.read_csv, csv_file)
        del df_csv
        df_pq,  pq_full_read_time  = _timeit(pd.read_parquet, parquet_file)
        some_cols = list(df_pq.columns[:3])
        _, pq_selective_time = _timeit(pd.read_parquet, parquet_file, columns=some_cols)

        # ---------- Write Performance ----------
        tmp_csv  = f"tmp_{name}.csv"
        tmp_parq = f"tmp_{name}.parquet"
        try:
            _, csv_write_time = _timeit(df_pq.to_csv, tmp_csv, index=False)
            _, pq_write_time  = _timeit(df_pq.to_parquet, tmp_parq, engine="pyarrow", compression="snappy", index=False)
        finally:
            # Remove temporaries even if a write failed part-way.
            for p in (tmp_csv, tmp_parq):
                try:
                    os.remove(p)
                except FileNotFoundError:
                    pass

        # ---------- Memory Expansion (Parquet on disk → pandas RAM) ----------
        pandas_mem = int(df_pq.memory_usage(deep=True).sum())
        mem_expand = pandas_mem / max(parquet_size, 1)
        # Free this table's frame before loading the next dataset.
        del df_pq

        results[name] = {
            "File sizes (bytes)": {
                "parquet_on_disk": parquet_size,
                "csv_on_disk": csv_size
            },
            "Space efficiency": {
                "parquet_vs_csv_ratio": csv_size / max(parquet_size, 1)
            },
            "Parquet compression": {
                "uncompressed_total": uncompressed_total,
                "compressed_total": parquet_size,
                "compression_ratio": compression_ratio
            },
            "Read performance (seconds)": {
                "csv_full_read": csv_full_read_time,
                "parquet_full_read": pq_full_read_time,
                "parquet_selective_columns_read": pq_selective_time
            },
            "Write performance (seconds)": {
                "csv_write": csv_write_time,
                "parquet_write": pq_write_time
            },
            "Memory expansion": {
                "pandas_memory_bytes": pandas_mem,
                "memory_expansion_factor_vs_parquet": mem_expand
            }
        }

    print("\n==== ANALYSIS COMPLETE ====")
    return results

# Run the Parquet-vs-CSV comparison; the nested result dict is rendered as
# tables by pretty_summaries below.
analysis_results = analyze()


--- Analyzing Songs ---

--- Analyzing Users ---

--- Analyzing Listens ---

==== ANALYSIS COMPLETE ====


In [None]:
# ---------- Pretty, labeled summaries with units & speedups ----------

import pandas as pd

def _mb(x):  # bytes -> MB
    return round(x / (1024 * 1024), 3)

def pretty_summaries(results: dict):
    """
    Flatten the nested dict returned by analyze() into display-ready DataFrames.

    Byte counts are converted to MB via _mb, timings and ratios are rounded,
    and speedups are slow/fast (None when the denominator is not positive).

    Returns a 5-tuple, each frame sorted by table name:
        (full summary, storage & compression view, read view, write view, memory view)
    """
    def speedup(numerator, denominator):
        # Ratio rounded to 2 decimals; None guards against a zero timing.
        return round(numerator / denominator, 2) if denominator > 0 else None

    records = []
    for table_name, sections in results.items():
        sizes = sections["File sizes (bytes)"]
        compression = sections["Parquet compression"]
        reads = sections["Read performance (seconds)"]
        writes = sections["Write performance (seconds)"]
        memory = sections["Memory expansion"]

        record = {"Table": table_name}

        # sizes
        record["CSV on disk (MB)"] = _mb(sizes["csv_on_disk"])
        record["Parquet on disk (MB)"] = _mb(sizes["parquet_on_disk"])
        record["CSV/Parquet size (×)"] = round(sections["Space efficiency"]["parquet_vs_csv_ratio"], 3)

        # compression (uncompressed ≈ logical data size from metadata)
        record["Uncompressed logical (MB)"] = _mb(compression["uncompressed_total"])
        record["Parquet compressed (MB)"] = _mb(compression["compressed_total"])
        record["Compression ratio (uncompressed/parquet ×)"] = round(compression["compression_ratio"], 3)

        # read perf
        record["CSV full read (s)"] = round(reads["csv_full_read"], 3)
        record["Parquet full read (s)"] = round(reads["parquet_full_read"], 3)
        record["Parquet selective (s)"] = round(reads["parquet_selective_columns_read"], 3)
        record["Speedup CSV→Parquet full (×)"] = speedup(reads["csv_full_read"], reads["parquet_full_read"])
        record["Speedup Parquet full→selective (×)"] = speedup(reads["parquet_full_read"], reads["parquet_selective_columns_read"])

        # write perf
        record["CSV write (s)"] = round(writes["csv_write"], 3)
        record["Parquet write (s)"] = round(writes["parquet_write"], 3)
        record["Speedup CSV→Parquet write (×)"] = speedup(writes["csv_write"], writes["parquet_write"])

        # memory
        record["Pandas RAM (MB)"] = _mb(memory["pandas_memory_bytes"])
        record["Expansion vs Parquet (×)"] = round(memory["memory_expansion_factor_vs_parquet"], 3)

        records.append(record)

    summary = pd.DataFrame(records).sort_values("Table").reset_index(drop=True)

    # Column subsets for the per-topic views used in the write-up.
    view_columns = (
        ["Table", "CSV on disk (MB)", "Parquet on disk (MB)", "CSV/Parquet size (×)",
         "Uncompressed logical (MB)", "Parquet compressed (MB)", "Compression ratio (uncompressed/parquet ×)"],
        ["Table", "CSV full read (s)", "Parquet full read (s)", "Parquet selective (s)",
         "Speedup CSV→Parquet full (×)", "Speedup Parquet full→selective (×)"],
        ["Table", "CSV write (s)", "Parquet write (s)", "Speedup CSV→Parquet write (×)"],
        ["Table", "Pandas RAM (MB)", "Expansion vs Parquet (×)"],
    )
    return (summary,) + tuple(summary[cols] for cols in view_columns)

# Flatten analysis_results into one summary frame plus four per-topic views.
summary_all, sizes_df, reads_df, writes_df, mem_df = pretty_summaries(analysis_results)

# Render each view under a plain-text heading (display() gives rich HTML tables).
print("=== Storage & Compression ===")
display(sizes_df)

print("=== Read Performance ===")
display(reads_df)

print("=== Write Performance ===")
display(writes_df)

print("=== Memory Expansion ===")
display(mem_df)


=== Storage & Compression ===


Unnamed: 0,Table,CSV on disk (MB),Parquet on disk (MB),CSV/Parquet size (×),Uncompressed logical (MB),Parquet compressed (MB),Compression ratio (uncompressed/parquet ×)
0,Listens,178.868,79.977,2.236,194.593,79.977,2.433
1,Songs,9.773,4.262,2.293,10.321,4.262,2.422
2,Users,48.579,20.337,2.389,50.287,20.337,2.473


=== Read Performance ===


Unnamed: 0,Table,CSV full read (s),Parquet full read (s),Parquet selective (s),Speedup CSV→Parquet full (×),Speedup Parquet full→selective (×)
0,Listens,7.149,3.092,0.034,2.31,91.35
1,Songs,0.341,0.158,0.03,2.16,5.31
2,Users,1.183,0.28,0.051,4.22,5.52


=== Write Performance ===


Unnamed: 0,Table,CSV write (s),Parquet write (s),Speedup CSV→Parquet write (×)
0,Listens,12.451,2.003,6.22
1,Songs,0.433,0.064,6.8
2,Users,1.026,0.338,3.03


=== Memory Expansion ===


Unnamed: 0,Table,Pandas RAM (MB),Expansion vs Parquet (×)
0,Listens,642.777,8.037
1,Songs,14.838,3.482
2,Users,71.812,3.531


# Section 2: Parse SQL Query

In this section, you should implement logic to parse the following SQL query:
```sql
    SELECT s.song_id, AVG(u.age) AS avg_age,
       COUNT(DISTINCT l.user_id) AS count_distinct_users
    FROM Songs s
    JOIN Listens l ON s.song_id = l.song_id
    JOIN Users u ON l.user_id = u.user_id
    GROUP BY s.song_id, s.title
    ORDER BY COUNT(DISTINCT l.user_id) DESC, s.song_id;
```

You should manually extract the components from the provided query (i.e. you don't need to implement a general SQL parser, just handle this specific query).

In [None]:
# The project query handled by parse_sql. Unlike the spec above, the
# COUNT(DISTINCT l.user_id) item carries no `AS count_distinct_users` alias,
# so parse_sql assigns it its default alias (count_l_user_id).
query = """SELECT s.song_id, AVG(u.age) AS avg_age,
COUNT(DISTINCT l.user_id)
FROM Songs s
JOIN Listens l ON s.song_id = l.song_id
JOIN Users u ON l.user_id = u.user_id
GROUP BY s.song_id, s.title
ORDER BY COUNT(DISTINCT l.user_id) DESC, s.song_id;
"""

In [None]:
import re
from typing import Dict, List, Tuple, Any


def parse_sql(query):
    """
    Parse the project's aggregate-join query into a logical plan dict.

    This is a targeted regex parser for queries shaped like
        SELECT <alias.col | AGG([DISTINCT] alias.col) [AS name]>, ...
        FROM <table> <alias>
        [JOIN <table> <alias> ON a.x = b.y] ...
        [GROUP BY ...] [ORDER BY <expr> [ASC|DESC], ...]
    It is not a general SQL parser (e.g. no commas inside expressions).

    Returns a dict with:
        "tables":         {table_name: alias}
        "joins":          [{"left", "left_key", "right", "right_key"}]; "right" is
                          always the table introduced by that JOIN, whichever
                          order the ON condition lists the two columns in
        "group_by":       qualified column strings, e.g. "s.song_id"
        "aggregations":   {output_alias: {"func": "AVG" | "COUNT_DISTINCT" | ...,
                                           "expr": "alias.col"}}
        "select":         output names in SELECT order (qualified column or
                          aggregate alias)
        "order_by":       [(column_or_aggregate_alias, "ASC" | "DESC")]
        "needed_columns": {table_name: sorted column names referenced anywhere}

    Raises:
        ValueError: if SELECT/FROM is missing or a table alias cannot be resolved.
    """
    # Normalize whitespace so the clause regexes can assume single spaces.
    sql = re.sub(r'\s+', ' ', query.strip())

    # AGG(alias.col) or AGG(DISTINCT alias.col); groups: func, DISTINCT, expr.
    agg_re = r'(AVG|COUNT|SUM|MIN|MAX)\s*\(\s*(DISTINCT\s+)?(\w+\.\w+)\s*\)'

    select_match = re.search(r'SELECT\s+(.*?)\s+FROM', sql, re.IGNORECASE)
    from_match = re.search(r'FROM\s+(\w+)\s+(\w+)', sql, re.IGNORECASE)
    joins = re.findall(r'JOIN\s+(\w+)\s+(\w+)\s+ON\s+(\w+\.\w+)\s*=\s*(\w+\.\w+)', sql, re.IGNORECASE)
    group_by_match = re.search(r'GROUP BY\s+(.*?)(?:\s+ORDER BY|\s*;|\s*$)', sql, re.IGNORECASE)
    order_by_match = re.search(r'ORDER BY\s+(.*?)(?:\s*;|\s*$)', sql, re.IGNORECASE)

    if not select_match or not from_match:
        raise ValueError("Invalid SQL: Missing SELECT or FROM clause")

    # ---- Tables and aliases ----
    tables = {from_match.group(1): from_match.group(2)}
    for table_name, table_alias, _, _ in joins:
        tables[table_name] = table_alias

    def table_of(alias):
        """Resolve a table alias to its table name (ValueError if unknown)."""
        for name, known_alias in tables.items():
            if known_alias == alias:
                return name
        raise ValueError(f"Invalid SQL: unknown table alias '{alias}'")

    def agg_parts(match):
        """Return (base function, plan function incl. _DISTINCT, expr) for an aggregate match."""
        base_func = match.group(1).upper()
        func = f"{base_func}_DISTINCT" if match.group(2) is not None else base_func
        return base_func, func, match.group(3)

    def default_alias(base_func, expr):
        """Alias used when an aggregate has no AS name, e.g. count_l_user_id."""
        return f"{base_func.lower()}_{expr.replace('.', '_')}"

    # ---- JOIN conditions ----
    join_list = []
    for table_name, table_alias, lhs, rhs in joins:
        lhs_alias, lhs_key = lhs.split('.')
        rhs_alias, rhs_key = rhs.split('.')
        if lhs_alias == table_alias and rhs_alias != table_alias:
            # ON written as "<joined>.x = <earlier>.y": swap so the joined table
            # is always the right side of the join.
            lhs_alias, lhs_key, rhs_alias, rhs_key = rhs_alias, rhs_key, lhs_alias, lhs_key
        join_list.append({
            "left": table_of(lhs_alias),
            "left_key": lhs_key,
            "right": table_name,
            "right_key": rhs_key,
        })

    # ---- SELECT list ----
    aggregations = {}
    select_columns = []
    for item in (part.strip() for part in select_match.group(1).split(',')):
        agg_match = re.match(agg_re + r'(?:\s+AS\s+(\w+))?', item, re.IGNORECASE)
        if agg_match:
            base_func, func, expr = agg_parts(agg_match)
            alias = agg_match.group(4) or default_alias(base_func, expr)
            aggregations[alias] = {"func": func, "expr": expr}
            select_columns.append(alias)
            continue
        col_match = re.match(r'(\w+\.\w+)(?:\s+AS\s+(\w+))?', item, re.IGNORECASE)
        if col_match:
            # Plain columns are recorded by qualified name (any AS alias is ignored).
            select_columns.append(col_match.group(1))

    # ---- GROUP BY ----
    group_by = [g.strip() for g in group_by_match.group(1).split(',')] if group_by_match else []

    # ---- ORDER BY ----
    order_by = []
    if order_by_match:
        for item in (part.strip() for part in order_by_match.group(1).split(',')):
            dir_match = re.match(r'(.*?)\s+(DESC|ASC)$', item, re.IGNORECASE)
            if dir_match:
                col, direction = dir_match.group(1).strip(), dir_match.group(2).upper()
            else:
                col, direction = item, "ASC"

            agg_match = re.match(agg_re, col, re.IGNORECASE)
            if not agg_match:
                order_by.append((col, direction))
                continue

            base_func, func, expr = agg_parts(agg_match)
            alias = next(
                (name for name, info in aggregations.items()
                 if info["func"] == func and info["expr"] == expr),
                None,
            )
            if alias is None:
                # Aggregate used only in ORDER BY: register it so it still gets
                # computed, instead of silently dropping this sort key.
                alias = default_alias(base_func, expr)
                aggregations[alias] = {"func": func, "expr": expr}
            order_by.append((alias, direction))

    # ---- Columns each table must provide ----
    needed = {table: set() for table in tables}

    def require(qualified):
        """Record alias.col as a column to read; unqualified names (aggregate aliases) are skipped."""
        if '.' in qualified:
            alias, col_name = qualified.split('.')
            needed[table_of(alias)].add(col_name)

    for col in select_columns:
        require(col)
    for info in aggregations.values():
        require(info["expr"])
    for col in group_by:
        require(col)
    for col, _ in order_by:
        require(col)
    for join in join_list:
        needed[join["left"]].add(join["left_key"])
        needed[join["right"]].add(join["right_key"])

    return {
        "tables": tables,
        "joins": join_list,
        "group_by": group_by,
        "aggregations": aggregations,
        "select": select_columns,
        "order_by": order_by,
        "needed_columns": {table: sorted(cols) for table, cols in needed.items()},
    }


In [None]:
# Show the plan dict produced for the project query.
parse_sql(query)

{'tables': {'Songs': 's', 'Listens': 'l', 'Users': 'u'},
 'joins': [{'left': 'Songs',
   'left_key': 'song_id',
   'right': 'Listens',
   'right_key': 'song_id'},
  {'left': 'Listens',
   'left_key': 'user_id',
   'right': 'Users',
   'right_key': 'user_id'}],
 'group_by': ['s.song_id', 's.title'],
 'aggregations': {'avg_age': {'func': 'AVG', 'expr': 'u.age'},
  'count_l_user_id': {'func': 'COUNT_DISTINCT', 'expr': 'l.user_id'}},
 'select': ['s.song_id', 'avg_age', 'count_l_user_id'],
 'order_by': [('count_l_user_id', 'DESC'), ('s.song_id', 'ASC')],
 'needed_columns': {'Songs': ['song_id', 'title'],
  'Listens': ['song_id', 'user_id'],
  'Users': ['age', 'user_id']}}

# Section 3: Implement Join Algorithms

In this section, you will implement the execution operators (*how* to join) and aggregation after joins.

**Reminder:** If you use temporary files or folders, you should clean them up either as part of your join logic, or after each run. Otherwise you might run into correctness issues!

In [None]:
import hashlib

def HASHVALUE(value, B):
    """
    Map a join-key value to a partition number in [0, B).

    Integers (Python ints, bools and NumPy integer scalars) use Python's built-in
    int hash, which is not salted by hash randomization. All other values are
    hashed with SHA-256 of their str() form, so results are stable across runs.

    Parameters:
    - value: key to hash.
    - B: number of partitions (positive int).
    """
    if isinstance(value, (int, np.integer)):
        # Normalize NumPy integer scalars (e.g. np.int64 from a DataFrame column)
        # to int. Otherwise np.int64(5) would take the SHA-256 path while 5 takes
        # the int path, and equal keys from the two join sides could land in
        # different partitions.
        return hash(int(value)) % B
    digest = hashlib.sha256(str(value).encode("utf-8")).hexdigest()
    return int(digest, 16) % B

Implement `HashPartitionJoin`:
1. Hash partition both tables
2. Build hash table from smaller partition
3. Probe with larger partition
4. Return joined results

In [None]:
# ---------- Utilities ----------

def _iter_parquet_batches(parquet_path: str, columns: Optional[List[str]] = None, batch_size: int = 250_000):
    """
    Lazily yield a Parquet file as pandas DataFrames, one per Arrow record batch
    of up to `batch_size` rows, optionally restricted to `columns`. The file is
    opened only when iteration starts.
    """
    reader = pq.ParquetFile(parquet_path)
    yield from (
        record_batch.to_pandas()
        for record_batch in reader.iter_batches(batch_size=batch_size, columns=columns)
    )

def _safe_cols(columns: Optional[List[str]], must_include: str) -> Optional[List[str]]:
    """Ensure the join key is present in the requested columns."""
    if columns is None:
        return None
    if must_include not in columns:
        return columns + [must_include]
    return columns

def _cleanup_dir(path: str):
    try:
        shutil.rmtree(path)
    except FileNotFoundError:
        pass

In [None]:
class HashPartitionJoin:
    """
    Equi-join two ColumnarDbFile tables by hash-partitioning both inputs on their
    join keys (via HASHVALUE), then joining each matching partition pair in
    memory with pd.merge and concatenating the results in partition order.
    """

    def __init__(self, num_partitions=4):
        """
        Parameters:
        - num_partitions: number of hash buckets each input is split into (>= 1).

        Raises:
        - ValueError: if num_partitions < 1 (a modulus below 1 yields no valid bucket).
        """
        if num_partitions < 1:
            raise ValueError(f"num_partitions must be >= 1, got {num_partitions}")
        self.num_partitions = num_partitions

    def join(self, table1: ColumnarDbFile, table2: ColumnarDbFile, join_key1, join_key2,
             temp_dir='temp', columns_table1=None, columns_table2=None):
        """
        Perform a hash partition join between two ColumnarDbFile instances.

        Parameters:
        - table1: Left table (ColumnarDbFile)
        - table2: Right table (ColumnarDbFile)
        - join_key1: Join key from table1
        - join_key2: Join key from table2
        - temp_dir: Directory under which temporary partition files are written;
          they are deleted before this method returns (even on error). Other
          files in temp_dir are never touched.
        - columns_table1: List of columns to select from table1 (the join key is
          added automatically if missing; None reads all columns)
        - columns_table2: List of columns to select from table2 (same rules)

        Returns:
        - join_result_table: ColumnarDbFile (in the default 'data' directory)
          holding the joined rows. The file is written even when nothing
          matches, so retrieve_data() on it always works.
        """
        # Each call partitions into its own scratch subdirectory so repeated or
        # nested joins never overwrite each other's partition files, and the
        # cleanup below cannot remove unrelated files living in temp_dir.
        work_dir = os.path.join(temp_dir, f"hpj_{uuid.uuid4().hex}")
        os.makedirs(work_dir, exist_ok=True)
        try:
            partitions1 = self._hash_partition(
                table1, join_key1, work_dir, 'left', _safe_cols(columns_table1, join_key1))
            partitions2 = self._hash_partition(
                table2, join_key2, work_dir, 'right', _safe_cols(columns_table2, join_key2))
            joined = self._join_partitions(partitions1, partitions2, join_key1, join_key2)
        finally:
            # Partition files are scratch data only; remove them even on failure.
            _cleanup_dir(work_dir)

        join_result_table = ColumnarDbFile(f"HPJ_out_{uuid.uuid4().hex}")
        # One write of the full result instead of a read-modify-write per partition.
        join_result_table.build_table(joined)
        return join_result_table

    def _join_partitions(self, partitions1, partitions2, join_key1, join_key2):
        """
        Join each (left, right) partition file pair in memory and return the
        concatenated result. If no pair produces rows, returns an empty frame
        that still carries the output columns.
        """
        pieces = []
        empty_result = None
        for left_path, right_path in zip(partitions1, partitions2):
            left_part = pd.read_parquet(left_path)
            right_part = pd.read_parquet(right_path)
            merged = pd.merge(left_part, right_part, left_on=join_key1, right_on=join_key2,
                              suffixes=('', '_right'))
            dup_key = f"{join_key2}_right"
            if join_key1 == join_key2 and dup_key in merged.columns:
                merged = merged.drop(columns=[dup_key])
            if len(merged):
                pieces.append(merged)
            elif empty_result is None:
                # Keep one empty merge around as the output schema template.
                empty_result = merged
        if pieces:
            return pd.concat(pieces, ignore_index=True)
        return empty_result

    def _hash_partition(self, table: ColumnarDbFile, join_key, output_dir, side, columns=None):
        """
        Read `columns` of `table`, split its rows into self.num_partitions Parquet
        files by HASHVALUE(join_key), and return the file paths in partition order.
        Every partition file is written, even when empty, so its schema is available.
        """
        os.makedirs(output_dir, exist_ok=True)
        df = table.retrieve_data(columns=columns)
        # Kept as a separate Series (not a new column) so it can never clobber a
        # user column that happens to be named "part_id".
        part_ids = df[join_key].apply(lambda x: HASHVALUE(x, self.num_partitions))
        partitions = []
        for partition in range(self.num_partitions):
            path = f"{output_dir}/{side}_part_{partition}.parquet"
            # index=False: the filtered row index carries no information.
            df[part_ids == partition].to_parquet(path, engine="pyarrow", compression="snappy", index=False)
            partitions.append(path)
        return partitions


In [None]:
# Optional: Verify your implementation against pd.merge
def _read_parquet_sample_rows(path: str, n_rows: int, columns: Optional[List[str]] = None) -> pd.DataFrame:
    """Read at most n_rows from Parquet by streaming record batches."""
    pf = pq.ParquetFile(path)
    out_frames = []
    taken = 0
    for batch in pf.iter_batches(batch_size=min(100_000, n_rows), columns=columns):
        df = batch.to_pandas()
        out_frames.append(df)
        taken += len(df)
        if taken >= n_rows:
            break
    if not out_frames:
        return pd.DataFrame(columns=columns or [])
    df = pd.concat(out_frames, ignore_index=True)
    return df.iloc[:n_rows].copy()

def _canonicalize(df: pd.DataFrame) -> pd.DataFrame:
    """
    Sort rows and columns to compare equality independent of row order.
    We also cast to consistent dtypes where possible.
    """
    # Uniform sort key: try all columns, fallback to index if empty
    cols = list(df.columns)
    if cols:
        df_sorted = df.sort_values(cols).reset_index(drop=True)
    else:
        df_sorted = df.copy().reset_index(drop=True)
    # Normalize dtypes (string-ish vs object)
    for c in df_sorted.columns:
        # integers can arrive as different numpy int types; leave numeric as-is
        if pd.api.types.is_object_dtype(df_sorted[c].dtype):
            try:
                df_sorted[c] = df_sorted[c].astype("string")
            except Exception:
                pass
    # Sort columns lexicographically for a stable order
    df_sorted = df_sorted.reindex(sorted(df_sorted.columns), axis=1)
    return df_sorted

def verify_hpj_against_pandas(tables,
                              temp_dir: str = "temp_verify",
                              sample_listens: int = 200_000,
                              rng_seed: int = 7) -> None:
    """
    Cross-check HashPartitionJoin against pandas.merge on a subset of the data.

    Builds a subset of the three 100MB tables, runs
        HPJ:    (Songs ⨝ Listens on song_id) ⨝ Users on user_id
    and compares it with
        pandas: (Songs ⨝ Listens) ⨝ Users
    after canonicalizing both sides (row/column order ignored, dtypes relaxed).

    Args:
        tables: Unused here, because the subsets are read straight from the 100MB
            Parquet files. Kept for call compatibility.
        temp_dir: Directory for the temporary Parquet subsets and join outputs.
        sample_listens: Number of Listens rows to use (the first rows of the file).
        rng_seed: Seed applied to NumPy's global RNG at the start of the run.

    Raises:
        AssertionError: if the HPJ result differs from the pandas reference.
    """
    np.random.seed(rng_seed)
    os.makedirs(temp_dir, exist_ok=True)

    # ---- 1) Build subset DataFrames ----
    # Listens dominates: take its first rows, then filter Songs/Users to the keys that appear.
    listens_full_path = "listens_100MB.parquet"
    songs_full_path   = "songs_100MB.parquet"
    users_full_path   = "users_100MB.parquet"

    # Only needed columns for each table
    listens_cols = ["song_id", "user_id"]           # join cols only to keep verification light
    songs_cols   = ["song_id", "title"]
    users_cols   = ["user_id", "age"]

    # The reader already caps the result at sample_listens rows, so no extra downsampling is needed.
    l_sub = _read_parquet_sample_rows(listens_full_path, sample_listens, columns=listens_cols)

    # Collect the referenced keys
    song_ids_needed = l_sub["song_id"].unique()
    user_ids_needed = l_sub["user_id"].unique()

    # Read Songs/Users (these are small at 100MB scale)
    s_df = pd.read_parquet(songs_full_path, columns=songs_cols)
    u_df = pd.read_parquet(users_full_path, columns=users_cols)

    s_sub = s_df[s_df["song_id"].isin(song_ids_needed)].reset_index(drop=True)
    u_sub = u_df[u_df["user_id"].isin(user_ids_needed)].reset_index(drop=True)

    # ---- 2) Materialize subsets as ColumnarDbFile (so HPJ reads from Parquet like the real system) ----
    s_tmp = ColumnarDbFile("SongsSub", file_dir=temp_dir)
    l_tmp = ColumnarDbFile("ListensSub", file_dir=temp_dir)
    u_tmp = ColumnarDbFile("UsersSub", file_dir=temp_dir)
    s_tmp.build_table(s_sub)
    l_tmp.build_table(l_sub)
    u_tmp.build_table(u_sub)

    # ---- 3) Run HPJ pipeline on subsets ----
    hpj = HashPartitionJoin(num_partitions=8)
    # (Songs ⨝ Listens) on song_id
    first_join_cdf = hpj.join(s_tmp, l_tmp, join_key1="song_id", join_key2="song_id",
                 temp_dir=temp_dir, columns_table1=songs_cols, columns_table2=listens_cols)
    hpj_cdf = hpj.join(first_join_cdf, u_tmp, join_key1="user_id", join_key2="user_id",
                 temp_dir=temp_dir, columns_table1=None, columns_table2=users_cols)
    hpj_out = hpj_cdf.retrieve_data()

    # ---- 4) Reference using pandas.merge ----
    ref1 = pd.merge(s_sub, l_sub, left_on="song_id", right_on="song_id", how="inner")
    ref2 = pd.merge(ref1, u_sub, left_on="user_id", right_on="user_id", how="inner")

    # ---- 5) Canonicalize and compare ----
    A = _canonicalize(hpj_out)
    B = _canonicalize(ref2)

    # Ensure same columns set (HPJ and pandas should match given identical merge semantics)
    if set(A.columns) != set(B.columns):
        # Column order/suffixing can differ slightly if you change columns_table1/2.
        # Align to the intersection to avoid false negatives, but warn if something is missing.
        common = sorted(set(A.columns).intersection(B.columns))
        missing_A = sorted(set(B.columns) - set(A.columns))
        missing_B = sorted(set(A.columns) - set(B.columns))
        print("[verify] WARNING: column set mismatch")
        if missing_A:
            print("  Columns missing in HPJ:", missing_A)
        if missing_B:
            print("  Columns missing in pandas ref:", missing_B)
        # Re-canonicalize: the row order computed above also depended on the dropped columns.
        A = _canonicalize(A[common])
        B = _canonicalize(B[common])

    pd.testing.assert_frame_equal(A, B, check_dtype=False, check_like=True)
    print(f"[verify] ✅ HashPartitionJoin matches pandas.merge on subset of {len(l_sub):,} listens "
          f"({len(A):,} joined rows).")

# Cross-check HPJ against pandas; raise or lower sample_listens to trade speed for coverage.
verify_hpj_against_pandas(tables=tables, sample_listens=200_000)

[verify] ✅ HashPartitionJoin matches pandas.merge on subset of 200,000 listens (200,000 joined rows).


Implement `SortMergeJoin`:
1. Sort both tables by the join key
2. Merge the sorted sequences
3. Handle duplicate join keys (emit every matching pair of rows)

In [None]:
# Default fan-in (B) for the B-way merge passes of SortMergeJoin's external sort.
BWAY_MERGE_FACTOR = 10


class SortMergeJoin:
    """Sort-merge join over ColumnarDbFile tables.

    Each input is sorted on its join key with an external merge sort. Fixed-size
    sorted runs are spilled to Parquet, then combined by a streaming B-way merge
    that keeps at most ``bway_merge_factor`` runs open at once and adds extra
    passes when there are more runs. The two sorted inputs are then joined on the key.
    """

    def __init__(
        self, bway_merge_factor: int = BWAY_MERGE_FACTOR, num_pages_per_split=1000
    ):
        """
        Args:
            bway_merge_factor: Fan-in B of each merge pass (values below 2 act as 2).
            num_pages_per_split: Rows per record batch (despite the name). Used as the
                size of each initial sorted run and as the read batch size while merging.
        """
        self.bway_merge_factor = bway_merge_factor
        self.num_pages_per_split = num_pages_per_split

    @staticmethod
    def _remove_files(paths):
        """Best-effort delete of scratch files; files that are already gone are ignored."""
        for path in paths:
            try:
                os.remove(path)
            except FileNotFoundError:
                pass

    def _external_sort(
        self,
        table: ColumnarDbFile,
        join_key: str,
        output_dir: str,
        side: str,
        columns: Optional[List[str]] = None,
    ) -> ColumnarDbFile:
        """
        Perform an external sort on a table based on the join key and return a sorted ColumnarDbFile.

        Phase 1 streams the (optionally projected) table in batches of
        ``num_pages_per_split`` rows, sorts each batch in memory and writes it as a
        run file. Phase 2 combines the runs with ``_bway_merge``. Run files are
        removed even if merging fails.

        Args:
            table: Input table; its ``base_file_name`` is read as a Parquet file.
            join_key: Column to sort on; must be part of ``columns`` when given.
            output_dir: Directory for the run files and the sorted output.
            side: Label (e.g. "left"/"right") used in temporary file names.
            columns: Optional projection; ``None`` reads every column.

        Returns:
            A ColumnarDbFile sorted ascending on ``join_key``. For an input without
            rows it is an empty table that still carries the projected columns.
        """
        os.makedirs(output_dir, exist_ok=True)

        parquet_file = pq.ParquetFile(table.base_file_name)
        run_files = []
        try:
            # ---- Phase 1: sorted runs ----
            for batch in parquet_file.iter_batches(batch_size=self.num_pages_per_split,
                                                   columns=columns):
                run_df = batch.to_pandas()
                if run_df.empty:
                    continue
                # Stable sort keeps the input order of rows that share a key.
                run_df = run_df.sort_values(by=join_key, kind="mergesort")
                run_path = os.path.join(output_dir,
                                        f"{side}_run_{uuid.uuid4().hex}.parquet")
                run_df.to_parquet(run_path, engine='pyarrow', compression='snappy',
                                  index=False)
                run_files.append(run_path)

            sorted_table = ColumnarDbFile(f"{side}_sorted_{uuid.uuid4().hex}",
                                          file_dir=output_dir)
            sorted_path = sorted_table.base_file_name  # <- no extension, matches your ColumnarDbFile

            # ---- Phase 2: merge runs ----
            if run_files:
                self._bway_merge(run_files, sorted_path, join_key)
            else:
                # No rows at all. Keep the projected schema so the later merge still
                # finds the join-key column (a column-less file would raise KeyError).
                empty = parquet_file.schema_arrow.empty_table()
                keep = columns if columns is not None else [
                    name for name in empty.column_names
                    if not name.startswith("__index_level_")
                ]
                empty = empty.select(keep).replace_schema_metadata(None)
                pq.write_table(empty, sorted_path, compression='snappy')
        finally:
            self._remove_files(run_files)

        return sorted_table

    def _bway_merge(self, sorted_files: List[str], output_file: str, join_key: str):
        """
        Merge multiple sorted Parquet files into a single sorted Parquet file using B-way merge.

        Runs are merged in groups of at most B = ``bway_merge_factor`` files per pass.
        Passes repeat until at most B runs remain, and a final pass writes
        ``output_file``. Each group merge streams batches (see ``_merge_group``), so
        memory holds about B batches instead of the whole table. Intermediate files
        are written next to ``output_file`` and deleted once consumed. The input
        ``sorted_files`` are not deleted here.

        Args:
            sorted_files: Parquet files, each sorted ascending on ``join_key``.
            output_file: Destination path for the merged result.
            join_key: Column the runs are sorted on.
        """
        if not sorted_files:
            pd.DataFrame().to_parquet(output_file, engine='pyarrow',
                                      compression='snappy', index=False)
            return

        fan_in = max(2, int(self.bway_merge_factor))
        out_dir = os.path.dirname(output_file) or "."
        runs = list(sorted_files)
        created = set()  # intermediate files owned (and deleted) by this call
        try:
            while len(runs) > fan_in:
                next_runs = []
                for start in range(0, len(runs), fan_in):
                    group = runs[start:start + fan_in]
                    if len(group) == 1:
                        # A lone leftover run is carried into the next pass unchanged.
                        next_runs.append(group[0])
                        continue
                    merged_path = os.path.join(
                        out_dir, f"merge_{uuid.uuid4().hex}.parquet")
                    created.add(merged_path)
                    self._merge_group(group, merged_path, join_key)
                    next_runs.append(merged_path)
                    # Intermediate inputs of this group are no longer needed.
                    consumed = [p for p in group if p in created]
                    self._remove_files(consumed)
                    created.difference_update(consumed)
                runs = next_runs
            self._merge_group(runs, output_file, join_key)
        finally:
            self._remove_files(list(created))

    def _merge_group(self, run_files: List[str], output_file: str, join_key: str):
        """Stream-merge a small group of sorted Parquet runs into one sorted Parquet file.

        One batch per run is buffered. Each round, ``bound`` is the smallest *last*
        key among the buffered batches. Every row not yet read from a run has a key
        >= that run's last buffered key >= ``bound``. So all buffered rows with
        key <= ``bound`` can be written now without breaking the sort order. The run
        that defined ``bound`` is fully drained each round, so the loop always
        makes progress.

        NOTE(review): assumes join-key values are non-null and mutually comparable;
        NaN keys would break the ``min``/``searchsorted`` bound logic — confirm for
        callers with nullable keys.
        """
        batch_iters = [
            pq.ParquetFile(path).iter_batches(batch_size=self.num_pages_per_split)
            for path in run_files
        ]

        def next_frame(idx):
            # Next non-empty batch of run ``idx`` as a DataFrame, or None when exhausted.
            for batch in batch_iters[idx]:
                if batch.num_rows:
                    return batch.to_pandas()
            return None

        heads = [next_frame(idx) for idx in range(len(batch_iters))]
        writer = None
        try:
            while True:
                live = [idx for idx, frame in enumerate(heads) if frame is not None]
                if not live:
                    break
                bound = min(heads[idx][join_key].iloc[-1] for idx in live)
                pieces = []
                for idx in live:
                    frame = heads[idx]
                    cut = int(frame[join_key].searchsorted(bound, side="right"))
                    if cut:
                        pieces.append(frame.iloc[:cut])
                    heads[idx] = frame.iloc[cut:] if cut < len(frame) else next_frame(idx)
                chunk = pd.concat(pieces, ignore_index=True).sort_values(
                    by=join_key, kind="mergesort")
                arrow_chunk = pa.Table.from_pandas(chunk, preserve_index=False)
                if writer is None:
                    writer = pq.ParquetWriter(output_file, arrow_chunk.schema,
                                              compression='snappy')
                elif not arrow_chunk.schema.equals(writer.schema, check_metadata=False):
                    # e.g. an all-null column inferred as a different Arrow type in this chunk.
                    arrow_chunk = arrow_chunk.cast(writer.schema)
                writer.write_table(arrow_chunk)
        finally:
            if writer is not None:
                writer.close()

        if writer is None:
            # Every run was empty: write an empty frame, as for an empty run list.
            pd.DataFrame().to_parquet(output_file, engine='pyarrow',
                                      compression='snappy', index=False)

    def join(
        self,
        table1: ColumnarDbFile,
        table2: ColumnarDbFile,
        join_key1: str,
        join_key2: str,
        temp_dir: str = "temp",
        columns_table1: Optional[List[str]] = None,
        columns_table2: Optional[List[str]] = None,
    ) -> Optional[ColumnarDbFile]:
        """
        Perform a sort-merge join between two ColumnarDbFile instances and return a sorted ColumnarDbFile.

        Both inputs are externally sorted on their join keys. The join itself uses
        ``pd.merge(how='inner')`` on the sorted frames, which preserves the left
        (sorted) key order. NOTE: this join step loads both sorted sides into memory.

        Args:
            table1, table2: Left and right inputs.
            join_key1, join_key2: Join columns of table1 and table2.
            temp_dir: Directory for scratch files and the output table.
            columns_table1, columns_table2: Optional projections; the join key is
                added automatically when missing.

        Returns:
            A new ColumnarDbFile in ``temp_dir`` holding the joined rows. Overlapping
            non-key columns from the right side get the ``_right`` suffix, and the
            duplicate join-key column is dropped when both keys share a name.
        """
        os.makedirs(temp_dir, exist_ok=True)

        # Make sure join keys are present in projections
        if columns_table1 is not None and join_key1 not in columns_table1:
            columns_table1 = sorted(set(list(columns_table1) + [join_key1]))
        if columns_table2 is not None and join_key2 not in columns_table2:
            columns_table2 = sorted(set(list(columns_table2) + [join_key2]))

        # Sort both tables externally
        sorted_table1 = self._external_sort(
            table1, join_key1, temp_dir, "left", columns_table1
        )
        sorted_table2 = self._external_sort(
            table2, join_key2, temp_dir, "right", columns_table2
        )

        # Read sorted data, then drop the private sorted scratch files.
        left_df = sorted_table1.retrieve_data()
        right_df = sorted_table2.retrieve_data()
        self._remove_files([sorted_table1.base_file_name, sorted_table2.base_file_name])

        # Perform merge join
        result_df = pd.merge(
            left_df,
            right_df,
            left_on=join_key1,
            right_on=join_key2,
            how='inner',
            suffixes=('', '_right'),
        )

        # Drop duplicate join key if they have the same name
        if join_key1 == join_key2 and f"{join_key2}_right" in result_df.columns:
            result_df = result_df.drop(columns=[f"{join_key2}_right"])

        # Create output table
        join_result_table = ColumnarDbFile(f"SMJ_out_{uuid.uuid4().hex}",
                                           file_dir=temp_dir)
        join_result_table.build_table(result_df)

        return join_result_table


In [None]:
smj = SortMergeJoin()

# Column requirements come from the custom SQL parser's plan.
plan = parse_sql(query)
needed = plan["needed_columns"]

# Defensive: always carry the join key, even if the parser's output changes.
songs_cols = sorted({*needed["Songs"], "song_id"})      # e.g. ['song_id', 'title']
listens_cols = sorted({*needed["Listens"], "song_id"})  # e.g. ['song_id', 'user_id']

joined_s_l = smj.join(
    tables["Songs"],
    tables["Listens"],
    join_key1="song_id",
    join_key2="song_id",
    temp_dir="temp_smj",
    columns_table1=songs_cols,
    columns_table2=listens_cols,
)

df_s_l = joined_s_l.retrieve_data()
print(df_s_l.head())


   song_id   title  user_id
0        0  Song_0     8924
1        0  Song_0    13846
2        0  Song_0    34780
3        0  Song_0    29313
4        0  Song_0    45288


In [None]:
# ---------- SortMergeJoin vs pandas.merge verification ----------

def verify_smj_against_pandas(
    tables,
    temp_dir: str = "temp_verify_smj",
    sample_listens: int = 200_000,
    rng_seed: int = 7,
    verbose: bool = True,
) -> None:
    """
    Cross-check SortMergeJoin against pandas.merge on a subset of the data.

    Builds a subset of the three 100MB tables, runs
        SMJ:    (Songs ⨝ Listens on song_id) ⨝ Users on user_id
    and compares it with
        pandas: (Songs ⨝ Listens) ⨝ Users
    after canonicalizing both sides (row/column order ignored, dtypes relaxed).

    Args:
        tables: Unused here, because the subsets are read straight from the 100MB
            Parquet files. Kept so the signature mirrors verify_hpj_against_pandas.
        temp_dir: Directory for the temporary Parquet subsets and join outputs.
        sample_listens: Number of Listens rows to use (the first rows of the file).
        rng_seed: Seed applied to NumPy's global RNG at the start of the run.
        verbose: Print step-by-step progress and timings. Column-mismatch warnings
            are printed regardless of this flag.

    Raises:
        AssertionError: if the SMJ result differs from the pandas reference.
    """
    def vlog(*args, **kwargs):
        if verbose:
            print(*args, **kwargs, flush=True)

    def _timeit(fn, *args, **kwargs):
        # perf_counter is monotonic, unlike time.time, so durations stay meaningful.
        t0 = time.perf_counter()
        out = fn(*args, **kwargs)
        return out, time.perf_counter() - t0

    np.random.seed(rng_seed)
    os.makedirs(temp_dir, exist_ok=True)

    listens_full_path = "listens_100MB.parquet"
    songs_full_path   = "songs_100MB.parquet"
    users_full_path   = "users_100MB.parquet"

    # Only needed columns for each table (keep verification light)
    listens_cols = ["song_id", "user_id"]       # join cols only
    songs_cols   = ["song_id", "title"]
    users_cols   = ["user_id", "age"]

    # ---- 1) Build subset DataFrames ----
    vlog("[SMJ verify] Step 1: Sampling listens subset...")
    l_sub, t_l = _timeit(
        _read_parquet_sample_rows,
        listens_full_path,
        sample_listens,
        columns=listens_cols,
    )
    vlog(f"[SMJ verify]   Loaded {len(l_sub):,} listens rows in {t_l:.3f}s.")

    # Collect the referenced keys
    song_ids_needed = l_sub["song_id"].unique()
    user_ids_needed = l_sub["user_id"].unique()
    vlog(f"[SMJ verify]   Unique song_ids: {len(song_ids_needed):,}, "
         f"user_ids: {len(user_ids_needed):,}.")

    vlog("[SMJ verify]   Reading Songs subset...")
    s_df_full, t_s_full = _timeit(pd.read_parquet, songs_full_path, columns=songs_cols)
    s_sub = s_df_full[s_df_full["song_id"].isin(song_ids_needed)].reset_index(drop=True)
    vlog(f"[SMJ verify]   Songs full: {len(s_df_full):,}, subset: {len(s_sub):,} "
         f"(read {t_s_full:.3f}s).")

    vlog("[SMJ verify]   Reading Users subset...")
    u_df_full, t_u_full = _timeit(pd.read_parquet, users_full_path, columns=users_cols)
    u_sub = u_df_full[u_df_full["user_id"].isin(user_ids_needed)].reset_index(drop=True)
    vlog(f"[SMJ verify]   Users full: {len(u_df_full):,}, subset: {len(u_sub):,} "
         f"(read {t_u_full:.3f}s).")

    # ---- 2) Materialize subsets as ColumnarDbFile (so SMJ reads from Parquet like real system) ----
    vlog("[SMJ verify] Step 2: Writing subsets to temporary ColumnarDbFile Parquets...")
    s_tmp = ColumnarDbFile("SongsSub",  file_dir=temp_dir)
    l_tmp = ColumnarDbFile("ListensSub", file_dir=temp_dir)
    u_tmp = ColumnarDbFile("UsersSub",  file_dir=temp_dir)

    _, t_ws = _timeit(s_tmp.build_table, s_sub)
    _, t_wl = _timeit(l_tmp.build_table, l_sub)
    _, t_wu = _timeit(u_tmp.build_table, u_sub)
    vlog(f"[SMJ verify]   Wrote SongsSub in {t_ws:.3f}s, "
         f"ListensSub in {t_wl:.3f}s, UsersSub in {t_wu:.3f}s.")

    # ---- 3) Run SMJ pipeline on subsets ----
    vlog("[SMJ verify] Step 3: Running SortMergeJoin pipeline...")
    smj = SortMergeJoin()

    # (Songs ⨝ Listens) on song_id
    vlog("[SMJ verify]   3a) Joining SongsSub ⨝ ListensSub on song_id via SMJ...")
    left_join_table, t_smj1 = _timeit(
        smj.join,
        s_tmp,
        l_tmp,
        "song_id",
        "song_id",
        temp_dir=temp_dir,
        columns_table1=songs_cols,
        columns_table2=listens_cols,
    )
    left_join_df, t_read1 = _timeit(left_join_table.retrieve_data)
    vlog(f"[SMJ verify]   First join result: {len(left_join_df):,} rows "
         f"(join {t_smj1:.3f}s, read {t_read1:.3f}s).")

    # Now join with Users on user_id
    vlog("[SMJ verify]   3b) Joining (Songs⨝Listens) ⨝ UsersSub on user_id via SMJ...")
    first_join_cdf = ColumnarDbFile("FirstJoinSMJ", file_dir=temp_dir)
    _, t_w1 = _timeit(first_join_cdf.build_table, left_join_df)
    vlog(f"[SMJ verify]   Wrote FirstJoinSMJ in {t_w1:.3f}s.")

    smj_result_table, t_smj2 = _timeit(
        smj.join,
        first_join_cdf,
        u_tmp,
        "user_id",
        "user_id",
        temp_dir=temp_dir,
        columns_table1=None,
        columns_table2=users_cols,
    )
    smj_out, t_read2 = _timeit(smj_result_table.retrieve_data)
    vlog(f"[SMJ verify]   Final SMJ result: {len(smj_out):,} rows "
         f"(join {t_smj2:.3f}s, read {t_read2:.3f}s).")

    # ---- 4) Reference using pandas.merge ----
    vlog("[SMJ verify] Step 4: Computing pandas reference joins...")
    ref1, t_ref1 = _timeit(
        pd.merge,
        s_sub,
        l_sub,
        left_on="song_id",
        right_on="song_id",
        how="inner",
    )
    ref2, t_ref2 = _timeit(
        pd.merge,
        ref1,
        u_sub,
        left_on="user_id",
        right_on="user_id",
        how="inner",
    )
    vlog(f"[SMJ verify]   pandas (Songs⨝Listens): {len(ref1):,} rows in {t_ref1:.3f}s.")
    vlog(f"[SMJ verify]   pandas final join: {len(ref2):,} rows in {t_ref2:.3f}s.")

    # ---- 5) Canonicalize and compare ----
    vlog("[SMJ verify] Step 5: Canonicalizing outputs and comparing...")
    A = _canonicalize(smj_out)
    B = _canonicalize(ref2)

    if set(A.columns) != set(B.columns):
        common = sorted(set(A.columns).intersection(B.columns))
        missing_A = sorted(set(B.columns) - set(A.columns))
        missing_B = sorted(set(A.columns) - set(B.columns))
        # Always surface this, even with verbose=False: it weakens the comparison.
        print("[SMJ verify] WARNING: column set mismatch.", flush=True)
        if missing_A:
            print("  Columns missing in SMJ:", missing_A, flush=True)
        if missing_B:
            print("  Columns missing in pandas ref:", missing_B, flush=True)
        # Re-canonicalize: the row order computed above also depended on the dropped columns.
        A = _canonicalize(A[common])
        B = _canonicalize(B[common])

    pd.testing.assert_frame_equal(A, B, check_dtype=False, check_like=True)

    vlog(
        f"[SMJ verify] ✅ SortMergeJoin matches pandas.merge on subset of "
        f"{len(l_sub):,} listens ({len(A):,} joined rows)."
    )

# Same cross-check as for HPJ, now exercising SortMergeJoin.
verify_smj_against_pandas(tables=tables, sample_listens=200_000)


[SMJ verify] Step 1: Sampling listens subset...
[SMJ verify]   Loaded 200,000 listens rows in 0.007s.
[SMJ verify]   Unique song_ids: 10,000, user_ids: 49,023.
[SMJ verify]   Reading Songs subset...
[SMJ verify]   Songs full: 10,000, subset: 10,000 (read 0.006s).
[SMJ verify]   Reading Users subset...
[SMJ verify]   Users full: 50,000, subset: 49,023 (read 0.005s).
[SMJ verify] Step 2: Writing subsets to temporary ColumnarDbFile Parquets...
[SMJ verify]   Wrote SongsSub in 0.006s, ListensSub in 0.014s, UsersSub in 0.005s.
[SMJ verify] Step 3: Running SortMergeJoin pipeline...
[SMJ verify]   3a) Joining SongsSub ⨝ ListensSub on song_id via SMJ...
[SMJ verify]   First join result: 200,000 rows (join 0.695s, read 0.013s).
[SMJ verify]   3b) Joining (Songs⨝Listens) ⨝ UsersSub on user_id via SMJ...
[SMJ verify]   Wrote FirstJoinSMJ in 0.032s.
[SMJ verify]   Final SMJ result: 200,000 rows (join 1.119s, read 0.018s).
[SMJ verify] Step 4: Computing pandas reference joins...
[SMJ verify]   pand

Implement GROUP BY after the joins:
- Use `DataFrame.groupby(...).agg(...)`, or aggregate manually.

In [None]:
# ==============================
# Aggregation after both joins
# ==============================

def aggregate_after_joins(joined_df: pd.DataFrame) -> pd.DataFrame:
    """
    Apply the GROUP BY / aggregation / ORDER BY part of the query to a joined frame:

      SELECT s.song_id, AVG(u.age) AS avg_age, COUNT(DISTINCT l.user_id) AS count_distinct_users
      GROUP BY s.song_id, s.title
      ORDER BY COUNT(DISTINCT l.user_id) DESC, s.song_id;

    Expects these logical fields in joined_df:
      song_id (from Songs), title (from Songs), user_id (from Listens), age (from Users).

    Column lookup tolerates join suffixes and prefixes, in this order:
      1. the exact name;
      2. ``<base>_x`` / ``<base>_y`` (pandas merge defaults);
      3. a name with a ``_``- or ``.``-separated prefix or suffix
         (e.g. ``l.user_id``, ``song_id_right``).

    NULLs follow SQL: a NULL title still forms its own group, and AVG /
    COUNT(DISTINCT) skip NULL values.

    Returns:
        DataFrame with columns ``song_id``, ``avg_age``, ``count_distinct_users``,
        ordered per the ORDER BY, with a fresh 0..n-1 index.

    Raises:
        KeyError: if a required logical column cannot be found.
    """
    def pick(base: str):
        if base in joined_df.columns:
            return base
        for c in (f"{base}_x", f"{base}_y"):
            if c in joined_df.columns:
                return c
        for c in joined_df.columns:
            name = str(c)
            # Require a separator so that e.g. "page" is never mistaken for "age".
            if name.startswith(f"{base}_") or name.endswith((f"_{base}", f".{base}")):
                return c
        raise KeyError(f"Column like '{base}' not found")

    song_id_col = pick("song_id")
    title_col   = pick("title")
    age_col     = pick("age")
    user_id_col = pick("user_id")

    return (
        joined_df
        .groupby([song_id_col, title_col], as_index=False, dropna=False)
        .agg(
            avg_age=(age_col, "mean"),
            count_distinct_users=(user_id_col, "nunique"),
        )
        .sort_values(by=["count_distinct_users", song_id_col], ascending=[False, True])
        .rename(columns={song_id_col: "song_id"})
        [["song_id", "avg_age", "count_distinct_users"]]
        .reset_index(drop=True)
    )

# Section 4: Query Planning & Optimization

In this section, you'll implement smart query planning using metadata analysis. The key idea is to **avoid loading data unnecessarily** by:
1. Analyzing Parquet metadata first (row counts, column names, file sizes)
2. Making intelligent decisions about join order and algorithm selection
3. Loading only the columns you actually need for the query

In [None]:
def analyze_metadata_before_loading(file_paths):
    """Collect table statistics from Parquet footers without loading any rows.

    Only footer metadata is parsed (``pq.read_metadata``); the on-disk size comes
    from the filesystem. No column data is decoded.

    Args:
        file_paths: Mapping of table name -> Parquet file path.

    Returns:
        Dict keyed by table name. Each value holds ``path``, ``num_rows`` (total
        over all row groups), ``columns`` (schema column names), ``num_row_groups``
        and ``file_size_bytes``.
    """
    metadata = {}
    for table_name, path in file_paths.items():
        file_meta = pq.read_metadata(path)
        metadata[table_name] = {
            "path": path,
            "num_rows": file_meta.num_rows,
            "columns": file_meta.schema.names,
            "num_row_groups": file_meta.num_row_groups,
            "file_size_bytes": os.path.getsize(path),
        }
    return metadata


def plan_query_execution(metadata, parsed_query, available_memory_bytes=None,
                         expansion_factor=3.0, memory_fraction=0.5):
    """Turn table metadata plus a parsed query into an execution plan.

    Decisions (for a star-shaped query: one fact table joined to dimensions):
      * Fact table: the table with the most rows. The others are dimensions,
        ordered by ascending row count so the smallest is joined first.
      * Column pruning: keep only ``parsed_query['needed_columns']`` that exist in
        each table, plus every join key the table participates in. A table left
        with nothing to load falls back to all of its columns.
      * Algorithm: HPJ when ``expansion_factor * (fact + first-dim on-disk bytes)``
        is below ``memory_fraction * available memory``, otherwise SMJ.

    Args:
        metadata: Output of ``analyze_metadata_before_loading``; each table needs
            ``num_rows``, ``columns`` and ``file_size_bytes``.
        parsed_query: Dict with ``joins`` (dicts with ``left``/``left_key``/``right``/
            ``right_key``) and, optionally, ``needed_columns``.
        available_memory_bytes: Memory budget to plan against. ``None`` queries
            ``psutil.virtual_memory().available``.
        expansion_factor: Rough compressed-on-disk -> in-memory size multiplier.
        memory_fraction: Share of the budget the first join may use to pick HPJ.

    Returns:
        Dict with ``algorithm``, ``fact_table``, ``dim_tables_ordered``,
        ``join_plan`` and ``columns_to_load``.

    Raises:
        ValueError: if two tables that must be joined have no join condition.
    """
    rows_by_table = {t: info["num_rows"] for t, info in metadata.items()}
    sizes_by_table = {t: info["file_size_bytes"] for t, info in metadata.items()}
    joins = parsed_query["joins"]

    # ---- 1) Fact vs. dimension tables ----
    fact_table = max(rows_by_table, key=rows_by_table.get)
    dim_tables_sorted = sorted(
        (t for t in rows_by_table if t != fact_table),
        key=lambda t: rows_by_table[t],
    )

    # ---- 2) Column pruning ----
    needed_columns = parsed_query.get("needed_columns", {})
    columns_to_load = {}
    for table_name, info in metadata.items():
        table_cols = set(info["columns"])
        pruned = [c for c in needed_columns.get(table_name, []) if c in table_cols]
        # Always keep join keys in case the parser's needed_columns omits them.
        for j in joins:
            if j["left"] == table_name:
                pruned.append(j["left_key"])
            if j["right"] == table_name:
                pruned.append(j["right_key"])
        columns_to_load[table_name] = sorted(set(pruned)) if pruned else list(info["columns"])

    # ---- 3) Join order ----
    def find_join_keys(table_a, table_b):
        """Return (key on table_a, key on table_b) for the condition joining them."""
        for j in joins:
            if j["left"] == table_a and j["right"] == table_b:
                return j["left_key"], j["right_key"]
            if j["left"] == table_b and j["right"] == table_a:
                # Condition was written in the opposite orientation.
                return j["right_key"], j["left_key"]
        raise ValueError(f"No join condition found between {table_a} and {table_b}")

    # Step 1: smallest dim ⨝ fact.
    first_dim = dim_tables_sorted[0]
    second_dim = dim_tables_sorted[1] if len(dim_tables_sorted) > 1 else None

    first_left_key, first_right_key = find_join_keys(first_dim, fact_table)
    join_plan = [
        {
            "step": 1,
            "left_table": first_dim,
            "right_table": fact_table,
            "left_key": first_left_key,
            "right_key": first_right_key,
        }
    ]

    # Step 2: intermediate (carries the fact table's columns) ⨝ remaining dim.
    if second_dim is not None:
        second_left_key, second_right_key = find_join_keys(fact_table, second_dim)
        join_plan.append(
            {
                "step": 2,
                "left_table": "__INTERMEDIATE__",  # result of step 1
                "right_table": second_dim,
                "left_key": second_left_key,   # join key on the intermediate (from fact)
                "right_key": second_right_key, # join key on second_dim
            }
        )

    # ---- 4) Algorithm selection: HPJ vs SMJ ----
    if available_memory_bytes is None:
        import psutil  # only needed when the caller supplies no budget
        available_memory_bytes = psutil.virtual_memory().available

    est_bytes_first_join = expansion_factor * (
        sizes_by_table[fact_table] + sizes_by_table[first_dim]
    )
    algorithm = "HPJ" if est_bytes_first_join < memory_fraction * available_memory_bytes else "SMJ"

    return {
        "algorithm": algorithm,
        "fact_table": fact_table,
        "dim_tables_ordered": dim_tables_sorted,
        "join_plan": join_plan,
        "columns_to_load": columns_to_load,
    }


# After planning, load ONLY what you need:
# Example (you implement the actual logic):
# columns_needed = ['song_id', 'title']  # e.g. plan["columns_to_load"]["Songs"]
# df = pd.read_parquet('songs.parquet', columns=columns_needed)

In [None]:
class QueryPlanner:
    """Plans the query: parse the SQL, read Parquet footers, build a plan."""

    def __init__(self, num_partitions=8):
        # Stored as configuration; the planning methods below do not read it.
        self.num_partitions = num_partitions

    def analyze_metadata(self, tables):
        """Collect footer statistics for every ColumnarDbFile in ``tables`` (name -> table)."""
        paths = dict((name, table.base_file_name) for name, table in tables.items())
        return analyze_metadata_before_loading(paths)

    def plan_query(self, tables, query_str):
        """Parse ``query_str``, analyze ``tables`` and return the execution plan.

        The returned plan also carries the ``parsed_query`` and ``metadata`` it
        was built from, for the executor's use.
        """
        parsed_query = parse_sql(query_str)
        metadata = self.analyze_metadata(tables)
        plan = plan_query_execution(metadata, parsed_query)
        plan.update(parsed_query=parsed_query, metadata=metadata)
        return plan


class QueryExecutor:
    """Runs the hard-coded Songs/Listens/Users query end to end.

    The planner picks the join algorithm, join order and column projections. The
    executor runs the joins with HashPartitionJoin or SortMergeJoin, then performs
    GROUP BY / aggregation / ORDER BY in pandas.
    """

    def __init__(self, tables, num_partitions=8, output_dir="temp", planner=None):
        """
        Args:
            tables: Mapping of table name -> ColumnarDbFile.
            num_partitions: Partition count used when HashPartitionJoin is chosen.
            output_dir: Directory for join outputs and the final result (created if missing).
            planner: Optional planner; defaults to ``QueryPlanner(num_partitions=...)``.
        """
        self.tables = tables
        self.num_partitions = num_partitions
        self.output_dir = output_dir
        self.planner = planner or QueryPlanner(num_partitions=num_partitions)
        os.makedirs(self.output_dir, exist_ok=True)

    @staticmethod
    def _aggregate(joined_df):
        """GROUP BY song_id, title -> AVG(age), COUNT(DISTINCT user_id), then ORDER BY.

        Ordering is by count_distinct_users DESC, then song_id ASC. NULL titles
        form their own group (``dropna=False``), matching SQL GROUP BY semantics.

        Raises:
            ValueError: if any of song_id, title, user_id, age is missing.
        """
        if not {"song_id", "user_id", "age", "title"}.issubset(joined_df.columns):
            raise ValueError(
                "Joined DataFrame does not contain expected columns "
                "['song_id', 'title', 'user_id', 'age']"
            )

        grouped = (
            joined_df
            .groupby(["song_id", "title"], as_index=False, dropna=False)
            .agg(
                avg_age=("age", "mean"),
                count_distinct_users=("user_id", "nunique"),
            )
        )

        # ORDER BY COUNT(DISTINCT l.user_id) DESC, s.song_id
        return grouped.sort_values(
            by=["count_distinct_users", "song_id"],
            ascending=[False, True],
        ).reset_index(drop=True)

    def execute_hardcoded_query(self):
        """
        Executes the following SQL query:

        SELECT s.song_id, AVG(u.age) AS avg_age,
               COUNT(DISTINCT l.user_id)
        FROM Songs s
        JOIN Listens l ON s.song_id = l.song_id
        JOIN Users u ON l.user_id = u.user_id
        GROUP BY s.song_id, s.title
        ORDER BY COUNT(DISTINCT l.user_id) DESC, s.song_id;

        Returns the result DataFrame (song_id, title, avg_age, count_distinct_users)
        and also writes it to ``<output_dir>/hardcoded_query_result.parquet``.
        """

        query_str = """SELECT s.song_id, AVG(u.age) AS avg_age,
                       COUNT(DISTINCT l.user_id)
                       FROM Songs s
                       JOIN Listens l ON s.song_id = l.song_id
                       JOIN Users u ON l.user_id = u.user_id
                       GROUP BY s.song_id, s.title
                       ORDER BY COUNT(DISTINCT l.user_id) DESC, s.song_id;
                    """

        # ---- 1) Plan query ----
        plan = self.planner.plan_query(self.tables, query_str)
        algorithm = plan["algorithm"]
        join_plan = plan["join_plan"]
        columns_to_load = plan["columns_to_load"]

        print("[Planner] Algorithm chosen:", algorithm)
        print("[Planner] Join plan:", join_plan)
        print("[Planner] Column pruning:", columns_to_load)

        # ---- 2) Choose join implementation ----
        if algorithm == "HPJ":
            joiner = HashPartitionJoin(num_partitions=self.num_partitions)
        else:
            joiner = SortMergeJoin()

        # ---- 3) Execute joins according to plan ----
        # Step 1: dim ⨝ fact
        step1 = join_plan[0]
        left_name = step1["left_table"]
        right_name = step1["right_table"]

        left_table = self.tables[left_name]
        right_table = self.tables[right_name]

        cols_left = columns_to_load.get(left_name)
        cols_right = columns_to_load.get(right_name)

        print(f"[Executor] Step 1: {left_name} ⨝ {right_name} "
              f"on {step1['left_key']}={step1['right_key']} ({algorithm})")

        step1_out = joiner.join(
            left_table,
            right_table,
            join_key1=step1["left_key"],
            join_key2=step1["right_key"],
            temp_dir=self.output_dir,
            columns_table1=cols_left,
            columns_table2=cols_right,
        )

        step1_df = step1_out.retrieve_data()
        print(f"[Executor] Step 1 output rows: {len(step1_df):,}")

        # Step 2: intermediate ⨝ second dim
        if len(join_plan) > 1:
            step2 = join_plan[1]
            second_dim = step2["right_table"]
            second_table = self.tables[second_dim]
            cols_second = columns_to_load.get(second_dim)

            # Wrap intermediate as ColumnarDbFile so joiner can read from Parquet
            interm_cdf = ColumnarDbFile("Intermediate_Q4", file_dir=self.output_dir)
            interm_cdf.build_table(step1_df)

            print(f"[Executor] Step 2: INTERMEDIATE ⨝ {second_dim} "
                  f"on {step2['left_key']}={step2['right_key']} ({algorithm})")

            final_out = joiner.join(
                interm_cdf,
                second_table,
                join_key1=step2["left_key"],
                join_key2=step2["right_key"],
                temp_dir=self.output_dir,
                columns_table1=None,       # all columns from intermediate
                columns_table2=cols_second,
            )
            joined_df = final_out.retrieve_data()
        else:
            joined_df = step1_df

        print(f"[Executor] Final joined rows (before aggregation): {len(joined_df):,}")

        # ---- 4) GROUP BY / aggregations / ORDER BY ----
        result_df = self._aggregate(joined_df)

        # Write result to disk as Parquet
        out_path = os.path.join(self.output_dir, "hardcoded_query_result.parquet")
        result_df.to_parquet(out_path, engine="pyarrow", compression="snappy",
                             index=False)
        print(f"[Executor] Query result written to {out_path}")

        return result_df

In [None]:
# Plan and execute the hard-coded query, then preview the top songs.
executor = QueryExecutor(tables, output_dir="temp_q4", num_partitions=8)
result_df = executor.execute_hardcoded_query()
result_df.head()

[Planner] Algorithm chosen: HPJ
[Planner] Join plan: [{'step': 1, 'left_table': 'Songs', 'right_table': 'Listens', 'left_key': 'song_id', 'right_key': 'song_id'}, {'step': 2, 'left_table': '__INTERMEDIATE__', 'right_table': 'Users', 'left_key': 'user_id', 'right_key': 'user_id'}]
[Planner] Column pruning: {'Songs': ['song_id', 'title'], 'Users': ['age', 'user_id'], 'Listens': ['song_id', 'user_id']}
[Executor] Step 1: Songs ⨝ Listens on song_id=song_id (HPJ)
[Executor] Step 1 output rows: 1,000,000
[Executor] Step 2: INTERMEDIATE ⨝ Users on user_id=user_id (HPJ)
[Executor] Final joined rows (before aggregation): 1,000,000
[Executor] Query result written to temp_q4/hardcoded_query_result.parquet


Unnamed: 0,song_id,title,avg_age,count_distinct_users
0,2428,Song_2428,46.857143,139
1,8361,Song_8361,44.858209,134
2,9317,Song_9317,47.125926,134
3,1288,Song_1288,49.225564,133
4,6203,Song_6203,46.120301,133


# Section 5: Performance Benchmarking

In [None]:
def benchmark_query(executor, dataset_size):
    """Benchmark one run of ``executor.execute_hardcoded_query()``.

    Prints the elapsed time and the change in this process's resident set
    size (RSS) across the run, then returns the query result unchanged.

    Args:
        executor: object exposing ``execute_hardcoded_query()`` (e.g. a QueryExecutor).
        dataset_size: label used only in the progress message (e.g. "100MB").

    Returns:
        Whatever ``executor.execute_hardcoded_query()`` returned.

    Note:
        The memory figure is an RSS *delta* (after - before), not peak usage.
        It can be small or even negative if memory was released during the run.
    """
    print(f"\nBenchmarking with {dataset_size} dataset...")
    process = psutil.Process(os.getpid())
    bytes_per_mb = 1024 * 1024

    start_mem = process.memory_info().rss / bytes_per_mb
    # perf_counter is monotonic and high-resolution; time.time() can jump if
    # the system clock is adjusted mid-run, corrupting the measurement.
    start_time = time.perf_counter()

    result = executor.execute_hardcoded_query()

    elapsed = time.perf_counter() - start_time
    end_mem = process.memory_info().rss / bytes_per_mb

    print(f"Execution Time: {elapsed:.2f} seconds")
    print(f"Memory Usage: {end_mem - start_mem:.2f} MB")
    return result

## 100MB Benchmark

In [None]:
import psutil
import gc
import time

# If you already defined _timeit above, skip this.
def _timeit(fn, *args, **kwargs):
    t0 = time.time()
    out = fn(*args, **kwargs)
    return out, time.time() - t0

class ForcedAlgorithmPlanner(QueryPlanner):
    """QueryPlanner that always uses a given algorithm ('HPJ' or 'SMJ').

    All planning is delegated unchanged to ``QueryPlanner.plan_query``; only the
    resulting plan's ``"algorithm"`` entry is overwritten. This lets both join
    algorithms be benchmarked on the same plan.
    """

    _VALID_ALGORITHMS = ("HPJ", "SMJ")

    def __init__(self, algorithm: str, num_partitions=8):
        # Validate with an explicit raise rather than `assert`: asserts are
        # stripped under `python -O`, which would silently accept a bad value.
        if algorithm not in self._VALID_ALGORITHMS:
            raise ValueError(
                f"algorithm must be one of {self._VALID_ALGORITHMS}, got {algorithm!r}"
            )
        super().__init__(num_partitions=num_partitions)
        self._forced_algorithm = algorithm

    def plan_query(self, tables, query_str):
        """Build the normal plan, then force ``plan["algorithm"]`` to the chosen algorithm."""
        plan = super().plan_query(tables, query_str)
        plan["algorithm"] = self._forced_algorithm
        return plan

In [None]:
def build_tables_for_size(size_label: str, data_dir: str, source_dir: str = ".") -> dict:
    """
    Build fresh ColumnarDbFile tables for one generated dataset size.

    size_label: '100MB' or '1GB'; selects songs_<size_label>.parquet, users_..., listens_...
    data_dir:   directory where ColumnarDbFile Parquets will live (e.g. 'data_100MB').
                It is deleted and recreated, so any previous contents are lost.
    source_dir: directory holding the generated *_<size_label>.parquet files
                (defaults to the current working directory).
    Returns: dict of {"Songs": ColumnarDbFile, "Users": ColumnarDbFile, "Listens": ColumnarDbFile}
    """
    # Start from an empty directory so stale files from earlier runs can't
    # leak into the rebuilt tables.
    if os.path.exists(data_dir):
        shutil.rmtree(data_dir)
    os.makedirs(data_dir, exist_ok=True)

    table_names = ("Songs", "Users", "Listens")
    tables = {name: ColumnarDbFile(name, file_dir=data_dir) for name in table_names}

    for name in table_names:
        source_path = os.path.join(source_dir, f"{name.lower()}_{size_label}.parquet")
        # Read, build and release one raw table at a time. Previously all three
        # DataFrames were held in memory simultaneously, raising peak memory
        # at larger sizes such as 1GB.
        raw_df = pd.read_parquet(source_path)
        tables[name].build_table(raw_df)
        del raw_df

    return tables

In [None]:
# ==========================
# 100MB Benchmark
# ==========================

def benchmark_100MB(num_partitions=8):
    """Time the hard-coded query on the 100MB dataset with HPJ and then SMJ.

    Generates the 100MB test data if any of its Parquet files are missing.
    Rebuilds the ColumnarDbFile tables under ``data_100MB/``, then runs the
    query once per algorithm via ``ForcedAlgorithmPlanner``.

    Args:
        num_partitions: partition count passed to both the planner and the executor.

    Returns:
        DataFrame with one row per algorithm: size, algorithm, rows_out, elapsed_sec.
    """
    size_label = "100MB"
    tag = f"[{size_label}]"

    # Generate data if it doesn't already exist
    source_files = [f"{name}_{size_label}.parquet" for name in ("songs", "users", "listens")]
    if not all(os.path.exists(path) for path in source_files):
        print(f"{tag} Generating test data for {size_label}...")
        generate_test_data(size_label)

    print(f"{tag} Building ColumnarDbFile tables...")
    tables_100 = build_tables_for_size(size_label, data_dir=f"data_{size_label}")

    results = []
    for algo in ("HPJ", "SMJ"):
        print(f"\n{tag} Running hardcoded query using {algo}...")
        planner = ForcedAlgorithmPlanner(algorithm=algo, num_partitions=num_partitions)
        exec_dir = f"temp_{size_label}_{algo}"
        # Fresh output directory so files from a previous run don't interfere.
        if os.path.exists(exec_dir):
            shutil.rmtree(exec_dir)
        executor = QueryExecutor(tables_100, num_partitions=num_partitions,
                                 output_dir=exec_dir, planner=planner)

        gc.collect()  # clear leftover garbage before timing this run
        df_result, elapsed = _timeit(executor.execute_hardcoded_query)

        results.append({
            "size": size_label,
            "algorithm": algo,
            "rows_out": len(df_result),
            "elapsed_sec": elapsed,
        })
        print(f"{tag} {algo} elapsed: {elapsed:.3f}s, rows_out={len(df_result):,}")

    return pd.DataFrame(results)


# Run HPJ and SMJ on the 100MB dataset and render the per-algorithm timing table.
bench_100 = benchmark_100MB()
print("\n=== 100MB Benchmark Summary ===")
display(bench_100)


[100MB] Building ColumnarDbFile tables...

[100MB] Running hardcoded query using HPJ...
[Planner] Algorithm chosen: HPJ
[Planner] Join plan: [{'step': 1, 'left_table': 'Songs', 'right_table': 'Listens', 'left_key': 'song_id', 'right_key': 'song_id'}, {'step': 2, 'left_table': '__INTERMEDIATE__', 'right_table': 'Users', 'left_key': 'user_id', 'right_key': 'user_id'}]
[Planner] Column pruning: {'Songs': ['song_id', 'title'], 'Users': ['age', 'user_id'], 'Listens': ['song_id', 'user_id']}
[Executor] Step 1: Songs ⨝ Listens on song_id=song_id (HPJ)
[Executor] Step 1 output rows: 1,000,000
[Executor] Step 2: INTERMEDIATE ⨝ Users on user_id=user_id (HPJ)
[Executor] Final joined rows (before aggregation): 1,000,000
[Executor] Query result written to temp_100MB_HPJ/hardcoded_query_result.parquet
[100MB] HPJ elapsed: 3.607s, rows_out=10,000

[100MB] Running hardcoded query using SMJ...
[Planner] Algorithm chosen: SMJ
[Planner] Join plan: [{'step': 1, 'left_table': 'Songs', 'right_table': 'Liste

Unnamed: 0,size,algorithm,rows_out,elapsed_sec
0,100MB,HPJ,10000,3.607413
1,100MB,SMJ,10000,9.18001


## 1GB Benchmark

In [None]:
# ==========================
# 1GB Benchmark
# ==========================

def _process_rss_mb():
    """Return this process's current resident set size (RSS) in MB (1 MB = 1024**2 bytes)."""
    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    return rss_bytes / (1024 ** 2)


def _run_and_sample_peak_rss(fn, interval_sec=0.05):
    """Run ``fn()`` while a background thread periodically samples this process's RSS.

    Returns:
        ``(result, elapsed_seconds, peak_rss_mb)``. ``peak_rss_mb`` is the largest
        RSS seen at the start, every ``interval_sec`` during the call, and right
        after it. Very short spikes between samples can be missed, so treat it
        as an estimate (a lower bound on the true peak).
    """
    import threading

    proc = psutil.Process(os.getpid())
    peak_bytes = proc.memory_info().rss
    stop = threading.Event()

    def _sample():
        nonlocal peak_bytes
        # Event.wait returns False on timeout (keep sampling), True once stopped.
        while not stop.wait(interval_sec):
            peak_bytes = max(peak_bytes, proc.memory_info().rss)

    sampler = threading.Thread(target=_sample, daemon=True)
    sampler.start()
    start = time.perf_counter()
    try:
        result = fn()
    finally:
        elapsed = time.perf_counter() - start
        stop.set()
        sampler.join()
    peak_bytes = max(peak_bytes, proc.memory_info().rss)
    return result, elapsed, peak_bytes / (1024 * 1024)


def benchmark_1GB(num_partitions=8):
    """Time the hard-coded query on the 1GB dataset with HPJ and then SMJ, tracking RSS.

    Generates the 1GB test data if any of its Parquet files are missing.
    Rebuilds the ColumnarDbFile tables under ``data_1GB/``, then runs the query
    once per algorithm via ``ForcedAlgorithmPlanner``.

    Args:
        num_partitions: partition count passed to both the planner and the executor.

    Returns:
        DataFrame with one row per algorithm: size, algorithm, rows_out,
        elapsed_sec, rss_before_MB, rss_after_MB, rss_est_peak_MB.
    """
    size_label = "1GB"
    tag = f"[{size_label}]"

    # Generate data if it doesn't already exist
    source_files = [f"{name}_{size_label}.parquet" for name in ("songs", "users", "listens")]
    if not all(os.path.exists(path) for path in source_files):
        print(f"{tag} Generating test data for {size_label}...")
        generate_test_data(size_label)

    print(f"{tag} Building ColumnarDbFile tables...")
    tables_1g = build_tables_for_size(size_label, data_dir=f"data_{size_label}")

    results = []
    for algo in ("HPJ", "SMJ"):
        print(f"\n{tag} Running hardcoded query using {algo}...")
        planner = ForcedAlgorithmPlanner(algorithm=algo, num_partitions=num_partitions)
        exec_dir = f"temp_{size_label}_{algo}"
        # Fresh output directory so files from a previous run don't interfere.
        if os.path.exists(exec_dir):
            shutil.rmtree(exec_dir)
        executor = QueryExecutor(tables_1g, num_partitions=num_partitions,
                                 output_dir=exec_dir, planner=planner)

        gc.collect()
        rss_before = _process_rss_mb()
        df_result, elapsed, rss_peak = _run_and_sample_peak_rss(
            executor.execute_hardcoded_query
        )
        gc.collect()
        rss_after = _process_rss_mb()

        results.append({
            "size": size_label,
            "algorithm": algo,
            "rows_out": len(df_result),
            "elapsed_sec": elapsed,
            "rss_before_MB": rss_before,
            "rss_after_MB": rss_after,
            # Sampled during the query; max(before, after) would miss the
            # memory actually used while the joins were running.
            "rss_est_peak_MB": rss_peak,
        })
        print(f"{tag} {algo} elapsed: {elapsed:.3f}s, rows_out={len(df_result):,}, "
              f"RSS before={rss_before:.1f}MB, after={rss_after:.1f}MB, "
              f"peak={rss_peak:.1f}MB")

    return pd.DataFrame(results)


# Run HPJ and SMJ on the 1GB dataset and render the per-algorithm timing/RSS table.
bench_1g = benchmark_1GB()
print("\n=== 1GB Benchmark Summary ===")
display(bench_1g)

[1GB] Building ColumnarDbFile tables...

[1GB] Running hardcoded query using HPJ...
[Planner] Algorithm chosen: HPJ
[Planner] Join plan: [{'step': 1, 'left_table': 'Songs', 'right_table': 'Listens', 'left_key': 'song_id', 'right_key': 'song_id'}, {'step': 2, 'left_table': '__INTERMEDIATE__', 'right_table': 'Users', 'left_key': 'user_id', 'right_key': 'user_id'}]
[Planner] Column pruning: {'Songs': ['song_id', 'title'], 'Users': ['age', 'user_id'], 'Listens': ['song_id', 'user_id']}
[Executor] Step 1: Songs ⨝ Listens on song_id=song_id (HPJ)
[Executor] Step 1 output rows: 10,000,000
[Executor] Step 2: INTERMEDIATE ⨝ Users on user_id=user_id (HPJ)
[Executor] Final joined rows (before aggregation): 10,000,000
[Executor] Query result written to temp_1GB_HPJ/hardcoded_query_result.parquet
[1GB] HPJ elapsed: 43.877s, rows_out=100,000, RSS before=3953.4MB, after=3220.0MB

[1GB] Running hardcoded query using SMJ...
[Planner] Algorithm chosen: SMJ
[Planner] Join plan: [{'step': 1, 'left_table':

Unnamed: 0,size,algorithm,rows_out,elapsed_sec,rss_before_MB,rss_after_MB,rss_est_peak_MB
0,1GB,HPJ,100000,43.876717,3953.359375,3220.007812,3953.359375
1,1GB,SMJ,100000,114.024576,3220.007812,1809.046875,3220.007812


## Performance Analysis

In [None]:
# Your implementation here