# In [None]:
import csv
import os
import random
import string

import pandas as pd
from sklearn.metrics import f1_score
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

# Hugging Face access token and model ID.
# The token is read from the HF_TOKEN environment variable instead of being
# written into the source, so no secret is committed with the code. When the
# variable is unset, None is passed (presumably the library then falls back to
# a locally cached login -- confirm for the installed version).
# SECURITY: a token was previously hard-coded on this line; treat it as leaked
# and revoke it.
access_token = os.environ.get("HF_TOKEN")
model_id = "meta-llama/Meta-Llama-3.2-11B-Instruct"

# Load the tokenizer and model
print("Loading LLaMA 3.2 11B model...")
tokenizer = AutoTokenizer.from_pretrained(model_id, use_auth_token=access_token)
model = AutoModelForCausalLM.from_pretrained(model_id, use_auth_token=access_token, device_map="auto")
# No explicit `device` is passed to the pipeline: the model has already been
# placed by device_map="auto", and combining that with a fixed device index is
# rejected (or ignored) by the pipeline for accelerate-dispatched models.
llama_pipeline = pipeline("text-generation", model=model, tokenizer=tokenizer, max_length=800)
print("Model loaded successfully.")

def ask_question_llama(prompt, question, max_length=800, temperature=0.7):
    """Send ``prompt`` and ``question`` to the module-level LLaMA pipeline.

    The two texts are joined with a newline and passed to ``llama_pipeline``
    together with ``max_length`` and ``temperature``; only the generated
    continuation is requested (``return_full_text=False``).

    Returns:
        A ``(raw, cleaned)`` tuple. ``raw`` is the first generation, stripped
        and lower-cased; ``cleaned`` is ``raw`` with ASCII punctuation removed
        and stripped again. If anything raises, the error is printed and
        ``(None, None)`` is returned instead.
    """
    try:
        generations = llama_pipeline(
            f"{prompt}\n{question}",
            max_length=max_length,
            temperature=temperature,
            return_full_text=False,
        )
        raw = generations[0]['generated_text'].strip().lower()
        without_punctuation = raw.translate(str.maketrans('', '', string.punctuation))
        cleaned = without_punctuation.strip().lower()
    except Exception as e:
        # Best effort: report the failure and let the caller see a missing answer.
        print(f"An error occurred: {e}")
        return None, None
    else:
        return raw, cleaned

def summarize_relation(predicted_value):
    """Ask the model to reduce ``predicted_value`` to one of the three labels.

    The request is sent through ``ask_question_llama`` with an empty prompt.

    Returns:
        The punctuation-free answer from ``ask_question_llama`` (its second
        return value). The text is whatever the model produced; it is not
        checked against the three labels here.
    """
    request = f"Summarize the term '{predicted_value}' to one of the following: 'activation', 'inhibition', or 'no information'."
    _, condensed = ask_question_llama("", request)
    return condensed

def calculate_metrics(df):
    """Compute F1 scores for predicted gene relations.

    Reads the ``relation`` and ``predict_relation`` columns of ``df`` and
    writes a new ``processed_predict_relation`` column into it (in place).
    Predictions that are not already one of the known labels are passed
    through ``summarize_relation``.

    Returns:
        ``(overall_f1, f1_scores)`` where ``overall_f1`` is the micro-averaged
        F1 over all labels and ``f1_scores`` holds the binary F1 for
        'activation' and 'inhibition', in that order.
    """
    relations = ['activation', 'inhibition']
    accepted = relations + ["no information"]

    def _canonical(value):
        # Already a known label: keep it; otherwise ask the model to summarise.
        if value in accepted:
            return value
        return summarize_relation(value)

    df['processed_predict_relation'] = df['predict_relation'].apply(_canonical)

    per_relation = []
    for label in relations:
        # One-vs-rest view of the problem for this label.
        actual = (df['relation'] == label)
        guessed = (df['processed_predict_relation'] == label)
        try:
            score = f1_score(actual, guessed)
        except ValueError:
            score = 0
        per_relation.append(score)

    micro = f1_score(df['relation'], df['processed_predict_relation'], average='micro')
    return micro, per_relation

def get_fitness(prompt, instruction, training_file="training.csv"):
    """
    Score one prompt/instruction pair against the gene pairs in a CSV file.

    The rows of ``training_file`` are shuffled, the model is queried once per
    row (prompt followed by a fixed few-shot example block, then the formatted
    instruction), and the collected answers are handed to ``calculate_metrics``.

    Args:
        prompt: Text placed ahead of the few-shot examples.
        instruction: Question template, formatted with ``gene1`` and ``gene2``
            (the upper-cased ``starter`` and ``receiver`` values).
        training_file: CSV path; the ``starter``, ``receiver`` and
            ``relation_name`` columns are read.

    Returns:
        The overall F1 value returned by ``calculate_metrics``.
    """
    examples = pd.read_csv(training_file)

    few_shot = (
        "Example: Q: What effect does gene EGF have on gene EGFR? A: Activation. "
        "Q: What effect does gene GRK2 have on gene OR2AJ1? A: Inhibition. "
        "Q: What effect does gene CDK9 have on gene NELFB? A: No information."
    )

    # Shuffle the rows and renumber them from zero.
    examples = examples.sample(frac=1).reset_index(drop=True)

    records = []
    for _, entry in examples.iterrows():
        gene_a = entry['starter']
        gene_b = entry['receiver']
        expected = entry['relation_name']

        raw_answer, predicted = ask_question_llama(
            prompt + '\n' + few_shot,
            instruction.format(gene1=gene_a.upper(), gene2=gene_b.upper()),
        )
        records.append({
            'starter': gene_a,
            'receiver': gene_b,
            'relation': expected,
            'LLaMA_answer': raw_answer,
            'predict_relation': predicted,
            'prompt': instruction,
        })

    # Only the overall score is used; the per-relation scores are discarded.
    overall_f1, _ = calculate_metrics(pd.DataFrame(records))
    return overall_f1

def evaluate_prompts(roles, aims, instructions, descriptions, training_file="training.csv"):
    """Return one fitness value per (role, aim, instruction, description) tuple.

    The four sequences are walked in lockstep. Each tuple is assembled into a
    single prompt string and scored with ``get_fitness``; the instruction is
    both embedded in the prompt and passed on separately as the question
    template.
    """
    def _score(role, aim, instruction, description):
        assembled = f"Act as a {role}, {aim}, {instruction}, {description}"
        return get_fitness(assembled, instruction, training_file=training_file)

    return [_score(*fields) for fields in zip(roles, aims, instructions, descriptions)]

def normalize_scores(f1_scores):
    """Min-max scale ``f1_scores`` into the range [0, 1].

    Args:
        f1_scores: Iterable of numeric scores.

    Returns:
        A new list of the same length. An empty input gives an empty list
        (instead of the ``ValueError`` that ``min()`` would raise). When all
        scores are equal, every entry is ``1``.
    """
    scores = list(f1_scores)
    if not scores:
        return []

    lowest, highest = min(scores), max(scores)
    span = highest - lowest
    if span == 0:
        # No spread to scale by: treat every candidate as equally (fully) fit.
        return [1] * len(scores)
    return [(score - lowest) / span for score in scores]

def initialize_population(roles, aims, instructions, descriptions, training_file="training.csv"):
    """Build the starting population paired with normalised fitness.

    Returns:
        A list of ``(individual, score)`` tuples, where each individual is a
        ``(role, aim, instruction, description)`` tuple and ``score`` is its
        fitness from ``evaluate_prompts`` after ``normalize_scores``.
    """
    individuals = list(zip(roles, aims, instructions, descriptions))
    raw_scores = evaluate_prompts(roles, aims, instructions, descriptions, training_file=training_file)
    scaled_scores = normalize_scores(raw_scores)
    return list(zip(individuals, scaled_scores))

def tournament_selection(pwf, tournament_size=5):
    """Pick one individual by tournament selection.

    Args:
        pwf: Sequence of ``(individual, fitness)`` pairs.
        tournament_size: Number of randomly drawn contenders. If the
            population is smaller than this, every member takes part.

    Returns:
        The individual (first element of the pair) with the highest fitness
        among the drawn contenders.

    Raises:
        ValueError: If ``pwf`` is empty or ``tournament_size`` is less than 1.
    """
    if not pwf:
        raise ValueError("cannot run a tournament on an empty population")
    if tournament_size < 1:
        raise ValueError("tournament_size must be at least 1")

    # random.sample raises ValueError when asked for more items than exist,
    # so cap the draw at the population size.
    contenders = random.sample(pwf, min(tournament_size, len(pwf)))
    return max(contenders, key=lambda entry: entry[1])[0]

def single_point_crossover(parent1, parent2):
    """Swap the tails of two parents at one random cut position.

    The cut is drawn uniformly from ``1 .. len(parent1) - 1``, so each
    offspring keeps at least the first element of one parent.

    Returns:
        ``(offspring1, offspring2)``: the head of ``parent1`` joined to the
        tail of ``parent2``, and the head of ``parent2`` joined to the tail
        of ``parent1``.
    """
    cut = random.randint(1, len(parent1) - 1)
    head1, tail1 = parent1[:cut], parent1[cut:]
    head2, tail2 = parent2[:cut], parent2[cut:]
    return head1 + tail2, head2 + tail1

def perform_crossover(pwf, crossover_rate=0.7, tournament_size=5):
    """Produce a new population of the same size as ``pwf``.

    Two parents are chosen by tournament selection on each round. With
    probability ``crossover_rate`` they are recombined by single-point
    crossover; otherwise both are carried over unchanged. Because two
    individuals are added per round, the result is trimmed to ``len(pwf)``.

    Returns:
        A list of individuals (without fitness values).
    """
    target_size = len(pwf)
    next_generation = []
    while len(next_generation) < target_size:
        first = tournament_selection(pwf, tournament_size)
        second = tournament_selection(pwf, tournament_size)
        if random.random() < crossover_rate:
            pair = single_point_crossover(first, second)
        else:
            pair = (first, second)
        next_generation.extend(pair)
    return next_generation[:target_size]

def mutate_individual(individual, mutation_rate=0.3, population=None):
    """Randomly replace fields of ``individual`` with values from ``population``.

    Each position is considered independently: with probability
    ``mutation_rate`` the field at positions 0-3 (role, aim, instruction,
    description) is replaced by a random value taken from the same position
    of some population member.

    Returns:
        ``individual`` itself when ``population`` is None; otherwise a new
        tuple holding the (possibly) mutated fields.
    """
    if population is None:
        return individual

    # Transpose the population into one pool of candidate values per field.
    roles, aims, instructions, descriptions = zip(*population)
    pools = (roles, aims, instructions, descriptions)

    fields = list(individual)
    for position, _ in enumerate(fields):
        should_mutate = random.random() < mutation_rate
        if should_mutate and position < len(pools):
            fields[position] = random.choice(pools[position])

    return tuple(fields)

def perform_mutation(population, mutation_rate=0.3):
    """Apply ``mutate_individual`` to every member of ``population``.

    The population itself is used as the source of replacement values.

    Returns:
        A new list with one (possibly mutated) individual per input member.
    """
    mutated = []
    for member in population:
        mutated.append(mutate_individual(member, mutation_rate, population))
    return mutated

def write_to_csv(population, fitness_scores, filename="adjusted.csv"):
    """Write the population and its fitness values to ``filename`` as CSV.

    The file is overwritten. The first row is a header; each following row
    holds the four fields of one individual followed by its fitness. Members
    and scores are paired positionally.
    """
    header = ["role", "aim", "instruction", "description", "normalized_fitness"]
    with open(filename, "w", newline='') as handle:
        out = csv.writer(handle)
        out.writerow(header)
        for (role, aim, instruction, description), score in zip(population, fitness_scores):
            out.writerow([role, aim, instruction, description, score])

# Usage: run one generation of the prompt search.
# 1. Load the candidate prompt components, one list per column.
df = pd.read_csv("merged_initial_v828.csv")
roles, aims, instructions, descriptions = [
    df[column].tolist()
    for column in ('roles', 'aims', 'instructions', 'descriptions')
]

# 2. Score the initial population on the training file.
pop_with_fitness = initialize_population(
    roles, aims, instructions, descriptions, training_file="training.csv"
)

# 3. Selection + crossover, then mutation.
new_generation = perform_crossover(pop_with_fitness, crossover_rate=0.7)
mutated_generation = perform_mutation(new_generation, mutation_rate=0.3)

# 4. Re-score the offspring on "val.csv" (the tuples are transposed back into
#    four parallel sequences) and store the normalised result.
new_fitness = evaluate_prompts(*zip(*mutated_generation), training_file="val.csv")
normalized_new_fitness = normalize_scores(new_fitness)
write_to_csv(mutated_generation, normalized_new_fitness)
