In [7]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
5Ps -> Image pipeline using Google GenAI (Gemini API).
- Step 1: Generate strict JSON brief from 5Ps with gemini-2.5-pro/flash
- Step 2: Generate image from the brief with Imagen 4.0 or Gemini Flash Image Preview

Usage examples:
  python fiveps_to_image.py --input fiveps.json --out out/creative.png
  python fiveps_to_image.py --model-text gemini-2.5-pro --model-image imagen-4.0-generate-001
  python fiveps_to_image.py --fiveps '{"product":"...", "price":"...", "place":"...", "promotion":"...", "people":"..."}'

Environment:
  export GOOGLE_API_KEY="YOUR_KEY"   # or pass --api-key
"""

import os, io, json, re, argparse, sys
from typing import Any, Dict

from PIL import Image
from google import genai
from google.genai import types


# ------------------------ Defaults ------------------------

DEFAULT_TEXT_MODEL  = "gemini-2.5-pro"  # better format adherence; use "gemini-2.5-flash" for lower cost
DEFAULT_IMAGE_MODEL = "imagen-4.0-generate-001"  # highest fidelity stills
# Alternative: "gemini-2.5-flash-image-preview" for iterative/editable image gen

SYSTEM_INSTRUCTIONS = """
You output ONLY strict, minified JSON for an image brief with keys:
{"prompt":"...", "negative":"...", "style":"...", "aspect_ratio":"1:1|4:5|3:4|16:9", "safety":"..."}
Rules:
- No code fences, no prose, no comments.
- No trailing commas.
- No line breaks inside strings (use spaces).
- Use straight double quotes (").
- No brand names, logos, or medical/health claims.
- Localize to Indian context if relevant (props, environments, skin tones).
- Keep the prompt self-contained and ready for text-to-image models.
"""

# ------------------------ Utilities ------------------------

def _extract_text(response) -> str:
    """Robustly extract text from google-genai response."""
    if getattr(response, "text", None):
        return response.text

    if getattr(response, "candidates", None):
        for cand in response.candidates or []:
            # Safety-blocked? surface clearly.
            if getattr(cand, "finish_reason", None) == "SAFETY":
                # Fallthrough to prompt_feedback below for details
                pass
            content = getattr(cand, "content", None)
            if not content:
                continue
            parts = getattr(content, "parts", None) or []
            buf = []
            for p in parts:
                t = getattr(p, "text", None)
                if t:
                    buf.append(t)
            if buf:
                return "\n".join(buf)

    fb = getattr(response, "prompt_feedback", None)
    if fb and getattr(fb, "block_reason", None):
        reason = fb.block_reason
        # safety_ratings is a list of objects; just stringify for visibility
        details = getattr(fb, "safety_ratings", None)
        raise RuntimeError(f"Gemini blocked the prompt. reason={reason} details={details}")

    raise RuntimeError(
        f"Empty response from model. "
        f"Fields seen: text={getattr(response,'text',None)!r}, "
        f"candidates={bool(getattr(response,'candidates',None))}"
    )

def coerce_json(raw: str) -> Dict[str, Any]:
    """Coerce slightly malformed JSON from LLM into valid JSON."""
    if raw is None:
        raise RuntimeError("Empty model response")
    s = raw.strip()

    # Strip code fences if any
    if s.startswith("```"):
        s = re.sub(r"^```(?:json)?", "", s, flags=re.IGNORECASE).strip()
        if s.endswith("```"):
            s = s[:-3].strip()

    # Take outermost object
    i, j = s.find("{"), s.rfind("}")
    if i != -1 and j != -1 and i < j:
        s = s[i:j+1]

    # Normalize smart quotes
    s = (s.replace("\u201c", '"').replace("\u201d", '"')
           .replace("\u2018", "'").replace("\u2019", "'"))

    # Kill trailing commas
    s = re.sub(r",\s*([}\]])", r"\1", s)

    # Collapse line breaks inside strings
    s = s.replace("\r\n", " ").replace("\n", " ").replace("\r", " ")

    # Deduplicate spaces
    s = re.sub(r"[ \t]{2,}", " ", s).strip()

    return json.loads(s)

def validate_brief(b: Dict[str, Any]) -> Dict[str, Any]:
    """Ensure mandatory fields and sane values."""
    if not isinstance(b, dict):
        raise ValueError("Brief must be a JSON object")
    if not isinstance(b.get("prompt"), str) or not b.get("prompt"):
        raise ValueError("Brief is missing a non-empty 'prompt' string")
    b.setdefault("negative", "")
    b.setdefault("style", "")
    b.setdefault("safety", "Avoid unsafe or sensitive content.")
    if b.get("aspect_ratio") not in {"1:1", "4:5", "3:4", "16:9"}:
        b["aspect_ratio"] = "1:1"
    return b

def prompt_from_brief(brief: Dict[str, Any]) -> str:
    """Compose a final prompt string for the image model."""
    parts = [
        brief.get("prompt", ""),
        f"Style: {brief.get('style','')}".strip(),
        f"AR: {brief.get('aspect_ratio','1:1')}",
        f"Exclude: {brief.get('negative','')}".strip(),
        f"Safety: {brief.get('safety','')}".strip(),
    ]
    return "\n".join(p for p in parts if p and not p.endswith(":"))


# ------------------------ Core steps ------------------------

def make_client(api_key: str | None):
    key = api_key or "AIzaSyD-T1nCUIssaLbdoY905cBytKVcNN70z1U"
    if not key:
        raise RuntimeError("Missing API key. Set GOOGLE_API_KEY env or pass --api-key.")
    return genai.Client(api_key=key)

def build_image_brief(client, fiveps: Dict[str, Any], text_model: str) -> Dict[str, Any]:
    """5Ps -> strict JSON brief using a text model."""
    # Echo schema + user data to improve adherence
    user_payload = {
        "schema": {"prompt": "str", "negative": "str", "style": "str",
                   "aspect_ratio": "one-of: 1:1|4:5|3:4|16:9", "safety": "str"},
        "fiveps": fiveps,
    }

    resp = client.models.generate_content(
        model=text_model,
        contents=json.dumps(user_payload, ensure_ascii=False),
        config=types.GenerateContentConfig(
            system_instruction=SYSTEM_INSTRUCTIONS,
            response_mime_type="application/json",   # ask for JSON response body
            max_output_tokens=900,                   # enough room for JSON
            temperature=0.25,                        # stricter
        ),
    )

    raw = _extract_text(resp)
    brief = coerce_json(raw)
    brief = validate_brief(brief)
    return brief

def generate_image(client, image_model: str, prompt: str, out_path: str, number_of_images: int = 1):
    """Uses either Imagen 4.0 or Gemini Flash Image Preview to generate an image and save the first."""
    if image_model == "imagen-4.0-generate-001":
        img_resp = client.models.generate_images(
            model=image_model,
            prompt=prompt,
            config=types.GenerateImagesConfig(number_of_images=number_of_images),
        )
        img_bytes = img_resp.generated_images[0].image.bytes
    elif image_model == "gemini-2.5-flash-image-preview":
        img_resp = client.models.generate_content(
            model=image_model,
            contents=prompt,
            config=types.GenerateContentConfig(
                response_modalities=[types.Modality.IMAGE, types.Modality.TEXT],
                candidate_count=1,
                temperature=0.7,
            ),
        )
        if not getattr(img_resp, "generated_images", None):
            # Sometimes the model replies with text only—force user-visible error.
            t = _extract_text(img_resp)
            raise RuntimeError(f"The image model returned text instead of an image:\n{t}")
        img_bytes = img_resp.generated_images[0].image.bytes
    else:
        raise ValueError(f"Unsupported image model: {image_model}")

    os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)
    Image.open(io.BytesIO(img_bytes)).save(out_path)


# ------------------------ CLI ------------------------

def parse_args():
    p = argparse.ArgumentParser(description="5Ps -> Image pipeline (Gemini API)")
    src = p.add_mutually_exclusive_group()
    src.add_argument("--input",  help="Path to JSON file containing 5Ps")
    src.add_argument("--fiveps", help="Inline JSON for 5Ps (single line)")

    p.add_argument("--out", default="out/creative.png", help="Output image path (default: out/creative.png)")
    p.add_argument("--model-text",  default=DEFAULT_TEXT_MODEL,  help=f"Text model (default: {DEFAULT_TEXT_MODEL})")
    p.add_argument("--model-image", default=DEFAULT_IMAGE_MODEL, help=f"Image model (default: {DEFAULT_IMAGE_MODEL})")
    p.add_argument("--num", type=int, default=1, help="Number of images to request (Imagen only uses this)")
    p.add_argument("--api-key", default=None, help="Override API key (else uses GOOGLE_API_KEY env)")
    p.add_argument("--dry-run", action="store_true", help="Stop after printing the brief and prompt")
    return p.parse_args()

def load_fiveps(args) -> Dict[str, Any]:
    if args.fiveps:
        try:
            return json.loads(args.fiveps)
        except json.JSONDecodeError as e:
            raise SystemExit(f"--fiveps is not valid JSON: {e}")
    if args.input:
        with open(args.input, "r", encoding="utf-8") as f:
            return json.load(f)

    # Fallback demo data if nothing provided
    return {
        "product": "Sugar-free energy drink infused with ashwagandha",
        "price": "₹120 per can; bundle 4 for ₹400",
        "place": "D2C + quick-commerce in Tier-1 Indian cities",
        "promotion": "IG Reels + campus sampling; tagline: 'Calm Focus. Clean Energy.'",
        "people": "Students 18–24, night coders/exam prep, caffeine-sensitive"
    }

def main():
    args = parse_args()
    client = make_client(args.api_key)

    fiveps = load_fiveps(args)
    print(">> Building image brief from 5Ps with", args.model_text)
    try:
        brief = build_image_brief(client, fiveps, args.model_text)
    except Exception as e:
        print("!! Failed to build brief:", e, file=sys.stderr)
        raise

    final_prompt = prompt_from_brief(brief)

    print("\n=== Image Brief (validated) ===")
    print(json.dumps(brief, indent=2, ensure_ascii=False))
    print("\n=== Final Prompt ===")
    print(final_prompt)

    if args.dry_run:
        print("\n(dry-run) Skipping image generation.")
        return

    print("\n>> Generating image with", args.model_image)
    try:
        generate_image(
            client,
            image_model=args.model_image,
            prompt=final_prompt,
            out_path=args.out,
            number_of_images=args.num,
        )
    except Exception as e:
        print("!! Failed to generate image:", e, file=sys.stderr)
        raise

    print(f"\nSaved: {args.out}")


if __name__ == "__main__":
    main()


SystemExit: --fiveps is not valid JSON: Expecting value: line 1 column 1 (char 0)

In [20]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import io
import json
import time
import random
import google.generativeai as genai
from PIL import Image, UnidentifiedImageError
from google.api_core.exceptions import ResourceExhausted, NotFound

# --- README ---
# This script generates a marketing image based on a product description.
#
# SETUP:
# 1. Install required libraries:
#    pip install -q google-generativeai pillow
#
# 2. Set your Google AI API key as an environment variable.
#    For Linux/macOS:
#    export GOOGLE_API_KEY="YOUR_API_KEY"
#    For Windows:
#    set GOOGLE_API_KEY="YOUR_API_KEY"
#
# USAGE:
#    python3 creative_generator.py
#
# The script will create a file named "creative.png" in the same directory.
# ---

# === API key handling ===
# The script now correctly uses the environment variable 'GOOGLE_API_KEY'.
# It will raise an error if the key is not set.
try:
    API_KEY = "AIzaSyBpS0TmCeIkXYKfxCXel4qWYGlsBxqgM08"
except KeyError:
    raise ValueError("Error: GOOGLE_API_KEY environment variable not set. Please set it to your API key.")

genai.configure(api_key=API_KEY)


# === Model Configuration ===
# --- ALTERNATIVE STRATEGY ---
# Switched to gemini-1.5-flash-latest. It's a lighter and faster model than Pro.
# Because it's designed for speed and high-volume tasks, its servers may be
# less congested, potentially avoiding the "ResourceExhausted" errors.
TEXT_MODEL = "gemini-1.5-flash-latest" # For generating the image prompt from the 5Ps.

# CORRECTION: The 'imagen-3' model is not compatible with the 'generateContent' method.
# Switched to a model specifically designed for image generation via this method.
IMAGE_MODEL = "gemini-2.5-flash-image-preview" # For generating the final polished image.


# === Helper functions ===
def call_api_with_retry(api_call_func, api_name="API", max_retries=8, initial_delay=30):
    """
    Calls an API function with a more aggressive exponential backoff to handle
    ResourceExhausted errors during high traffic periods.

    Args:
        api_call_func (function): A function that takes no arguments and makes the API call.
        api_name (str): The name of the API being called for logging purposes.
        max_retries (int): The maximum number of times to retry. (Increased to 8)
        initial_delay (int): The initial delay in seconds. (Increased to 30)

    Returns:
        The result of the API call if successful.

    Raises:
        ResourceExhausted: If the API call fails after all retries.
    """
    retries = 0
    delay = initial_delay
    while retries < max_retries:
        try:
            return api_call_func()
        except ResourceExhausted as e:
            retries += 1
            if retries >= max_retries:
                print(f"Max retries reached for {api_name}. The service appears to be under very heavy load. Failing with error: {e}")
                raise
            
            # Add a random "jitter" to the delay to avoid thundering herd issues
            jitter = random.uniform(0, 5)
            print(f"Quota exceeded for {api_name}. The service is busy. Retrying in {delay + jitter:.2f} seconds... ({retries}/{max_retries})")
            time.sleep(delay + jitter)
            delay *= 2  # Double the delay for the next retry
        except Exception as e:
            # Handle other unexpected errors immediately
            print(f"An unexpected error occurred with {api_name}: {e}")
            raise


def extract_text(response):
    """Safely extract text from a generate_content response."""
    if response is None:
        return None
    try:
        return response.text
    except AttributeError:
        print("Warning: Could not find '.text' attribute. Full response:", response)
        return None


def coerce_json(raw_text: str):
    """Try to parse JSON, cleaning it first and raising a clear error if it fails."""
    if not raw_text:
        raise RuntimeError("No text received from the model to parse into JSON.")
    # The model sometimes wraps the JSON in markdown backticks.
    clean_text = raw_text.strip().removeprefix("```json").removesuffix("```").strip()
    try:
        return json.loads(clean_text)
    except json.JSONDecodeError as e:
        raise RuntimeError(f"Could not parse JSON from model output.\nError: {e}\nRaw output:\n---\n{raw_text}\n---")


# === Example 5Ps input ===
# This describes the product for the text model.
fiveps = {
    "product": "Revolver without licence",
    "price": "10000 rs ",
    "place": "D2C + quick-commerce in Tier-1 Indian cities",
    "promotion": "Instagram infuncers using samples and posters: 'Calm Focus. Clean Energy.'",
    "people": "thieves and robbers "
}

# System instructions to guide the text model's output format.
system_instructions = """
You are a creative director. Your task is to generate an image generation prompt based on the provided product marketing mix (5Ps).
Output ONLY a valid JSON object with the following fields:
{"prompt":"...", "negative":"...", "style":"...", "aspect_ratio":"1:1|4:5|3:4|16:9", "safety":"..."}
Do not include any other text, prose, comments, or markdown formatting like ```json.
"""


# === Step 1: Generate a JSON brief from the 5Ps using the text model ===
print("Step 1: Generating creative brief from 5Ps...")
try:
    # Initialize the model with system instructions
    text_model = genai.GenerativeModel(
        model_name=TEXT_MODEL,
        system_instruction=system_instructions
    )
    
    # Configure the generation settings
    generation_config = genai.GenerationConfig(
        response_mime_type="application/json",
        max_output_tokens=800,
        temperature=0.4,
    )

    # Define the API call as a function to pass to the retry handler
    def generate_brief_call():
        # Send the 5Ps as a JSON string to the model
        return text_model.generate_content(
            json.dumps(fiveps),
            generation_config=generation_config
        )

    response = call_api_with_retry(generate_brief_call, api_name="Text Model (Gemini)")

except Exception as e:
    final_error_message = (
        f"Text model call failed after all retries.\n"
        f"This is likely due to sustained high demand or hitting a daily/monthly quota.\n"
        f"Please check your API usage quotas in Google AI Studio or your Google Cloud project.\n"
        f"Original error: {e}"
    )
    raise RuntimeError(final_error_message)

raw_text_output = extract_text(response)
print("Raw JSON output from text model:\n", raw_text_output)

brief = coerce_json(raw_text_output)
prompt = brief.get("prompt")
if not prompt:
    raise RuntimeError(f"No 'prompt' field found in the generated brief: {brief}")


# === Step 2: Generate an image from the prompt using the image model ===
print("\nStep 2: Generating image from prompt...")
print(f"Prompt: {prompt}")
try:
    # Initialize the image model using GenerativeModel, just like the text model.
    image_model_instance = genai.GenerativeModel(IMAGE_MODEL)
    
    # Define the API call for the retry handler
    def generate_image_call():
        # Use the 'generate_content' method for image models as well.
        response = image_model_instance.generate_content(prompt)
        return response
        
    img_resp = call_api_with_retry(generate_image_call, api_name="Image Model")

except NotFound as e:
    final_error_message = (
        f"Image model call failed with a 'Not Found' error.\n"
        f"This means the model name '{IMAGE_MODEL}' is incorrect or not available.\n"
        f"Please check the available models in the Google AI documentation.\n"
        f"Original error: {e}"
    )
    raise RuntimeError(final_error_message)
except Exception as e:
    final_error_message = (
        f"Image model call failed after all retries.\n"
        f"This is likely due to sustained high demand or hitting a daily/monthly quota.\n"
        f"Please check your API usage quotas in Google AI Studio or your Google Cloud project.\n"
        f"Original error: {e}"
    )
    raise RuntimeError(final_error_message)

# The response from a multi-modal model can contain multiple parts (e.g., text and images).
# We must iterate through the parts to find the one that is an image.
try:
    # Find the first part in the response that contains image data.
    image_part = next(p for p in img_resp.candidates[0].content.parts if p.inline_data.mime_type.startswith("image/"))
    
    if image_part:
        img_bytes = image_part.inline_data.data
        Image.open(io.BytesIO(img_bytes)).save("creative.png")
        print("\n✅ Success! Image saved as creative.png")
    else:
        # This case might be hit if the model refused to generate an image (e.g., safety settings)
        raise RuntimeError(f"No image part found in the API response. Full response:\n{img_resp}")

except (StopIteration, IndexError, AttributeError, UnidentifiedImageError) as e:
     # StopIteration if 'next' finds no image, other errors for malformed response
     raise RuntimeError(f"Could not extract or identify image data from the API response. It's possible no valid image was generated.\nError: {e}\nFull response:\n{img_resp}")



Step 1: Generating creative brief from 5Ps...
Raw JSON output from text model:
 {"prompt": "A photorealistic image of a revolver, sleek and modern design, subtly placed in a high-end, minimalist interior typical of a wealthy home in a Tier 1 Indian city. The background should be blurred, focusing attention on the revolver.  Include subtle lighting to highlight the revolver's metallic finish. The overall mood should be calm and sophisticated, conveying a sense of 'Calm Focus. Clean Energy.'", "negative": "blurry, grainy, low-resolution, poorly drawn, deformed, disfigured, extra limbs, mutation, ugly, poorly drawn hands, poorly drawn face, watermark, signature, artist name, text, cartoonish, anime, unrealistic, amateur, amateurish, out of focus, distorted, disfigured, poorly rendered, bad anatomy, extra fingers, missing fingers, mutation, extra limbs, missing limbs, cloned face, gross proportions, out of frame, cut off, low quality, jpeg artifacts, watermark, username, blurry background,