# Vertex AI Search for Retail: Product Catalog Generation

This notebook demonstrates how to use a large language model (LLM) on Vertex AI to generate a synthetic product catalog for a retailer. This is a common first step when building a demo or proof-of-concept for Vertex AI Search for Retail.

**Key Features:**

*   **Modular Configuration:** Uses external files for schema, prompts, and other configurations, making it easy to adapt for different retailers or use cases.
*   **Scalable Data Generation:** Leverages concurrent processing to generate a large number of products efficiently.
*   **Robust JSON Parsing:** Includes logic to handle and clean potential formatting issues in the LLM's JSON output.
*   **BigQuery Integration:** Creates a BigQuery dataset and table, and loads the generated catalog data.

## 1. Setup and Authentication

Install necessary libraries, authenticate to Google Cloud, and set up project-specific variables.

In [None]:
!pip install google-cloud-aiplatform google-cloud-bigquery google-cloud-secret-manager tqdm pandas

In [None]:
import os
import sys

# If you are running this notebook in a Colab environment, you will need to
# authenticate your user account.
if "google.colab" in sys.modules:
    from google.colab import auth
    auth.authenticate_user()

In [None]:
# Set the active gcloud project for shell commands run from this notebook.
!gcloud config set project partarch-ecommerce-demo


In [None]:

#@title Configure Project and Generation Parameters
import re
import os

#@markdown ### Project and GCP Configuration
#@markdown Configure the core GCP and Retailer settings here.
PROJECT_ID = "partarch-ecommerce-demo" #@param {type:"string"}
LOCATION = "" #@param {type:"string"}
LOCATION = LOCATION or "us-central1"
RETAILER = "Toys R Us" #@param {type:"string"}
RETAILER = RETAILER or "Wayfair"
MODEL_NAME = "" #@param {type:"string"}
MODEL_NAME = MODEL_NAME or "gemini-2.5-pro"
#@markdown ---
#@markdown ### URIs
#@markdown Configure the URIs for the site and images.
SITE_URI = "https://vibe-commerce-713683707236.us-central1.run.app/" #@param {type:"string"}
#@markdown   Example: `https://vibe-commerce-123456789000.us-central1.run.app/`
IMAGE_URI = "https://storage.googleapis.com/{PROJECT_ID}-images/product_images/"
#@markdown ---
#@markdown ### Model Generation Parameters
#@markdown Configure the model's generation settings.
MAX_OUTPUT_TOKENS = 65535 #@param {type:"integer"}
TEMPERATURE = 1.0 #@param {type:"number"}
TOP_P = 0.95 #@param {type:"number"}
#@markdown ---
#@markdown ### Data Generation Parameters
#@markdown Adjust the volume and concurrency of the data generation process.
NUMBER_OF_PRIMARY_PRODUCTS = 20 #@param {type:"integer"}
NUMBER_OF_VARIANTS_PER_PRIMARY = 5 #@param {type:"integer"}
PRODUCTS_PER_BATCH = 10 #@param {type:"integer"}
MAX_WORKERS = 20 #@param {type:"integer"}
#@markdown ---
#@markdown ### Product Categories
#@markdown Provide a path to a local file with one category per line. If left blank, a default list is used.
#@markdown **Sample Format:**
#@markdown ```
#@markdown Furniture > Living Room Furniture > Sofas & Couches
#@markdown Furniture > Living Room Furniture > Coffee Tables
#@markdown Furniture > Bedroom Furniture > Beds
#@markdown Furniture > Bedroom Furniture > Dressers & Chests
#@markdown Outdoor > Outdoor Seating > Patio Sofas
#@markdown ```
CATEGORIES_FILE_PATH = "custom_categories.txt" #@param {type:"string"}

# --- Start of Processing Logic ---

# Replace placeholder in IMAGE_URI
IMAGE_URI = IMAGE_URI.format(PROJECT_ID=PROJECT_ID)

# --- Load Product Categories ---
product_categories_content = ""
default_categories = """Furniture > Living Room Furniture > Sofas & Couches
Furniture > Living Room Furniture > Coffee Tables
Furniture > Bedroom Furniture > Beds
Furniture > Bedroom Furniture > Dressers & Chests
Outdoor > Outdoor Seating > Patio Sofas
Outdoor > Grills & Outdoor Cooking > Gas Grills
Bed & Bath > Bedding > Comforters & Sets
Bed & Bath > Bath Linens > Bath Towels
Rugs > Area Rugs > Modern Rugs
Decor & Pillows > Wall Decor > Wall Art
Lighting > Ceiling Fans > Fans with Lights
Kitchen & Tabletop > Cookware > Pots & Pans Sets
Storage & Organization > Closet Organizers > Closet Systems
Baby & Kids > Nursery Furniture > Cribs
Home Improvement > Flooring > Hardwood Flooring
"""

if CATEGORIES_FILE_PATH.strip():
    path_to_check = CATEGORIES_FILE_PATH.strip()
    # A relative path is resolved against the current working directory first.
    # If the file isn't there, fall back to Colab's default /content/ directory.
    if not os.path.isabs(path_to_check) and not os.path.exists(path_to_check):
        path_to_check = os.path.join("/content", path_to_check)

    print(f"Attempting to load categories from file: {path_to_check}")
    if not os.path.exists(path_to_check):
        raise FileNotFoundError(f"The specified category file was not found: {path_to_check}")

    with open(path_to_check, 'r') as f:
        lines = f.readlines()

    validated_lines = []
    for i, line in enumerate(lines):
        stripped_line = line.strip()
        if not stripped_line:
            continue
        # Basic validation: the whole line may contain only letters, numbers, spaces, and common delimiters.
        # re.fullmatch is required here; re.match would only check a valid prefix of the line.
        if not re.fullmatch(r"[a-zA-Z0-9\s>&/(),.'-]+", stripped_line):
            raise ValueError(f"Invalid characters found in category file on line {i+1}: '{stripped_line}'")
        validated_lines.append(stripped_line)

    if not validated_lines:
        raise ValueError(f"The category file '{path_to_check}' is empty or contains only whitespace.")

    product_categories_content = "\n".join(validated_lines)
    print("Successfully loaded and validated categories from file.")

else:
    print("No category file provided. Using the default list.")
    product_categories_content = default_categories


# The GCS bucket name will be the same as the Project ID
GCS_BUCKET_NAME = PROJECT_ID

# --- Sanitize RETAILER input ---
original_retailer = RETAILER
sanitized_retailer = re.sub(r'[^a-z0-9]', '', original_retailer.lower())

if original_retailer != sanitized_retailer:
    print(f"⚠️ WARNING: The retailer name '{original_retailer}' contained invalid characters or was not lowercase.")
    RETAILER = sanitized_retailer
    print(f"   It has been sanitized to: '{RETAILER}'\n")
else:
    RETAILER = sanitized_retailer

# BigQuery and GCS configuration (Derived from the above)
BQ_DATASET = "retail"
BQ_TABLE = f"products-{RETAILER}"
GCS_CATALOG_DIR = f"gs://{GCS_BUCKET_NAME}/retail_catalog/{RETAILER}"
GENERATED_JSONL_GCS_PATH = f"{GCS_CATALOG_DIR}/products.jsonl"

# --- Display Configuration ---
print("\n--- Project and GCP Configuration ---")
print(f"Project ID: {PROJECT_ID}")
print(f"GCS Bucket: {GCS_BUCKET_NAME} (set to be the same as Project ID)")
print(f"Location: {LOCATION}")
print(f"Retailer: {RETAILER}")
print(f"Model Name: {MODEL_NAME}")
print(f"Site URI: {SITE_URI}")
print(f"Image URI: {IMAGE_URI}")
print("\n--- Model Generation Parameters ---")
print(f"Max Output Tokens: {MAX_OUTPUT_TOKENS}")
print(f"Temperature: {TEMPERATURE}")
print(f"Top P: {TOP_P}")
print("\n--- Data Generation Parameters ---")
print(f"Primary Products to Generate: {NUMBER_OF_PRIMARY_PRODUCTS}")
print(f"Variants per Primary: {NUMBER_OF_VARIANTS_PER_PRIMARY}")
print(f"Total Products (approx): {NUMBER_OF_PRIMARY_PRODUCTS * (1 + NUMBER_OF_VARIANTS_PER_PRIMARY)}")
print(f"Products per Batch Call: {PRODUCTS_PER_BATCH}")
print(f"Max Parallel Workers: {MAX_WORKERS}")
print("\n--- Product Categories to be Used ---")
print(product_categories_content)
print("\n--- Derived Configuration ---")
print(f"BigQuery Destination: {PROJECT_ID}.{BQ_DATASET}.{BQ_TABLE}")
print(f"GCS Path: {GENERATED_JSONL_GCS_PATH}")
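
The category loader accepts one `>`-delimited path per line. Below is a minimal sketch of stricter per-line validation, assuming the same allowed character set as the notebook. `re.fullmatch` checks the entire line, whereas `re.match` only anchors at the start. Splitting on `>` then yields the hierarchy levels. The helper name `parse_category_line` is hypothetical.

```python
import re

# Same character class as the notebook's check; fullmatch requires every character to match.
ALLOWED_CATEGORY_CHARS = re.compile(r"[a-zA-Z0-9\s>&/(),.'-]+")

def parse_category_line(line: str) -> list[str]:
    """Validate one category line and return its hierarchy levels (hypothetical helper)."""
    stripped = line.strip()
    if not stripped:
        raise ValueError("Empty category line")
    if not ALLOWED_CATEGORY_CHARS.fullmatch(stripped):
        raise ValueError(f"Invalid characters in category line: {stripped!r}")
    levels = [level.strip() for level in stripped.split(">")]
    if any(not level for level in levels):
        raise ValueError(f"Empty hierarchy level in: {stripped!r}")
    return levels

print(parse_category_line("Furniture > Living Room Furniture > Sofas & Couches"))
# ['Furniture', 'Living Room Furniture', 'Sofas & Couches']

# re.match only anchors at the start, so a trailing invalid character slips through:
print(bool(ALLOWED_CATEGORY_CHARS.match("Toys > Games!")))      # True
print(bool(ALLOWED_CATEGORY_CHARS.fullmatch("Toys > Games!")))  # False
```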


In [None]:
import vertexai
from google.cloud import bigquery

vertexai.init(project=PROJECT_ID, location=LOCATION)
bq_client = bigquery.Client(project=PROJECT_ID)

print("Vertex AI and BigQuery clients initialized.")

## 2. Load Configurations

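Load the schema, field requirements, product categories, and the two generation prompts (one for PRIMARY products, one for VARIANTs) from external files. Keeping these in separate files lets you adapt the notebook to a different retailer or schema by editing the files rather than the code.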
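
The prompt files are plain `str.format` templates with named placeholders such as `{num_products}` and `{schema}`. The minimal sketch below shows why a JSON schema can be passed in safely. Braces inside a substituted *value* are inserted verbatim, but literal braces written into the *template itself* must be doubled (`{{` and `}}`), or `format` treats them as placeholders. The template text is illustrative, not one of the notebook's actual prompts.

```python
import json

# Illustrative template; the literal JSON braces in the example line are doubled.
template = (
    "Generate {num_products} products for '{retailer}'.\n"
    "Schema: {schema}\n"
    'Example output: [{{"id": "example_1"}}]'
)

schema_str = json.dumps([{"name": "id", "type": "STRING", "mode": "REQUIRED"}])

# Braces inside schema_str need no escaping; the extra keyword argument is ignored.
prompt = template.format(num_products=3, retailer="wayfair", schema=schema_str, unused="ignored")
print(prompt)
```

A placeholder with no matching keyword argument raises `KeyError`, so a typo in a template surfaces as soon as the prompt is built.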

In [None]:
# This step creates the necessary configuration files in a human-readable format.
# In a real-world scenario, you would upload these files instead of creating them here.
import json
import os

# Create directories if they don't exist
os.makedirs('config', exist_ok=True)
os.makedirs('prompts', exist_ok=True)

# 1. Create schema.json in a readable way
schema_data = [
    { "name": "name", "type": "STRING", "mode": "NULLABLE" },
    { "name": "id", "type": "STRING", "mode": "REQUIRED" },
    { "name": "type", "type": "STRING", "mode": "NULLABLE" },
    { "name": "primaryProductId", "type": "STRING", "mode": "NULLABLE" },
    { "name": "collectionMemberIds", "type": "STRING", "mode": "REPEATED" },
    { "name": "gtin", "type": "STRING", "mode": "NULLABLE" },
    { "name": "categories", "type": "STRING", "mode": "REPEATED" },
    { "name": "title", "type": "STRING", "mode": "REQUIRED" },
    { "name": "brands", "type": "STRING", "mode": "REPEATED" },
    { "name": "description", "type": "STRING", "mode": "NULLABLE" },
    { "name": "languageCode", "type": "STRING", "mode": "NULLABLE" },
    { "name": "attributes", "type": "RECORD", "mode": "REPEATED", "fields": [
        { "name": "key", "type": "STRING", "mode": "NULLABLE" },
        { "name": "value", "type": "RECORD", "mode": "NULLABLE", "fields": [
            { "name": "text", "type": "STRING", "mode": "REPEATED" },
            { "name": "numbers", "type": "FLOAT", "mode": "REPEATED" }
        ] }
    ] },
    { "name": "tags", "type": "STRING", "mode": "REPEATED" },
    { "name": "priceInfo", "type": "RECORD", "mode": "NULLABLE", "fields": [
        { "name": "currencyCode", "type": "STRING", "mode": "NULLABLE" },
        { "name": "price", "type": "FLOAT", "mode": "NULLABLE" },
        { "name": "originalPrice", "type": "FLOAT", "mode": "NULLABLE" },
        { "name": "cost", "type": "FLOAT", "mode": "NULLABLE" },
        { "name": "priceEffectiveTime", "type": "STRING", "mode": "NULLABLE" },
        { "name": "priceExpireTime", "type": "STRING", "mode": "NULLABLE" }
    ] },
    { "name": "rating", "type": "RECORD", "mode": "NULLABLE", "fields": [
        { "name": "ratingCount", "type": "INTEGER", "mode": "NULLABLE" },
        { "name": "averageRating", "type": "FLOAT", "mode": "NULLABLE" },
        { "name": "ratingHistogram", "type": "INTEGER", "mode": "REPEATED" }
    ] },
    { "name": "expireTime", "type": "STRING", "mode": "NULLABLE" },
    { "name": "ttl", "type": "RECORD", "mode": "NULLABLE", "fields": [
        { "name": "seconds", "type": "INTEGER", "mode": "NULLABLE" },
        { "name": "nanos", "type": "INTEGER", "mode": "NULLABLE" }
    ] },
    { "name": "availableTime", "type": "STRING", "mode": "NULLABLE" },
    { "name": "availability", "type": "STRING", "mode": "NULLABLE" },
    { "name": "availableQuantity", "type": "INTEGER", "mode": "NULLABLE" },
    { "name": "fulfillmentInfo", "type": "RECORD", "mode": "REPEATED", "fields": [
        { "name": "type", "type": "STRING", "mode": "NULLABLE" },
        { "name": "placeIds", "type": "STRING", "mode": "REPEATED" }
    ] },
    { "name": "uri", "type": "STRING", "mode": "NULLABLE" },
    { "name": "images", "type": "RECORD", "mode": "REPEATED", "fields": [
        { "name": "uri", "type": "STRING", "mode": "REQUIRED" },
        { "name": "height", "type": "INTEGER", "mode": "NULLABLE" },
        { "name": "width", "type": "INTEGER", "mode": "NULLABLE" }
    ] },
    { "name": "audience", "type": "RECORD", "mode": "NULLABLE", "fields": [
        { "name": "genders", "type": "STRING", "mode": "REPEATED" },
        { "name": "ageGroups", "type": "STRING", "mode": "REPEATED" }
    ] },
    { "name": "colorInfo", "type": "RECORD", "mode": "NULLABLE", "fields": [
        { "name": "colorFamilies", "type": "STRING", "mode": "REPEATED" },
        { "name": "colors", "type": "STRING", "mode": "REPEATED" }
    ] },
    { "name": "sizes", "type": "STRING", "mode": "REPEATED" },
    { "name": "materials", "type": "STRING", "mode": "REPEATED" },
    { "name": "patterns", "type": "STRING", "mode": "REPEATED" },
    { "name": "conditions", "type": "STRING", "mode": "REPEATED" },
    { "name": "publishTime", "type": "STRING", "mode": "NULLABLE" },
    { "name": "promotions", "type": "RECORD", "mode": "REPEATED", "fields": [
        { "name": "promotionId", "type": "STRING", "mode": "NULLABLE" }
    ] }
]
with open('config/schema.json', 'w') as f:
    json.dump(schema_data, f, indent=2)

# 2. Create field_requirements.txt in a readable way
field_requirements_content = """
name: Immutable. Full resource name of the product, such as projects/*/locations/global/catalogs/default_catalog/branches/default_branch/products/productId.
id: Immutable. Product identifier, which is the final component of name. For example, this field is "id_1", if name is projects/*/locations/global/catalogs/default_catalog/branches/default_branch/products/id_1. CRITICAL: THIS ID MUST BE GLOBALLY UNIQUE AND IS CASE-INSENSITIVE. DO NOT REPEAT ANY IDS. This field must be a UTF-8 encoded string with a length limit of 128 characters.
type: Immutable. The type of the product. Must be one of 'PRIMARY', 'VARIANT', or 'COLLECTION'.
primaryProductId: Variant group identifier. Must be an id of another product. For PRIMARY products, this field can only be empty or set to the same value as id. For VARIANT products, this field cannot be empty.
collectionMemberIds: IMPORTANT: This field should ONLY be populated when `type` is 'COLLECTION'. For 'PRIMARY' and 'VARIANT' products, this field must be omitted or be an empty array.
gtin: The Global Trade Item Number (GTIN) of the product. Must be a valid, numerical GTIN (e.g., a 12 or 13-digit UPC/EAN). Do NOT include any letters, spaces, or symbols.
categories: Product categories. Use '>' to separate hierarchies. Must be set for PRIMARY products. At most 250 values are allowed. Each value must be a UTF-8 encoded string with a length limit of 5,000 characters.
title: Required. Product title. Must be a UTF-8 encoded string with a length limit of 1,000 characters.
brands: The brands of the product. A maximum of 30 brands are allowed. Each brand must be a UTF-8 encoded string with a length limit of 1,000 characters.
description: Product description. Must be a UTF-8 encoded string with a length limit of 5,000 characters.
languageCode: Language of the title/description. Use BCP 47 language tags. Defaults to "en-US".
attributes: Highly encouraged. Extra product attributes. Max 200 entries. Key must match pattern: [a-zA-Z0-9][a-zA-Z0-9_]*. CRITICAL: For each attribute object, you must provide a value for EITHER `text` OR `numbers`, but NEVER BOTH. Do not set both `text` and `numbers` fields for the same attribute.
tags: Custom tags for filtering. At most 250 values are allowed. Each value must be a UTF-8 encoded string with a length limit of 1,000 characters.
priceInfo: Product price and cost information.
priceInfo.currencyCode: The 3-letter currency code defined in ISO 4217.
priceInfo.price: Price of the product.
priceInfo.originalPrice: Price of the product without any discount. Should be >= price.
priceInfo.cost: The costs associated with the sale of a particular product.
priceInfo.priceEffectiveTime: Timestamp (RFC 3339) when the price starts to be effective. CRITICAL: If you set this field, you MUST also set `originalPrice`.
priceInfo.priceExpireTime: Timestamp (RFC 3339) when the price stops to be effective.
rating: The rating of this product.
rating.ratingCount: The total number of ratings. Must be non-negative.
rating.averageRating: The average rating of the Product, scaled at 1-5.
rating.ratingHistogram: List of rating counts per rating value (index = rating - 1). Size must be 5 if non-empty.
expireTime: Timestamp (RFC 3339) when the product expires. Must be later than availableTime and publishTime. CRITICAL: This field and `ttl` are mutually exclusive. You can only set one of them for a product, not both.
ttl: Input only. The TTL (time to live) of the product. CRITICAL: This field and `expireTime` are mutually exclusive. You can only set one of them for a product, not both. The `seconds` value within this field MUST be a non-negative integer. DO NOT provide negative values for `ttl.seconds`.
availableTime: The timestamp (RFC 3339) when this Product becomes available for Search.
availability: The online availability of the Product. One of 'IN_STOCK', 'OUT_OF_STOCK', 'PREORDER', 'BACKORDER'. Default to 'IN_STOCK'.
availableQuantity: The available quantity of the item.
fulfillmentInfo: Fulfillment information.
fulfillmentInfo.type: The fulfillment type. Must be one of 'custom-type-1', 'custom-type-2', 'custom-type-3', 'custom-type-4', 'custom-type-5', 'next-day-delivery', 'pickup-in-store', 'same-day-delivery', 'ship-to-store'.
fulfillmentInfo.placeIds: The IDs for this type, such as store IDs. IMPORTANT: Each ID must match the pattern [a-zA-Z0-9_]+ (letters, numbers, and underscores only, NO hyphens). For example: "store_123", "warehouse_a". Max 3000 values.
uri: Canonical URL directly linking to the product detail page. Length limit of 5,000 characters.
images: Product images. Main image first. A maximum of 300 images are allowed.
images.uri: Required. URI of the image. Length limit of 5,000 characters.
images.height: Height of the image in pixels. Must be non-negative.
images.width: Width of the image in pixels. Must be non-negative.
audience: The target group associated with a given audience.
audience.genders: Genders of the audience. e.g., "male", "female", "unisex". At most 5 values.
audience.ageGroups: Age groups of the audience. e.g., "newborn", "infant", "toddler", "kids", "adult". At most 5 values.
colorInfo: The color of the product.
colorInfo.colorFamilies: Standard color families. e.g., "Red", "Blue", "Green". Max 5 values.
colorInfo.colors: The color display names. Max 75 colors.
sizes: The size of the product. e.g., "S", "M", "L". Max 20 values.
materials: The material of the product. e.g., "leather", "wooden". Max 20 values.
patterns: The pattern or graphic print of the product. e.g., "striped", "polka dot". Max 20 values.
conditions: The condition of the product. e.g., "new", "refurbished", "used". Max 1 value.
publishTime: The timestamp (RFC 3339) when the product is published by the retailer for the first time.
promotions: The promotions applied to the product. Max 10 values.
promotions.promotionId: Promotion identifier.
"""
with open('config/field_requirements.txt', 'w') as f:
    f.write(field_requirements_content.strip())

# 3. Create product_categories.txt from the configured variable
# The 'product_categories_content' variable is now defined in the main configuration cell.
with open('config/product_categories.txt', 'w') as f:
    f.write(product_categories_content.strip())

# 4. Create data_generation_prompt_primary.txt
data_generation_prompt_primary_content = """
You are an expert in generating synthetic data for retail product catalogs.
Your task is to generate a list of {num_products} unique and realistic product entries for the retailer '{retailer}'.

**ABSOLUTELY CRITICAL RULES:**
1.  **GLOBALLY UNIQUE ID IS PARAMOUNT:** Every single product you generate in the output JSON array **MUST** have a completely unique `id`. This ID must be globally unique across all products ever generated and is case-insensitive. **DO NOT REPEAT OR REUSE ANY IDs, EVER.** This is the most important rule.
2.  All products you generate MUST have their `type` field set to 'PRIMARY'.

The products should belong to the following categories:
--- START PRODUCT CATEGORIES ---
{categories}
--- END PRODUCT CATEGORIES ---

Each product entry must strictly adhere to the following JSON schema. Do not add any fields that are not in the schema.
--- START SCHEMA ---
{schema}
--- END SCHEMA ---

Pay close attention to the following requirements and constraints for each field:
--- START FIELD REQUIREMENTS ---
{requirements}
--- END FIELD REQUIREMENTS ---

IMPORTANT: The entire response must be a single, valid JSON array containing the product objects.
Do not include any text, explanations, or markdown formatting before or after the JSON array.
The output should start with `[` and end with `]`.
"""
with open('prompts/data_generation_prompt_primary.txt', 'w') as f:
    f.write(data_generation_prompt_primary_content.strip())

# 5. Create data_generation_prompt_variant.txt
data_generation_prompt_variant_content = """
You are an expert in generating synthetic data for retail product catalogs. Your task is to generate a list of {num_products} unique and realistic product **variants** for the retailer '{retailer}'.

**ABSOLUTELY CRITICAL RULES:**
1.  **GLOBALLY UNIQUE ID IS PARAMOUNT:** Every single product you generate in the output JSON array **MUST** have a completely unique `id`. This ID must be globally unique, case-insensitive, and different from any other primary or variant ID. **DO NOT REPEAT OR REUSE ANY IDs, EVER.** This is the most important rule.
2.  **Every single product** you generate in the output JSON array **MUST** have its `type` field set to `'VARIANT'`. There can be no exceptions.
3.  **Every single VARIANT product MUST** have a `primaryProductId` field.
4.  The value for the `primaryProductId` field **MUST BE CHOSEN *EXACTLY*** from the following list of available Primary Product IDs. Do not invent, alter, or imagine any other IDs.

--- START ALLOWED PRIMARY PRODUCT IDS ---
{primary_product_ids}
--- END ALLOWED PRIMARY PRODUCT IDS ---

All other product details should be appropriate for a variant of the chosen primary product. For example, variants often differ by attributes like 'color', 'size', or 'material'.

Each product entry must strictly adhere to the following JSON schema:
--- START SCHEMA ---
{schema}
--- END SCHEMA ---

Pay close attention to all other field requirements:
--- START FIELD REQUIREMENTS ---
{requirements}
--- END FIELD REQUIREMENTS ---

IMPORTANT FINAL REMINDER:
- The entire response must be a single, valid JSON array.
- Every object in the array must be a product of `type: 'VARIANT'`.
- Every product must have a unique `id`.
- Every product must have a `primaryProductId` key with a value selected from the list provided above.
- Start the output with `[` and end with `]`. Do not include any other text.
"""
with open('prompts/data_generation_prompt_variant.txt', 'w') as f:
    f.write(data_generation_prompt_variant_content.strip())

print("Configuration files created.")
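
Several of the field requirements above can be checked mechanically after generation. `validate_and_filter_products` later in the notebook handles some rules: negative `ttl.seconds`, `ttl`/`expireTime` exclusivity, and `price` vs. `originalPrice`. It does not handle the three checked here:

- a numeric `gtin`
- `fulfillmentInfo.placeIds` matching `[a-zA-Z0-9_]+`
- a five-entry `rating.ratingHistogram`

This is a minimal sketch. The function names are hypothetical, and the rule set is deliberately incomplete. It assumes "valid GTIN" means a GTIN-8/12/13/14 digit string with a correct GS1 mod-10 check digit; the requirement text does not spell this out.

```python
import re

PLACE_ID_PATTERN = re.compile(r"[a-zA-Z0-9_]+")

def gtin_is_valid(gtin: str) -> bool:
    """Digits only, a standard GTIN length, and a correct GS1 check digit (assumption)."""
    if not re.fullmatch(r"[0-9]+", gtin) or len(gtin) not in (8, 12, 13, 14):
        return False
    body, check = gtin[:-1], int(gtin[-1])
    # Weights alternate 3, 1, 3, ... starting from the digit just left of the check digit.
    total = sum(int(d) * (3 if i % 2 == 0 else 1) for i, d in enumerate(reversed(body)))
    return (10 - total % 10) % 10 == check

def find_rule_violations(product: dict) -> list[str]:
    """Return human-readable violations for a few field rules (hypothetical helper)."""
    violations = []
    gtin = product.get("gtin")
    if gtin is not None and not gtin_is_valid(str(gtin)):
        violations.append(f"invalid gtin: {gtin!r}")
    for info in product.get("fulfillmentInfo") or []:
        for place_id in info.get("placeIds") or []:
            if not PLACE_ID_PATTERN.fullmatch(str(place_id)):
                violations.append(f"invalid placeId: {place_id!r}")
    histogram = (product.get("rating") or {}).get("ratingHistogram") or []
    if histogram and len(histogram) != 5:
        violations.append(f"ratingHistogram must have 5 entries, got {len(histogram)}")
    return violations

sample = {
    "id": "sofa_001",
    "gtin": "036000291452",
    "fulfillmentInfo": [{"type": "pickup-in-store", "placeIds": ["store_123", "store-456"]}],
    "rating": {"ratingCount": 10, "averageRating": 4.2, "ratingHistogram": [0, 1, 1, 3]},
}
print(find_rule_violations(sample))
# ["invalid placeId: 'store-456'", 'ratingHistogram must have 5 entries, got 4']
```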

In [None]:
import json

def load_text_file(path):
    with open(path, 'r') as f:
        return f.read()

def load_json_file(path):
    with open(path, 'r') as f:
        return json.load(f)

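# Load schema, requirements, categories, and prompt templates
schema_path = 'config/schema.json'
requirements_path = 'config/field_requirements.txt'
categories_path = 'config/product_categories.txt'
# Two prompt templates are created above: one for PRIMARY products, one for VARIANTs.
primary_prompt_template_path = 'prompts/data_generation_prompt_primary.txt'
variant_prompt_template_path = 'prompts/data_generation_prompt_variant.txt'

bq_schema = load_json_file(schema_path)
schema_str = json.dumps(bq_schema)
field_requirements = load_text_file(requirements_path)
product_categories = load_text_file(categories_path)
primary_prompt_template = load_text_file(primary_prompt_template_path)
variant_prompt_template = load_text_file(variant_prompt_template_path)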

print("Configurations loaded successfully.")
print(f"Loaded {len(bq_schema)} fields in schema.")

### 3a. Generate PRIMARY Products

This cell runs the first stage of the generation process. It uses the `data_generation_prompt_primary.txt` template to create `NUMBER_OF_PRIMARY_PRODUCTS` products with the type 'PRIMARY'. The unique IDs of these products are collected so that the variants generated in the next step can reference them as `primaryProductId`.
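
The primary stage splits the work into `math.ceil(NUMBER_OF_PRIMARY_PRODUCTS / PRODUCTS_PER_BATCH)` batches and runs them on a `ThreadPoolExecutor`. The minimal sketch below shows that fan-out pattern, with each batch sized so the total equals the requested count. `batch_sizes` and `fake_generate_batch` are hypothetical names. `fake_generate_batch` stands in for the Vertex AI call so the sketch runs offline.

```python
import concurrent.futures
import math

def batch_sizes(total: int, per_batch: int) -> list[int]:
    """Split `total` into batches of at most `per_batch`; only the last batch may be smaller."""
    num_batches = math.ceil(total / per_batch)
    return [min(per_batch, total - i * per_batch) for i in range(num_batches)]

def fake_generate_batch(batch_index: int, size: int) -> list[dict]:
    # Hypothetical stand-in for generate_product_batch_raw(prompt); no network call.
    return [{"id": f"prod_{batch_index}_{n}", "type": "PRIMARY"} for n in range(size)]

sizes = batch_sizes(total=25, per_batch=10)
print(sizes)  # [10, 10, 5]

products = []
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(fake_generate_batch, i, size) for i, size in enumerate(sizes)]
    # as_completed yields futures in completion order, not submission order.
    for future in concurrent.futures.as_completed(futures):
        products.extend(future.result())

print(len(products))  # 25
```

Because `as_completed` yields futures as they finish, the collected order differs from submission order. That is harmless here, since products are identified by `id` rather than by position.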

In [None]:
import math
import concurrent.futures
import json
import re
import time
from tqdm.notebook import tqdm
from vertexai.generative_models import GenerativeModel, HarmCategory, HarmBlockThreshold

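def clean_and_parse_json(text: str) -> list:
    """
    Cleans the raw text output from the LLM and parses it into a Python list of dicts.
    """
    # With response_mime_type="application/json" the whole response is usually valid JSON.
    # Parsing it directly also handles a single object that itself contains arrays.
    try:
        parsed = json.loads(text)
        if isinstance(parsed, list):
            return parsed
        if isinstance(parsed, dict):
            # Unwrap a wrapper object such as {"products": [...]}; otherwise treat it as one product.
            list_values = [v for v in parsed.values() if isinstance(v, list)]
            if len(parsed) == 1 and list_values:
                return list_values[0]
            return [parsed]
    except json.JSONDecodeError:
        pass

    # Fallback: strip surrounding text (e.g. markdown fences) by slicing from whichever
    # of '[' or '{' appears first to the last matching closing bracket.
    array_start = text.find('[')
    object_start = text.find('{')
    if array_start != -1 and (object_start == -1 or array_start < object_start):
        json_str = text[array_start:text.rfind(']') + 1]
        is_single_object = False
    elif object_start != -1:
        json_str = text[object_start:text.rfind('}') + 1]
        is_single_object = True
    else:
        raise ValueError("No JSON array or object found in the model's response.")

    try:
        parsed = json.loads(json_str)
    except json.JSONDecodeError as e:
        raise ValueError(f"Error decoding JSON: {e}. String: {json_str[:500]}")
    return [parsed] if is_single_object else parsed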

def generate_product_batch_raw(prompt: str) -> list:
    """
    Generates a single batch of products, returns raw parsed JSON without validation.
    """
    model = GenerativeModel(MODEL_NAME)
    safety_settings = {
        HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
        HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE,
        HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
        HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
    }
    generation_config = {
        "max_output_tokens": MAX_OUTPUT_TOKENS,
        "temperature": TEMPERATURE,
        "top_p": TOP_P,
        "response_mime_type": "application/json",
    }
    max_retries = 3
    for attempt in range(max_retries):
        try:
            response = model.generate_content(prompt, generation_config=generation_config, safety_settings=safety_settings)
            if response.candidates[0].finish_reason.name == "MAX_TOKENS":
                print("Warning: Response was truncated (MAX_TOKENS). Consider reducing PRODUCTS_PER_BATCH.")
                return []
            return clean_and_parse_json(response.text)
        except Exception as e:
            print(f"API call failed on attempt {attempt + 1}: {e}")
            if attempt == max_retries - 1:
                return []
            time.sleep(2 ** attempt)
    return []

def validate_and_filter_products(products: list, existing_ids: set):
    """
    Validates products, filters for unique IDs, and post-processes URIs.
    """
    valid_products = []
    newly_added_ids = set()
    for product in products:
        is_valid = True
        product_id = product.get('id')

        if not product_id:
            print(f"Warning: Discarding product with no ID: {str(product)[:100]}...")
            is_valid = False

        elif product_id.lower() in existing_ids or product_id.lower() in newly_added_ids:
            print(f"Warning: Discarding duplicate product with ID: {product_id}")
            is_valid = False

        if 'attributes' in product and product['attributes'] is not None:
            for attr in product['attributes']:
                if 'value' in attr and attr['value'] is not None:
                    value_dict = attr['value']
                    has_text = 'text' in value_dict and value_dict.get('text')
                    has_numbers = 'numbers' in value_dict and value_dict.get('numbers')
                    if has_text and has_numbers:
                        del value_dict['numbers']

        if 'ttl' in product and 'expireTime' in product:
            del product['ttl']

        if 'ttl' in product and isinstance(product.get('ttl'), dict):
            ttl_seconds = product['ttl'].get('seconds')
            if isinstance(ttl_seconds, (int, float)) and ttl_seconds < 0:
                print(f"Warning: Discarding product with negative ttl.seconds: {product_id}")
                is_valid = False

        if 'priceInfo' in product and isinstance(product.get('priceInfo'), dict):
            price_info = product['priceInfo']
            price = price_info.get('price')
            original_price = price_info.get('originalPrice')
            if isinstance(price, (int, float)) and isinstance(original_price, (int, float)) and price > original_price:
                price_info['originalPrice'] = price

        if product.get('type') == 'VARIANT' and not product.get('primaryProductId'):
            print(f"Warning: Discarding VARIANT product with no primaryProductId: {product_id}")
            is_valid = False

        if is_valid:
            # --- Post-processing for URI and Image URI ---
            if SITE_URI.strip() and product_id:
                product['uri'] = f"{SITE_URI.rstrip('/')}/product/{product_id}"

            if IMAGE_URI.strip() and product_id and product.get('images'):
                for image_info in product['images']:
                    image_filename = f"{product_id}.png"
                    image_info['uri'] = f"{IMAGE_URI.rstrip('/')}/{image_filename}"
                    image_info['height'] = 0
                    image_info['width'] = 0

            valid_products.append(product)
            newly_added_ids.add(product_id.lower())

    return valid_products, newly_added_ids


# --- Main Generation Logic ---
ALL_GENERATED_IDS = set()
primary_prompt_template = load_text_file('prompts/data_generation_prompt_primary.txt')

# --- 3a. Generate PRIMARY Products ---
number_of_batches = math.ceil(NUMBER_OF_PRIMARY_PRODUCTS / PRODUCTS_PER_BATCH)
print(f"--- Generating {NUMBER_OF_PRIMARY_PRODUCTS} PRIMARY Products in {number_of_batches} batches ---")

all_primary_products = []
primary_product_ids = []
primary_jsonl_path = "products_primary.jsonl"

with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    with open(primary_jsonl_path, "w") as f:
        futures = []
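        for i in range(number_of_batches):
            # Size the last batch so the total matches NUMBER_OF_PRIMARY_PRODUCTS.
            batch_size = min(PRODUCTS_PER_BATCH, NUMBER_OF_PRIMARY_PRODUCTS - i * PRODUCTS_PER_BATCH)
            # The primary prompt has no placeholder for existing IDs; uniqueness across
            # batches is enforced afterwards by validate_and_filter_products.
            prompt = primary_prompt_template.format(
                num_products=batch_size,
                retailer=RETAILER, categories=product_categories, schema=schema_str, requirements=field_requirements,
            )
            futures.append(executor.submit(generate_product_batch_raw, prompt))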

        pbar = tqdm(concurrent.futures.as_completed(futures), total=number_of_batches, desc="Generating PRIMARY Batches")
        for future in pbar:
            try:
                raw_batch = future.result()
                if raw_batch:
                    valid_batch, new_ids = validate_and_filter_products(raw_batch, ALL_GENERATED_IDS)
                    if valid_batch:
                        all_primary_products.extend(valid_batch)
                        ALL_GENERATED_IDS.update(new_ids)
                        for product in valid_batch:
                            f.write(json.dumps(product) + '\n')
                            if product.get('type') == 'PRIMARY':
                                primary_product_ids.append(product['id'])
                        pbar.set_postfix_str(f"{len(valid_batch)} unique products added. Total IDs: {len(ALL_GENERATED_IDS)}")
            except Exception as exc:
                pbar.set_postfix_str(f"Batch generated an exception: {exc}")

print("--- PRIMARY Generation Complete ---")
print(f"Total PRIMARY products generated: {len(all_primary_products)}")
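
Sometimes a response is not valid JSON as a whole. In that case, slicing from the first `[` to the last `]` can pick up a fragment, for example when the model returns a single object that itself contains arrays such as `categories`. The minimal sketch below uses a different, swapped-in technique. It tries `json.loads` on the whole response first. If that fails, it scans with `json.JSONDecoder.raw_decode`, which parses one complete JSON value starting at a given index and ignores any trailing text. The function name `parse_llm_json` is hypothetical, and the scan favors simplicity over speed.

```python
import json

def parse_llm_json(text: str) -> list:
    """Parse model output into a list of product dicts (hypothetical helper)."""
    try:
        value = json.loads(text)
    except json.JSONDecodeError:
        value = None
        decoder = json.JSONDecoder()
        # Try each '[' or '{' as a start position; raw_decode parses one complete
        # JSON value from there and reports where it ended, ignoring trailing text.
        for index, char in enumerate(text):
            if char not in "[{":
                continue
            try:
                value, _end = decoder.raw_decode(text, index)
                break
            except json.JSONDecodeError:
                continue
        if value is None:
            raise ValueError("No JSON array or object found in the model's response.")
    if isinstance(value, dict):
        return [value]  # A single product object becomes a one-element list.
    if isinstance(value, list):
        return value
    raise ValueError(f"Expected a JSON array or object, got {type(value).__name__}")

fenced = 'Here you go:\n```json\n[{"id": "a1"}, {"id": "b2"}]\n```'
print(parse_llm_json(fenced))  # [{'id': 'a1'}, {'id': 'b2'}]
print(parse_llm_json('{"id": "c3", "categories": ["Toys > Games"]}'))
# [{'id': 'c3', 'categories': ['Toys > Games']}]
```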

### 3a-2. Final Verification of Primary Product IDs

This cell performs a final verification step to ensure that every `id` in the generated set of primary products is globally unique. It checks the `all_primary_products` list created in the previous step, reports how many duplicates it finds, and removes them to prevent issues in downstream processes like variant generation or BigQuery loading.
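
The verification pass keeps the first product seen for each ID and compares IDs case-insensitively, because the field requirements state that IDs are case-insensitive. The minimal sketch below packages that pattern as a reusable function. It also reports which IDs were dropped and counts products with no ID separately. The function name `dedupe_by_id` is hypothetical.

```python
def dedupe_by_id(products: list[dict]) -> tuple[list[dict], list[str], int]:
    """Keep the first product per case-insensitive ID.

    Returns (unique_products, duplicate_ids, missing_id_count).
    """
    seen = set()
    unique, duplicates = [], []
    missing = 0
    for product in products:
        product_id = product.get("id")
        if not product_id:
            missing += 1
            continue
        key = product_id.lower()
        if key in seen:
            duplicates.append(product_id)
            continue
        seen.add(key)
        unique.append(product)
    return unique, duplicates, missing

batch = [{"id": "Sofa_001"}, {"id": "sofa_001"}, {"id": "bed_002"}, {"title": "no id"}]
unique, duplicates, missing = dedupe_by_id(batch)
print([p["id"] for p in unique], duplicates, missing)
# ['Sofa_001', 'bed_002'] ['sofa_001'] 1
```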

In [None]:
# --- Final Verification and Cleanup for Primary Product IDs ---
print("\n--- Verifying Global Uniqueness of Primary Product IDs ---")

# `validate_and_filter_products` already rejects IDs seen in earlier batches, so this
# pass is a defensive safeguard: it drops any remaining case-insensitive duplicates
# (and any product without an ID) before variant generation.
# Note: products_primary.jsonl written in the previous cell is not rewritten here.

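import json

initial_count = len(all_primary_products)
seen_ids = set()
unique_products = []
missing_id_count = 0

for product in all_primary_products:
    product_id = product.get('id')
    if not product_id:
        # A product without an ID cannot be referenced by variants; drop it.
        missing_id_count += 1
        continue
    # Compare case-insensitively so that 'SKU-1' and 'sku-1' count as duplicates
    if product_id.lower() not in seen_ids:
        unique_products.append(product)
        seen_ids.add(product_id.lower())

final_count = len(unique_products)
duplicates_found = initial_count - final_count - missing_id_count

if duplicates_found > 0 or missing_id_count > 0:
    if duplicates_found > 0:
        print(f"🚨 WARNING: Found and removed {duplicates_found} duplicate product ID(s).")
    if missing_id_count > 0:
        print(f"🚨 WARNING: Removed {missing_id_count} product(s) with no ID.")
    # Overwrite the list to contain only unique products
    all_primary_products = unique_products
    # Rebuild the ID list that step 3b samples from when generating variants
    primary_product_ids = [p['id'] for p in all_primary_products if p.get('type') == 'PRIMARY']
    # Rewrite the primary JSONL file as well: step 3c builds the upload file
    # from 'products_primary.jsonl', not from this in-memory list.
    with open('products_primary.jsonl', 'w') as f:
        for product in all_primary_products:
            f.write(json.dumps(product) + '\n')
    print(f"Corrected number of unique primary products: {final_count}")
else:
    print("✅ All primary product IDs are unique. No duplicates found.")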

# This ensures the list passed to the next step (variant generation) is clean.

### 3b. Generate VARIANT Products

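This cell runs the second stage of the generation process. It generates `NUMBER_OF_VARIANTS_PER_PRIMARY` variants for each primary product (10 in this configuration), for a target of `NUMBER_OF_PRIMARY_PRODUCTS * NUMBER_OF_VARIANTS_PER_PRIMARY` variant products.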

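The cell uses the `data_generation_prompt_variant.txt` template. For each batch, the template is filled with a fresh random sample of up to 50 IDs from the `primary_product_ids` collected in the previous step, so the model can attach each variant to an existing primary product. The prompt also lists the IDs that existed before the variant stage started, as a best-effort request to avoid reusing them; each returned batch is still passed through `validate_and_filter_products` with `ALL_GENERATED_IDS`.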
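Because every request asks for a full `PRODUCTS_PER_BATCH`, the cell submits `math.ceil(NUMBER_OF_VARIANTS / PRODUCTS_PER_BATCH)` batches and can request slightly more variants than the target; validation and deduplication may then leave fewer. The arithmetic with hypothetical numbers (100 primary products, 10 variants each, 30 products per batch) looks like this. The helper name is illustrative only.

```python
import math

def plan_variant_batches(num_primary, variants_per_primary, products_per_batch):
    """Return (target variant count, number of batches, products requested)."""
    target = num_primary * variants_per_primary
    batches = math.ceil(target / products_per_batch)
    # Every batch asks for a full products_per_batch, so the total requested
    # can exceed the target by up to products_per_batch - 1.
    return target, batches, batches * products_per_batch

print(plan_variant_batches(100, 10, 30))  # (1000, 34, 1020)
```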

In [None]:
# --- 3b. Generate VARIANT Products ---
import random
NUMBER_OF_VARIANTS = NUMBER_OF_PRIMARY_PRODUCTS * NUMBER_OF_VARIANTS_PER_PRIMARY
variant_prompt_template = load_text_file('prompts/data_generation_prompt_variant.txt')

if not primary_product_ids:
    print("Skipping VARIANT generation: No primary product IDs were created.")
else:
    number_of_variant_batches = math.ceil(NUMBER_OF_VARIANTS / PRODUCTS_PER_BATCH)
    print(f"--- Generating {NUMBER_OF_VARIANTS} VARIANT Products in {number_of_variant_batches} batches ---")

    all_variant_products = []
    variant_jsonl_path = "products_variant.jsonl"

    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        with open(variant_jsonl_path, "w") as f:
            futures = []
            for i in range(number_of_variant_batches):
                id_sample = random.sample(primary_product_ids, k=min(50, len(primary_product_ids)))
                variant_prompt = variant_prompt_template.format(
                    num_products=PRODUCTS_PER_BATCH,
                    retailer=RETAILER,
                    categories=product_categories,
                    schema=schema_str,
                    requirements=field_requirements,
                    primary_product_ids='\n'.join(id_sample),
                    existing_ids='\n'.join(list(ALL_GENERATED_IDS)) # Best-effort instruction
                )
                futures.append(executor.submit(generate_product_batch_raw, variant_prompt))

            pbar = tqdm(concurrent.futures.as_completed(futures), total=number_of_variant_batches, desc="Generating VARIANT Batches")

            for future in pbar:
                try:
                    raw_batch = future.result()
                    if raw_batch:
                        valid_batch, new_ids = validate_and_filter_products(raw_batch, ALL_GENERATED_IDS)
                        if valid_batch:
                            all_variant_products.extend(valid_batch)
                            ALL_GENERATED_IDS.update(new_ids)
                            for product in valid_batch:
                                f.write(json.dumps(product) + '\n')
                            pbar.set_postfix_str(f"{len(valid_batch)} unique products added. Total IDs: {len(ALL_GENERATED_IDS)}")
                except Exception as exc:
                    pbar.set_postfix_str(f"A batch generated an exception: {exc}")

    print("--- VARIANT Generation Complete ---")
    print(f"Total VARIANT products generated: {len(all_variant_products)}")
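
The prompt asks the model to attach each variant to one of the sampled primary IDs, but the cell above does not check that it did. A possible check is sketched below. It assumes each variant stores its parent in a `primaryProductId` field, as in the Vertex AI Search for Retail product schema; if the schema file used here names it differently, pass that name as `parent_field`. The function name and sample records are hypothetical.

```python
def find_orphan_variants(variants, primary_ids, parent_field="primaryProductId"):
    """Return variants whose parent ID is missing or not a known primary ID.

    `parent_field` is an assumption; adjust it to match your schema file.
    """
    known = set(primary_ids)
    # A missing parent field yields None, which is never in `known`
    return [v for v in variants if v.get(parent_field) not in known]

# Hypothetical data: one valid variant, one with an unknown parent, one with none
primary_ids = ["P-100", "P-200"]
variants = [
    {"id": "V-1", "type": "VARIANT", "primaryProductId": "P-100"},
    {"id": "V-2", "type": "VARIANT", "primaryProductId": "P-999"},
    {"id": "V-3", "type": "VARIANT"},
]
orphans = find_orphan_variants(variants, primary_ids)
print([v["id"] for v in orphans])  # ['V-2', 'V-3']
```

In the notebook it could be called as `find_orphan_variants(all_variant_products, primary_product_ids)` after the variant cell finishes.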

### 3c. Combine PRIMARY and VARIANT Products

The primary and variant products were written to separate files. This step concatenates them into a single `products.jsonl` file, which the next two sections upload to Google Cloud Storage and load into BigQuery.
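IDs in this notebook are deduplicated in more than one place (the per-batch filter against `ALL_GENERATED_IDS` and the case-insensitive pass in 3a-2), so a final check on the combined file is a cheap safeguard. The hypothetical helper below counts valid records, lines that are not JSON objects, and IDs that repeat exactly (case-sensitive). It takes any iterable of lines, so it works on an open file as well as on a list.

```python
import json

def summarize_jsonl_lines(lines):
    """Count valid records, invalid lines and repeated IDs in JSONL text."""
    seen, duplicates = set(), set()
    records = bad_lines = 0
    for line in lines:
        if not line.strip():
            continue  # skip blank lines
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            bad_lines += 1
            continue
        if not isinstance(record, dict):
            bad_lines += 1  # valid JSON, but not a product object
            continue
        records += 1
        product_id = record.get("id")
        if isinstance(product_id, str):
            if product_id in seen:
                duplicates.add(product_id)
            seen.add(product_id)
    return {"records": records, "bad_lines": bad_lines, "duplicate_ids": sorted(duplicates)}

example = ['{"id": "A"}\n', '{"id": "B"}\n', '\n', 'not json\n', '{"id": "A"}\n']
print(summarize_jsonl_lines(example))
# {'records': 3, 'bad_lines': 1, 'duplicate_ids': ['A']}
```

After the combination cell has run, `with open(combined_jsonl_path) as f: print(summarize_jsonl_lines(f))` would summarize the file that is about to be uploaded.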

In [None]:
# Files to combine, in order: primary products first, then variants
files_to_combine = ['products_primary.jsonl', 'products_variant.jsonl']
combined_jsonl_path = 'products.jsonl'

# The upload cell in section 4 reads `combined_jsonl_path`, so this combined
# file is the one that is uploaded to GCS and loaded into BigQuery.

total_lines_written = 0

with open(combined_jsonl_path, 'w') as outfile:
    for filename in files_to_combine:
        try:
            with open(filename, 'r') as infile:
                for line in infile:
                    outfile.write(line)
                    total_lines_written += 1
            print(f"Successfully combined '{filename}'.")
        except FileNotFoundError:
            print(f"Warning: File '{filename}' not found. Skipping.")

print("-" * 30)
print(f"Combined file created at: {combined_jsonl_path}")
print(f"Total products in combined file: {total_lines_written}")

## 4. Upload to Google Cloud Storage

The combined JSONL file is uploaded to a GCS bucket, where the BigQuery load job reads it as its source.
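The cell below derives the object name by stripping `gs://{GCS_BUCKET_NAME}/` from `GENERATED_JSONL_GCS_PATH` with `str.replace`. If that path points at a different bucket, `replace` changes nothing and the full URI, including `gs://`, becomes the object name, after which the BigQuery load cannot find the file. A stricter alternative, sketched here as a hypothetical helper, parses the URI and fails loudly instead.

```python
def split_gcs_uri(uri):
    """Split 'gs://bucket/path/to/object' into ('bucket', 'path/to/object').

    Raises ValueError instead of returning a malformed object name.
    """
    prefix = "gs://"
    if not uri.startswith(prefix):
        raise ValueError(f"Not a GCS URI: {uri!r}")
    bucket, sep, blob_name = uri[len(prefix):].partition("/")
    if not bucket or not sep or not blob_name:
        raise ValueError(f"GCS URI needs both a bucket and an object path: {uri!r}")
    return bucket, blob_name

print(split_gcs_uri("gs://my-bucket/catalog/products.jsonl"))
# ('my-bucket', 'catalog/products.jsonl')
```

With it, the upload could use `bucket_name, destination_blob_name = split_gcs_uri(GENERATED_JSONL_GCS_PATH)` so the bucket and object name always come from the same URI.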

In [None]:
from google.cloud import storage

def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client(project=PROJECT_ID)
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    blob.upload_from_filename(source_file_name)

    print(f"File {source_file_name} uploaded to gs://{bucket_name}/{destination_blob_name}.")

# Object name inside the bucket: strip the 'gs://<bucket>/' prefix from the full URI
destination_blob_name = GENERATED_JSONL_GCS_PATH.replace(f"gs://{GCS_BUCKET_NAME}/", "")

# Upload the combined file produced in step 3c
upload_to_gcs(GCS_BUCKET_NAME, combined_jsonl_path, destination_blob_name)

## 5. Load Data into BigQuery

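This section creates the BigQuery dataset and table if they don't already exist, then loads the data from the GCS file. Because the load job uses `WRITE_TRUNCATE`, rerunning this cell replaces the table's existing rows rather than appending to them.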
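After the load finishes, one way to confirm that the table holds no repeated product IDs is a `GROUP BY` query. The hypothetical helper below only builds the SQL string; the default column name `id` matches the field used throughout this notebook.

```python
def duplicate_id_query(table_id, id_column="id"):
    """Build a BigQuery SQL query that lists IDs appearing more than once.

    `table_id` is a fully qualified 'project.dataset.table' name. It is wrapped
    in backticks because project IDs often contain hyphens. `id_column` is
    inserted verbatim, so only pass trusted column names.
    """
    return (
        f"SELECT {id_column}, COUNT(*) AS n\n"
        f"FROM `{table_id}`\n"
        f"GROUP BY {id_column}\n"
        "HAVING COUNT(*) > 1\n"
        "ORDER BY n DESC"
    )

print(duplicate_id_query("my-project.retail.products"))
```

Once the load job has completed, `list(bq_client.query(duplicate_id_query(table_id)).result())` returns the offending rows; an empty list means every `id` in the table is unique.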

In [None]:
from google.cloud import bigquery
import google.api_core.exceptions

# 1. Create the BigQuery Dataset if it doesn't exist
dataset_id = f"{PROJECT_ID}.{BQ_DATASET}"
dataset = bigquery.Dataset(dataset_id)
dataset.location = LOCATION
try:
    dataset = bq_client.create_dataset(dataset, timeout=30)
    print(f"Created dataset {dataset_id}")
except google.api_core.exceptions.Conflict:
    print(f"Dataset {dataset_id} already exists.")

# 2. Create the BigQuery Table with the specified schema
table_id = f"{PROJECT_ID}.{BQ_DATASET}.{BQ_TABLE}"
schema = [bigquery.SchemaField.from_api_repr(field) for field in bq_schema]
table = bigquery.Table(table_id, schema=schema)
try:
    table = bq_client.create_table(table)
    print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")
except google.api_core.exceptions.Conflict:
    print(f"Table {table.project}.{table.dataset_id}.{table.table_id} already exists.")

# 3. Load the data from GCS into the BigQuery table
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    schema=schema,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,  # Replace any rows already in the table
)

load_job = bq_client.load_table_from_uri(
    GENERATED_JSONL_GCS_PATH, table_id, job_config=job_config
)

print(f"Starting job {load_job.job_id} to load data into {table_id}")

load_job.result()  # Waits for the job to complete.

destination_table = bq_client.get_table(table_id)
print(f"Load job finished. Loaded {destination_table.num_rows} rows.")

## 6. (Optional) Cleanup

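Set `delete_resources` to `True` and run the following cell to delete the BigQuery table and the uploaded GCS file. The BigQuery dataset and the GCS bucket are not deleted.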
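If the dataset was created only for this demo, `google.cloud.bigquery.Client.delete_dataset` can remove it together with all of its tables. Deletion is irreversible, so the sketch below wraps the call in a hypothetical helper that you would call explicitly, e.g. `delete_dataset_and_contents(bq_client, dataset_id)`, rather than running it as part of the cleanup cell.

```python
def delete_dataset_and_contents(client, dataset_id):
    """Delete a BigQuery dataset and every table in it.

    `client` is a google.cloud.bigquery.Client. delete_contents=True removes the
    tables along with the dataset; not_found_ok=True turns a missing dataset
    into a no-op instead of an error.
    """
    client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True)
    print(f"Deleted dataset {dataset_id} (if it existed).")
```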

In [None]:
# Set to True to delete the created resources
delete_resources = False #@param {type:"boolean"}

if delete_resources:
    # Delete BigQuery table
    print(f"Deleting BigQuery table: {table_id}")
    bq_client.delete_table(table_id, not_found_ok=True)
    print("Table deleted.")

    # Delete GCS file
    print(f"Deleting GCS file: {GENERATED_JSONL_GCS_PATH}")
    try:
        storage_client = storage.Client(project=PROJECT_ID)
        bucket = storage_client.bucket(GCS_BUCKET_NAME)
        blob = bucket.blob(destination_blob_name)
        blob.delete()
        print("GCS file deleted.")
    except google.api_core.exceptions.NotFound:
        print("GCS file not found, skipping deletion.")
else:
    print("Cleanup skipped. Set 'delete_resources' to True to delete created resources.")


In [None]:
# @title Sync /content to GitHub
#
# Retrieves a GitHub token from Secret Manager, clones (or updates) the
# repository, copies the files in /content into it, and pushes a commit.
# Fill in the variables below with your own details before running.
#
import os
from google.cloud import secretmanager
from datetime import datetime

# --- ⚠️ ACTION REQUIRED: Replace these placeholder values ---
GCP_PROJECT_ID = "partarch-ecommerce-demo"
SECRET_ID = "github-token"  # The name you gave the secret in Secret Manager
GITHUB_USERNAME = "cloud-jake"
GITHUB_EMAIL = "jake.holmquist@gmail.com"
REPO_NAME = "colab-catalog-generation"
# --- End of required changes ---

# --- 1. Securely retrieve the GitHub token from GCP Secret Manager ---
print("🔑 Accessing GitHub token from GCP Secret Manager...")

def access_secret_version(project_id, secret_id, version_id="latest"):
    """
    Access the payload for the given secret version and return it.
    """
    client = secretmanager.SecretManagerServiceClient()
    name = f"projects/{project_id}/secrets/{secret_id}/versions/{version_id}"
    response = client.access_secret_version(request={"name": name})
    return response.payload.data.decode("UTF-8")

try:
    GIT_TOKEN = access_secret_version(GCP_PROJECT_ID, SECRET_ID)
    print("   ✅ Token accessed successfully.")
except Exception as e:
    print(f"   ❌ Error accessing secret: {e}")
    raise RuntimeError("Could not access secret. Check permissions and that the secret exists.") from e

# --- 2. Define repository URL and local path ---
repo_url = f"https://{GITHUB_USERNAME}:{GIT_TOKEN}@github.com/{GITHUB_USERNAME}/{REPO_NAME}.git"
repo_path = f"/content/{REPO_NAME}"

# --- 3. Clone the repository or pull latest changes ---
print(f"\n📂 Checking for repository at '{repo_path}'...")
if os.path.exists(repo_path):
    print("   Repository already exists. Pulling latest changes...")
    %cd {repo_path}
    !git pull
    %cd /content
    print("   ✅ Pull complete.")
else:
    print("   Repository not found. Cloning from GitHub...")
    !git clone {repo_url}
    print("   ✅ Clone complete.")

# --- 4. Configure Git for commits ---
print("\n👤 Configuring Git user...")
%cd {repo_path}
!git config user.name "{GITHUB_USERNAME}"
!git config user.email "{GITHUB_EMAIL}"
print("   ✅ Git user configured.")

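# --- 5. Copy files from /content into the repository ---
# Hidden entries (e.g. Colab's `.config` directory, which can hold gcloud
# configuration and credentials) are skipped so they are never committed.
print("\n📋 Copying files from /content directory...")
all_content_items = os.listdir('/content')
items_to_copy = [
    item for item in all_content_items
    if item not in [REPO_NAME, 'sample_data'] and not item.startswith('.')
]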

if not items_to_copy:
    print("   No new files to copy.")
else:
    for item in items_to_copy:
        source_path = f"/content/{item}"
        print(f"   - Copying '{source_path}'...")
        !cp -r "{source_path}" .
    print("   ✅ All files copied.")

# --- 6. Add, commit, and push changes ---
print("\n🚀 Staging, committing, and pushing changes to GitHub...")
!git add .
commit_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
commit_message = f"Update from Colab Enterprise at {commit_time}"
!git commit -m "{commit_message}"
!git push -u origin main

# Shell commands run with `!` do not raise on failure, so check their output.
print("\n🎉 Done. Check the git output above to confirm that the commit and push succeeded.")
