In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

In [2]:
import nltk
from nltk.tokenize import word_tokenize
from bs4 import BeautifulSoup
import re
# nltk.download('wordnet')

In [3]:
import pandas as pd

In [4]:
def make_input(x, vocab_index=None, vocab_size=None):
    """One-hot encode every phrase in ``x``.

    Each phrase is split on single spaces; empty tokens (from consecutive
    spaces or an empty phrase) are skipped and every remaining token is looked
    up in the vocabulary.

    Args:
        x: iterable of space-separated phrase strings.
        vocab_index: mapping token -> integer id. Defaults to the
            notebook-level ``word_dict``.
        vocab_size: width of the one-hot vectors. Defaults to the
            notebook-level ``n_class``.

    Returns:
        list with one float array per phrase, each of shape
        ``(n_tokens, vocab_size)``; a phrase without tokens yields an array
        of shape ``(0, vocab_size)``.

    Raises:
        KeyError: if a token is not present in the vocabulary.
    """
    if vocab_index is None:
        vocab_index = word_dict
    if vocab_size is None:
        vocab_size = n_class

    # Build the identity matrix once; fancy indexing below returns a copy,
    # so the per-phrase arrays never alias it.
    identity = np.eye(vocab_size)
    input_batch = []
    for line in x:
        token_ids = [vocab_index[tok] for tok in line.split(' ') if tok != '']
        # Explicit integer dtype keeps an empty id list a valid index array.
        input_batch.append(identity[np.asarray(token_ids, dtype=np.intp)])
    return input_batch

In [5]:
def make_target(y, n_class=None):
    """One-hot encode integer class labels.

    Args:
        y: sequence of non-negative integer labels (for example a pandas
            Series).
        n_class: width of the one-hot vectors. Defaults to ``max(y) + 1`` so
            that every label is a valid column even when some intermediate
            classes are absent from ``y`` (using the number of distinct labels
            would raise IndexError in that case).

    Returns:
        list of 1-D float arrays of length ``n_class``, one per label; an
        empty list when ``y`` is empty.
    """
    labels = np.asarray(y, dtype=np.intp).reshape(-1)
    if labels.size == 0:
        return []
    if n_class is None:
        n_class = int(labels.max()) + 1
    # Fancy indexing yields a fresh (n_samples, n_class) array; split it into rows.
    return list(np.eye(n_class)[labels])

In [6]:
def remove_stopwords(text):
    """Return ``text`` with English stop words removed.

    The text is tokenized with NLTK's ``word_tokenize``; tokens whose
    lower-cased form is in NLTK's English stop-word list are dropped and the
    remaining tokens are joined with single spaces.
    """
    english_stops = set(nltk.corpus.stopwords.words('english'))
    kept = []
    for token in word_tokenize(text):
        if token.lower() in english_stops:
            continue
        kept.append(token)
    return ' '.join(kept)
    
def Clean(corpus):
    """Normalize raw phrases into lists of lower-case word tokens.

    For every line: strip HTML markup, replace every character that is not an
    ASCII letter with a space, lower-case the result and tokenize it with
    ``word_tokenize``. Stop-word removal and lemmatization are not applied.

    Args:
        corpus: iterable of raw text strings.

    Returns:
        list of token lists, one per input line (a list may be empty).
    """
    cleaned = []
    for line in corpus:
        # Name the parser explicitly: otherwise BeautifulSoup picks whichever
        # parser happens to be installed and emits GuessedAtParserWarning.
        clean_line = BeautifulSoup(line, "html.parser").get_text()  # strip HTML tags
        clean_line = re.sub("[^a-zA-Z]"," ", clean_line)  # drop digits, punctuation, symbols
        words = word_tokenize(clean_line.lower())  # tokenize
        cleaned.append(words)
    return cleaned

In [7]:
class TextRNN(nn.Module):
    """Single-layer RNN classifier over one-hot encoded token sequences.

    The RNN output at the final time step is projected to ``output_size``
    class scores. ``forward`` returns raw logits: ``nn.CrossEntropyLoss``
    applies log-softmax itself, so feeding it already-softmaxed values
    squashes the gradients. Use ``predict_proba`` when probabilities are
    needed; ``argmax`` over logits and over probabilities is identical.

    Args:
        n_class: size of each one-hot input vector (vocabulary size).
        n_hidden: number of hidden units in the RNN.
        output_size: number of target classes.
    """

    def __init__(self, n_class, n_hidden, output_size):
        super(TextRNN, self).__init__()
        self.rnn = nn.RNN(input_size=n_class, hidden_size=n_hidden)
        self.W = nn.Linear(n_hidden, output_size, bias=False)
        self.b = nn.Parameter(torch.ones([output_size]))
        self.softmax_function = nn.Softmax(dim = -1)

    def forward(self, hidden, X):
        """Return class logits for a batch.

        Args:
            hidden: initial hidden state,
                [num_layers(=1) * num_directions(=1), batch_size, n_hidden].
            X: one-hot inputs, [batch_size, n_step, n_class].

        Returns:
            Tensor of logits, [batch_size, output_size].
        """
        X = X.transpose(0, 1)  # X : [n_step, batch_size, n_class]
        outputs, hidden = self.rnn(X, hidden)
        # outputs : [n_step, batch_size, num_directions(=1) * n_hidden]
        # hidden  : [num_layers(=1) * num_directions(=1), batch_size, n_hidden]

        last_step = outputs[-1]  # [batch_size, n_hidden]
        return self.W(last_step) + self.b  # [batch_size, output_size]

    def predict_proba(self, hidden, X):
        """Return class probabilities (softmax over the logits of ``forward``)."""
        return self.softmax_function(self.forward(hidden, X))

# data

In [8]:
# NOTE(review): machine-specific absolute path; point it at a local copy of the dataset.
df = pd.read_csv('/Users/helen/Documents/nlp/代码练习/sentiment-analysis-on-movie-reviews/train.tsv',sep='\t')
# Keep the first phrase of every sentence and cap the sample at 200 rows.
df_dedup = df.drop_duplicates(subset='SentenceId',keep='first').reset_index(drop = True).head(200)
# Build the training frame from the de-duplicated sample. It was previously
# built from the full `df`, which left `df_dedup` unused and fed every row of
# the file into the dense one-hot encoding (rows x max_len x vocab) created
# later in the notebook.
df_train = df_dedup.drop(['PhraseId', 'SentenceId'], axis = 1).reset_index(drop = True)

In [9]:
# Clean the raw training phrases and store them back as space-joined strings.
corpus = list(df_train['Phrase'])
cleaned_corpus = Clean(corpus)
cleaned_lines = list(map(' '.join, cleaned_corpus))
df_train['Phrase_clean'] = cleaned_lines

  clean_line = BeautifulSoup(line).get_text() # 去除HTML标签


In [10]:
# Distinct tokens that occur anywhere in the cleaned training corpus.
vocab = {word for words in cleaned_corpus for word in words}

In [11]:
# Assign ids in sorted order so the token -> id mapping is the same on every
# kernel restart; iterating a set of strings directly gives an order that
# varies with Python's hash randomization.
word_dict = {word: ix for ix, word in enumerate(sorted(vocab))}

# The RNN predicts a sentiment class rather than a word, so no id -> word
# dictionary is needed here.
# number_dict = {ix:word for ix, word in enumerate(vocab)}

n_class = len(word_dict)

In [12]:
# Model inputs are the cleaned phrases; labels are the sentiment column.
x, y = df_train['Phrase_clean'], df_train['Sentiment']

# run

inputs, target

In [1]:
# One-hot encode the phrases (a list with one array per phrase).
inputs = make_input(x)
# One-hot encode the labels and pack them into a float tensor.
targets = torch.FloatTensor(make_target(y))

In [None]:
# inputs
# Left-pad every one-hot sequence with zero rows up to the longest sequence,
# so the last time step always holds the phrase's final token.
l = max(len(line) for line in inputs)

for i in range(len(inputs)):
    # Allocate a fresh matrix for every sample. Reusing the previous
    # iteration's `zeros` for an empty phrase would alias that sample's data
    # (or raise NameError when the very first phrase is empty).
    zeros = np.zeros((l, n_class), dtype=np.float32)
    n_tokens = len(inputs[i])
    if n_tokens:
        zeros[l - n_tokens:, :] = inputs[i]
    inputs[i] = zeros

# Stack into one array first; building a tensor from a list of arrays is slow.
inputs = torch.from_numpy(np.stack(inputs))

In [None]:
n_step = inputs.size(1)         # number of time steps (padded sequence length)
n_hidden = 128                  # hidden units in the RNN cell
output_size = targets.size(1)   # width of the one-hot target rows

In [None]:
# Model, loss and optimizer used by the training loop below.
model = TextRNN(n_class=n_class, n_hidden=n_hidden, output_size=output_size)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)

先尝试一行数据

In [None]:
# Single-sample smoke test. The sample is stored transposed because
# TextRNN.forward transposes dims 0 and 1 again before calling the RNN.
x1 = inputs[1].t()
y1 = targets[1]
print(x1.shape, y1.shape)

In [None]:
# Zero initial hidden state for the single (unbatched) sample.
hidden = torch.zeros((1, n_hidden))

In [None]:
# Forward pass on the single sample; display the result.
o = model(hidden=hidden, X=x1)
o

In [None]:
# Show the one-hot target of the same sample for comparison with the output above.
y1

尝试一个batch=2

In [None]:
# Mini-batch smoke test with the first two samples.
x1 = inputs[:2]
y1 = targets[:2]
print(x1.shape, y1.shape)

In [None]:
# Initial hidden state for a batch of two: [num_layers(=1), batch(=2), n_hidden].
hidden = torch.zeros((1, 2, n_hidden))
o = model(hidden=hidden, X=x1)

In [None]:
# Display the model output for the two-sample batch.
o

批量处理

In [None]:
from sklearn.model_selection import train_test_split

In [None]:
# Hold out 20% of the samples for validation (fixed seed for a repeatable split).
# NOTE(review): `input` shadows the Python builtin; the name is kept because
# later cells refer to it.
input, vali_input, target, vali_target = train_test_split(
    inputs,
    targets,
    test_size=0.2,
    random_state=42,
)

In [None]:
# batch_size = 420
# # 使用索引切片按批次分割张量
# batch_x = [inputs[i:i+batch_size] for i in range(0, len(inputs), batch_size)]
# batch_y = [target[i:i+batch_size] for i in range(0, len(target), batch_size)]

# xb = batch_x[0]
# yb = batch_y[0]

# xb.shape

# hidden = torch.zeros(1, batch_size, n_hidden)

# output = model(hidden, inputs)

In [None]:
batch_size = len(input)

# Full-batch training: each epoch is one optimizer step over the whole
# training split.
for epoch in range(100):
    # Clear the gradients left over from the previous step.
    optimizer.zero_grad()

    # hidden : [num_layers * num_directions, batch, hidden_size]
    hidden = torch.zeros(1, batch_size, n_hidden)

    # input : [batch_size, n_step, n_class] -> output : [batch_size, output_size]
    output = model(hidden, input)

    # target : [batch_size, output_size], one-hot rows stored as floats
    loss = criterion(output, target)

    # Back-propagate and update the parameters.
    loss.backward()
    optimizer.step()

In [None]:
# Switch the model to evaluation mode.
model.eval()

# Validation runs as a single batch covering the whole validation split.
batch_size = len(vali_input)
hidden = torch.zeros((1, batch_size, n_hidden))

In [None]:
# Inspect the shape of the validation inputs before running the model on them.
vali_input.shape

In [None]:
# Evaluate without tracking gradients.
with torch.no_grad():
    total = vali_target.size(0)

    # Run the model on the whole validation split.
    outputs = model(hidden, vali_input)

    # Predicted class = index of the largest score in each row.
    predicted_labels = torch.argmax(outputs, dim=1)

    # The targets are one-hot rows, so convert them to class indices first.
    # Comparing a scalar index against a one-hot row element-wise (as before)
    # counts coincidental 0/1 matches and can even exceed `total`.
    if vali_target.dim() > 1:
        true_labels = torch.argmax(vali_target, dim=1)
    else:
        true_labels = vali_target.long()

    # Number of correctly classified samples.
    correct = (predicted_labels == true_labels).sum().item()

    # Fraction of correct predictions (NaN for an empty validation split).
    accuracy = correct / total if total else float('nan')

print("Accuracy:", accuracy)

In [None]:
# NOTE(review): this cell evaluated `p`, a name that is not defined anywhere in
# the notebook, so a fresh Restart & Run All stopped here with NameError.
# Showing the validation predictions instead — confirm that was the intent.
predicted_labels

## predict

data

In [None]:
# NOTE(review): machine-specific absolute path; point it at a local copy of the dataset.
test = pd.read_csv('/Users/helen/Documents/nlp/代码练习/sentiment-analysis-on-movie-reviews/test.tsv',sep='\t')
# First phrase of every sentence, limited to 10 rows. `.copy()` detaches the
# `.head()` slice so that adding a column to it later does not trigger
# pandas' SettingWithCopyWarning.
test_dedup = (
    test.drop_duplicates(subset='SentenceId', keep='first')
    .reset_index(drop=True)
    .head(10)
    .copy()
)
test_dedup.info()

In [None]:
# Run the test phrases through the same cleaning step as the training data.
corpus_test = list(test_dedup['Phrase'])
cleaned_corpus_test = Clean(corpus_test)
cleaned_lines_test = list(map(' '.join, cleaned_corpus_test))
test_dedup['Phrase_clean'] = cleaned_lines_test

In [None]:
x_test = test_dedup['Phrase_clean']

# One-hot encode the test phrases, silently skipping tokens that are not in
# the training vocabulary. The index list is no longer called `input`: that
# name holds the training tensor from the train/validation split, and
# rebinding it here broke any re-run of the training cell.
identity = np.eye(n_class)  # built once; fancy indexing below returns copies
inputs_test = []
for line in x_test:
    token_ids = [word_dict[tok] for tok in line.split(' ') if tok in word_dict]
    inputs_test.append(identity[np.asarray(token_ids, dtype=np.intp)])
    

In [None]:
# Left-pad every test sequence with zero rows up to the longest test sequence.
l = max(len(line) for line in inputs_test)

for i in range(len(inputs_test)):
    # Allocate a fresh matrix for every sample. Reusing the previous
    # iteration's `zeros` for an empty phrase would alias that sample's data
    # (or pick up a stale array with the wrong shape from an earlier cell).
    zeros = np.zeros((l, n_class), dtype=np.float32)
    n_tokens = len(inputs_test[i])
    if n_tokens:
        zeros[l - n_tokens:, :] = inputs_test[i]
    inputs_test[i] = zeros

In [None]:
# Pack the padded test sequences into one float tensor.
inputs_test = torch.FloatTensor(np.asarray(inputs_test))

predict

In [None]:
# Display the current batch size; it is reassigned to the test-set size in the next cell.
batch_size

In [None]:
# Predict on the whole test sample as a single batch.
batch_size = len(inputs_test)
hidden = torch.zeros((1, batch_size, n_hidden))

0 - negative

1 - somewhat negative

2 - neutral

3 - somewhat positive

4 - positive

In [None]:
# Inference only: disable gradient tracking instead of stripping it afterwards
# with the discouraged `.data` attribute.
with torch.no_grad():
    predict = model(hidden, inputs_test)

In [None]:
# Predicted class index (0-4) for every test phrase.
pred_label = predict.data.argmax(dim=1).tolist()
pred_label

In [None]:
# Print each cleaned test phrase followed by its predicted label.
for phrase, label in zip(x_test, pred_label):
    print(phrase, '\n')
    print(label, '\n')