In [2]:
import polars as pl

# Lazily scan every subset part file; nothing is read until .collect() is called.
lf = pl.scan_parquet("data/subset_part_*.parquet")

# Total number of rows across all matched part files.
print(lf.select(pl.len()).collect())

# Preview the first five rows. LazyFrame.fetch() is deprecated in Polars, so
# the preview is expressed as head(5) followed by collect().
print(lf.head(5).collect())

shape: (1, 1)
┌────────┐
│ len    │
│ ---    │
│ u32    │
╞════════╡
│ 200000 │
└────────┘
shape: (5, 8)
┌──────────────┬──────────────┬──────┬─────────┬────────────┬──────────┬─────────────┬─────────────┐
│ reviewerID   ┆ unixReviewTi ┆ year ┆ overall ┆ asin       ┆ verified ┆ summary     ┆ reviewText  │
│ ---          ┆ me           ┆ ---  ┆ ---     ┆ ---        ┆ ---      ┆ ---         ┆ ---         │
│ str          ┆ ---          ┆ i64  ┆ f64     ┆ str        ┆ bool     ┆ str         ┆ str         │
│              ┆ i64          ┆      ┆         ┆            ┆          ┆             ┆             │
╞══════════════╪══════════════╪══════╪═════════╪════════════╪══════════╪═════════════╪═════════════╡
│ AVIWE1LJXCG7 ┆ 1515110400   ┆ 2018 ┆ 1.0     ┆ B017O9P72A ┆ false    ┆ Returning   ┆ Pretty      │
│ 7            ┆              ┆      ┆         ┆            ┆          ┆ to          ┆ crappy.     │
│              ┆              ┆      ┆         ┆            ┆          ┆             ┆ 

  print(lf.fetch(5))


In [3]:
import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path

import polars as pl


In [None]:
# ---- sampling parameters ----

# Inclusive range of review years (UTC) to keep: a 4-year window.
YEAR_MIN = 2018
YEAR_MAX = 2021

# A reviewer is kept when their hash bucket (hash % USER_MOD) equals USER_KEEP,
# i.e. roughly 1 user in 20 (about 5%).
USER_MOD = 20
USER_KEEP = 0

# Number of kept rows buffered in memory before a parquet part is written.
CHUNK_ROWS = 200_000
# The scan stops once the kept-row count reaches this value.
MAX_TOTAL_ROWS = 300_000

# Destination directory for the parquet parts; created up front if missing.
OUT_DIR = Path("/kaggle/working/subset_parquet")
OUT_DIR.mkdir(exist_ok=True, parents=True)

def keep_user(user_id: str, mod: int, keep: int) -> bool:
    """Decide deterministically whether a user belongs to the sampled bucket.

    The UTF-8 encoded ``user_id`` is hashed with MD5 and the leading 32 bits
    of the digest (read as a big-endian unsigned integer) are reduced modulo
    ``mod``. The result depends only on the id, so the same user is always
    kept or always dropped.

    Args:
        user_id: Reviewer identifier to hash.
        mod: Number of buckets; must be non-zero.
        keep: Bucket index that is retained.

    Returns:
        True when the user's bucket equals ``keep``, otherwise False.
    """
    digest = hashlib.md5(user_id.encode("utf-8")).digest()
    # The first 4 digest bytes are the same value as the first 8 hex characters.
    bucket = int.from_bytes(digest[:4], "big") % mod
    return bucket == keep

# Stream the line-delimited JSON reviews, keep a filtered/sampled subset, and
# write it out as numbered parquet parts in OUT_DIR.
#
# NOTE(review): JSON_PATH is not defined in this cell; it must be set by an
# earlier cell before this one runs.

# Explicit schema so every part file has identical dtypes. Without it each
# chunk's schema is inferred from its own rows, and a chunk whose nullable
# column happens to be all-None would get a different dtype than the others.
REVIEW_SCHEMA = {
    "reviewerID": pl.String,
    "unixReviewTime": pl.Int64,
    "year": pl.Int64,
    "overall": pl.Float64,
    "asin": pl.String,
    "verified": pl.Boolean,
    "summary": pl.String,
    "reviewText": pl.String,
}


def _write_part(rows, part_idx):
    """Write ``rows`` (a list of dicts) as parquet part ``part_idx`` and return its path."""
    out_path = OUT_DIR / f"subset_part_{part_idx:04d}.parquet"
    pl.DataFrame(rows, schema=REVIEW_SCHEMA).write_parquet(out_path)
    return out_path


buffer = []          # kept rows not yet written to disk
written_parts = 0    # number of parquet parts written so far
kept_rows = 0        # rows already written to disk
bad_json = 0         # lines that are not valid JSON
bad_records = 0      # valid JSON, but not an object or with unusable field values

with open(JSON_PATH, "r", encoding="utf-8") as f:
    for line_no, line in enumerate(f, start=1):
        line = line.strip()
        if not line:
            continue

        try:
            r = json.loads(line)
        except json.JSONDecodeError:
            bad_json += 1
            continue

        # A line such as `[]` or `42` parses fine but has no fields to read.
        if not isinstance(r, dict):
            bad_records += 1
            continue

        reviewerID = r.get("reviewerID")
        unixTime   = r.get("unixReviewTime")
        rating     = r.get("overall")
        asin       = r.get("asin")
        text       = r.get("reviewText")
        verified   = r.get("verified")
        summary    = r.get("summary")

        # Records missing any of the essential fields are dropped silently.
        if reviewerID is None or unixTime is None or rating is None:
            continue

        # Convert up front so one malformed value skips the record instead of
        # aborting the whole scan.
        try:
            unix_ts = int(unixTime)
            rating_value = float(rating)
            year = datetime.fromtimestamp(unix_ts, timezone.utc).year
        except (TypeError, ValueError, OverflowError, OSError):
            bad_records += 1
            continue

        # year filter
        if year < YEAR_MIN or year > YEAR_MAX:
            continue

        # stable user sample
        if USER_MOD > 1 and not keep_user(reviewerID, USER_MOD, USER_KEEP):
            continue

        buffer.append({
            "reviewerID": reviewerID,
            "unixReviewTime": unix_ts,
            "year": year,
            "overall": rating_value,
            "asin": asin,
            "verified": bool(verified) if verified is not None else None,
            "summary": summary,
            "reviewText": text,
        })

        # Count buffered rows towards the cap as well; checking only the rows
        # already written would let the scan overshoot MAX_TOTAL_ROWS by up to
        # a full chunk.
        cap_reached = kept_rows + len(buffer) >= MAX_TOTAL_ROWS

        if len(buffer) >= CHUNK_ROWS or cap_reached:
            out_path = _write_part(buffer, written_parts)
            written_parts += 1
            kept_rows += len(buffer)
            buffer = []
            print(f"Wrote {out_path} | kept_rows={kept_rows:,} | line_no={line_no:,} | bad_json={bad_json:,}")

        if cap_reached:
            break

# flush remaining rows when the input ended before a chunk filled up
if buffer:
    out_path = _write_part(buffer, written_parts)
    written_parts += 1
    kept_rows += len(buffer)
    buffer = []
    print(f"Wrote {out_path} | kept_rows={kept_rows:,} | line_no={line_no:,} | bad_json={bad_json:,}")

print("Done.")
print("Total kept rows:", kept_rows)
print("Bad JSON lines skipped:", bad_json)
print("Malformed records skipped:", bad_records)
print("Output files:", len(list(OUT_DIR.glob('*.parquet'))))
print("Example files:", [p.name for p in sorted(OUT_DIR.glob('*.parquet'))[:5]])


In [20]:
# Lazily scan every subset part file; nothing is read until .collect() is called.
# NOTE(review): this reads from "data/" while the extraction cell writes to
# OUT_DIR ("/kaggle/working/subset_parquet") — confirm the parts were copied here.
lf = pl.scan_parquet("data/subset_part_*.parquet")

# Total number of rows across all matched part files.
print(lf.select(pl.len().alias("n_rows")).collect())

# Preview the first five rows. LazyFrame.fetch() is deprecated in Polars, so
# the preview is expressed as head(5) followed by collect().
print(lf.head(5).collect())


shape: (1, 1)
┌────────┐
│ n_rows │
│ ---    │
│ u32    │
╞════════╡
│ 200000 │
└────────┘
shape: (5, 8)
┌──────────────┬──────────────┬──────┬─────────┬────────────┬──────────┬─────────────┬─────────────┐
│ reviewerID   ┆ unixReviewTi ┆ year ┆ overall ┆ asin       ┆ verified ┆ summary     ┆ reviewText  │
│ ---          ┆ me           ┆ ---  ┆ ---     ┆ ---        ┆ ---      ┆ ---         ┆ ---         │
│ str          ┆ ---          ┆ i64  ┆ f64     ┆ str        ┆ bool     ┆ str         ┆ str         │
│              ┆ i64          ┆      ┆         ┆            ┆          ┆             ┆             │
╞══════════════╪══════════════╪══════╪═════════╪════════════╪══════════╪═════════════╪═════════════╡
│ AVIWE1LJXCG7 ┆ 1515110400   ┆ 2018 ┆ 1.0     ┆ B017O9P72A ┆ false    ┆ Returning   ┆ Pretty      │
│ 7            ┆              ┆      ┆         ┆            ┆          ┆ to          ┆ crappy.     │
│              ┆              ┆      ┆         ┆            ┆          ┆             ┆ 

  print(lf.fetch(5))
