In [None]:
# Import every library used in this notebook
import numpy as np
import pandas as pd
import re
import string
import torch
import torch.nn as nn

from collections import Counter
from nltk.corpus import stopwords
from torch.utils.data import TensorDataset, DataLoader
import matplotlib.pyplot as plt
from nltk.stem import WordNetLemmatizer

In [None]:
# Load the dataset
train_path = "train.txt"
test_path = "test.txt"
def getData(path):
    """Read a ``text;label`` file and return its two columns as lists.

    Each line is split on its LAST ``;`` so a semicolon inside the sentence
    does not shift the label. Lines without any ``;`` (e.g. blank lines) are
    skipped instead of raising ``IndexError``.

    Args:
        path: Path of the text file to read (decoded as UTF-8).

    Returns:
        ``(text, label)``: two lists of equal length. Labels are returned
        exactly as found on the line, i.e. they still carry the trailing
        newline when the line had one.
    """
    text = []
    label = []
    # Explicit encoding: do not depend on the platform's default locale.
    with open(path, encoding="utf-8") as f:
        # Iterate lazily instead of materialising the file with readlines().
        for line in f:
            sentence, separator, tag = line.rpartition(";")
            if not separator:
                # No ';' at all: blank or malformed line, nothing to load.
                continue
            text.append(sentence)
            label.append(tag)
    return text,label
# Read both splits, then wrap the training split in a DataFrame.
train_text, train_label = getData(train_path)
test_text, test_label = getData(test_path)

df = pd.DataFrame({"sentence": train_text, "label": train_label})
df


In [None]:
# Check the frame size and count missing values per column
print(df.shape)
print(df.isna().sum())

In [None]:
# Text preprocessing

def remove_hyperlinks(text):
    """Delete the literal markers ``http`` and ``www`` from ``text``.

    Only those two substrings are removed (``http`` first, then ``www``);
    the rest of a URL is left in place.
    """
    without_http = text.replace("http", "")
    return without_http.replace("www", "")

def remove_punctuation(text):
    """Return ``text`` with every character found in ``string.punctuation`` dropped."""
    kept = []
    for ch in text:
        if ch in string.punctuation:
            continue
        kept.append(ch)
    return ''.join(kept)
def reshape(text):
    """Return ``text`` with all newline characters removed."""
    return text.replace("\n", "")

# Lower-case the sentences before stripping link markers; strip newlines from labels.
df['sentence'] = df['sentence'].map(str.lower).map(remove_hyperlinks)
df['label'] = df['label'].map(reshape)
df.head(10)


In [None]:
# Tokenisation
def tokenize(text):
    """Split ``text`` into a list of whitespace-separated tokens.

    Splitting on runs of whitespace (rather than on every single space)
    avoids empty-string tokens when punctuation removal leaves repeated,
    leading or trailing spaces behind. An empty string yields ``[]``.
    """
    return text.split()

# Strip punctuation first, then split every sentence into tokens.
df['sentence'] = df['sentence'].map(remove_punctuation).map(tokenize)
df.head(10)


In [None]:
# Remove stop words

# A set gives constant-time membership tests; the list returned by NLTK would
# be scanned linearly for every single token of every sentence.
stopword = set(stopwords.words('english'))
def remove_stopword(text):
    """Return the tokens of ``text`` (an iterable of words) that are not in ``stopword``."""
    return [word for word in text if word not in stopword]

# Drop stop words from every token list.
df['sentence'] = df['sentence'].map(remove_stopword)
df.head(10)


In [None]:
# Lemmatisation
lemmatizer = WordNetLemmatizer()
def lemmatize(text):
    """Lemmatize each token of ``text`` and join the results with single spaces."""
    lemmas = map(lemmatizer.lemmatize, text)
    return ' '.join(lemmas)

# Lemmatize every token list; each sentence becomes a space-joined string again.
df['sentence'] = df['sentence'].map(lemmatize)
df.head(10)


In [None]:
# Distinct labels, in order of first appearance
category = pd.unique(df["label"]).tolist()
category

In [None]:
# Take the sentences as an array and replace each label by its position in `category`
sentence = df['sentence'].to_numpy()
label = df["label"].map({name: idx for idx, name in enumerate(category)}).to_numpy()
label[:5]


In [None]:
# Collect the distinct words of the corpus
# split() (not split(" ")) matches how sentences are tokenised when they are
# encoded later on, and keeps the empty string out of the vocabulary.
words = [word.lower() for s in sentence for word in s.split()]
# Sorting makes the word order - and therefore every word id derived from it -
# identical across kernel restarts; a bare set has no stable iteration order.
various_words = sorted(set(words))
# Preview only: the full vocabulary is too long to display.
various_words[:20]

In [None]:
# Build the id -> word index and its inverse
# Id 0 is reserved for padding: reset_text() fills short sentences with zeros,
# so no real word may share that id. "<pad>" can never be a real token because
# punctuation is stripped during preprocessing.
int2word = {0: "<pad>"}
int2word.update(enumerate(various_words, start=1))
word2int = {w: i for i, w in int2word.items()}
# len(word2int) now counts the padding entry too, so it is still a valid
# embedding-table size for ids 0..len(various_words).
# Preview only: the full mapping is too long to display.
list(word2int.items())[:10]

In [None]:
# Count the words in each sentence and tally how often each length occurs

sentence_length = [len(tokens) for tokens in map(str.split, sentence)]
counts = dict(Counter(sentence_length))
counts

In [None]:
# Plot the distribution of sentence lengths
plt.figure(figsize=(16, 5))
plt.bar(list(counts.keys()), list(counts.values()))
plt.gca().set(xlabel="sentence_length", ylabel="num")
plt.show()


In [None]:
# Shortest and longest sentence length, each paired with its number of occurrences
min_sen = min(counts.items(), key=lambda pair: pair[0])
max_sen = max(counts.items(), key=lambda pair: pair[0])
min_sen,max_sen

In [None]:
# Row indices of the shortest and of the longest sentences
min_index = [i for i, length in enumerate(sentence_length) if length == min_sen[0]]
# The cells below delete the min_index rows FIRST and only then delete
# max_index from the already-shortened array. max_index must therefore be
# expressed in post-deletion coordinates; indices taken from the original
# array would point at the wrong rows (or past the end) after the first delete.
lengths_after_min_removal = [length for length in sentence_length if length != min_sen[0]]
max_index = [i for i, length in enumerate(lengths_after_min_removal) if length == max_sen[0]]


In [None]:
# Drop the shortest sentences, then the longest ones (two successive deletions)
new_text = np.delete(sentence, min_index, axis=0)
new_text2 = np.delete(new_text, max_index, axis=0)

new_text2

In [None]:
# Drop the labels of the shortest sentences, then of the longest ones,
# mirroring the two successive deletions applied to the texts
new_labels = np.delete(label, min_index, axis=0)
new_labels = np.delete(new_labels, max_index, axis=0)

new_labels

In [None]:
# Encode every sentence as a list of word ids using the word2int dictionary
# The comprehension keeps its loop variables local. A top-level
# `for sentence in ...` loop would rebind the global `sentence` array to the
# last string and break the earlier cells when they are re-run.
text2ints = [[word2int[word] for word in text.split()] for text in new_text2]
text2ints[:5]



In [None]:
# Pad every sentence to the same length; sentences that are too long are truncated
def reset_text(text, seq_len):
    """Pack id sequences into a ``(len(text), seq_len)`` zero-filled array.

    Each row holds the first ``seq_len`` ids of the matching sequence,
    left-aligned; unused trailing positions stay 0. The array uses NumPy's
    default float dtype.
    """
    padded = np.zeros((len(text), seq_len))
    for row, ids in enumerate(text):
        clipped = ids[:seq_len]
        padded[row, :len(clipped)] = clipped
    return padded
            
# Fixed sequence length of 22 ids per sentence.
dataset = reset_text(text=text2ints, seq_len=22)
dataset


In [None]:
# Convert the NumPy arrays to tensors
dataset_tensor = torch.as_tensor(dataset)
label_tensor = torch.as_tensor(new_labels)
print(type(dataset_tensor), type(label_tensor))

In [None]:
# Train / validation proportions
all_samples = dataset_tensor.size(0)
train_ratio, val_ratio = 0.8, 0.2

In [None]:
# Cut the dataset into a training part and a validation part according to train_ratio
split_at = int(train_ratio*all_samples)

train, val = dataset_tensor[:split_at], dataset_tensor[split_at:]
train_labels, val_labels = label_tensor[:split_at], label_tensor[split_at:]




In [None]:
# Bundle texts and labels into Datasets and build one DataLoader for each

train_dataset = TensorDataset(train, train_labels)
val_dataset = TensorDataset(val, val_labels)

batch_size = 128

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=8, pin_memory=True, drop_last=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True, num_workers=8, pin_memory=True, drop_last=True)
# Sanity check on a single batch only: printing every batch would dump the
# entire training set into the cell output.
for sample_text, sample_label in train_loader:
    print(sample_text.shape, sample_label.shape)
    break


In [None]:
# Train on the GPU when CUDA is available, otherwise on the CPU
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(device)

In [None]:
# Build the model
class SentimentNet(nn.Module):
    """Embedding -> stacked LSTM -> two-layer classification head.

    Args:
        input_size: Number of rows of the embedding table (vocabulary size).
        embedding_dim: Size of each word embedding.
        hidden_dim: Size of the LSTM hidden state.
        output_size: Number of output classes (logits per sample).
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability applied between LSTM layers.
    """

    def __init__(self, input_size, embedding_dim, hidden_dim, output_size, num_layers, dropout=0.5):
        super(SentimentNet, self).__init__()
        self.hidden_dim = hidden_dim
        self.output_size = output_size
        self.num_layers = num_layers

        self.embedding= nn.Embedding(input_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers, dropout=dropout,batch_first=True)
        self.linear = nn.Linear(hidden_dim, 128)
        self.relu = nn.ReLU()
        self.linear2 = nn.Linear(128, output_size)

        # Dropout on the classifier head (separate from the LSTM's own dropout).
        self.dropout = nn.Dropout(p=0.4)

    def forward(self, x, hidden=None):
        """Compute class logits for a batch of id sequences.

        Args:
            x: Tensor of word ids, shape ``(batch, seq_len)``. Any numeric
                dtype is accepted; it is cast to ``long`` before the lookup.
            hidden: Optional ``(h_0, c_0)`` tuple for the LSTM. When omitted,
                the LSTM starts from an all-zero state.

        Returns:
            ``(out, hidden)``: logits of shape ``(batch, output_size)`` and the
            LSTM's final ``(h_n, c_n)`` state.
        """
        x = x.long()
        embeds = self.embedding(x)               # (batch, seq_len, embedding_dim)

        out, hidden = self.lstm(embeds, hidden)  # out: (batch, seq_len, hidden_dim)
        # Only the output at the last time step feeds the classifier.
        out = self.linear(out[:, -1, :])         # (batch, 128)
        out = self.dropout(out)
        out = self.relu(out)
        out = self.linear2(out)                  # (batch, output_size)

        return out, hidden

    # Initialise the hidden state
    def init_hidden(self, batch_size):
        """Return an all-zero ``(h_0, c_0)`` pair for ``batch_size`` sequences.

        The tensors are created from the model's first parameter, so they
        share its dtype and device.
        """
        weight = next(self.parameters())
        # h_0, c_0
        return (weight.new_zeros(self.num_layers, batch_size, self.hidden_dim),
                weight.new_zeros(self.num_layers, batch_size, self.hidden_dim))


In [None]:
# Model hyper-parameters
input_size, output_size = len(word2int), len(category)
print(output_size)
embedding_dim, hidden_dim, num_layers = 200, 128, 2

In [None]:
# Instantiate the model and move it to the selected device
model = SentimentNet(
    input_size=input_size,
    embedding_dim=embedding_dim,
    hidden_dim=hidden_dim,
    output_size=output_size,
    num_layers=num_layers,
)
model.to(device)

In [None]:
# Loss function, optimizer and learning-rate scheduler
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.RMSprop(params=model.parameters(), lr=1e-3)
# Multiply the learning rate by 0.1 every 10 scheduler steps.
exp_lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer=optimizer, step_size=10, gamma=0.1)


In [None]:
# Training and validation functions
def train(model, data_loader, criterion, optimizer, scheduler, num_epochs):
    """Train ``model`` and record loss / accuracy every 10th epoch.

    Args:
        model: Network exposing ``init_hidden(batch_size)`` and
            ``forward(data, hidden) -> (logits, hidden)``.
        data_loader: Iterable of ``(data, target)`` training batches.
        criterion: Loss called as ``criterion(logits, target)``.
        optimizer: Optimizer stepped once per batch.
        scheduler: Learning-rate scheduler stepped once per epoch.
        num_epochs: Number of passes over ``data_loader``.

    Returns:
        dict with keys ``"train_loss"``, ``"val_loss"`` and ``"val_acc"``,
        each a list holding one entry per epoch where ``epoch % 10 == 0``.

    Note:
        Validation runs on the module-level ``val_loader`` through
        ``validation()``; it is not a parameter of this function.
    """
    train_process = dict()
    train_loss_epoch10, val_loss_epoch10= [],[]
    val_acc_epoch10 = []
    # Follow the model's own device so batches always land next to the weights.
    run_device = next(model.parameters()).device
    for epoch in range(num_epochs):
        train_loss = []
        model.train()
        for data, target in data_loader:
            data = data.to(run_device)
            target = target.to(run_device)
            # Fresh zero state for every batch, sized to the batch actually
            # received. The batches hold unrelated sentences, so carrying the
            # state over would leak one batch into the next; sizing it from
            # the data also removes the need for a fixed global batch size.
            hs = model.init_hidden(data.size(0))
            output, _ = model(data, hs)

            loss = criterion(output, target)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            train_loss.append(loss.item())
        print(f"Epoch [{epoch}/{num_epochs-1}]---train loss {np.mean(train_loss):>.5f}")

        scheduler.step()

        if epoch % 10 == 0:
            # validation() puts the model in eval mode; model.train() at the
            # top of the next epoch switches it back.
            validation_loss, validation_acc = validation(model, val_loader, criterion)

            train_loss_epoch10.append(np.mean(train_loss))
            val_loss_epoch10.append(validation_loss)
            val_acc_epoch10.append(validation_acc)

    train_process["train_loss"] = train_loss_epoch10
    train_process["val_loss"] = val_loss_epoch10
    train_process["val_acc"] = val_acc_epoch10
    return train_process
            
def validation(model, val_loader, criterion):
    """Evaluate ``model`` on ``val_loader`` without updating it.

    Args:
        model: Network exposing ``init_hidden(batch_size)`` and
            ``forward(data, hidden) -> (logits, hidden)``.
        val_loader: Iterable of ``(data, target)`` batches.
        criterion: Loss called as ``criterion(logits, target)``.

    Returns:
        ``(mean_loss, accuracy)``: the mean of the per-batch losses and the
        fraction of correctly classified samples as a 0-dim tensor.
        Accuracy is computed over the samples actually evaluated, so a loader
        that drops its last partial batch does not deflate the score.
    """
    model.eval()
    # Follow the model's own device so batches always land next to the weights.
    run_device = next(model.parameters()).device
    val_loss = []
    val_correct = torch.zeros((), dtype=torch.long, device=run_device)
    samples_seen = 0
    with torch.no_grad():
        for data, target in val_loader:
            data = data.to(run_device)
            target = target.to(run_device)
            # Fresh zero state sized to this batch: the sentences are
            # independent, and the batch size may differ from the global one.
            hs = model.init_hidden(data.size(0))
            outs, _ = model(data, hs)

            loss = criterion(outs, target)
            preds = torch.argmax(outs, dim=1)
            val_loss.append(loss.item())
            val_correct += torch.sum(preds==target)
            samples_seen += target.size(0)
    # max(..., 1) keeps an empty loader from dividing by zero.
    val_acc = val_correct / max(samples_seen, 1)
    print(f"--------------------------------validation loss is: {np.mean(val_loss):>.5f}, acc is: {100*val_acc:>.2f}%")
    return np.mean(val_loss), val_acc


In [None]:
# Start training
train_process = train(
    model=model,
    data_loader=train_loader,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=exp_lr_scheduler,
    num_epochs=100,
)

In [None]:
# Plot the loss curves and the validation accuracy
# train() records one point every 10th epoch (epoch % 10 == 0), so label the
# x axis with the real epoch numbers instead of the sample index.
recorded_epochs = [10 * i for i in range(len(train_process["train_loss"]))]

plt.figure(figsize=(16,4))
plt.subplot(1,2,1)
plt.title("Loss")
plt.xlabel("epoch")
plt.ylabel("loss")
plt.plot(recorded_epochs, train_process["train_loss"],label="train-loss")
plt.plot(recorded_epochs, train_process["val_loss"],label="val-loss")
plt.legend()

plt.subplot(1,2,2)
# No legend here: the panel holds a single unlabeled curve, and calling
# legend() before anything is drawn has nothing to attach to.
plt.title("Validation Accuracy")
plt.xlabel("epoch")
plt.ylabel("accuracy")
# float() accepts plain numbers as well as 0-dim tensors on any device.
val_acc_cpu = [float(acc) for acc in train_process["val_acc"]]
plt.plot(recorded_epochs, val_acc_cpu)
plt.show()


In [None]:
# Predict on the test set
preds_label = []
def converts(text):
    """Encode one raw sentence as a list of vocabulary ids.

    The sentence goes through the same steps as the training sentences
    (lower-casing, link-marker and punctuation removal, tokenisation,
    stop-word removal, lemmatisation) so that its words can match the
    vocabulary. Words absent from ``word2int`` are dropped.
    """
    # Lower-case BEFORE the lookup: the vocabulary only holds lower-case words,
    # so a capitalised word would otherwise be discarded as unknown.
    text = remove_hyperlinks(text.lower())
    tokens = remove_stopword(tokenize(remove_punctuation(text)))
    # lemmatize() returns a space-joined string; split() turns it back into words.
    normalized = lemmatize(tokens)
    test_text_ints = [word2int[word] for word in normalized.split() if word in word2int]
    return test_text_ints
def predict(model):
    """Classify every sentence of ``test_text`` and print the test accuracy.

    Reads the module-level ``test_text``, ``test_label`` and ``category``.
    Fills the module-level ``preds_label`` with one predicted label name per
    test sentence and strips newlines from ``test_label`` in place.
    """
    # Inference mode: without eval() the dropout layers stay active whenever
    # the model was last used for training, which randomises the predictions.
    model.eval()
    run_device = next(model.parameters()).device

    test_text_int = [converts(text) for text in test_text]

    new_test_text_int = reset_text(test_text_int, seq_len=22)
    text_tensor = torch.from_numpy(new_test_text_int).to(run_device)

    # The whole test set is scored as one batch; no gradients are needed.
    with torch.no_grad():
        hs = model.init_hidden(text_tensor.size(0))
        outs, hs = model(text_tensor, hs)
    preds = torch.argmax(outs, dim=1).tolist()

    # Start from an empty list so calling predict() again does not append to
    # the previous run's results (which would overrun test_label below).
    preds_label.clear()
    for text, pred in zip(test_text, preds):
        print(text)
        print(" prediction: ", category[pred])
        preds_label.append(category[pred])

    # Labels still carry the newline from the file; normalise them in place.
    test_label[:] = [reshape(raw_label) for raw_label in test_label]
    correct = sum(1 for guess, truth in zip(preds_label, test_label) if guess == truth)

    print(" test acc: ", (correct / len(preds_label)))



predict(model)


    
    