In [2]:
import os
os.environ['HF_HOME'] = '/workspace'
print("HF_HOME is set to:", os.getenv('HF_HOME'))

HF_HOME is set to: /workspace


In [3]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, TrainingArguments, Trainer,DataCollatorForLanguageModeling
from peft import LoraConfig, get_peft_model
from datasets import Dataset

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
import json
import re
import torch
import sys
import traceback
import pandas as pd
from transformers import pipeline
import logging
from tqdm import tqdm

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

import json
import re
import torch
import sys
from tqdm import tqdm
import traceback
import pandas as pd
from transformers import pipeline, AutoTokenizer
from datasets import Dataset
from torch.utils.data import DataLoader


def inspect_json_data(file_path, num_samples=5):
    with open(file_path, 'r') as file:
        data = json.load(file)
        for i in range(num_samples):
            print(f"Sample {i+1}: {data[i]}")


def load_json_file(file_path):
    def fix_data_types(item):
        for key, value in item.items():
            if isinstance(value, float):
                item[key] = str(value)  # Convert floats to strings (if applicable)
            elif isinstance(value, list):
                # Recursively handle lists
                item[key] = [str(v) if isinstance(v, float) else v for v in value]
            elif isinstance(value, dict):
                # Recursively handle nested dictionaries
                item[key] = fix_data_types(value)
        return item

    try:
        with open(file_path, 'r') as file:
            data = json.load(file)
        data=data[:16]
        # Ensure all items are dictionaries and fix data types
        return [fix_data_types(item) for item in data if isinstance(item, dict)]
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON file: {e}")
        sys.exit(1)
    except FileNotFoundError:
        print(f"File not found: {file_path}")
        sys.exit(1)
    except Exception as e:
        print(f"An error occurred while reading the file: {e}")
        sys.exit(1)

        

def extract_variables(input_values):
    filtered_keys = [key for key in input_values.keys()]
    return filtered_keys

def generate_python_code_batch(pipe, latex_expressions, variables_list):
    prompts = [f"""Convert the following LaTeX expression into a Python function named 'calculate' that takes the given parameters:
{latex_expression}

Follow these guidelines strictly:
1. The code must contain a Python function definition using `def calculate({', '.join(variables)}):` for all equations including derivatives. Import necessary libraries and functions at the beginning of the function. Prefer using `numpy` for mathematical operations and `SymPy` for symbolic computations.
2. Use `SymPy` exclusively to solve equations involving calculus operations like derivative, differentiation, integration, and logarithmic functions. For example, if the expression involves differentiation, compute the derivative directly inside the function. Do not use `sp.lambdify`.
3. Ensure the function is callable with the provided parameters and returns the calculated result as `int`, `float`, or `complex number` with 'return'.

Now, apply the above instructions method to convert the given LaTeX expression.

Python Code:
```python
"""



        for latex_expression, variables in zip(latex_expressions, variables_list)
    ]

    gen_config = {
        "max_new_tokens": 512,
        "do_sample": False,
    }

    try:
        outputs = pipe(prompts, **gen_config)
        results = []
        for output, latex_expression in zip(outputs, latex_expressions):
            generated_text = output[0]["generated_text"]
            python_code = re.findall(r"```python(.*?)```", generated_text, re.DOTALL)
            if not python_code:
                logger.error(f"No Python code generated for LaTeX expression: {latex_expression}")
                results.append((None, "No Python code generated"))
            else:
                results.append((python_code[0].strip(), None))
        return results
    except Exception as e:
        logger.error(f"Error generating Python code for batch: {e}")
        return [(None, f"Error generating Python code: {str(e)}") for _ in latex_expressions]


def process_json_data_batch(pipe, json_data_list, batch_size=8):
    results = []
    total_batches = (len(json_data_list) + batch_size - 1) // batch_size  # Calculate total number of batches

    for i in tqdm(range(0, len(json_data_list), batch_size), total=total_batches, desc="Processing batches"):
        batch = json_data_list[i:i+batch_size]
        
        latex_expressions = []
        variables_list = []
        task_ids = []

        for json_data in batch:
            task_id = json_data.get("task_id", "unknown")
            latex_expression = json_data.get("latex_expression")
            test_cases = json_data.get("test_cases", [])

            if not latex_expression:
                logger.warning(f"Task ID {task_id}: No LaTeX expression provided")
                results.append((task_id, [{"latex_expression": latex_expression, "error": "Error: No LaTeX expression provided"}]))
                continue

            if not test_cases:
                logger.warning(f"Task ID {task_id}: No test cases provided")
                results.append((task_id, [{"latex_expression": latex_expression, "error": "Error: No test cases provided"}]))
                continue

            variables = extract_variables(test_cases[0].get("input", {}))
            
            latex_expressions.append(latex_expression)
            variables_list.append(variables)
            task_ids.append(task_id)

        if latex_expressions:
            batch_results = generate_python_code_batch(pipe, latex_expressions, variables_list)
            for task_id, latex_expression, (python_code, error) in zip(task_ids, latex_expressions, batch_results):
                if error:
                    logger.error(f"Task ID {task_id}: {error}")
                    results.append((task_id, [{"latex_expression": latex_expression, "error": error}]))
                else:
                    results.append((task_id, [{"latex_expression": latex_expression, "generated_code": python_code}]))

    return results

def main(input_file, output_file):
    try:
        model_name = './fine_tuned_aimo_lora_model_v3'
        # Load the model
        pipe = pipeline("text-generation", model=model_name, tokenizer='./full_latest_v3_tokenizer', torch_dtype=torch.bfloat16, device_map="auto")
        pipe.model.eval()
        logger.info("Model loaded and evaluation mode set.")

        # Load JSON data
        json_data_list = load_json_file(input_file)

        # Initialize a dictionary to store results
        results_dict = {
            "id": [],
            "latex_expression": [],
            "generated_code": [],
            "error": []
        }

        # Process JSON objects in batches and store results in the dictionary
        batch_size = 8  # You can adjust this value based on your GPU memory
        results = process_json_data_batch(pipe, json_data_list, batch_size)

        logger.info("Processing results...")
        for task_id, task_results in tqdm(results, desc="Storing results"):
            for result in task_results:
                results_dict["id"].append(task_id)
                results_dict["latex_expression"].append(result.get("latex_expression", ""))
                results_dict["generated_code"].append(result.get("generated_code", ""))
                results_dict["error"].append(result.get("error", ""))

        # Convert the dictionary to a DataFrame
        df = pd.DataFrame(results_dict)

        # Write the DataFrame to a CSV file
        df.to_csv(output_file, index=False)

        logger.info(f"Results have been written to {output_file}")

    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}")
        traceback.print_exc()

input_file = 'public_test_new_no_sol_no_out.json'
output_file = 'codes_26_3.csv'
main(input_file, output_file)

2024-08-26 17:51:18,553 - INFO - We will use 90% of the memory on device 0 for storing the model, and 10% for the buffer to avoid OOM. You can set `max_memory` in to a higher value to use more memory (at your own risk).
2024-08-26 17:51:31,568 - INFO - Model loaded and evaluation mode set.
Processing batches:   0%|          | 0/2 [00:00<?, ?it/s]

In [None]:
import json
import re
import torch
import sys
from tqdm import tqdm
import traceback
import pandas as pd
from transformers import pipeline, AutoTokenizer
from datasets import Dataset
from torch.utils.data import DataLoader


def inspect_json_data(file_path, num_samples=5):
    with open(file_path, 'r') as file:
        data = json.load(file)
        for i in range(num_samples):
            print(f"Sample {i+1}: {data[i]}")


def load_json_file(file_path):
    def fix_data_types(item):
        for key, value in item.items():
            if isinstance(value, float):
                item[key] = str(value)  # Convert floats to strings (if applicable)
            elif isinstance(value, list):
                # Recursively handle lists
                item[key] = [str(v) if isinstance(v, float) else v for v in value]
            elif isinstance(value, dict):
                # Recursively handle nested dictionaries
                item[key] = fix_data_types(value)
        return item

    try:
        with open(file_path, 'r') as file:
            data = json.load(file)
        data=data[:8]
        # Ensure all items are dictionaries and fix data types
        return [fix_data_types(item) for item in data if isinstance(item, dict)]
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON file: {e}")
        sys.exit(1)
    except FileNotFoundError:
        print(f"File not found: {file_path}")
        sys.exit(1)
    except Exception as e:
        print(f"An error occurred while reading the file: {e}")
        sys.exit(1)

        

def extract_variables(input_values):
    filtered_keys = [key for key in input_values.keys()]
    return filtered_keys

def generate_python_code(pipe, latex_expression, variables):
    prompt = f"""
Convert this LaTeX expression to a Python function named 'def calculate' that takes the following parameters: {','.join(variables)}
Latex: {latex_expression}
Just Give me a python function with all necessary libraries and functions used and put all code inside the function. The function must give output in integer or float or complex number.
1. Use `numpy` for general mathematical operations (e.g., `1/np.tan` instead of `np.cot`).
2. Use `sympy` for logarithms, calculus, differentiation, and integrals. Handle complex numbers if they appear.
3.Use python and sympy to define the function directly.

Python Code:
"""

    gen_config = {
        "max_new_tokens": 2000,  # Increased token limit
        "do_sample": False,
        "stop_strings": ["```output"],
        "tokenizer": pipe.tokenizer,
    }
    try:
        outputs = pipe(prompt, **gen_config)
        generated_text = outputs[0]["generated_text"]

        # Print the raw output for debugging
        #print(f"Raw generated text for LaTeX expression '{latex_expression}':\n{generated_text}\n")

        python_code = re.findall(r"```python(.*?)```", generated_text, re.DOTALL)
        if not python_code:
            return None, "No Python code generated"
        return python_code[0].strip(), None
    except Exception as e:
        return None, f"Error generating Python code: {str(e)}"

def process_json_data(pipe, json_data):
    task_id = json_data.get("task_id", "unknown")
    latex_expression = json_data.get("latex_expression")
    test_cases = json_data.get("test_cases", [])

    print(f"Processing task ID: {task_id}")

    if not latex_expression:
        return task_id, [{"latex_expression": latex_expression, "error": "Error: No LaTeX expression provided"}]

    if not test_cases:
        return task_id, [{"latex_expression": latex_expression, "error": "Error: No test cases provided"}]

    if not isinstance(test_cases[0], dict):
        return task_id, [{"latex_expression": latex_expression, "error": "Error: Test case is not a dictionary"}]

    variables = extract_variables(test_cases[0].get("input", {}))
    python_code, error = generate_python_code(pipe, latex_expression, variables)
    if error:
        return task_id, [{"latex_expression": latex_expression, "error": error}]

    return task_id, [{"latex_expression": latex_expression, "generated_code": python_code}]

def custom_collate(batch):
    return [{k: v for k, v in item.items() if v is not None} for item in batch if item]

def process_batch(pipe, batch):
    results = []
    for item in batch:
        task_id, result = process_json_data(pipe, item)
        results.extend([(task_id, r) for r in result])
    return results

from datasets import Dataset
import pandas as pd
from torch.utils.data import DataLoader

def main(input_file, output_file):
    try:
        model_name = './fine_tuned_aimo_lora_model_v2'


        pipe = pipeline("text-generation", model=model_name,torch_dtype=torch.bfloat16, device_map="auto")
        pipe.model.eval()

        # Load JSON data and fix types
        json_data_list = load_json_file(input_file)

        # Create a Hugging Face Dataset
        dataset = Dataset.from_list(json_data_list)

        # Create a DataLoader with custom collate function
        dataloader = DataLoader(dataset, batch_size=8, shuffle=False, collate_fn=custom_collate)

        results_dict = {
            "id": [],
            "latex_expression": [],
            "generated_code": [],
            "error": []
        }

        for batch in tqdm(dataloader, desc="Processing batches"):
            batch_results = process_batch(pipe, batch)
            for task_id, result in batch_results:
                results_dict["id"].append(task_id)
                results_dict["latex_expression"].append(result.get("latex_expression", ""))
                results_dict["generated_code"].append(result.get("generated_code", ""))
                results_dict["error"].append(result.get("error", ""))

        df = pd.DataFrame(results_dict)
        df.to_csv(output_file, index=False)

        print(f"Results have been written to {output_file}")

    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        traceback.print_exc()

input_file = 'public_test_new_no_sol_no_out.json'
output_file = 'codes_25_2.csv'
main(input_file, output_file)



In [2]:
import re
import torch
from transformers import pipeline

pipe = pipeline("text-generation", model="./fine_tuned_aimo_lora_model_v3",tokenizer='AI-MO/NuminaMath-7B-TIR', torch_dtype=torch.bfloat16, device_map="auto")

messages = [
    {"role": "user", "content": "Convert this LaTeX expression '\\mathtt{\\text{Derivative(a*x + b + x**2 + 4*x + sqrt(a + exp(x)) + 3, x)}}' to a Python function named 'calculate'. Just Give me a callable function with all necessary libraries and functions used and put all code inside the function. The function must give output in integer or float."},
]
prompt = pipe.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)

gen_config = {
    "max_new_tokens": 1024,
    "do_sample": False,
    "stop_strings": ["```output"], # Generate until Python code block is complete
    "tokenizer": pipe.tokenizer,
}

outputs = pipe(prompt, **gen_config)
text = outputs[0]["generated_text"]
print(text)


python_code = re.findall(r"```python(.*?)```", text, re.DOTALL)[0]
exec(python_code)


### Problem: Convert this LaTeX expression '\mathtt{\text{Derivative(a*x + b + x**2 + 4*x + sqrt(a + exp(x)) + 3, x)}}' to a Python function named 'calculate'. Just Give me a callable function with all necessary libraries and functions used and put all code inside the function. The function must give output in integer or float.
### Solution: 
To solve the problem, we need to convert the given LaTeX expression into a Python function that can be called. The expression is a derivative calculation involving symbolic variables and functions.

The given LaTeX expression is:
\[ \text{Derivative(a*x + b + x**2 + 4*x + \sqrt{a + \exp(x)} + 3, x)} \]

We will use the `sympy` library in Python to perform the differentiation. Here's the step-by-step process:

1. **Import the necessary libraries:**
   - `sympy` is a Python library for symbolic mathematics.

2. **Define the necessary symbols and functions:**
   - We will define the symbols `a`, `b`, and `x`.
   - We will also define the function `sq

# Testing on Competition Data without Finetuning

In [None]:
import json
import re
import torch
from transformers import pipeline
import sys
import traceback

def load_json_file(file_path):
    try:
        with open(file_path, 'r') as file:
            return json.load(file)
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON file: {e}")
        sys.exit(1)
    except FileNotFoundError:
        print(f"File not found: {file_path}")
        sys.exit(1)
    except Exception as e:
        print(f"An error occurred while reading the file: {e}")
        sys.exit(1)

def extract_variables(input_values):
    return input_values.keys()

def generate_python_code(pipe, latex_expression, variables):
    prompt = f"""Convert this LaTeX expression to a Python function named 'calculate' that takes the following parameters: {', '.join(variables)}:
{latex_expression}

The function should evaluate the expression and return a numerical result as a float or an integer.
Use math functions from the math module (e.g., math.sin, math.cos) for trigonometric functions.
For inverse trigonometric functions, use math.asin, math.acos, math.atan.
For hyperbolic functions, use math.sinh, math.cosh, math.tanh.
For logarithms, use math.log for natural log and math.log10 for base 10 log.
Use math.pi for π and math.e for e.
For complex numbers, use the cmath module.

Python function:"""

    gen_config = {
        "max_new_tokens": 1024,
        "do_sample": False,
        "stop_strings": ["```output"],
        "tokenizer": pipe.tokenizer,
    }
    try:
        outputs = pipe(prompt, **gen_config)
        generated_text = outputs[0]["generated_text"]
        python_code = re.findall(r"```python(.*?)```", generated_text, re.DOTALL)
        if not python_code:
            return None, "No Python code generated"
        return python_code[0].strip(), None
    except Exception as e:
        return None, f"Error generating Python code: {str(e)}"


def execute_python_code(code, input_values):
    try:
        local_scope = {}
        exec(code, globals(), local_scope)
        if 'calculate' not in local_scope:
            return 0
        
        # Dynamically pass all input variables to the calculate function
        result = local_scope['calculate'](**input_values)
        
        # Ensure that the result is evaluated to a number
        if isinstance(result, str):
            result = eval(result)

        if isinstance(result, complex):
            return f"{result.real}+{result.imag}j"
        return str(result)
    except Exception as e:
        return 0


def process_json_data(pipe, json_data):
    task_id = json_data.get("task_id", "unknown")
    latex_expression = json_data.get("latex_expression")
    test_cases = json_data.get("test_cases", [])

    if not latex_expression:
        return task_id, json.dumps(["Error: No LaTeX expression provided"])

    # Use the input variables from the first test case to generate the function
    if not test_cases:
        return task_id, json.dumps(["Error: No test cases provided"])

    # Extract variables from the first test case input
    variables = extract_variables(test_cases[0].get("input", {}))

    python_code, error = generate_python_code(pipe, latex_expression, variables)
    if error:
        return task_id, json.dumps([error])

    results = []
    for case in test_cases:
        input_values = case.get("input", {})
        result = execute_python_code(python_code, input_values)
        results.append(result)

    return task_id, json.dumps(results)

def main(input_file, output_file):
    try:
        # Load the model
        pipe = pipeline("text-generation", model="AI-MO/NuminaMath-7B-TIR", torch_dtype=torch.bfloat16, device_map="auto")

        # Load JSON data
        json_data_list = load_json_file(input_file)

        # Process each JSON object and store results
        results = {
        "id":[],
        "outputs":[]
        }
        for json_data in json_data_list:
            task_id, outputs = process_json_data(pipe, json_data)
            results['id'].append(f"{task_id}")
            results['outputs'].append(f"{outputs}")

        # Write results to a file
        with open(output_file, "w") as f:
            f.write("id,outputs\n")
            for result in results:
                f.write(f"{result}\n")

        print(f"Results have been written to {output_file}")

    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        traceback.print_exc()

input_file = 'public_test_new_no_sol_no_out.json'
output_file = 'sub.csv'
main(input_file, output_file)


# Training

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, TrainingArguments, Trainer,DataCollatorForLanguageModeling
from peft import LoraConfig, get_peft_model
from datasets import Dataset

In [None]:
import pandas as pd
from datasets import Dataset

def load_and_prepare_data_from_csv(file_path):
    # Load the CSV file into a DataFrame
    df = pd.read_csv(file_path)
    
    # Prepare the conversations list
    conversations = []
    for _, row in df.iterrows():
        latex_expression = row['latex_expression'].strip()
        solution = row['solution'].strip()
        
        conversation = [
            {"role": "user", "content": latex_expression},
            {"role": "assistant", "content": solution}
        ]
        conversations.append(conversation)
    
    # Convert the conversations list to a Dataset
    return Dataset.from_dict({"conversations": conversations})

# Load the dataset
dataset = load_and_prepare_data_from_csv('synthetic_data_final.csv')


In [None]:
# Your provided function to load and prepare data
def load_and_prepare_data(file_path):
    with open(file_path, 'r') as f:
        content = f.read()
    
    problems = content.split('==================================================')
    problems = [p.strip() for p in problems if p.strip()]
    
    conversations = []
    for problem in problems:
        parts = problem.split('Python Solution:')
        if len(parts) != 2:
            continue
        
        input_text = parts[0].strip()
        solution = parts[1].strip().replace('```python', '').replace('```', '').strip()
        
        conversation = [
            {"role": "user", "content": input_text},
            {"role": "assistant", "content": solution}
        ]
        conversations.append(conversation)
    
    return Dataset.from_dict({"conversations": conversations})

# Load the dataset
dataset = load_and_prepare_data('textbook_format_v2.txt')

In [None]:
import json
# Step 1: Load and prepare the data
def load_and_prepare_data(file_path):
    with open(file_path, 'r') as f:
        data = json.load(f)
    
    conversations = []
    for item in data:
        conversation = [
            {"role": "user", "content": f"Convert the following LaTeX expression to Python code:\nExpression: {item['sympy_exp']}\nLaTeX: {item['latex_expression']}"},
            {"role": "assistant", "content": f" {item['solution']} \n Simplified Solution: {item['simplified_solution']}"}
        ]
        conversations.append(conversation)
    
    return Dataset.from_dict({"conversations": conversations})

# Load your data
dataset = load_and_prepare_data('train.json')

In [None]:
# Load the model and tokenizer
model_tokenizer = "AI-MO/NuminaMath-7B-TIR"
model_name="./fine_tuned_aimo_lora_model_v3"
tokenizer = AutoTokenizer.from_pretrained(model_tokenizer)
model = AutoModelForCausalLM.from_pretrained(
    model_name, 
    torch_dtype=torch.float16,
    device_map="auto"
)

# Define LoRA Config
lora_config = LoraConfig(
    r=32,
    lora_alpha=64,
    target_modules=["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj"],
    lora_dropout=0.05,
    bias="none",
    task_type="CAUSAL_LM"
)

# Get the PEFT model
model = get_peft_model(model, lora_config)
# Ensure the tokenizer has a pad token
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

In [None]:
# Ensure the model is in training mode
model.train()



def prepare_train_features(examples):
    model_inputs = tokenizer.batch_encode_plus(
        [tokenizer.apply_chat_template(conv, tokenize=False, add_generation_prompt=True) for conv in examples['conversations']],
        truncation=True,
        padding='max_length',
        max_length=512,
        return_tensors="pt"
    )
    
    # Create labels by copying input_ids
    model_inputs["labels"] = model_inputs["input_ids"].clone()
    
    # Mask the prompt part in labels
    for i, conversation in enumerate(examples['conversations']):
        prompt = tokenizer.encode(conversation[0]["content"])
        prompt_length = len(prompt)
        model_inputs["labels"][i, :prompt_length] = -100
    
    return model_inputs

tokenized_dataset = dataset.map(prepare_train_features, batched=True, remove_columns=dataset.column_names)

In [None]:
# Set up the trainer
training_args = TrainingArguments(
    output_dir="./aimo",
    num_train_epochs=5,
    per_device_train_batch_size=2,
    gradient_accumulation_steps=32,
    save_steps=10_000,
    save_total_limit=2,
    logging_steps=100,
    learning_rate=2e-4,
    bf16=True,  # Use fp16 mixed precision
    optim="adamw_torch",
    max_grad_norm=0.3,
)

data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)

#model.to_empty(device=training_args.device)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset,
    data_collator=data_collator,

)

In [None]:
# Start training
trainer.train()

In [None]:
# Save the fine-tuned model
trainer.save_model("./fine_tuned_aimo_lora_model_v4")

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline

model_path = './fine_tuned_aimo_lora_model_v3'
tokenizer_path = './full_latest_72_tokenizer'

# Load the model and tokenizer from their respective paths
model = AutoModelForCausalLM.from_pretrained(
    model_path, 
    torch_dtype=torch.bfloat16,
    device_map="auto"
)
tokenizer = AutoTokenizer.from_pretrained(tokenizer_path)

# Now create the pipeline
pipe = pipeline("text-generation", model=model, tokenizer=tokenizer, device_map="auto")

# Save the model and tokenizer to the desired locations
pipe.model.save_pretrained('./full_latest_model_v3')
pipe.tokenizer.save_pretrained('./full_latest_v3_tokenizer')

print(pipe.model)


# Inference

In [15]:
import re
import torch
from transformers import pipeline

model_name='./fine_tuned_aimo_lora_model_v3'
model = AutoModelForCausalLM.from_pretrained(
    model_name, 
    torch_dtype=torch.float16,
    device_map="auto"
)

#tokenizer = AutoTokenizer.from_pretrained('./full_latest_v3_tokenizer')

pipe = pipeline("text-generation", model=model_name, torch_dtype=torch.bfloat16, device_map="auto")

messages = [
    {"role": "user", "content": """Convert this LaTeX expression to a Python function named 'calculate' that takes the following parameters: x:
{{3 \tan{\left(8 x \right)} + 5}}

Follow these guidelines strictly:
1. Import necessary libraries and functions at the beginning of the function. Prefer using `numpy` for mathematical operations and `SymPy` for symbolic computations.
2. Use `SymPy` to solve the equations, including calculus operations like differentiation, integration, and logarithmic functions. Do not use `sp.lambdify` for defining functions.
3. The function should return results as `int`, `float`, or `complex`. Use `SymPy` functions like `.evalf()` for numerical evaluation if needed.

Python Code:
```python
"""
    }]
prompt = pipe.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)

gen_config = {
    "max_new_tokens": 1024,
    "do_sample": False,
    "stop_strings": ["```output"], # Generate until Python code block is complete
    "tokenizer": pipe.tokenizer,
}

outputs = pipe(prompt, **gen_config)
text = outputs[0]["generated_text"]
print(text)


python_code = re.findall(r"```python(.*?)```", text, re.DOTALL)[0]
exec(python_code)


NameError: name 'AutoModelForCausalLM' is not defined

# Code Evaluation

In [None]:
import sys
import io

def run_code(compiled_code, func_name, test_case):
    # Create a dictionary to execute the code in its own namespace
    local_namespace = {}
    
    # Execute the compiled code
    exec(compiled_code, globals(), local_namespace)
    
    # Capture the standard output
    old_stdout = sys.stdout
    sys.stdout = io.StringIO()
    
    try:
        # Run the function from the compiled code with the test case as an argument
        output = local_namespace[func_name](*test_case)
    finally:
        # Reset standard output and retrieve the result
        sys.stdout = old_stdout
    
    return output


import math

def check_correctness(output, expected_output, rel_tol=1e-9, abs_tol=0.0):
    # Check if the outputs are numbers
    if isinstance(output, (int, float)) and isinstance(expected_output, (int, float)):
        # Use isclose for numerical comparison
        return math.isclose(output, expected_output, rel_tol=rel_tol, abs_tol=abs_tol)
    else:
        # Fallback to equality check for non-numerical outputs
        return output == expected_output

def compile_code(code):
    try:
        # Compile the code string into a code object
        compiled_code = compile(code, '<string>', 'exec')
        
        # Extract the function name(s) from the code (assuming there is only one top-level function)
        local_namespace = {}
        exec(compiled_code, globals(), local_namespace)
        
        # Get the function name by extracting keys from local_namespace that are callable (functions)
        func_name = None
        for name, obj in local_namespace.items():
            if callable(obj):
                func_name = name
                break
        
        return compiled_code, func_name
    
    except Exception as e:
        # In case of an error, return None and the error message
        print(f"Compilation failed: {e}")
        return None, None


In [None]:
def evaluate_code(generated_code_list, test_cases, expected_outputs):
    top_code = generated_code_list[0]
    top_compiled_code, top_func_name = compile_code(top_code)

    if top_compiled_code is None:
        return 0.0  # Top Code not compiled successfully, so accuracy is 0.

    correct_count = 0

    # Calculate accuracy for the top generated code
    for i, test_case in enumerate(test_cases):
        output = run_code(top_compiled_code, top_func_name, test_case)
        if check_correctness(output, expected_outputs[i]):
            correct_count += 1

    accuracy = correct_count / len(test_cases)
    return accuracy


# Latest

In [14]:
import json
import re
import torch
import sys
import traceback
import pandas as pd
from transformers import pipeline
import logging
from tqdm import tqdm
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def load_json_file(file_path):
    try:
        with open(file_path, 'r') as file:
            return json.load(file)
    except json.JSONDecodeError as e:
        logger.error(f"Error decoding JSON file: {e}")
        sys.exit(1)
    except FileNotFoundError:
        logger.error(f"File not found: {file_path}")
        sys.exit(1)
    except Exception as e:
        logger.error(f"An error occurred while reading the file: {e}")
        sys.exit(1)

def extract_variables(input_values):
    filtered_keys = [key for key in input_values.keys()]

    return filtered_keys

def generate_python_code(pipe, latex_expression, variables):
     
    prompt = f"""Convert the following LaTeX expression into a Python function named 'calculate' that takes the given parameters:
{latex_expression}

Follow these guidelines strictly:
1. The code must contain a Python function definition using `def calculate({', '.join(variables)}):` for all equations including derivatives. Import necessary libraries and functions at the beginning of the function. Prefer using `numpy` for mathematical operations and `SymPy` for symbolic computations.
2. Use `SymPy` exclusively to solve equations involving calculus operations like differentiation, integration, and logarithmic functions. For example, if the expression involves differentiation, compute the derivative directly inside the function. Do not use `sp.lambdify`.
3. Ensure the function is callable with the provided parameters and returns the calculated result as `int`, `float`, or `complex number` with 'return'.

Now, apply the above instructions method to convert the given LaTeX expression.

Python Code:
```python
"""



    gen_config = {
        "max_new_tokens": 512,
        "do_sample": False,
        "stop_strings": ["```output"],
        "tokenizer": pipe.tokenizer,
    }
    try:
        outputs = pipe(prompt, **gen_config)
        generated_text = outputs[0]["generated_text"]
        python_code = re.findall(r"```python(.*?)```", generated_text, re.DOTALL)
        if not python_code:
            logger.error(f"No Python code generated for LaTeX expression: {latex_expression}")
            return None, "No Python code generated"
        return python_code[0].strip(), None
    except Exception as e:
        logger.error(f"Error generating Python code for LaTeX expression {latex_expression}: {e}")
        return None, f"Error generating Python code: {str(e)}"

def process_json_data(pipe, json_data):
    task_id = json_data.get("task_id", "unknown")
    latex_expression = json_data.get("latex_expression")
    test_cases = json_data.get("test_cases", [])

    if not latex_expression:
        logger.warning(f"Task ID {task_id}: No LaTeX expression provided")
        return task_id, [{"latex_expression": latex_expression, "error": "Error: No LaTeX expression provided"}]

    if not test_cases:
        logger.warning(f"Task ID {task_id}: No test cases provided")
        return task_id, [{"latex_expression": latex_expression, "error": "Error: No test cases provided"}]

    variables = extract_variables(test_cases[0].get("input", {}))
    python_code, error = generate_python_code(pipe, latex_expression, variables)
    if error:
        logger.error(f"Task ID {task_id}: {error}")
        return task_id, [{"latex_expression": latex_expression, "error": error}]

    return task_id, [{"latex_expression": latex_expression, "generated_code": python_code}]

def main(input_file, output_file):
    try:
        model_name = './fine_tuned_aimo_lora_model_v3'
        # Load the model
        pipe = pipeline("text-generation", model=model_name, tokenizer='./full_latest_72_tokenizer',torch_dtype=torch.bfloat16, device_map="auto")
        pipe.model.eval()
        logger.info("Model loaded and evaluation mode set.")

        # Load JSON data
        json_data_list = load_json_file(input_file)

        # Initialize a dictionary to store results
        results_dict = {
            "id": [],
            "latex_expression": [],
            "generated_code": [],
            "error": []
        }

        # Process each JSON object and store results in the dictionary
        for json_data in tqdm(json_data_list):
            task_id, results = process_json_data(pipe, json_data)
            for result in results:
                results_dict["id"].append(task_id)
                results_dict["latex_expression"].append(result.get("latex_expression", ""))
                results_dict["generated_code"].append(result.get("generated_code", ""))
                results_dict["error"].append(result.get("error", ""))

        # Convert the dictionary to a DataFrame
        df = pd.DataFrame(results_dict)

        # Write the DataFrame to a CSV file
        df.to_csv(output_file, index=False)

        logger.info(f"Results have been written to {output_file}")

    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}")
        traceback.print_exc()

input_file = 'public_test_new_no_sol_no_out.json'
output_file = 'codes_26_2.csv'
main(input_file, output_file)


2024-08-26 15:57:11,152 - INFO - We will use 90% of the memory on device 0 for storing the model, and 10% for the buffer to avoid OOM. You can set `max_memory` in to a higher value to use more memory (at your own risk).
2024-08-26 15:57:21,309 - INFO - Model loaded and evaluation mode set.
 67%|██████▋   | 676/1004 [51:26<30:29,  5.58s/it]  2024-08-26 16:48:58,949 - ERROR - No Python code generated for LaTeX expression: \frac{9.48045640713735 e^{- 0.795612954929873 x}}{970.362343722415 \pi r \left(r + \sqrt{h^{2} + r^{2}}\right) + 1.49981800883714 v + 1.67078922044761 w^{2} + 0.548536816459822 y^{2} + 1.71408886030606 z^{2} + 2.50166367824986 e^{- 1.01197189210867 x}}
2024-08-26 16:48:58,951 - ERROR - Task ID 970ed9f3: No Python code generated
 92%|█████████▏| 919/1004 [1:14:39<07:10,  5.07s/it]2024-08-26 17:12:11,617 - ERROR - No Python code generated for LaTeX expression: \frac{1.26415982655854 x + 5.22351587682023 y + 3 z^{3} - 7744 + \sum_{x=1}^{5} 3^{x} + 1.18847493653838 e^{- 1.2

# Prepare Submission

In [None]:
import json
import sys
import traceback
import pandas as pd

def load_json_file(file_path):
    try:
        with open(file_path, 'r') as file:
            return json.load(file)
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON file: {e}")
        sys.exit(1)
    except FileNotFoundError:
        print(f"File not found: {file_path}")
        sys.exit(1)
    except Exception as e:
        print(f"An error occurred while reading the file: {e}")
        sys.exit(1)

def execute_python_code(code, input_values):
    try:
        # Create a dedicated namespace for the code execution
        code_namespace = {}
        exec(code, code_namespace)
        
        # Ensure the function name matches and exists in the namespace
        if 'calculate' not in code_namespace:
            return "Error: Function 'calculate' not found in the generated code."
        
        # Execute the function with the provided input values
        result = code_namespace['calculate'](**input_values)
        
        # Handle the result type
        if isinstance(result, str):
            result = eval(result)
        if isinstance(result, complex):
            return f"{result.real}+{result.imag}j"
        return str(result)
    except Exception as e:
        return str(e)


def main(json_file, csv_file, output_file):
    try:
        # Load the JSON data
        json_data_list = load_json_file(json_file)
        
        # Load the CSV file containing generated code
        df = pd.read_csv(csv_file)

        # Initialize a dictionary to store the final results
        results_dict = {
            "id": [],
            "outputs": []
        }

        # Iterate over each row in the CSV file
        for _, row in df.iterrows():
            task_id = row['id']
            python_code = row['generated_code']
            
            # Find the corresponding JSON object
            json_data = next((item for item in json_data_list if item['task_id'] == task_id), None)
            if json_data is None:
                print(f"No JSON data found for task_id: {task_id}")
                continue

            test_cases = json_data.get("test_cases", [])
            outputs = []
            for case in test_cases:
                input_values = case.get("input", {})



                # Execute the Python code with modified input values
                result = execute_python_code(python_code,input_values)
                outputs.append(result)


            # Store the results
            results_dict["id"].append(task_id)
            results_dict["outputs"].append(outputs)

        # Convert the results to a DataFrame
        output_df = pd.DataFrame(results_dict)

        # Write the DataFrame to a CSV file
        output_df.to_csv(output_file, index=False)

        print(f"Outputs have been written to {output_file}")

    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        traceback.print_exc()

# File paths
json_file = 'public_test_new_no_sol_no_out.json'
csv_file = 'codes_test.csv'
output_file = 'results.csv'

# Run the main function
main(json_file, csv_file, output_file)


In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, TrainingArguments, Trainer,DataCollatorForLanguageModeling
from peft import LoraConfig, get_peft_model
from datasets import Dataset

In [None]:
import json
import re
import torch
import traceback
from tqdm import tqdm
import pandas as pd
from transformers import pipeline
from datasets import Dataset, DatasetDict

def load_json_file(file_path):
    try:
        with open(file_path, 'r') as file:
            return json.load(file)
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON file: {e}")
        sys.exit(1)
    except FileNotFoundError:
        print(f"File not found: {file_path}")
        sys.exit(1)
    except Exception as e:
        print(f"An error occurred while reading the file: {e}")
        sys.exit(1)

def extract_variables(input_values):
    filtered_keys = [key for key in input_values.keys() if not key.endswith('_val')]
    return filtered_keys

def generate_python_code(pipe, latex_expression, variables):
    prompt = f"""Convert this LaTeX expression: {latex_expression} to a Python function named 'calculate' that takes the following parameters: {', '.join(variables)}.
The function must accept the variables provided in this prompt as parameters and must be callable using the `exec` function.

Follow these guidelines strictly:

1. Import necessary libraries and functions at the beginning of the function. Prefer using `sympy` for mathematical operations. Specially of log.
2. For differentiation, integration, and other calculus operations, use `SymPy` to solve the equation and return a numerical value.
3. Do not include any example usage of the function.
4. Only provide the Python function, nothing else.
5. The return type must be `int` or `float` and convert sympy complex number outputs into python complex number before returmning. Use `eval` or `evalf` only if the output type is not numeric.
6. Keep the variable names (parameters) the same as provided. Carefully check or typecast variables where necessary. 
7. **Do not use `sp.lambdify` to generate the function.** Instead, directly define the function using standard Python and SymPy operations.

Python Code:
```python

"""

    gen_config = {
        "max_new_tokens": 2000,
        "do_sample": False,
        "stop_strings": ["```output"],
        "tokenizer": pipe.tokenizer,
    }
    try:
        outputs = pipe(prompt, **gen_config)
        generated_text = outputs[0]["generated_text"]
        python_code = re.findall(r"```python(.*?)```", generated_text, re.DOTALL)
        if not python_code:
            return None, "No Python code generated"
        return python_code[0].strip(), None
    except Exception as e:
        return None, f"Error generating Python code: {str(e)}"

def process_data(example, pipe):
    task_id = example.get("task_id", "unknown")
    latex_expression = example.get("latex_expression")
    test_cases = example.get("test_cases", [])

    if not latex_expression:
        return {
            "id": task_id,
            "latex_expression": latex_expression,
            "generated_code": None,
            "error": "Error: No LaTeX expression provided"
        }

    if not test_cases:
        return {
            "id": task_id,
            "latex_expression": latex_expression,
            "generated_code": None,
            "error": "Error: No test cases provided"
        }

    variables = extract_variables(test_cases[0].get("input", {}))
    python_code, error = generate_python_code(pipe, latex_expression, variables)
    if error:
        return {
            "id": task_id,
            "latex_expression": latex_expression,
            "generated_code": None,
            "error": error
        }

    return {
        "id": task_id,
        "latex_expression": latex_expression,
        "generated_code": python_code,
        "error": None
    }


def main(input_file, output_file):
    try:
        model_name = './fine_tuned_aimo_lora_model_v3'
        tokenizer = AutoTokenizer.from_pretrained('AI-MO/NuminaMath-7B-TIR')

        # Load the model
        pipe = pipeline("text-generation", model=model_name, tokenizer=tokenizer, torch_dtype=torch.bfloat16, device_map="auto")
        pipe.model.eval()

        # Load JSON data
        json_data_list = load_json_file(input_file)

        # Convert list of dicts to dict of lists for Dataset
        data_dict = {
            "id": [],
            "latex_expression": [],
            "generated_code": [],
            "error": []
        }

        # Process each JSON object and store results in the dictionary
        for json_data in tqdm(json_data_list):
            task_id, results = process_data(pipe, json_data)
            for result in results:
                data_dict["id"].append(task_id)
                data_dict["latex_expression"].append(result.get("latex_expression", ""))
                data_dict["generated_code"].append(result.get("generated_code", ""))
                data_dict["error"].append(result.get("error", ""))

        # Create Dataset from dict of lists
        dataset = Dataset.from_dict(data_dict)

        # Convert the Dataset to a DataFrame
        df = pd.DataFrame(data_dict)

        # Write the DataFrame to a CSV file
        df.to_csv(output_file, index=False)

        print(f"Results have been written to {output_file}")

    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        traceback.print_exc()

input_file = 'public_test_new_no_sol_no_out.json'
output_file = 'codes_24_2.csv'
main(input_file, output_file)

