In [None]:
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '9'
import eval
import tic_tac_toe
from tic_tac_toe import best_move, test_action_correct, print_board, make_prompt, generate_random_one_step_tic_tac_toe, parse_action, generate_dataset, model_one_step_prompt_with_reasoning, long_model_one_step_prompt_with_reasoning2
from eval import load_pipeline, generate_predictions, load_four_bit_lora, load_model_and_tokenizer

In [None]:
# import importlib
# importlib.reload(tic_tac_toe)
# # importlib.reload(eval)# 

In [None]:
# A completely filled board (no empty cells), one string of marks per row.
example_board = [list(cells) for cells in ('xox', 'oxo', 'xox')]


# Layout of example_board2 (rows a-c, columns 1-3; '_' marks an empty cell):
# a1=_, a2=_, a3=x
# b1=o, b2=_, b3=o
# c1=x, c2=o, c3=x
example_board2 = [list(cells) for cells in ('__x', 'o_o', 'xox')]

def find_winning(board, player='x'):
    """Return the cells of a completed three-in-a-row for ``player``.

    Lines are checked in this order: row a, column 1, row b, column 2,
    row c, column 3, the main diagonal, then the anti-diagonal. The first
    completed line found is returned.

    Args:
        board: 3x3 list of lists. ``board[r][c]`` is the mark in row
            ``'abc'[r]`` and column ``c + 1``.
        player: The mark to look for. Defaults to ``'x'``.

    Returns:
        A list of three cell names (for example ``['a1', 'b2', 'c3']``) that
        form a winning line for ``player``, or ``None`` if there is none.
    """
    for i in range(3):
        if all(board[i][j] == player for j in range(3)):
            row_letter = chr(ord('a') + i)
            return [f'{row_letter}{j + 1}' for j in range(3)]
        if all(board[j][i] == player for j in range(3)):
            return [f'{row_letter}{i + 1}' for row_letter in 'abc']
    if all(board[k][k] == player for k in range(3)):
        return ['a1', 'b2', 'c3']
    if all(board[k][2 - k] == player for k in range(3)):
        return ['a3', 'b2', 'c1']
    return None


import random
def generate_reasoning(board, best_moves):
    """Build the reference reasoning strings for one winning move of player x.

    One move is drawn at random from ``best_moves``. The returned strings
    describe where x's pieces are, that the chosen cell is open, and which
    line the move completes.

    Args:
        board: 3x3 list of lists of cell marks; cells equal to ``'x'`` are
            the player's pieces.
        best_moves: Non-empty sequence of cell names such as ``'b1'``
            (row letter a-c followed by column digit 1-3).

    Returns:
        A dict with the keys ``'pieces_str'``, ``'open_str'`` and
        ``'winning_str'``.

    Raises:
        ValueError: If x has fewer than two pieces on the board, if the
            chosen cell is already occupied, or if playing the chosen cell
            does not complete a line for x.
    """
    selected_action = random.choice(best_moves)

    # Collect x's cells in row-major order (a1, a2, a3, b1, ...).
    x_pieces = [
        chr(ord('a') + i) + str(j + 1)
        for i in range(3)
        for j in range(3)
        if board[i][j] == 'x'
    ]
    if len(x_pieces) == 2:
        # I have pieces in b2 and b3. Cell b1 is open, so I can play there to win with (b1, b2, b3). My answer is \\action{b1}.
        pieces_str = f'I have pieces in {x_pieces[0]} and {x_pieces[1]}'
    elif len(x_pieces) >= 3:
        pieces_str = f'I have pieces in {", ".join(x_pieces[:-1])}, and {x_pieces[-1]}'
    else:
        raise ValueError('Unsupported number of x pieces')

    row = ord(selected_action[0]) - ord('a')
    col = int(selected_action[1]) - 1
    # The reasoning asserts the cell is open, so refuse a cell that already holds a piece.
    if board[row][col] in ('x', 'o'):
        raise ValueError(f'Cell {selected_action} is not open')
    open_str = f'Cell {selected_action} is open'

    # Play the move on a copy so the caller's board is left untouched.
    board_copy = [cells[:] for cells in board]
    board_copy[row][col] = 'x'
    winning = find_winning(board_copy)
    if not winning:
        raise ValueError('Invalid winning move')
    winning_str = f'so I can play there to win with ({", ".join(winning)})'

    return {
        'pieces_str': pieces_str,
        'open_str': open_str,
        'winning_str': winning_str,
    }
    
    

In [None]:
# model, tokenizer = load_four_bit_lora('tiiuae/falcon-7b-instruct')
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig

# 4-bit NF4 quantization with double quantization; compute runs in float16.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
    bnb_4bit_compute_dtype=torch.float16,
)

# Checkpoint to load; uncomment one of the alternatives to switch models.
model_name = 'mistralai/Mistral-7B-v0.1'
# model_name = 'tiiuae/falcon-7b-instruct'
# model_name = 'meta-llama/Llama-2-13b-hf'
# model_name = 'meta-llama/Llama-2-13b-chat-hf'
# model, tokenizer = load_model_and_tokenizer(model_name, bnb_config)

In [None]:
# The package itself must be imported for the transformers.pipeline call below;
# the earlier `from transformers import ...` only binds the individual classes.
import transformers

# Load the quantized model and a left-padding tokenizer that reuses EOS as the pad token.
model = AutoModelForCausalLM.from_pretrained(model_name, trust_remote_code=True, quantization_config=bnb_config, device_map="cuda:0")
tokenizer = AutoTokenizer.from_pretrained(model_name, padding_side="left")
tokenizer.pad_token = tokenizer.eos_token

# Generation stops at the first newline: the last token of the encoded '\n' is used as EOS.
newline_token_id = tokenizer.encode('\n', add_special_tokens=False)[-1]
pipeline = transformers.pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    return_full_text=False,
    device_map="cuda:0",
    num_return_sequences=1,
    # Greedy decoding: sampling requires a strictly positive temperature, so
    # deterministic output is requested with do_sample=False instead of temperature=0.
    do_sample=False,
    max_new_tokens=100,
    eos_token_id=newline_token_id,
    pad_token_id=tokenizer.eos_token_id,
    batch_size=8,
)

# Sample prompts for a quick smoke test, built the same way eval_model builds its prompts.
prompts_list = [
    make_prompt(generate_random_one_step_tic_tac_toe(), long_model_one_step_prompt_with_reasoning2)
    for _ in range(8)
]
preds = []
for prompt in prompts_list:
    preds.append(pipeline(prompt)[0]['generated_text'].strip())

In [None]:
def eval_model(model, tokenizer, num_eval_points, verbose=False):
    """Evaluate a model on randomly generated one-step tic-tac-toe boards.

    For each board the model is prompted with the long one-step reasoning
    template. Its final action is scored against the best moves, and its
    reasoning text is split into three parts (pieces / open cell / winning
    line) that are each compared by exact string match with the reference
    produced by ``generate_reasoning``. Four accuracies are printed.

    Args:
        model: Model passed to ``load_pipeline``.
        tokenizer: Tokenizer passed to ``load_pipeline``.
        num_eval_points: Number of random boards to evaluate; must be positive.
        verbose: If true, print per-board predictions and references.

    Returns:
        The fraction of boards on which the model's action was correct.

    Raises:
        ValueError: If ``num_eval_points`` is not positive.
    """
    if num_eval_points <= 0:
        raise ValueError('num_eval_points must be a positive integer')

    pipeline = load_pipeline(model, tokenizer)
    correct_count = 0
    correct_pieces_count = 0
    correct_open_count = 0
    correct_winning_count = 0
    for _ in range(num_eval_points):
        board = generate_random_one_step_tic_tac_toe()
        best_actions = best_move(board, 'x')
        model_output = generate_predictions(pipeline, make_prompt(board, long_model_one_step_prompt_with_reasoning2))
        model_pred_action = parse_action(model_output)
        is_correct = test_action_correct(model_pred_action, best_actions)

        # When several moves are equally good, score the reasoning against the
        # move the model actually chose rather than a randomly drawn one, so a
        # valid alternative is not marked wrong by chance.
        if model_pred_action in best_actions:
            reference_moves = [model_pred_action]
        else:
            reference_moves = best_actions
        correct_reasoning = generate_reasoning(board, reference_moves)
        try:
            # Expected shape: "<pieces>. <open>, <winning>. ..."
            # NOTE(review): the 'I ' prefix presumably restores a word the prompt
            # already ends with -- confirm against the prompt template.
            sentences = model_output.split('. ')
            pieces_pred = 'I ' + sentences[0]
            pieces_is_correct = correct_reasoning['pieces_str'] == pieces_pred
            open_pred = sentences[1].split(', ')[0]
            open_is_correct = correct_reasoning['open_str'] == open_pred
            # Skip the open clause plus the ', ' separator that follows it.
            winning_pred = sentences[1][len(open_pred) + 2:]
            winning_is_correct = correct_reasoning['winning_str'] == winning_pred
            correct_pieces_count += pieces_is_correct
            correct_open_count += open_is_correct
            correct_winning_count += winning_is_correct
            if verbose:
                print('pieces pred:', pieces_pred)
                print('pieces true:', correct_reasoning['pieces_str'])
                print('open pred:', open_pred)
                print('open true:', correct_reasoning['open_str'])
                print('winning pred:', winning_pred)
                print('winning true:', correct_reasoning['winning_str'])
        except Exception as e:
            # Best effort: an unparseable output scores zero on the reasoning
            # steps but must not abort the whole evaluation.
            print('Error parsing reasoning', e)
            print(model_output)
            print()
            # print(correct_reasoning)
        correct_count += is_correct

        if verbose:
            print(print_board(board))
            print("The optimal action is:", best_actions)
            print("The model's reasoning is:", model_output)
            print("The model's action is:", model_pred_action)
            print("The model's action is correct:", is_correct)
            print()
    accuracy = correct_count / num_eval_points
    print(f'Accuracy: {accuracy:.2f}')
    print(f'Pieces Accuracy: {correct_pieces_count / num_eval_points:.2f}')
    print(f'Open Accuracy: {correct_open_count / num_eval_points:.2f}')
    print(f'Winning Accuracy: {correct_winning_count / num_eval_points:.2f}')
    return accuracy

In [None]:
def prompt_fn(board):
    """Render ``board`` with the long one-step reasoning prompt template."""
    return make_prompt(board, long_model_one_step_prompt_with_reasoning2)


# Training split: 1000 random one-step boards rendered through prompt_fn.
train_dataset = generate_dataset(
    generate_random_one_step_tic_tac_toe, best_move, prompt_fn, samples=1000
)

# Tokenize the "text" column in batches and expose the result as PyTorch tensors.
train_dataset_tokenized = train_dataset.map(
    lambda batch: tokenizer(batch["text"]),
    batched=True,
)
train_dataset_tokenized.set_format(type='torch', columns=['input_ids', 'attention_mask'])

# Validation split: 100 boards, processed exactly like the training split.
val_dataset = generate_dataset(
    generate_random_one_step_tic_tac_toe, best_move, prompt_fn, samples=100
)
val_dataset_tokenized = val_dataset.map(
    lambda batch: tokenizer(batch["text"]),
    batched=True,
)
val_dataset_tokenized.set_format(type='torch', columns=['input_ids', 'attention_mask'])


In [None]:
# exp_logging_dir = "./logs"
# exp_output_dir = "./output"
# batch_size = 1

# trainer = Trainer(
#     model=model,
#     train_dataset=train_dataset_tokenized,
#     eval_dataset=val_dataset_tokenized,
#     args=TrainingArguments(
#         per_device_train_batch_size=batch_size,
#         per_device_eval_batch_size=batch_size,
#         logging_dir=exp_logging_dir,
#         logging_steps=100,
#         num_train_epochs=10,
#         learning_rate=1e-4,
#         bf16=False,
#         save_strategy="steps",
#         save_steps=100,
#         output_dir=exp_output_dir,
#         report_to="wandb",  # could also use "tensorboard"
#         evaluation_strategy="steps",
#         eval_steps=100,
#         # n_gpu=5,
#     ),
#     data_collator=DataCollatorForLanguageModeling(tokenizer, mlm=False),
# )
# model.config.use_cache = (
#     False  # silence the warnings. Please re-enable for inference!
# )

# trainer.train()

In [None]:
import gc

# Evaluate each candidate checkpoint in turn and keep the accuracy per model.
accuracies = {}
# Release the references that keep the earlier model alive: the text-generation
# pipeline built above holds its own reference to it.
pipeline = None
model = None
for model_name in ['mistralai/Mistral-7B-v0.1', 'tiiuae/falcon-7b-instruct', 'meta-llama/Llama-2-13b-hf', 'meta-llama/Llama-2-13b-chat-hf']:
    # Drop the previous model and hand its GPU memory back before loading the next one.
    del model
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    model, tokenizer = load_model_and_tokenizer(model_name, bnb_config)
    # eval_model prints unlabelled accuracies, so print the model name first.
    print(f'=== {model_name} ===')
    acc = eval_model(model, tokenizer, 100, verbose=False)
    accuracies[model_name] = acc

In [None]:
# Smoke test: run one trivial prompt through a pipeline built from the
# currently loaded model and tokenizer.
prompt = '1 + 1 ='
pipeline = load_pipeline(model, tokenizer)
pred = generate_predictions(pipeline, prompt)
print(pred)

# Accuracies .16, .15, .21, .14. Best is Llama2 13B

# Loop through different models:
# - load model
# 100 each
# measure success of each reasoning step + the final answer

In [None]:
# LLama 13B
# fuyu
# falcon
# 3.5 turbo
# claude


# mistral - chance accuracy, insane reasoning, not always the same but some mode collapse
# falcon - predicts the same thing every time

# switched to more intuitive embeds
# Now: mistral 40%, still seems random
# falcon 10% 
# llama 13b 15%


#### MISTRAL
# Accuracy: 0.10
# Pieces Accuracy: 0.30
# Open Accuracy: 0.10
# Winning Accuracy: 0.00