# *Motivation:*

### Most forked notebooks had a disorganized code structure, with library imports scattered across different places. So I organized the functions into one class and removed unnecessary code, in the hope that this makes the system more understandable and gives people the flexibility to edit it. Suggestions and fixes are welcome!

# Credits

In [1]:
# https://www.kaggle.com/code/abdurrafae/improved-code-interpretation
# https://www.kaggle.com/code/dnyaneshwalwadkar/submission-with-the-best-nb-new-api
# https://www.kaggle.com/code/utsavsinghal2604/natural-language-and-code-integration
# https://www.kaggle.com/code/yuanwangzhang/updated-code-interpretation-n-repetitions-17

# Start Notebook

In [2]:
import time

# Wall-clock reference taken when the notebook starts; LLM_SYSTEM.predict()
# compares the time elapsed since this moment against TIME_LIMIT.
NOTEBOOK_START_TIME = time.time()

# Libraries installation

In [3]:
# Install accelerate and bitsandbytes from local wheel files under /kaggle/input
# (-U upgrades an already-installed version, -qq keeps pip quiet).
!pip install -U /kaggle/input/accelerate-0-29-3/accelerate-0.29.3-py3-none-any.whl -qq
!pip install -U /kaggle/input/bitsandbytes-0-43-1/bitsandbytes-0.43.1-py3-none-manylinux_2_24_x86_64.whl -qq

# Import libraries

In [4]:
import os
import numpy as np
import pandas as pd
from tqdm import tqdm
import gc
import re
import sys
import subprocess
import math
import random
from collections import defaultdict
from collections import Counter
import torch
import transformers
import accelerate

# New API initialization


In [5]:
# PRIVATE is True only when KAGGLE_IS_COMPETITION_RERUN is set to a non-empty value
# (presumably by Kaggle during the scoring rerun -- confirm against the platform docs).
PRIVATE = bool(os.getenv('KAGGLE_IS_COMPETITION_RERUN'))

if PRIVATE:
    # Set up the evaluation API
    import aimo

    env = aimo.make_env()
    iter_test = env.iter_test()
else:
    class train_env():
        """Local stand-in for the evaluation API, backed by the training CSV.

        Exposes the same two calls the notebook uses on the real environment:
        ``iter_test()`` to obtain one problem at a time and ``predict()`` to
        submit the answer for the problem that was just served.
        """

        def __init__(self, randomize=False):
            self.randomlize = randomize

            frame = pd.read_csv('/kaggle/input/ai-mathematical-olympiad-prize/train.csv')
            # Keep the true answer aside and blank the column that predict() fills in.
            frame['ground_truth'] = frame['answer']
            frame['answer'] = -1

            if self.randomlize:
                # Shuffle rows; the old row order survives in the 'index' column.
                frame = frame.reset_index().sample(frac=1).reset_index(drop=True)

            self.df = frame
            self.predict_called = True
            self.counter = 0
            self.len = len(self.df)

        def iter_test(self):
            """Yield ``(problem_frame, answer_frame)`` for each row, one per ``predict()`` call."""
            while self.counter < self.len:
                if not self.predict_called:
                    # The previous problem has not been answered yet.
                    print("You must call `predict()` successfully before you can continue with `iter_test()`")
                    yield None
                    continue

                self.predict_called = False
                row_label = [self.counter]
                problem_frame = self.df.loc[row_label][['id', 'problem']]
                answer_frame = self.df.loc[row_label][['id', 'answer']]
                yield problem_frame, answer_frame

        def predict(self, answer):
            """Record the submitted answer for the current row and advance to the next one."""
            submitted = answer['answer'].values[0]
            self.df.loc[self.counter, 'answer'] = submitted
            self.predict_called = True
            self.counter += 1

    env = train_env(randomize=True)
    iter_test = env.iter_test()

# Configurations and Hyperparameters

In [6]:
# --- Model loading switches ---
QUANT = False            # True -> load the model 4-bit quantized (see LLM_SYSTEM.initialize_llm)
USE_PAST_KEY = True      # True -> generate() returns a dict and its past_key_values are reused
SEED = 42
MODEL_PATH = "/kaggle/input/deepseek-math"
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# --- Sampling budget: larger on the scoring rerun, small for local runs ---
N_REPETITIONS = 19 if PRIVATE else 4
MAX_NEW_TOKENS = 2048 if PRIVATE else 512
# Seconds since NOTEBOOK_START_TIME after which LLM_SYSTEM.predict() answers 0
# without running the model (so the 1-second local value short-circuits almost at once).
TIME_LIMIT = 31500 if PRIVATE else 1

transformers.set_seed(SEED)
torch.backends.cuda.enable_mem_efficient_sdp(False)

# Module name -> GPU index. Embeddings and decoder layers 0-17 go to device 0;
# layers 18-31, the final norm and the LM head go to device 1.
DEVICE_MAP = {
    'model.embed_tokens': 0,
    **{f'model.layers.{layer}': int(layer >= 18) for layer in range(32)},
    'model.norm': 1,
    'lm_head': 1,
}

TEMPERATURE = [0.9, 0.9] # temperature, temperature_coding
TOP_P = [1.0, 1.0] # top_p, top_p_coding

2024-05-27 14:10:57.203166: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-05-27 14:10:57.203274: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-05-27 14:10:57.468195: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


# Important Custom Classes

In [7]:
class StoppingCriteriaSub(transformers.StoppingCriteria):
    """Stop generation as soon as the sequence ends with one of the given token patterns.

    Args:
        stops: Iterable of token-id tensors, one per stop pattern. ``None`` (the
            default) means "no stop patterns".
        encounters: Accepted for backward compatibility; it is not used.
    """

    def __init__(self, stops=None, encounters=1):
        super().__init__()
        if stops is None:
            stops = []
        # reshape(-1) turns a 0-dim tensor (a single-token pattern after .squeeze())
        # into a 1-element vector so its length can be taken safely.
        self.stops = [stop.reshape(-1).to(DEVICE) for stop in stops]

    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs):
        """Return True when the first sequence of the batch ends with any stop pattern.

        Only ``input_ids[0]`` is inspected; ``scores`` and extra keyword
        arguments are ignored.
        """
        sequence = input_ids[0]
        for stop in self.stops:
            stop_len = stop.shape[0]
            # A pattern that is empty or longer than the sequence cannot match;
            # skipping it also avoids a shape-mismatch error in the comparison.
            if stop_len == 0 or sequence.shape[0] < stop_len:
                continue
            last_token = sequence[-stop_len:]
            # Compare on the sequence's device in case it differs from DEVICE.
            if torch.all(torch.eq(stop.to(last_token.device), last_token)):
                return True
        return False

In [8]:
class LLM_SYSTEM:
    
    def __init__(self, model_path, device_map, temperature, top_p, prompt_options):
        """Load the model and set up stop words, sampling settings and per-problem state.

        Args:
            model_path: Path handed to ``initialize_llm`` to load model and tokenizer.
            device_map: Device placement handed to ``initialize_llm``.
            temperature: Two-item sequence ``[temperature, temperature_coding]``.
            top_p: Two-item sequence ``[top_p, top_p_coding]``.
            prompt_options: Prompt templates that ``predict`` samples from.
        """
        # Model and tokenizer
        self.model, self.tokenizer = self.initialize_llm(model_path, device_map)

        # Stop markers: generation halts whenever the output ends with one of these.
        self.stop_words = ["```output", "```python", "```\nOutput" , ")\n```" , "``````output"]
        self.stop_words_ids = []
        for marker in self.stop_words:
            encoded = self.tokenizer(marker, return_tensors='pt', add_special_tokens=False)
            self.stop_words_ids.append(encoded['input_ids'].squeeze())
        self.stopping_criteria = transformers.StoppingCriteriaList(
            [StoppingCriteriaSub(stops=self.stop_words_ids)]
        )

        self.prompt_options = prompt_options

        # Sampling settings: index 0 is the default, index 1 is used right after "```python".
        self.temperature = temperature[0]
        self.temperature_coding = temperature[1]
        self.top_p = top_p[0]
        self.top_p_coding = top_p[1]

        # Per-problem bookkeeping, keyed by self.problem_count.
        self.total_results = {}
        self.total_answers = {}
        self.best_stats = {}
        self.total_outputs = {}
        self.question_type_counts = {}
        self.starting_counts = (2,3)
        self.problem_count = 0

        # Per-repetition scratch state (reset at the start of each repetition in predict).
        self.already_generated_length = 0
        self.code_error = None
        self.code_error_count = 0
        self.code_output = -1
#====================================================================================#
    def initialize_llm(self, model_path, device_map):
        """Load the causal-LM and its tokenizer from ``model_path``.

        When the module-level ``QUANT`` flag is set the model is loaded with a
        4-bit NF4 bitsandbytes configuration and ``device_map="sequential"``;
        otherwise the caller's ``device_map`` is used as given.

        Returns:
            ``(model, tokenizer)``
        """
        model_config = transformers.AutoConfig.from_pretrained(model_path)
        model_config.gradient_checkpointing = True

        tokenizer = transformers.AutoTokenizer.from_pretrained(model_path)

        # Arguments shared by both loading modes.
        load_kwargs = {
            "torch_dtype": "auto",
            "trust_remote_code": True,
            "config": model_config,
        }

        if QUANT:
            load_kwargs["quantization_config"] = transformers.BitsAndBytesConfig(
                load_in_4bit = True,
                bnb_4bit_quant_type="nf4",
                bnb_4bit_compute_dtype=torch.bfloat16,
                bnb_4bit_use_double_quant=True,
            )
            load_kwargs["device_map"] = "sequential"
        else:
            load_kwargs["device_map"] = device_map

        model = transformers.AutoModelForCausalLM.from_pretrained(model_path, **load_kwargs)

        return model, tokenizer
#====================================================================================#    
    def predict(self, problem):
        """Answer one problem by sampling several generations and majority-voting.

        Each repetition draws one template from ``self.prompt_options``, generates
        until a stop word, runs any emitted Python block through ``process_code``,
        appends the program output to the prompt and keeps generating until no
        stop word ends the text or the token budget is used up. The repetition
        contributes up to two candidate answers: the last program output and the
        number parsed from the generated text (both reduced modulo 1000). Voting
        stops early once one answer has been seen more than 5 times.

        Args:
            problem: Problem statement substituted into the prompt template.

        Returns:
            The most frequent candidate answer for this problem, ``-1`` if no
            candidate was produced, or ``0`` when more than ``TIME_LIMIT`` seconds
            have already passed since ``NOTEBOOK_START_TIME``.
        """
        self.problem_count += 1
        # Measured once per problem; the per-repetition log line reuses this value.
        TIME_SPENT = time.time() - NOTEBOOK_START_TIME

        # Time budget already spent: answer 0 without running the model.
        if TIME_SPENT>TIME_LIMIT:
            return 0

        for repetition in tqdm(range(N_REPETITIONS)):
            print(f"\n\n\nQUESTION {self.problem_count} - {repetition} - TIME_SPENT : {TIME_SPENT:.0f} secs")
            best, best_count = self.best_stats.get(self.problem_count,(-1,-1))
            # Skip further sampling once the leading answer's vote count exceeds sqrt(repetition).
            if best_count>np.sqrt(repetition):
                print("SKIPPING CAUSE ALREADY FOUND BEST")
                continue

            outputs = self.total_outputs.get(self.problem_count,[])
            text_answers, code_answers = self.question_type_counts.get(self.problem_count,self.starting_counts)
            results = self.total_results.get(self.problem_count,[])
            answers = self.total_answers.get(self.problem_count,[])

            for _ in range(5):
                self.flush()
                time.sleep(0.2)

            try:
                # Reset the per-repetition scratch state.
                self.already_generated_length = 0
                self.code_error = None
                self.code_error_count = 0
                self.code_output = -1

                counts = np.array([text_answers,code_answers])

                # NOTE(review): the weights are ordered (text, code) while the notebook
                # builds prompt_options as [code, cot] -- confirm this pairing is intended.
                draw = np.random.choice(self.prompt_options, 1,
                              p=counts/counts.sum())

                # The second "{}" is re-inserted literally so "\boxed{}" survives format().
                initial_message = draw[0].format(problem,"{}")
                prompt = f"User: {initial_message}"

                prompt_original_length = len(prompt)
                print(f"{repetition}_{prompt}\n")

                model_inputs = self.tokenizer(prompt, return_tensors='pt').to(self.model.device)
                prompt_token_length = len(model_inputs['input_ids'][0])

                generation_output = self.model.generate(**model_inputs,
                                                   max_new_tokens=MAX_NEW_TOKENS-self.already_generated_length,
                                                   return_dict_in_generate=USE_PAST_KEY,
                                                   do_sample = True,
                                                   temperature = self.temperature,
                                                   top_p = self.top_p,
                                                   num_return_sequences=1, stopping_criteria = self.stopping_criteria)

                if USE_PAST_KEY:
                    output_ids = generation_output.sequences[0]
                else:
                    output_ids = generation_output[0]
                decoded_output = self.tokenizer.decode(output_ids, skip_special_tokens=True)
                print(f"{decoded_output[prompt_original_length:]}\n")
                # Track how much decoded text has been printed so far.
                prompt_original_length += len(decoded_output[prompt_original_length:])
                cummulative_code = ""

                stop_word_cond = False
                for stop_word in self.stop_words:
                    stop_word_cond = stop_word_cond or (decoded_output[-len(stop_word):]==stop_word)


                # Keep alternating "generate" / "run code" while the text ends on a stop word.
                while (stop_word_cond) and (self.already_generated_length<(MAX_NEW_TOKENS)):

                    if (decoded_output[-len("```python"):]=="```python"):
                        # The model is about to write code: continue with the coding sampling settings.
                        temperature_inner=self.temperature_coding
                        top_p_inner = self.top_p_coding
                        prompt = decoded_output
                    else:
                        # A code block was just closed: extract it, run it, feed back its output.
                        temperature_inner=self.temperature
                        top_p_inner = self.top_p
                        try:
                            if (decoded_output[-len("``````output"):]=="``````output"):
                                code_text = decoded_output.split('```python')[-1].split("``````")[0]
                            else:
                                code_text = decoded_output.split('```python')[-1].split("```")[0]


                            cummulative_code+=code_text
                            self.code_output, CODE_STATUS = self.process_code(cummulative_code, return_shell_output=True)
                            print('CODE RESULTS', self.code_output)

                            # Count consecutive identical outputs to detect a stuck error loop.
                            if self.code_error==self.code_output:
                                self.code_error_count+=1
                            else:
                                self.code_error=self.code_output
                                self.code_error_count = 0

                            if not CODE_STATUS:
                                # Drop the failing snippet from the accumulated program.
                                # Guarded: with an empty snippet, [:-0] would erase all accumulated code.
                                if code_text:
                                    cummulative_code = cummulative_code[:-len(code_text)]

                                if self.code_error_count>=1:
                                    print("REPEATED ERRORS")
                                    break

                        except Exception as e:
                            print(e)
                            print('ERROR PARSING CODE')
                            self.code_output = -1

                        if self.code_output!=-1:
                            if (decoded_output[-len(")\n```"):]==")\n```"):
                                prompt = decoded_output+'```output\n'+str(self.code_output)+'\n```\n'
                            else:
                                prompt = decoded_output+'\n'+str(self.code_output)+'\n```\n'
                        else:
                            prompt = decoded_output
                            cummulative_code=""
                    model_inputs = self.tokenizer(prompt, return_tensors='pt').to(self.model.device)
                    self.already_generated_length =  len(model_inputs['input_ids'][0])-prompt_token_length

                    if USE_PAST_KEY:
                        old_values = generation_output.past_key_values
                    else:
                        old_values = None

                    generation_output = self.model.generate(**model_inputs,
                                                       max_new_tokens=MAX_NEW_TOKENS-self.already_generated_length,
                                                       return_dict_in_generate=USE_PAST_KEY,
                                                       past_key_values=old_values,
                                                       do_sample = True,
                                                       temperature = temperature_inner,
                                                       top_p = top_p_inner,
                                                       num_return_sequences=1, stopping_criteria = self.stopping_criteria)
                    if USE_PAST_KEY:
                        output_ids = generation_output.sequences[0]
                    else:
                        output_ids = generation_output[0]
                    decoded_output = self.tokenizer.decode(output_ids, skip_special_tokens=True)
                    print(f"\nINTERMEDIATE OUT :\n{decoded_output[prompt_original_length:]}\n")
                    prompt_original_length+=len(decoded_output[prompt_original_length:])

                    stop_word_cond = False
                    for stop_word in self.stop_words:
                        stop_word_cond = stop_word_cond or (decoded_output[-len(stop_word):]==stop_word)
                if USE_PAST_KEY:
                    output_ids = generation_output.sequences[0]
                else:
                    output_ids = generation_output[0]

                # Only the newly generated tokens are parsed for the text answer.
                raw_output = self.tokenizer.decode(output_ids[prompt_token_length:], skip_special_tokens=True)
                #print(f"\n\nOutput :\n{raw_output}\n")
                result_output = self.process_text_output(raw_output)

                try:
                    # SECURITY NOTE: eval() runs on the stdout of model-generated code, which is
                    # untrusted text. A non-string value (e.g. the -1 sentinel) raises here and
                    # is mapped to -1 below.
                    self.code_output = round(float(eval(self.code_output))) % 1000
                except Exception as e:
                    print(e,'final_eval')
                    self.code_output = -1
            except Exception as e:
                print(e,"5")
                result_output, self.code_output = -1, -1

            if self.code_output!=-1:
                outputs.append(self.code_output)
                code_answers+=1

            if result_output!=-1:
                outputs.append(result_output)
                text_answers+=1

            answer_found = False
            if len(outputs) > 0:
                occurences = Counter(outputs).most_common()
                print(occurences)
                if occurences[0][1] > best_count:
                    print("GOOD ANSWER UPDATED!")
                    best = occurences[0][0]
                    best_count = occurences[0][1]
                if occurences[0][1] > 5:
                    print("ANSWER FOUND!")
                    # Do not break yet: the statistics below must be stored first,
                    # otherwise the value returned could be a stale best answer.
                    answer_found = True

            results.append(result_output)
            answers.append(self.code_output)

            self.best_stats[self.problem_count] = (best, best_count)
            self.question_type_counts[self.problem_count] = (text_answers, code_answers)
            self.total_outputs[self.problem_count] = outputs

            self.total_results[self.problem_count] = results
            self.total_answers[self.problem_count] = answers

            print("code_answers",code_answers-self.starting_counts[1],"text_answers",text_answers-self.starting_counts[0])

            if answer_found:
                break
        # .get() keeps this safe if no repetition ran (e.g. N_REPETITIONS == 0).
        return self.best_stats.get(self.problem_count, (-1, -1))[0]
#====================================================================================#
    def flush(self):
        torch.cuda.empty_cache()
        gc.collect()
#====================================================================================#
    def naive_parse(self, answer):
        out = []
        start = False
        end = False
        for l in reversed(list(answer)):
            if l in '0123456789' and not end:
                start = True
                out.append(l)
            else:
                if start:
                    end = True

        out = reversed(out)
        return ''.join(out)
#====================================================================================#
    def return_last_print(self, output, n):
        lines = output.strip().split('\n')
        if lines:
            return lines[n]
        else:
            return ""
#====================================================================================#
    def repl(self, match):
        if "real" not in match.group():
            return "{}{}".format(match.group()[:-1], ', real=True)')
        else:
            return "{}{}".format(match.group()[:-1], ')')
#====================================================================================#        
    def process_code(self, code, return_shell_output=False):
    
        code = re.sub(r"symbols\([^)]+\)", self.repl, code)

        if return_shell_output:
            code = code.replace('\n', '\n    ')
                # Add a try...except block
            code = "\ntry:\n    from sympy import *\n{}\nexcept Exception as e:\n    print(e)\n    print('FAIL')\n".format(code)

        if not return_shell_output:
            print(code)
        with open('code.py', 'w') as fout:
            fout.write(code)

        batcmd = 'timeout 7 ' + sys.executable + ' code.py'
        try:
            shell_output = subprocess.check_output(batcmd, shell=True).decode('utf8')
            return_value = self.return_last_print(shell_output, -1)
            print(shell_output)
            if return_shell_output:
                if return_value=='FAIL':
                    CODE_STATUS = False
                    return_value = self.return_last_print(shell_output, -2)
                    if "not defined" in return_value:
                        return_value+='\nTry checking the formatting and imports'
                else:
                    CODE_STATUS = True
                return return_value, CODE_STATUS  
            self.code_output = round(float(eval(return_value))) % 1000
        except Exception as e:
            print(e,'shell_output')
            self.code_output = -1

        if return_shell_output:
            if self.code_output==-1:
                CODE_STATUS = False
            else:
                CODE_STATUS = True
            return self.code_output, CODE_STATUS  


        return self.code_output
#====================================================================================#    
    def process_text_output(self, output):
        result = output    
        try:
            result_output = re.findall(r'\\boxed\{(\d+)\}', result)

            print('BOXED', result_output)
            if not len(result_output):
                result_output = self.naive_parse(result)
            else:
                result_output = result_output[-1]

            print('BOXED FINAL', result_output)
            if not len(result_output):
                result_output = -1

            else:
                result_output = round(float(eval(result_output))) % 1000

        except Exception as e:
            print(e)
            print('ERROR PARSING TEXT')
            result_output = -1

        return result_output
#====================================================================================#
    

# Start of code

# Prompts

In [9]:
# Prompt template that asks for a sympy-based plan followed by a full script.
# It contains two "{}" placeholders: LLM_SYSTEM.predict fills the first with the
# problem text and re-inserts a literal "{}" for the second (inside \boxed{}).
code = """Below is a math problem you are to solve (positive numerical answer):
\"{}\"
To accomplish this, first determine a sympy-based approach for solving the problem by listing each step to take and what functions need to be called in each step. Be clear so even an idiot can follow your instructions, and remember, your final answer should be positive integer, not an algebraic expression!
Write the entire script covering all the steps (use comments and document it well) and print the result. After solving the problem, output the final numerical answer within \\boxed{}.

Approach:"""


# Prompt template that asks for step-by-step reasoning "with programs";
# same two-placeholder convention as above.
cot = """Below is a math problem you are to solve (positive numerical answer!):
\"{}\"
Analyze this problem and think step by step to come to a solution with programs. After solving the problem, output the final numerical answer within \\boxed{}.\n\n"""

# Order matters: LLM_SYSTEM.predict samples from this list with position-based weights.
prompt_options = [code,cot]

# Initialize LLM

In [10]:
# Build the solver once: this loads the model and tokenizer from MODEL_PATH.
llm = LLM_SYSTEM(MODEL_PATH, DEVICE_MAP, TEMPERATURE, TOP_P, prompt_options)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

# Submission

In [11]:
# Serve problems one at a time: solve, fill in the answer, submit, then log both frames.
for test, sample_submission in iter_test:
    problem_text = test['problem'].values[0]
    sample_submission['answer'] = llm.predict(problem_text)
    env.predict(sample_submission)
    print(test)
    print(sample_submission, '\n')

       id                                            problem
0  8ee6f3  The points $\left(x, y\right)$ satisfying $((\...
       id  answer
0  8ee6f3       0 

       id                                            problem
1  229ee8  Let $k, l > 0$ be parameters. The parabola $y ...
       id  answer
1  229ee8       0 

       id                                            problem
2  5277ed  There exists a unique increasing geometric seq...
       id  answer
2  5277ed       0 

       id                                            problem
3  bedda4  Let $ABCD$ be a unit square. Let $P$ be the po...
       id  answer
3  bedda4       0 

       id                                            problem
4  430b63  What is the minimum value of $5x^2+5y^2-8xy$ w...
       id  answer
4  430b63       0 

       id                                            problem
5  82e2a0  Suppose that we roll four 6-sided fair dice wi...
       id  answer
5  82e2a0       0 

       id                               

# Done!

In [12]:
# Overwrite the scratch script left behind by the solver with a harmless one
# and run it once as a final check.
with open('code.py', 'w') as fout:
    fout.write("print('done')")

batcmd = 'timeout 7 ' + sys.executable + ' code.py'
try:
    shell_output = subprocess.check_output(batcmd, shell=True).decode('utf8')
    print(shell_output)
except Exception as err:
    # Still best-effort (the notebook must not crash here), but report the failure
    # instead of hiding it; KeyboardInterrupt/SystemExit are no longer swallowed.
    print(f"final code.py run failed: {err!r}")

done

