<a href="https://colab.research.google.com/github/STRMNGnwo/Masters-Thesis/blob/main/Srinivas_Masters_Thesis_CODE.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Installs

### Added code to use specific versions of keras and tensorflow until the open issue on the keras stable diffusion implementation is resolved

In [None]:
# Uninstall existing versions
!pip uninstall -y keras keras-core keras-cv tensorflow

# Install specific versions
!pip install keras==2.15.0 keras-core==0.1.7 keras-cv==0.9.0 tensorflow==2.15.1

# Restart the runtime to ensure the changes take effect
import os
os.kill(os.getpid(), 9)


In [None]:
'''import keras
import keras_core
import keras_cv

#print("TensorFlow version:", tf.__version__)
print("Keras version:", keras.__version__)
print("Keras Core version:", keras_core.__version__)
print("Keras CV version:", keras_cv.__version__)'''

### The usual NLP libraries, plus libraries to access pre-trained models from HuggingFace, the OpenAI API and CLIP

In [None]:
!pip install nltk
#used to perform semantic similarity analysis after evolutionary operators are used (to make sure they make sense)
!pip install sentence_transformers
!pip install language_tool_python

#to use the clip model as a fitness function
!pip install git+https://github.com/openai/CLIP.git
#to use the openai api to access the gpt 4o model
!pip install openai

!pip install transformers

# Imports/ Downloads

In [None]:
#fundamental imports
import os
import sys
import random
import numpy as np
from PIL import Image
import torch

import nltk
from transformers import pipeline
import clip

In [None]:
#downloads that are necessary for the NLP based mutation and cross-over to occur

# NLTK resources needed by the mutation and crossover operators, fetched in order:
#   punkt                       - tokenisation
#   averaged_perceptron_tagger  - POS tagging (only adjectives, verbs and nouns are mutated)
#   maxent_ne_chunker           - chunker used alongside the tagger
#   words, wordnet              - synonym lookup during mutation
#   stopwords                   - stop words are ignored during mutation
for _nltk_resource in (
    'punkt',
    'averaged_perceptron_tagger',
    'maxent_ne_chunker',
    'words',
    'wordnet',
    'stopwords',
):
    nltk.download(_nltk_resource)

In [None]:
from google.colab import files
from google.colab import drive


drive.mount("/content/drive/")
sys.path.insert(0, '/content/drive/MyDrive/Masters-Thesis-Datasets/')

# Checking if GPU is available (and asking Colab to use it if it is)

In [None]:
# Select the compute device once. DEVICE is a torch.device in BOTH branches so
# every later `.to(DEVICE)` / `device=DEVICE` call receives the same type.
if torch.cuda.is_available():
  DEVICE=torch.device("cuda")
  print("GPU")
  # Show which GPU was allocated (this value used to be computed and discarded).
  print(torch.cuda.get_device_name(0))

else:
  DEVICE=torch.device("cpu")
  print("CPU")

# Importing and initialising a text to image stable diffusion model

Also a SpaCy English model

In [None]:
import time
import keras_cv
import keras
import matplotlib.pyplot as plt

In [None]:
sd_model = keras_cv.models.StableDiffusion(
    img_width=256, img_height=256, jit_compile=False
)

In [None]:
import spacy

# Loading the SpaCy model to perform Named Entity Recognition to get "descriptive" words
spacy_model = spacy.load("en_core_web_sm")

# Function to plot images

In [None]:
def plot_images(images):
    """Draw every image in `images` side by side in one row, with the axes hidden."""
    total = len(images)
    plt.figure(figsize=(20, 20))
    for column, picture in enumerate(images, start=1):
        # One subplot per image: a single row with `total` columns.
        plt.subplot(1, total, column)
        plt.imshow(picture)
        plt.axis("off")

# Importing the dataset of Van Gogh paintings

These will serve as ground truth reference images and will also be provided to the LLM to get prompts

### Unzipping the dataset from drive

In [None]:
!unzip /content/drive/MyDrive/Masters-Thesis-Datasets/VanGogh.zip

### Function to randomly choose an image from the dataset (or choose a specified image)

In [None]:
base_url="/content/VincentVanGogh/Watercolors"
def choose_image(image_path=None, return_image=True, directory=None):
  """Pick a reference image, either a given one or a random one from a directory.

  Args:
    image_path: path of the image to use. When None, a random entry of
      `directory` is chosen.
    return_image: when True return the opened PIL image, otherwise return the
      image's path.
    directory: directory to sample from when `image_path` is None. Defaults to
      the module-level `base_url`.

  Returns:
    A PIL image (return_image=True) or a path string (return_image=False).
    The path of a randomly chosen image is always joined with `directory`, so
    it can be passed straight to functions that open the file.

  Raises:
    FileNotFoundError: `directory` contains no entries to choose from.
  """
  if directory is None:
    directory = base_url

  #if no image path is specified choose a random image from the directory
  if image_path is None:
    # sorted() so that a seeded `random` picks the same file on every run
    # (os.listdir order is arbitrary).
    image_names = sorted(os.listdir(directory))
    if not image_names:
      raise FileNotFoundError(f"No images found in {directory}")
    print("Num images:",len(image_names))
    random_image_name = random.choice(image_names)
    print("Chosen image path is: ",random_image_name)
    image_path = os.path.join(directory, random_image_name)

  if return_image:
    return Image.open(image_path)
  return image_path

In [None]:
chosen_image=choose_image(image_path=None, return_image=True)
print(type(chosen_image))

display(chosen_image)

# LLM/API sections


### Establishing a connection to the OpenAI API

In [None]:
# Read the OpenAI key from the Colab secrets store (secret name 'OPENAI_APIKEY')
# so that the key itself never appears in the notebook.
from google.colab import userdata
secret_key = userdata.get('OPENAI_APIKEY')

# OpenAI API Key
# `api_key` is also read directly by the functions below that call the REST
# endpoint through `requests`.
api_key = secret_key

import openai

# SDK client used by correct_sentences_api.
client = openai.OpenAI(api_key=api_key)
#gpt-3.5-turbo-instruct has a price of US$0.50 /1M INPUT tokens and US$2.00 / 1M OUTPUT tokens

#PROBABLY BEST TO USE THE BELOW MODELS FOR TASKS: correcting grammar & sentences, providing image and getting captions back

#gpt-4o-mini has a price of US$0.150 / 1M input tokens AND US$0.600 / 1M OUTPUT tokens

#gpt-4o-mini-2024-07-18 has a price of US$0.150 / 1M input tokens AND US$0.600 / 1M OUTPUT tokens

### Function to send chosen image from dataset to LLM (via OpenAI API) and get list of descriptions (these will be used as initial population)

In [None]:
import base64
import requests

# Function to encode the image
def encode_image(image_path):
  """Return the contents of the file at `image_path` as a base64 string."""
  with open(image_path, "rb") as source:
    raw_bytes = source.read()
  encoded = base64.b64encode(raw_bytes)
  return encoded.decode('utf-8')

def get_initial_descriptions(path_to_image,num_descriptions,request_timeout=60):
  """Ask gpt-4o-mini for `num_descriptions` alt-text descriptions of an image.

  Args:
    path_to_image: path of the image file that is base64-encoded and attached.
    num_descriptions: number of descriptions requested in the prompt.
    request_timeout: seconds to wait for the HTTP call before giving up.

  Returns:
    The `requests.Response` of the chat-completions call; callers read the
    descriptions from `response.json()`.

  Raises:
    requests.HTTPError: the API answered with an error status.
    requests.Timeout: no answer arrived within `request_timeout` seconds.
  """
  import mimetypes

  BASE_PROMPT= f"Could you give me {num_descriptions} alt-text descriptions of the attached image, with each description describing the image?. Each description should vary in complexity (some like a 10-year old  wrote it, some like a professional artist wrote it, some like a writer wrote it etcetera) and length."

  # Getting the base64 string
  base64_image = encode_image(path_to_image)
  # Label the data URL with the file's real type; fall back to JPEG when the
  # extension is unknown.
  mime_type = mimetypes.guess_type(path_to_image)[0] or "image/jpeg"

  headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {api_key}"
  }

  payload = {
    "model": "gpt-4o-mini",
    "messages": [
      {
        "role": "user",
        "content": [
          {
            "type": "text",
            "text": BASE_PROMPT
          },
          {
            "type": "image_url",
            "image_url": {
              "url": f"data:{mime_type};base64,{base64_image}"
            }
          }
        ]
      }
    ],
    "max_tokens": 800
  }

  # A timeout keeps a stalled connection from hanging the notebook forever.
  response = requests.post(
      "https://api.openai.com/v1/chat/completions",
      headers=headers,
      json=payload,
      timeout=request_timeout,
  )
  # Fail here with the HTTP error instead of a KeyError later, when the caller
  # looks for "choices" in an error payload.
  response.raise_for_status()

  print(response.json())

  return response

In [None]:
#descriptions_response=get_initial_descriptions(path_to_image="/content/VincentVanGogh/Watercolors/The Langlois Bridge at Arles.jpg",num_descriptions=10)

In [None]:
import re
from copy import deepcopy

'''
#descriptions_response_copy=deepcopy(descriptions_response)

desc_resp_json=descriptions_response.json()
#desc_resp_json["choices"][0]['message']['content']

# Regular expression pattern to extract descriptions, ignoring the initial text
pattern = re.compile(r'\d+\.\s+\*\*.*?\*\*:\s*(.*?)(?=\n\d+\.\s|\Z)', re.DOTALL)

# Remove the introductory part
intro_removed_text = re.sub(r'^Here are \d+ different alt-text descriptions for the image.*?\n\n', '', desc_resp_json["choices"][0]['message']['content'], flags=re.DOTALL).strip()

# Find all descriptions
descriptions = pattern.findall(intro_removed_text)

# Print the list of descriptions
for i, desc in enumerate(descriptions, 1):
    print(f"{desc.strip()}")

'''


### Hugging Face Instruct model - locally

Currently used for sentence correction

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer
device = "cuda" # NOTE(review): not referenced anywhere in the visible notebook; placement is left to device_map="auto" below

# Candidate instruct models that were tried:
#"microsoft/Phi-3-mini-4k-instruct"
#"Qwen/Qwen2-7B-Instruct"
# Local instruct model; it backs the `pipe` text-generation pipeline used for
# sentence correction and summarisation.
llm_model = AutoModelForCausalLM.from_pretrained(
    "microsoft/Phi-3-mini-4k-instruct",
    torch_dtype="auto",
    device_map="auto"
)


In [None]:
tokenizer = AutoTokenizer.from_pretrained("microsoft/Phi-3-mini-4k-instruct")
pipe = pipeline(
    "text-generation",
    model=llm_model,
    tokenizer=tokenizer,
)

In [None]:
#function that should use the initialised microsoft 3B param instruct model locally to correct sentences (instead of making api calls)
def correct_sentences(sentences):
  """Correct each sentence with the local instruct model (no API calls).

  Returns a list with one generated text per input sentence, in input order.
  """
  # The generation settings are the same for every sentence.
  generation_args = {
      "max_new_tokens": 500,
      "return_full_text": False,
      "temperature": 0.0,
      "do_sample": False,
  }

  def _correct_one(sentence):
    prompt = f"Correct this sentence: {sentence} and return only the corrected sentence."
    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": prompt}
    ]
    output = pipe(messages, **generation_args)
    return output[0]['generated_text']

  return [_correct_one(sentence) for sentence in sentences]

### Grammar/sentence correction function- OpenAI API

In [None]:
def correct_sentences_api(sentences):
  """Correct sentences with gpt-4o-mini through the OpenAI client.

  Args:
    sentences: a list of sentences (a single string is treated as one sentence).

  Returns:
    The model's answer split on the '|' delimiter, each piece stripped of
    surrounding whitespace and empty pieces dropped. An empty input returns
    an empty list without calling the API.
  """
  if isinstance(sentences, str):
    sentences = [sentences]
  sentences = list(sentences)
  if not sentences:
    return []

  # Number every sentence individually. The previous `sentence1,*sentence2`
  # unpacking turned the second sentence into a list, so the prompt contained
  # "['...']" and the token budget counted list items instead of characters.
  numbered_sentences = "\n".join(
      f"{position}. '{sentence}'" for position, sentence in enumerate(sentences, start=1)
  )
  response = client.chat.completions.create(
        model="gpt-4o-mini-2024-07-18",
        messages=[
        {"role": "user", "content": f"Correct the following sentences so they become standard English and return each corrected sentence separated by a delimiter '|':\n{numbered_sentences}"}
        ],
        max_tokens=sum(len(sentence) for sentence in sentences) + 77,  # Ensure enough tokens for the response
        n=1,
        stop=None,
        temperature=0.5,
    )

  corrected_sentences = response.choices[0].message.content
  return [piece.strip() for piece in corrected_sentences.split("|") if piece.strip()]

### Function to get "Descriptive words" about the chosen image.

These words feed a mutation function that appends descriptive terms such as "painting" or "Van Gogh" to the end of a caption to make it more descriptive. This mutation should be triggered only rarely, but is expected to be highly beneficial when it is.

In [None]:
def get_descriptive_words_api(path_to_image, num_words, request_timeout=60):
  """Ask gpt-4o-mini for descriptive words or phrases about an image.

  Args:
    path_to_image: path of the image file that is base64-encoded and attached.
    num_words: number of words/phrases requested in the prompt.
    request_timeout: seconds to wait for the HTTP call before giving up.

  Returns:
    A list of strings, one per non-empty line of the model's answer, with any
    leading list marker ("1.", "2)", "-", "*") removed.

  Raises:
    requests.HTTPError: the API answered with an error status.
    requests.Timeout: no answer arrived within `request_timeout` seconds.
  """
  import mimetypes
  import re

  BASE_PROMPT= f"Generate {num_words} descriptive words or phrases for this image."

  # Getting the base64 string
  base64_image = encode_image(path_to_image)
  mime_type = mimetypes.guess_type(path_to_image)[0] or "image/jpeg"

  headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {api_key}"
  }

  payload = {
    "model": "gpt-4o-mini",
    "messages": [
      {
        "role": "user",
        "content": [
          {
            "type": "text",
            "text": BASE_PROMPT
          },
          {
            "type": "image_url",
            "image_url": {
              "url": f"data:{mime_type};base64,{base64_image}"
            }
          }
        ]
      }
    ],
    "max_tokens": 500
  }

  response = requests.post(
      "https://api.openai.com/v1/chat/completions",
      headers=headers,
      json=payload,
      timeout=request_timeout,
  )
  response.raise_for_status()

  response_json = response.json()
  print(response_json)

  # `response` is a requests.Response, not an OpenAI SDK object, so the answer
  # has to be read from the decoded JSON body.
  resp_words = response_json["choices"][0]["message"]["content"]

  # One entry per non-empty line; strip the list marker when there is one
  # instead of assuming every line contains ". ".
  descriptive_list = []
  for line in resp_words.strip().split("\n"):
    cleaned = re.sub(r'^\s*(?:\d+[.)]|[-*])\s*', '', line).strip()
    if cleaned:
      descriptive_list.append(cleaned)

  # Print the list
  print(descriptive_list)
  return descriptive_list

### Function to get descriptive phrases using spaCy noun chunks - Not used yet

In [None]:
# List of image descriptions
descriptions = [
    "The image shows a beach scene with several colorful fishing boats.",
    "Fishing boats are resting on the sandy shore under a clear blue sky.",
    "Brightly painted boats are anchored on the golden beach with the ocean in the background.",
    "A vivid scene of boats on the shore with the horizon meeting the sea.",
    "Fishing vessels with colorful hulls on a sunny beach day.",
]


# Function to extract unique phrases
def extract_unique_phrases(descriptions):
  """Return the set of lower-cased spaCy noun chunks found across `descriptions`."""
  return {
      chunk.text.lower()
      for description in descriptions
      for chunk in spacy_model(description).noun_chunks
  }

# Extract unique phrases
#unique_phrases = extract_unique_phrases(descriptions)

# Print the unique phrases
#print(unique_phrases)

'''
# Extract unique entities
unique_entities = extract_unique_entities(descriptions)

# Print the unique entities
print(unique_entities)
'''

### Testing the LLM that is accessed via API

In [None]:
#incorrect_sentence="On a golden beach , where the lazuline sea kisses the boats are on a beach succeeding to the ocean ."

#print(correct_sentences_api(incorrect_sentence))

# Evolutionary Computing Aspects

In [None]:
# trial generation of an image from initial population
reference_image_path="/content/VincentVanGogh/Watercolors/The Night Cafe in Arles.jpg"

reference_image= Image.open(reference_image_path)

trial_prompt_1="Four colorful boats are on a beach next to the ocean."
trial_prompt_2="Vibrant boats rest on an orange beach under a vivid blue sky, their bold colors creating a striking contrast with the natural surroundings."

trial_prompt_3=" Vibrant boats with their bold colors sail on an orange beach under a vivid blue sky."

### Model Trial

In [None]:
image_1 = sd_model.text_to_image(trial_prompt_2, batch_size=1)
plot_images(image_1)

### Helper function to generate images for a prompt

In [None]:
def generate_image(individual):
  """Run the Stable Diffusion model on the individual's prompt (batch of one image)."""
  return sd_model.text_to_image(individual["prompt"], batch_size=1)

### NLP helper functions to aid mutation and cross-over

In [None]:
from nltk.corpus import wordnet,stopwords
from nltk import pos_tag, word_tokenize
from nltk.chunk import ne_chunk
from sentence_transformers import SentenceTransformer, util
#import language_tool_python

stop_words = set(stopwords.words('english'))

#used to check if sentance after mutation and cross-over is grammatically correct and semantically similar
#tool = language_tool_python.LanguageTool('en-US')
transformer_model = SentenceTransformer('paraphrase-MiniLM-L6-v2')

In [None]:
#used to check whether sentence is semantically similar after synonym mutation
def is_sentence_semantically_similar(original_sentence, mutated_sentence, threshold=0.7):
    """Return True when the two sentences' embedding cosine similarity is at least `threshold`."""
    sentence_pair = [original_sentence, mutated_sentence]
    original_vec, mutated_vec = transformer_model.encode(sentence_pair)
    score = util.pytorch_cos_sim(original_vec, mutated_vec).item()
    return score >= threshold

def get_semantic_similarity(word1, word2):
    """Return the cosine similarity between the sentence-transformer embeddings of two texts."""
    first_vec, second_vec = transformer_model.encode([word1, word2])
    cosine = util.pytorch_cos_sim(first_vec, second_vec)
    return cosine.item()

#used in the mutation function that is based on synonyms
def get_synonyms(word,threshold=0.8):
    """Return WordNet synonyms of `word` that stay close to it in meaning.

    Args:
        word: the word being mutated.
        threshold: minimum embedding cosine similarity between `word` and a
            candidate for the candidate to be kept (e.g. "boat" vs
            "gravy boat" scores about 0.6 and is rejected).

    Returns:
        A list of synonyms in alphabetical order. Multi-word lemmas use spaces
        rather than WordNet's underscores so they can be inserted into a prompt
        as-is.
    """
    target = word.lower()
    candidates = set()
    for syn in wordnet.synsets(word):
        for lemma in syn.lemmas():
            # WordNet joins multi-word lemmas with underscores ("gravy_boat").
            candidate = lemma.name().replace('_', ' ')
            # Case-insensitive so "Boat" is not offered as a synonym of "boat".
            if candidate.lower() != target:
                candidates.add(candidate)

    # sorted(): set iteration order depends on string hashing, which would make
    # seeded runs pick different synonyms from one session to the next.
    return [
        candidate for candidate in sorted(candidates)
        if get_semantic_similarity(word, candidate) >= threshold
    ]


In [None]:
#utilised in the cross-over function
def extract_phrases(sentence):
    """Split a sentence into crossover units.

    Tokens are POS-tagged and passed through `ne_chunk`. Consecutive nodes that
    carry a `label` attribute are merged into one phrase; every other node
    contributes its token text on its own.
    """
    tagged_tokens = pos_tag(word_tokenize(sentence))
    phrases = []
    pending_entities = []
    for node in ne_chunk(tagged_tokens):
        if hasattr(node, 'label'):
            pending_entities.append(' '.join(leaf[0] for leaf in node))
            continue
        # A plain token ends any run of labelled nodes collected so far.
        if pending_entities:
            phrases.append(' '.join(pending_entities))
            pending_entities = []
        phrases.append(node[0])
    if pending_entities:
        phrases.append(' '.join(pending_entities))
    return phrases

#utilised in the noun phrase based crossover
def extract_noun_phrases_spacy(text):
    """Return the text of every spaCy noun chunk in `text`, in document order."""
    parsed = spacy_model(text)
    return [noun_chunk.text for noun_chunk in parsed.noun_chunks]


sentence = "The quick brown fox jumps over the lazy dog near the Eiffel Tower."
phrases=extract_phrases(sentence)
print(phrases)
phrases_spacy = extract_noun_phrases_spacy(sentence)
print(phrases_spacy)

### Defining the SSIM fitness function to compare generated image with reference image

In [None]:
# Bring the reference image to the 256x256 size the diffusion model is configured
# to generate. Converting to RGB first guarantees a 3-channel array, which the
# SSIM fitness (channel_axis=2) needs; a grayscale or RGBA file would otherwise fail.
reference_image_resized = reference_image.convert("RGB").resize((256, 256))
reference_image_pil=reference_image_resized
reference_image_np=np.asarray(reference_image_pil)

In [None]:
from skimage.metrics import structural_similarity as ssim

def fitness_function_ssim(individual):
  """Score an individual by the SSIM between its generated image and the reference image.

  SSIM compares structure, luminance and contrast; values range from -1 to 1.
  """
  # The stored image keeps the batch axis produced by the diffusion model;
  # take the first (only) image of the batch.
  candidate = individual["image"][0]
  score, _similarity_map = ssim(
      candidate,
      reference_image_np,
      full=True,
      win_size=7,
      channel_axis=2,
  )
  return score

'''
# testing out ssim
image_1_without_channel=image_1[0]
ssim_index1, _ = ssim(image_1_without_channel, reference_image_np, full=True, win_size=7, channel_axis=2)
print("Fitness of generated image 1, relative to actual image: ", ssim_index1)

image_2_without_channel=image_2[0]

ssim_index2, _ = ssim(image_2_without_channel, reference_image_np, full=True,win_size=7,channel_axis=2)
print("Fitness of generated image 2, relative to actual image: ",ssim_index2)'''

### Defining the CLIP based fitness function

In [None]:
clip_model, preprocess = clip.load("ViT-B/32", device=DEVICE)

In [None]:
# Loading the summarization pipeline that is to be used to summarise descriptions if they're over 77 tokens
# this is required because CLIP only accepts a sequence of 77 tokens as input.
#summarizer = pipeline("summarization")

#function that takes in a individual description/prompt and outputs a summarised version.
def summarize_description(description,max_length):
  """Summarise `description` with the local instruct model.

  Args:
    description: the prompt/description to shorten.
    max_length: token budget. It is stated in the instruction given to the
      model and is also used as the hard generation limit.

  Returns:
    The generated summary text.
  """
  prompt = f"Summarize this sentence: {description}. The summary should have a maximum of {max_length} tokens. Maintain the tone and as much detail as in original sentence as possible"
  messages = [
      {"role": "system", "content": "You are a helpful assistant."},
      {"role": "user", "content": prompt}
  ]

  generation_args = {
      # Tied to max_length: this was fixed at 50 regardless of the budget the
      # caller asked for (every call in this notebook passes 50, so those are
      # unaffected).
      "max_new_tokens": max_length,
      "return_full_text": False,
      "temperature": 0.0,
      "do_sample": False,
  }

  output = pipe(messages, **generation_args)

  return output[0]['generated_text']


In [None]:
#image converted to a tensor
def preprocess_image(image):
  """Apply CLIP's preprocessing to an image, add a batch axis and move it to DEVICE."""
  batched = preprocess(image).unsqueeze(0)
  return batched.to(DEVICE)

def preprocess_caption(caption):
  """Tokenise a caption for CLIP, summarising it first when it is long.

  CLIP accepts at most 77 tokens. Captions with more than 50 space-separated
  words are shortened with `summarize_description` before tokenising.

  Returns:
    (token, caption): the token tensor on DEVICE and the caption text that was
    tokenised (the summary when one was generated).
  """
  if len(caption.split(' ')) > 50:
    print("Description too long for clip, being summarized")
    caption = summarize_description(caption,max_length=50)

  # truncate=True is a safety net: the 50-word check counts words, not CLIP
  # tokens, and a summary is not guaranteed to be short, so tokenising could
  # otherwise raise for an over-long input and abort the whole run.
  token = clip.tokenize([caption], truncate=True).to(DEVICE)
  return token,caption

def get_image_embedding(image):
    """Encode a preprocessed image tensor with the CLIP model, without tracking gradients."""
    with torch.no_grad():
        return clip_model.encode_image(image)

def get_text_embedding(caption):
    """Encode tokenised text with the CLIP model, without tracking gradients."""
    with torch.no_grad():
        return clip_model.encode_text(caption)

# Sanity check of the preprocessing on the reference image.
img=preprocess_image(reference_image_pil)
# `type` is a method; without the call the bound-method repr is printed
# instead of the tensor type.
print(img.type())

In [None]:
CAPTION="The picture features four distinct boats, each with unique color patterns, against an orange shoreline. The visual elements include dynamic shapes that create a balanced composition, capturing the viewer's eye. The bold orange sands contrast with the fiery orange hues, while the shadows cast by the bamboo masts add depth to the sand and masts. Each boat is set against a deep blueish sky, which serves as the background."
res=preprocess_caption(CAPTION)
tok=res[0]
cap=res[1]
print(cap)
print(tok.shape[1])


In [None]:
CAPTION2= "The scene presents a tableau of four boats, their ornate curves and bold hues harmonizing with the fiery orange sands. Shadows stretch from bamboo masts, casting a playful dance against the azure horizon."
print(summarize_description(CAPTION2, max_length=50))

In [None]:
import torch.nn.functional as F

def compute_similarity(image_embedding, text_embedding):
    """Return the cosine similarity of a text embedding and an image embedding as a float.

    Both inputs are L2-normalised along their last axis first, so the matrix
    product of the two is the cosine similarity.
    """
    unit_image = F.normalize(image_embedding, p=2, dim=-1)
    unit_text = F.normalize(text_embedding, p=2, dim=-1)
    cosine = torch.matmul(unit_text, unit_image.T)
    return cosine.item()


def fitness_function_clip(individual):
  """Score an individual by CLIP similarity between its prompt and the reference image.

  Side effect: `individual["prompt"]` is overwritten with the caption that was
  actually tokenised, i.e. the summary when the prompt was too long.
  """
  prompt_text = individual["prompt"]
  reference_tensor = preprocess_image(reference_image_pil)

  token, clip_ready_prompt = preprocess_caption(prompt_text)
  individual["prompt"] = clip_ready_prompt

  return compute_similarity(
      get_image_embedding(reference_tensor),
      get_text_embedding(token),
  )

### Defining the mutation function(s)

Synonym Mutate replaces the words that hit the mutation chance with a semantically similar synonym.

Vocab Mutate appends a descriptive word such as "watercolors" or "Van Gogh" to the description being mutated.

In [None]:
import random as rd
# The MUTATION operator

def synonym_mutate(individual, mutation_rate=0.3):
    """Replace some content words of the prompt with close synonyms.

    Each adjective, noun or verb that is not a stop word is replaced with
    probability `mutation_rate` by a random synonym from `get_synonyms`.

    Returns:
        The mutated sentence when at least one word changed and the result is
        still semantically similar to the original; otherwise the original
        sentence, untouched.
    """
    from nltk.tokenize.treebank import TreebankWordDetokenizer

    print("Performing Synonym mutatation")
    sentence=individual["prompt"]
    words = word_tokenize(sentence)
    mutable_tags = ['JJ', 'JJR', 'JJS', 'NN', 'NNS', 'NNP', 'NNPS', 'VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ']  # Adjectives, Verbs and Nouns
    changed = False
    for i, (word, pos) in enumerate(pos_tag(words)):
        if word.lower() in stop_words or pos not in mutable_tags:
            continue
        if random.random() < mutation_rate:
            synonyms = get_synonyms(word)
            if synonyms:
                # WordNet writes multi-word lemmas with underscores.
                words[i] = random.choice(synonyms).replace('_', ' ')
                changed = True

    # Nothing was replaced: return the prompt exactly as it came in rather than
    # a re-joined token list, which would put spaces before punctuation.
    if not changed:
        return sentence

    # Detokenise properly so "sky , their" becomes "sky, their" again.
    mutated_sentence = TreebankWordDetokenizer().detokenize(words)

    #only return the mutated sentence if it is semantically similar to version pre-mutation (could help avoid boat becoming gravy holder and stuff like that )
    if is_sentence_semantically_similar(sentence, mutated_sentence):
      return mutated_sentence

    return sentence

def vocab_mutate(individual,vocab,mutation_rate=0.3):
  """With probability `mutation_rate`, append one unused vocab term to the prompt.

  Args:
    individual: dict whose "prompt" entry is the sentence to mutate.
    vocab: descriptive words or phrases (e.g. "painting", "Van Gogh").
    mutation_rate: probability that a term is appended.

  Returns:
    The prompt followed by one vocab term that it did not already contain, or
    the unchanged prompt when the mutation does not fire or when no unused
    term is left.
  """
  import re

  sentence=individual["prompt"]

  if random.random() >= mutation_rate:
    return sentence

  print("Performing Vocab mutatation")
  # Whole-term, case-insensitive match against the sentence. Matching against
  # single tokens could never detect multi-word terms such as "Van Gogh".
  unused_terms = [
      term for term in vocab
      if not re.search(r'(?<!\w)' + re.escape(term) + r'(?!\w)', sentence, re.IGNORECASE)
  ]
  # Previously a `while True` retry loop: it never ended once every vocab term
  # was present, and an empty vocab raised IndexError.
  if not unused_terms:
    return sentence

  return f"{sentence.rstrip()} {random.choice(unused_terms)}"

'''

# trialling out both the synonym and context based mutations
sentence = "Vibrant boats rest on an orange beach under a vivid blue sky, their bold colors creating a striking contrast with the natural surroundings."

i1={"prompt":sentence}
mutated_sentence1 = synonym_mutate(i1)
print(f'Mutated Sentence 1 [Synonym based]: {mutated_sentence1}')
#print(f'Mutated Sentence 2 [Contextual]: {mutated_sentence2}')
#mutated_sentence2 = contextual_mutation(i1, vocab)
'''


In [None]:
# Chooses between the two mutation operators for every individual.
# With the default vocab_mutation_chance=0.0, vocab_mutate is never selected and
# every individual goes through synonym_mutate (the setting used for this run).

def mutate_population(population,mutation_rate,vocab,vocab_mutation_chance=0.0):
  """Mutate the prompt of every individual in `population`.

  Args:
    population: list of individual dicts; each "prompt" is rewritten in place.
    mutation_rate: rate forwarded to the chosen mutation operator.
    vocab: descriptive terms used by `vocab_mutate`.
    vocab_mutation_chance: probability, per individual, of using `vocab_mutate`
      instead of `synonym_mutate`.

  Returns:
    A new list holding the same (now mutated) individual dicts.
  """
  # NOTE(review): the stored "fitness" is not touched here, so it still refers
  # to the prompt before mutation until it is recomputed elsewhere.
  mutated_population=[]

  for individual in population:
    if random.random() < vocab_mutation_chance: #chance of vocab mutate
      individual["prompt"]=vocab_mutate(individual,vocab,mutation_rate)
    else: #otherwise synonym mutate
      individual["prompt"]= synonym_mutate(individual,mutation_rate)
    mutated_population.append(individual)

  return mutated_population


### Defining the crossover function

In [None]:
#phrase based crossover
def phrase_based_crossover(parent1, parent2,use_llm_api=False):
  """One-point crossover over the phrases produced by `extract_phrases`.

  Args:
    parent1, parent2: individual dicts with a "prompt" entry.
    use_llm_api: correct the children with the OpenAI API when True, otherwise
      with the local model.

  Returns:
    A tuple of two prompt strings: the corrected children, or the parents'
    prompts unchanged when either parent has fewer than two phrases.
  """
  if use_llm_api:
    correction_function=correct_sentences_api
  else:
      correction_function=correct_sentences
  parent1_phrases = extract_phrases(parent1["prompt"])
  parent2_phrases = extract_phrases(parent2["prompt"])

  if len(parent1_phrases) > 1 and len(parent2_phrases) > 1:
      crossover_point1 = random.randint(1, len(parent1_phrases) - 1)
      crossover_point2 = random.randint(1, len(parent2_phrases) - 1)

      # Each child is the head of one parent followed by the tail of the other.
      child1_phrases = parent1_phrases[:crossover_point1] + parent2_phrases[crossover_point2:]
      child2_phrases = parent2_phrases[:crossover_point2] + parent1_phrases[crossover_point1:]

      #ensure that the children are correct sentences/descriptions
      corrected_child_phrases=correction_function([' '.join(child1_phrases),' '.join(child2_phrases)])

      child1_corrected_phrase=corrected_child_phrases[0]
      child2_corrected_phrase=corrected_child_phrases[1]

      return child1_corrected_phrase, child2_corrected_phrase
  else:
      # Return prompt strings, like the branch above and like
      # noun_phrase_based_crossover. Returning the parent dicts made
      # crossover_population fail on `.split`.
      return parent1["prompt"], parent2["prompt"]

# the cross over that seems more useful-> workflow involves cross-over + llm based correction
def noun_phrase_based_crossover(parent1, parent2,use_llm_api=False):
    """One-point crossover over the parents' spaCy noun phrases, followed by LLM correction.

    Returns a tuple of two prompt strings: the corrected children, or the
    parents' own prompts when either parent has fewer than two noun phrases.
    """
    correction_function = correct_sentences_api if use_llm_api else correct_sentences

    first_chunks = extract_noun_phrases_spacy(parent1["prompt"])
    second_chunks = extract_noun_phrases_spacy(parent2["prompt"])

    # Guard clause: a parent with fewer than two chunks cannot be cut.
    if len(first_chunks) <= 1 or len(second_chunks) <= 1:
        return parent1["prompt"], parent2["prompt"]

    cut_first = random.randint(1, len(first_chunks) - 1)
    cut_second = random.randint(1, len(second_chunks) - 1)

    # Head of one parent joined with the tail of the other.
    raw_child_a = ' '.join(first_chunks[:cut_first] + second_chunks[cut_second:])
    raw_child_b = ' '.join(second_chunks[:cut_second] + first_chunks[cut_first:])

    # The joined noun phrases are not proper sentences yet; have the LLM fix them.
    corrected = correction_function([raw_child_a, raw_child_b])
    return corrected[0], corrected[1]



# TO DO: Phrase based crossover can be fixed length (first one is what we're doing and another one can be "adaptive" n-word number- as n grows larger, allow for more phrases to be crossed over )
trial_prompt_1="Four colorful boats are on a beach next to the ocean."
trial_prompt_2="Vibrant boats rest on an orange beach under a vivid blue sky, their bold colors creating a striking contrast with the natural surroundings."

parent1 = {"prompt":trial_prompt_1}
parent2 = {"prompt":trial_prompt_2}

'''#trialling out the phrase based cross-over
child1, child2 = phrase_based_crossover(parent1, parent2, use_llm_api=False)
print(f'Phrase based Child 1: {child1}')
print(f'Phrase based Child 2: {child2}')'''

print("\n\n")
#trialling out the phrase based cross-over
nchild1, nchild2 = noun_phrase_based_crossover(parent1, parent2,use_llm_api=False)
print(f'noun Phrase based Child 1: {nchild1}')
print(f'noun Phrase based Child 2: {nchild2}')


In [None]:
# The CROSSOVER operator -generates kids until population size is back to normal (after selection process)
def crossover_population(population,cross_over_func,num_parents=2,POP_SIZE=20):
  """Create children by crossover until the population is back at POP_SIZE.

  Args:
    population: surviving individuals (dicts with a "prompt" entry).
    cross_over_func: "NOUN", "PHRASE", or a callable
      `f(parent1, parent2, use_llm_api=False)` returning two prompts.
    num_parents: currently unused; two parents are always sampled.
    POP_SIZE: target population size.

  Returns:
    A list of `POP_SIZE - len(population)` new individuals, each scored with
    the CLIP fitness (rounded to 3 decimals). Empty when nothing is needed.

  Raises:
    ValueError: children are needed but fewer than two parents are available,
      or `cross_over_func` is neither a known name nor a callable.
  """
  needed_kids=POP_SIZE-len(population)
  print(f"Crossover needs to create {needed_kids} new samples")
  children=[]

  if needed_kids <= 0:
    return children
  if len(population) < 2:
    raise ValueError("Crossover needs at least two individuals to sample parents from")

  #using the specified cross over function
  if cross_over_func=="NOUN":
    cross_over_func=noun_phrase_based_crossover
  elif cross_over_func=="PHRASE":
    cross_over_func=phrase_based_crossover
  elif not callable(cross_over_func):
    raise ValueError(f"Unknown crossover function: {cross_over_func!r}")

  while len(children)<needed_kids:
    #choosing 2 parents from the population
    parent1,parent2=rd.sample(population,2)
    #actually doing the cross over
    offspring_prompts=cross_over_func(parent1, parent2,use_llm_api=False)

    for child_number, child_prompt in enumerate(offspring_prompts, start=1):
      # The second child is dropped when the first already filled the quota.
      if len(children) >= needed_kids:
        break
      children.append(_build_child(child_prompt, child_number))

  return children


def _build_child(child_prompt, child_number):
  """Wrap one crossover result in an individual dict and score it with CLIP."""
  # Tolerate a crossover operator that hands back a parent dict instead of a
  # prompt string.
  if isinstance(child_prompt, dict):
    child_prompt = child_prompt["prompt"]

  if len(child_prompt.split(' ')) > 50:
    print(f"Child {child_number} description too long for clip, being summarized")
    child_caption = summarize_description(child_prompt,max_length=50)
  else:
    child_caption = child_prompt

  # Both children store the (possibly summarised) caption; the second child
  # used to store the raw, unsummarised prompt.
  child = {
      "prompt": child_caption,
      "image": None,
      "fitness": -1000
  }
  child["fitness"]=round(fitness_function_clip(child),3)
  return child

### Tournament Selection

In [None]:
#fight function and tournament selection function
def fight(fight_participants):
  """Return the participant with the highest "fitness"; the earliest one wins ties."""
  currWinner = fight_participants[0]
  #loop through the remaining participants and keep the fittest
  for challenger in fight_participants[1:]:
      if challenger["fitness"] > currWinner["fitness"]:
          currWinner = challenger
  return currWinner

# Tournament selection function should return winners of the tournament
def tournament_selection(population, tournament_size,reference_image=None):
    """Run fights of `tournament_size` random individuals and return the survivors.

    Every individual takes part in exactly one fight. When fewer than
    `tournament_size` individuals are left they all get a bye and survive.

    Args:
        population: list of individual dicts with a "fitness" entry. It is not
            modified; selection works on a copy.
        tournament_size: number of individuals per fight (at least 1).
        reference_image: unused, kept so existing calls keep working.

    Returns:
        A list of the fight winners plus any individuals that received a bye.

    Raises:
        ValueError: `tournament_size` is smaller than 1.
    """
    if tournament_size < 1:
        raise ValueError("tournament_size must be at least 1")

    # Copy so the caller's population list is left intact.
    contenders = list(population)
    strongest_individuals=[]

    # Loop until everyone has fought or received a bye. The previous
    # round(len / size) round count used banker's rounding, which silently
    # dropped the leftover individual for some sizes (e.g. 5 individuals, size 2).
    while contenders:
        #giving some prompts a pass or a by if a fight doesn't have enough participants
        if len(contenders) < tournament_size:
            strongest_individuals.extend(contenders)
            break

        #randomly choosing tournament_size fighters to fight
        chosen_positions = rd.sample(range(len(contenders)), tournament_size)

        #getting the winner of the fight and adding him to winners list
        strongest_individuals.append(fight([contenders[position] for position in chosen_positions]))

        #remove combatants (a prompt that has already "fought" won't fight again).
        # Deleting by position instead of list.remove avoids comparing whole dicts
        # with ==; highest index first so earlier deletions do not shift later ones.
        for position in sorted(chosen_positions, reverse=True):
            del contenders[position]

    return strongest_individuals


### Defining the basic parameters of the Genetic Algorithm's evolutionary loop

In [None]:
TOTAL_GENERATIONS=50
TOURNAMENT_SIZE=2
NUM_PARENTS=2
MUTATION_RATE=0.3
POP_SIZE=10
CROSSOVER_FUNC="NOUN"

### Initialising the population of prompts and the image they should ideally generate

In [None]:
def read_prompts_from_file(file_path,pop_size):
    """Read up to `pop_size` non-empty, stripped lines from `file_path`, printing each one."""
    with open(file_path, 'r') as handle:
        non_empty_lines = [
            stripped for stripped in (raw.strip() for raw in handle) if stripped
        ]

    selected = []
    for position, text in enumerate(non_empty_lines):
      # Stop once pop_size prompts have been collected.
      if position == pop_size:
        break
      print(f"prompt {position}: "+ text)
      selected.append(text.strip())

    return selected

In [None]:
def process_json(api_response):
  """Extract the numbered descriptions from the text of an LLM reply.

  The reply is expected to contain items shaped like
  ``1. **Some title:** description text``. The text after the bold title is
  captured up to the next numbered item (or the end of the reply).

  Parameters:
  - api_response: the reply text (a string, despite the function's name).

  Returns:
  - list of description strings with surrounding whitespace removed, so the
    returned values are exactly what gets printed.
  """
  # Use the regex to find all matches
  matches = re.findall(r'\d+\.\s\*\*[^:]+:\*\*\s(.+?)(?=\n\d+\.|\Z)', api_response, re.DOTALL)

  # The capture can end with the blank line that separates two items; strip it
  # once so the printed and the returned descriptions are the same
  descriptions = [match.strip() for match in matches]

  # Print the results
  for description in descriptions:
      print(description)

  return descriptions

In [None]:
import re
import json
#initially population initialisation can be just reading in the text file of pre-defined prompts
def _new_individuals(prompts):
  """Wrap every prompt in the dict representation used for an individual."""
  # fitness score arbitrarily set to a very low value initially;
  # no "image" key is created here (image generation is not run at initialisation)
  return [{"prompt": prompt, "fitness": -1000} for prompt in prompts]


def initialise_population(pop_size=20,image_path=None, read_from_file=False,make_api_call=True):
  """Build the initial population of prompts.

  The population is a list of dicts of the form
  ``{"prompt": "a description of the image", "fitness": -1000}``.

  The prompts come from one of three sources:
  - read_from_file=True: the text file /content/Cafe-Experiment3-Data.txt.
  - make_api_call=True (and read_from_file False): a fresh LLM call through
    get_initial_descriptions; the raw JSON reply is also saved to
    descriptions_response.json.
  - otherwise: the reply previously saved in descriptions_response.json.

  Parameters:
  - pop_size: maximum number of prompts read from the file, or the number of
    descriptions requested from the LLM. Not used when the saved reply is loaded.
  - image_path: image sent to the LLM; only used when the API call is made.
  - read_from_file: read the prompts from the text file.
  - make_api_call: call the LLM instead of re-using the saved reply.

  Returns:
  - list of individuals (dicts with "prompt" and "fitness").
  """
  print("Initialising population")
  print("Obtaining prompts")

  if read_from_file:
    # Reading in the prompts from text file (these will make up the initial population)
    prompts = read_prompts_from_file("/content/Cafe-Experiment3-Data.txt",pop_size)

    print(f" Read in {len(prompts)} prompts")
    print("Creating representations")
    return _new_individuals(prompts)

  if make_api_call:
    #use the LLM available via API to get initial descriptions
    print("Using the LLM to get initial descriptions")

    descriptions_response=get_initial_descriptions(path_to_image=image_path,num_descriptions=pop_size)
    response_data=descriptions_response.json()

    file_name = 'descriptions_response.json'

    # Save the reply so later runs can skip the API call
    with open(file_name, 'w') as file:
      json.dump(response_data, file)

    print(f"JSON data has been written to {file_name}")

  else: #if api call has been made already
    print("Using LLM api call response stored in file")
    with open("descriptions_response.json", 'r') as file:
      response_data = json.load(file)
    print(response_data['choices'][0]['message']['content'])

  # Both LLM branches end the same way: pull the descriptions out of the reply text
  descriptions = process_json(response_data["choices"][0]['message']['content'])
  print("Number of descriptions:",len(descriptions))

  print(descriptions)

  return _new_individuals(descriptions)


In [None]:
# Illustrative layout of a single individual. The whole cell is one string
# literal, so none of the statements inside it are executed.
'''individual={}
individual["prompt"]="Insert description of image here"
#Stable Diffusion used to create image from description/prompt
individual["image"]=generate_image(individual)
individual["fitness"]=0.18 #fitness score'''

### Function to get descriptive words about an image to be used in Vocab mutate

In [None]:
def get_descriptive_words_api(path_to_image, num_words):
  """Ask the OpenAI chat completions API for descriptive words about an image.

  The image is sent base64-encoded together with a prompt asking for
  ``num_words`` descriptive words or phrases. The reply text is then parsed
  line by line: for every line containing ". " the part after the first ". "
  is kept (e.g. "3. watercolours" -> "watercolours").

  Parameters:
  - path_to_image: path of the image file to describe.
  - num_words: how many words/phrases to request.

  Returns:
  - list of the extracted words/phrases (may be empty if the reply is not a
    numbered list).

  Raises:
  - requests.HTTPError: if the API answers with an HTTP error status.
  """
  BASE_PROMPT= f"Generate {num_words} descriptive words or phrases for this image."

  # Getting the base64 string
  base64_image = encode_image(path_to_image)

  headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {api_key}"
  }

  payload = {
    "model": "gpt-4o-mini",
    "messages": [
      {
        "role": "user",
        "content": [
          {
            "type": "text",
            "text": BASE_PROMPT
          },
          {
            "type": "image_url",
            "image_url": {
              "url": f"data:image/jpeg;base64,{base64_image}"
            }
          }
        ]
      }
    ],
    "max_tokens": 500
  }

  response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload)

  # Fail with the real HTTP error instead of a KeyError on "choices" further down
  response.raise_for_status()

  # Decode the body once and reuse it
  resp_json=response.json()
  print(resp_json)

  resp=resp_json["choices"][0]['message']['content']

  # Split the text into individual lines
  lines = resp.strip().split("\n")

  # Extract descriptive words/phrases. The first line is skipped only when it is
  # a greeting/preamble; if the reply starts directly with a numbered item
  # ("1. ..."), that item is kept instead of being thrown away.
  descriptive_list = []
  for line_number, line in enumerate(lines):
    if ". " not in line:
      continue
    label, phrase = line.split(". ", 1)
    if line_number == 0 and not label.strip().isdigit():
      continue
    descriptive_list.append(phrase)

  # Print the list
  print(descriptive_list)

  return descriptive_list

### Statistical stuff and function to evaluate fitness of population

In [None]:
#function that loops through population and evaluates/assigns fitness for each individual
def evaluate_population_fitness(population,use_ssim=False):
  """Assign a fitness value to every individual, in place.

  Parameters:
  - population: list of individuals (dicts); each one has its "fitness" updated.
  - use_ssim: when False the fitness is replaced by the CLIP score rounded to
    3 decimals; when True the existing fitness is multiplied by
    (1 + SSIM score rounded to 3 decimals).

  Returns:
  - the same list object that was passed in.
  """
  for member in population:
    if not use_ssim:
      #the usual case: fitness is the clip score of the individual
      member["fitness"]=round(fitness_function_clip(member),3)
      continue

    #ssim case: the fitness already stored on the individual is scaled by the ssim score
    member["fitness"]*=(1+round(fitness_function_ssim(member),3))

  return population

In [None]:
def get_population_average_fitness(population):
  """Return the mean of the "fitness" values of all individuals.

  Parameters:
  - population: list of individuals (dicts with a "fitness" entry).

  Returns:
  - the total fitness divided by the number of individuals.

  Raises:
  - ZeroDivisionError: if the population is empty.
  """
  member_count=len(population)
  running_total=0
  #plain left-to-right accumulation of every individual's fitness
  for fitness_value in (member["fitness"] for member in population):
    running_total=running_total+fitness_value

  return running_total/member_count

In [None]:
def get_population_max_fitness(population):
  """Return the highest fitness in the population and the prompt that has it.

  Parameters:
  - population: list of individuals (dicts with "fitness" and "prompt").

  Returns:
  - tuple (max_fitness, fittest_description). For an empty population this is
    (0, None). When several individuals share the highest fitness the first
    one in the list is reported.
  """
  if not population:
    return 0,None

  # Compare the individuals with each other instead of against a fixed 0, so a
  # population whose fitness values are all zero or negative (e.g. the -1000
  # placeholder) still reports its real best individual
  fittest=max(population,key=lambda individual: individual["fitness"])

  return fittest["fitness"],fittest["prompt"]

In [None]:
def print_top_individuals(population, top_n=5):
    """Print the ``top_n`` individuals with the highest fitness.

    Parameters:
    - population: list of individuals (dicts with "fitness" and "prompt");
      the list itself is not reordered.
    - top_n: how many individuals to print.
    """
    # Rank a copy of the population, best fitness first
    ranked = list(population)
    ranked.sort(key=lambda member: member["fitness"], reverse=True)
    leaders = ranked[:top_n]

    print(f"Top {top_n} individuals:")
    for rank, member in enumerate(leaders, start=1):
        print(f"Rank {rank}: Fitness = {member['fitness']}, Prompt = {member['prompt']}")

### Running the Loop

In [None]:
#initialising the population
#IMAGE_PATH is the reference image; it is also used later to build the vocabulary
IMAGE_PATH="/content/VincentVanGogh/Watercolors/The Night Cafe in Arles.jpg"
#the size comes from POP_SIZE in the parameters cell so it is configured in one place only
initial_population=initialise_population(pop_size=POP_SIZE,image_path=IMAGE_PATH,read_from_file=True,make_api_call=False)
#Fitness still needs to be calculated

In [None]:
#quick check of how many individuals were created
print(len(initial_population))

#asking the LLM for a vocabulary of descriptive words, then adding two hand-picked phrases
num_words=30
vocab_list=get_descriptive_words_api(IMAGE_PATH, num_words)
vocab_list+=["watercolours painting","Van Gogh style"]

In [None]:
import copy
#working on a deep copy so initial_population stays available for re-runs
population = copy.deepcopy(initial_population)

avg_fitness_per_generation=[]
max_fitness_per_generation=[]
best_individual_per_generation=[]

#every SSIM_INTERVAL generations the ssim score is used as a multiplier to the clip fitness
SSIM_INTERVAL=10

#Evolutionary loop (number of generations and mutation rate come from the parameters cell)
for generation in range(1,TOTAL_GENERATIONS+1):

  print(f"\n---------------------------------------Generation {generation}---------------------------------------")

  #evaluate fitness of entire population (clip score)
  population=evaluate_population_fitness(population=population,use_ssim=False)

  #the ssim multiplier is applied right after the clip evaluation and BEFORE selection.
  #Applying it at the end of the generation has no effect on selection, because the clip
  #evaluation at the start of the next generation overwrites every fitness value.
  if generation % SSIM_INTERVAL ==0:
    print("\nIMAGE GENERATION PHASE:")
    print("Generating images for prompts in population")
    #generate "new" images for current individual prompts in population
    for individual in population:
      individual["image"]=generate_image(individual)
    population=evaluate_population_fitness(population=population,use_ssim=True)

  print_top_individuals(population, top_n=3)

  #storing the average fitness per generation and also storing the max fitness per generation (along with the best description per generation)
  #taken here, while every fitness value is the one just computed for the prompt it is stored with
  avg_fitness=get_population_average_fitness(population)
  avg_fitness_per_generation.append(avg_fitness)

  max_fitness_vals=get_population_max_fitness(population)
  best_individual=max_fitness_vals[1]
  best_individual_per_generation.append(best_individual)
  max_fitness_per_generation.append(max_fitness_vals[0])

  # TO DO: have an elitism toggle to save a % high fitness individuals every generation to guarantee they make it to the next generation
  #tournament selection
  print("\nTOURNAMENT SELECTION:")
  print("Performing tournament selection on the population")
  print("Number of prompts in population: ",len(population))
  winners=tournament_selection(population=population,tournament_size=TOURNAMENT_SIZE)
  population=winners

  print("Number of prompts in population (After tournament): ",len(population))

  #crossover between winners to generate children which are then added to the population
  print("\nCROSSOVER PHASE:")
  print("Performing Cross-over on the population")
  crossed_over_children=crossover_population(population=population,num_parents=NUM_PARENTS,POP_SIZE=len(initial_population),cross_over_func=CROSSOVER_FUNC) #giving the function the population and the number of parents involved in a crossover
  population.extend(crossed_over_children)
  print("Number of prompts in population (After crossover): ",len(population))
  print("Cross-over phase completed")

  #mutation
  print("\nMUTATION PHASE:")
  print("Mutating population")
  population=mutate_population(population=population,mutation_rate=MUTATION_RATE,vocab=vocab_list)
  print("Mutation phase completed")

  print(f"-------------------Generation {generation} Results summary:---------------------------")
  print(f"Average Fitness in generation {generation}: {avg_fitness}" )
  print(f"Best Individual (fitness:{max_fitness_vals[0]}): {best_individual}")
  print(f"\nEND OF GENERATION {generation}")

  #end of generation

#end of evolutionary loop


In [None]:
#final fitness evaluation after final selection + cross-over/mutation
#NOTE: this cell appends to the per-generation lists, so running it twice adds a duplicate entry
population=evaluate_population_fitness(population)
print_top_individuals(population,5)
avg_fitness=get_population_average_fitness(population)
avg_fitness_per_generation.append(avg_fitness)

max_fitness_vals=get_population_max_fitness(population)
#append to the history (assigning here would replace the whole list with a single string)
best_individual=max_fitness_vals[1]
best_individual_per_generation.append(best_individual)
max_fitness_per_generation.append(max_fitness_vals[0])

#generation still holds the number of the last generation run by the loop;
#best_individual is refreshed above so the summary shows the final best prompt
print(f"-------------------Generation {generation} Results summary:---------------------------")
print(f"Average Fitness in generation {generation}: {avg_fitness}" )
print(f"Best Individual (fitness:{max_fitness_vals[0]}): {best_individual}")
print(f"\nEND OF GENERATION {generation}")


### Writing results to a file

In [None]:
def write_fitness_to_file(file_path, average_fitness, max_fitness):
    """
    Saves the per-generation fitness history as a small CSV file.

    The file starts with a header row and then has one row per entry of
    average_fitness: the row index, the average fitness and the max fitness.

    Parameters:
    - file_path: Path of the file to create (an existing file is overwritten).
    - average_fitness: List of average fitness values per generation.
    - max_fitness: List of max fitness values per generation; needs at least
      as many entries as average_fitness.
    """
    with open(file_path, 'w') as out:
        out.write("Generation,Average Fitness,Max Fitness\n")
        for generation, mean_value in enumerate(average_fitness):
            out.write(f"{generation},{mean_value},{max_fitness[generation]}\n")

# The output name embeds the current population size; the two history lists are written as CSV columns.
# NOTE(review): the name says "Experiment2-SynonymMutate" while the prompts are read from
# Cafe-Experiment3-Data.txt — confirm the label matches the experiment being run.
file_path = f"/content/Experiment2-SynonymMutate-population{len(population)}_fitness_per_generation.csv"
write_fitness_to_file(file_path, avg_fitness_per_generation, max_fitness_per_generation)

### Graphs

In [None]:
def plot_fitness(x, y, xlabel, ylabel, title):
    """
    Draws a line chart of y against x and shows it.

    The y values are multiplied by 100 before plotting, so the axis shows the
    scaled values rather than the raw fitness.

    Parameters:
    - x: List of values for the x-axis
    - y: List of values for the y-axis (raw fitness values)
    - xlabel: Label for the x-axis
    - ylabel: Label for the y-axis
    - title: Title of the plot
    """
    #scaling the fitness values by 100 so small differences are easier to see
    scaled_values = [value * 100 for value in y]

    figure, axes = plt.subplots(figsize=(10, 6))
    axes.plot(x, scaled_values, marker='o', linestyle='-', color='b')
    axes.set_xlabel(xlabel)
    axes.set_ylabel(ylabel)
    axes.set_title(title)
    axes.grid(True)
    plt.show()

In [None]:
# One point per recorded entry of the average-fitness history; plot_fitness multiplies
# the values by 100 before drawing, so the y-axis shows fitness x 100.
print("Plotting Average fitness against generations")
plot_fitness(x=range(len(avg_fitness_per_generation)), y=avg_fitness_per_generation, xlabel='Generation', ylabel='Average Fitness', title=f"Average Fitness per Generation [Population:{len(population)}]")


In [None]:
# One point per recorded entry of the max-fitness history; plot_fitness multiplies
# the values by 100 before drawing, so the y-axis shows fitness x 100.
print("Plotting Maximum fitness against generations")
plot_fitness(x=range(len(max_fitness_per_generation)),y=max_fitness_per_generation,xlabel='Generation',ylabel='Max. Fitness',title=f"Maximum Fitness per Generation [Population:{len(population)}]")

# Experimental stuff / Old code

BEST INDIVIDUAL (pop=10) -> "The bold brushstrokes depict four boats on an orange sandy beach: an artistically designed fisherman's vessel, colorful boats, and an artistic composition." Fitness of 0.372 (CLIP score).

Improved maximum fitness by 5% after 50 generations


Experiment 2 (Vocab Mutate vs Synonym mutate)
"Fishing boats on the Beach"

Vocab Mutate:
Best individual (before correction): "The painting depicts a cluster of four boats with varying colors on a sandy beach , with a vibrant blue sea in the background , and an artful representation of light and shadow . Bold outlines Colorful boats watercolours painting" -> CLIP score of 0.356

Best individual (after correction): "The painting depicts a cluster of four boats in varying colors on a sandy beach, with a vibrant blue sea in the background, and an artful representation of light and shadow. The bold outlines and colorful boats are rendered in watercolors." -> CLIP score of 0.356

Improved average fitness by 3% (31 to 34) and improved max fitness by 1.5% roughly.


Synonym Mutate:




Experiment 3 (data collected from people)
------------------------------------------------------------
"The Night Cafe in Arles"

Best individual:
"A Van Gogh painting depicts a café with a pool table on the right side, bar stools, bold colors in the background, eccentric lighting, and tables with glassware. The walls have colorful accents." Fitness of 0.412 (CLIP score)

Improved maximum fitness by 3.5% to 4% (from 0.37 to 0.412) after 50 generations and average fitness by 7% at its peak(from 0.32 to 0.39)


"The Langlois Bridge at Arles"

Best individual: "A vibrant-colored wooden drawbridge on a windy hill, in the style of Vincent van Gogh, overlooks a flowing river on another windy hill, reflecting the historical context." Fitness of 0.393 (CLIP score)

Improved maximum fitness by 4 to 5%  (from 0.34 to 0.39) after 50 generations and average fitness by 6% at its peak (from 0.31 to 0.36)

# Old Code

### Mechanism for cross-over (phrase based is fine):

### If a better cross-over mechanism is found and implemented, it can happen x% of the time to direct the evolution. -> implemented noun phrase mutation as default, it doesn't happen x% of the time.

### Mechanism to correct grammar after cross-over -> DONE

### Mutation to add descriptive elements that were obtained initially (extra mutation) -> DONE

### Mutation to remove a phrase randomly (Very low probability)

### To show "directed evolution"-> implement elitism.


2 Fitness functions: DONE

1-> CLIP based (comparing text to reference image in the embedding space) used initially. DONE

2-> Image based fitness function (SSIM) (every 10 generations) DONE





### old code

In [None]:
# Old code kept for reference: the whole cell is one string literal, so nothing in it is executed.
'''!pip install transformers
from transformers import CLIPProcessor, CLIPModel

model_name = "openai/clip-vit-large-patch14-336"
processor = CLIPProcessor.from_pretrained(model_name)
model = CLIPModel.from_pretrained(model_name)'''

In [None]:
# Old code kept for reference: the whole cell is one string literal, so nothing in it is executed.
'''
#"Qwen/Qwen2-7B-Instruct" stuff after tokenizer


#sentence="Vibrant boats an orange beach a vivid blue sky their bold colors the ocean."
prompt = f"Correct this sentence: {sentence}"
messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": prompt}
]

text = tokenizer.apply_chat_template(
    messages,
    tokenize=False,
    add_generation_prompt=True
)
model_inputs = tokenizer([text], return_tensors="pt").to(device)

generated_ids = model.generate(
    model_inputs.input_ids,
    max_new_tokens=512
)
generated_ids = [
    output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
]

response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]'''

In [None]:
# Contextual mutation can be used in later stages of the experimentation process, its not ready yet.

#TO DO: Vocabulary should be all words in the initial population of prompts (should I update vocabulary after every generation?).
# OR should Vocabulary be obtained by providing an image to a multi-modal LLM and ask it to provide a list of adjectives that can describe the image (artistic, painting, watercolour and stuff like that).
# Also for the replace bit-> should POS tag all words in prompt being mutated, POS tag vocabulary and then replace noun with noun, verb with verb etcetera.
'''
def contextual_mutation(individual, vocab, mutation_rate=0.3):
    sentence=individual["prompt"]
    words = word_tokenize(sentence)
    for i in range(len(words)):
        if random.random() < mutation_rate:
            #action = random.choice(['replace', 'add', 'remove'])
            action="replace"
            if action == 'replace':
                words[i] = random.choice(vocab)
            elif action == 'add':
                words.insert(i, random.choice(vocab))
            elif action == 'remove' and len(words) > 1:
                words.pop(i)
    return ' '.join(words)