# Rev2 Docs Analysis Notebook

This notebook analyzes the JSON file at:

`/Users/aaronsteiner/Downloads/rev2_docs_since_2020_01_01.json`

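It reports:
- File size and line count
- Number of records
- Top-level field presence and type distribution
- Length statistics for string and list fields; range and mean for numeric fields
- A small sample of records
- Unique value counts for the identifier fields (`product_id`, `ean`, `mpnr`, `id`) and length statistics for `name` and `desc`
- The distribution of offers per product (quantiles and the products with the most offers)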

No external dependencies are required beyond the Python standard library.


In [1]:
from __future__ import annotations
import json
import os
import sys
from collections import Counter, defaultdict
from statistics import mean
from typing import Any, Dict, Iterable, List, Tuple, Union

# Location of the dataset that every later cell reads from.
FILE_PATH = "/Users/aaronsteiner/Downloads/rev2_docs_since_2020_01_01.json"

print(f"Python {sys.version}")
print(f"Analyzing: {FILE_PATH}")

# On-disk size; a missing file is reported and then re-raised so the run stops here.
try:
    file_size_bytes = os.path.getsize(FILE_PATH)
except FileNotFoundError:
    print("File not found. Check FILE_PATH.")
    raise
else:
    file_size_mb = file_size_bytes / (1024 * 1024)
    print(f"File size: {file_size_bytes:,} bytes ({file_size_mb:.2f} MB)")

# Stream the file in binary mode and tally lines without loading it into memory.
with open(FILE_PATH, "rb") as fh:
    line_count = sum(1 for _ in fh)
print(f"Approx line count (for NDJSON): {line_count:,}")


Python 3.12.8 | packaged by Anaconda, Inc. | (main, Dec 11 2024, 10:37:40) [Clang 14.0.6 ]
Analyzing: /Users/aaronsteiner/Downloads/rev2_docs_since_2020_01_01.json
File size: 5,014,752,861 bytes (4782.44 MB)
Approx line count (for NDJSON): 6,309,224


In [3]:
# Helper functions

def try_parse_json_lines(path: str) -> List[Dict[str, Any]]:
    """Parse ``path`` as NDJSON (one JSON document per line).

    Blank lines are skipped. A line holding a JSON object contributes that
    object; a line holding a JSON array contributes the objects inside it.
    Any other JSON value (number, string, ...) is silently ignored.

    Args:
        path: Path of the file to read.

    Returns:
        The list of parsed objects, in file order.

    Raises:
        ValueError: If a non-blank line is not valid JSON. The message names
            the 1-based line number and the original decode error is chained.
    """
    records: List[Dict[str, Any]] = []
    # "utf-8-sig" reads plain UTF-8 unchanged but drops a leading byte-order
    # mark, which would otherwise make the first line fail to parse.
    with open(path, "r", encoding="utf-8-sig") as f:
        for idx, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError as e:
                # Fail fast: not valid NDJSON
                raise ValueError(f"Not NDJSON at line {idx}: {e}") from e
            if isinstance(obj, dict):
                records.append(obj)
            elif isinstance(obj, list):
                # If it is an array per line, extend by dicts only
                records.extend(x for x in obj if isinstance(x, dict))
    return records


def try_parse_json_array(path: str) -> List[Dict[str, Any]]:
    """Parse ``path`` as a single JSON document that holds a list of objects.

    Two layouts are accepted:

    * a top-level array, from which only the objects (dicts) are kept;
    * a top-level object with a value that is a list made up solely of
      objects; the first such non-empty list (in key order) is returned.

    Args:
        path: Path of the file to read.

    Returns:
        The list of record objects. An empty list is returned when the only
        candidate lists in a top-level object are empty.

    Raises:
        ValueError: If the document matches neither layout. ``json.load``
            raises ``json.JSONDecodeError`` (a ``ValueError`` subclass) when
            the file is not valid JSON at all.
    """
    # "utf-8-sig" reads plain UTF-8 unchanged but drops a leading byte-order
    # mark, which json.load would otherwise reject.
    with open(path, "r", encoding="utf-8-sig") as f:
        data = json.load(f)
    if isinstance(data, list):
        return [x for x in data if isinstance(x, dict)]
    if isinstance(data, dict):
        # If it's a dict with a key that holds an array
        saw_empty_list = False
        for v in data.values():
            if not isinstance(v, list):
                continue
            if not v:
                # all() is vacuously true for []; remember it, but keep
                # looking so an empty sibling list cannot hide the records.
                saw_empty_list = True
                continue
            if all(isinstance(x, dict) for x in v):
                return v
        if saw_empty_list:
            return []
    raise ValueError("File is neither NDJSON nor a JSON array of objects.")


def load_records(path: str) -> List[Dict[str, Any]]:
    """Load records from ``path``, trying NDJSON first, then a JSON array.

    Args:
        path: Path of the file to read.

    Returns:
        The list of record objects produced by whichever parser succeeded.

    Raises:
        ValueError: If the file matches neither format (raised by the JSON
            array parser, with the NDJSON failure attached as its context).
    """
    try:
        return try_parse_json_lines(path)
    except ValueError:
        # Only a format problem justifies the fallback. ValueError covers the
        # NDJSON parser's own error and UnicodeDecodeError (a subclass).
        # Anything else, such as a missing file, propagates immediately
        # instead of triggering a second full read of the file.
        return try_parse_json_array(path)


# Materialise every record once; the later cells all iterate over this list.
records = load_records(FILE_PATH)
print(f"Parsed records: {len(records):,}")
# Show the field names of the first record, or an empty list when nothing was parsed.
print("First keys sample:", [] if not records else list(records[0].keys()))


Parsed records: 6,309,224
First keys sample: ['shop_id', 'ean', 'mpnr', 'price', 'product_id', 'name', 'cat_id', 'id', 'brand', 'shop_cat', 'aid', 'dlv_time', 'desc']


In [None]:
# Field presence and type statistics

def type_name(value: Any) -> str:
    """Return a short label for the type of a decoded JSON value.

    ``None`` maps to ``"null"``; bool, int, float, str, list and dict map to
    their own names; anything else falls back to its class name.
    """
    if value is None:
        return "null"
    # Checked in order: bool must precede int because bool is an int subclass.
    labelled_types = (
        (bool, "bool"),
        (int, "int"),
        (float, "float"),
        (str, "str"),
        (list, "list"),
        (dict, "dict"),
    )
    for candidate, label in labelled_types:
        if isinstance(value, candidate):
            return label
    return type(value).__name__

# field name -> number of records containing it
field_presence: Counter[str] = Counter()
# field name -> (type label -> number of occurrences)
field_type_counts: Dict[str, Counter[str]] = defaultdict(Counter)

for record in records:
    # A dict's keys are unique, so this adds one per field present in the record.
    field_presence.update(record.keys())
    for field, value in record.items():
        field_type_counts[field][type_name(value)] += 1

num_records = len(records)
print(f"Total records: {num_records:,}")
print("Top 20 fields by presence:")
for key, cnt in field_presence.most_common(20):
    if num_records:
        pct = cnt / num_records * 100
    else:
        pct = 0
    print(f"- {key}: {cnt} ({pct:.1f}%) | types: {dict(field_type_counts[key])}")


In [None]:
# Length statistics for strings/lists and numeric summaries

from math import isnan

# Per-field accumulators: string lengths, list lengths, and numeric values as floats.
string_lengths: Dict[str, List[int]] = defaultdict(list)
list_lengths: Dict[str, List[int]] = defaultdict(list)
numeric_values: Dict[str, List[float]] = defaultdict(list)

for record in records:
    for field, value in record.items():
        if isinstance(value, str):
            string_lengths[field].append(len(value))
            continue
        if isinstance(value, list):
            list_lengths[field].append(len(value))
            continue
        # bool is an int subclass, so exclude it before the numeric check.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            continue
        # NaN is left out of the numeric values.
        if isinstance(value, float) and isnan(value):
            continue
        numeric_values[field].append(float(value))


def summarize_lengths(values: List[int]) -> Dict[str, Union[int, float]]:
    """Summarise a list of lengths as count, min, max and mean (3 decimals).

    Returns an empty dict when ``values`` is empty.
    """
    summary: Dict[str, Union[int, float]] = {}
    if values:
        summary["count"] = len(values)
        summary["min"] = min(values)
        summary["max"] = max(values)
        summary["mean"] = round(mean(values), 3)
    return summary


def summarize_numeric(values: List[float]) -> Dict[str, Union[int, float]]:
    """Summarise numeric values as count plus min, max and mean (6 decimals).

    Returns an empty dict when ``values`` is empty.
    """
    if not values:
        return {}
    lowest = min(values)
    highest = max(values)
    average = mean(values)
    return dict(
        count=len(values),
        min=round(lowest, 6),
        max=round(highest, 6),
        mean=round(average, 6),
    )

# One (heading, per-field values, summariser) triple per report section.
summary_sections = (
    ("String field length summaries (top 20 by count):", string_lengths, summarize_lengths),
    ("\nList field length summaries (top 20 by count):", list_lengths, summarize_lengths),
    ("\nNumeric field summaries (top 20 by count):", numeric_values, summarize_numeric),
)
for heading, per_field, summarize in summary_sections:
    print(heading)
    # Rank fields by how many values were collected, most first, and keep 20.
    ranked = sorted(per_field.items(), key=lambda kv: len(kv[1]), reverse=True)
    for key, vals in ranked[:20]:
        print(key, summarize(vals))


In [None]:
# Show some samples

# Number of leading records to print.
SAMPLE_COUNT = 3
sample_number = 0
for rec in records[:SAMPLE_COUNT]:
    sample_number += 1
    print(f"Record {sample_number}:")
    # At most 20 fields per record, each value cut to 200 characters.
    for k in list(rec)[:20]:
        print(f"  {k}: {str(rec[k])[:200]}")
    print("-")


In [4]:
# Unique product counts and text length statistics

from math import isnan
from statistics import mean
from typing import Union

# Value types accepted as identifiers when counting distinct values.
hashable_types = (str, int, float)

# Identifier field -> set of distinct values seen for it.
unique_values: Dict[str, set] = {
    field: set() for field in ("product_id", "ean", "mpnr", "id")
}

name_lengths: List[int] = []
desc_lengths: List[int] = []

for rec in records:
    for key, seen in unique_values.items():
        val = rec.get(key)
        is_nan = isinstance(val, float) and isnan(val)
        # Skip NaN, values of other types (including a missing field) and empty strings.
        if is_nan or not isinstance(val, hashable_types) or val == "":
            continue
        seen.add(val)

    # Record text lengths only where the field actually holds a string.
    for text_field, lengths in (("name", name_lengths), ("desc", desc_lengths)):
        text = rec.get(text_field)
        if isinstance(text, str):
            lengths.append(len(text))

unique_counts = dict(zip(unique_values, map(len, unique_values.values())))

# Heuristic: take the first identifier, in preference order, that has at least
# one distinct value; fall back to "mpnr" when none of them does.
primary_unique_field_used = next(
    (field for field in ("product_id", "ean", "id") if unique_counts.get(field, 0) > 0),
    "mpnr",
)

primary_unique_product_count = unique_counts.get(primary_unique_field_used, 0)

print("Unique value counts by field:")
for field_name, distinct in unique_counts.items():
    print(f"- {field_name}: {distinct:,}")

print(f"\nAssumed primary product key: {primary_unique_field_used}")
print(f"Unique products (by {primary_unique_field_used}): {primary_unique_product_count:,}")


def safe_summary(values: List[int]) -> Dict[str, Union[int, float]]:
    """Summarise lengths as count, min, max and mean (3 decimals).

    Unlike an empty-dict result, an empty input yields an all-zero summary so
    callers always get the same four keys.
    """
    summary: Dict[str, Union[int, float]] = {"count": 0, "min": 0, "max": 0, "mean": 0.0}
    if values:
        summary["count"] = len(values)
        summary["min"] = min(values)
        summary["max"] = max(values)
        summary["mean"] = round(mean(values), 3)
    return summary

title_length_summary = safe_summary(name_lengths)
desc_length_summary = safe_summary(desc_lengths)

# Each heading is followed by its summary dict on the next line.
print("\nTitle length summary (field: 'name'):", title_length_summary, sep="\n")

print("\nDescription length summary (field: 'desc'):", desc_length_summary, sep="\n")


Unique value counts by field:
- product_id: 692,495
- ean: 694,941
- mpnr: 970,957
- id: 6,309,216

Assumed primary product key: product_id
Unique products (by product_id): 692,495

Title length summary (field: 'name'):
{'count': 6309224, 'min': 0, 'max': 30595, 'mean': 56.126}

Description length summary (field: 'desc'):
{'count': 6308967, 'min': 0, 'max': 32259, 'mean': 423.009}


In [5]:
# Distribution of offers per product (quantiles)

from collections import Counter
from math import ceil, isnan
from statistics import mean as stat_mean

# Decide which product key to group by: the key chosen by the unique-count
# cell if that cell has run (at module level locals() is the module namespace,
# so .get() yields None otherwise), then the fixed fallbacks in order. A
# candidate qualifies as soon as one record holds a str/int/float under it.
candidate_keys = [
    locals().get("primary_unique_field_used"),
    "product_id",
    "ean",
    "id",
    "mpnr",
]
product_key = next((k for k in candidate_keys if k and any(isinstance(rec.get(k), (str, int, float)) for rec in records)), None)
if product_key is None:
    raise ValueError("Could not determine a suitable product key to group by.")

# Number of records (offers) per distinct value of the grouping key.
counts_by_product: Counter = Counter()
for rec in records:
    val = rec.get(product_key)
    # NaN and empty strings are not treated as product identifiers.
    if isinstance(val, float) and isnan(val):
        continue
    if isinstance(val, (str, int, float)) and val != "":
        counts_by_product[val] += 1

counts = list(counts_by_product.values())
num_products = len(counts)
num_offers = sum(counts)

print(f"Grouping key: {product_key}")
print(f"Distinct products: {num_products:,}")
print(f"Total offers (records with a {product_key}): {num_offers:,}")

# Guard before any statistic is computed: the mean of an empty list raises
# StatisticsError, which would pre-empt this clearer error.
if not counts:
    raise ValueError("No counts available to compute quantiles.")

print(f"Mean offers per product: {stat_mean(counts):.3f}")

counts_sorted = sorted(counts)

def percentile(sorted_values, p):
    """Return the ``p``-th percentile of ``sorted_values`` (nearest-rank method).

    Args:
        sorted_values: Sequence already sorted in ascending order.
        p: Percentile expressed in percent (e.g. ``95``). A rank that falls
            outside the sequence is clamped to the first or last element.

    Returns:
        The element at the nearest rank, or ``None`` when ``sorted_values``
        is empty.
    """
    if not sorted_values:
        return None
    n = len(sorted_values)
    # Nearest-rank method: rank = ceil(p / 100 * n). Multiply before dividing:
    # p / 100.0 is usually inexact in binary floating point, so
    # 7 / 100.0 * 100 evaluates to 7.000000000000001 and ceil() would pick
    # rank 8 instead of 7. For integer p, p * n is exact.
    rank = ceil(p * n / 100)
    idx = max(0, min(n - 1, rank - 1))
    return sorted_values[idx]

# Percentiles first, then the extremes, all keyed by their display label.
quantiles = {f"p{pct}": percentile(counts_sorted, pct) for pct in (50, 75, 90, 95, 99)}
quantiles["min"] = counts_sorted[0]
quantiles["max"] = counts_sorted[-1]

print("Quantiles (offers per product):")
for label in ("min", "p50", "p75", "p90", "p95", "p99", "max"):
    print(f"- {label}: {quantiles[label]}")

# Show top-N products with most offers
TOP_N = 10
print(f"\nTop {TOP_N} products by number of offers:")
for entry in counts_by_product.most_common(TOP_N):
    prod, cnt = entry
    print(f"- {prod}: {cnt}")


Grouping key: product_id
Distinct products: 692,495
Total offers (records with a product_id): 6,309,224
Mean offers per product: 9.111
Quantiles (offers per product):
- min: 1
- p50: 4
- p75: 7
- p90: 13
- p95: 23
- p99: 94
- max: 4609

Top 10 products by number of offers:
- 361833689: 4609
- 1715666714: 4555
- 474165642: 4441
- 620779739: 4404
- 82250313: 4344
- 933916144: 4333
- 1812895161: 4317
- 466781994: 4283
- 607686873: 4263
- 254381862: 4205
