In [None]:
#install what needs to be installed setting up instance here
!pip uninstall -y bitsandbytes accelerate
%pip install -U bitsandbytes accelerate
%pip install langdetect

import bitsandbytes as bnb
import accelerate
from langdetect import detect

# Mount Google Drive at /content/gdrive/. The cells below read Util.py and the
# JSON datasets from /content/gdrive/MyDrive and write the results CSV there.
# NOTE(review): google.colab is expected to be available only on a Colab runtime.
from google.colab import drive
drive.mount('/content/gdrive/')

In [None]:
# Copy the custom Util.py module from Drive into the current working directory
# so that it can be imported from this Colab session.
! cp /content/gdrive/MyDrive/Util.py . 

In [None]:
# Add the Drive folder to the module search path so `Util` (stored in
# /content/gdrive/MyDrive) can be imported below.
import sys
sys.path.append('/content/gdrive/MyDrive')

# import libraries
from Util import load_data_JSON, load_model, write_out
import pandas as pd
from huggingface_hub import login
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, pipeline
import torch

In [None]:
# Load the three JSON datasets (general, QA, summarization) from Drive.
# Each load_data_JSON call is unpacked into five names; what each one holds is
# defined by Util.load_data_JSON, which is not shown in this notebook.
# NOTE(review): presumably (dataframe, prompts, reference answers, GPT-4
# answers, extra info) -- confirm against Util.py.
# Only prompt_q is used by the cells visible below.
general_df, prompt_g, correct_g, gtp4_g, Info_g = load_data_JSON('/content/gdrive/MyDrive/general_data.json', 'general')
qa_df, prompt_q, correct_q, gtp4_q, Info_q = load_data_JSON('/content/gdrive/MyDrive/qa_data.json', 'qa')
sum_df, prompt_s, correct_s, gtp4_s, Info_s = load_data_JSON('/content/gdrive/MyDrive/summarization_data.json', 'sum')

In [None]:
# Access input (API key, etc.)
# Read the secret with getpass instead of input(): input() echoes what is typed
# into the cell output, which is then stored in the saved notebook.
from getpass import getpass

access = getpass('Access code?')

In [None]:
# Set up the model: Util.load_model returns a (tokenizer, model) pair for the
# Llama-2 7B chat checkpoint, given the access code entered above.
# NOTE(review): presumably `access` is a Hugging Face token used to download
# the gated checkpoint -- confirm in Util.load_model.
tokenizer, model = load_model("meta-llama/Llama-2-7b-chat-hf", access)

In [None]:
# Wrap the already-loaded model and tokenizer in a text-generation pipeline.
# get_response() below calls this pipeline through the global name `gen`.
# NOTE(review): torch_dtype is passed together with an instantiated model;
# confirm it has any effect when the model is not loaded by the pipeline itself.
gen = pipeline("text-generation", model=model,torch_dtype = torch.float16, tokenizer=tokenizer)

In [None]:
# System prompt to enforce English-only responses.
# Llama-2 chat checkpoints expect the system message to be wrapped in
# <<SYS>> ... <</SYS>> inside the first [INST] block. The callers append the
# user prompt and the closing " [/INST]" to this prefix.
system_prompt = """[INST] <<SYS>>
You are a helpful assistant. Please ensure all responses are in clear and fluent English, without using any other languages.
<</SYS>>

"""

In [None]:
# Function to check if the text is in English
def is_english(text):
    """Return True when ``detect`` classifies ``text`` as English.

    Args:
        text: The string to classify.

    Returns:
        bool: True if the detected language code is ``'en'``. False for
        non-string or blank input and whenever detection raises an error.
    """
    # Nothing to classify: skip the detector for non-strings and blank text.
    if not isinstance(text, str) or not text.strip():
        return False
    try:
        return detect(text) == 'en'
    except Exception:
        # A failed detection is treated as "not English". Catching Exception
        # (instead of a bare except) lets KeyboardInterrupt and SystemExit
        # propagate, so a long run can still be interrupted.
        return False

In [None]:
# Get response without retries, enforce English via system prompt
def get_response(prompt, max_len):
    """Generate one sampled completion for ``prompt`` and check it is English.

    The prompt is prefixed with the global ``system_prompt`` and closed with
    " [/INST]" before being sent to the global ``gen`` pipeline.

    Args:
        prompt: User prompt text.
        max_len: Maximum number of tokens to generate after the prompt.

    Returns:
        str: The pipeline's ``generated_text`` when the completion is detected
        as English, otherwise the literal "Non-English response detected.".
    """
    prompt_with_system = system_prompt + prompt + " [/INST]"

    # max_new_tokens bounds only the generated part. The previous
    # max_length=max_len + len(prompt) added a character count to a token
    # budget and did not account for the system prompt at all.
    sequences = gen(
        prompt_with_system,
        do_sample=True,
        top_k=5,
        num_return_sequences=1,
        eos_token_id=tokenizer.eos_token_id,
        max_new_tokens=max_len,
    )
    response = sequences[0]['generated_text']

    # The returned text may start with the (English) prompt that was sent in.
    # Run language detection on the completion alone so the prompt cannot mask
    # a non-English answer; the returned value itself is left unchanged.
    if response.startswith(prompt_with_system):
        completion = response[len(prompt_with_system):]
    else:
        completion = response

    if is_english(completion):
        return response
    return "Non-English response detected."

In [None]:
def _generate_text(model, tokenizer, text, max_new_tokens, do_sample):
    """Run one generation and return ``(full_text, new_text)``.

    ``full_text`` is the decoded output sequence as returned by ``generate``;
    ``new_text`` is the decoding of only the tokens after the input prompt.
    """
    # Follow the model's device when it exposes one instead of assuming CUDA.
    device = getattr(model, "device", "cuda")
    inputs = tokenizer(text, return_tensors="pt").to(device)
    prompt_len = inputs['input_ids'].shape[1]
    with torch.no_grad():
        outputs = model.generate(
            inputs['input_ids'],
            attention_mask=inputs.get('attention_mask'),
            max_new_tokens=max_new_tokens,
            num_beams=5,
            early_stopping=True,
            do_sample=do_sample,
        )
    full_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
    new_text = tokenizer.decode(outputs[0][prompt_len:], skip_special_tokens=True)
    return full_text, new_text


def chainpoll(model, tokenizer, prompts, num_responses=5, max_prompts=2, do_sample=True):
    """Poll the model repeatedly about hallucinations in its own answers.

    For each selected prompt the model generates ``num_responses`` answers.
    Each answer is sent back to the same model with a question asking whether
    it contains hallucinations, and the fraction of "yes" verdicts becomes the
    prompt's hallucination score.

    Args:
        model: Causal LM exposing ``generate`` (and, optionally, ``device``).
        tokenizer: Tokenizer matching ``model``.
        prompts: Sliceable sequence of prompt strings.
        num_responses: Number of answer/verdict rounds per prompt (>= 1).
        max_prompts: How many prompts from the start of ``prompts`` to process.
            Defaults to 2, the limit that used to be hard-coded; pass ``None``
            to process every prompt.
        do_sample: Sample during generation. With sampling off, beam search is
            deterministic and every round yields the same answer and verdict,
            so the average can only be 0 or 1.

    Returns:
        list[dict]: One dict per processed prompt with the keys ``prompt``,
        ``response`` (last generated answer, decoded together with its
        prompt), ``responses`` (all answers), ``hallucination_score`` (mean of
        the 0/1 verdicts) and ``explanation`` (the judge's generated text for
        each round, without the echoed question).

    Raises:
        ValueError: If ``num_responses`` is smaller than 1.
    """
    import re

    if num_responses < 1:
        raise ValueError("num_responses must be at least 1")

    selected_prompts = prompts if max_prompts is None else prompts[:max_prompts]
    out_q = []

    for prompt in selected_prompts:
        hallucination_scores = []
        explanations = []
        responses = []

        prompt_with_system = system_prompt + f"{prompt} [/INST]"

        for _ in range(num_responses):
            response, _answer_only = _generate_text(
                model, tokenizer, prompt_with_system, 50, do_sample
            )
            responses.append(response)

            # Ask about hallucinations for each response
            hallucination_prompt = f"Does the following output contain hallucinations? Explain in detail:\n\nOutput: {response}\n"
            _judge_full, verdict = _generate_text(
                model, tokenizer, hallucination_prompt, 100, do_sample
            )

            # Score only what the judge generated. The decoded full sequence
            # also contains the question and the judged answer, so a plain
            # substring test would fire on words such as "yesterday" there.
            if re.search(r"\byes\b", verdict.lower()):
                hallucination_scores.append(1)
            else:
                hallucination_scores.append(0)

            explanations.append(verdict)

        # Calculate average hallucination score for this prompt
        avg_hallucination_score = sum(hallucination_scores) / num_responses

        # Append result with average score and explanations
        out_q.append({
            "prompt": prompt,
            "response": responses[-1],  # Final response, kept for existing consumers
            "responses": responses,  # Every response that was judged
            "hallucination_score": avg_hallucination_score,
            "explanation": explanations  # All explanations will be stored
        })

    return out_q

In [None]:
# Function to ensure output is in English before formatting
def format_output_english_only(chainpoll_results):
    """Render chainpoll results as text blocks, keeping English responses only.

    Args:
        chainpoll_results: Iterable of dicts with the keys ``prompt``,
            ``response``, ``hallucination_score`` and ``explanation``.

    Returns:
        list[str]: One formatted multi-line string per result whose
        ``response`` passes ``is_english``. Results that fail the check are
        reported with ``print`` and left out of the list.
    """
    rendered = []

    for entry in chainpoll_results:
        answer = entry['response']

        if not is_english(answer):
            # Flag the non-English response and move on to the next result.
            print(f"Non-English response detected, skipping:\n{answer}\n")
            continue

        rendered.append(f"""
            Prompt: {entry['prompt']}

            Response: {answer}

            Hallucination Score: {entry['hallucination_score']}

            Explanation: {entry['explanation']}
            """)

    return rendered

In [None]:
# Run the hallucination poll on the QA prompts (prompt_q) with the default
# number of responses per prompt. chainpoll() processes only the first two
# prompts of the sequence it is given.
chainpoll_results = chainpoll(model, tokenizer, prompt_q)

In [None]:
# Format the output: each result whose response is detected as English becomes
# one text block; non-English results are printed and left out of the list.
formatted_out_q = format_output_english_only(chainpoll_results)

In [None]:
# Write the formatted QA results to a CSV file on Drive via Util.write_out.
# NOTE(review): write_out is defined in Util.py (not shown); confirm how it
# serialises a list of multi-line strings.
write_out("/content/gdrive/MyDrive/ChainPoll_qa_results.csv", formatted_out_q)