In [None]:
import os
import warnings
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.optim import Adam
from torch.nn.utils import clip_grad_value_
from torch.utils.data import Dataset, DataLoader, random_split, Subset

# Suppress every Python warning for the rest of the notebook.
warnings.filterwarnings(action="ignore")

# Dataset file to load. Switch variants by un-commenting exactly one line.
DATA_PATH = './data/filtered_plus_minus.csv'
# DATA_PATH = './data/filtered_plus_minus_random.csv'
# DATA_PATH = './data/filtered_plus_minus_extreme.csv'
# DATA_PATH = './data/filtered_plus_minus_out.csv'
df = pd.read_csv(DATA_PATH)


Unnamed: 0,src,tgt
0,10+10+10=,30
1,10-10-10=,-10
2,10+10-10=,10
3,(10+10)-10=,10
4,10+(10-10)=,10
...,...,...
511995,(49+49)-49=,49
511996,49+(49-49)=,49
511997,49-49+49=,49
511998,(49-49)+49=,49


In [None]:
class Tokenizer:
    """Character-level tokenizer for arithmetic expressions.

    Every character of ``expression_string`` receives an integer id, numbered
    consecutively starting right after the special tokens. The special tokens
    keep the ids given in ``special_tokens``.

    Args:
        expression_string: Characters that may appear in an expression.
        special_tokens: Mapping from special-token name to id. Must contain
            ``"<eos>"`` for ``encode`` to work. Defaults to
            ``{"<pad>": 0, "<eos>": 1}`` when omitted.

    Attributes:
        cal_to_id: character / special-token name -> id.
        id_to_cal: id -> character / special-token name.
        special_ids: ids of the special tokens; ``decode`` drops these.
    """

    def __init__(
            self,
            expression_string: str="0123456789+-()=",
            special_tokens: dict=None
    ) -> None:
        # The declared default is None; without this fallback the constructor
        # crashed on len(None) whenever special_tokens was omitted.
        if special_tokens is None:
            special_tokens = {"<pad>": 0, "<eos>": 1}

        self.expression_string = expression_string
        # Regular characters are numbered after the special tokens, which are
        # assumed to occupy ids 0 .. len(special_tokens) - 1.
        self.cal_to_id = {cal: i for i, cal in enumerate(expression_string, start=len(special_tokens))}
        self.cal_to_id.update(special_tokens)
        self.id_to_cal = {i: cal for cal, i in self.cal_to_id.items()}
        self.special_ids = frozenset(special_tokens.values())

    def encode(self, expression: str) -> list:
        """Return the ids of ``expression`` followed by the ``<eos>`` id.

        Raises:
            KeyError: if a character is not in the vocabulary.
        """
        return [self.cal_to_id[cal] for cal in expression] + [self.cal_to_id["<eos>"]]

    def decode(self, expression: list) -> str:
        """Turn a list of ids back into text, skipping every special token."""
        # Filter by the actual special ids instead of a hard-coded `i >= 2`,
        # which was only right for exactly two special tokens with ids 0 and 1.
        return "".join([self.id_to_cal[i] for i in expression if i not in self.special_ids])

In [None]:
# Build the vocabulary: expression characters plus the <pad>/<eos> special tokens,
# then show both lookup tables.
special_tokens = {"<pad>": 0, "<eos>": 1}
tokenizer = Tokenizer("0123456789+-()=", special_tokens)
for mapping in (tokenizer.cal_to_id, tokenizer.id_to_cal):
    print(mapping)

{'0': 2, '1': 3, '2': 4, '3': 5, '4': 6, '5': 7, '6': 8, '7': 9, '8': 10, '9': 11, '+': 12, '-': 13, '(': 14, ')': 15, '=': 16, '<pad>': 0, '<eos>': 1}
{2: '0', 3: '1', 4: '2', 5: '3', 6: '4', 7: '5', 8: '6', 9: '7', 10: '8', 11: '9', 12: '+', 13: '-', 14: '(', 15: ')', 16: '=', 0: '<pad>', 1: '<eos>'}


In [None]:
# Sanity check: encode the first expression of the dataframe and decode it back.
src, _ = df.iloc[0]
encoded_src = tokenizer.encode(src)
decoded_src = tokenizer.decode(encoded_src)
for label, value in (
    ("Src:", src),
    ("Encoded src:", encoded_src),
    ("Decoded src:", decoded_src),
):
    print(label, value)

Src: 10+10+10=
Encoded src: [3, 2, 12, 3, 2, 12, 3, 2, 16, 1]
Decoded src: 10+10+10=


In [None]:
# Hyperparameters and sizes shared by the cells below.
batch_size = 256  # samples per batch for both DataLoaders
epochs = 40  # number of iterations of the outer training loop
embed_dim = 256  # embedding size passed to CharRNN
hidden_dim = 256  # hidden size passed to CharRNN (both LSTM layers and the output MLP)
lr = 0.0001  # learning rate passed to the Adam optimizer
grad_clip = 1  # clip value passed to clip_grad_value_ in the training loop
max_length = 25  # every encoded input/target sequence is padded to this length
vocab_size = len(tokenizer.cal_to_id)  # regular characters plus special tokens

In [None]:
# Dataset for arithmetic text generation.
#
# For each (expression, answer) row the model input is the expression followed
# by <eos> and padding; the answer itself is NOT part of the input. The target
# is aligned so that the answer starts at the position of the trailing "=":
#
#   src    = "12+22+33="      answer = 68
#   input:   1 2 + 2 2 + 3 3 = <eos> <pad> <pad> ...
#   target:  / / / / / / / / 6   8   <eos> <pad> ...
#
# "/" marks positions whose target is <pad>.

class Dataset(torch.utils.data.Dataset):
    """Map-style dataset yielding padded ``(input_ids, target_ids)`` tensors.

    The base class is spelled out as ``torch.utils.data.Dataset`` so that
    re-running this cell does not make the class inherit from its own previous
    definition (the class name shadows the imported ``Dataset``).

    Args:
        data: Frame whose rows unpack into exactly ``(src, tgt)``.
        tokenizer: Object with ``encode(str) -> list`` (which appends ``<eos>``)
            and a ``cal_to_id`` mapping containing ``"<pad>"``. Defaults to the
            notebook-level ``tokenizer``.
        max_length: Length both sequences are padded to. Defaults to the
            notebook-level ``max_length``.
    """

    def __init__(self, data: pd.DataFrame, tokenizer=None, max_length: int=None):
        self.data = data.values.copy()
        # None means "look up the notebook global lazily in __getitem__".
        self._tokenizer = tokenizer
        self._max_length = max_length

    def _resolve_settings(self):
        """Return the (tokenizer, max_length) pair to use for encoding."""
        tok = self._tokenizer if self._tokenizer is not None else tokenizer
        length = self._max_length if self._max_length is not None else max_length
        return tok, length

    def __getitem__(self, idx: int):
        """Encode row ``idx``.

        Returns:
            Two 1-D ``torch.long`` tensors of length ``max_length``.

        Raises:
            ValueError: if the encoded sample does not fit into ``max_length``.
        """
        tok, length = self._resolve_settings()
        pad_id = tok.cal_to_id["<pad>"]

        src, tgt = self.data[idx]
        encoded_input_seq = tok.encode(src)
        # encode() already terminates the answer with <eos>; appending another
        # one here used to put two <eos> tokens into every target.
        encoded_output_seq = [pad_id] * (len(src) - 1) + tok.encode(str(tgt))

        longest = max(len(encoded_input_seq), len(encoded_output_seq))
        if longest > length:
            # Without this check the sample would silently come out longer than
            # max_length and only fail later when the batch is collated.
            raise ValueError(
                f"sample {idx} needs {longest} tokens but max_length is {length}"
            )

        encoded_input_seq += [pad_id] * (length - len(encoded_input_seq))
        encoded_output_seq += [pad_id] * (length - len(encoded_output_seq))
        return (
            torch.tensor(encoded_input_seq, dtype=torch.long),
            torch.tensor(encoded_output_seq, dtype=torch.long),
        )

    def __len__(self):
        """Number of rows in the wrapped frame."""
        return len(self.data)

In [None]:
# Wrap the whole dataframe in the Dataset defined above.
train_dataset = Dataset(df)

# 90 % of the samples go to training, the remainder to validation.
train_length = int(len(train_dataset) * 0.9)
val_length = len(train_dataset) - train_length

# Random split with a fixed seed so the partition is reproducible.
# Note the order: the validation subset comes first, then the training subset.
split_generator = torch.Generator().manual_seed(42)
df_val, df_train = random_split(
    dataset=train_dataset,
    lengths=[val_length, train_length],
    generator=split_generator,
)

# Both loaders share the same settings: shuffled batches, incomplete last batch dropped.
loader_kwargs = dict(batch_size=batch_size, drop_last=True, shuffle=True)
train_dataloader = DataLoader(df_train, **loader_kwargs)
val_dataloader = DataLoader(df_val, **loader_kwargs)

In [None]:
# Show the first sample (if any): raw id tensors, their shapes and the decoded text.
for i in range(min(1, len(train_dataset))):
    src, tgt = train_dataset[i]
    for raw_label, text_label, ids in (
        ("ECO_SRC: ", "DEC_SRC:", src),
        ("ECO_TGT: ", "DEC_TGT:", tgt),
    ):
        print(raw_label, ids, ids.shape)
        print(text_label, tokenizer.decode(ids.tolist()))
    print()

ECO_SRC:  tensor([ 3,  2, 12,  3,  2, 12,  3,  2, 16,  1,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0]) torch.Size([25])
DEC_SRC: 10+10+10=
ECO_TGT:  tensor([0, 0, 0, 0, 0, 0, 0, 0, 5, 2, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0]) torch.Size([25])
DEC_TGT: 30



In [None]:
class CharRNN(nn.Module):
    """Character-level model: embedding -> two stacked LSTMs -> two-layer MLP.

    Args:
        vocab_size: Number of token ids; also the size of the output scores.
        embed_dim: Size of each token embedding.
        hidden_dim: Hidden size of both LSTM layers and of the MLP's first layer.

    The ``<pad>`` id is read from the notebook-level ``tokenizer`` and used as
    the embedding's ``padding_idx``.
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim):
        super().__init__()

        # Token embedding; the <pad> row is the embedding's padding_idx.
        pad_id = tokenizer.cal_to_id['<pad>']
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=pad_id)

        # Two separate single-layer LSTMs applied one after the other.
        self.rnn_layer1 = nn.LSTM(embed_dim, hidden_dim, batch_first=True)
        self.rnn_layer2 = nn.LSTM(hidden_dim, hidden_dim, batch_first=True)

        # Per-position projection from hidden state to vocabulary scores.
        self.linear = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, vocab_size),
        )

    def forward(self, x):
        """Return unnormalised vocabulary scores for every position of ``x``.

        ``x`` holds token ids; the training loop passes a
        ``(batch_size, seq_len)`` tensor and receives
        ``(batch_size, seq_len, vocab_size)``.
        """
        embedded = self.embedding(x)
        first_pass, _ = self.rnn_layer1(embedded)
        second_pass, _ = self.rnn_layer2(first_pass)
        return self.linear(second_pass)

In [None]:
# Seed PyTorch before building the model so the initial weights are reproducible.
torch.manual_seed(2)

# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

model = CharRNN(vocab_size=vocab_size, embed_dim=embed_dim, hidden_dim=hidden_dim)

In [None]:
# Mean cross-entropy over every position. `ignore_index` is not set, so <pad>
# target positions contribute to the loss as well. Alternative that skips them:
#   nn.CrossEntropyLoss(ignore_index=tokenizer.cal_to_id['<pad>'], reduction='mean')
criterion = nn.CrossEntropyLoss(reduction='mean')
optimizer = Adam(params=model.parameters(), lr=lr)

In [None]:
# Train for `epochs` epochs; after each training pass, run one pass over the
# validation loader without gradient tracking.
# NOTE(review): this import sits here rather than with the other imports at the top.
from tqdm import tqdm

model = model.to(device)
model.train()

for epoch in range(1, epochs+1):
    # Training pass
    process_bar = tqdm(train_dataloader, desc=f"Training epoch {epoch}")
    for src, tgt in process_bar:
        optimizer.zero_grad()
        batch_pred_y = model(src.to(device))  # (batch_size, seq_len, vocab_size)
        # CrossEntropyLoss expects the class dimension at index 1 for sequence
        # inputs, hence the permute instead of flattening batch and time.
        batch_pred_y = batch_pred_y.permute(0, 2, 1)  # (batch_size, vocab_size, seq_len)
        loss = criterion(batch_pred_y, tgt.to(device))
        loss.backward()
        # Clamp every gradient element to [-grad_clip, grad_clip] before the step.
        torch.nn.utils.clip_grad_value_(model.parameters(), grad_clip)
        optimizer.step()

        # Update the progress bar with the loss of the current batch only
        # (no running average is kept).
        process_bar.set_postfix(loss=loss.item())

    model.eval()  # switch to evaluation mode for the validation pass


    validation_process_bar = tqdm(val_dataloader, desc=f"Validation epoch {epoch}")
    with torch.no_grad():
        for src, tgt in validation_process_bar:
            batch_pred_y = model(src.to(device))
            batch_pred_y = batch_pred_y.permute(0, 2, 1)  # (batch_size, vocab_size, seq_len)
            val_loss = criterion(batch_pred_y, tgt.to(device))

            # As above: the displayed value is the most recent batch's loss,
            # not an epoch-level mean.
            validation_process_bar.set_postfix(val_loss=val_loss.item())

    # Back to training mode for the next epoch.
    model.train()


Training epoch 1: 100%|██████████| 1800/1800 [00:43<00:00, 41.66it/s, loss=0.145]
Validation epoch 1: 100%|██████████| 200/200 [00:04<00:00, 49.16it/s, val_loss=0.146]
Training epoch 2: 100%|██████████| 1800/1800 [00:42<00:00, 42.00it/s, loss=0.104]
Validation epoch 2: 100%|██████████| 200/200 [00:03<00:00, 60.25it/s, val_loss=0.104]
Training epoch 3: 100%|██████████| 1800/1800 [00:43<00:00, 41.49it/s, loss=0.091]
Validation epoch 3: 100%|██████████| 200/200 [00:03<00:00, 59.56it/s, val_loss=0.0896]
Training epoch 4: 100%|██████████| 1800/1800 [00:42<00:00, 41.90it/s, loss=0.0678]
Validation epoch 4: 100%|██████████| 200/200 [00:03<00:00, 52.23it/s, val_loss=0.0661]
Training epoch 5: 100%|██████████| 1800/1800 [00:43<00:00, 41.67it/s, loss=0.0454]
Validation epoch 5: 100%|██████████| 200/200 [00:03<00:00, 58.07it/s, val_loss=0.0509]
Training epoch 6: 100%|██████████| 1800/1800 [00:43<00:00, 41.34it/s, loss=0.0325]
Validation epoch 6: 100%|██████████| 200/200 [00:03<00:00, 62.07it/s, va

In [None]:
def generator(model, expression):
    """Predict the answer string for ``expression`` (e.g. ``"10+10+10="``).

    The input is built the same way the training Dataset builds it: the encoded
    expression (with ``<eos>``) padded with ``<pad>`` up to the notebook-level
    ``max_length``. Feeding only the un-padded expression, as before, produced
    ``len(expression) + 1`` output positions, of which only two lie on or after
    the ``=``, so any answer longer than two characters was cut off.

    Args:
        model: Network mapping a 1-D tensor of token ids to per-position
            vocabulary scores (assumes unbatched input is supported, as the
            original call did).
        expression: Expression text made of characters known to ``tokenizer``.

    Returns:
        The decoded answer: predictions read from the position of the trailing
        ``=`` up to (not including) the first ``<eos>``.

    Side effects:
        Leaves ``model`` in eval mode.
    """
    pad_id = tokenizer.cal_to_id["<pad>"]
    eos_id = tokenizer.cal_to_id["<eos>"]

    model.eval()
    with torch.no_grad():
        token_ids = tokenizer.encode(expression)
        # Pad exactly like the training inputs; never truncate a long expression.
        token_ids = token_ids + [pad_id] * max(0, max_length - len(token_ids))
        encoded_input = torch.tensor(token_ids).to(device)
        pred = model(encoded_input)
        pred_list = pred.argmax(dim=-1).tolist()

    # Training targets place the first answer character at the position of the
    # last expression character, so start reading there.
    answer_start = max(len(expression) - 1, 0)
    answer_ids = []
    for token_id in pred_list[answer_start:]:
        if token_id == eos_id:
            break
        answer_ids.append(token_id)
    # decode() drops any remaining special ids such as <pad>.
    return tokenizer.decode(answer_ids)

# Exact-match accuracy of `generator` on the first rows of the dataframe.
# NOTE(review): these rows come from the full `df`, which is split randomly into
# train/validation above, so this is not a held-out score.
eval_size = min(51200, len(df))  # never index past the end of a smaller frame
exp_list = df["src"].tolist()[:eval_size]
exp_ans_list = df["tgt"].tolist()[:eval_size]

correct = 0
for exp, ans in zip(exp_list, exp_ans_list):
    pred = generator(model, exp)
    # Compare as strings: `ans` is numeric in the frame, `pred` is decoded text.
    if str(ans) == str(pred):
        correct += 1

# Guard against an empty frame instead of dividing by zero.
accuracy = correct / len(exp_list) * 100 if exp_list else 0.0
print("Evaluating(Accuracy): {:.2f}%".format(accuracy))

Evaluating(Accuracy): 67.49%
