In [8]:
import os

import numpy as np
import pandas as pd
from Bio import SeqIO
from dotenv import load_dotenv  #For loading environment variables from .env file
from openai import OpenAI  #OpenAI API client
from tqdm import tqdm

load_dotenv()  #This loads variables from a .env file into environment variables
api_key = os.getenv("OPENAI_API_KEY")  #Get the OpenAI API key from environment variables
client = OpenAI(api_key=api_key)  #Initialize the OpenAI client with the API key


In [9]:
fasta_dir = "sequences"

# Mapping of base categories to questions
questions = {
    "protein_coding": "Does this nucleotide sequence encode a protein? Only answer Yes or No. You must start your answer with 'Yes' or 'No'.",
    "enhancer": "Does this nucleotide sequence function as an enhancer in gene regulation? Only answer Yes or No. You must start your answer with 'Yes' or 'No'.",
    "promoter": "Does this nucleotide sequence act as a promoter for transcription initiation? Only answer Yes or No. You must start your answer with 'Yes' or 'No'.",
    "splice_site": "Does this nucleotide sequence contain a splice site for RNA processing? Only answer Yes or No. You must start your answer with 'Yes' or 'No'.",
    "methylated": "Is this nucleotide sequence methylated as part of epigenetic regulation? Only answer Yes or No. You must start your answer with 'Yes' or 'No'."
}

# File-name prefix that marks the negative-class counterpart of a category,
# e.g. "non_enhancer.fasta" holds negatives for the "enhancer" question.
NEGATIVE_PREFIX = "non_"

# Dictionary to hold sequence data: file stem -> list of per-sequence dicts
sequence_arrays = {}

# Loop through all FASTA files in the directory.
# os.listdir() returns entries in arbitrary order; sorting makes the order of
# sequence_arrays (and therefore of every downstream result row) reproducible.
for file in sorted(os.listdir(fasta_dir)):
    if not file.endswith(".fasta"):  # Process only FASTA files
        continue

    file_path = os.path.join(fasta_dir, file)
    var_name = os.path.splitext(file)[0]  # Use file name (without extension) as key

    # Determine base category by dropping the negative-class prefix, if present
    if var_name.startswith(NEGATIVE_PREFIX):
        base_category = var_name[len(NEGATIVE_PREFIX):]
    else:
        base_category = var_name

    # A FASTA file with no matching question cannot be evaluated; report it
    # and move on instead of aborting the whole load with a bare KeyError.
    question = questions.get(base_category)
    if question is None:
        print(f"Skipping {file}: no question defined for category '{base_category}'.")
        continue

    # Read the FASTA file and create one dictionary per record
    gene_dicts = [
        {
            "file_name": var_name,
            "gene": str(record.seq),
            "question": question,  # Add the question key
        }
        for record in SeqIO.parse(file_path, "fasta")
    ]

    # Store the list of dictionaries in the dictionary
    sequence_arrays[var_name] = gene_dicts

    print(f"Stored {var_name} with {len(gene_dicts)} genes.")

Stored enhancer with 98 genes.
Stored methylated with 100 genes.
Stored non_enhancer with 98 genes.
Stored non_methylated with 100 genes.
Stored non_promoter with 100 genes.
Stored non_protein_coding with 88 genes.
Stored non_splice_site with 70 genes.
Stored promoter with 100 genes.
Stored protein_coding with 100 genes.
Stored splice_site with 70 genes.


In [10]:
def generate_hallucination(gene_dict, model):
    """Ask the chat model for a free-text description of a sequence.

    Args:
        gene_dict: Mapping whose "gene" entry is the sequence text placed at
            the start of the user prompt.
        model: Model identifier forwarded to the chat completions endpoint.

    Returns:
        The message content of the first choice in the API response.
    """
    prompt = f"{gene_dict['gene']} Describe the gene in natural language:"
    conversation = [
        {"role": "system", "content": "You are an expert in genomics."},
        {"role": "user", "content": prompt},
    ]
    # Uses the module-level `client`.
    completion = client.chat.completions.create(
        model=model,
        messages=conversation,
        max_tokens=256,  # upper bound on the length of the reply
        temperature=0.6,  # sampling temperature
    )
    return completion.choices[0].message.content

In [11]:
def generate_classification(gene_dict, model):
    """Ask the chat model the yes/no question directly about a sequence.

    Args:
        gene_dict: Mapping with a "gene" entry (sequence text) and a
            "question" entry; the prompt is the two joined by a space.
        model: Model identifier forwarded to the chat completions endpoint.

    Returns:
        The message content of the first choice in the API response.
    """
    prompt = f"{gene_dict['gene']} {gene_dict['question']}"
    conversation = [
        {"role": "system", "content": "You are an expert in genomics."},
        {"role": "user", "content": prompt},
    ]
    # Uses the module-level `client`.
    completion = client.chat.completions.create(
        model=model,
        messages=conversation,
        max_tokens=256,  # upper bound on the length of the reply
        temperature=0.6,  # sampling temperature
    )
    return completion.choices[0].message.content

In [12]:
def generate_hallucination_classification(gene_dict, model):
    """Ask the yes/no question with the model's own description in the prompt.

    The user prompt is the sequence, the previously generated description and
    the question, joined by single spaces.

    Args:
        gene_dict: Mapping with "gene", "hallucination" and "question" entries.
        model: Model identifier forwarded to the chat completions endpoint.

    Returns:
        The message content of the first choice in the API response.

    Raises:
        KeyError: If a required entry is missing from ``gene_dict``.
        ValueError: If ``gene_dict['hallucination']`` is None, i.e. no
            description is available to condition on.
    """
    description = gene_dict['hallucination']
    # Formatting None into the f-string would send the literal text "None" to
    # the model as if it were a description; refuse before spending an API call.
    if description is None:
        raise ValueError("gene_dict['hallucination'] is None; no description to condition on")

    prompt = f"{gene_dict['gene']} {description} {gene_dict['question']}"
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "You are an expert in genomics."},
            {"role": "user", "content": prompt},
        ],
        max_tokens=256,  # upper bound on the length of the reply
        temperature=0.6,  # sampling temperature
    )
    return response.choices[0].message.content

In [None]:
# One dict per (model, sequence) pair; later turned into a DataFrame.
all_results = []

MODELS = [
    "gpt-3.5-turbo",
    "gpt-4o-mini",
]


def _safe_generate(generate_fn, label, result, model, file, gene_number):
    """Run one generation step; on any error print it and return None.

    Failures are deliberately non-fatal so that a single failed API call does
    not abort the whole (long-running) sweep.
    """
    try:
        return generate_fn(result, model)
    except Exception as e:
        print(f"Error generating {label} for {file}, gene {gene_number}: {str(e)}")
        return None


for model_idx, model in enumerate(MODELS):
    # The total in the label is derived from MODELS so it stays correct when
    # the list changes. Context managers close the bars even if the loop is
    # interrupted.
    with tqdm(total=len(sequence_arrays), desc=f"Model [{model_idx+1}/{len(MODELS)}]: {model.split('/')[-1]}") as model_progress:

        for file_idx, (file, sequences) in enumerate(sequence_arrays.items()):
            with tqdm(total=len(sequences), desc=f"File [{file_idx+1}/{len(sequence_arrays)}]: {file}", leave=False) as file_progress:

                for gene_idx, gene in enumerate(sequences):
                    file_progress.set_postfix_str(f"Gene {gene_idx+1}/{len(sequences)}")

                    # Copy so the shared input dicts are not mutated across models.
                    result = gene.copy()
                    result['model'] = model

                    result['hallucination'] = _safe_generate(
                        generate_hallucination, "hallucination", result, model, file, gene_idx + 1
                    )

                    if result['hallucination'] is None:
                        # Without a description there is nothing to condition on;
                        # skip the call rather than prompting with the text "None".
                        result['hallucination_classification'] = None
                    else:
                        result['hallucination_classification'] = _safe_generate(
                            generate_hallucination_classification, "hallucination classification",
                            result, model, file, gene_idx + 1
                        )

                    result['classification'] = _safe_generate(
                        generate_classification, "classification", result, model, file, gene_idx + 1
                    )

                    all_results.append(result)

                    file_progress.update(1)

            model_progress.update(1)


Model [1/6]: gpt-3.5-turbo:   0%|          | 0/10 [00:20<?, ?it/s]
File [1/10]: enhancer:  10%|█         | 10/98 [00:20<02:54,  1.99s/it, Gene 11/98]

In [None]:
df = pd.DataFrame(all_results)

# Ground-truth answer: files whose name starts with "non_" hold negative
# examples. Match the prefix (the same rule used when the files were loaded)
# rather than the substring "non" anywhere in the name, which would mislabel
# any positive category that merely contains those letters.
is_negative = df['file_name'].str.startswith("non_", na=False)
df['correct'] = is_negative.map({True: "No", False: "Yes"})

# Remove every literal period from the raw answers, so that e.g. "Yes." becomes "Yes".
for answer_col in ('hallucination_classification', 'classification'):
    df[answer_col] = df[answer_col].str.replace(".", "", regex=False)

In [None]:
def _to_yes_no(answer):
    """Reduce a raw model reply to 'Yes', 'No' or NaN.

    Only the first whitespace-separated word is inspected, case-insensitively
    and with surrounding punctuation removed, so "Yes, it does" and "**No**"
    are recognised. Non-string values, empty or whitespace-only replies, and
    replies that do not start with yes/no all map to NaN.
    """
    if not isinstance(answer, str):
        return np.nan
    words = answer.split()
    if not words:  # empty or whitespace-only reply; indexing [0] would raise
        return np.nan
    first_word = words[0].strip(",.;:!?*'\"()").lower()
    if first_word == 'yes':
        return 'Yes'
    if first_word == 'no':
        return 'No'
    return np.nan


df['hallucination_classification'] = df['hallucination_classification'].apply(_to_yes_no)
# Show a compact tally (including unparseable replies) instead of the full array.
print(df['hallucination_classification'].value_counts(dropna=False))

df['classification'] = df['classification'].apply(_to_yes_no)

In [None]:
# Drop every row that has a missing value in any column, then flag, per row,
# whether each normalised answer equals the ground-truth label.
df = df.dropna().assign(
    hallucination_correct=lambda frame: frame['hallucination_classification'] == frame['correct'],
    no_hallucination_correct=lambda frame: frame['classification'] == frame['correct'],
)

# Persist the scored rows (without the index column) for later analysis.
df.to_csv("trail_1_GPT.csv", index=False)

In [None]:
# Per-model accuracy: the mean of each correctness flag within a model group.
accuracy_columns = ['hallucination_correct', 'no_hallucination_correct']
per_model = df.groupby('model')
grouped_results = per_model[accuracy_columns].mean()
grouped_results