In [10]:
"""
Emotion Painter - Advanced Gradio App
Single-file prototype: app.py

Features:
- Text and audio input (transcription via whisper or faster-whisper)
- Emotion detection (transformers classifier)
- Emotion-to-prompt mapping with blending by confidence
- Image generation with Stable Diffusion (diffusers) with CPU/GPU fallback
- Basic safety checker (CLIP-based NSFW filter from diffusers)
- Gradio UI with preview, prompt display, download
- Configurable generation settings, deterministic seeds, caching of prompts

Notes:
- This is a production-minded prototype. For production you'll want
  to add rate-limiting, persistent storage (S3/DB), authentication, and a job queue
  for high-resolution renders.

Requirements (put in requirements.txt):
# core
transformers>=4.30.0
torch>=2.0.0
gradio>=3.30.0
diffusers>=0.19.0
accelerate
safetensors
ftfy
intel-openmp==2023.1.0  # if using CPU-only on some platforms

# optional (whisper)
openai-whisper>=20230314   # or faster-whisper for speed
# faster-whisper recommended on CPU for speed
# faster-whisper
# git+https://github.com/guillaumekln/faster-whisper

# image processing
Pillow
numpy

# (if you plan to run on GPU) - install matching torch / cuda

"""

# app.py
import os
import io
import math
import time
import json
import hashlib
from typing import List, Tuple, Dict, Optional

from PIL import Image
import numpy as np

import torch
from transformers import pipeline

# diffusers imports
try:
    from diffusers import StableDiffusionPipeline, DPMSolverMultistepScheduler
    from diffusers.pipelines.stable_diffusion.safety_checker import StableDiffusionSafetyChecker
    from transformers import CLIPFeatureExtractor
    DIFFUSERS_AVAILABLE = True
except Exception as e:
    print("diffusers not available:", e)
    DIFFUSERS_AVAILABLE = False

# whisper (transcription)
try:
    import whisper
    WHISPER_AVAILABLE = True
except Exception:
    WHISPER_AVAILABLE = False

# Gradio
import gradio as gr

# -----------------------------
# Config
# -----------------------------
EMOTION_MODEL_ID = "j-hartmann/emotion-english-distilroberta-base"  # good zero-shot model
SD_MODEL_ID = "runwayml/stable-diffusion-v1-5"  # change to your preferred checkpoint
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
CACHE_DIR = os.getenv("EMOTION_PAINTER_CACHE", ".ep_cache")
os.makedirs(CACHE_DIR, exist_ok=True)

# -----------------------------
# Utilities
# -----------------------------

def deterministic_hash(s: str) -> int:
    h = hashlib.sha256(s.encode("utf-8")).hexdigest()
    return int(h[:16], 16)


def to_pil(image_array: np.ndarray) -> Image.Image:
    if image_array.dtype != np.uint8:
        image_array = (255 * np.clip(image_array, 0, 1)).astype(np.uint8)
    return Image.fromarray(image_array)

# -----------------------------
# Emotion classifier wrapper
# -----------------------------

print("Loading emotion classifier...", flush=True)
try:
    emotion_pipe = pipeline("text-classification", model=EMOTION_MODEL_ID, return_all_scores=True, device=0 if DEVICE=="cuda" else -1)
except Exception as e:
    print("Failed to load emotion model; will attempt CPU fallback.", e)
    emotion_pipe = pipeline("text-classification", model=EMOTION_MODEL_ID, return_all_scores=True, device=-1)

# Normalized emotion labels we will support (map model outputs to our canonical set)
CANONICAL_EMOTIONS = ["joy", "sadness", "anger", "fear", "calmness"]
# map a broader set of labels (GoEmotions-like) into canonical categories
LABEL_MAP = {
    # joy-like
    "joy": "joy", "happiness": "joy", "amusement": "joy", "excitement": "joy", "optimism": "joy",
    # sadness-like
    "sadness": "sadness", "grief": "sadness", "disappointment": "sadness",
    # anger-like
    "anger": "anger", "annoyance": "anger", "frustration": "anger",
    # fear-like
    "fear": "fear", "anxiety": "fear", "nervousness": "fear",
    # calm-like / neutral
    "neutral": "calmness", "calmness": "calmness", "relief": "calmness", "contentment": "calmness"
}


def normalize_label(label: str) -> str:
    lab = label.lower()
    return LABEL_MAP.get(lab, lab if lab in CANONICAL_EMOTIONS else "calmness")


def predict_emotions(text: str, top_k: int = 5) -> List[Tuple[str, float]]:
    """Return a sorted list of (emotion, score) mapped to canonical emotions
    """
    if not text or not text.strip():
        return [("calmness", 1.0)]
    try:
        raw = emotion_pipe(text)[0]
    except Exception as e:
        # fallback: very simple heuristics
        txt = text.lower()
        scoremap = {"joy":0.0,"sadness":0.0,"anger":0.0,"fear":0.0,"calmness":0.0}
        if any(w in txt for w in ["happy","joy","glad","excited","love","yay"]): scoremap['joy'] += 0.9
        if any(w in txt for w in ["sad","down","lonely","depressed"]): scoremap['sadness'] += 0.9
        if any(w in txt for w in ["angry","mad","furious","hate"]): scoremap['anger'] += 0.9
        if any(w in txt for w in ["scared","afraid","nervous","anxious"]): scoremap['fear'] += 0.9
        if all(v==0 for v in scoremap.values()): scoremap['calmness'] = 1.0
        return sorted(scoremap.items(), key=lambda x:-x[1])

    # map labels
    agg = {}
    for entry in raw:
        lbl = normalize_label(entry['label'])
        agg[lbl] = agg.get(lbl, 0.0) + entry['score']
    # normalize
    total = sum(agg.values()) or 1.0
    items = [(k, v/total) for k, v in agg.items()]
    items_sorted = sorted(items, key=lambda x: -x[1])[:top_k]
    # ensure canonical coverage
    present = {k for k,_ in items_sorted}
    for emo in CANONICAL_EMOTIONS:
        if emo not in present:
            items_sorted.append((emo, 0.0))
    return items_sorted

# -----------------------------
# Emotion -> prompt mapping
# -----------------------------
EMOTION_MAP = {
    "joy": {"colors":"bright gold, warm orange", "attrs":"sunlit open field, floating ribbons, soft bokeh lights", "style":"impressionist watercolor"},
    "sadness": {"colors":"deep blue, slate grey", "attrs":"gentle rain, empty bench, reflected puddles", "style":"soft pastel realism"},
    "anger": {"colors":"crimson, ebony black", "attrs":"stormy sky, jagged shards, splintered geometry", "style":"expressionist abstract"},
    "fear": {"colors":"desaturated green, cold blue", "attrs":"fog, long corridor, shadowed corners", "style":"cinematic chiaroscuro"},
    "calmness": {"colors":"mint, pale pink, soft cream", "attrs":"still water, smooth gradient, gentle horizon", "style":"minimal japanese ink"}
}

PROMPT_TEMPLATE = "{emotion} mood painting, {attrs}, colors: {colors}, composition: {composition}, style: {style}, ultra-detailed, high resolution, artstation, 8k"


def blend_prompt(emotions: List[Tuple[str, float]], max_terms: int=3) -> str:
    # take top N emotions and blend attributes by weight
    top = [(e,w) for e,w in emotions if w>0]
    top = sorted(top, key=lambda x:-x[1])[:max_terms]
    if not top:
        top = [("calmness", 1.0)]
    # normalized weights
    s = sum(w for _,w in top) or 1.0
    parts = []
    colors = []
    styles = []
    attrs = []
    for e,w in top:
        m = EMOTION_MAP.get(e, EMOTION_MAP['calmness'])
        ratio = w/s
        # include textual hints spread by weight
        attrs.append(f"({m['attrs']}) x{ratio:.2f}")
        colors.append(m['colors'])
        styles.append(m['style'])
    # simple composition heuristic
    composition = "wide and airy" if top[0][0] in ["joy","calmness"] else "tight and dramatic"
    emotion_names = ", ".join([f"{e}({w:.2f})" for e,w in top])
    prompt = PROMPT_TEMPLATE.format(
        emotion=emotion_names,
        attrs='; '.join(attrs),
        colors=' / '.join(colors),
        composition=composition,
        style=' + '.join(styles)
    )
    return prompt

# -----------------------------
# Stable Diffusion generator wrapper
# -----------------------------
pipe = None
safety_model = None
feature_extractor = None

if DIFFUSERS_AVAILABLE:
    try:
        print(f"Loading Stable Diffusion model ({SD_MODEL_ID}) to {DEVICE}...")
        pipe = StableDiffusionPipeline.from_pretrained(SD_MODEL_ID, torch_dtype=torch.float16 if DEVICE=="cuda" else torch.float32)
        # use a faster scheduler
        pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config)
        if DEVICE == "cuda":
            pipe = pipe.to("cuda")
        else:
            pipe = pipe.to("cpu")
        # safety checker
        try:
            safety_model = StableDiffusionSafetyChecker.from_pretrained("CompVis/stable-diffusion-safety-checker")
            feature_extractor = CLIPFeatureExtractor.from_pretrained("openai/clip-vit-base-patch32")
        except Exception as e:
            print("Safety checker not available:", e)
    except Exception as e:
        print("Could not initialize Stable Diffusion pipeline:", e)
        pipe = None


def check_safety(image: Image.Image) -> Tuple[bool, float]:
    """Return (is_safe, unsafe_score) - simple wrapper. If no safety model, return safe True."""
    if safety_model is None or feature_extractor is None:
        return True, 0.0
    # convert
    image_np = np.array(image.convert("RGB"))
    inputs = feature_extractor(images=[image_np], return_tensors="pt")
    try:
        safety_output = safety_model(images=image_np, clip_input=inputs.pixel_values)
        has_nsfw_concept = safety_output.nsfw_content_detected[0]
        score = float(max(safety_output.nsfw_score[0].tolist())) if hasattr(safety_output, 'nsfw_score') else (1.0 if has_nsfw_concept else 0.0)
        return (not has_nsfw_concept), score
    except Exception as e:
        print("Safety check failed:", e)
        return True, 0.0


def generate_from_prompt(prompt: str, seed: Optional[int]=None, guidance_scale: float=7.5, steps: int=30, height: int=512, width: int=512):
    if pipe is None:
        raise RuntimeError("Stable Diffusion pipeline not initialized. Install diffusers and the model weights.")
    generator = None
    if seed is None:
        seed = deterministic_hash(prompt) & 0xFFFFFFFF
    if DEVICE == "cuda":
        generator = torch.Generator(device="cuda").manual_seed(seed)
    else:
        generator = torch.Generator(device="cpu").manual_seed(seed)

    with torch.autocast("cuda") if DEVICE=="cuda" else torch.cpu.amp.autocast(enabled=False):
        image = pipe(prompt, guidance_scale=guidance_scale, num_inference_steps=steps, generator=generator, height=height, width=width).images[0]

    is_safe, score = check_safety(image)
    return image, is_safe, score

# -----------------------------
# Transcription (Whisper) helper
# -----------------------------

def transcribe_audio_file(filepath: str) -> str:
    if not WHISPER_AVAILABLE:
        # fallback: raise informative error
        raise RuntimeError("Whisper is not installed. Install 'whisper' or 'faster-whisper' for transcription.")
    model = whisper.load_model("small")  # choose model size for quality/speed tradeoff
    result = model.transcribe(filepath)
    return result.get('text', '')

# -----------------------------
# Caching utility
# -----------------------------

def cache_image_for_prompt(prompt: str, image: Image.Image) -> str:
    key = hashlib.sha256(prompt.encode('utf-8')).hexdigest()
    path = os.path.join(CACHE_DIR, f"{key}.png")
    image.save(path)
    return path

def load_cached_image(prompt: str) -> Optional[str]:
    key = hashlib.sha256(prompt.encode('utf-8')).hexdigest()
    path = os.path.join(CACHE_DIR, f"{key}.png")
    if os.path.exists(path):
        return path
    return None

# -----------------------------
# Gradio app glue
# -----------------------------

def pipeline_handler(text_input: str, audio_file, seed: Optional[int], guidance_scale: float, steps: int, width: int, height: int, preview_low_res: bool):
    # 1. transcribe if audio
    if audio_file is not None:
        audio_path = audio_file.name
        try:
            text = transcribe_audio_file(audio_path)
        except Exception as e:
            return None, f"Transcription error: {e}", "", []
    else:
        text = text_input or ""

    # 2. emotion detection
    emotions = predict_emotions(text)
    top_emotions_display = [(e, float(s)) for e,s in emotions]

    # 3. prompt generation
    prompt = blend_prompt(emotions)

    # 4. check cache
    cached = load_cached_image(f"{prompt}|{seed}|{guidance_scale}|{steps}|{width}x{height}")
    if cached:
        img = Image.open(cached).convert("RGB")
        return img, prompt, json.dumps(top_emotions_display), top_emotions_display

    # 5. generate (optionally generate low-res preview first)
    try:
        if preview_low_res:
            # quick low-res preview for UX
            preview_prompt = prompt + ", low resolution preview, fast"
            preview_img, safe, score = generate_from_prompt(preview_prompt, seed=seed, guidance_scale=guidance_scale, steps=max(10, steps//3), height=256, width=256)
            # upscale or keep as preview
        img, safe, score = generate_from_prompt(prompt, seed=seed, guidance_scale=guidance_scale, steps=steps, height=height, width=width)
    except Exception as e:
        return None, f"Generation error: {e}", prompt, top_emotions_display

    # 6. safety
    if not safe:
        # mask or refuse
        return None, f"Generated image flagged as unsafe (score {score:.3f}). Try a different input.", prompt, top_emotions_display

    # 7. cache and return
    cache_image_for_prompt(f"{prompt}|{seed}|{guidance_scale}|{steps}|{width}x{height}", img)

    return img, prompt, json.dumps(top_emotions_display), top_emotions_display

# Build Gradio interface
with gr.Blocks(title="Emotion Painter", css=".gradio-container {background: linear-gradient(180deg, #f6fbff, #fff);} .card {border-radius: 12px}") as demo:
    gr.Markdown("# 🎨 Emotion Painter — Advanced prototype")
    with gr.Row():
        with gr.Column(scale=2):
            txt = gr.Textbox(label="Write your feeling (or leave blank to upload audio)", lines=3, placeholder="e.g. I just aced my exam and I feel on top of the world!")
            aud = gr.Audio(source="upload", type="filepath", label="Or upload an audio recording (wav/m4a)")
            with gr.Row():
                seed_in = gr.Number(value=None, label="Seed (optional)")
                guidance = gr.Slider(minimum=1.0, maximum=12.0, step=0.1, value=7.5, label="Guidance Scale")
            with gr.Row():
                steps_in = gr.Slider(minimum=10, maximum=50, step=1, value=30, label="Steps")
                low_res = gr.Checkbox(value=True, label="Generate fast low-res preview first")
            with gr.Row():
                width_in = gr.Dropdown(choices=[256, 512, 768], value=512, label="Width")
                height_in = gr.Dropdown(choices=[256, 512, 768], value=512, label="Height")
            gen_btn = gr.Button("Create Artwork")
            note = gr.Markdown("Tip: Use short expressive sentences. For audio, try to speak clearly. Model quality depends on installed weights and GPU availability.")
        with gr.Column(scale=3):
            output_image = gr.Image(label="Generated artwork", interactive=False)
            output_prompt = gr.Textbox(label="Prompt used for generation", lines=2)
            detected = gr.JSON(label="Detected emotions (top)"
                               )
    gen_btn.click(fn=pipeline_handler, inputs=[txt, aud, seed_in, guidance, steps_in, width_in, height_in, low_res], outputs=[output_image, output_prompt, detected, gr.State()])

# Launch helper
if __name__ == "__main__":
    print(f"Starting demo on {DEVICE}. Diffusers available: {DIFFUSERS_AVAILABLE}. Whisper available: {WHISPER_AVAILABLE}")
    demo.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT", 7860)), share=False)


diffusers not available: No module named 'diffusers'


ModuleNotFoundError: No module named 'gradio'

In [9]:
! pip install gradio-client

Defaulting to user installation because normal site-packages is not writeable
Collecting gradio-client
  Using cached gradio_client-1.11.1-py3-none-any.whl.metadata (7.1 kB)
Using cached gradio_client-1.11.1-py3-none-any.whl (324 kB)
Installing collected packages: gradio-client
Successfully installed gradio-client-1.11.1
