In [None]:
import json
import math
import os
import re
import time
from pathlib import Path

import numpy as np
import openai
import torch
from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer

from utils import extract_numbers,find_first_empty_index,process_scientific_notation,extract_numeric_answer
# Read the key from the OPENAI_API_KEY environment variable so a real
# credential never has to be pasted into (and saved with) the notebook.
# The original placeholder is kept as the fallback value.
openai.api_key  = os.environ.get("OPENAI_API_KEY", "YOUR OPENAI API KEY")

from sentence_transformers import SentenceTransformer

# Sentence-embedding model; get_embedding() calls its .encode() on text.
model_sentence = SentenceTransformer("all-MiniLM-L6-v2")

In [None]:
# Download APBench_Gamma.json from woodywu/APBench 
filename = 'APBench_Gamma.json'
# Pin the encoding: JSON is UTF-8, and without it read_text() falls back to
# the platform's locale encoding.
data = json.loads(Path(filename).read_text(encoding='utf-8'))
# Preview one entry: the problem at index 10 and its question at index 1
# (both 0-based, i.e. the 11th problem and its 2nd question).
index, order = 10, 1
content   = data[index]['Content']
question_solution_so_on = data[index]['Questions'][order]
print("Question:", question_solution_so_on['Question'])
print("Solution:", question_solution_so_on['Solution'])
print("Answer:", question_solution_so_on['Answer'])
print("Format:", question_solution_so_on['Format'])
print("Source:", question_solution_so_on['Source'])

In [None]:
def get_embedding(text):
    """Encode ``text`` with the module-level ``model_sentence`` and return the result."""
    embedding = model_sentence.encode(text)
    return embedding

def cosine_similarity(a, b):
    """Return the cosine similarity between two vectors.

    Args:
        a: First vector (array-like).
        b: Second vector (array-like) of the same length as ``a``.

    Returns:
        ``dot(a, b) / (||a|| * ||b||)``. When either vector has zero norm the
        similarity is undefined; 0.0 is returned instead of dividing by zero.
    """
    norm_product = np.linalg.norm(a) * np.linalg.norm(b)
    if norm_product == 0:
        # Avoid 0/0 -> nan (and a RuntimeWarning) for an all-zero vector.
        return 0.0
    return np.dot(a, b) / norm_product

### Example question

In [None]:
# Raw string: the LaTeX-style "\(" and "\)" delimiters are not valid Python
# escape sequences, so a normal literal triggers an invalid-escape warning.
# The resulting text is unchanged.
msg = r'''The Viking Mars orbiter probe was placed in a circular orbit 17,000 km above the Martian surface. Doppler measurements of the transmitted signals from the probe indicated that it was at an orbital velocity, \( V_V \), of 1.46 km/s. The diameter of Mars, \( d_M \), is 6,770 km and the radius of the Mars orbit about the Sun, \( a_M \), is 1.524 AU. Note: 1 AU = 1.495 × 10^8 km; 1 year = 3.156 × 10^7 s. Calculate the mass of Mars, \( M_M \), in terms of the mass of the Sun, \( M_S \). Without any extra words, please specify the answer in the final sentence with this form: 'the answer is (answer)'. The answer should either be numeric or symbolic (not words).'''

In [None]:
# Load the tokenizer and the causal-LM weights from the same model identifier
tokenizer = AutoTokenizer.from_pretrained("AstroMLab/AstroSage-8b")
model = AutoModelForCausalLM.from_pretrained(
    "AstroMLab/AstroSage-8b",
    device_map="auto",
)

# Function to generate a response
def generate_response(prompt, max_length=6000, do_sample=True):
    """Generate a completion for ``prompt`` with the module-level model.

    Args:
        prompt: Text that is tokenized and fed to ``model.generate``.
        max_length: Forwarded to ``model.generate`` as ``max_length``. Per the
            transformers documentation this bounds prompt plus generated
            tokens, so long prompts leave less room for the answer.
        do_sample: Forwarded to ``model.generate``; with sampling enabled the
            output is not deterministic between calls.

    Returns:
        The decoded text of the newly generated tokens only (the prompt
        tokens are sliced off), with special tokens skipped.
    """
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    prompt_length = inputs['input_ids'].shape[-1]

    outputs = model.generate(
        **inputs,
        max_length=max_length,
        do_sample=do_sample,
        pad_token_id=tokenizer.eos_token_id,
    )
    # Keep only what was produced after the prompt.
    new_tokens = outputs[0][prompt_length:]
    return tokenizer.decode(new_tokens, skip_special_tokens=True)

# Example usage: send one simple free-form question through generate_response
prompt = (
    "\n"
    "You are an expert in general astrophysics. Your task is to answer the following question:\n"
    "What are the main components of a galaxy?\n"
)
response = generate_response(prompt)
print(response)


In [None]:
# Time generate_response on the current `msg`. The run count lives in one
# place so the reported value stays a per-run average if it is changed.
n_runs = 1
start_time = time.time()
for i in range(n_runs):
    reply = generate_response(msg)
    print(reply)
print(f'Elapsed time: {(time.time()-start_time)/n_runs}')

In [None]:
# Number of questions attached to the problem at index 10.
len(data[10]['Questions'])

In [None]:
# Evaluate the model on every problem in `data`.
#
# For each question the prompt is built from the problem content, the question
# text and any earlier questions of the same problem that were graded correct.
# Replies are graded by the question's 'Format':
#   * 'message': mean of the embedding cosine similarity and a GPT-4 score
#     (0-10, rescaled to 0-1); counted correct when the mean exceeds 0.6.
#   * 'numeric': the number parsed from the "the answer is ..." sentence is
#     compared with the reference answer within a relative tolerance.
# Questions with any other format keep their [] placeholder and are left out
# of the tally.
match_result = []  # Universal array of results

for idx in range(len(data)):
    print("_____________")
    print("ID", idx)
    print("current_match_result: ", match_result)


    content_str = data[idx]['Content']
    questions_solutions = data[idx]['Questions']
    question_strs = [entry['Question'] for entry in questions_solutions]
    solution_strs = [entry['Solution'] for entry in questions_solutions]
    answer_strs = [entry['Answer'] for entry in questions_solutions]
    format_strs = [entry['Format'] for entry in questions_solutions]
    match_results_local = [[] for entry in questions_solutions]

    # print("content: ",content_str)

    # Correctly answered earlier questions of this problem, fed back as context.
    correct_QAs = []
    for q_idx, question_str in enumerate(question_strs):
        print("______")
        print("q_idx: ", q_idx)
        format_str = format_strs[q_idx]
        solution_str = solution_strs[q_idx]
        answer_str = answer_strs[q_idx]
        print("Answer: ", answer_str)
        all_correct_QAs = " ".join(correct_QAs)
        msg = f"{all_correct_QAs} {content_str} {question_str} Without any extra words, please specify the answer in the final sentence with this form: 'the answer is (answer)'. The answer should either be numeric or symbolic (not words)."

        print("msg", msg)
        
        # calling model
        reply = generate_response(msg)
        print("reply: ", reply)
        
        # Pull out the "the answer is ..." sentence (first occurrence); None when absent.
        pattern = r'the answer is \s*(.*)|The answer is \s*(.*)' 
        match = re.search(pattern, reply)
        if match is not None:
            user_answer_str = match.group(0)
            print("user_answer_str: ", user_answer_str)
        else:
            user_answer_str = None

        if 'message' == format_str:
            print("MESSAGE")

            # embedding score
            score_embedding = cosine_similarity(get_embedding(reply), get_embedding(answer_str))
            print(f"similarity: {score_embedding}")
            
            # gpt score
            # gpt prompt
            # The completeness criterion is phrased generically: this rubric is
            # applied to every 'message' question, not to one specific problem.
            message = f"""
            Compare the following reply with the expected answer and evaluate their alignment using these criteria:

            1. **Relevance:** Does the reply address the scientific goals and concepts mentioned in the expected answer?  
            2. **Completeness:** Does the reply cover the key points of the expected answer?  
            3. **Accuracy:** Is the reasoning in the reply correct and consistent with the scientific explanation?  

            Question: {question_str}  
            Expected Answer: {answer_str}  
            Reply: {reply}  

            After evaluating, provide a similarity score between 0 and 10, where:  
            - **10** means the reply perfectly aligns with the expected answer.  
            - **0** means the reply does not align at all.  

            Only return the numeric score as your final output.
            """

            # response part
            response = openai.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": message}],
                temperature=0,
            )
            gpt_similarity = response.choices[0].message.content
            try:
                score_gpt = float(gpt_similarity) / 10
            except (TypeError, ValueError):
                # The grader did not return a bare number: fall back to the
                # first number in its reply, or 0 when there is none, rather
                # than aborting the whole evaluation run.
                number_match = re.search(r'\d+(?:\.\d+)?', gpt_similarity or '')
                score_gpt = float(number_match.group(0)) / 10 if number_match else 0.0

            # combining scores
            score = score_embedding*0.5 + score_gpt*0.5
            print(f"score_gpt: {score_gpt}")
            print(f"SCORE: {score}")
            
            if score > 0.6: # more than 60% affirmative
                match_results_local[q_idx] = 1
                extra_str = "For reference, a previous question: '" + content_str + question_str + " '  has answer " + reply
                correct_QAs.append(extra_str)
            else:
                match_results_local[q_idx] = 0 

            ## formated output
            print(f'LLM answer: {reply}')
            print(f'Reference answer: {answer_str}')
            print('True' if match_results_local[q_idx]==1 else 'False')
           
        if 'numeric' == format_str:
            print("NUMERIC")

            # Default to "wrong" so every numeric question is counted, even
            # when no comparison can be made below.
            match_results_local[q_idx] = 0

            power = []
            try:
                if user_answer_str is None:
                    user_answer_str = ''
                    raise TypeError("user_answer_str cannot be None")
                user_answer_num_str = extract_numeric_answer(user_answer_str)
                user_answer_num = extract_numbers(user_answer_num_str)[0]
            except Exception:
                # check if answer string has power expression
                user_answer_str, power = process_scientific_notation(user_answer_str)
                user_answer_num = None

            if not power:
                # No power expression involved: only the reference needs parsing here.
                answer_str_num = extract_numbers(answer_str)[0]
            else:
                try:
                    print("pow")
                    # NOTE(review): the exponent is read from index 2 for the
                    # model answer but index 1 for the reference; this mirrors
                    # the original code and presumably reflects how
                    # process_scientific_notation rewrites the string. Confirm.
                    user_answer_pow = extract_numbers(user_answer_str.strip())[2]
                    user_answer_num = extract_numbers(user_answer_str.strip())[0] * 10**user_answer_pow
                    power = extract_numbers(answer_str)[1]
                    answer_str_num = extract_numbers(answer_str)[0] * 10**power

                    print("user_answer_num", user_answer_num)
                    print("answer_str_num", answer_str_num)

                except Exception:
                    user_answer_num = None
                    answer_str_num = None

            if user_answer_num is not None:
                if user_answer_num != 0:
                    # Relative tolerance grows slowly with the magnitude of the
                    # model's answer; it can go negative for tiny values, hence abs().
                    margin = 0.1 + 0.01 * math.log(abs(user_answer_num))
                    print("margin: ", margin)
                    is_match = (
                        answer_str_num == user_answer_num
                        or abs(answer_str_num - user_answer_num) / abs(user_answer_num) < abs(margin)
                    )
                else:
                    # log(0) and a zero denominator are undefined, so an answer
                    # of exactly 0 can only match exactly.
                    is_match = answer_str_num == user_answer_num

                if is_match:
                    match_results_local[q_idx] = 1
                    extra_str = "For reference, a previous question: " + question_str + ". It has answer " + str(user_answer_num)
                    correct_QAs.append(extra_str)
                else:
                    match_results_local[q_idx] = 0
            
            ## formated output
            print(f'LLM answer: {user_answer_str}')
            print(f'Reference answer: {answer_str}')
            print('True' if match_results_local[q_idx]==1 else 'False')

    if match_results_local:
        # Drop the [] placeholders of questions that were never graded.
        match_results_local_new = [item for item in match_results_local if not (isinstance(item, list) and len(item) == 0)]
        print("local", match_results_local_new)
        match_result.extend(match_results_local_new)

# Print the universal match result array
print("Universal match result array:", match_result)


# Mean of the 0/1 entries; undefined (nan) when nothing was graded, instead of
# raising ZeroDivisionError.
total_accuracy = sum(match_result) / len(match_result) if match_result else float('nan')

In [None]:
# Overall accuracy: sum(match_result) / len(match_result), computed in the previous cell.
print(total_accuracy)