# Setup (path, imports, helpers)

In [1]:
from __future__ import annotations

import os
import sys
import gc
import time
from dataclasses import dataclass
from typing import Callable, Any

from memory_profiler import memory_usage

NOTEBOOK_DIR = os.getcwd()
REPO_ROOT = os.path.abspath(os.path.join(NOTEBOOK_DIR, ".."))

if REPO_ROOT not in sys.path:
    sys.path.insert(0, REPO_ROOT)

DATASET_PATH = os.path.join(REPO_ROOT, "data", "farmers-protest-tweets-2021-2-4.json")
assert os.path.exists(DATASET_PATH), f"Dataset not found at: {DATASET_PATH}"

# Import challenge functions
from src.q1_time import q1_time
from src.q1_memory import q1_memory

from src.q2_time import q2_time
from src.q2_memory import q2_memory

from src.q3_time import q3_time
from src.q3_memory import q3_memory


@dataclass
class BenchResult:
    name: str
    mean_s: float
    sd_s: float
    mean_peak_mib: float
    sd_peak_mib: float
    result_preview: Any

def bench(fn: Callable[[str], Any], path: str, name: str, repeats: int = 1) -> BenchResult:
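    """
    Measures wall-clock time and peak RSS delta (MiB) using memory_profiler.

    - repeats: number of independent runs.
    - reports mean ± standard deviation for time and peak RSS delta across runs.
    - the timed region wraps memory_usage(), so wall-clock time includes the
      profiler's sampling overhead.
    """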
    times = []
    peaks = []
    last_out = None

    for _ in range(repeats):
        gc.collect()
        t0 = time.perf_counter()

        mem_trace, out = memory_usage((fn, (path,)), retval=True, interval=0.05, timeout=None)
        t1 = time.perf_counter()

        last_out = out
        times.append(t1 - t0)
        peaks.append(max(mem_trace) - min(mem_trace))  # delta MiB during this call

    # compute stats (stdev requires at least 2 samples)
    mean_s = sum(times) / len(times)
    sd_s = (sum((x - mean_s) ** 2 for x in times) / (len(times) - 1)) ** 0.5 if len(times) > 1 else 0.0

    mean_peak = sum(peaks) / len(peaks)
    sd_peak = (sum((x - mean_peak) ** 2 for x in peaks) / (len(peaks) - 1)) ** 0.5 if len(peaks) > 1 else 0.0

    preview = last_out[:3] if isinstance(last_out, list) else last_out

    return BenchResult(
        name=name,
        mean_s=float(mean_s),
        sd_s=float(sd_s),
        mean_peak_mib=float(mean_peak),
        sd_peak_mib=float(sd_peak),
        result_preview=preview,
    )


## Reproducibility
This notebook reports the Python/runtime environment to make results comparable across machines.

In [2]:
import sys, platform
import emoji
import pandas as pd
import memory_profiler

print("Python:", sys.version)
print("Platform:", platform.platform())
print("emoji:", getattr(emoji, "__version__", "unknown"))
print("pandas:", pd.__version__)
print("memory_profiler:", getattr(memory_profiler, "__version__", "unknown"))


Python: 3.13.3 (tags/v3.13.3:6280bb5, Apr  8 2025, 14:47:33) [MSC v.1943 64 bit (AMD64)]
Platform: Windows-11-10.0.26100-SP0
emoji: 2.10.0
pandas: 2.3.3
memory_profiler: 0.61.0


# Data Engineer Challenge: Benchmark Notebook

## Dataset
- Format: NDJSON (one JSON object per line).
- Path used in this notebook: `data/farmers-protest-tweets-2021-2-4.json`.
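A minimal sketch of how an NDJSON stream can be read one record at a time, skipping blank or malformed lines. The helper name `iter_records` is hypothetical and is not taken from the `src` modules; the actual implementations may differ.

```python
import json
from typing import Any, Iterable, Iterator


def iter_records(lines: Iterable[str]) -> Iterator[dict[str, Any]]:
    """Yield one dict per valid NDJSON line, skipping blank or malformed lines."""
    for line in lines:
        line = line.strip()
        if not line:
            continue  # blank line
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # malformed JSON: ignore it
        if isinstance(record, dict):
            yield record


# Small in-memory sample; a real run would pass an open file object instead.
sample = ['{"content": "hello"}', "", "{not json", '{"content": "world"}']
contents = [r["content"] for r in iter_records(sample)]
```

Because the function is a generator, only one parsed record is alive at a time, regardless of file size.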

## Field assumptions (based on the dataset structure)
- **Tweet text** for Q2: Use the canonical `content` field.
- **Mentions** for Q3: Treat `mentionedUsers` as the canonical structured signal for mentions (when present).

These assumptions reduce ambiguity and make results reproducible across environments.

## Benchmark methodology
The benchmark reports:
- **Wall-clock time** (seconds) using `time.perf_counter()`.
- **Peak memory delta (MiB)** during the function call using `memory_profiler.memory_usage()`:
  - Take `max(trace) - min(trace)` as an approximation of incremental RSS during execution.

To reduce noise:
- Run each function **N times** (`repeats=5` in this notebook; the `bench` helper itself defaults to `repeats=1`) and report **mean ± standard deviation** for time and memory.
- Call `gc.collect()` before each run.

Limitations:
- RSS-based memory measurements vary by OS and Python allocator behavior. Values are intended for relative comparisons within the same environment.

Notes on interpretation:
- RSS deltas can be **near-zero** or differ by only a few KB due to allocator reuse (pymalloc), fragmentation, and OS behavior.
- Therefore, memory-oriented variants are justified primarily by **design choices** (streaming iteration / bounded aggregation domains), not by tiny RSS deltas.
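The two summary quantities described above can be sketched with the standard library alone. The helper names `mean_and_sd` and `peak_delta` are hypothetical, and the sample values are made up for illustration; they are not measurements from this notebook.

```python
import statistics


def mean_and_sd(samples: list[float]) -> tuple[float, float]:
    """Mean and sample standard deviation (n - 1 denominator); sd is 0.0 for one sample."""
    mean = statistics.fmean(samples)
    sd = statistics.stdev(samples) if len(samples) > 1 else 0.0
    return mean, sd


def peak_delta(trace: list[float]) -> float:
    """Approximate incremental RSS (MiB) as max(trace) - min(trace)."""
    return max(trace) - min(trace)


# Illustrative values only.
times = [3.0, 3.5, 4.0]
trace = [100.0, 100.5, 102.25, 101.0]

mean_s, sd_s = mean_and_sd(times)
delta = peak_delta(trace)
```

`statistics.stdev` uses the same n - 1 denominator as the hand-written formula in `bench`, so the two should agree up to floating-point rounding.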

## Approach overview 

### Q1: Top dates and most active user per date
**Inputs used:**
- `record["date"]` (ISO datetime)
- `record["user"]["username"]`

**Outputs:**
- Top 10 dates by tweet count.
- For each selected date, the most active user (with deterministic tie-breaking).

**q1_time**
- Single pass; builds a full `date -> {user -> count}` map for all dates.
- Pros: minimal post-processing, straightforward selection.
- Cons: higher memory (stores per-user counts for every date).

**q1_time: stages**
1) **Streaming read + parsing:** iterate NDJSON lines; ignore empty/malformed JSON lines.
2) **Normalization:** parse `date` to a `datetime.date`; extract `username`.
3) **Aggregation:** update `date -> {user -> count}` for all dates.
4) **Selection:** compute top-10 dates by tweet count.
5) **Per-date winner:** for each selected date, choose the max count user; break ties lexicographically.
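The stages above can be sketched as follows. This is an illustration under stated assumptions, not the code in `src/q1_time.py`: the function name `top_dates_single_pass` is hypothetical, and ties between dates with equal tweet counts are broken by earlier date, which the notebook does not specify.

```python
import json
from collections import Counter, defaultdict
from datetime import date, datetime
from typing import Iterable


def top_dates_single_pass(lines: Iterable[str], k: int = 10) -> list[tuple[date, str]]:
    # Aggregation: date -> {user -> count} for every date seen.
    per_date = defaultdict(Counter)
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
            day = datetime.fromisoformat(record["date"]).date()
            user = record["user"]["username"]
        except (ValueError, KeyError, TypeError):
            continue  # malformed JSON or missing fields
        per_date[day][user] += 1

    # Selection: top-k dates by total tweets (assumed tie-break: earlier date first).
    top = sorted(per_date, key=lambda d: (-sum(per_date[d].values()), d))[:k]
    # Per-date winner: highest count, ties broken lexicographically by username.
    return [(d, min(per_date[d].items(), key=lambda kv: (-kv[1], kv[0]))[0]) for d in top]


sample = [
    '{"date": "2021-02-12T10:00:00+00:00", "user": {"username": "bob"}}',
    '{"date": "2021-02-12T11:00:00+00:00", "user": {"username": "alice"}}',
    '{"date": "2021-02-13T09:00:00+00:00", "user": {"username": "carol"}}',
    "not json",
    "",
]
winners = top_dates_single_pass(sample)
```

In the sample, `bob` and `alice` tie on 2021-02-12, so the lexicographic rule selects `alice`.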

**q1_memory**
- Two passes:
  1) Count tweets per date only.
  2) For the top-10 dates, count users and select most active.
- Pros: lower memory peak by restricting per-user counting to the top-10 dates.
- Cons: reads the dataset twice (more I/O).

**q1_memory: stages**
1) **Pass 1 (date volume only):** streaming read; count tweets per `date` only.
2) **Top dates selection:** identify top-10 dates by tweet count.
3) **Pass 2 (bounded per-user counts):** streaming read again; only for records whose `date` is in top-10, update `user -> count`.
4) **Per-date winner:** select most active user per date with deterministic tie-breaking.
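A two-pass variant might look like the sketch below. It is not the code in `src/q1_memory.py`: the names `top_dates_two_pass` and `open_lines` are hypothetical, and `open_lines` is assumed to be a callable that returns a fresh iterable of lines each time (for a file, something like `lambda: open(path, encoding="utf-8")`). The date tie-break (earlier date first) is also an assumption.

```python
import json
from collections import Counter
from datetime import date, datetime
from typing import Callable, Iterable, Iterator


def _date_user(lines: Iterable[str]) -> Iterator[tuple[date, str]]:
    """Yield (date, username) for each valid record; skip everything else."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
            yield datetime.fromisoformat(record["date"]).date(), record["user"]["username"]
        except (ValueError, KeyError, TypeError):
            continue


def top_dates_two_pass(open_lines: Callable[[], Iterable[str]], k: int = 10) -> list[tuple[date, str]]:
    # Pass 1: tweet volume per date only.
    date_counts = Counter(day for day, _ in _date_user(open_lines()))
    top = sorted(date_counts, key=lambda d: (-date_counts[d], d))[:k]

    # Pass 2: per-user counts, bounded to the selected dates.
    users = {d: Counter() for d in top}
    for day, user in _date_user(open_lines()):
        if day in users:
            users[day][user] += 1

    return [(d, min(users[d].items(), key=lambda kv: (-kv[1], kv[0]))[0]) for d in top]


sample = [
    '{"date": "2021-02-12T10:00:00+00:00", "user": {"username": "bob"}}',
    '{"date": "2021-02-12T11:00:00+00:00", "user": {"username": "alice"}}',
    '{"date": "2021-02-13T09:00:00+00:00", "user": {"username": "carol"}}',
]
winners = top_dates_two_pass(lambda: iter(sample), k=1)
```

With `k=1`, the second pass never allocates a counter for 2021-02-13, which is the bounding effect the memory-oriented design relies on.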


### Q2: Top emojis
**Input used:**
- `record["content"]`

**Outputs:**
- Top 10 emojis by total usage across all tweets, ordered deterministically (count desc, emoji asc).

**q2_time**
- Uses `emoji.emoji_list(text)`, which materializes a list of emoji matches per record.

**q2_time: stages**
1) **Streaming read + parsing:** iterate NDJSON; ignore empty/malformed JSON lines.
2) **Text extraction:** use `content` as the canonical text.
3) **Emoji extraction:** call `emoji.emoji_list(text)` (materializes a list of matches per record).
4) **Aggregation:** increment global counts per emoji.
5) **Selection:** sort counts (desc) and emoji (asc), take top-10.
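The deterministic selection step (count descending, key ascending) can be sketched as below. The helper name `top_k` is hypothetical. Plain `Counter.most_common` is not used here because it orders equal counts by first insertion rather than by key.

```python
from collections import Counter


def top_k(counts: Counter, k: int = 10) -> list[tuple[str, int]]:
    """Return the k most frequent keys; ties are broken by ascending key."""
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:k]


# "b" is inserted before "a", but both have count 3, so "a" must come first.
counts = Counter({"b": 3, "a": 3, "c": 5, "d": 1})
ranked = top_k(counts, k=3)
```

The same ordering rule applies to the Q3 username ranking.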

**q2_memory**
- Uses `emoji.analyze(text)` to stream emoji tokens without per-record list materialization.

**q2_memory: stages**
1) **Streaming read + parsing:** iterate NDJSON; ignore empty/malformed JSON lines.
2) **Text extraction:** use `content` as the canonical text.
3) **Emoji streaming:** iterate `emoji.analyze(text)` tokens (no per-record list materialization).
4) **Aggregation:** increment global counts per emoji token.
5) **Selection:** deterministic top-10.

Given that memory is measured as RSS delta, small differences (KB-range) are often dominated by allocator noise. The memory optimization is primarily justified mechanically by streaming iteration.
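The mechanical difference between materializing and streaming can be illustrated without the `emoji` package. In the sketch below, a whitespace tokenizer built on `re` is a hypothetical stand-in for emoji detection: `findall` plays the role of a call that returns a full list per record, and `finditer` plays the role of a lazy token stream. It does not reproduce the behavior of `emoji.emoji_list` or `emoji.analyze`.

```python
import re
from collections import Counter
from typing import Iterable

TOKEN = re.compile(r"\S+")  # stand-in pattern, not an emoji matcher


def count_materialized(texts: Iterable[str]) -> Counter:
    counts = Counter()
    for text in texts:
        matches = TOKEN.findall(text)  # builds the whole list for this record
        counts.update(matches)
    return counts


def count_streamed(texts: Iterable[str]) -> Counter:
    counts = Counter()
    for text in texts:
        for match in TOKEN.finditer(text):  # one match object at a time
            counts[match.group()] += 1
    return counts


texts = ["a b a", "b c", ""]
materialized = count_materialized(texts)
streamed = count_streamed(texts)
```

Both functions produce the same counts; only the size of the per-record temporary differs, which is why the effect is hard to see in an RSS delta.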


### Q3: Top mentioned users
**Inputs used:**
- Primary: `record["mentionedUsers"]` (structured list of mentioned usernames)
- Secondary (`q3_time` only): parse `record["content"]` with a mention regex if `mentionedUsers` is missing/empty.

**Outputs:**
- Top 10 mentioned usernames across the dataset (count desc, username asc).

**q3_time**
- Uses `mentionedUsers` as primary (fast, structured).
- Falls back to regex parsing of `content` only when structured mentions are missing/empty (higher recall).

**q3_time: stages**
1) **Streaming read + parsing:** iterate NDJSON; ignore empty/malformed JSON lines.
2) **Canonical extraction:** if `mentionedUsers` exists and is non-empty, count those usernames directly.
3) **Fallback extraction:** if `mentionedUsers` is missing/empty, apply regex to `content` to increase recall.
4) **Aggregation:** increment global counts per mentioned username.
5) **Selection:** deterministic top-10.
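The canonical-then-fallback extraction can be sketched as follows. Several details are assumptions rather than facts about `src/q3_time.py`: the helper name `extract_mentions`, the regex `MENTION_RE`, and the shape of `mentionedUsers` entries (handled here as either plain strings or dicts with a `username` key).

```python
import re
from typing import Any

# Hypothetical pattern: "@name" not preceded by a word character or "@",
# so that email-like text such as "a@b.com" is not counted as a mention.
MENTION_RE = re.compile(r"(?<![\w@])@(\w{1,15})")


def extract_mentions(record: dict[str, Any], use_fallback: bool = True) -> list[str]:
    names = []
    for user in record.get("mentionedUsers") or []:
        if isinstance(user, str):
            names.append(user)
        elif isinstance(user, dict) and user.get("username"):
            names.append(user["username"])
    if names or not use_fallback:
        return names  # canonical structured signal (or fallback disabled)
    # Fallback: regex over raw text, only when structured mentions are missing/empty.
    return MENTION_RE.findall(record.get("content") or "")


structured = {"mentionedUsers": [{"username": "narendramodi"}], "content": "@someone_else hi"}
unstructured = {"mentionedUsers": None, "content": "write to a@b.com, thanks @alice"}

from_structured = extract_mentions(structured)
from_fallback = extract_mentions(unstructured)
canonical_only = extract_mentions(unstructured, use_fallback=False)
```

Calling the helper with `use_fallback=False` corresponds to the canonical-only behavior described for the memory-oriented variant.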

**q3_memory**
- Uses only `mentionedUsers` as the canonical signal.
- Rationale: minimizes temporary allocations and avoids ambiguous false positives from raw-text parsing (emails/URLs/text artifacts).

**q3_memory: stages**
1) **Streaming read + parsing:** iterate NDJSON; ignore empty/malformed JSON lines.
2) **Canonical-only extraction:** count only `mentionedUsers` usernames.
3) **Aggregation + selection:** deterministic top-10.


# Correctness parity checks (time vs memory variants must match)

In [3]:
q1_time_res = q1_time(DATASET_PATH)
q1_memory_res = q1_memory(DATASET_PATH)
q2_time_res = q2_time(DATASET_PATH)
q2_memory_res = q2_memory(DATASET_PATH)
q3_time_res = q3_time(DATASET_PATH)
q3_memory_res = q3_memory(DATASET_PATH)

print("Q1 time (preview):", q1_time_res[:3])
print("Q1 memory (preview):", q1_memory_res[:3])

print("Q2 time (preview):", q2_time_res[:3])
print("Q2 memory (preview):", q2_memory_res[:3])

print("Q3 time (preview):", q3_time_res[:3])
print("Q3 memory (preview):", q3_memory_res[:3])

assert q1_time_res == q1_memory_res, "Q1 mismatch between time and memory variants"
assert q2_time_res == q2_memory_res, "Q2 mismatch between time and memory variants"
assert q3_time_res == q3_memory_res, "Q3 mismatch between time and memory variants"
print("OK: time vs memory variants match for Q1/Q2/Q3.")

Q1 time (preview): [(datetime.date(2021, 2, 12), 'RanbirS00614606'), (datetime.date(2021, 2, 13), 'MaanDee08215437'), (datetime.date(2021, 2, 17), 'RaaJVinderkaur')]
Q1 memory (preview): [(datetime.date(2021, 2, 12), 'RanbirS00614606'), (datetime.date(2021, 2, 13), 'MaanDee08215437'), (datetime.date(2021, 2, 17), 'RaaJVinderkaur')]
Q2 time (preview): [('🙏', 5049), ('😂', 3072), ('🚜', 2972)]
Q2 memory (preview): [('🙏', 5049), ('😂', 3072), ('🚜', 2972)]
Q3 time (preview): [('narendramodi', 2265), ('Kisanektamorcha', 1840), ('RakeshTikaitBKU', 1644)]
Q3 memory (preview): [('narendramodi', 2265), ('Kisanektamorcha', 1840), ('RakeshTikaitBKU', 1644)]
OK: time vs memory variants match for Q1/Q2/Q3.


# Benchmark (time and memory)

In [4]:
benchmarks = [
    (q1_time,   "Q1 time"),
    (q1_memory, "Q1 memory"),
    (q2_time,   "Q2 time"),
    (q2_memory, "Q2 memory"),
    (q3_time,   "Q3 time"),
    (q3_memory, "Q3 memory"),
]

results = []
for fn, name in benchmarks:
    r = bench(fn, DATASET_PATH, name=name, repeats=5) 
    results.append(r)

results


[BenchResult(name='Q1 time', mean_s=3.6089521399931983, sd_s=0.27005408844711903, mean_peak_mib=0.9296875, sd_peak_mib=0.44858991312361224, result_preview=[(datetime.date(2021, 2, 12), 'RanbirS00614606'), (datetime.date(2021, 2, 13), 'MaanDee08215437'), (datetime.date(2021, 2, 17), 'RaaJVinderkaur')]),
 BenchResult(name='Q1 memory', mean_s=6.18068759997841, sd_s=0.1768491259211185, mean_peak_mib=1.20390625, sd_peak_mib=0.6073270374115529, result_preview=[(datetime.date(2021, 2, 12), 'RanbirS00614606'), (datetime.date(2021, 2, 13), 'MaanDee08215437'), (datetime.date(2021, 2, 17), 'RaaJVinderkaur')]),
 BenchResult(name='Q2 time', mean_s=10.192137160012498, sd_s=0.17357495320130495, mean_peak_mib=0.01171875, sd_peak_mib=0.013531646934131853, result_preview=[('🙏', 5049), ('😂', 3072), ('🚜', 2972)]),
 BenchResult(name='Q2 memory', mean_s=11.365693859988824, sd_s=0.08328115398665827, mean_peak_mib=0.0, sd_peak_mib=0.0, result_preview=[('🙏', 5049), ('😂', 3072), ('🚜', 2972)]),

# Results Table

In [5]:
import pandas as pd

df = pd.DataFrame([{
    "task": r.name,
    "time_mean_s": round(r.mean_s, 4),
    "time_sd_s": round(r.sd_s, 4),
    "peak_mem_mean_mib": round(r.mean_peak_mib, 3),
    "peak_mem_sd_mib": round(r.sd_peak_mib, 3),
    "result_preview": r.result_preview
} for r in results]).sort_values("task")

df


| | task | time_mean_s | time_sd_s | peak_mem_mean_mib | peak_mem_sd_mib | result_preview |
|---|---|---|---|---|---|---|
| 1 | Q1 memory | 6.1807 | 0.1768 | 1.204 | 0.607 | [(2021-02-12, RanbirS00614606), (2021-02-13, M... |
| 0 | Q1 time | 3.609 | 0.2701 | 0.93 | 0.449 | [(2021-02-12, RanbirS00614606), (2021-02-13, M... |
| 3 | Q2 memory | 11.3657 | 0.0833 | 0.0 | 0.0 | [(🙏, 5049), (😂, 3072), (🚜, 2972)] |
| 2 | Q2 time | 10.1921 | 0.1736 | 0.012 | 0.014 | [(🙏, 5049), (😂, 3072), (🚜, 2972)] |
| 5 | Q3 memory | 3.2171 | 0.1169 | 0.0 | 0.0 | [(narendramodi, 2265), (Kisanektamorcha, 1840)... |
| 4 | Q3 time | 3.215 | 0.0896 | 0.05 | 0.058 | [(narendramodi, 2265), (Kisanektamorcha, 1840)... |


## Results and Discussion

### Q1
The time-oriented implementation (`q1_time`, 3.61 s) is faster than the memory-oriented version (`q1_memory`, 6.18 s), as expected given the two-pass strategy in `q1_memory`.

On this dataset, the RSS delta shows no memory advantage for `q1_memory`: its mean (1.204 ± 0.607 MiB) is not lower than that of `q1_time` (0.930 ± 0.449 MiB), and the two overlap within one standard deviation.

This outcome is expected given the dataset characteristics:
- The number of distinct dates and the per-date user distributions do not cause the full `date -> {user -> count}` structure to grow excessively.
- As a result, the two-pass strategy in `q1_memory` pays for additional I/O and parsing without a measurable memory gain.

This highlights an important point: **memory-oriented designs provide worst-case guarantees, but their observed benefits depend on the data distribution** and on the coarseness of RSS-based measurement.

### Q2
Runtime is dominated by emoji parsing. In this run, the memory-oriented variant is about 1.2 s (roughly 12%) slower (11.37 s vs 10.19 s), which is consistent with different constant factors between `emoji_list()` and `analyze()`.

These results are consistent with the following:
- Both approaches share the same asymptotic complexity.
- The memory-oriented implementation avoids per-record list materialization by design, streaming emoji tokens via `emoji.analyze(text)`.

For RSS delta, the observed differences (0.012 MiB vs 0.0 MiB) may be dominated by allocator/OS behavior; the memory justification is therefore primarily mechanical (streaming iteration) rather than based on marginal RSS deltas.

### Q3
Runtime differences between the two versions are negligible (3.215 ± 0.090 s for `q3_time` vs 3.217 ± 0.117 s for `q3_memory`).
The time-oriented implementation includes a fallback to parse mentions from raw text, but in practice this path is rarely triggered because the dataset provides structured `mentionedUsers` metadata.

The memory-oriented version avoids text parsing entirely and relies only on the canonical structured signal. Its measured RSS delta is 0.0 MiB, against 0.05 ± 0.058 MiB for `q3_time`; both values are within measurement noise.

### Overall
Across all questions:
- Differences between *time* and *memory* implementations are driven by **engineering tradeoffs and constant factors**, not asymptotic complexity.
- Dataset characteristics strongly influence whether a memory-oriented strategy yields visible gains.
- Explicitly separating these approaches makes the tradeoffs transparent and reproducible.

## Conclusions
- Q1 shows a clear tradeoff: the memory variant bounds the aggregation domain (top dates) at the cost of a second pass.
- Q2 runtime is dominated by emoji parsing; the memory-oriented variant avoids per-record list materialization by design, with modest differences in runtime driven by constant factors.
- Q3 shows no measurable runtime difference between variants; the memory variant relies only on canonical structured mentions and avoids the regex fallback.

The parity checks confirm that the time and memory variants return identical results; RSS deltas are reported as a coarse proxy and may be near-zero due to allocator/OS effects.
