In [1]:
import json
import torch
import torch.nn as nn
import transformers
from transformers import BertTokenizer, BertModel
import tensorflow as tf
import keras
import numpy as np
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
#####
# Global variables
#####

# Check if CUDA can be used to speed up training/reasoning
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# CE
# Load BERT-large tokenizer and BERT-Large model
tokenizer = BertTokenizer.from_pretrained('hfl/chinese-roberta-wwm-ext-large')
bert_model = BertModel.from_pretrained('hfl/chinese-roberta-wwm-ext-large').to(device)  # Make sure the model is on the correct device

# Define BiGRU layer for CE
hidden_size = 128  # For BERT-Large，the hidden_size should be 1024
bigru_layer = nn.GRU(input_size=1024, hidden_size=hidden_size, bidirectional=True, batch_first=True).to(device)

# NE
# Character-to-index mapping
char_to_index = {str(i): i for i in range(10)}
char_to_index['+'] = 10
char_to_index['-'] = 11
char_to_index['e'] = 12
char_to_index['.'] = 13

# Maximum numeric length and character dimension
max_num_length = 13
char_dim = 14

# Initialize BiGRU for NE
input_size_NE = char_dim
hidden_size = 128
bigru_model = nn.GRU(input_size=input_size_NE, hidden_size=hidden_size, bidirectional=True, batch_first=True)

In [3]:
# Define a function to encode text using BERT and BiGRU
def encode_with_ce(texts):
    # Encode the texts
    encoded_input = tokenizer(texts, return_tensors='pt',padding='max_length', truncation=True, max_length=512,
                              add_special_tokens=True)

    # Make sure the input is also on the correct device
    encoded_input = {k: v.to(device) for k, v in encoded_input.items()}

    # Get the embedding using BERT-Large
    with torch.no_grad():
        output = bert_model(**encoded_input)

    # The BERT model outputs a tuple, and we are interested in the first element - the hidden state
    embeddings = output.last_hidden_state

    # Pass the embed to BiGRU
    bigru_output, _ = bigru_layer(embeddings)

    last_output = bigru_output[:, -1, :]

    return last_output

In [8]:
def change_str_number_to_e(number):
        number = number.replace(',', '')
        number = float(number)
        formatted_number = f"{number:e}"
        # print(f'formatted_number: {formatted_number}')
        return formatted_number

In [12]:
print(change_str_number_to_e('10.1'))

1.010000e+01


In [32]:
# Convert answer's number into representations
def encode_and_process_number(number, max_num_length=13, char_dim=14, bigru_model=bigru_model):
    number = change_str_number_to_e(number)
    # Create an all-zero tensor
    encoded = torch.zeros(max_num_length, char_dim)

    # Calculate the left fill amount
    padding_size = max_num_length - len(number)

    # Fill encoding according to character
    for i, char in enumerate(number):
        if char in char_to_index:
            encoded[padding_size + i, char_to_index[char]] = 1

    # Add batch dimension
    encoded = encoded.unsqueeze(0)  # Make the tensor shape [1, max_num_length, char_dim]

    # Input the encoded tensor into BiGRU
    bigru_output, _ = bigru_model(encoded)

    last_output = bigru_output[:, -1, :]

    # Return the output of BiGRU
    return last_output

In [33]:
# Converts the correct answer index to a unique thermal encoding
def one_hot_encode(index, num_classes):
    encoding = [0] * num_classes
    encoding[index] = 1
    return encoding

In [34]:
##########
# 1. Load the data
##########

# Import the training data
with open('/content/drive/My Drive/ENLP/data/NQuAD_train_first_10k.json', 'r', encoding='utf-8') as file:
        data_train = json.load(file)

# Import the testing data
with open('/content/drive/My Drive/ENLP/data/NQuAD_test_first_2k.json', 'r', encoding='utf-8') as file:
        data_test = json.load(file)

In [None]:
data_train[0]

In [None]:
data_train[1]

In [None]:
##########
# 2. Generate question representations for training data
##########

# Prepare training data and one-hot labels lists
question_representations_train = []
one_hot_labels_train = []
batch_time = 0

# Iterate each sample in training data
for sample in data_train[:10000]:
     # CE output list
        ce_output_list = []
     # NE output list
        ne_output_list = []

     # 2.1 CE
        # Process question stem
        stem_result = encode_with_ce(sample['question_stem'])
        ce_output_list.append(stem_result)

        # Process sentences_containing_the_numeral_in_answer_options
        for sentence_list in sample["sentences_containing_the_numeral_in_answer_options"]:

                if len(sentence_list) > 1:
                    combined_sentence = '[SEP]'.join(sentence.strip() for sentence in sentence_list)
                    result = encode_with_ce(combined_sentence)
                    ce_output_list.append(result)
                else:
                    result = encode_with_ce(sentence_list[0].strip())
                    ce_output_list.append(result)

    # 2.2 NE
        numbers = sample["answer_options"]
        for number in numbers:
            result = encode_and_process_number(number)
            ne_output_list.append(result)
        #             print(result.shape)

    # 2.3 Concatenate
        all_output_list = ce_output_list + ne_output_list
        all_numpy_arrays_list = [tensor.detach().cpu().numpy() for tensor in all_output_list]

        # Use tf. Keras. The layers. Concatenate to joining together all these tensor
        concat_layer = tf.keras.layers.Concatenate(axis=1)
        concatenated_tensors = concat_layer(all_numpy_arrays_list)

        question_representations_train.append(concatenated_tensors)

        # 2.5 Convert index of answer into one-hot vector
        correct_answer_index = [sample['ans']]
        # Converts the correct answer index to TensorFlow's uniquely thermal coded tensor
        one_hot_label = tf.one_hot(correct_answer_index, depth=4)
        #         print(one_hot_label)
        #         print(type(one_hot_label))
        one_hot_labels_train.append(one_hot_label)
        batch_time += 1
        print(f"batch time: {batch_time}")

In [39]:
data_file_path = '/content/drive/My Drive/ENLP/train_data_10000.pt'
labels_file_path = '/content/drive/My Drive/ENLP/train_labels_10000.pt'
torch.save(question_representations_train, data_file_path)
torch.save(one_hot_labels_train, labels_file_path)

In [None]:
question_representations_train = torch.load(data_file_path)
one_hot_labels_train = torch.load(labels_file_path)

features = tf.convert_to_tensor(question_representations_train)  # 假设已经是正确的格式
features = tf.squeeze(features, axis=1)
labels = tf.convert_to_tensor(one_hot_labels_train)
labels = tf.reshape(labels, [-1, 4])  # 确保标签是正确的形状

##########
# 3. Make MLP model and put question representations and one-hot label list into MLP model
##########

# Construct a MLP model
mlp = tf.keras.Sequential([
    tf.keras.layers.Dense(512, activation='relu', input_shape=(2304,)),  # 输入层节点数
    tf.keras.layers.Dropout(0.3),
    tf.keras.layers.Dense(256, activation='relu'),  # 第二层
    tf.keras.layers.Dropout(0.3),
    tf.keras.layers.Dense(4, activation='softmax')  # 输出层，假设有4个类别

])

# Compile model
optimizer = tf.keras.optimizers.Adadelta(learning_rate=0.1, rho=0.95)
mlp.compile(optimizer= optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

# Train model
mlp.fit(features, labels, epochs=30, batch_size=256)

In [None]:
#####/
# 4. Prepare testing data and make predictions
#####

# Prepare training data and one-hot labels lists
question_representations_test = []
one_hot_labels_test = []
batch_test_time = 0

# Iterate each sample in training data
for sample in data_test[:2000]:
     # CE output list
        ce_output_list = []
     # NE output list
        ne_output_list = []

     # 2.1 CE
        # Process question stem
        stem_result = encode_with_ce(sample['question_stem'])
        ce_output_list.append(stem_result)

        # Process sentences_containing_the_numeral_in_answer_options
        for sentence_list in sample["sentences_containing_the_numeral_in_answer_options"]:
                if len(sentence_list) > 1:
                    combined_sentence = '[SEP]'.join(sentence.strip() for sentence in sentence_list)
                    result = encode_with_ce(combined_sentence)
                    ce_output_list.append(result)
                else:
                    result = encode_with_ce(sentence_list[0].strip())
                    ce_output_list.append(result)

    # 2.2 NE
        numbers = sample["answer_options"]
        for number in numbers:
            result = encode_and_process_number(number)
            ne_output_list.append(result)
        #             print(result.shape)

    # 2.3 Concatenate
        all_output_list = ce_output_list + ne_output_list
        all_numpy_arrays_list = [tensor.detach().cpu().numpy() for tensor in all_output_list]

        # Use tf. Keras. The layers. Concatenate to joining together all these tensor
        concat_layer = tf.keras.layers.Concatenate(axis=1)
        concatenated_tensors = concat_layer(all_numpy_arrays_list)

        # Add pooled tensor to question_representations
        question_representations_test.append(concatenated_tensors)

        # 2.5 Convert index of answer into one-hot vector
        correct_answer_index = [sample['ans']]
        # Converts the correct answer index to TensorFlow's uniquely thermal coded tensor
        one_hot_label = tf.one_hot(correct_answer_index, depth=4)
        #         print(one_hot_label)
        #         print(type(one_hot_label))
        one_hot_labels_test.append(one_hot_label)

        batch_test_time += 1
        print(f"batch_test_time: {batch_test_time}")


In [44]:
test_data_file_path = '/content/drive/My Drive/ENLP/test_data_2000.pt'
test_labels_file_path = '/content/drive/My Drive/ENLP/test_labels_2000.pt'
torch.save(question_representations_test, test_data_file_path)
torch.save(one_hot_labels_test, test_labels_file_path)

In [62]:
question_representations_test = torch.load(test_data_file_path)
one_hot_labels_test = torch.load(test_labels_file_path)

features_test = tf.convert_to_tensor(question_representations_test)  # 假设已经是正确的格式
features_test = tf.squeeze(features_test, axis=1)
labels_test = tf.convert_to_tensor(one_hot_labels_test)
labels_test = tf.reshape(labels_test, [-1, 4])  # 确保标签是正确的形状

# Evaluate the model
# Evaluate the model using the test set data question_representations_test and one_hot_labels_test
loss, accuracy = mlp.evaluate(features_test, labels_test)

print(f"Loss: {loss}")
print(f"Accuracy: {accuracy}")

Loss: 1.2825226783752441
Accuracy: 0.414000004529953
