In [None]:
import math, re, os, warnings, random
import tensorflow as tf
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from kaggle_datasets import KaggleDatasets
from tensorflow import keras
from functools import partial
from sklearn.model_selection import train_test_split


from sklearn.utils import class_weight
from sklearn.model_selection import KFold
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
import tensorflow.keras.layers as L
import tensorflow.keras.backend as K
from tensorflow.keras import optimizers, applications, Sequential, losses, metrics
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, LearningRateScheduler

print("Tensorflow version " + tf.__version__)

# Prefer a TPU when one is attached; otherwise fall back to TensorFlow's default
# distribution strategy. `except Exception` (rather than a bare `except:`) keeps
# the best-effort fallback without swallowing KeyboardInterrupt/SystemExit, and the
# reason for the fallback is reported instead of being silently discarded.
try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    print('Device:', tpu.master())
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    strategy = tf.distribute.TPUStrategy(tpu)
except Exception as tpu_error:
    print('TPU not available, using the default strategy:', type(tpu_error).__name__)
    strategy = tf.distribute.get_strategy()
print('Number of replicas:', strategy.num_replicas_in_sync)

In [None]:
# Install tqdm and import its notebook progress bar.
!pip install tqdm
from tqdm.notebook import tqdm 

# Register tqdm with pandas so DataFrame.progress_apply (used later to build the
# prompts) is available.
tqdm.pandas()

In [None]:
# NOTE(review): pylatexenc is not imported by any cell visible in this notebook —
# confirm it is still needed before keeping this install.
!pip install pylatexenc

In [None]:
# Read both splits of the math QSA dataset and stack them row-wise into one frame.
# The original row labels of each file are kept, so labels repeat until the frame
# is re-indexed later.
df1 = pd.read_csv("/kaggle/input/math-qsa-dataset/train.csv")
df2 = pd.read_csv("/kaggle/input/math-qsa-dataset/test.csv")
df = pd.concat([df1, df2])

In [None]:
def is_integer(text):
    """Return True when ``text`` represents a non-negative integer.

    Args:
        text: The raw answer value. Usually a string, but may be a number or a
            missing value (``None``/NaN).

    Returns:
        bool: True only if the value converts to an ``int`` without losing
        information and that integer is >= 0. Unparseable, missing, infinite,
        negative, and non-integral values (e.g. ``3.7``) all yield False.
    """
    try:
        value = int(text)
    except (ValueError, TypeError, OverflowError):
        # ValueError: non-numeric text or NaN; TypeError: None and other
        # unsupported types; OverflowError: +/- infinity.
        return False
    # int() truncates numeric inputs, so 3.7 would otherwise pass as 3.
    if not isinstance(text, (str, bytes, bytearray)) and value != text:
        return False
    return value >= 0
    
# Flag rows whose answer passes is_integer, keep only those rows, and renumber
# the index from zero.
df["is_integer"] = df["answer"].map(is_integer)
df = df[df["is_integer"]].reset_index(drop=True)

# Preprocessing Pipeline

In [None]:
# nltk supplies the sentence/word tokenizers and the stop-word list used by the
# Preprocessing class defined in the next cell.
!pip install nltk

In [None]:
import re
import nltk
import pandas as pd
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.corpus import stopwords

# Ensure the necessary NLTK data files are downloaded.
# Recent NLTK releases load the 'punkt_tab' resource for sent_tokenize and
# word_tokenize, while older releases load 'punkt'; fetching both keeps the
# tokenizers working whichever version the unpinned `pip install nltk` resolved to.
for nltk_resource in ('punkt', 'punkt_tab', 'stopwords'):
    nltk.download(nltk_resource, quiet=True)

class Preprocessing:
    """Rewrite Asymptote drawing commands as English sentences and drop stop words.

    The text is processed line by line: ``draw(...)`` path commands become a
    sentence naming the shape and its vertices, ``dot``/``label`` commands become
    sentences about points, and everything else is passed through. The result is
    then tokenized with NLTK and English stop words are removed.
    """

    # Patterns are compiled once here instead of on every call.
    _ASY_TAG = re.compile(r'\[/?asy\]')
    # A real draw call ("draw(") as opposed to the word "draw" in ordinary prose.
    _DRAW_HINT = re.compile(r'\bdraw\s*\(')
    # Captures everything between "draw(" and the first ");".
    _DRAW_CALL = re.compile(r'\s*draw\s*\((.*?)\)\s*;')
    _DOT = re.compile(r'dot\(\((.*?)\)\);')
    _LABEL = re.compile(r'label\("(.*?)",\((.*?)\),.*?\);')
    _POLYGON_NAMES = {
        3: "triangle",
        4: "quadrilateral",
        5: "pentagon",
        6: "hexagon",
        7: "heptagon",
        8: "octagon",
    }

    def __init__(self):
        """Load the English stop-word list (requires the NLTK 'stopwords' data)."""
        self.stop_words = set(stopwords.words('english'))

    @staticmethod
    def _split_top_level(text, separator):
        """Split ``text`` on ``separator`` wherever it occurs outside parentheses."""
        parts = []
        depth = 0
        start = 0
        position = 0
        while position < len(text):
            char = text[position]
            if char == '(':
                depth += 1
            elif char == ')':
                depth -= 1
            elif depth == 0 and text.startswith(separator, position):
                parts.append(text[start:position])
                position += len(separator)
                start = position
                continue
            position += 1
        parts.append(text[start:])
        return parts

    @staticmethod
    def _unwrap_point(node):
        """Return the inside of ``(x,y)``; None if ``node`` is not one parenthesised group."""
        if not (node.startswith('(') and node.endswith(')')):
            return None
        depth = 0
        for index, char in enumerate(node):
            if char == '(':
                depth += 1
            elif char == ')':
                depth -= 1
                # The opening parenthesis must close on the very last character.
                if depth == 0 and index != len(node) - 1:
                    return None
        return node[1:-1].strip() if depth == 0 else None

    @staticmethod
    def _join_points(points):
        """Format coordinates as "(a), (b), and (c)" so each pair stays unambiguous."""
        wrapped = [f"({point})" for point in points]
        if len(wrapped) <= 2:
            return " and ".join(wrapped)
        return ", ".join(wrapped[:-1]) + f", and {wrapped[-1]}"

    def convert_draw_command(self, draw_command):
        """Describe one ``draw((x,y)--(x,y)--...);`` command in English.

        The shape is named from the actual number of vertices, and every vertex is
        listed. A path counts as closed when it ends in ``--cycle`` or when its
        last point repeats its first. No claim of regularity is made because the
        coordinates are not checked.

        Args:
            draw_command: A single line of text; leading whitespace is allowed.

        Returns:
            str: A sentence describing the polygon, segment, or open path, or ""
            when the line is not a draw command made of parenthesised points
            (for example ``draw(Circle(...));`` or plain prose).
        """
        match = self._DRAW_CALL.match(draw_command)
        if match is None:
            return ""

        # The path is the first top-level argument; pen/options follow the comma.
        path = self._split_top_level(match.group(1), ',')[0]
        nodes = [node.strip() for node in self._split_top_level(path, '--')]

        closed = len(nodes) > 1 and nodes[-1] == 'cycle'
        if closed:
            nodes.pop()

        points = [self._unwrap_point(node) for node in nodes]
        if len(points) < 2 or None in points:
            return ""

        # A path that returns to its starting point is a closed polygon too.
        if not closed and len(points) > 3 and points[0] == points[-1]:
            points.pop()
            closed = True

        listing = self._join_points(points)
        count = len(points)
        if closed and count >= 3:
            name = self._POLYGON_NAMES.get(count)
            if name is None:
                return f"A polygon with {count} vertices at {listing}."
            article = "An" if name[0] in "aeiou" else "A"
            return f"{article} {name} with vertices at {listing}."
        if count == 2:
            return f"A line segment from ({points[0]}) to ({points[1]})."
        return f"A path through the points {listing}."

    def convert_dot_label_commands(self, text):
        """Replace ``dot((x,y));`` and ``label("name",(x,y),...);`` with sentences.

        Text that contains neither command is returned unchanged.
        """
        text = self._DOT.sub(r'A point at (\1).', text)
        text = self._LABEL.sub(r'The point \1 is at coordinates (\2).', text)
        return text

    def preprocess_text(self, text):
        """Convert Asymptote commands to English, then remove English stop words.

        Args:
            text: Problem text, possibly containing ``[asy]...[/asy]`` blocks.
                Non-string values (e.g. NaN from a DataFrame) yield "".

        Returns:
            str: The tokenized text, joined by single spaces, without stop words.
        """
        if not isinstance(text, str):
            return ""

        # Remove the [asy] / [/asy] tags
        text = self._ASY_TAG.sub('', text)

        readable_parts = []
        for command in text.split('\n'):
            # Only lines with an actual draw call are treated as draw commands;
            # prose that merely contains the word "draw" must not be discarded.
            if self._DRAW_HINT.search(command):
                readable_parts.append(self.convert_draw_command(command))
            else:
                readable_parts.append(self.convert_dot_label_commands(command))

        readable_text = ' '.join(readable_parts)

        # Tokenize into sentences, then drop stop words within each sentence.
        filtered_sentences = []
        for sentence in sent_tokenize(readable_text):
            kept_words = [
                word for word in word_tokenize(sentence)
                if word.lower() not in self.stop_words
            ]
            filtered_sentences.append(' '.join(kept_words))

        return ' '.join(filtered_sentences)

    def process_dataframe(self, df, text_column):
        """Apply ``preprocess_text`` to ``df[text_column]`` in place and return ``df``."""
        df[text_column] = df[text_column].apply(self.preprocess_text)
        return df



In [None]:
# NOTE(review): presumably installed so tqdm.notebook can render its progress-bar
# widget — confirm; no cell visible here imports ipywidgets directly.
!pip3 install ipywidgets --user

# Prompt Engineering

In [None]:
# Prompt template with two placeholders, {problem} and {solution}, filled via
# str.format. Its section labels ("Role:", "Instruction:", "Problem:",
# "Solution:") are the same markers that colorize_text highlights and that
# split_text_into_prompt_completion splits on ("Solution:"), so the label text
# must stay in sync with those functions.
template = """Role:\nYou are an advanced AI system with exceptional mathematical reasoning and problem-solving capabilities, specifically designed to solve tricky math problems (whose answer is a non-negative integer) written in LaTeX format from the AI Mathematical Olympiad (AIMO) competition. Your task is to accurately analyze and solve intricate mathematical problems, demonstrating a deep understanding of mathematical concepts and a strong ability to apply logical reasoning strategies.\n\nInstruction:
1. Carefully read and comprehend the problem statement provided in the "Problem" section.
2. In the "Solution" section, provide a solution of the problem with detailed explanation of your logical reasoning process. Keep in mind that answer must be a non-negative integer number.
3. At the end, create a "Answer" section where you will state only the final numerical or algebraic answer, without any additional text or narrative.\n\nProblem:\n{problem}\n\nSolution:\n{solution}"""

In [None]:
def _build_prompt(row):
    """Fill the prompt template for one row, placing the answer after the solution."""
    solution_with_answer = f"{row.solution}\n\nAnswer:\n{row.answer}"
    return template.format(problem=row.problem, solution=solution_with_answer)


# One fully rendered training prompt per row (progress bar provided by tqdm.pandas()).
df["prompt"] = df.progress_apply(_build_prompt, axis=1)
data = df["prompt"].tolist()

In [None]:
def colorize_text(text):
    """Wrap each prompt section label in bold, coloured Markdown/HTML.

    Every occurrence of "Role:", "Instruction:", "Problem:", "Solution:" and
    "Answer:" is replaced, in that order, by two newlines followed by the label
    inside a ``<font>`` tag of its assigned colour.

    Args:
        text: The prompt text to decorate.

    Returns:
        str: The text with all section labels replaced.
    """
    section_colors = {
        "Role": "blue",
        "Instruction": "yellow",
        "Problem": "red",
        "Solution": "cyan",
        "Answer": "green",
    }
    for label, shade in section_colors.items():
        plain_label = f"{label}:"
        styled_label = f"\n\n**<font color='{shade}'>{label}:</font>**"
        text = text.replace(plain_label, styled_label)
    return text

## Model Training

In [None]:
import keras
import keras_nlp
import numpy as np

# Load the "llama3_8b_en" KerasNLP preset as a causal LM with bfloat16 weights.
# NOTE(review): llama_lm is not referenced by any later cell visible here (the
# training cell loads its model through transformers) — confirm this load is needed.
llama_lm = keras_nlp.models.Llama3CausalLM.from_preset("llama3_8b_en", dtype="bfloat16")

In [None]:
def split_text_into_prompt_completion(df, text_column):
    """Split each text at its first "Solution:" marker into prompt and completion.

    Args:
        df: DataFrame holding the full prompt texts.
        text_column: Name of the column that contains the text to split.

    Returns:
        pandas.DataFrame: Columns "prompt" (everything before the marker,
        stripped) and "completion" (the marker plus everything after it,
        stripped), one row per input row.

    Raises:
        ValueError: If a text does not contain "Solution:".
    """
    # Each entry is [text_before_marker, text_after_marker].
    halves = [text.split("Solution:", 1) for text in df[text_column]]

    prompts = []
    completions = []
    for before, after in halves:
        prompts.append(before.strip())
        # Keep the "Solution:" keyword at the start of the completion.
        completions.append(("Solution:" + after).strip())

    return pd.DataFrame({"prompt": prompts, "completion": completions})

# Example usage
# NOTE: this rebinds `data` — until now the list of full prompt strings — to a
# DataFrame with "prompt" and "completion" columns.
data = split_text_into_prompt_completion(df, 'prompt')

In [None]:
from transformers import AutoTokenizer

# "llama3_8b_en" is a KerasNLP preset name, not a Hugging Face Hub repo id, so
# AutoTokenizer cannot resolve it. This id is assumed to be the Hub counterpart of
# that preset — confirm.
# NOTE(review): meta-llama repos are gated and the login cell appears later in this
# notebook, so Hub authentication must already be in place when this cell runs.
HF_MODEL_ID = "meta-llama/Meta-Llama-3-8B"

# truncation=True only bounds the sequence when a maximum length is known; state it
# explicitly instead of relying on the tokenizer's configured model_max_length.
# Tune to the available memory. Note that a truncated example loses its final EOS.
MAX_SEQ_LENGTH = 2048

tokenizer = AutoTokenizer.from_pretrained(HF_MODEL_ID)
# Reuse EOS as the padding token so that batches can be padded.
tokenizer.pad_token = tokenizer.eos_token

# EOS marks the END of each training example. Placing it between prompt and
# completion would teach the model to stop right after the prompt.
input_texts = [
    f"{prompt_text}\n\n{completion_text}{tokenizer.eos_token}"
    for prompt_text, completion_text in zip(data["prompt"], data["completion"])
]

# One encoding per example; tensors have shape (1, sequence_length).
tokenized_data = [
    tokenizer(input_text, return_tensors='pt', truncation=True, max_length=MAX_SEQ_LENGTH)
    for input_text in input_texts
]

In [None]:
!pip install huggingface-cli

In [None]:
!uggingface-cli login

In [None]:
from datasets import Dataset

# Build the column dictionary for the dataset: 'input_ids' and 'attention_mask',
# one plain Python list per example. Each encoding has a leading batch axis of
# size 1; index it with [0] rather than squeeze(), which would also collapse a
# one-token sequence into a scalar.
train_data = {
    'input_ids': [encoding['input_ids'][0].tolist() for encoding in tokenized_data],
    'attention_mask': [encoding['attention_mask'][0].tolist() for encoding in tokenized_data],
}

train_dataset = Dataset.from_dict(train_data)

In [None]:
def clear_cache():
    """Run the garbage collector and release cached CUDA memory when possible.

    torch is imported here because no other cell imports it; without that the
    call fails with NameError. When torch is not installed or no CUDA device is
    available, only the garbage collection step runs.
    """
    import gc
    gc.collect()
    try:
        import torch
    except ImportError:
        return
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

In [None]:
# Hold out 20% of the examples for evaluation. The result is stored under its own
# name so the sklearn `train_test_split` function imported at the top of the
# notebook is not overwritten, and the split is seeded so it is reproducible.
dataset_splits = train_dataset.train_test_split(test_size=0.2, seed=42)

train_dataset = dataset_splits['train']
test_dataset = dataset_splits['test']

In [None]:
# Free memory before the next cell loads the model for training.
clear_cache() 

In [None]:
from transformers import (
    AutoModelForCausalLM,
    DataCollatorForLanguageModeling,
    Trainer,
    TrainingArguments,
)

# The original call passed an undefined name (`model`) to from_pretrained.
# This Hub id is assumed to match the tokenizer's checkpoint — confirm.
BASE_MODEL_ID = "meta-llama/Meta-Llama-3-8B"
model = AutoModelForCausalLM.from_pretrained(BASE_MODEL_ID)

training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=1,
    per_device_train_batch_size=1,
    per_device_eval_batch_size=1,
    save_steps=10_000,
    save_total_limit=2,
    logging_dir='./logs',
    logging_steps=200,
    gradient_accumulation_steps=4,  # Adjust to manage memory
    fp16=True,  # Mixed precision training
    optim="adamw_torch"
)

model.gradient_checkpointing_enable()

# The dataset only holds input_ids/attention_mask. With mlm=False this collator
# pads each batch and copies input_ids into `labels`, which the model needs in
# order to return a loss.
# NOTE(review): the collator ignores pad positions in the loss, and pad_token is
# set to eos_token earlier in the notebook, so EOS positions are presumably ignored
# as well — verify if the model must learn when to stop.
data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,  # evaluate() raises without an evaluation dataset
    data_collator=data_collator,
    tokenizer=tokenizer,
)

# Train the model. trainer.train() already runs all `num_train_epochs` epochs, so
# it is called once; wrapping it in a per-epoch loop would repeat the whole run.
trainer.train()
clear_cache()

# Evaluation on the held-out split
trainer.evaluate()

In [None]:
# Save the fine-tuned weights and the tokenizer files to the same directory; the
# inference cell reloads both from this path. "path_to_save_your_model" is a
# placeholder to replace with a real output directory.
trainer.save_model("path_to_save_your_model")
tokenizer.save_pretrained("path_to_save_your_model")

In [None]:
import torch  # not imported anywhere else in the notebook; needed for torch.float16
from transformers import pipeline

# Load the fine-tuned model and tokenizer. The dtype and device placement are set
# here, at load time, because this is where the checkpoint is actually read; the
# pipeline below receives an already-instantiated model.
fine_tuned_model = AutoModelForCausalLM.from_pretrained(
    "path_to_save_your_model",
    torch_dtype=torch.float16,
    device_map="auto",
)
fine_tuned_tokenizer = AutoTokenizer.from_pretrained("path_to_save_your_model")

# Create a pipeline around the loaded model
generation_pipeline = pipeline(
    "text-generation",
    model=fine_tuned_model,
    tokenizer=fine_tuned_tokenizer,
)

# Generate text
prompt = "Your test prompt"
generated_text = generation_pipeline(prompt)
print(generated_text)