# Extract Product Metadata

In [3]:
# Product category to process; it is interpolated into the input and output
# file names under DATA_DIR / "output".
# CATEGORY = "Baby_Products"
CATEGORY = "Video_Games"

# Define sequence lengths
# NOTE(review): neither constant is referenced in the cells visible here - confirm they are still needed.
MIN_SEQUENCE_LENGTH = 3
MAX_SEQUENCE_LENGTH = 100  # Adjust as needed

In [4]:
import sys
from pathlib import Path

# The project root is taken to be the parent of the notebook's working directory.
NOTEBOOK_DIR = Path.cwd()
PROJECT_ROOT = NOTEBOOK_DIR.parent

# Make project-local packages (e.g. `src`) importable. Guarded so that
# re-running this cell does not keep appending duplicate sys.path entries.
if str(PROJECT_ROOT) not in sys.path:
    sys.path.append(str(PROJECT_ROOT))

# Data directory
DATA_DIR = PROJECT_ROOT / "data"
DATA_DIR.mkdir(exist_ok=True)

In [5]:
import json
import logging
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import polars as pl
from dotenv import load_dotenv
from google import genai
from IPython.display import display
from tqdm.notebook import tqdm

from src.logger import setup_logger

# Turn down the verbosity of the Google API stack via its environment switches.
os.environ.update({"GRPC_VERBOSITY": "ERROR", "GLOG_minloglevel": "2"})

logger = setup_logger("clean-items")

# Only surface warnings and above from the HTTP / SDK loggers.
for noisy_logger_name in ("httpcore", "httpx", "google_genai"):
    logging.getLogger(noisy_logger_name).setLevel(logging.WARNING)

In [6]:
# Load variables from a .env file into the process environment; the Vertex AI
# project and location are read from os.environ further down.
load_dotenv()

True

In [7]:
# API settings
MODEL = "gemini-2.5-flash-lite"  # Default model used by extract_metadata and the API smoke test
# MODEL = "gemini-2.5-flash"
# NOTE(review): BATCH_SIZE is not referenced in the visible cells; extract_metadata sends one item per API call.
BATCH_SIZE: int = 10
MAX_RETRIES: int = 5  # Maximum attempts per item in extract_metadata
RETRY_DELAY: float = 1.0  # Seconds slept between retry attempts
RATE_LIMIT_DELAY: float = 0.1  # Seconds a worker sleeps after writing a successful result
MAX_THREADS: int = 15  # Worker count for the ThreadPoolExecutor

# Text processing settings for titles
# NOTE(review): neither constant below is referenced in the visible cells - confirm they are still needed.
MAX_TITLE_LENGTH: int = 200  # Truncate titles before sending to API
MIN_TITLE_LENGTH: int = 10  # Minimum length to consider valid

## 2. Load Data and Merge Cleaned Titles and Descriptions

In [8]:
def _load_clean_text(path, text_column: str) -> pl.DataFrame:
    """Read a cleaned-text CSV and return one usable row per ``parent_asin``.

    Rows whose ``text_column`` is the ``"NA"`` sentinel (or null) are removed
    *before* de-duplicating, so a valid row is never discarded in favour of an
    ``"NA"`` duplicate of the same ASIN. The first remaining row per ASIN is
    kept and the file order is preserved, which makes the result deterministic.
    """
    return (
        pl.read_csv(path)
        .filter(pl.col(text_column) != "NA")
        .unique(subset=["parent_asin"], keep="first", maintain_order=True)
    )


# LOAD ORIGINAL METADATA
metadata_output_path = DATA_DIR / "output" / f"{CATEGORY}_items.parquet"
item_df = pl.read_parquet(metadata_output_path)
logger.info(f"Loaded {len(item_df):,} items from {metadata_output_path}")

# LOAD CLEANED DESCRIPTIONS
cleaned_descriptions_path = DATA_DIR / "output" / f"{CATEGORY}_descriptions_clean.csv"
cleaned_descriptions_df = _load_clean_text(cleaned_descriptions_path, "clean_description")
logger.info(f"Loaded {len(cleaned_descriptions_df):,} cleaned descriptions from {cleaned_descriptions_path}")

# LOAD CLEANED TITLES
cleaned_titles_path = DATA_DIR / "output" / f"{CATEGORY}_titles_clean.csv"
cleaned_titles_df = _load_clean_text(cleaned_titles_path, "clean_title")
logger.info(f"Loaded {len(cleaned_titles_df):,} cleaned titles from {cleaned_titles_path}")

# MERGE DATAFRAMES
# Inner joins: only items that have BOTH a cleaned description and a cleaned title survive.
item_df = item_df.join(cleaned_descriptions_df, on="parent_asin", how="inner")
item_df = item_df.join(cleaned_titles_df, on="parent_asin", how="inner")
logger.info(f"Merged {len(item_df):,} items with cleaned descriptions and titles")

21:04:12 - Loaded 465 items from /Users/amirmasoud/personal/v_tests/semantic-ids-llm/data/output/Video_Games_items.parquet
21:04:12 - Loaded 465 cleaned descriptions from /Users/amirmasoud/personal/v_tests/semantic-ids-llm/data/output/Video_Games_descriptions_clean.csv
21:04:12 - Loaded 465 cleaned titles from /Users/amirmasoud/personal/v_tests/semantic-ids-llm/data/output/Video_Games_titles_clean.csv
21:04:12 - Merged 465 items with cleaned descriptions and titles


In [9]:
# Clean up xml tags that remain
def _strip_markup(column_name: str) -> pl.Expr:
    """Build an expression that removes tags from a text column and tidies its whitespace."""
    without_tags = pl.col(column_name).str.replace_all(r"</?[^>]+>", " ")  # Remove all tags
    single_spaced = without_tags.str.replace_all(r"\s+", " ")  # Collapse multiple spaces
    return single_spaced.str.strip_chars().alias(column_name)  # Trim whitespace


item_df = item_df.with_columns(_strip_markup("clean_description"), _strip_markup("clean_title"))

# Sanity checks: no value may still contain the literal text "clean_description" /
# "clean_title" (presumably left over from wrapper tags in the cleaned output).
leftover_descriptions = item_df.filter(pl.col("clean_description").str.contains("clean_description")).select(
    ["description_text", "clean_description"]
)
assert len(leftover_descriptions) == 0

leftover_titles = item_df.filter(pl.col("clean_title").str.contains("clean_title")).select(["title", "clean_title"])
assert len(leftover_titles) == 0

In [10]:
# Promote the cleaned text to the canonical column names and keep the raw
# text under an "original_" prefix.
column_renames = {
    "description_text": "original_description",
    "clean_description": "description_text",
    "title": "original_title",
    "clean_title": "title",
}
item_df = item_df.rename(column_renames)
item_df.head()

parent_asin,original_title,original_description,features_text,main_category,categories_text,store,average_rating,rating_number,price,item_context,description_text,title
str,str,str,str,str,str,str,str,i64,str,str,str,str
"""B071GL43XB""","""VR-robot New 2.4GHz Wireless G…","""Support Android 2.3 or above s…","""Support Android 2.3 or above s…","""Computers""","""Video Games > PC > Accessories…","""VR-robot""","""3.3""",6,"""""","""Product: VR-robot New 2.4GHz W…","""Enhance your gaming experience…","""VR-Robot 2.4GHz Wireless Game …"
"""B007ZU4QG2""","""WCI Quality Clip Mount For Mic…","""Introducing The New Xbox Kinec…","""Perfect to mount on flat-panel…","""Video Games""","""Video Games > Legacy Systems >…","""WCI""","""2.4""",6,"""""","""Product: WCI Quality Clip Moun…","""Introducing the WCI Xbox Kinec…","""WCI Quality Clip Mount for Mic…"
"""B009D4O0FK""","""Pro Controller U for Wii and W…","""Finally! By popular demand. A …","""Finally! By popular demand. A …","""Video Games""","""Video Games > Legacy Systems >…","""InterWorks Unlimited, Inc.""","""3.0""",43,"""""","""Product: Pro Controller U for …","""Introducing the Hardcore Contr…","""Wii and Wii U Pro Controller -…"
"""B000YG0PUU""","""PS3 Master Dance Pad Non-Slip""","""Start off your Dance Dance Rev…","""Compatible with Playstation 3 …","""Video Games""","""Video Games > Legacy Systems >…","""Sony""","""3.5""",84,"""""","""Product: PS3 Master Dance Pad …","""Enhance your Dance Dance Revol…","""PlayStation 3 Master Dance Pad…"
"""B000IVF3VC""","""Zenith LCD Display for Gamecub…","""Featuring a 5.0"" DSTN LCD Colo…","""Mounting bracket, on screen di…","""Video Games""","""Video Games > Legacy Systems >…","""Zenith""","""5.0""",1,"""""","""Product: Zenith LCD Display fo…","""Enjoy GameCube games and DVD m…","""Zenith LCD Display for GameCub…"


In [11]:
# Preview the first rows of the renamed frame.
# NOTE(review): the previous cell already ends with this same preview - this cell looks redundant.
item_df.head()

parent_asin,original_title,original_description,features_text,main_category,categories_text,store,average_rating,rating_number,price,item_context,description_text,title
str,str,str,str,str,str,str,str,i64,str,str,str,str
"""B071GL43XB""","""VR-robot New 2.4GHz Wireless G…","""Support Android 2.3 or above s…","""Support Android 2.3 or above s…","""Computers""","""Video Games > PC > Accessories…","""VR-robot""","""3.3""",6,"""""","""Product: VR-robot New 2.4GHz W…","""Enhance your gaming experience…","""VR-Robot 2.4GHz Wireless Game …"
"""B007ZU4QG2""","""WCI Quality Clip Mount For Mic…","""Introducing The New Xbox Kinec…","""Perfect to mount on flat-panel…","""Video Games""","""Video Games > Legacy Systems >…","""WCI""","""2.4""",6,"""""","""Product: WCI Quality Clip Moun…","""Introducing the WCI Xbox Kinec…","""WCI Quality Clip Mount for Mic…"
"""B009D4O0FK""","""Pro Controller U for Wii and W…","""Finally! By popular demand. A …","""Finally! By popular demand. A …","""Video Games""","""Video Games > Legacy Systems >…","""InterWorks Unlimited, Inc.""","""3.0""",43,"""""","""Product: Pro Controller U for …","""Introducing the Hardcore Contr…","""Wii and Wii U Pro Controller -…"
"""B000YG0PUU""","""PS3 Master Dance Pad Non-Slip""","""Start off your Dance Dance Rev…","""Compatible with Playstation 3 …","""Video Games""","""Video Games > Legacy Systems >…","""Sony""","""3.5""",84,"""""","""Product: PS3 Master Dance Pad …","""Enhance your Dance Dance Revol…","""PlayStation 3 Master Dance Pad…"
"""B000IVF3VC""","""Zenith LCD Display for Gamecub…","""Featuring a 5.0"" DSTN LCD Colo…","""Mounting bracket, on screen di…","""Video Games""","""Video Games > Legacy Systems >…","""Zenith""","""5.0""",1,"""""","""Product: Zenith LCD Display fo…","""Enjoy GameCube games and DVD m…","""Zenith LCD Display for GameCub…"


In [12]:
# Assemble the text block that is sent to the model for each item.
# concat_str returns null for the whole row as soon as ANY part is null, which
# would wipe out the item's entire context; a missing part is therefore
# rendered as an empty string instead.
item_df = item_df.with_columns(
    pl.concat_str(
        [
            pl.lit("Title: "),
            pl.col("title").fill_null(""),
            pl.lit("\nDescription: "),
            pl.col("description_text").fill_null(""),
            pl.lit("\nCategory: "),
            pl.col("categories_text").fill_null(""),
            pl.lit("\nFeatures: "),
            pl.col("features_text").fill_null(""),
        ]
    ).alias("item_context")
)

In [13]:
# Print the first item's context so the embedded "\n" separators render as line breaks.
print(item_df.select("item_context").head(1).item())

Title: VR-Robot 2.4GHz Wireless Game Controller Gamepad Joystick for Android TV Box
Description: Enhance your gaming experience with this 2.4GHz wireless gamepad, compatible with Android 2.3+ and Windows 8/7/XP systems (not compatible with Windows 10). Connects seamlessly to smartphones (OTG cable included), tablets, TVs, and TV boxes via the included USB RF receiver. This ergonomic gamepad features a fine dual analog joystick, precise D-pad, and action buttons designed for comfort and grip. Enjoy lag-free wireless control up to 30 feet away using 2.4GHz RF technology. It supports simultaneous use of two controllers and includes standard Android and Windows menu keys. Powered by 2 AA batteries (not included). The package includes one 2.4GHz wireless gamepad, one OTG converter, and one USB receiver.
Category: Video Games > PC > Accessories > Controllers > Gamepads & Standard Controllers
Features: Support Android 2.3 or above system and Windows 10/8.1/8/7/XP computer, Android TV BOX, PS3

## 3. Initialize Gemini Client

In [14]:
# Vertex AI project and location are taken from the environment (None when unset).
VERTEX_PROJECT = os.getenv("GOOGLE_CLOUD_PROJECT")
VERTEX_LOCATION = os.getenv("GOOGLE_CLOUD_LOCATION")

# client = genai.Client()
client = genai.Client(vertexai=True, project=VERTEX_PROJECT, location=VERTEX_LOCATION)
logger.info("Gemini client initialized successfully")

21:04:15 - Gemini client initialized successfully


In [15]:
# Smoke-test the connection with a trivial prompt before starting the real extraction.
try:
    test_response = client.models.generate_content(
        model=MODEL, contents="Say 'API connection successful' if you can read this."
    )
    logger.info(f"API Test: {test_response.text}")
except Exception as e:
    # The client object already exists at this point; what failed is the request itself.
    logger.error(f"Gemini API test call failed: {e}")
    # The client is configured for Vertex AI, so the relevant settings are the
    # project/location variables and Google Cloud credentials, not GEMINI_API_KEY.
    logger.error(
        "Please ensure GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_LOCATION are set "
        "and that Google Cloud credentials are available"
    )
    raise

21:04:16 - API Test: API connection successful


## 4. Define Metadata Extraction Functions

In [16]:
# Prompt template used by extract_metadata. It is rendered with str.format:
# `{item}` is replaced by the item's context text, and the doubled braces
# `{{` / `}}` produce the literal braces of the JSON example.
# NOTE(review): the wording is specific to video games while CATEGORY is configurable - confirm before reusing it for another category.
# NOTE(review): the JSON example ends with a trailing comma after "franchise", which is not valid JSON - consider removing it.
PROMPT = """
Extract structured metadata from the following video game product information. 
Analyze the title, description, category, and features to identify key attributes.

Product Information:
<product>
{item}
</product>

Return ONLY a JSON object with the following fields (use null for unknown/not applicable):
{{
  "product_type": "Game|Hardware|Accessory|DLC|Bundle|<other relevant type>|null",
  "platform": ["PS5", "PS4", "Xbox Series X", "Xbox One", "Nintendo Switch", "PC", "Steam Deck", <other platforms>] or null,
  "genre": ["Action", "RPG", "Strategy", "Sports", "Racing", "Roguelike", "Soulslike", "Metroidvania", "Battle Royale", "MOBA", "Auto Battler", "Gacha", "Idle", "Tower Defense", <any other relevant genres/subgenres>] or null for non-games,
  "hardware_type": "Console|Controller|Headset|Cable|Storage|Skin|Stand|Cooling|Capture Card|<other hardware types>|null" for non-hardware,
  "brand": "Primary manufacturer or brand name" or null,
  "multiplayer": "Single-player|Local Co-op|Online Multiplayer|MMO|MMORPG|PvP|PvE|Couch Co-op|<other multiplayer modes>|null",
  "franchise": "Game series or franchise name if applicable" or null,
}}

Guidelines:
- Use arrays for fields that can have multiple values (especially for genre and platform)
- Keep values concise but descriptive
- For platform, use common abbreviations (PS5, Xbox One, Switch, etc.)
- For genres, be SPECIFIC and COMPREHENSIVE:
  * Include main genres AND subgenres (e.g., ["RPG", "JRPG", "Turn-based"])
  * Use modern gaming terminology (Roguelike, Soulslike, Metroidvania, Battle Royale, etc.)
  * Include thematic descriptors when relevant (Open World, Sandbox, Survival, Horror, etc.)
  * Can combine multiple descriptors (e.g., "Open World MMORPG", "Tactical Turn-based RPG")
- Feel free to use gaming-specific terms that accurately describe the product
- The examples shown are NOT exhaustive - use whatever terms best describe the product
- Return valid JSON only
""".strip()

In [17]:
# EXTRACT METADATA
def _strip_code_fence(text: str) -> str:
    """Return ``text`` without a surrounding Markdown code fence (``` or ```json)."""
    text = text.strip()
    if text.startswith("```json"):
        text = text[7:]
    if text.startswith("```"):
        text = text[3:]
    if text.endswith("```"):
        text = text[:-3]
    return text.strip()


def extract_metadata(prompt: str, item: str, model: str = MODEL) -> "dict | None":
    """
    Extract metadata from item context using Gemini API.

    The prompt template is rendered once, then up to MAX_RETRIES attempts are
    made. Each attempt sends the prompt, strips an optional Markdown code fence
    from the reply and parses the remainder as JSON. A failed attempt (API
    error, empty reply, invalid JSON, or JSON that is not an object) is logged
    as a warning and retried after RETRY_DELAY seconds; the final failed
    attempt is logged as an error.

    Args:
        prompt: The extraction instructions prompt (a str.format template with an ``{item}`` placeholder)
        item: The product context to analyze
        model: The Gemini model to use (defaults to MODEL constant)

    Returns:
        Dictionary of extracted metadata or None if failed
    """
    # Rendering does not depend on the attempt. A broken template can never
    # succeed, so fail fast instead of sleeping through MAX_RETRIES retries.
    try:
        full_prompt = prompt.format(item=item)
    except (KeyError, IndexError, ValueError) as e:
        logger.error(f"Invalid prompt template: {e}")
        return None

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            # Generate content using Gemini
            response = client.models.generate_content(model=model, contents=full_prompt)

            # The SDK may return no text at all; treat that as an explicit,
            # retryable failure rather than an AttributeError on None.
            response_text = response.text
            if not response_text:
                raise ValueError("model returned an empty response")

            # Parse the JSON (after removing Markdown code fences, if present)
            metadata = json.loads(_strip_code_fence(response_text))

            # Validate that we got a dictionary with expected structure
            if isinstance(metadata, dict):
                return metadata
            failure = "Invalid response structure"
        except json.JSONDecodeError as e:
            failure = f"JSON parse error: {e}"
        except Exception as e:
            failure = f"Error extracting metadata: {e}"

        # Single retry/give-up path shared by every failure kind, so the last
        # failed attempt is always reported regardless of why it failed.
        if attempt < MAX_RETRIES:
            logger.warning(f"{failure}, retrying... (attempt {attempt}/{MAX_RETRIES})")
            time.sleep(RETRY_DELAY)
        else:
            logger.error(f"Failed after {MAX_RETRIES} attempts: {failure}")

    return None


# Smoke-test the extractor on the first item's context
test_item = item_df.select("item_context").head(1).item()
metadata = extract_metadata(PROMPT, test_item)

context_preview = test_item[:200]
logger.info(f"Sample item context (first 200 chars): {context_preview}...")

pretty_metadata = json.dumps(metadata, indent=2)
logger.info(f"Extracted metadata: {pretty_metadata}")

21:04:17 - Sample item context (first 200 chars): Title: VR-Robot 2.4GHz Wireless Game Controller Gamepad Joystick for Android TV Box
Description: Enhance your gaming experience with this 2.4GHz wireless gamepad, compatible with Android 2.3+ and Wind...
21:04:17 - Extracted metadata: {
  "product_type": "Accessory",
  "platform": [
    "Android",
    "PC",
    "Android TV Box",
    "PS3"
  ],
  "genre": null,
  "hardware_type": "Controller",
  "brand": "VR-Robot",
  "multiplayer": "Local Co-op",
  "franchise": null
}


## 5. Parallel Metadata Extraction with Checkpointing

In [18]:
# Setup checkpoint file (using JSONL for structured data)
checkpoint_file = DATA_DIR / "output" / f"{CATEGORY}_metadata.jsonl"

# Load existing processed ASINs if checkpoint exists.
# Every checkpoint record carries "parent_asin" - including the failure and
# error records - so items that failed earlier also count as processed here.
processed_asins = set()
if checkpoint_file.exists():
    skipped_lines = 0
    try:
        with open(checkpoint_file, "r", encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                # Tolerate damage per line: one malformed record (e.g. a line
                # truncated by an interrupted run) must not discard every
                # other processed ASIN and trigger a full, duplicate re-run.
                try:
                    processed_asins.add(json.loads(line)["parent_asin"])
                except (json.JSONDecodeError, KeyError, TypeError):
                    skipped_lines += 1
    except Exception as e:
        # Keep whatever was read successfully before the failure.
        logger.error(f"Error loading checkpoint: {e}")

    if skipped_lines:
        logger.warning(f"Skipped {skipped_lines:,} malformed lines in checkpoint file")
    logger.info(f"Found {len(processed_asins):,} already processed items in checkpoint file")
else:
    # Create new empty JSONL file
    checkpoint_file.touch()
    logger.info(f"Created new checkpoint file: {checkpoint_file}")

# Filter to only unprocessed items
items_to_process = item_df.filter(~pl.col("parent_asin").is_in(processed_asins))
logger.info(f"Items to process: {len(items_to_process):,} out of {len(item_df):,} total items")

# Thread-safe file writing lock
file_lock = threading.Lock()


def _append_checkpoint_record(record: dict) -> None:
    """Append one record as a JSON line to the checkpoint file while holding the file lock."""
    with file_lock, open(checkpoint_file, "a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")


def process_single_item(item_data):
    """Extract metadata for one ``(asin, item_context)`` pair and record the outcome.

    Exactly one line is appended to the checkpoint file per outcome: the
    extracted metadata on success, an ``extraction_failed`` marker when the
    extractor returns nothing, or an ``error`` record when an exception is
    raised.

    Returns:
        A tuple ``(asin, metadata, True)`` on success, otherwise ``(asin, None, False)``.
    """
    asin, item_context = item_data

    try:
        metadata = extract_metadata(PROMPT, item_context)

        if not metadata:
            # Nothing usable came back: log it and keep a marker record for this ASIN.
            logger.warning(f"Failed to extract metadata for {asin}")
            _append_checkpoint_record({"parent_asin": asin, "extraction_failed": True})
            return asin, None, False

        # Persist immediately so an interrupted run can resume from the checkpoint.
        _append_checkpoint_record({"parent_asin": asin, **metadata})

        # Rate limiting
        time.sleep(RATE_LIMIT_DELAY)
        return asin, metadata, True

    except Exception as e:
        logger.error(f"Failed to process {asin}: {e}")
        _append_checkpoint_record({"parent_asin": asin, "error": str(e)})
        return asin, None, False


# Build the (asin, context) work items for the thread pool
items = [(row["parent_asin"], row["item_context"]) for row in items_to_process.iter_rows(named=True)]

if not items:
    logger.info("No items to process - all items already in checkpoint file!")
else:
    logger.info(f"Starting parallel processing with {MAX_THREADS} workers...")

    # Outcome tallies shown on the progress bar
    successful = 0
    failed = 0

    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        # Queue every item up front; completions are consumed as they arrive
        futures = {executor.submit(process_single_item, item): item for item in items}

        with tqdm(total=len(futures), desc="Extracting metadata") as pbar:
            for future in as_completed(futures):
                failure = None
                try:
                    asin, result, success = future.result()
                except Exception as e:
                    success = False
                    failure = e

                if success:
                    successful += 1
                else:
                    failed += 1

                # One shared progress update for both outcomes
                pbar.update(1)
                pbar.set_postfix({"success": successful, "failed": failed})

                if failure is not None:
                    logger.error(f"Future failed: {failure}")

    logger.info(f"Processing complete! Successful: {successful:,}, Failed: {failed:,}")

21:04:17 - Found 465 already processed items in checkpoint file
21:04:17 - Items to process: 0 out of 465 total items
21:04:17 - No items to process - all items already in checkpoint file!


In [19]:
# Load and verify the extracted metadata
def _count_field_values(records, field):
    """Tally the values of ``field`` across ``records``.

    Falsy values (None, "", []) are skipped. List values are flattened so each
    element is counted on its own; any other value is counted as-is. Handling
    lists for every field keeps the tally from raising TypeError when the
    model returns a list for a field that is normally a single string.
    """
    counts = {}
    for record in records:
        value = record.get(field)
        if not value:
            continue
        entries = value if isinstance(value, list) else [value]
        for entry in entries:
            counts[entry] = counts.get(entry, 0) + 1
    return counts


def _log_top_counts(counts, limit):
    """Log the ``limit`` most frequent entries of ``counts``, highest count first."""
    for name, count in sorted(counts.items(), key=lambda x: x[1], reverse=True)[:limit]:
        logger.info(f"  {name}: {count:,}")


if checkpoint_file.exists():
    # Load JSONL file into a list of dictionaries
    metadata_records = []
    with open(checkpoint_file, "r", encoding="utf-8") as f:
        for line in f:
            if line.strip():
                try:
                    metadata_records.append(json.loads(line))
                except json.JSONDecodeError as e:
                    logger.warning(f"Skipping invalid JSON line: {e}")

    logger.info(f"Loaded {len(metadata_records):,} metadata records from {checkpoint_file}")

    # Display sample of extracted metadata
    logger.info("Sample of extracted metadata:")
    for i, record in enumerate(metadata_records[:3]):
        logger.info(f"\nItem {i + 1} (ASIN: {record.get('parent_asin', 'N/A')}):")
        for key, value in record.items():
            if key != "parent_asin":
                logger.info(f"  {key}: {value}")

    # Analyze metadata coverage without converting to DataFrame first.
    # A field counts as covered when its value is not None; the percentage is
    # relative to ALL loaded records, including failure and error records.
    logger.info("\nMetadata field coverage:")
    field_counts = {}
    for record in metadata_records:
        for key, value in record.items():
            if key in ("parent_asin", "extraction_failed", "error"):
                continue
            field_counts.setdefault(key, 0)
            if value is not None:
                field_counts[key] += 1

    for field, count in sorted(field_counts.items()):
        coverage = (count / len(metadata_records)) * 100
        logger.info(f"  {field}: {count:,} / {len(metadata_records):,} ({coverage:.1f}%)")

    # Analyze product types
    logger.info("\nProduct type distribution:")
    product_type_counts = _count_field_values(metadata_records, "product_type")
    _log_top_counts(product_type_counts, 10)

    # Analyze platforms (handling arrays)
    logger.info("\nPlatform distribution (flattened):")
    platform_counts = _count_field_values(metadata_records, "platform")
    _log_top_counts(platform_counts, 15)

    # Analyze genres (handling arrays)
    logger.info("\nGenre distribution (flattened):")
    genre_counts = _count_field_values(metadata_records, "genre")
    _log_top_counts(genre_counts, 20)

    # Show multiplayer modes distribution
    logger.info("\nMultiplayer mode distribution:")
    multiplayer_counts = _count_field_values(metadata_records, "multiplayer")
    _log_top_counts(multiplayer_counts, 10)

else:
    logger.warning("No checkpoint file found. Run the processing cell first.")

21:04:18 - Loaded 465 metadata records from /Users/amirmasoud/personal/v_tests/semantic-ids-llm/data/output/Video_Games_metadata.jsonl
21:04:18 - Sample of extracted metadata:
21:04:18 - 
Item 1 (ASIN: B08JK7ZSXW):
21:04:18 -   product_type: Accessory
21:04:18 -   platform: ['PC', 'Mac OS']
21:04:18 -   genre: None
21:04:18 -   hardware_type: Gaming Mice
21:04:18 -   brand: ZUOYA
21:04:18 -   multiplayer: None
21:04:18 -   franchise: None
21:04:18 - 
Item 2 (ASIN: B08RXJT6RF):
21:04:18 -   product_type: Accessory
21:04:18 -   platform: ['PC']
21:04:18 -   genre: None
21:04:18 -   hardware_type: Gaming Mice
21:04:18 -   brand: Razer
21:04:18 -   multiplayer: None
21:04:18 -   franchise: None
21:04:18 - 
Item 3 (ASIN: B088LRDFDQ):
21:04:18 -   product_type: Accessory
21:04:18 -   platform: None
21:04:18 -   genre: None
21:04:18 -   hardware_type: Controller
21:04:18 -   brand: Alvatron
21:04:18 -   multiplayer: None
21:04:18 -   franchise: None
21:04:18 - 
Metadata field coverage:
21:04:

## 6. Save Metadata as DataFrame

In [20]:
# Convert metadata to DataFrame format - handling mixed types properly
def _normalize_metadata_record(record, expected_fields, list_fields):
    """Project ``record`` onto ``expected_fields`` with one stable type per field.

    Fields in ``list_fields`` become a list of strings (a single value is
    wrapped in a list, None elements are dropped). Every other field becomes a
    string (a list value is joined with ", ", an empty list becomes None).
    Missing or None values stay None. Keys outside ``expected_fields`` are
    dropped so that extra model output cannot cause schema conflicts.
    """
    normalized = {}
    for key in expected_fields:
        value = record.get(key)
        if value is None:
            normalized[key] = None
        elif key in list_fields:
            if isinstance(value, list):
                # Convert all list items to strings to avoid nested type issues
                normalized[key] = [str(v) for v in value if v is not None]
            else:
                normalized[key] = [str(value)]  # Convert single values to list
        elif isinstance(value, list):
            # If a non-list field has a list value, join it
            normalized[key] = ", ".join(str(v) for v in value if v is not None) if value else None
        else:
            # Strings, numbers, dicts, ...: store the string form
            normalized[key] = str(value)
    return normalized


if checkpoint_file.exists() and len(metadata_records) > 0:
    # Filter out error records
    clean_records = [
        record for record in metadata_records if not record.get("extraction_failed") and not record.get("error")
    ]

    # Define expected fields - ONLY keep these to avoid schema conflicts from extra Gemini output
    expected_fields = [
        "parent_asin",
        "product_type",
        "platform",
        "genre",
        "hardware_type",
        "brand",
        "multiplayer",
        "franchise",
    ]

    # Define which fields should be lists
    list_fields = ["platform", "genre"]

    # Normalize records to ensure consistent types
    normalized_records = [
        _normalize_metadata_record(record, expected_fields, list_fields) for record in clean_records
    ]

    # Declare the schema explicitly instead of letting polars infer it from the
    # data: inference depends on which values happen to be present (an all-null
    # list column would not get a list dtype), and with zero clean records it
    # would produce a frame without any columns.
    metadata_schema = {
        field: pl.List(pl.Utf8) if field in list_fields else pl.Utf8 for field in expected_fields
    }

    # Create DataFrame with normalized records
    metadata_df = pl.DataFrame(normalized_records, schema=metadata_schema)

    # Save as Parquet file (Parquet natively supports nested data types like lists)
    metadata_parquet_path = DATA_DIR / "output" / f"{CATEGORY}_metadata_extracted.parquet"
    metadata_df.write_parquet(metadata_parquet_path)
    logger.info(f"Saved metadata DataFrame with {len(metadata_df):,} records to {metadata_parquet_path}")

    # Display DataFrame info
    logger.info(f"\nDataFrame shape: {metadata_df.shape}")
    logger.info(f"Columns: {metadata_df.columns}")

    # Show sample of the DataFrame
    logger.info("\nSample of metadata DataFrame:")
    display(metadata_df.head(5))

else:
    logger.warning("No metadata records to convert to DataFrame")

21:04:19 - Saved metadata DataFrame with 465 records to /Users/amirmasoud/personal/v_tests/semantic-ids-llm/data/output/Video_Games_metadata_extracted.parquet
21:04:19 - 
DataFrame shape: (465, 8)
21:04:19 - Columns: ['parent_asin', 'product_type', 'platform', 'genre', 'hardware_type', 'brand', 'multiplayer', 'franchise']
21:04:19 - 
Sample of metadata DataFrame:


parent_asin,product_type,platform,genre,hardware_type,brand,multiplayer,franchise
str,str,list[str],list[str],str,str,str,str
"""B08JK7ZSXW""","""Accessory""","[""PC"", ""Mac OS""]",,"""Gaming Mice""","""ZUOYA""",,
"""B08RXJT6RF""","""Accessory""","[""PC""]",,"""Gaming Mice""","""Razer""",,
"""B088LRDFDQ""","""Accessory""",,,"""Controller""","""Alvatron""",,
"""B01MPZV6BB""","""Hardware""","[""PS3""]",,"""Console""","""Sony""",,"""PlayStation"""
"""B07DMC3M42""","""Hardware""","[""PS4""]",,"""Console""","""PlayStation""",,


In [21]:
# Show only the records for which a multiplayer mode was extracted.
metadata_df.filter(pl.col("multiplayer").is_not_null())

parent_asin,product_type,platform,genre,hardware_type,brand,multiplayer,franchise
str,str,list[str],list[str],str,str,str,str
"""B001BM1BAO""","""Game""","[""Wii""]","[""Puzzle"", ""Sandbox""]",,"""Nintendo""","""Single-player""","""Line Rider"""
"""B0B5SVYQFG""","""Game""","[""PS4""]","[""Action"", ""Platformer"", … ""Roguelike""]",,,"""Single-player""",
"""B001ELJFO6""","""Game""","[""Mac""]","[""Action"", ""Shooter"", … ""War""]",,,"""Online Multiplayer""","""Battlefield"""
"""B07PVPL7LV""","""Game""","[""PS4""]","[""Horror"", ""Adventure"", … ""First-person""]",,,"""Single-player""","""Among The Sleep"""
"""B00001LDBU""","""Game""","[""PC""]","[""Racing"", ""Arcade"", ""Action""]",,"""Microsoft""","""Single-player""","""Monster Truck Madness"""
…,…,…,…,…,…,…,…
"""B00BG2LKNM""","""Game""","[""PC""]","[""Action"", ""Strategy"", ""Tower Defense""]",,,"""Single-player""",
"""B00004SVYS""","""Bundle""","[""Nintendo NES""]","[""Sports"", ""Baseball""]",,"""Nintendo""","""Single-player""",
"""B009AYLONO""","""Game""","[""Xbox 360""]","[""Educational"", ""Children's""]",,"""Sesame Street""","""Single-player""","""Kinect"""
"""B00002SUL2""","""Game""","[""Sega Genesis""]","[""Sports"", ""Soccer Simulation""]",,"""EA Sports""","""Single-player""","""FIFA"""


In [23]:
# Show only the records for which a multiplayer mode was extracted.
# NOTE(review): this repeats the previous cell's query unchanged - likely a leftover duplicate cell.
metadata_df.filter(pl.col("multiplayer").is_not_null())

parent_asin,product_type,platform,genre,hardware_type,brand,multiplayer,franchise
str,str,list[str],list[str],str,str,str,str
"""B001BM1BAO""","""Game""","[""Wii""]","[""Puzzle"", ""Sandbox""]",,"""Nintendo""","""Single-player""","""Line Rider"""
"""B0B5SVYQFG""","""Game""","[""PS4""]","[""Action"", ""Platformer"", … ""Roguelike""]",,,"""Single-player""",
"""B001ELJFO6""","""Game""","[""Mac""]","[""Action"", ""Shooter"", … ""War""]",,,"""Online Multiplayer""","""Battlefield"""
"""B07PVPL7LV""","""Game""","[""PS4""]","[""Horror"", ""Adventure"", … ""First-person""]",,,"""Single-player""","""Among The Sleep"""
"""B00001LDBU""","""Game""","[""PC""]","[""Racing"", ""Arcade"", ""Action""]",,"""Microsoft""","""Single-player""","""Monster Truck Madness"""
…,…,…,…,…,…,…,…
"""B00BG2LKNM""","""Game""","[""PC""]","[""Action"", ""Strategy"", ""Tower Defense""]",,,"""Single-player""",
"""B00004SVYS""","""Bundle""","[""Nintendo NES""]","[""Sports"", ""Baseball""]",,"""Nintendo""","""Single-player""",
"""B009AYLONO""","""Game""","[""Xbox 360""]","[""Educational"", ""Children's""]",,"""Sesame Street""","""Single-player""","""Kinect"""
"""B00002SUL2""","""Game""","[""Sega Genesis""]","[""Sports"", ""Soccer Simulation""]",,"""EA Sports""","""Single-player""","""FIFA"""
