In [1]:
import json
import os
import re
from typing import Dict, List, Optional

import boto3
from botocore.exceptions import ClientError
from dotenv import load_dotenv

# Load settings from a .env file into the environment before
# MathReasoningLLM.__init__ reads them with os.getenv().
load_dotenv()

class MathReasoningLLM:
    """Sample, grade and filter LLM solutions to multiple-choice math problems.

    For every problem in the dataset the model is asked several times for a
    step-by-step solution. Each solution whose extracted final answer matches
    the answer key is then graded by a second model call, and only solutions
    that pass both checks are kept.

    Attributes:
        region_name, model_id, aws_access_key_id, aws_secret_access_key:
            Values read from the AWS_REGION, BEDROCK_MODEL_ID,
            AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables.
        dataset_path: Path of the JSON dataset file.
        client: boto3 ``bedrock-runtime`` client.
        dataset: The (possibly truncated) list of problem dicts. Each problem
            is read through the keys ``question``, ``options`` and ``answer``.
    """

    def __init__(self, dataset_path: str = 'RepresentativeDataset.json',
                 max_problems: Optional[int] = 5):
        """Read configuration from the environment and load the dataset.

        Args:
            dataset_path: JSON file containing a list of problem dicts.
            max_problems: Keep only the first ``max_problems`` problems;
                ``None`` keeps the whole dataset.

        Raises:
            ValueError: If any required environment variable is unset/empty.
        """
        self.dataset_path = dataset_path
        self.region_name = os.getenv('AWS_REGION')
        self.model_id = os.getenv('BEDROCK_MODEL_ID')
        self.aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
        self.aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')

        required = (self.region_name, self.model_id,
                    self.aws_access_key_id, self.aws_secret_access_key)
        if not all(required):
            raise ValueError("Missing required environment variables. Please check your .env file.")

        self.client = boto3.client('bedrock-runtime',
                                   region_name=self.region_name,
                                   aws_access_key_id=self.aws_access_key_id,
                                   aws_secret_access_key=self.aws_secret_access_key)
        self.dataset = self.load_dataset()[:max_problems]

    def load_dataset(self) -> List[Dict]:
        """Return the parsed contents of the JSON file at ``self.dataset_path``."""
        # Explicit encoding so the result does not depend on the platform locale.
        with open(self.dataset_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    @staticmethod
    def _wrap_chat_prompt(prompt: str) -> str:
        """Wrap ``prompt`` as a single user turn followed by an open assistant turn."""
        # NOTE(review): Llama 3 special tokens are normally spelled with pipes
        # (e.g. "<|begin_of_text|>"). The markers below are kept exactly as the
        # original code had them - confirm against the configured model.
        return (
            "<begin_of_text>\n"
            "<start_header_id>user<end_header_id>\n"
            f"{prompt}\n"
            "<eot_id>\n"
            "<start_header_id>assistant<end_header_id>\n"
        )

    def create_prompt(self, question: str, options: List[str]) -> str:
        """Build the solve-this-problem prompt for one question and its options."""
        prompt = f"Solve the following mathematical problem:\n\nQuestion: {question}\n"
        prompt += f"Options: {', '.join(options)}\n\n"
        prompt += "Provide your solution step by step and clearly state your final answer at the end, prefixed with the option letter as 'Final Answer: '."
        return self._wrap_chat_prompt(prompt)

    def create_grading_prompt(self, problem: Dict, solution: str) -> str:
        """Build the prompt asking the model to grade ``solution`` for ``problem``.

        The requested reply format (Score / Explanation / Correct Final Answer)
        is what ``grade_solution_path`` parses.
        """
        prompt = f"""Evaluate the following solution to this math problem:

Problem: {problem['question']}
Correct Answer: {problem['answer']}

Given Solution:
{solution}

Please provide:
1. A score from 0 to 1, where 0 is completely incorrect and 1 is perfectly correct.
2. A brief explanation of the score.
3. State whether the final answer matches the correct answer (Yes/No).

Format your response as:
Score: [score]
Explanation: [your explanation]
Correct Final Answer: [Yes/No]
"""
        return self._wrap_chat_prompt(prompt)

    def invoke_model(self, prompt: str, max_tokens: int = 1024, temperature: float = 0.7) -> Optional[str]:
        """Send ``prompt`` to the configured Bedrock model and return its text.

        Args:
            prompt: Fully formatted prompt string.
            max_tokens: Sent as ``max_gen_len`` in the request body.
            temperature: Sent as ``temperature`` in the request body.

        Returns:
            The ``generation`` field of the response ('' if absent), or
            ``None`` when the call fails with a ``ClientError`` (the error is
            printed, not raised).
        """
        request_body = {
            "prompt": prompt,
            "max_gen_len": max_tokens,
            "temperature": temperature,
        }

        try:
            response = self.client.invoke_model(
                body=json.dumps(request_body),
                contentType='application/json',
                modelId=self.model_id
            )
            response_body = json.loads(response['body'].read().decode('utf-8'))
            return response_body.get('generation', '')
        except ClientError as e:
            print(f"ERROR: Can't invoke '{self.model_id}'. Reason: {e}")
            return None

    def extract_model_answer(self, model_solution: str) -> Optional[str]:
        """Return the lower-cased token after the first 'Final Answer:', or None."""
        match = re.search(r"Final Answer:\s*(\w+)", model_solution, re.IGNORECASE)
        if match:
            return match.group(1).lower()
        return None

    def generate_solution_paths(self, problem: Dict, max_paths: int = 5) -> List[str]:
        """Ask the model up to ``max_paths`` times and return the distinct replies.

        Failed or empty replies and exact duplicates are dropped, so the result
        may contain fewer than ``max_paths`` entries.
        """
        # The prompt is identical for every attempt; build it once.
        prompt = self.create_prompt(problem['question'], problem['options'])
        paths: List[str] = []
        for _ in range(max_paths):
            response = self.invoke_model(prompt)
            if response and response not in paths:
                paths.append(response)
        return paths

    def grade_solution_path(self, problem: Dict, solution: str) -> Dict:
        """Have the model grade ``solution`` and return the parsed verdict.

        Returns:
            Dict with ``score`` (float), ``explanation`` (str) and
            ``correct_final_answer`` (bool). If the model call fails, the
            defaults 0.0 / "No explanation provided" / False are returned
            instead of raising.
        """
        response = self.invoke_model(self.create_grading_prompt(problem, solution))
        return self._parse_grading_response(response)

    @staticmethod
    def _parse_grading_response(response: Optional[str]) -> Dict:
        """Parse the Score / Explanation / Correct Final Answer reply text."""
        # invoke_model returns None on a ClientError; treat that as "no reply"
        # so every field falls back to its default.
        text = response or ""

        score_match = re.search(r"Score:\s*(\d+(\.\d+)?)", text)
        # Lazy match up to the first newline: only the first explanation line is kept.
        explanation_match = re.search(r"Explanation:\s*(.+?)(?:\n|$)", text, re.DOTALL)
        # Accept "yes"/"YES" as well as "Yes".
        verdict_match = re.search(r"Correct Final Answer:\s*(Yes|No)", text, re.IGNORECASE)

        return {
            "score": float(score_match.group(1)) if score_match else 0.0,
            "explanation": explanation_match.group(1) if explanation_match else "No explanation provided",
            "correct_final_answer": bool(verdict_match) and verdict_match.group(1).lower() == "yes",
        }

    def validate_answer(self, problem: Dict, model_answer: str) -> bool:
        """Check the model's extracted answer against the problem's answer key.

        A single-letter ``model_answer`` is compared with the answer key
        directly. Anything else is compared (via ``compare_latex``) with the
        text of the option the key letter ('a'-'d') points at.

        Returns:
            True on a match; False otherwise, including when ``model_answer``
            is empty/None or the answer key is not a single letter 'a'-'d'.
        """
        correct_answer = problem['answer'].lower()

        if not model_answer:
            return False

        if len(model_answer) == 1 and model_answer.isalpha():
            return model_answer.lower() == correct_answer

        # The length check keeps ord() from raising on a multi-character key.
        if len(correct_answer) == 1 and 'a' <= correct_answer <= 'd':
            index = ord(correct_answer) - ord('a')
            if index < len(problem['options']):
                return self.compare_latex(model_answer, problem['options'][index])

        return False

    def compare_latex(self, expr1: str, expr2: str) -> bool:
        """Compare two strings after removing backslashes, braces and whitespace."""
        clean1 = re.sub(r'[\\{\}\s]', '', expr1)
        clean2 = re.sub(r'[\\{\}\s]', '', expr2)
        return clean1 == clean2

    def solve_problem(self, problem: Dict) -> Dict:
        """Collect the verified-correct solutions for one problem.

        Returns:
            Dict with ``question``, ``options``, ``correct_answer`` and
            ``solutions``: the solutions whose extracted answer matches the
            key and which the grader also marked correct, sorted by grade
            (highest first).
        """
        graded_paths = []
        for path in self.generate_solution_paths(problem):
            model_answer = self.extract_model_answer(path)
            is_correct = self.validate_answer(problem, model_answer) if model_answer else False
            if not is_correct:
                # Only paths passing both checks are kept, so skip the
                # grading call for a path that has already failed.
                continue

            grade_result = self.grade_solution_path(problem, path)
            if not grade_result['correct_final_answer']:
                continue

            graded_paths.append({
                "solution": path,
                "grade": grade_result['score'],
                "explanation": grade_result['explanation'],
                "model_answer": model_answer,
                "is_correct": is_correct
            })

        graded_paths.sort(key=lambda x: x["grade"], reverse=True)
        return {
            "question": problem['question'],
            "options": problem['options'],
            "correct_answer": problem['answer'],
            "solutions": graded_paths
        }

    def improve_reasoning(self) -> List[Dict]:
        """Run ``solve_problem`` on every loaded problem and return the results."""
        return [self.solve_problem(problem) for problem in self.dataset]

def main():
    """Run the evaluation over the dataset and print a per-problem report.

    A problem counts as solved when at least one of its sampled solutions
    survived verification; the overall accuracy is the fraction of solved
    problems. Setup failures (missing environment variables, unreadable or
    malformed dataset file) are reported with a message instead of a traceback.
    """
    # Only construction is guarded: the ValueError handler below describes
    # missing environment variables, so it must not swallow unrelated
    # ValueErrors raised later while solving.
    try:
        llm = MathReasoningLLM()
    except json.JSONDecodeError as e:
        # JSONDecodeError subclasses ValueError; handle it first so a broken
        # dataset file is not misreported as a configuration problem.
        print(f"Error: dataset file is not valid JSON: {e}")
        return
    except OSError as e:
        print(f"Error: could not read dataset file: {e}")
        return
    except ValueError as e:
        print(f"Error: {e}")
        print("Please ensure all required environment variables are set in your .env file:")
        print("AWS_REGION, BEDROCK_MODEL_ID, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY")
        return

    results = llm.improve_reasoning()

    print("Results:")
    total_correct = 0
    for i, result in enumerate(results, 1):
        print(f"\nProblem {i}:")
        print(f"Q: {result['question']}")
        print(f"Options: {', '.join(result['options'])}")
        print(f"Correct answer: {result['correct_answer']}")
        for j, solution in enumerate(result['solutions'], 1):
            print(f"\nSolution {j} (Grade: {solution['grade']:.2f}):")
            print(f"Model's solution:\n{solution['solution']}")
            print(f"Model's extracted answer: {solution['model_answer']}")
            print(f"Is correct: {solution['is_correct']}")
            print(f"Explanation: {solution['explanation']}")

        # result['solutions'] holds only verified-correct solutions.
        correct_solutions = len(result['solutions'])
        if correct_solutions > 0:
            total_correct += 1
        print(f"Number of correct solutions: {correct_solutions}")
        print("-" * 50)

    accuracy = total_correct / len(results) if results else 0
    print(f"\nOverall Accuracy: {accuracy:.2%}")

# Run the evaluation only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

Results:

Problem 1:
Q: If \(\int\left(e^{2 x}+2 e^{x}-e^{-x}-1\right) e^{\left(e^{x}+e^{-x}\right)} d x=g(x) e^{\left(e^{x}+e^{-x}\right)}+c\), where \(c\) is a constant of integration, then \(g(0)\) is equal to:
Options: \(e\), \(e^{2}\), 1, 2
Correct answer: d

Solution 1 (Grade: 0.80):
Model's solution:
To solve this problem, we can start by noticing that the given integral is a product of two functions:

$$\int\left(e^{2 x}+2 e^{x}-e^{-x}-1\right) e^{\left(e^{x}+e^{-x}\right)} d x$$

Let's call the first function $f(x) = e^{2 x}+2 e^{x}-e^{-x}-1$ and the second function $h(x) = e^{\left(e^{x}+e^{-x}\right)}$.

We can then use the substitution method to evaluate the integral. Let's substitute $u = e^{x} + e^{-x}$, which implies that $du/dx = e^{x} - e^{-x}$. Then, we have:

$$dx = \frac{du}{e^{x} - e^{-x}}$$

Now, we can rewrite the integral as:

$$\int\left(e^{2 x}+2 e^{x}-e^{-x}-1\right) e^{\left(e^{x}+e^{-x}\right)} d x = \int\left(u^{2}+2 u-u^{-1}-1\right) e^{u} \frac{du}{e^{x