# Data Pre-processing

In [1]:
# import torch
# if torch.cuda.is_available():
#     device = torch.device('cuda:1')
# torch.cuda.set_device(1)
# torch.cuda.current_device()

In [2]:
# Read the data:
import json

# Decode explicitly as UTF-8: without an encoding argument open() falls back to
# the platform's locale encoding, which can differ between machines.
with open('data/eng_math_test.json', encoding='utf-8') as f:
    math_test = json.load(f)
    

In [3]:
# Data processing: add a placeholder answer D to every question that has fewer than four choices
def Add_answer(data):
    """Append a placeholder fourth option to under-filled questions, in place.

    Args:
        data: list of question dicts, each holding a mutable ``'choices'`` list.

    Returns:
        None. Any entry whose ``'choices'`` list has fewer than four items
        gets the string ``'D. None of the above'`` appended exactly once.
    """
    for exercise in data:
        choices = exercise['choices']
        if len(choices) >= 4:
            # Already has a full set of options; leave untouched.
            continue
        choices.append('D. None of the above')
# Pad the loaded test questions in place; Add_answer returns nothing, so the
# result is visible only through math_test['data'].
Add_answer(math_test['data'])


In [4]:
from datasets import Dataset
# Wrap the (already padded) list of question dicts in a Hugging Face Dataset
# so it can be tokenized with .map() later on.
test_ds = Dataset.from_list(math_test['data'])

# Add the model

In [5]:
# Load the saved model to run the predictions.
# The tokenizer and the multiple-choice model are both restored from the same
# `model_dir` so that vocabulary and weights match.
model_dir = "model/Theorem_mind_bert_1"
from transformers import AutoTokenizer, AutoModelForMultipleChoice

tokenizer = AutoTokenizer.from_pretrained(model_dir)
model = AutoModelForMultipleChoice.from_pretrained(model_dir)

In [6]:
from dataclasses import dataclass
from transformers.tokenization_utils_base import PreTrainedTokenizerBase, PaddingStrategy
from typing import Optional, Union
import torch

@dataclass
class DataCollatorForMultipleChoice:
    """Pad and batch multiple-choice features for the model.

    Each incoming feature holds, per tokenizer field, one sequence per answer
    choice. The collator flattens all choices of all features into one list,
    pads them together with ``tokenizer.pad``, and reshapes every resulting
    tensor to ``(batch_size, num_choices, -1)``.

    Unlike the original implementation, the input feature dicts are not
    modified (the label is read, not popped), and features that carry neither
    a ``'label'`` nor a ``'labels'`` key are collated without a ``'labels'``
    entry instead of raising ``KeyError``.
    """

    # Tokenizer whose .pad() method performs the padding.
    tokenizer: PreTrainedTokenizerBase
    # Forwarded unchanged to tokenizer.pad(padding=...).
    padding: Union[bool, str, PaddingStrategy] = True
    # Forwarded unchanged to tokenizer.pad(max_length=...).
    max_length: Optional[int] = None
    # Forwarded unchanged to tokenizer.pad(pad_to_multiple_of=...).
    pad_to_multiple_of: Optional[int] = None

    def __call__(self, features):
        """Collate a list of feature dicts into a batch of tensors.

        Args:
            features: non-empty list of dict-like features. Every non-label
                value must be indexable by choice position, and
                ``features[0]['input_ids']`` determines the number of choices.

        Returns:
            dict mapping each tokenizer field to a tensor viewed as
            ``(batch_size, num_choices, -1)``, plus ``'labels'`` (int64
            tensor) when the features carry a label.
        """
        first_keys = features[0].keys()
        # Same precedence as before ('label' first, then 'labels'), but
        # tolerate features that have no label at all.
        if 'label' in first_keys:
            label_name = 'label'
        elif 'labels' in first_keys:
            label_name = 'labels'
        else:
            label_name = None

        # Read the labels instead of popping them so the caller's dicts
        # survive being collated more than once.
        labels = None
        if label_name is not None:
            labels = [feature[label_name] for feature in features]

        batch_size = len(features)
        num_choices = len(features[0]['input_ids'])

        # One flat entry per (feature, choice) pair, in feature-major order;
        # a single comprehension avoids the quadratic sum(list_of_lists, []).
        flattened_features = [
            {k: v[i] for k, v in feature.items() if k != label_name}
            for feature in features
            for i in range(num_choices)
        ]

        batch = self.tokenizer.pad(
            flattened_features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors='pt',
        )
        # Restore the (example, choice) structure the model expects.
        batch = {k: v.view(batch_size, num_choices, -1) for k, v in batch.items()}
        if labels is not None:
            batch['labels'] = torch.tensor(labels, dtype=torch.int64)
        return batch


In [7]:
from transformers import TrainingArguments, Trainer

# NOTE(review): this rebinds `model_dir`, which the model-loading cell used for
# the checkpoint path; from here on the name refers to the Trainer output dir.
model_dir = 'finetuned'
# NOTE(review): the Trainer built from these arguments is only used for
# predict() in the visible cells, so the training-related settings below
# (epochs, learning rate, save/eval strategy) are presumably leftovers from the
# training notebook — confirm before relying on them.
training_args = TrainingArguments(
    output_dir=model_dir,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    learning_rate=5e-5,
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    num_train_epochs=50,
    weight_decay=0.01,
    report_to='none'
)

In [8]:
training_args.device

device(type='cuda', index=1)

In [9]:
# Build a Trainer around the already-loaded model. No train or eval dataset is
# supplied here; in the visible cells it is only used to call predict().
# The custom collator pads and reshapes the per-choice encodings.
trainer = Trainer(
    model=model,
    args=training_args,
    tokenizer=tokenizer,
    data_collator=DataCollatorForMultipleChoice(tokenizer=tokenizer),
)

In [10]:
import numpy as np
def predictions_to_map_output(predictions, index_to_label=None):
    """Return the option letter of the highest-scoring choice for every row.

    Args:
        predictions: 2-D array-like of shape (n_examples, n_choices) where a
            larger value means a more likely choice.
        index_to_label: optional mapping from column index to option letter.
            When omitted, the notebook-level ``index_to_option`` dict is
            looked up at call time, as the original implementation did.

    Returns:
        1-D numpy array of strings, one per row of ``predictions``. An input
        with zero rows yields an empty array (the original raised because
        ``np.vectorize`` rejects size-0 inputs).

    Raises:
        KeyError: if a winning column index is missing from the mapping.
    """
    if index_to_label is None:
        index_to_label = index_to_option
    scores = np.asarray(predictions)
    # Negate so ascending argsort ranks the best score first; the stable sort
    # makes ties resolve deterministically to the lowest column index.
    ranked = np.argsort(-scores, axis=-1, kind='stable')
    best = ranked[:, 0]
    return np.array([index_to_label[int(i)] for i in best], dtype=str)

In [11]:
# Tokenize the test_data
# Option letters and their positional indices, plus lookup tables in both
# directions (letter -> index and index -> letter).
options = 'ABCD'
indices = list(range(4))

option_to_index = dict(zip(options, indices))
index_to_option = dict(zip(indices, options))

def preprocess_test(example):
    """Tokenize one test question against each of its answer choices.

    The question text is repeated four times and paired position-by-position
    with the entries of ``example['choices']``; the pairs are encoded with the
    notebook-level ``tokenizer`` using truncation. A constant ``'label'`` of 0
    is attached to the encoding so that label-reading code downstream has a
    value to consume.

    Args:
        example: mapping with a ``'question'`` string and a ``'choices'``
            iterable of strings.

    Returns:
        The tokenizer output with an added ``'label'`` entry.
    """
    question = example['question']
    question_side = [question for _ in range(4)]
    choice_side = list(example['choices'])
    encoded = tokenizer(question_side, choice_side, truncation=True)
    encoded['label'] = 0
    return encoded

# Tokenize one example at a time (batched=False) and drop the raw 'question'
# and 'choices' columns from the resulting dataset.
tokenized_test_ds = test_ds.map(preprocess_test, batched=False, remove_columns=['question','choices'])

Map:   0%|          | 0/189 [00:00<?, ? examples/s]

In [12]:
# Run inference on the tokenized test set; the per-choice scores are read from
# test_predictions.predictions in the following cells.
test_predictions = trainer.predict(tokenized_test_ds)

You're using a BertTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.


In [13]:
# Convert every row of scores into the letter of its top-ranked option.
test_predictions_encode = predictions_to_map_output(test_predictions.predictions)

In [14]:
# Fall back to the second-ranked choice when the model picks the placeholder "D. None of the above"
def predictions_to_map_output_second_choice(predictions, index_to_label=None):
    """Return the option letter of the second-highest-scoring choice per row.

    Args:
        predictions: 2-D array-like of shape (n_examples, n_choices) where a
            larger value means a more likely choice.
        index_to_label: optional mapping from column index to option letter.
            When omitted, the notebook-level ``index_to_option`` dict is
            looked up at call time, as the original implementation did.

    Returns:
        1-D numpy array of strings, one per row of ``predictions``. An input
        with zero rows yields an empty array (the original raised because
        ``np.vectorize`` rejects size-0 inputs).

    Raises:
        ValueError: if there are fewer than two choices per row, so no
            runner-up exists.
        KeyError: if a selected column index is missing from the mapping.
    """
    if index_to_label is None:
        index_to_label = index_to_option
    scores = np.asarray(predictions)
    if scores.shape[1] < 2:
        raise ValueError("need at least two choices per row to pick a second choice")
    # Negate so ascending argsort ranks the best score first; the stable sort
    # makes ties resolve deterministically to the lowest column index.
    ranked = np.argsort(-scores, axis=-1, kind='stable')
    runner_up = ranked[:, 1]
    return np.array([index_to_label[int(i)] for i in runner_up], dtype=str)


# Post-process: wherever the 4th choice is the literal 'D. None of the above'
# string and the model's top pick is 'D', overwrite the prediction with the
# second-ranked option for that row.
# NOTE(review): the check compares choice text only, so a question whose
# original option D is literally 'D. None of the above' is treated the same as
# a padded one — confirm that is acceptable.
for idx, question in enumerate(math_test['data']):
    if question['choices'][3] == 'D. None of the above' and test_predictions_encode[idx] == 'D':
        # Slicing idx:idx+1 keeps the scores 2-D, so the helper returns a one-element array.
        b = predictions_to_map_output_second_choice(test_predictions.predictions[idx:(idx+1)])
        test_predictions_encode[idx] = b[0]

In [15]:
# The file holds Vietnamese text, so decode explicitly as UTF-8 instead of
# relying on the platform's locale encoding.
with open('data/math_test.json', encoding='utf-8') as f:
    vi_math_test = json.load(f)

In [16]:
vi_math_test['data'][0]

{'id': '01-0203',
 'question': 'Một cửa hàng đã bán 30% số hàng hiện có và thu được 15 000 000 đồng. Hỏi nếu bán hết hàng thì cửa hàng thu được bao nhiêu tiền?',
 'choices': ['A. 4 500 000 đồng',
  'B. 45 000 000 đồng',
  'C. 50 000 000 đồng',
  'D. 450 000 000 đồng']}

In [17]:
def map_predicted_to_full_answers(test_data, predicted_answers):
    """Pair every question id with the full text of its predicted option.

    Questions and predicted letters are matched by position. The letters
    A-D are matched by position against each question's ``'choices'`` list;
    when the predicted letter has no matching choice (unknown letter, or
    fewer choices than its position), the text ``"Unknown"`` is used.

    Args:
        test_data: iterable of dicts with ``'id'`` and ``'choices'`` keys.
        predicted_answers: iterable of predicted option letters.

    Returns:
        List of ``(id, answer_text)`` tuples, as long as the shorter input.
    """
    full_answers = []

    for entry, chosen in zip(test_data, predicted_answers):
        answer_text = "Unknown"
        for letter, choice_text in zip("ABCD", entry['choices']):
            if letter == chosen:
                answer_text = choice_text
                break
        full_answers.append((entry['id'], answer_text))

    return full_answers

# Attach the predicted letters to the Vietnamese version of the test set.
# NOTE(review): the predictions were computed from math_test (the English
# file) and are paired with vi_math_test purely by position, so this presumes
# both files list the same questions in the same order — confirm. zip() inside
# the helper silently stops at the shorter of the two inputs.
csv_data = map_predicted_to_full_answers(vi_math_test['data'], test_predictions_encode)

In [18]:
csv_data

[('01-0203', 'C. 50 000 000 đồng'),
 ('01-0206', 'A. 24 phút'),
 ('01-0207', 'C. 6 lần'),
 ('01-0209', 'C. 25%'),
 ('01-0210', 'C. 200m'),
 ('01-0211', 'C. 5,216'),
 ('01-0214', 'C. 21%'),
 ('01-0219', 'C. 8 giờ 17 phút'),
 ('01-0221', 'C. 0,75'),
 ('01-0222', 'C. 67,919'),
 ('01-0223', 'C. 30'),
 ('01-0224', 'C. 398,7'),
 ('01-0225', 'C. \\frac{7}{100}'),
 ('01-0227', 'C. 5 chục'),
 ('01-0232', 'A. 150%'),
 ('01-0234', 'C. \\frac{1}{2}'),
 ('01-0237', 'C. 350'),
 ('01-0239', 'C. 5019'),
 ('01-0240', 'C. 0,18 giờ'),
 ('01-0241', 'C. 369,92'),
 ('01-0243', 'C. 5 000'),
 ('01-0245', 'C. 37,4'),
 ('01-0246', 'C. 10,05'),
 ('01-0249', 'C. 3,005'),
 ('01-0254', 'C. 90 phút'),
 ('01-0256', 'C. 36 dm^{2}'),
 ('01-0257', 'C. 28,26 dm'),
 ('01-0258', 'C. 6,28 cm^{2}'),
 ('01-0259', 'C. 138 dm^{3}'),
 ('01-0264', 'C. 46%'),
 ('01-0266', 'C. 37,6'),
 ('01-0268', 'C. 201,700'),
 ('01-0269', 'C. 0,709'),
 ('01-0273', 'C. 30 phút'),
 ('01-0275', 'C. 4,3'),
 ('01-0277', 'C. 30 phút'),
 ('01-0285', 'C

In [19]:
import csv

def write_answers_to_csv(data, filename):
    """Write ``(id, answer)`` rows to ``filename`` as an unquoted UTF-8 CSV.

    The file starts with an ``id,answer`` header. Fields are never quoted
    (``csv.QUOTE_NONE``); special characters such as the delimiter are
    preceded by a backslash instead.

    Args:
        data: iterable of row sequences, typically ``(id, answer_text)`` tuples.
        filename: destination path; a missing parent directory is created.
    """
    import os  # local import so this block does not depend on another cell

    parent_dir = os.path.dirname(filename)
    if parent_dir:
        # Avoid failing at the very last step just because the output folder
        # does not exist yet.
        os.makedirs(parent_dir, exist_ok=True)

    with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
        # quotechar=None rather than '': an empty quotechar is rejected by the
        # csv module on Python 3.11+, while None is the documented way to
        # disable it together with QUOTE_NONE.
        csvwriter = csv.writer(csvfile, delimiter=',', quotechar=None, quoting=csv.QUOTE_NONE, escapechar='\\')
        csvwriter.writerow(['id', 'answer'])
        csvwriter.writerows(data)

# Persist the (id, full answer text) pairs as the submission file.
write_answers_to_csv(csv_data, "results/submission_bert_7.csv")