In [None]:
import argparse
import json
import re
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional

import pandas as pd
import yaml

# ------------------------
# Configuration
# ------------------------
SPEC_KEYS_MAP = {
    # Display / Panel
    "Display Technology": "display_technology",
    "Display Type": "display_technology",
    "LED Panel Type": "panel_type",
    "Backlight Type": "backlight_type",
    "High Dynamic Range (HDR)": "hdr",
    "High Dynamic Range Format": "hdr_formats",
    "Resolution": "resolution",
    "Refresh Rate": "refresh_rate",
    "Aspect Ratio": "aspect_ratio",
    "Standing screen display size": "screen_size",
    "Screen Size": "screen_size",
    "Screen Size Class": "screen_size_class",

    # Connectivity
    "Connectivity Technology": "connectivity",
    "Number of HDMI Inputs (Total)": "hdmi_inputs",
    "Number of HDMI Inputs": "hdmi_inputs",
    "HDMI number": "hdmi_inputs",
    "USB Ports": "usb_ports",
    "Number Of USB Port(s) (Total)": "usb_ports",
    "Wireless Connectivity": "wireless",
    "Wireless technology": "wireless",

    # Audio / Features
    "Built-In Speakers": "built_in_speakers",
    "Special Features": "special_features",
    "Motion Enhancement Technology": "motion_tech",

    # Physical / Dimensions
    "Product Dimensions": "product_dimensions",
    "Item Weight": "item_weight",
    "Product Width": "width",
    "Product Height": "height",
    "Product Depth": "depth",
    "Product Height Without Stand": "height_without_stand",
    "Product Depth Without Stand": "depth_without_stand",
    "Product Weight Without Stand": "weight_without_stand",

    # Mounting
    "VESA Wall Mount Standard": "vesa",
    "Vesa mounting pattern": "vesa",

    # Platform / OS
    "Smart Platform": "smart_platform",
    "Platform": "smart_platform",

    # Identity
    "Brand Name": "brand",
    "Brand": "brand",
    "Model Number": "model_number",
    "Item model number": "model_number",
    "ASIN": "asin",
    "UPC": "upc",
}

COLUMNS = [
    "source", "domain", "url", "title",
    "brand", "model_number", "model_name", "series",
    "asin", "sku", "upc", "gtin",

    # Pricing
    "final_price", "initial_price", "currency", "discount", "deal_type",

    # Availability & logistics
    "availability_text", "is_available", "return_policy", "delivery_info",

    # Reviews
    "rating", "reviews_count", "top_review", "customers_say",

    # Categories & seller/store
    "categories", "badges", "seller_name", "seller_rating", "store_name", "store_location",

    # Display block
    "screen_size_inches", "screen_size_class", "resolution", "refresh_rate", "aspect_ratio",
    "display_technology", "panel_type", "backlight_type", "hdr", "hdr_formats",

    # Connectivity block
    "smart_platform", "voice_assistants", "wireless", "connectivity", "hdmi_inputs", "usb_ports",

    # Physical block
    "product_dimensions", "item_weight", "height", "width", "depth",
    "height_without_stand", "depth_without_stand", "weight_without_stand", "vesa",

    # Content
    "features", "description", "images", "image_count",

    # Misc
    "timestamp", "bought_past_month", "amazon_prime", "badges_all", "raw_specs_json"
]

DATA_DICTIONARY = {
    "description": "Unified competitor product schema across Amazon, Walmart, and Best Buy.",
    "columns": {
        c: "See inline comments in code for meaning" for c in COLUMNS
    },
    "notes": [
        "raw_specs_json stores the original spec key/value bag from each source for traceability.",
        "images stores a semicolon-separated list of image URLs.",
        "categories stores a semicolon-separated list of category labels.",
    ],
}

# ------------------------
# Helpers
# ------------------------


def to_str(val):
    """Coerce any value to a string suitable for CSV/Parquet.
    - list -> 'a; b; c'
    - dict -> JSON string
    - None -> None
    - scalar -> str(scalar)
    """
    if val is None:
        return None
    if isinstance(val, list):
        return "; ".join(str(v) for v in val if v is not None)
    if isinstance(val, dict):
        try:
            return json.dumps(val, ensure_ascii=False)
        except Exception:
            return str(val)
    return str(val)

def to_float(val: Any) -> Optional[float]:
    if val is None:
        return None
    if isinstance(val, (int, float)):
        return float(val)
    if isinstance(val, str):
        s = re.sub(r"[^0-9.]+", "", val)
        return float(s) if s else None
    return None

def first_non_empty(*vals):
    for v in vals:
        if v is not None and v != "" and v != []:
            return v
    return None

def unwrap_richtext_if_needed(obj: Any) -> List[Dict[str, Any]]:
    """
    Best Buy / Walmart exports sometimes use a dict like:
    { "<GUID>:content": { "type": "richtext", "value": { "text": "[ { ... }, { ... } ]" }}}
    This attempts to unwrap into a Python list.
    """
    if isinstance(obj, list):
        return obj
    if isinstance(obj, dict):
        try:
            first_key = next(iter(obj))
            txt = obj[first_key]["value"]["text"]
            txt_clean = txt.replace("[This Output has been Truncated]", "")
            parsed = json.loads(txt_clean)
            if isinstance(parsed, list):
                return parsed
        except Exception:
            # fallback: find any list inside values
            for v in obj.values():
                if isinstance(v, list):
                    return v
    return []

def collect_specs(item: Dict[str, Any]) -> Dict[str, Any]:
    """
    Collapse all source-specific spec arrays into a single flat dict.
    Supports:
      - Amazon: product_details [{ type, value }]
      - Walmart: specifications [{ name, value }]
      - Best Buy: product_specifications [{ specification_name, specification_value }]
    """
    bag = {}
    for d in item.get("product_details", []) or []:
        k, v = d.get("type"), d.get("value")
        if k: bag[k] = v
    for s in item.get("specifications", []) or []:
        k, v = s.get("name"), s.get("value")
        if k: bag[k] = v
    for s in item.get("product_specifications", []) or []:
        k, v = s.get("specification_name"), s.get("specification_value")
        if k: bag[k] = v
    return bag

def normalize_specs(specs_flat: Dict[str, Any]) -> Dict[str, Any]:
    norm = {}
    for src_k, norm_k in SPEC_KEYS_MAP.items():
        if src_k in specs_flat and specs_flat[src_k] not in (None, ""):
            norm[norm_k] = specs_flat[src_k]
    return norm

def infer_source(domain_or_url: Optional[str]) -> Optional[str]:
    s = (domain_or_url or "").lower()
    if "amazon" in s:
        return "Amazon"
    if "walmart" in s:
        return "Walmart"
    if "bestbuy" in s:
        return "Best Buy"
    return None

def extract_screen_inches(val: Optional[str]) -> Optional[float]:
    if not val:
        return None
    m = re.search(r"(\d+(?:\.\d+)?)", str(val))
    return float(m.group(1)) if m else None

def join_list(vals: Any) -> Optional[str]:
    if isinstance(vals, list):
        return "; ".join(str(v) for v in vals if v is not None)
    return vals

def is_tv_item(item: Dict[str, Any], specs_flat: Dict[str, Any]) -> bool:
    """
    Heuristic: keep TVs only (optional). Checks categories/specs for 'TV', 'Television', 'QLED', 'LED'.
    """
    cats = item.get("categories")
    cats_str = join_list(cats) or ""
    title = (item.get("title") or "").lower()
    display = (specs_flat.get("Display Type") or specs_flat.get("Display Technology") or "").lower()
    return any(keyword in cats_str for keyword in ["TV", "Television", "Smart TVs", "LED & LCD TVs", "QLED TVs"]) \
           or "tv" in title \
           or any(k in display for k in ["qled", "led", "oled"])

# ------------------------
# Main pipeline
# ------------------------
def harmonize_records(records: List[Dict[str, Any]], tv_only: bool = True) -> List[Dict[str, Any]]:
    rows = []
    for item in records:
        specs_flat = collect_specs(item)
        normalized_specs = normalize_specs(specs_flat)

        # Optional TV-only filtering
        if tv_only and not is_tv_item(item, specs_flat):
            continue

        screen_size_str = first_non_empty(normalized_specs.get("screen_size"), normalized_specs.get("screen_size_class"))
        screen_size_inches = extract_screen_inches(screen_size_str)

        # Voice assistants: various locations across sources
        voice_assistants = first_non_empty(item.get("voice_assistants"), specs_flat.get("Virtual asst."))
        if not voice_assistants:
            va = specs_flat.get("Voice Assistant Built-in")
            ww = specs_flat.get("Works With")
            voice_assistants = ", ".join([x for x in [va, ww] if x]) or None

        # Price
        final_price = to_float(first_non_empty(item.get("final_price"), item.get("offer_price")))
        initial_price = to_float(item.get("initial_price"))
        currency = item.get("currency")
        discount = item.get("discount")
        deal_type = None
        if isinstance(item.get("prices_breakdown"), dict):
            deal_type = item["prices_breakdown"].get("deal_type")

        # Reviews
        rating = item.get("rating")
        reviews_count = first_non_empty(item.get("reviews_count"), item.get("review_count"))
        top_review = to_str(item.get("top_review"))
        customers_say = first_non_empty(item.get("customer_says"), item.get("customers_say"), item.get("customers\\_say"))
        if isinstance(customers_say, dict) and customers_say.get("text"):
            customers_say = customers_say["text"]
        customers_say = to_str(customers_say)

        # Identity / source
        source_domain = first_non_empty(item.get("domain"), item.get("origin_url"))
        source = infer_source(source_domain or item.get("url"))
        url = item.get("url")

        # Categories
        categories = to_str(join_list(item.get("categories")))

        # Images & features
        images_list = item.get("images") or item.get("image_urls")
        image_count = len(images_list) if isinstance(images_list, list) else None
        images = to_str(join_list(images_list) or item.get("image"))
        features = to_str(join_list(item.get("features")))

        # Seller & store
        seller_name = item.get("seller_name")
        seller_rating = item.get("buybox_seller_rating")
        store_name = item.get("store_name")
        store_location = item.get("store_location")

        # Availability / returns / delivery
        availability_text = first_non_empty(item.get("availability"), item.get("availability_text"))
        # normalize to string
        availability_text = to_str(availability_text)
        is_available = item.get("is_available")
        return_policy = to_str(item.get("return_policy"))
        delivery_info = to_str(join_list(item.get("delivery")))

        # Misc
        badges = to_str(first_non_empty(item.get("badge"), item.get("badges")))
        badges_all = to_str(item.get("all_badges"))
        timestamp = item.get("timestamp")
        bought_past_month = item.get("bought_past_month")
        amazon_prime = item.get("amazon_prime")

        row = {
            "source": source,
            "domain": source_domain,
            "url": url,
            "title": item.get("title"),

            # identity
            "brand": first_non_empty(item.get("brand"), normalized_specs.get("brand")),
            "model_number": normalized_specs.get("model_number"),
            "model_name": specs_flat.get("Model name"),
            "series": specs_flat.get("Series"),
            "asin": normalized_specs.get("asin"),
            "sku": first_non_empty(item.get("sku"), item.get("us_item_id"), item.get("product_id")),
            "upc": first_non_empty(item.get("upc"), normalized_specs.get("upc")),
            "gtin": item.get("gtin"),
            "product_id": first_non_empty(item.get("asin"), item.get("sku"), item.get("us_item_id"), item.get("gtin"), item.get("upc"))

            # pricing
            "final_price": final_price,
            "initial_price": initial_price,
            "currency": currency,
            "discount": discount,
            "deal_type": deal_type,

            # availability
            "availability_text": availability_text,
            "is_available": is_available,
            "return_policy": return_policy,
            "delivery_info": delivery_info,

            # reviews
            "rating": rating,
            "reviews_count": reviews_count,
            "top_review": top_review,
            "customers_say": customers_say,

            # categories & seller/store
            "categories": categories,
            "badges": badges,
            "seller_name": seller_name,
            "seller_rating": seller_rating,
            "store_name": store_name,
            "store_location": store_location,

            # display
            "screen_size_inches": screen_size_inches,
            "screen_size_class": normalized_specs.get("screen_size_class"),
            "resolution": normalized_specs.get("resolution"),
            "refresh_rate": normalized_specs.get("refresh_rate"),
            "aspect_ratio": normalized_specs.get("aspect_ratio"),
            "display_technology": normalized_specs.get("display_technology"),
            "panel_type": normalized_specs.get("panel_type"),
            "backlight_type": normalized_specs.get("backlight_type"),
            "hdr": normalized_specs.get("hdr"),
            "hdr_formats": normalized_specs.get("hdr_formats"),

            # connectivity
            "smart_platform": normalized_specs.get("smart_platform"),
            "voice_assistants": voice_assistants,
            "wireless": normalized_specs.get("wireless"),
            "connectivity": normalized_specs.get("connectivity"),
            "hdmi_inputs": normalized_specs.get("hdmi_inputs"),
            "usb_ports": normalized_specs.get("usb_ports"),

            # physical
            "product_dimensions": first_non_empty(normalized_specs.get("product_dimensions"), item.get("product_dimensions")),
            "item_weight": first_non_empty(normalized_specs.get("item_weight"), item.get("item_weight")),
            "height": normalized_specs.get("height"),
            "width": normalized_specs.get("width"),
            "depth": normalized_specs.get("depth"),
            "height_without_stand": normalized_specs.get("height_without_stand"),
            "depth_without_stand": normalized_specs.get("depth_without_stand"),
            "weight_without_stand": normalized_specs.get("weight_without_stand"),
            "vesa": normalized_specs.get("vesa"),

            # content
            "features": features,
            "description": to_str(first_non_empty(item.get("description"), item.get("product_description"))),
            "images": images,
            "image_count": image_count,

            # misc
            "timestamp": timestamp,
            "bought_past_month": bought_past_month,
            "amazon_prime": amazon_prime,
            "badges_all": badges_all,
            "raw_specs_json": json.dumps(specs_flat, ensure_ascii=False),
        }

        # Ensure all columns exist
        for c in COLUMNS:
            row.setdefault(c, None)
        rows.append(row)
    return rows

def load_records_from_folder(input_dir: Path, pattern: str) -> List[Dict[str, Any]]:
    all_records: List[Dict[str, Any]] = []
    for path in input_dir.glob(pattern):
        with path.open("r", encoding="utf-8") as f:
            obj = json.load(f)
        records = unwrap_richtext_if_needed(obj)
        if not records and isinstance(obj, list):
            records = obj
        if not records:
            # If file is a newline-delimited JSON (ndjson)
            try:
                with path.open("r", encoding="utf-8") as f:
                    lines = [json.loads(line) for line in f if line.strip().startswith("{")]
                records = lines
            except Exception:
                records = []
        all_records.extend(records)
    return all_records

def save_outputs(df: pd.DataFrame, out_prefix: str):
    csv_path = Path(f"{out_prefix}.csv")
    parquet_path = Path(f"{out_prefix}.parquet")
    dd_path = Path(f"{out_prefix}_data_dictionary.yaml")

    df.to_csv(csv_path, index=False)
    df.to_parquet(parquet_path, index=False)
    with open(dd_path, "w", encoding="utf-8") as f:
        yaml.safe_dump(DATA_DICTIONARY, f, sort_keys=False, allow_unicode=True)

    print(f"Saved: {csv_path}")
    print(f"Saved: {parquet_path}")
    print(f"Saved: {dd_path}")

def main():
    parser = argparse.ArgumentParser(description="Harmonize competitor product data across Amazon/Walmart/Best Buy.")
    parser.add_argument("--input", type=str, default="./", help="Input folder containing JSON files.")
    parser.add_argument("--pattern", type=str, default="*.json", help="Glob pattern for files.")
    parser.add_argument("--out-prefix", type=str, default="harmonized_competitor_products", help="Output file prefix.")
    parser.add_argument("--tv-only", type=str, default="true", help="Filter to TVs only (true/false).")
    args = parser.parse_args()

    input_dir = Path(args.input)
    tv_only = str(args.tv_only).lower() in ["true", "1", "yes", "y"]

    records = load_records_from_folder(input_dir, args.pattern)
    print(f"Loaded {len(records)} raw records.")

    rows = harmonize_records(records, tv_only=tv_only)
    print(f"Keeping {len(rows)} harmonized rows{' (TV-only)' if tv_only else ''}.")

    df = pd.DataFrame(rows, columns=COLUMNS)

    # Light normalization
    if "smart_platform" in df.columns:
        df["smart_platform"] = df["smart_platform"].astype(str).str.strip()

    save_outputs(df, args.out_prefix)

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print("Error:", e, file=sys.stderr)
        sys.exit(1)


