In [None]:
import torch
import torch.nn as nn
import numpy as np
import jieba
from sklearn.model_selection import train_test_split
import pickle
import torch.nn.functional as F
import pandas as pd

In [None]:
!pip install hiddenlayer

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting hiddenlayer
  Downloading hiddenlayer-0.3-py3-none-any.whl (19 kB)
Installing collected packages: hiddenlayer
Successfully installed hiddenlayer-0.3


In [None]:
# Mount Google Drive; every data/model path used by the cells below lives
# under this mount point.
from google.colab import drive
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [None]:
def text_to_index_array(p_new_dic, tweets_list):
    '''
    Map raw tweets to lists of vocabulary indices.

    For every tweet the "<user>" and "<url>" placeholders and newline
    characters are removed, the text is split on whitespace, and each token
    is looked up in ``p_new_dic``. Tokens missing from the vocabulary are
    mapped to index 0.

    Parameters
    ----------
    p_new_dic : dict
        Vocabulary mapping ``{word: index}``.
    tweets_list : iterable of str
        Raw tweet texts.

    Returns
    -------
    numpy.ndarray
        1-D object array holding one Python list of indices per tweet. The
        lists may differ in length and may be empty.
    '''
    index_lists = []
    for tweet in tweets_list:
        tokens = tweet.replace("<user>", "").replace("\n", "").replace("<url>", "").split()
        # dict.get expresses "0 if not in the vocabulary" directly; the former
        # bare ``except:`` also hid unrelated errors (even KeyboardInterrupt).
        index_lists.append([p_new_dic.get(token, 0) for token in tokens])

    # Fill the array element by element: np.array(list_of_lists, dtype=object)
    # silently produces a 2-D array whenever all tweets have the same length,
    # which changes what iterating over the result yields.
    result = np.empty(len(index_lists), dtype=object)
    for position, indices in enumerate(index_lists):
        result[position] = indices
    return result

def text_cut_to_same_long(tweets_list, max_len=None):
    '''
    Force every row of a 2-D index matrix to exactly ``max_len`` columns.

    Rows wider than ``max_len`` are truncated; rows narrower than ``max_len``
    are right-padded with zeros (previously a narrower input raised a
    broadcasting error, or was silently repeated when it had one column).

    Parameters
    ----------
    tweets_list : 2-D array-like (e.g. numpy array or CPU torch tensor)
        Padded index matrix of shape (n_samples, width).
    max_len : int, optional
        Target number of columns. Defaults to the notebook-level ``maxlen``.

    Returns
    -------
    numpy.ndarray
        Object-dtype array of shape (n_samples, max_len) holding float values
        (object dtype is kept so the return type matches earlier behaviour).
    '''
    if max_len is None:
        max_len = maxlen  # notebook-level default

    data_num = len(tweets_list)
    fixed = np.zeros((data_num, max_len))
    if data_num:
        source = np.asarray(tweets_list)
        keep = min(source.shape[1], max_len)
        fixed[:, :keep] = source[:, :keep]
    return np.array(fixed, dtype=object)
    
def creat_wordvec_tensor(embedding_weights, X_T, max_len=None):
    '''
    Map an index matrix to a tensor of word vectors.

    Parameters
    ----------
    embedding_weights : 2-D array-like
        Embedding matrix of shape (n_words, embedding_dim); row ``k`` is the
        vector for index ``k``.
    X_T : sequence of sequences
        One sequence of word indices per sample. Each index is converted
        with ``int()`` before the lookup, so float-valued indices work.
    max_len : int, optional
        Number of time steps in the output. Defaults to the notebook-level
        ``maxlen``.

    Returns
    -------
    numpy.ndarray
        Float array of shape (len(X_T), max_len, embedding_dim). Positions
        past the end of a shorter sample stay zero.

    Raises
    ------
    IndexError
        If a sample holds more than ``max_len`` indices.
    '''
    if max_len is None:
        max_len = maxlen  # notebook-level default

    embedding_weights = np.asarray(embedding_weights)
    # Take the vector width from the matrix itself rather than from a global.
    embedding_dim = embedding_weights.shape[1]

    vectors = np.zeros((len(X_T), max_len, embedding_dim))
    for row, indices in enumerate(X_T):
        ids = [int(index) for index in indices]
        if len(ids) > max_len:
            raise IndexError(
                "sample %d has %d indices but max_len is %d" % (row, len(ids), max_len)
            )
        if ids:
            # One vectorised lookup per sample instead of one per token.
            vectors[row, :len(ids), :] = embedding_weights[ids]
    return vectors

In [None]:
vocab_dim = 100
maxlen = 20  # Maximum length of text retention

# NOTE(review): the embedding matrix is read from the D2V folder while the
# vocabulary below is read from the Glove folder -- confirm that row k of the
# matrix really corresponds to index k of this vocabulary.
embedding_weights = np.load("/content/drive/MyDrive/Colab Notebooks/D2V/Word2VecArray.npy")
# Set a zero vector for words that do not appear in the vocabulary
embedding_weights = np.r_[np.zeros((1, vocab_dim)), embedding_weights]

# pickle.load can execute arbitrary code contained in the file: only load
# vocabulary files you created yourself.
# The context manager closes the file handle (it was previously left open).
with open("/content/drive/MyDrive/Colab Notebooks/Glove/vocab.pkl", 'rb') as f:
    index_dict = pickle.load(f)    # index dictionary {'word': idx}

# Index each word + 1 because of the zero vector
index_dict = {word: idx + 1 for word, idx in index_dict.items()}

pos_ = pd.read_csv("/content/drive/MyDrive/Colab Notebooks/part_pos.csv")
neg_ = pd.read_csv("/content/drive/MyDrive/Colab Notebooks/part_neg.csv")

# Keep half of each class.
# NOTE(review): replace=True samples WITH replacement, so the same tweet can
# appear several times and later land in both the training and validation
# split. If plain subsampling was intended, use replace=False.
pos_ = pos_.sample(frac=0.5, replace=True, random_state=1)
neg_ = neg_.sample(frac=0.5, replace=True, random_state=1)

pos_data = pos_['tweet'].tolist()
neg_data = neg_['tweet'].tolist()
data = neg_data + pos_data

# Labels follow the concatenation order above: 0 = negative, 1 = positive.
label_list = ([0] * len(neg_data) + [1] * len(pos_data))

In [None]:
####LSTM####
# Hold out 5% of the tweets for validation. random_state is fixed (same seed
# as the subsampling step) so the split is identical on every
# Restart & Run All.
train_x, val_x, train_y, val_y = train_test_split(
    data, label_list, test_size=0.05, random_state=1
)
# Tweets -> lists of vocabulary indices (0 = out-of-vocabulary).
train_x = text_to_index_array(index_dict, train_x)
val_x = text_to_index_array(index_dict, val_x)
train_y = np.array(train_y)
val_y = np.array(val_y)

In [None]:
from torch.nn.utils.rnn import pad_sequence


def _index_lists_to_word_vectors(index_lists):
    '''
    Turn per-tweet index lists into a word-vector array.

    Steps: zero-pad all lists to a common length (as a float tensor), pass
    the result through text_cut_to_same_long, then look the indices up in
    ``embedding_weights`` with creat_wordvec_tensor.
    '''
    padded = pad_sequence(
        [torch.from_numpy(np.array(seq)) for seq in index_lists],
        batch_first=True,
    ).float()
    # Cut the data to the same specified length
    trimmed = text_cut_to_same_long(padded)
    # Index to vector
    return creat_wordvec_tensor(embedding_weights, trimmed)


train_x = _index_lists_to_word_vectors(train_x)
print("train shape： ", train_x.shape)
val_x = _index_lists_to_word_vectors(val_x)
print("val shape： ", val_x.shape)

train shape：  (1076160, 20, 100)
val shape：  (56641, 20, 100)


In [None]:
batch_size = 128

from torch.utils.data import TensorDataset, DataLoader
# Use the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')


def _as_tensor_dataset(features, labels):
    '''Wrap a (features, labels) pair of numpy arrays in a TensorDataset.'''
    return TensorDataset(torch.from_numpy(features), torch.from_numpy(labels))


train_data = _as_tensor_dataset(train_x, train_y)
test_data = _as_tensor_dataset(val_x, val_y)

# Both loaders shuffle and share the same batch size.
loader_options = dict(shuffle=True, batch_size=batch_size)
train_loader = DataLoader(train_data, **loader_options)
test_loader = DataLoader(test_data, **loader_options)
  

In [None]:
    
class BiLSTM_Attention(nn.Module):
    '''
    Bidirectional multi-layer LSTM followed by dot-product attention over the
    time steps and a linear classification layer.

    Parameters
    ----------
    input_size : int, optional
        Width of each input vector. Defaults to the notebook-level
        ``vocab_dim``.
    hidden_size : int
        Hidden units per direction (default 128).
    layer_size : int
        Number of stacked LSTM layers (default 3).
    num_classes : int
        Number of output logits (default 2).

    Calling the module with no arguments reproduces the original
    hard-coded configuration.
    '''

    def __init__(self, input_size=None, hidden_size=128, layer_size=3, num_classes=2):
        super().__init__()
        self.hidden_size = hidden_size
        self.input_size = vocab_dim if input_size is None else input_size
        self.layer_size = layer_size
        self.lstm = nn.LSTM(self.input_size,
                            self.hidden_size,
                            self.layer_size,
                            batch_first=True,
                            bidirectional=True
                            )
        self.out = nn.Linear(self.hidden_size * 2, num_classes)

    def attention_net(self, lstm_output, final_state):
        '''
        Weight the LSTM outputs by their dot product with a query vector.

        lstm_output : [batch_size, n_step, hidden_size * 2]
        final_state : [layer_size * 2, batch_size, hidden_size]

        Returns (context, soft_attn_weights) with shapes
        [batch_size, hidden_size * 2] and [batch_size, n_step].
        '''
        # Query = concatenation of final_state[0] and final_state[1].
        # NOTE(review): per the nn.LSTM layout these are the forward/backward
        # final states of the FIRST layer, while lstm_output comes from the
        # top layer; the top layer's states would be final_state[-2] and
        # final_state[-1]. Left unchanged so already-trained checkpoints keep
        # producing the same outputs -- confirm which one is intended.
        hidden = torch.cat((final_state[0], final_state[1]), dim=1).unsqueeze(2)
        # hidden : [batch_size, hidden_size * 2, 1]
        attn_weights = torch.bmm(lstm_output, hidden).squeeze(2)
        soft_attn_weights = F.softmax(attn_weights, 1)

        # context: [batch_size, hidden_size * 2]
        context = torch.bmm(lstm_output.transpose(1, 2), soft_attn_weights.unsqueeze(2)).squeeze(2)

        return context, soft_attn_weights

    def forward(self, input):
        '''
        input : [batch_size, n_step, input_size]

        Returns (logits, attention) with shapes [batch_size, num_classes]
        and [batch_size, n_step].
        '''
        output, (final_hidden_state, final_cell_state) = self.lstm(input)

        attn_output, attention = self.attention_net(output, final_hidden_state)
        return self.out(attn_output), attention

     
class lstm(nn.Module):
    '''
    Unidirectional multi-layer LSTM classifier that reads the last time step.

    Parameters
    ----------
    input_size : int, optional
        Width of each input vector. Defaults to the notebook-level
        ``vocab_dim``.
    hidden_size : int
        Hidden units of the LSTM (default 128).
    num_layers : int
        Number of stacked LSTM layers (default 3).
    num_classes : int
        Number of outputs of the final linear layer (default 2, matching the
        two-class cross-entropy / argmax usage in this notebook).
    '''

    def __init__(self, input_size=None, hidden_size=128, num_layers=3, num_classes=2):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=vocab_dim if input_size is None else input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True)
        # forward() uses self.fc, but it was never created, so every call
        # raised AttributeError.
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        '''
        x : [batch_size, n_step, input_size]

        Returns (out, h_0): sigmoid-squashed scores of shape
        [batch_size, num_classes] and the LSTM's final hidden states.
        '''
        out, (h_0, c_0) = self.lstm(x)
        out = out[:, -1, :]  # keep only the last time step
        out = self.fc(out)
        # NOTE(review): the result is passed through a sigmoid; if it is fed to
        # F.cross_entropy (which expects raw logits) the sigmoid should
        # probably be dropped -- confirm the intended loss.
        out = torch.sigmoid(out)
        return out, h_0

In [None]:
####------train---------####
from sklearn.metrics import accuracy_score, classification_report
import hiddenlayer as hl

model = BiLSTM_Attention()
model = model.to(device)
optimizer = torch.optim.Adam(model.parameters())
logStep=75
n_epoch = 11

print ('————————train————————')
# hiddenlayer history/canvas objects and logStep are not used by the loop
# below; they are kept so that code relying on these names keeps working.
history1=hl.History()
canvas1=hl.Canvas()

for epoch in range(n_epoch):
    correct = 0
    total = 0
    epoch_loss = 0
    model.train()
    # The batch variables are deliberately not called `data`/`target`: `data`
    # is the notebook-level list of raw tweets and must not be overwritten.
    for batch_x, batch_y in train_loader:
        # Move to the same device as the model (works on CPU-only runtimes too,
        # unlike a hard-coded .cuda()).
        batch_x = batch_x.to(device=device, dtype=torch.float32)
        batch_y = batch_y.to(device).long()

        optimizer.zero_grad()
        # The model's second return value is the attention weights.
        output, attn_weights = model(batch_x)
        loss = F.cross_entropy(output, batch_y)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        correct += int(torch.sum(torch.argmax(output, dim=1) == batch_y))
        total += len(batch_y)

    # Mean batch loss; max(..., 1) keeps an empty loader from raising.
    loss = epoch_loss / max(len(train_loader), 1)
    print ('epoch:%s'%epoch, 'accuracy：%.3f%%'%(correct *100 / max(total, 1)), 'loss = %s'%loss)
    

————————train————————
epoch:0 accuracy：78.637% loss = 0.43717319579647906
epoch:1 accuracy：81.982% loss = 0.3850613633466691
epoch:2 accuracy：83.589% loss = 0.35752925055932705
epoch:3 accuracy：84.775% loss = 0.3356568609425474
epoch:4 accuracy：85.713% loss = 0.318131183477014
epoch:5 accuracy：86.838% loss = 0.298056583163939
epoch:6 accuracy：88.103% loss = 0.2747761102341874
epoch:7 accuracy：89.466% loss = 0.24894800868601344
epoch:8 accuracy：90.773% loss = 0.22251136908192476
epoch:9 accuracy：91.914% loss = 0.1976707916588059
epoch:10 accuracy：92.935% loss = 0.17544059872274578


In [None]:
####------validation---------####
print ('————————validation————————')
# model = torch.load('/content/drive/MyDrive/Colab Notebooks/W2V_BiLSTM_attn2.pt')

# Pure evaluation pass: the model is put in eval mode and no gradients or
# optimizer steps are taken. The previous version called model.train(),
# loss.backward() and optimizer.step() here, i.e. it kept training on the
# validation batches, so the reported numbers were not a held-out measurement.
correct = 0
total = 0
epoch_loss = 0
model.eval()
with torch.no_grad():
    for batch_x, batch_y in test_loader:
        # Same device as the model; works on CPU-only runtimes as well.
        batch_x = batch_x.to(device=device, dtype=torch.float32)
        batch_y = batch_y.to(device).long()

        output, attn_weights = model(batch_x)
        loss = F.cross_entropy(output, batch_y)

        epoch_loss += loss.item()
        correct += int(torch.sum(torch.argmax(output, dim=1) == batch_y))
        total += len(batch_y)

# Single pass; `epoch` is kept only so the printed line has the same format.
epoch = 0
loss = epoch_loss / max(len(test_loader), 1)
print ('epoch:%s'%epoch, 'accuracy：%.3f%%'%(correct *100 / max(total, 1)), 'loss = %s'%loss)

————————validation————————
epoch:0 accuracy：86.995% loss = 0.31199511697276183


In [None]:
# Persist the whole model object (not just its state_dict) to Google Drive.
checkpoint_path = '/content/drive/MyDrive/Colab Notebooks/W2V_BiLSTM_attn0.pt'
torch.save(model, checkpoint_path)