In [1]:
import os
import base64
from openai import OpenAI
import requests
from PIL import Image
from io import BytesIO
import torch
from transformers import CLIPProcessor, CLIPModel
import torchvision.transforms as transforms
from scipy.spatial.distance import cosine
import numpy as np
import random
import sys
from contextlib import redirect_stdout
import logging
from datetime import datetime

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# Set OpenAI API key
# Prefer a key that is already configured in the environment (shell export,
# .env loader, earlier cell). The placeholder below is only used when nothing
# is set, so running this cell can no longer clobber a real key.
# Never commit a real key in this cell.
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = "your_api_key"

In [3]:
from datetime import datetime

class ImageGenerator:
    """Generate images with DALL-E 3 and save them under ``image_logs/``."""

    # Seconds to wait for the image download before giving up; without a
    # timeout a stalled connection would block the whole training loop.
    DOWNLOAD_TIMEOUT = 60

    def __init__(self):
        """Initialize OpenAI client with API key from environment.

        Raises:
            ValueError: If ``OPENAI_API_KEY`` is unset or empty.
        """
        if not os.getenv("OPENAI_API_KEY"):
            raise ValueError("OPENAI_API_KEY environment variable must be set")
        self.client = OpenAI()

    def generate(self, prompt: str) -> str:
        """
        Generate an image with the DALL-E 3 model and save it to disk.

        Args:
            prompt (str): Text prompt to generate image from

        Returns:
            str: Path of the saved image file
                (``image_logs/generated_image_<timestamp>.png``)

        Raises:
            RuntimeError: If the API call, the download, or the file write
                fails. The original exception is attached as ``__cause__``.
        """
        try:
            response = self.client.images.generate(
                model="dall-e-3",
                prompt=prompt,
                size="1024x1024",
                quality="standard",
                n=1
            )

            # Get image URL from response
            image_url = response.data[0].url

            # Download the image bytes
            image_response = requests.get(image_url, timeout=self.DOWNLOAD_TIMEOUT)
            image_response.raise_for_status()

            # Save the image to a file and return the file path
            current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
            file_path = f"image_logs/generated_image_{current_time}.png"
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            with open(file_path, "wb") as f:
                f.write(image_response.content)

            return file_path

        except Exception as e:
            raise RuntimeError(f"Image generation failed: {str(e)}") from e

In [4]:
class SimilarityScorer:
    """Score visual similarity of two images via CLIP image embeddings."""

    def __init__(self):
        """Initialize CLIP model and processor."""
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(self.device)
        self.processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

        # Freeze model parameters
        for param in self.model.parameters():
            param.requires_grad = False

    def preprocess_image(self, image):
        """Convert image to CLIP input format.

        Args:
            image: PIL Image, or a string path to an image file.

        Returns:
            The processor's ``pixel_values`` tensor, moved to ``self.device``.
        """
        if isinstance(image, str):
            # Image.open is lazy and keeps the file open; copy() loads the
            # pixel data so the handle can be released right away.
            with Image.open(image) as opened:
                image = opened.copy()
        return self.processor(images=image, return_tensors="pt")["pixel_values"].to(self.device)

    def extract_features(self, image):
        """Extract image features using CLIP.

        Returns:
            numpy.ndarray: Output of ``get_image_features`` moved to the CPU.
        """
        with torch.no_grad():
            features = self.model.get_image_features(self.preprocess_image(image))
        return features.cpu().numpy()

    def calculate_score(self, original_image, generated_image):
        """
        Calculate similarity score between original and generated images using CLIP.

        Args:
            original_image: PIL Image or path to original image
            generated_image: PIL Image or path to generated image

        Returns:
            float: Cosine similarity of the two feature vectors. Cosine
                similarity lies in [-1, 1]; 1 means identical direction.

        Raises:
            RuntimeError: If feature extraction or the similarity computation
                fails. The original exception is attached as ``__cause__``.
        """
        try:
            # Extract features
            original_features = self.extract_features(original_image)
            generated_features = self.extract_features(generated_image)

            # scipy's cosine() is a distance (1 - similarity), so invert it
            similarity = 1 - cosine(original_features.flatten(), generated_features.flatten())
            return float(similarity)

        except Exception as e:
            raise RuntimeError(f"Similarity calculation failed: {str(e)}") from e

In [5]:
class PromptGenerator:
    """Produce image-description prompts with gpt-4o and learn from scored feedback."""

    # Scores strictly above this value are treated as successes; everything
    # else is recorded as an area for improvement.
    SUCCESS_THRESHOLD = 0.7

    def __init__(self):
        """Initialize OpenAI client with learning capabilities.

        Raises:
            ValueError: If ``OPENAI_API_KEY`` is unset or empty.
        """
        if not os.getenv("OPENAI_API_KEY"):
            raise ValueError("OPENAI_API_KEY environment variable must be set")
        self.client = OpenAI()
        self.feedback_history = []  # Dicts with 'feedback', 'score', 'analysis' keys
        self.conversation_history = []
        self.successful_patterns = []  # Track what works well
        self.improvement_areas = []  # Track what needs improvement

    def encode_image(self, image):
        """Convert image to base64 string.

        Args:
            image: PIL Image, or a string path to an image file.

        Returns:
            str: Base64 text of the image re-encoded as PNG.

        Raises:
            RuntimeError: If the image cannot be opened or encoded.
        """
        try:
            buffered = BytesIO()
            if isinstance(image, str):
                # Close the file as soon as the PNG bytes have been produced.
                with Image.open(image) as opened:
                    opened.save(buffered, format="PNG")
            else:
                image.save(buffered, format="PNG")
            return base64.b64encode(buffered.getvalue()).decode('utf-8')
        except Exception as e:
            raise RuntimeError(f"Image encoding failed: {str(e)}") from e

    def update_with_feedback(self, feedback, score):
        """
        Learn from feedback and score to improve future prompts.

        The raw feedback is always recorded in ``feedback_history``. The
        analysis is added to the pattern lists only when one was produced,
        because ``generate`` joins those lists as strings.

        Args:
            feedback (str): Feedback from image comparison
            score (float): Similarity score for the generated image
        """
        try:
            analysis = self.analyze_feedback(feedback, score)
            self.feedback_history.append({
                'feedback': feedback,
                'score': score,
                'analysis': analysis
            })

            # analyze_feedback returns None on failure (and the model may
            # return empty content); storing that would break the join in
            # generate() for every later call.
            if not analysis:
                return

            # Classify feedback based on score
            if score > self.SUCCESS_THRESHOLD:  # High similarity
                self.successful_patterns.append(analysis)
            else:  # Needs improvement
                self.improvement_areas.append(analysis)

        except Exception as e:
            print(f"Failed to update with feedback: {str(e)}")

    def analyze_feedback(self, feedback, score):
        """Extract learning points from feedback with score context.

        Args:
            feedback (str): Feedback text to analyze.
            score (float): Similarity score associated with the feedback.

        Returns:
            The model's analysis text, or None if the request failed
            (the error is printed, not raised).
        """
        try:
            threshold = self.SUCCESS_THRESHOLD
            analysis_prompt = (
                f"Based on this feedback and similarity score ({score:.3f}), identify:\n"
                f"1. What worked well in the prompt (if score > {threshold})\n"
                f"2. What needs improvement (if score < {threshold})\n"
                "3. Specific patterns to replicate or avoid"
            )

            response = self.client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "You are analyzing prompt generation feedback."},
                    # The instructions must be part of the request; previously
                    # they were built but never sent.
                    {"role": "user", "content": f"{analysis_prompt}\n\nScore: {score}\nFeedback: {feedback}"}
                ],
                max_tokens=1000
            )

            return response.choices[0].message.content

        except Exception as e:
            print(f"Feedback analysis failed: {str(e)}")
            return None

    def generate(self, image, reference_prompt=None) -> str:
        """Generate improved prompt using learned patterns.

        Args:
            image: PIL Image or path of the reference image to describe.
            reference_prompt (str, optional): Earlier prompt appended to the
                request as a reference.

        Returns:
            str: The generated prompt text.

        Raises:
            RuntimeError: If encoding, the API call, or response handling
                fails, including when the model returns no text.
        """
        try:
            base64_image = self.encode_image(image)

            # Create learning-focused system prompt
            system_prompt = """You are an expert prompt engineer who learns and improves.
            Previously successful patterns included: {}\n
            Areas for improvement were: {}\n
            Generate a detailed image description incorporating these learnings.
            ENSURE THAT EVERYTHING GENERATED IS IN COMPLIANCE WUTH OPENAI'S CONTENT GENERATION POLICY""".format(
                '; '.join(self.successful_patterns[-3:]) if self.successful_patterns else "None yet",
                '; '.join(self.improvement_areas[-3:]) if self.improvement_areas else "None yet"
            )

            messages = [{"role": "system", "content": system_prompt}]

            # Add relevant feedback history, skipping entries whose analysis failed
            recent_learnings = [f"Score {f['score']}: {f['analysis']}"
                                for f in self.feedback_history[-3:]
                                if f['analysis']]
            if recent_learnings:
                messages.append({
                    "role": "user",
                    "content": "Recent feedback learnings:\n" + "\n".join(recent_learnings)
                })

            # Add image and current request
            user_content = [
                {"type": "text", "text": "Generate an improved image description based on our learnings."},
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/png;base64,{base64_image}",
                        "detail": "high"
                    }
                }
            ]

            if reference_prompt:
                user_content[0]["text"] += f"\nReference: {reference_prompt}"

            messages.append({"role": "user", "content": user_content})

            response = self.client.chat.completions.create(
                model="gpt-4o",
                messages=messages,
                max_tokens=1000,
                temperature=0.7
            )

            prompt = response.choices[0].message.content
            if not prompt:
                # Honour the declared str return type instead of handing None
                # to callers that slice or log the prompt.
                raise ValueError("model returned an empty prompt")

            self.conversation_history.append({
                "role": "assistant",
                "content": prompt
            })

            return prompt

        except Exception as e:
            raise RuntimeError(f"Prompt generation failed: {str(e)}") from e

In [6]:
import sys
from contextlib import redirect_stdout
from datetime import datetime

class PRISM:
    """Train N prompt generators for K iterations each and keep the best prompt.

    Each iteration samples a reference image, asks a generator for a prompt,
    renders an image from it, scores the result against the reference, and
    feeds model feedback back into the generator. Progress is printed and
    appended to ``self.output_file``.
    """

    def __init__(self, N, K, reference_images, file_scheme):
        """Set up the run and make sure the log directory exists.

        Args:
            N: Number of prompt generators to train.
            K: Training iterations per generator.
            reference_images: Sequence of reference images (paths or PIL Images).
            file_scheme: Label embedded in the output log file name.
        """
        self.N = N  # Number of prompt generators
        self.K = K  # Training iterations per generator
        self.reference_images = reference_images
        self.best_generator = None
        self.best_score = float('-inf')
        self.generators = []  # List to store N prompt generators
        self.client = OpenAI()

        # Built on first use and reused across iterations so the CLIP model
        # is not reloaded for every single comparison.
        self._image_generator = None
        self._scorer = None

        self.output_file = f"interpretable_feedback_and_prompts/prism_output_{file_scheme}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        # open(..., 'a') does not create parent directories.
        os.makedirs(os.path.dirname(self.output_file), exist_ok=True)

    def _write_log(self, text):
        """Append ``text`` verbatim to the run's output file."""
        # Model output can contain non-ASCII text; don't depend on the
        # platform's default encoding.
        with open(self.output_file, 'a', encoding='utf-8') as f:
            f.write(text)

    def _get_image_generator(self):
        """Return the shared ImageGenerator, creating it on first use."""
        if getattr(self, "_image_generator", None) is None:
            self._image_generator = ImageGenerator()
        return self._image_generator

    def _get_scorer(self):
        """Return the shared SimilarityScorer, creating it on first use."""
        if getattr(self, "_scorer", None) is None:
            self._scorer = SimilarityScorer()
        return self._scorer

    def train_generator(self, generator, k):
        """Train a single generator for K iterations with feedback.

        A failing iteration is logged and skipped; it does not abort training.

        Args:
            generator: Object providing ``generate(image, reference_prompt=...)``
                and ``update_with_feedback(feedback, score)``.
            k: Number of iterations to run.

        Returns:
            tuple: ``(best_prompt, best_score, history)``. ``best_prompt`` is
            None and ``best_score`` is ``-inf`` if no iteration succeeded.
            ``history`` holds one dict per successful iteration with
            'prompt', 'score' and 'feedback' keys.
        """
        history = []
        best_local_score = float('-inf')
        best_local_prompt = None

        for i in range(k):
            try:
                # Sample reference image
                ref_image = random.choice(self.reference_images)
                print(f"\nIteration {i+1}: Using reference image {ref_image}")
                self._write_log(f"\nIteration {i+1}: Using reference image {ref_image}")

                # Generate prompt, seeded with the best prompt so far
                prompt = generator.generate(ref_image, reference_prompt=best_local_prompt)
                print(f"Generated prompt: {prompt[:100]}...")
                self._write_log(f"\nGenerated prompt: {prompt}")

                # Generate image from prompt
                generated_image = self._get_image_generator().generate(prompt)
                print("Generated new image")
                self._write_log("\nGenerated new image")

                # Get similarity score
                similarity_score = self._get_scorer().calculate_score(ref_image, generated_image)
                print(f"Similarity score: {similarity_score:.3f}")
                self._write_log(f"\nSimilarity score: {similarity_score:.3f}")

                # Get feedback from the vision model
                feedback = self.get_feedback(ref_image, generated_image, similarity_score)
                print(f"Feedback received: {feedback[:100]}...")
                self._write_log(f"\nFeedback received: {feedback}")

                # Update generator with feedback and score
                generator.update_with_feedback(feedback, similarity_score)

                if similarity_score > best_local_score:
                    best_local_score = similarity_score
                    best_local_prompt = prompt

                history.append({
                    'prompt': prompt,
                    'score': similarity_score,
                    'feedback': feedback
                })

                print(f"Completed iteration {i+1} with score {similarity_score:.3f}")
                self._write_log(f"\nCompleted iteration {i+1} with score {similarity_score:.3f}")

            except Exception as e:
                print(f"Error in iteration {i+1}: {str(e)}")
                # Leading newline keeps the error on its own log line, like
                # every other entry.
                self._write_log(f"\nError in iteration {i+1}: {str(e)}")
                continue

        return best_local_prompt, best_local_score, history

    def get_feedback(self, original, generated, score):
        """Get feedback comparing original and generated images using gpt-4o.

        Args:
            original: PIL Image or path of the reference image.
            generated: PIL Image or path of the generated image.
            score (float): Similarity score included in the request text.

        Returns:
            The model's feedback text. On any failure this returns a string
            starting with "Error generating feedback:" instead of raising.
        """
        try:
            prompt = f"""Compare these two images and provide specific feedback on:
            1. What aspects were captured well in the generated image
            2. What important elements were missed
            3. How to improve the prompt to get a more similar image
            Current similarity score: {score:.3f}"""

            response = self.client.chat.completions.create(
                model="gpt-4o",
                max_tokens = 1000,
                messages=[
                    {"role": "system", "content": "You are an expert image comparison assistant."},
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": prompt},
                            {
                                "type": "image_url",
                                "image_url": {"url": self._encode_image(original)}
                            },
                            {
                                "type": "image_url",
                                "image_url": {"url": self._encode_image(generated)}
                            }
                        ]
                    }
                ]
            )
            return response.choices[0].message.content
        except Exception as e:
            return f"Error generating feedback: {str(e)}"

    def _encode_image(self, image):
        """Helper method to encode image for API calls.

        Args:
            image: PIL Image, or a string path to an image file.

        Returns:
            str: ``data:image/png;base64,...`` URL of the image as PNG.
        """
        buffered = BytesIO()
        if isinstance(image, str):
            # Close the file as soon as the PNG bytes have been produced.
            with Image.open(image) as opened:
                opened.save(buffered, format="PNG")
        else:
            image.save(buffered, format="PNG")
        return f"data:image/png;base64,{base64.b64encode(buffered.getvalue()).decode()}"

    def refine_prompts(self, file_scheme):
        """Main method to run the PRISM algorithm with multiple generators.

        Args:
            file_scheme: Suffix of the ``Best_Prompt_<file_scheme>`` file the
                winning prompt is appended to.

        Returns:
            The best prompt found across all generators (None if every
            iteration of every generator failed).

        Raises:
            ValueError: If ``N`` is less than 1, since there is then no
                generator to pick a best result from.
        """
        if self.N < 1:
            raise ValueError("N must be at least 1 to train a prompt generator")

        print("Initializing PRISM algorithm...")
        self._write_log("Initializing PRISM algorithm...")
        self.generators = [PromptGenerator() for _ in range(self.N)]
        generator_results = []

        for i, generator in enumerate(self.generators):
            print(f"\nTraining Generator {i+1}/{self.N}")
            self._write_log(f"\nTraining Generator {i+1}/{self.N}")
            best_prompt, best_score, history = self.train_generator(generator, self.K)

            generator_results.append({
                'generator': generator,
                'best_prompt': best_prompt,
                'best_score': best_score,
                'history': history
            })

        best_result = max(generator_results, key=lambda x: x['best_score'])
        self.best_generator = best_result['generator']
        self.best_score = best_result['best_score']

        print(f"\nBest Generator Score: {self.best_score:.3f}")
        self._write_log(f"\nBest Generator Score: {self.best_score:.3f}")
        print(f"Best Prompt: {best_result['best_prompt']}")
        with open(f"Best_Prompt_{file_scheme}", 'a', encoding='utf-8') as f:
            f.write(f"\nBest Prompt: {best_result['best_prompt']}")

        return best_result['best_prompt']

In [7]:
# Reference images sampled during training, grouped by welfare scheme.
reference_images = [
    "AAY.png", "AAY1.png", "AAY2.png", "AAY3.png", "AAY4.png",
    "ABPMJAY.png",
    "DDUGKY.png",
    "MGNREGA.png",
    "misc.png", "misc1.png", "misc2.png", "misc3.png", "misc4.png",
    "NSAP.png", "NSAP1.png",
    "PMAY.png", "PMAY1.png", "PMAY2.png",
    "PMGSY.png",
    "PMMVY.png",
]

# Three generators, ten training iterations each.
prism_algorithm = PRISM(
    N=3,
    K=10,
    reference_images=reference_images,
    file_scheme="Complete",
)
best_prompt = prism_algorithm.refine_prompts(file_scheme="Complete")
print(f"The final best prompt for the welfare scheme is: {best_prompt}")

Initializing PRISM algorithm...

Training Generator 1/3

Iteration 1: Using reference image ABPMJAY.png
Generated prompt: The image features a banner for the Ayushman Bharat Pradhan Mantri Jan Arogya Yojana (PM-JAY). On th...
Generated new image
Similarity score: 0.606
Feedback received: 1. **Aspects Captured Well:**
   - **Theme and Concept:** Both images convey themes related to healt...
Completed iteration 1 with score 0.606

Iteration 2: Using reference image PMMVY.png
Generated prompt: The image is a vibrant banner for the Pradhan Mantri Matru Vandana Yojana. In the center, a mother l...
Generated new image
Similarity score: 0.649
Feedback received: 1. **Aspects Captured Well:**
   - Both images depict a mother holding a baby, conveying a similar t...
Completed iteration 2 with score 0.649

Iteration 3: Using reference image NSAP.png
Generated prompt: The image is a vibrant banner for the Pradhan Mantri Matru Vandana Yojana, capturing the essence of ...
Generated new image
Similar