# Event‑Processing Exercises (Python Stdlib Only)

Each section introduces a realistic “events‑in‑memory” problem and solves it with one or more standard-library tools from the cheat‑sheet you’ll want for a 60‑minute live‑coding interview.

Feel free to run each cell as‑is, tweak the synthetic data sizes, or comment things out to profile performance.


## Helper: Synthetic Event Generator

Run this once—subsequent problems re‑use the `generate_events()` helper so the data shape stays consistent.


In [2]:

import random, string, time, json, csv
from datetime import datetime, timedelta

random.seed(42)

EVENT_TYPES = ["CLICK", "VIEW", "LOGIN", "PURCHASE", "LOGOUT"]

def random_timestamp(start: datetime, end: datetime):
    """Return a random datetime between *start* and *end*."""
    delta = end - start
    random_seconds = random.randint(0, int(delta.total_seconds()))
    return start + timedelta(seconds=random_seconds)

def generate_events(n: int):
    """Generate *n* synthetic event dicts with fields id, ts, user, type."""
    start = datetime.now() - timedelta(days=1)
    end = datetime.now()
    for i in range(n):
        yield {
            "id": i,
            "ts": random_timestamp(start, end).isoformat(),
            "user": f"user{random.randint(1, 50)}",
            "type": random.choice(EVENT_TYPES),
            "payload": {
                "value": random.randint(1, 1000),
                "notes": ''.join(random.choices(string.ascii_letters, k=20))
            }
        }

# miniature sanity check
sample = next(generate_events(1))
print(sample)


{'id': 0, 'ts': '2025-06-12T08:08:53.889057', 'user': 'user8', 'type': 'CLICK', 'payload': {'value': 760, 'notes': 'olMJUevblAbkHClEQaPK'}}


---

### Problem 1 – Top‑K Event Types   (`collections.Counter`, `heapq.nlargest`)

> **Task**   Given *N* events, return the *k* most common `type` values.

These two stdlib utilities let you compute frequencies in O(N) and pick the largest *k* in O(M log k), where M is the number of distinct event types.


In [None]:

from collections import Counter
import heapq

events = list(generate_events(10_000))
k = 3

type_counts = Counter(e['type'] for e in events)
top_k = heapq.nlargest(k, type_counts.items(), key=lambda kv: kv[1])

print(f"Top-{k} event types:", top_k)
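
For this specific task, `Counter.most_common(k)` is a one-call alternative to the explicit `heapq.nlargest` step. The sketch below compares the two on a small hand-written list; `sample_types` is made-up illustration data, not output of `generate_events()`.

```python
from collections import Counter
import heapq

# Hypothetical toy data, chosen so that no two types tie on count
sample_types = ["CLICK", "VIEW", "CLICK", "LOGIN", "VIEW", "CLICK"]
counts = Counter(sample_types)

# Explicit heap-based selection on (type, count) pairs
via_heap = heapq.nlargest(2, counts.items(), key=lambda kv: kv[1])
# Built-in shortcut on the Counter itself
via_counter = counts.most_common(2)

print(via_heap)
print(via_counter)
```

Keeping the explicit `nlargest` form is still useful when the ranking key is something other than the raw count.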


---

### Problem 2 – Per‑User Sliding‑Window Rate‑Limiter   (`collections.defaultdict`, `collections.deque`)

> **Task**   Allow at most **X** events per user in any rolling 60‑second window.

The `defaultdict` gives us a bucket per user; each bucket is a `deque` so we can pop expired timestamps in O(1).


In [None]:

from collections import defaultdict, deque
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=60)
LIMIT = 20

buckets: dict[str, deque] = defaultdict(deque)
violations = 0

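# The expiry loop assumes events arrive in chronological order, but
# generate_events() yields random timestamps, so sort before replaying.
# (ISO-8601 strings in a single format sort chronologically.)
for ev in sorted(generate_events(5_000), key=lambda e: e['ts']):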
    user = ev['user']
    ts = datetime.fromisoformat(ev['ts'])
    dq = buckets[user]

    # expire old
    while dq and ts - dq[0] > WINDOW:
        dq.popleft()

    dq.append(ts)
    if len(dq) > LIMIT:
        violations += 1

print(f"Violations detected: {violations}")
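
The loop above only counts violations. A limiter that actually rejects events could look like the sketch below; `make_limiter` and `allow` are illustrative names, and the fixed timestamps are invented so the outcome is easy to follow. It assumes calls arrive in timestamp order.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta

def make_limiter(limit: int, window: timedelta):
    """Return an allow(user, ts) callable (hypothetical helper)."""
    buckets = defaultdict(deque)

    def allow(user: str, ts: datetime) -> bool:
        dq = buckets[user]
        # Drop timestamps that fell out of the window ending at ts
        while dq and ts - dq[0] > window:
            dq.popleft()
        if len(dq) >= limit:
            return False          # rejected events are not recorded
        dq.append(ts)
        return True

    return allow

allow = make_limiter(limit=2, window=timedelta(seconds=60))
t0 = datetime(2025, 1, 1, 12, 0, 0)
decisions = [
    allow("alice", t0),
    allow("alice", t0 + timedelta(seconds=10)),
    allow("alice", t0 + timedelta(seconds=20)),  # third event inside 60 s
    allow("alice", t0 + timedelta(seconds=61)),  # t0 has expired by now
]
print(decisions)
```

Whether a rejected event should still occupy a slot in the window is a policy decision; this sketch does not record it.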


---

### Problem 3 – LRU Cache for Event Payloads   (`collections.OrderedDict`)

> **Task**   Memoise expensive transformations of `payload` using a 128‑entry LRU.

`OrderedDict` lets us pop the *oldest* item in O(1) while keeping look‑up O(1).


In [None]:

from collections import OrderedDict
import hashlib
import json

CAPACITY = 128
cache: OrderedDict[str, dict] = OrderedDict()

def expensive_transform(payload: dict) -> dict:
    # Hash the canonical JSON form so equal payloads share one cache key
    key = hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()
    if key in cache:
        # mark as most recently used
        cache.move_to_end(key)
        return cache[key]

    # Fake expensive computation (pretend this is slow). Only 'value' is
    # numeric; summing every payload value would raise TypeError on 'notes'.
    result = {**payload, "score": payload.get("value", 0)}

    cache[key] = result
    # evict
    if len(cache) > CAPACITY:
        cache.popitem(last=False)
    return result

# warm the cache
for ev in generate_events(300):
    expensive_transform(ev['payload'])

print(f"Cache size after warm‑up: {len(cache)} (should be ≤ {CAPACITY})")
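
To make the eviction order visible, here is a minimal sketch with a capacity of 2. The `put` and `get` helpers and the `CAP` constant are illustrative names, not part of the cell above.

```python
from collections import OrderedDict

lru = OrderedDict()
CAP = 2  # tiny capacity chosen only to make eviction easy to see

def put(key, value):
    lru[key] = value
    lru.move_to_end(key)          # newest entries live at the right end
    if len(lru) > CAP:
        lru.popitem(last=False)   # evict from the left (least recently used)

def get(key):
    value = lru[key]
    lru.move_to_end(key)          # a read also counts as a use
    return value

put("a", 1)
put("b", 2)
get("a")        # "a" is now more recent than "b"
put("c", 3)     # over capacity, so "b" is evicted
print(list(lru))
```

Reading `"a"` before inserting `"c"` is what saves it from eviction; without the `move_to_end` in `get`, the structure would behave like a FIFO queue instead.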


---

### Problem 4 – Merge K Pre‑sorted Event Streams   (`heapq.merge`, `operator.itemgetter`)

> **Task**   Given *K* sorted lists of events (by timestamp), produce a single sorted iterator.

`heapq.merge` does a k‑way streaming merge in O(N log K) time without materialising intermediate lists.


In [None]:

import heapq, operator

# Build 3 sorted streams
streams = [sorted(generate_events(1_000), key=operator.itemgetter('ts')) for _ in range(3)]

merged = heapq.merge(*streams, key=operator.itemgetter('ts'))

# Pull the first 5 to show order
for _ in range(5):
    print(next(merged)['ts'])
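
The same call on three tiny hand-written streams makes the interleaving easier to check by eye. The integer timestamps and the `src` field are assumptions made for the illustration.

```python
import heapq
from operator import itemgetter

# Hypothetical mini-streams, each already sorted by "ts"
stream_a = [{"ts": 1, "src": "a"}, {"ts": 4, "src": "a"}]
stream_b = [{"ts": 2, "src": "b"}, {"ts": 3, "src": "b"}]
stream_c = [{"ts": 5, "src": "c"}]

# merge() returns a lazy iterator; nothing is consumed until we loop
merged_toy = heapq.merge(stream_a, stream_b, stream_c, key=itemgetter("ts"))
order = [(ev["ts"], ev["src"]) for ev in merged_toy]
print(order)
```

`heapq.merge` does not verify its inputs: if a stream is not already sorted, the output is silently out of order.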


---

### Problem 5 – Count Events in O(log N) Sliding Window   (`bisect`)

> **Task**   Insert timestamps into a **sorted** list and answer “how many events occurred in the last *W* seconds?” quickly.

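`bisect_left` / `bisect_right` answer a range count with two O(log N) binary searches. `insort` finds its insertion point the same way, although the list insert itself still shifts elements and costs O(N).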


In [None]:

from bisect import bisect_left, bisect_right, insort
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)
timestamps: list[datetime] = []

def insert(ev_ts: datetime):
    insort(timestamps, ev_ts)

def count_last_window(now: datetime) -> int:
    start = now - WINDOW
    left = bisect_left(timestamps, start)
    right = bisect_right(timestamps, now)
    return right - left

# Simulate
now = datetime.now()
for ev in generate_events(2_000):
    ts = datetime.fromisoformat(ev['ts'])
    insert(ts)

print("Events in last 5 minutes:", count_last_window(now))
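
The inclusive range count is easier to verify on plain integers. In this sketch the values stand in for timestamps, and `count_between` is an illustrative helper name.

```python
from bisect import bisect_left, bisect_right, insort

times = []                        # kept sorted at all times
for t in [50, 10, 40, 20, 30, 40]:
    insort(times, t)              # binary search for the slot, then insert

def count_between(lo, hi):
    """Count entries with lo <= t <= hi using two binary searches."""
    return bisect_right(times, hi) - bisect_left(times, lo)

print(times)
print(count_between(20, 40))
```

Using `bisect_left` for the lower bound and `bisect_right` for the upper bound makes both ends inclusive, so the duplicate `40` is counted twice.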


---

### Problem 6 – Daily Cumulative Counts   (`itertools.groupby`, `itertools.accumulate`, `datetime`)

> **Task**   Group events by **calendar day** and emit a running total per day.

`groupby` gives batches; `accumulate` keeps a running sum for dashboards.
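
One detail worth seeing on a small input: `groupby` only merges adjacent equal keys, so unsorted data yields fragmented groups. The sketch below uses made-up ISO date strings to contrast the two cases and then feeds the per-day counts to `accumulate`.

```python
from itertools import accumulate, groupby

# Hypothetical day labels in arrival (unsorted) order
days = ["2025-06-02", "2025-06-01", "2025-06-02", "2025-06-01", "2025-06-02"]

# Without sorting, every change of key starts a new group
fragmented = [(d, len(list(g))) for d, g in groupby(days)]

# After sorting, each day appears exactly once
grouped = [(d, len(list(g))) for d, g in groupby(sorted(days))]
running = list(accumulate(n for _, n in grouped))

print(fragmented)
print(grouped)
print(running)
```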


In [1]:

from itertools import groupby, accumulate
from datetime import datetime

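# groupby only merges *adjacent* equal keys, so sort the days first
days = sorted(datetime.fromisoformat(ev['ts']).date() for ev in generate_events(3_000))

# (day, count) per calendar day, then a running total across days
daily = [(day, sum(1 for _ in group)) for day, group in groupby(days)]
running = accumulate(count for _, count in daily)

for (day, count), total in zip(daily, running):
    print(f"{day}: {count} events, {total} cumulative")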





---

### Problem 7 – Memoised Payload Normalisation   (`functools.lru_cache`)

> **Task**   Normalise JSON payloads (an expensive step) when many of them are duplicates.

`lru_cache` hides the boilerplate from Problem 3.


In [None]:

from functools import lru_cache
import json, hashlib

@lru_cache(maxsize=256)
def normalise(raw_json: str) -> dict:
    data = json.loads(raw_json)
    # pretend this is heavy
    data['checksum'] = hashlib.md5(raw_json.encode()).hexdigest()
    return data

dupes = [json.dumps({'x': 1, 'y': 2})] * 10
for doc in dupes:
    normalise(doc)

print("Cache info:", normalise.cache_info())
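
One caveat when caching a function that returns a mutable object: `lru_cache` hands every caller the same cached instance. The sketch below shows the effect and one possible workaround; `parse_cached` and `parse_safe` are illustrative names, and copying on every call is only one of several options.

```python
from functools import lru_cache
import copy
import json

@lru_cache(maxsize=256)
def parse_cached(raw_json: str) -> dict:
    return json.loads(raw_json)

def parse_safe(raw_json: str) -> dict:
    """Hypothetical wrapper: give each caller a private copy."""
    return copy.deepcopy(parse_cached(raw_json))

doc = json.dumps({"x": 1})

first = parse_cached(doc)
first["x"] = 999                    # mutates the object stored in the cache
leaked = parse_cached(doc)["x"]     # later callers see the mutation

parse_cached.cache_clear()
mine = parse_safe(doc)
mine["x"] = 999                     # only the private copy changes
clean = parse_safe(doc)["x"]

print(leaked, clean)
```

The copy costs time on every hit, so it only pays off when the cached computation is clearly more expensive than the copy.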


---

### Problem 8 – Parse NDJSON → CSV   (`json`, `csv`)

> **Task**   Read newline‑delimited JSON events and write selected fields to CSV.  
> Only stdlib allowed—so no `pandas`.


In [None]:

import io, json, csv, random

# Create fake NDJSON string
ndjson_blob = '\n'.join(json.dumps(ev) for ev in generate_events(100))

# Parse & write to CSV in‑memory
csv_buf = io.StringIO()
writer = csv.writer(csv_buf)
writer.writerow(['id', 'ts', 'user', 'type'])  # header

for line in ndjson_blob.splitlines():
    ev = json.loads(line)
    writer.writerow([ev['id'], ev['ts'], ev['user'], ev['type']])

csv_content = csv_buf.getvalue().splitlines()[:5]
print("Preview CSV rows:")
for row in csv_content:
    print(row)
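
As a variation, `csv.DictWriter` with `extrasaction="ignore"` can take each parsed event as-is and drop the fields that are not listed. The two NDJSON lines below are hand-written stand-ins for real input.

```python
import csv
import io
import json

# Two hand-written NDJSON lines (illustrative data only)
ndjson = "\n".join([
    json.dumps({"id": 0, "ts": "2025-06-12T08:00:00", "user": "user1",
                "type": "CLICK", "payload": {"value": 7}}),
    json.dumps({"id": 1, "ts": "2025-06-12T08:00:05", "user": "user2",
                "type": "VIEW", "payload": {"value": 9}}),
])

buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=["id", "ts", "user", "type"],
                        extrasaction="ignore")   # silently drop "payload"
writer.writeheader()
for line in ndjson.splitlines():
    if line.strip():                 # tolerate blank lines in the input
        writer.writerow(json.loads(line))

rows = buf.getvalue().splitlines()
print(rows)
```

With the default `extrasaction="raise"`, the extra `payload` key would cause a `ValueError` instead of being skipped.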


---

### Problem 9 – Multi‑key Sort (timestamp, type, user)   (`operator.itemgetter`)

> **Task**   Return the first 10 events ordered by `(ts, type, user)` using a single composite sort key.


In [None]:

import operator

events = list(generate_events(200))
events_sorted = sorted(events, key=operator.itemgetter('ts', 'type', 'user'))

for ev in events_sorted[:10]:
    print(ev['ts'], ev['type'], ev['user'])
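
When only the first few events are needed, `heapq.nsmallest` with the same composite key avoids sorting the whole list. A minimal sketch on made-up events with deliberate ties on `ts`:

```python
import heapq
from operator import itemgetter

# Hypothetical events; three of them share the same timestamp
toy = [
    {"ts": "2025-06-12T08:00:01", "type": "VIEW",  "user": "user2"},
    {"ts": "2025-06-12T08:00:00", "type": "VIEW",  "user": "user9"},
    {"ts": "2025-06-12T08:00:00", "type": "CLICK", "user": "user3"},
    {"ts": "2025-06-12T08:00:00", "type": "CLICK", "user": "user1"},
]
# itemgetter with several names returns a tuple, compared left to right
key = itemgetter("ts", "type", "user")

first_two = heapq.nsmallest(2, toy, key=key)
print([key(ev) for ev in first_two])
```

Because `user` is compared as a string, `"user10"` sorts before `"user2"`; a numeric key would be needed if that ordering matters.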


---

### Problem 10 – Finding the Median Event `value` on the Fly   (`heapq` again, but *dual* heaps)

> **Task**   Stream events and be able to query the median of the integer `payload['value']` at any time.

A classic interview favourite that uses **two heaps**—max‑heap for the lower half, min‑heap for the upper half.


In [None]:

import heapq

low, high = [], []  # max‑heap (invert values) & min‑heap

def add(value: int):
    if not low or value <= -low[0]:
        heapq.heappush(low, -value)
    else:
        heapq.heappush(high, value)

    # rebalance
    if len(low) > len(high) + 1:
        heapq.heappush(high, -heapq.heappop(low))
    elif len(high) > len(low):
        heapq.heappush(low, -heapq.heappop(high))

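def median() -> float:
    # Guard against querying before any value has been added
    if not low:
        raise ValueError("no values seen yet")
    if len(low) == len(high):
        return (-low[0] + high[0]) / 2
    return float(-low[0])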

for ev in generate_events(1_000):
    add(ev['payload']['value'])

print("Median payload value:", median())
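
The same two-heap logic can be wrapped in a class so that no module-level state is shared between streams. `RunningMedian` is an illustrative name, and the four input values are picked so each intermediate median can be checked by hand.

```python
import heapq

class RunningMedian:
    """Two-heap median tracker (hypothetical wrapper class)."""

    def __init__(self):
        self._low = []    # max-heap of the lower half (values negated)
        self._high = []   # min-heap of the upper half

    def add(self, value: int) -> None:
        if not self._low or value <= -self._low[0]:
            heapq.heappush(self._low, -value)
        else:
            heapq.heappush(self._high, value)
        # Keep len(low) equal to len(high) or exactly one larger
        if len(self._low) > len(self._high) + 1:
            heapq.heappush(self._high, -heapq.heappop(self._low))
        elif len(self._high) > len(self._low):
            heapq.heappush(self._low, -heapq.heappop(self._high))

    def median(self) -> float:
        if not self._low:
            raise ValueError("median of an empty stream is undefined")
        if len(self._low) == len(self._high):
            return (-self._low[0] + self._high[0]) / 2
        return float(-self._low[0])

rm = RunningMedian()
seen = []
for v in [5, 1, 3, 4]:
    rm.add(v)
    seen.append(rm.median())
print(seen)
```

Each `add` costs O(log N) and each `median` query is O(1), since only the two heap roots are inspected.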
