# Auto-prompt and Product Recontext

Product images stored in Google Cloud Storage (GCS) input folders are passed to Gemini, which generates a descriptive prompt and a product description for each product. Both are then fed into the Imagen Product Recontextualization API. The recontextualized images and a run log are saved back to GCS. Products are processed concurrently as asynchronous batch jobs, with a configurable cap on the number of simultaneous requests.

## Imports

In [None]:
# !pip install --upgrade --user google-cloud-aiplatform google-genai

In [10]:

from google import genai
from google.genai import types
from google.cloud import aiplatform, storage
from google.cloud.aiplatform.gapic import PredictResponse
from google.colab import auth

import base64
import io
import re
import timeit
import time
import os
import json
from pathlib import Path
from typing import Any, Dict, List, Generator
from PIL import Image
import matplotlib.pyplot as plt

import asyncio
import pandas as pd
from datetime import datetime

In [2]:
# --------------------------------------------------
# Authenticate your Colab session with GCP
from google.colab import auth
auth.authenticate_user()
print("Authenticated with Google Cloud")
# --------------------------------------------------


Authenticated with Google Cloud


## Helper Functions


In [4]:
#Core Helper Functions for GCS scanning and image display

# GCS‑scanning + debug prints

def get_mime_type(uri: str) -> str:
    """
    Determines the MIME type based on the file extension of a given URI.

    Only PNG and JPEG (.png, .jpg, .jpeg) are supported; any other extension
    raises a ValueError.
    """
    ext = os.path.splitext(uri)[1].lower()
    if ext == ".png":
        return "image/png"
    elif ext in (".jpg", ".jpeg"):
        return "image/jpeg"
    else:
        raise ValueError(f"Unsupported extension: {ext}")

# Display helpers
def download_gcs_image_bytes(uri: str) -> bytes:
    """
    Downloads image bytes from a Google Cloud Storage (GCS) URI.

    This function parses a `gs://` URI, extracts the bucket and object names,
    and uses the `google.cloud.storage.Client` to download the blob's content
    as raw bytes.
    """
    m = re.match(r"gs://([^/]+)/(.*)", uri)
    if not m:
        raise ValueError(f"Invalid GCS URI: {uri}")
    bucket_name, obj = m.groups()
    client = storage.Client()
    return client.bucket(bucket_name).blob(obj).download_as_bytes()

def prediction_to_pil_image(pred: Dict[str, Any], size=(640, 640)) -> Image.Image:
    """
    Converts a single prediction (a dict-like object with a "bytesBase64Encoded" key) into a PIL Image.

    The function decodes the base64 string, opens it as an image, and shrinks it
    in place to fit within `size` while preserving the aspect ratio.
    """
    b64 = pred["bytesBase64Encoded"]
    data = base64.b64decode(b64)
    img = Image.open(io.BytesIO(data))
    img.thumbnail(size)
    return img

def display_row(items: List[Any], figsize=(12, 4)):
    """
    Displays a list of items (images or text) in a single row using Matplotlib.
    """
    if not items:
        print("No items to display.")
        return
    fig, axes = plt.subplots(1, len(items), figsize=figsize)
    if len(items) == 1:
        axes = [axes]
    for ax, it in zip(axes, items):
        if isinstance(it, Image.Image):
            ax.imshow(it)
        elif isinstance(it, dict) and "bytesBase64Encoded" in it:
            ax.imshow(prediction_to_pil_image(it))
        else:
            ax.text(0.5, 0.5, str(it), ha="center", va="center", wrap=True)
        ax.axis("off")
    plt.tight_layout()
    plt.show()

# generate() with fixed SafetySetting keyword args
def generate(image_parts: List[types.Part]) -> Dict[str, str]:
    """
    Generates AI-driven product descriptions and recontextualization prompts using the Gemini 2.5 Flash model.

    This function interacts with the Google GenAI API client configured for Vertex AI.
    It constructs a specific user instruction and a detailed system instruction directly
    within the function, outlining the role, objectives (accurate product description,
    compelling background/scene proposal), and the desired JSON output format.

    The model is configured with a fixed set of safety settings (all categories set to OFF),
    a low temperature for less randomness, and streams the content generation.
    The output is expected to be a JSON object, which is then parsed and returned.
    """
    import json, re
    from google import genai
    from google.genai import types

    client = genai.Client(
        vertexai=True,
        project="cpg-cdp",
        location="global",
    )

    user_instr = types.Part.from_text(text="""
Analyze the provided images of a single product (up to 3). First, identify and describe the product in accurate, natural language: focus on material, color, shape, form, pattern, and distinctive design features.

Then, determine a visually appropriate and realistic background or scene where the product would naturally appear and look appealing. Base this on the product’s style and category — for example, place a desk lamp in a home office, or a sneaker in a modern studio.

DO NOT GENERATE PEOPLE/CHILDREN
Your output should be returned as a JSON object in the format below:
{
  "Prompt": "<rich description of product and proposed scene>, high-quality product photography aesthetic, Commercial Lifestyle Color Grading",
  "product_description": "<just the product, no scene>"
}

Do not reference the original image’s background or lighting.
Do not use placeholders like “in a nice room” — be specific about the setting (e.g., “in a sunlit bohemian-style bedroom with woven textures and indoor plants”).
""")

    system_instr = types.Part.from_text(text="""
Role:
You are an expert visual analyst and prompt engineer for AI-based image generation. Your task is to analyze up to 3 input images of the same product, and generate a single, high-quality prompt suitable for AI-driven product image recontextualization.

Your objectives are twofold:

Describe the product accurately: Identify product category, form, material, texture, color, patterns, and notable design features.

Propose a compelling background/scene: Select a suitable environment in which the product would naturally and attractively appear — based on its likely usage, aesthetic, and category.

DO NOT GENERATE PEOPLE/CHILDREN

Input:
Up to 3 images of the same product (e.g., different angles or lighting).
No metadata, no background descriptions provided — just images.

Output Format:
Return a JSON object in this structure:
{
  "Prompt": "<natural language prompt for recontextualized image generation>",
  "product_description": "<just the product description, no scene>"
}""")

    contents = [ types.Content(role="user", parts=[user_instr, *image_parts]) ]
    config = types.GenerateContentConfig(
        temperature=0.2,
        top_p=0.95,
        max_output_tokens=8192,
        safety_settings=[
            types.SafetySetting(category="HARM_CATEGORY_HATE_SPEECH", threshold="OFF"),
            types.SafetySetting(category="HARM_CATEGORY_DANGEROUS_CONTENT", threshold="OFF"),
            types.SafetySetting(category="HARM_CATEGORY_SEXUALLY_EXPLICIT", threshold="OFF"),
            types.SafetySetting(category="HARM_CATEGORY_HARASSMENT", threshold="OFF"),
        ],
        system_instruction=[system_instr],
        thinking_config=types.ThinkingConfig(thinking_budget=0)
    )

    output = ""
    for chunk in client.models.generate_content_stream(
        model="gemini-2.5-flash", contents=contents, config=config
    ):
        # chunk.text can be None for chunks that carry no text part.
        output += chunk.text or ""

    clean = output.strip()
    clean = re.sub(r"^```(?:\w+)?\n","", clean)
    clean = re.sub(r"\n```$","", clean)
    return json.loads(clean)
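
The fence-stripping step at the end of `generate()` assumes the model returns either bare JSON or JSON wrapped in a single Markdown code fence. A slightly more defensive variant is sketched below. The helper name `parse_model_json` and the fallback to the outermost `{...}` span are illustrative assumptions, not part of the notebook.

```python
import json
import re
from typing import Any, Dict

FENCE = "`" * 3  # a Markdown code fence marker, built indirectly for readability


def parse_model_json(raw: str) -> Dict[str, Any]:
    """Parse a JSON object from model text, tolerating a Markdown code fence."""
    text = raw.strip()
    # Drop an opening fence (optionally tagged, e.g. "json") and a closing fence.
    text = re.sub(r"^`{3}(?:\w+)?\s*\n", "", text)
    text = re.sub(r"\n`{3}\s*$", "", text)
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # Fall back to the outermost {...} span if extra prose surrounds the JSON.
        start, end = text.find("{"), text.rfind("}")
        if start == -1 or end <= start:
            raise ValueError(f"No JSON object found in model output: {raw[:80]!r}")
        parsed = json.loads(text[start : end + 1])
    if not isinstance(parsed, dict):
        raise ValueError("Expected a JSON object at the top level.")
    return parsed


# Made-up model output, for illustration only.
fenced = (
    FENCE
    + 'json\n{"Prompt": "a lamp on a desk", "product_description": "a lamp"}\n'
    + FENCE
)
print(parse_model_json(fenced)["product_description"])
```

Raising a `ValueError` with a snippet of the raw text makes a malformed response easier to spot in the per-product error column of the run log.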


In [17]:
def call_product_recontext(
    image_bytes_list=None,
    image_uris_list=None,
    prompt=None,
    product_description=None,
    disable_prompt_enhancement=True,
    sample_count=1,
    base_steps=None,
    safety_setting=None,
    person_generation=None,
    storage_uri: str = None,
) -> PredictResponse:
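    """Calls the Imagen Product Recontextualization API.

    Only `image_uris_list` (GCS URIs) is read when building the request;
    `image_bytes_list` is accepted but currently unused. The function relies on
    the module-level `predict_client` and `model_endpoint` defined in the
    "Initialize Vertex AI Client" section, so run that cell first.
    """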
    inst: Dict[str, Any] = {"productImages": []}
    if image_uris_list:
        for uri in image_uris_list:
            inst["productImages"].append({"image": {"gcsUri": uri}})
    if not inst["productImages"]:
        raise ValueError("No product images provided.")
    if product_description:
        if "productConfig" not in inst["productImages"][0]:
            inst["productImages"][0]["productConfig"] = {}
        inst["productImages"][0]["productConfig"]["productDescription"] = product_description
    if prompt:
        inst["prompt"] = prompt

    params: Dict[str, Any] = {"sampleCount": sample_count}
    if disable_prompt_enhancement:
        params["enhancePrompt"] = False
    if safety_setting:
        params["safetySetting"] = safety_setting
    if person_generation:
        params["personGeneration"] = person_generation
    if base_steps:
        params["baseSteps"] = base_steps
    if storage_uri:
        params["storageUri"] = storage_uri

    start = timeit.default_timer()
    resp = predict_client.predict(
        endpoint=model_endpoint,
        instances=[inst],
        parameters=params,
    )
    print(f"Recontext API call took {timeit.default_timer()-start:.2f}s")
    return resp

async def process_batch_async(batch, semaphore, output_gcs_uri_prefix: str):
    """
    Async worker that takes a product batch and saves the result to a GCS prefix.
    """
    async with semaphore:
        product_folder = batch['product_folder']
        print(f"-> Starting async job for: {product_folder}")
        try:
            gen_result = await asyncio.to_thread(
                generate,
                batch["image_parts"]
            )
            prompt = gen_result.get("Prompt", "")
            product_description = gen_result.get("product_description", "")
            print(f"   - Generated prompt for {product_folder}")

            # Define the output GCS path for this specific batch.
            output_storage_uri = f"{output_gcs_uri_prefix.rstrip('/')}/{product_folder}"

            recontext_response = await asyncio.to_thread(
                call_product_recontext,
                prompt=prompt,
                product_description=product_description,
                image_uris_list=batch["product_uris"],
                disable_prompt_enhancement=False,
                sample_count=3,
                safety_setting="block_low_and_above",
                person_generation="allow_adult",
                storage_uri=output_storage_uri
            )
            print(f"   - API is saving recontextualized image(s) to: {output_storage_uri}")

            # extract the final output URIs from the API response
            preds = recontext_response.predictions
            output_uris = [pred["gcsUri"] for pred in preds if "gcsUri" in pred]

            print(f"✓ Finished async job for: {product_folder}")
            return {
                "product_folder": product_folder,
                "status": "Success",
                "generated_prompt": prompt,
                "product_description": product_description,
                "output_uris": output_uris,
                "source_uris": batch["product_uris"]
            }

        except Exception as e:
            print(f"[ERROR] processing batch {product_folder}: {e}")
            return {
                "product_folder": product_folder,
                "status": "Failed",
                "error": str(e),
                "generated_prompt": None,
                "product_description": None,
                "output_uris": [],
                "source_uris": batch["product_uris"]
            }


def discover_product_batches(input_gcs_uri_prefix: str) -> Generator[Dict[str, object], None, None]:
    """
    Scans a GCS URI prefix for product subfolders and yields batches of images.

    Yields dicts with:
     - product_folder (e.g. "product_5")
     - image_parts (List[types.Part])
     - product_uris (List[str])
    """
    # Use regex to parse the bucket and prefix from the full GCS URI
    m = re.match(r"gs://([^/]+)/(.+)", input_gcs_uri_prefix)
    if not m:
        raise ValueError(f"Invalid GCS URI format: {input_gcs_uri_prefix}. Expected gs://bucket-name/prefix.")

    bucket_name, base_prefix = m.groups()
    # Ensure the prefix ends with a slash for proper folder matching
    prefix = base_prefix.rstrip("/") + "/"

    client = storage.Client()
    print(f"Scanning GCS path: gs://{bucket_name}/{prefix}")

    # Fetch all blobs under the specified prefix
    blobs = list(client.list_blobs(bucket_name, prefix=prefix))
    print(f"  • Found {len(blobs)} total objects under the prefix.")

    folder_map: Dict[str, List[storage.blob.Blob]] = {}
    for b in blobs:
        relative_path = b.name[len(prefix):]
        parts = relative_path.split("/")

        if len(parts) != 2:
            continue

        folder, filename = parts
        if not filename.lower().endswith((".png", ".jpg", ".jpeg")):
            continue

        folder_map.setdefault(folder, []).append(b)

    print(f"  • Discovered product folders: {list(folder_map.keys())}")

    for folder, blob_list in folder_map.items():
        blob_list.sort(key=lambda b: os.path.basename(b.name))
        blob_list = blob_list[:3]
        uris = [f"gs://{b.bucket.name}/{b.name}" for b in blob_list]
        parts = [
            types.Part(file_data=types.FileData(file_uri=uri, mime_type=get_mime_type(uri)))
            for uri in uris
        ]

        yield {
            "product_folder": folder,
            "image_parts":    parts,
            "product_uris":   uris,
        }
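
The folder-grouping rule inside `discover_product_batches` can be checked without touching GCS. The sketch below applies the same filtering (exactly one folder level below the prefix, image extensions only, at most three images per folder sorted by filename) to plain object-name strings. The function name `group_images_by_folder` and the sample object names are hypothetical.

```python
import os
from typing import Dict, Iterable, List

IMAGE_EXTENSIONS = (".png", ".jpg", ".jpeg")


def group_images_by_folder(
    object_names: Iterable[str], prefix: str, max_images: int = 3
) -> Dict[str, List[str]]:
    """Group object names shaped like <prefix>/<folder>/<file> by folder."""
    prefix = prefix.rstrip("/") + "/"
    folders: Dict[str, List[str]] = {}
    for name in object_names:
        if not name.startswith(prefix):
            continue
        parts = name[len(prefix):].split("/")
        # Keep only files exactly one folder level below the prefix.
        if len(parts) != 2 or not parts[1]:
            continue
        folder, filename = parts
        if filename.lower().endswith(IMAGE_EXTENSIONS):
            folders.setdefault(folder, []).append(name)
    # Sort by filename and cap each folder at max_images entries.
    return {
        folder: sorted(names, key=os.path.basename)[:max_images]
        for folder, names in folders.items()
    }


# Hypothetical object names, for illustration only.
names = [
    "catalog/product_01/front.png",
    "catalog/product_01/back.jpg",
    "catalog/product_01/notes.txt",
    "catalog/product_02/a.jpeg",
    "catalog/outputs/20250101_000000/product_01/sample_0.png",
    "catalog/readme.png",
]
grouped = group_images_by_folder(names, "catalog")
print(grouped)
```

Because only objects exactly one level below the prefix are kept, files nested more deeply (such as results written under an `outputs/<timestamp>/...` path beneath the same prefix) are skipped rather than mistaken for product images.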


## Initialize Vertex AI Client

In [25]:
# Init clients
PROJECT_ID = "cpg-cdp" # @param
LOCATION   = "us-central1" # @param

aiplatform.init(project=PROJECT_ID, location=LOCATION)
predict_client = aiplatform.gapic.PredictionServiceClient(
    client_options={"api_endpoint": f"{LOCATION}-aiplatform.googleapis.com"}
)
model_endpoint = (
    f"projects/{PROJECT_ID}/locations/{LOCATION}"
    + "/publishers/google/models/imagen-product-recontext-preview-06-30"
)
print("Prediction client ready")


Prediction client ready
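
Before sending anything through `predict_client`, it can help to inspect the request that `call_product_recontext` would assemble. The sketch below rebuilds the instance and parameters as plain dictionaries, using only the field names that appear in the helper above. The function name `build_recontext_request` and the sample URIs are hypothetical, and only a subset of the optional parameters is covered.

```python
from typing import Any, Dict, List, Optional, Tuple


def build_recontext_request(
    image_uris: List[str],
    prompt: Optional[str] = None,
    product_description: Optional[str] = None,
    sample_count: int = 1,
    enhance_prompt: bool = True,
    storage_uri: Optional[str] = None,
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Return the (instance, parameters) pair without sending it."""
    if not image_uris:
        raise ValueError("No product images provided.")
    instance: Dict[str, Any] = {
        "productImages": [{"image": {"gcsUri": uri}} for uri in image_uris]
    }
    if product_description:
        # The description is attached to the first product image only.
        instance["productImages"][0]["productConfig"] = {
            "productDescription": product_description
        }
    if prompt:
        instance["prompt"] = prompt
    parameters: Dict[str, Any] = {"sampleCount": sample_count}
    if not enhance_prompt:
        parameters["enhancePrompt"] = False
    if storage_uri:
        parameters["storageUri"] = storage_uri
    return instance, parameters


# Hypothetical bucket and object names, for illustration only.
instance, parameters = build_recontext_request(
    ["gs://example-bucket/product_01/front.png"],
    prompt="a desk lamp in a home office",
    product_description="a matte black desk lamp",
    sample_count=3,
    storage_uri="gs://example-bucket/outputs/product_01",
)
print(instance)
print(parameters)
```

Keeping payload construction separate from the network call makes it possible to log or unit-test the request without a configured Vertex AI project.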


## Run Async Batch Jobs

In [24]:
INPUT_GCS_URI_PREFIX = "gs://sandbox-401718-visual-cat-enrichment/essentials-journey/Essentials Journey - noodle/" # @param
OUTPUT_GCS_URI_BASE = "gs://sandbox-401718-visual-cat-enrichment/essentials-journey/Essentials Journey - noodle/outputs" # @param
CONCURRENT_REQUESTS = 5 # @param
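
`CONCURRENT_REQUESTS` caps how many product folders are in flight at once: `process_batch_async` acquires an `asyncio.Semaphore` before handing its blocking calls to `asyncio.to_thread`. The standalone sketch below shows the same pattern with a stand-in worker. `fake_blocking_call`, `bounded_worker`, and `run_all` are hypothetical names, and the sleep merely simulates API latency. It requires Python 3.9 or later and uses `asyncio.run`, which suits a script; inside a notebook cell, `await run_all(...)` would be used instead.

```python
import asyncio
import threading
import time
from typing import List

active = 0  # number of calls currently running
peak = 0    # highest number of calls observed running at once
lock = threading.Lock()


def fake_blocking_call(name: str) -> str:
    """Stand-in for a blocking API call; records how many run concurrently."""
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.05)  # simulated latency
    with lock:
        active -= 1
    return f"{name}: done"


async def bounded_worker(name: str, semaphore: asyncio.Semaphore) -> str:
    # The semaphore is held for the whole call, so at most `limit` threads run.
    async with semaphore:
        return await asyncio.to_thread(fake_blocking_call, name)


async def run_all(names: List[str], limit: int) -> List[str]:
    semaphore = asyncio.Semaphore(limit)
    tasks = [bounded_worker(name, semaphore) for name in names]
    # gather() returns results in the same order as the input tasks.
    return await asyncio.gather(*tasks)


results = asyncio.run(run_all([f"product_{i:02d}" for i in range(1, 7)], limit=2))
print(results)
print(f"Peak concurrency: {peak}")
```

Raising the limit shortens wall-clock time only until the backing service starts throttling, so the value is worth tuning against the project's quota.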

In [24]:
print(f"Discovering product batches from: {INPUT_GCS_URI_PREFIX}")
batches = list(discover_product_batches(INPUT_GCS_URI_PREFIX))
print(f"Total product folders discovered: {len(batches)}")

if not batches:
    raise RuntimeError("No product batches found—check your GCS path & permissions!")

# Async Job
start_time = time.time()

# Output folder
run_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
final_output_prefix = f"{OUTPUT_GCS_URI_BASE.rstrip('/')}/{run_timestamp}"

print(f"\nStarting async processing for {len(batches)} batches...")
print(f"Concurrency level: {CONCURRENT_REQUESTS}")
print(f"Output will be saved to: {final_output_prefix}")

semaphore = asyncio.Semaphore(CONCURRENT_REQUESTS)
tasks = [process_batch_async(batch, semaphore, final_output_prefix) for batch in batches]
list_of_results = await asyncio.gather(*tasks)

end_time = time.time()
print(f"\n--- Batch processing complete ---")
print(f"Total time: {end_time - start_time:.2f} seconds")

# Table Results
results_df = pd.DataFrame(list_of_results)

print("\nBatch Job Complete. Results:")
pd.set_option('display.max_colwidth', 150)
display(results_df)

m = re.match(r"gs://([^/]+)/(.+)", final_output_prefix)
output_bucket_name, output_prefix = m.groups()
log_filename = f"{output_prefix}/run_log_{run_timestamp}.csv"

csv_buffer = io.StringIO()
results_df.to_csv(csv_buffer, index=False)

storage_client = storage.Client()
bucket = storage_client.bucket(output_bucket_name)
log_blob = bucket.blob(log_filename)
log_blob.upload_from_string(csv_buffer.getvalue(), content_type="text/csv")

print(f"\nUploaded final run log to: gs://{output_bucket_name}/{log_filename}")

Discovering product batches from: gs://sandbox-401718-visual-cat-enrichment/essentials-journey/Essentials Journey - noodle/
🔍 Scanning GCS path: gs://sandbox-401718-visual-cat-enrichment/essentials-journey/Essentials Journey - noodle/
  • Found 19 total objects under the prefix.
  • Discovered product folders: ['product_01', 'product_02', 'product_03', 'product_04', 'product_05', 'product_06', 'product_07', 'product_08', 'product_09', 'product_10', 'product_11']
Total product folders discovered: 11

Starting async processing for 11 batches...
Concurrency level: 5
Output will be saved to: gs://sandbox-401718-visual-cat-enrichment/essentials-journey/Essentials Journey - noodle/outputs/20250717_174757
-> Starting async job for: product_01
-> Starting async job for: product_02
-> Starting async job for: product_03
-> Starting async job for: product_04
-> Starting async job for: product_05
   - Generated prompt for product_05
   - Generated prompt for product_04
   - Generated prompt for pr

Unnamed: 0,product_folder,status,generated_prompt,product_description,output_uris,source_uris,error
0,product_01,Success,A dark glass bottle of 'Oi! 100% Pure Toasted Sesame Oil' with a black screw cap and a white label featuring a speech bubble logo and green and br...,A dark glass bottle of 'Oi! 100% Pure Toasted Sesame Oil' with a black screw cap. The bottle has a white label featuring a black speech bubble log...,[gs://sandbox-401718-visual-cat-enrichment/essentials-journey/Essentials Journey - noodle/outputs/20250717_174757/product_01/1752774496116/sample_...,[gs://sandbox-401718-visual-cat-enrichment/essentials-journey/Essentials Journey - noodle/product_01/Chosen-Foods-100-Pure-Toasted-Sesame-Oil-Glas...,
1,product_02,Success,"A clear plastic bottle of Spice World minced garlic, filled with finely chopped, light beige garlic, featuring a dark blue label with light blue s...","A clear plastic bottle of Spice World minced garlic, filled with finely chopped, light beige garlic. It has a dark blue label with light blue spec...",[gs://sandbox-401718-visual-cat-enrichment/essentials-journey/Essentials Journey - noodle/outputs/20250717_174757/product_02/1752774490975/sample_...,[gs://sandbox-401718-visual-cat-enrichment/essentials-journey/Essentials Journey - noodle/product_02/bc1a24d6-30c3-4da5-a9cd-87c920fcbecb.6e9799e7...,
2,product_03,Success,"A clear plastic bag of ""A Taste of Thai"" Vermicelli Rice Noodles, featuring a prominent blue and gold label with traditional Thai architectural mo...","A clear plastic bag of ""A Taste of Thai"" Vermicelli Rice Noodles, featuring a prominent blue and gold label with traditional Thai architectural mo...",[gs://sandbox-401718-visual-cat-enrichment/essentials-journey/Essentials Journey - noodle/outputs/20250717_174757/product_03/1752774496106/sample_...,[gs://sandbox-401718-visual-cat-enrichment/essentials-journey/Essentials Journey - noodle/product_03/A-Taste-of-Thai-Vermicelli-Rice-Noodles-8-8-o...,
3,product_04,Success,"A cylindrical cardboard container of Great Value Plain Salt, primarily dark blue with a pattern of lighter blue dots, featuring a white label with...","A cylindrical cardboard container of Great Value Plain Salt, primarily dark blue with a pattern of lighter blue dots, featuring a white label with...",[gs://sandbox-401718-visual-cat-enrichment/essentials-journey/Essentials Journey - noodle/outputs/20250717_174757/product_04/1752774493227/sample_...,[gs://sandbox-401718-visual-cat-enrichment/essentials-journey/Essentials Journey - noodle/product_04/96797321-6a5d-485b-8cc6-dd1f16f829e6.a0f3723b...,
4,product_05,Success,"A Spice World Minced Ginger bottle, featuring a clear plastic body filled with light brown minced ginger, a blue plastic cap, and a blue label wit...","A Spice World Minced Ginger bottle, featuring a clear plastic body filled with light brown minced ginger, a blue plastic cap, and a blue label wit...",[gs://sandbox-401718-visual-cat-enrichment/essentials-journey/Essentials Journey - noodle/outputs/20250717_174757/product_05/1752774501763/sample_...,[gs://sandbox-401718-visual-cat-enrichment/essentials-journey/Essentials Journey - noodle/product_05/4415607e-2075-4e7c-bef7-b5df7417f890.e6dbddd6...,
5,product_06,Success,"A fresh, dark green cucumber, whole and sliced into round pieces, with a slightly bumpy texture and light green flesh with small white seeds, arra...","A fresh, dark green cucumber, whole and sliced into round pieces, with a slightly bumpy texture and light green flesh with small white seeds.",[gs://sandbox-401718-visual-cat-enrichment/essentials-journey/Essentials Journey - noodle/outputs/20250717_174757/product_06/1752774508273/sample_...,[gs://sandbox-401718-visual-cat-enrichment/essentials-journey/Essentials Journey - noodle/product_06/6a33a2ac-71e8-4147-8eb9-7bdb7b175acb.c0e6cedd...,
6,product_07,Failed,,,[],[gs://sandbox-401718-visual-cat-enrichment/essentials-journey/Essentials Journey - noodle/product_07/Mizkan-Seasoned-Rice-Vinegar-Mild-and-Sweet-1...,503 Image editing failed with the following error: Failed to retrieve RAI response.
7,product_08,Success,"A clear plastic spice shaker bottle filled with small, oval, off-white sesame seeds, topped with a black plastic cap. The bottle features a white ...","A clear plastic spice shaker bottle filled with small, oval, off-white sesame seeds, topped with a black plastic cap. The bottle features a white ...",[gs://sandbox-401718-visual-cat-enrichment/essentials-journey/Essentials Journey - noodle/outputs/20250717_174757/product_08/1752774516243/sample_...,[gs://sandbox-401718-visual-cat-enrichment/essentials-journey/Essentials Journey - noodle/product_08/1c961451-f190-4879-bd40-7a90349a9826.a90790e8...,
8,product_09,Success,"A fresh bunch of vibrant green scallions, also known as green onions or spring onions, featuring long, slender hollow green stalks that transition...","A fresh bunch of vibrant green scallions, also known as green onions or spring onions, featuring long, slender hollow green stalks that transition...",[gs://sandbox-401718-visual-cat-enrichment/essentials-journey/Essentials Journey - noodle/outputs/20250717_174757/product_09/1752774508808/sample_...,[gs://sandbox-401718-visual-cat-enrichment/essentials-journey/Essentials Journey - noodle/product_09/2098edc7-38fd-4a50-bb14-7ac2fc978738.a620ac76...,
9,product_10,Success,"A dark brown glass bottle of Great Value Soy Sauce, featuring a red plastic cap and a cream-colored label with black and red botanical illustratio...","A dark brown glass bottle of Great Value Soy Sauce, featuring a red plastic cap and a cream-colored label with black and red botanical illustratio...",[gs://sandbox-401718-visual-cat-enrichment/essentials-journey/Essentials Journey - noodle/outputs/20250717_174757/product_10/1752774513975/sample_...,[gs://sandbox-401718-visual-cat-enrichment/essentials-journey/Essentials Journey - noodle/product_10/Great-Value-Naturally-Brewed-Soy-Sauce-15-fl-...,



Uploaded final run log to: gs://sandbox-401718-visual-cat-enrichment/essentials-journey/Essentials Journey - noodle/outputs/20250717_174757/run_log_20250717_174757.csv
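
Once the run log is in GCS, failed folders can be pulled out of it for a retry pass. The sketch below works on CSV text that has already been downloaded and uses only the standard library. The function name `failed_folders` is hypothetical, and the sample log is trimmed to three of the log's columns, with the failure row taken from the run shown above.

```python
import csv
import io
from typing import Dict, List


def failed_folders(log_csv: str) -> List[Dict[str, str]]:
    """Return the folder name and error message for every non-successful row."""
    reader = csv.DictReader(io.StringIO(log_csv))
    return [
        {"product_folder": row["product_folder"], "error": row.get("error") or ""}
        for row in reader
        if row.get("status") != "Success"
    ]


# Trimmed sample log, for illustration only.
sample_log = (
    "product_folder,status,error\n"
    "product_06,Success,\n"
    "product_07,Failed,503 Image editing failed with the following error: "
    "Failed to retrieve RAI response.\n"
)

for item in failed_folders(sample_log):
    print(f"{item['product_folder']}: {item['error']}")
```

The resulting folder names could then be used to filter the list returned by `discover_product_batches` before rerunning the async job.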
