In [None]:
def stream_jsonl(path: Path):
    """Lazily yield each record produced by ``read_jsonl`` for *path*.

    *path* is converted with ``str()`` before being handed to ``read_jsonl``;
    nothing is read until the returned generator is first advanced.
    """
    yield from read_jsonl(str(path))


def map_stream(iterable, fn: Callable[[Dict[str, Any]], Dict[str, Any]]):
    """Lazily apply *fn* to each item of *iterable* and yield the results.

    *fn* is called once per item, only as the consumer advances the generator.
    """
    yield from map(fn, iterable)


def filter_stream(iterable, pred: Callable[[Dict[str, Any]], bool]):
    """Lazily yield only the items of *iterable* for which *pred* is truthy.

    Items keep their original order; *pred* is evaluated once per item.
    """
    yield from (candidate for candidate in iterable if pred(candidate))


def take_n(iterable, n: int):
    """Lazily yield at most the first *n* items of *iterable*.

    The generator stops immediately after handing out the n-th item, so it
    never pulls (and silently discards) an extra element from the source.
    This matters when *iterable* is a shared iterator that is read again
    afterwards. ``n <= 0`` yields nothing and consumes nothing.
    """
    if n <= 0:
        return
    for taken, item in enumerate(iterable, start=1):
        yield item
        if taken >= n:
            # Quit before the for-loop fetches another element from the source.
            return



In [None]:
from collections import deque
from typing import Iterable, List


def moving_average_bounded(values: Iterable[float], k: int) -> Iterable[float]:
    """Yield the mean of the most recent (up to) *k* values after each input.

    While the window is still filling (the first ``k - 1`` inputs), the mean
    is taken over however many values have been seen. ``k <= 0`` yields
    nothing. Every output is a float.

    Each mean is recomputed with ``math.fsum`` over the current window instead
    of being kept as a running ``total += v; total -= evicted`` accumulator.
    A running float total can lose precision permanently. For example, with
    ``k=2`` and ``[1e16, 1, 1, 1]`` the 1s are absorbed into 1e16, and every
    later window then reports 0.5 instead of 1.0. Recomputing costs O(k) per
    item, but values that have left the window can no longer skew the result.
    """
    from math import fsum  # local import keeps this notebook cell self-contained

    if k <= 0:
        return
    window = deque(maxlen=k)  # maxlen drops the oldest value automatically
    for v in values:
        window.append(v)
        yield fsum(window) / len(window)


def small_right_join(left: Iterable[Dict[str, Any]], right_lookup: Dict[str, Any], key: str = "id") -> Iterable[Dict[str, Any]]:
    """Attach lookup metadata to each row of *left*, yielding shallow copies.

    Despite the name, every *left* row is kept (left-join semantics). The
    join value is ``row.get(key)``. If ``right_lookup`` maps it to something
    other than ``None``, the copy gets a ``"meta"`` entry. Otherwise the copy
    is yielded unchanged. Keys in ``right_lookup`` must therefore have the
    same type as the ``row[key]`` values (e.g. ints for integer ids). Input
    rows are never mutated. The ``"meta"`` value is not copied.
    """
    for record in left:
        match = right_lookup.get(record.get(key))
        joined = {**record}
        if match is None:
            yield joined
            continue
        joined["meta"] = match
        yield joined



In [None]:
# --- Autograder checks (do not modify) ---
# Builds results from the helpers above and wraps each expectation in a
# zero-argument lambda (chk1..chk6). run_checks (defined elsewhere) presumably
# invokes them and reports pass/fail; confirm against its definition.
jsonl_path = data_path("toy/toy_jsonl.jsonl")

# Count records by fully draining the stream; expects 3 records in the fixture.
with timer("stream_count"):
    count = sum(1 for _ in stream_jsonl(jsonl_path))
chk1 = lambda: assert_equal("JSONL stream count", count, 3)

# map + filter pipeline
# Upper-cases each record's "text", keeps records with "value" > 0.5, and caps
# the output at 10 items. Expects exactly one survivor whose text is "BETA".
s = stream_jsonl(jsonl_path)
s2 = map_stream(s, lambda d: {**d, "text": d["text"].upper()})
s3 = filter_stream(s2, lambda d: d["value"] > 0.5)
res = list(take_n(s3, 10))
chk2 = lambda: assert_equal("filter count >0.5", len(res), 1)
chk3 = lambda: assert_equal("upper text", res[0]["text"], "BETA")

# moving average bounded
# Window of 2: the first output averages one value, the rest average pairs.
ma = list(moving_average_bounded([1,2,3,4], k=2))
chk4 = lambda: assert_equal("moving average k=2", ma, [1.0, 1.5, 2.5, 3.5])

# small right join
# Lookup keyed by integer ids; fixture rows with id 1 should gain "meta",
# rows with id 2 (absent from the lookup) should not.
right = {1: {"tag": "A"}, 3: {"tag": "C"}}
joined = list(small_right_join(stream_jsonl(jsonl_path), right, key="id"))
chk5 = lambda: assert_true("join added meta for id=1", any(x.get("id") == 1 and "meta" in x for x in joined))
chk6 = lambda: assert_true("join no meta for id=2", any(x.get("id") == 2 and "meta" not in x for x in joined))

run_checks(chk1, chk2, chk3, chk4, chk5, chk6)

