#### Part **A**: Modules

In [2]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split
from torch.nn.utils.rnn import pad_sequence
from gensim.models import keyedvectors
import os
import pandas as pd
import numpy as np
import re
import string
import jieba
import emoji
punctuation = str.maketrans({i:'' for i in string.punctuation}) 

file = "vector/tencent-ailab-embedding-zh-d200-v0.2.0-s.txt"
WORD2VEC = keyedvectors.load_word2vec_format(file, binary=False)
DIM = 200

RATIO = [0.7, 0.3]
DEVICE = 'cuda:0' if torch.cuda.is_available() else 'cpu'
BATCH = 64
CLASS = 4

  from .autonotebook import tqdm as notebook_tqdm


#### Part **B**: Loading data-set

In [31]:
class_count=[0, 0, 0, 0]

def data_clearning(token):
    '''The function performs data cleaning by removing URLs, HTML tags, emojis, and punctuation from a
    given token.
    
    Parameters
    ----------
    token
        The input text that needs to be cleaned of URLs, HTML tags, emojis, and punctuation.
    
    Returns
    -------
        the cleaned version of the input token, with URLs, HTML tags, emojis, and punctuation removed.
    
    '''
    url = re.compile(r'https?://\S+|www\.\S+') #Remove URL
    token = url.sub(r'', token)
    html = re.compile(r'<.*?>') #Remove HTML
    token = html.sub(r'', token)
    token = emoji.demojize(token) #Remove Emoji
    space = re.compile(r"\s+")
    token = space.sub(r'', token)
    token = token.translate(punctuation) #Remove English punctuation
    #cn_char = re.compile(r"[^0-9\u4e00-\u9fa5]")
    #token = cn_char.sub(r'', token)
    token = token.replace("\ufeff","") #Remove \ufeff
    return token
    
def read_csv(filedir="data", mode="train", filename="train.csv"):
    """Read dataset from csv

    Parameters
    ----------
    filedir : str, optional
        the folder of the dataset, by default "data".
    mode : bool, optional
        whether to load training dataset, by default 'True'.

    Returns
    -------
    df['text'].to_list(): list()
        a list of multiple sentences
    related_list: list()
        a list of the target for training set, and the id for the testing set

    """
    global class_count
    file = os.path.join(filedir, filename)
    label_list, token_vector_list = [], []
    if (mode == "train"):
        df = pd.read_csv(file, sep=",", encoding="utf-8")
        for idx, row_data in df.iterrows():
            label = int(row_data["label"])
            review = row_data["review"] 
            token_vector = extract_tokens(review)
            if type(token_vector) != np.ndarray: #omit this sample
                continue
            class_count[label] += 1 #Count number of each class
            label_list.append(label)
            token_vector_list.append(token_vector)
        
    else: #Test
        with open(file, "r", encoding="utf-8") as f:
            for lines in f:
                token_vector = extract_tokens(lines)
                if type(token_vector) != np.ndarray: #omit this sample
                    continue
                token_vector_list.append(token_vector)
                label_list.append(lines) #Store Raw message

    return token_vector_list, label_list            


def extract_tokens(sentence):
    sentence = data_clearning(sentence) 
    vector_list = []
    cn_char = re.compile(r"[0-9\u4e00-\u9fa5]")
    for word in jieba.cut(sentence):
        if not re.search(cn_char, word):
            continue
        try:
            word_vector = WORD2VEC[word]
        except KeyError: #Not in pre-trained context word
            continue
            #word_vector = np.zeros([DIM], dtype=np.float32)
        vector_list.append(np.expand_dims(word_vector, 0))
    if len(vector_list):
        vectors = np.concatenate(vector_list, 0)
    else:
        vectors = None #Omit this sample
    return vectors


class TweetDataset(Dataset):
    def __init__(self, filedir="data", mode="train", filename="train.csv"):
        super().__init__()
        self.targets, self.content = None, None
        if (mode=="train"): #For training dataset, we focus on texts and targers
            self.texts, self.targets = read_csv(filedir, mode, filename)
        else: #For testing dataset, we focus on texts and its content
            self.texts, self.content  = read_csv(filedir, mode, filename)
        self.mode = mode
    
    def __len__(self):
        return len(self.texts)

    def __getitem__(self, index):
        x = torch.tensor(self.texts[index])
        label = torch.tensor(self.targets[index]) if self.mode=="train" else None
        return x, label
    
def get_dataloader(file_dir="data"):
    '''This function returns three data loaders for training, validation, and testing datasets.

    Parameters
    ----------
    file_dir, optional
        The directory where the data files are stored.
    
    Returns
    -------
        The function returns three dataloaders: `train_dataloader`, `val_dataloader`, and`test_dataloader`.
    These dataloaders are used to load batches of data during training, validation,and testing of a 
    machine learning model.
    
    '''
    def collate_fn(batch):
        x, y = zip(*batch)
        x_pad = pad_sequence(x, batch_first=True) #forces the shape of X's to be matched
        if y[0] != None:
            y = torch.tensor(y, dtype=int) 
        return x_pad, y
    
    train_val_dataset = TweetDataset(file_dir)
    train_dataset, val_dataset = random_split(train_val_dataset, RATIO) #Split the dataset with ration 0.7
    train_dataloader = DataLoader(train_dataset, 
                                  batch_size = BATCH,
                                  shuffle = True,
                                  collate_fn = collate_fn)
    val_dataloader = DataLoader(val_dataset, 
                                batch_size = BATCH,
                                shuffle = True,
                                collate_fn = collate_fn)
    test_dataloader = DataLoader(TweetDataset(mode="test", filename="test.txt"), 
                                 batch_size = BATCH,
                                 shuffle = False,
                                 collate_fn = collate_fn)
    
    return train_dataloader, val_dataloader, test_dataloader


train_dataloader, val_dataloader, test_dataloader = get_dataloader()

#### Part **C**: Initialize the model

In [9]:
class RNN(torch.nn.Module):
    def __init__(self, hidden_units=128, dropout_rate=0.5):
        super().__init__()
        self.drop = nn.Dropout(dropout_rate)
        self.rnn = nn.GRU(DIM, hidden_units, 1, batch_first=True)
        self.linear = nn.Linear(hidden_units, CLASS)
        #self.softmax = nn.Softmax(dim=1)
        for name, param in self.rnn.named_parameters():
            if name.startswith("weight"):
                nn.init.xavier_normal_(param)
            else:
                nn.init.zeros_(param)
        nn.init.orthogonal_(self.linear.weight)


    def forward(self, x: torch.Tensor):
        # x shape: [batch, max_word_length, embedding_length]
        emb = self.drop(x)
        output, _ = self.rnn(emb)
        output = output[:, -1] #Only cares the output of the last state
        output = self.linear(output)
        #output = self.softmax(output)
        return output
    
model = RNN().to(DEVICE) 

#### Part **D**: Training

In [8]:
train_loss, val_loss = [], []
total_epoch = 50
#WEIGHTS = torch.tensor([(1/x) for x in class_count]).to(DEVICE)

def train_val():
    '''This function contains the training and validation process to train the model.
    
    '''
    global train_loss, val_loss, total_epoch
    optimizer = torch.optim.Adam(model.parameters(), lr=4e-3, weight_decay=1e-4)
    citerion = torch.nn.CrossEntropyLoss()
    scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, 0.8, -1)
    best_epoch = 0         #Which epoch has the best performance
    best_score = 0         #Which epoch has the best performance

    for epoch in range(100):
        loss_sum = 0
        train_len = len(train_dataloader.dataset)
        val_len = len(val_dataloader.dataset)
        if (epoch - best_epoch > 10):
            total_epoch = epoch
            print('best epoch %d: %.2f%%'%(best_epoch, best_score*100))
            return
        
        ### Training
        model.train()
        for x, y in train_dataloader:
            batchsize = y.shape[0]
            x = x.to(DEVICE) 
            y = y.to(DEVICE) 
            hat_y = model(x)
            #hat_y = hat_y.squeeze(-1)
            loss = citerion(hat_y, y)

            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5) #Aviod gradient loss
            optimizer.step()

            loss_sum += loss.item() * batchsize
        
        scheduler.step()
        train_loss.append(loss_sum/train_len)
        print(f'Epoch {epoch}:\n[Training] loss: {loss_sum / train_len}')
        

        ### Validation
        accuracy = 0
        loss_sum = 0
        model.eval()
        with torch.no_grad(): #No need to back-propagated during validating
            for x, y in val_dataloader:
                batchsize = y.shape[0]
                x = x.to(DEVICE) 
                y = y.to(DEVICE) 
                hat_y = model(x)
                #hat_y = hat_y.squeeze(-1)
                predictions = hat_y.argmax(dim=1)
                score = torch.sum(torch.where(predictions == y, 1, 0)) #Accuracy score of current batch
                accuracy += score.item()
                loss = citerion(hat_y, y)
                loss_sum += loss.item() * batchsize

            #Loss    
            val_loss.append(loss_sum/val_len)
            #Accuracy
            accuracy /= len(val_dataloader.dataset)
            print('Validation accuracy: %.2f%%.'%(accuracy*100))
            if accuracy > best_score: #A better model is found, update and save
                best_score = accuracy
                best_epoch = epoch
                torch.save(model.state_dict(), 'rnn.pth') 
                print("SAVED!!!")
                #print('Best validation accuracy updated to: %.2f%%. Saved.'%(best_score*100))

    print('best epoch %d: %.2f%%'%(best_epoch, best_score*100))


    
#if __name__ == "__main__":
#    train_val()
#    #test()
#    print("Loss %.4f"%val_loss[-1])
train_val()

Epoch 0:
[Training] loss: 1.1776251361353685
Validation accuracy: 56.52%.
SAVED!!!
Epoch 1:
[Training] loss: 1.0830041780433404
Validation accuracy: 58.39%.
SAVED!!!
Epoch 2:
[Training] loss: 1.0547484328138108
Validation accuracy: 58.99%.
SAVED!!!
Epoch 3:
[Training] loss: 1.0476120120732328
Validation accuracy: 59.30%.
SAVED!!!
Epoch 4:
[Training] loss: 1.0433031251902694
Validation accuracy: 59.43%.
SAVED!!!
Epoch 5:
[Training] loss: 1.039715209911414
Validation accuracy: 59.36%.
Epoch 6:
[Training] loss: 1.037203197174186
Validation accuracy: 59.64%.
SAVED!!!
Epoch 7:
[Training] loss: 1.0348975002298917
Validation accuracy: 59.77%.
SAVED!!!
Epoch 8:
[Training] loss: 1.0320522794540443
Validation accuracy: 59.88%.
SAVED!!!
Epoch 9:
[Training] loss: 1.0302372755814009
Validation accuracy: 59.80%.
Epoch 10:
[Training] loss: 1.0284420857529881
Validation accuracy: 59.83%.
Epoch 11:
[Training] loss: 1.0270170751977155
Validation accuracy: 59.92%.
SAVED!!!
Epoch 12:
[Training] loss: 1.02

#### Part **E**: Testing

In [32]:
def test():
    '''This function contains the training and validation process to train the model.

    '''
    global df
    def save_csv(content_list, target_list,dir="Data"):
        global df
        dic = {'target':target_list, 'text':content_list}
        df = pd.DataFrame(dic)
        path = os.path.join(dir,'result.xlsx')
        df.to_excel(path,encoding='utf-8')

    model.load_state_dict(torch.load('rnn.pth'))
    model.eval()
    results = []
    with torch.no_grad(): #No need to back-propagated during validating
        for x, _ in test_dataloader: #No y is needed in testing
            x = x.to(DEVICE) 
            hat_y = model(x)
            #hat_y = hat_y.squeeze(-1)
            hat_y = torch.argmax(hat_y, dim=1) #For probablity>0.5, consider it as the positve label
            results.append(hat_y.detach().cpu().numpy())
    target_list = np.concatenate(results).tolist()
    content_list = test_dataloader.dataset.content #Output the id when reading 'content.csv'
    save_csv(content_list, target_list)
    
test()

  return func(*args, **kwargs)


#### Part **F**: Results demo

In [34]:
df[95:105]

Unnamed: 0,target,text
95,0,1学分，小问题\n
96,0,纯抄的才会0分吧？\n
97,2,我这次也是0前两次满分，吐血了\n
98,0,1. 有没有把代码发给别人过 2. 有没有抄别人的代码\n
99,1,好难吃 思廷的水果捞[愤怒][愤怒]\n
100,0,感觉西瓜很好吃，切的是条状[尖叫]\n
101,0,蹲蹲大三报了校外导师项目但不打算去5.20见面晚宴的uu[尖叫]\n
102,0,收晚宴门票价格你开\n
103,0,想问一下gfn各个老师评价！感谢 暑课想上\n
104,0,直接冲zhanghonghui 人美心善的可爱大姐姐 超级随和\n
