In [1]:
import torch
from torchtext import data
from torchtext import datasets
import random

from torch.autograd import Variable

SEED = 1234

torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)

TEXT = data.Field(tokenize='spacy')
LABEL = data.LabelField(tensor_type=torch.FloatTensor)

train, test = datasets.IMDB.splits(TEXT, LABEL)

train, valid = train.split(random_state=random.seed(SEED))

In [2]:
TEXT.build_vocab(train, max_size=1000, vectors="glove.6B.100d")
LABEL.build_vocab(train)

In [3]:
BATCH_SIZE = 64

train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits(
    (train, valid, test), 
    batch_size=BATCH_SIZE, 
    sort_key=lambda x: len(x.text), 
    repeat=False)

In [4]:
import torch.nn as nn

class RNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, bidirectional, dropout):
        super().__init__()
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim
        self.embedding = nn.Embedding(vocab_size, vocab_size)
        self.embedding.weight.requires_grad=False # embedding 为 onthot encoding 不改变梯度
        self.embedding.weight.data = torch.eye(vocab_size)
        self.rnn = nn.LSTM(vocab_size, hidden_dim, num_layers=n_layers, bidirectional=bidirectional, dropout=dropout)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)
    
    def init_hidden(self, batch_size):
        hidden_p = Variable(torch.zeros(self.n_layers * 1, batch_size, self.hidden_dim)).cuda()
        hidden_h = Variable(torch.zeros(self.n_layers * 1, batch_size, self.hidden_dim)).cuda()
        return (hidden_p, hidden_h)
    
    def forward(self, x, hidden):
        
        #x = [sent len, batch size]
        
        embedded = self.dropout(self.embedding(x))
        
        #embedded = [sent len, batch size, emb dim]
        output, hidden = self.rnn(embedded, hidden)
        
        #output = [sent len, batch size, hid dim * num directions]
        #hidden = [num layers * num directions, batch size, hid. dim]
        #cell = [num layers * num directions, batch size, hid. dim]
        
#         hidden = self.dropout(torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim=1))

        #hidden [batch size, hid. dim * num directions]
            
        return self.fc(self.dropout(hidden[0]).squeeze(0)), hidden

In [5]:
INPUT_DIM = len(TEXT.vocab)
EMBEDDING_DIM = 100
HIDDEN_DIM = 64
OUTPUT_DIM = 1
N_LAYERS = 1
BIDIRECTIONAL = False
DROPOUT = 0.5

model = RNN(INPUT_DIM, EMBEDDING_DIM, HIDDEN_DIM, OUTPUT_DIM, N_LAYERS, BIDIRECTIONAL, DROPOUT)

  "num_layers={}".format(dropout, num_layers))


In [6]:
pretrained_embeddings = TEXT.vocab.vectors

print(pretrained_embeddings.shape)

torch.Size([1002, 100])


In [7]:
# model.embedding.weight.data.copy_(pretrained_embeddings)

In [8]:
import torch.optim as optim

optimizer = optim.Adam(filter(lambda p: p.requires_grad,model.parameters()))

In [9]:
criterion = nn.BCEWithLogitsLoss()

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = model.to(device)
criterion = criterion.to(device)

In [10]:
import torch.nn.functional as F

def binary_accuracy(preds, y):
    """
    Returns accuracy per batch, i.e. if you get 8/10 right, this returns 0.8, NOT 8
    """

    #round predictions to the closest integer
    rounded_preds = torch.round(F.sigmoid(preds))
    correct = (rounded_preds == y).float() #convert into float for division 
    acc = correct.sum()/len(correct)
    return acc

In [11]:
def train(model, iterator, optimizer, criterion):
    
    epoch_loss = 0
    epoch_acc = 0
    
    model.train()
    
    for batch in iterator:
        
        optimizer.zero_grad()
        
        hidden = model.init_hidden(len(batch))
#         inp = batch.text % 1002    
#         inp_ = torch.unsqueeze(inp, 2)

#         one_hot = torch.FloatTensor(batch.text.size(0), batch.text.size(1),1002).zero_().cuda()
#         one_hot.scatter_(2, inp_, 1)
        predictions, hidden = model(batch.text, hidden)
        
        loss = criterion(predictions.squeeze(1), batch.label)
        
        acc = binary_accuracy(predictions.squeeze(1), batch.label)
        
        loss.backward()
        
        optimizer.step()
        
        epoch_loss += loss.item()
        epoch_acc += acc.item()
        
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [12]:
def evaluate(model, iterator, criterion):
    
    epoch_loss = 0
    epoch_acc = 0
    
    model.eval()
    
    with torch.no_grad():
    
        for batch in iterator:
        
            hidden = model.init_hidden(len(batch))
            
#             inp = batch.text % 1002    
#             inp_ = torch.unsqueeze(inp, 2)

#             one_hot = torch.FloatTensor(batch.text.size(0), batch.text.size(1), 1002).zero_().cuda()
#             one_hot.scatter_(2, inp_, 1)
            predictions, hidden = model(batch.text, hidden)

            loss = criterion(predictions.squeeze(1), batch.label)
            
            acc = binary_accuracy(predictions.squeeze(1), batch.label)

            epoch_loss += loss.item()
            epoch_acc += acc.item()
        
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [13]:
N_EPOCHS = 5

for epoch in range(N_EPOCHS):

    train_loss, train_acc = train(model, train_iterator, optimizer, criterion)
    valid_loss, valid_acc = evaluate(model, valid_iterator, criterion)
    
    print(f'Epoch: {epoch+1:02}, Train Loss: {train_loss:.3f}, Train Acc: {train_acc*100:.2f}%, Val. Loss: {valid_loss:.3f}, Val. Acc: {valid_acc*100:.2f}%')

  return Variable(arr, volatile=not train)


Epoch: 01, Train Loss: 0.694, Train Acc: 49.78%, Val. Loss: 0.693, Val. Acc: 49.19%
Epoch: 02, Train Loss: 0.694, Train Acc: 49.77%, Val. Loss: 0.693, Val. Acc: 49.63%
Epoch: 03, Train Loss: 0.693, Train Acc: 50.34%, Val. Loss: 0.693, Val. Acc: 51.46%
Epoch: 04, Train Loss: 0.693, Train Acc: 50.41%, Val. Loss: 0.693, Val. Acc: 50.51%
Epoch: 05, Train Loss: 0.693, Train Acc: 50.81%, Val. Loss: 0.693, Val. Acc: 50.99%


In [14]:
test_loss, test_acc = evaluate(model, test_iterator, criterion)

print(f'Test Loss: {test_loss:.3f}, Test Acc: {test_acc*100:.2f}%')

  return Variable(arr, volatile=not train)


Test Loss: 0.690, Test Acc: 50.28%


In [17]:
import spacy
nlp = spacy.load('en')
sentence = 'I hate the movie though the plot is interesting.'
# sentence = 'hate hate hate hate hate hate hate'
tokenized = [tok.text for tok in nlp.tokenizer(sentence)]
indexed = [TEXT.vocab.stoi[t] for t in tokenized]
tensor = torch.LongTensor(indexed).to(device)
tensor = tensor.unsqueeze(1)
hidden = model.init_hidden(1)
prediction, hidden = model(tensor, hidden)
prediction = F.sigmoid(prediction)

In [19]:
import pandas as pd
import numpy as np
hidden = model.init_hidden(1)
test_len = len(tokenized)
prime_input = torch.autograd.Variable(tensor[0]).cuda()
out, hidden = model(prime_input.unsqueeze(0), hidden)
hidden_matrix = np.copy(F.avg_pool1d(hidden[0], 1).data.cpu().numpy())
hidden_matrix = hidden_matrix.reshape((1,hidden_matrix.size))
inp = prime_input
for p in range(1,test_len + 1):
    output, hidden = model(inp.unsqueeze(0), hidden)
    hidden_matrix = np.vstack((hidden_matrix, F.avg_pool1d(hidden[0], 1).squeeze(dim=1).data.cpu().numpy()))
    if p < (test_len):        
        print('p', tensor[p])
        inp = torch.autograd.Variable(tensor[p]).cuda()
hidden_matrix = np.delete(hidden_matrix, 0, 0)
df = pd.DataFrame(hidden_matrix)

p tensor([ 849], device='cuda:0')
p tensor([ 2], device='cuda:0')
p tensor([ 21], device='cuda:0')
p tensor([ 183], device='cuda:0')
p tensor([ 2], device='cuda:0')
p tensor([ 128], device='cuda:0')
p tensor([ 9], device='cuda:0')
p tensor([ 247], device='cuda:0')
p tensor([ 4], device='cuda:0')


In [20]:
from math import pi
from bokeh.io import show
from bokeh.models import ColumnDataSource, HoverTool, LinearColorMapper, BasicTicker, PrintfTickFormatter,ColorBar
from bokeh.models import FuncTickFormatter
from bokeh.plotting import figure
from bokeh.io import output_notebook
from bokeh.models import LogColorMapper, LogTicker, ColorBar
output_notebook()
df = df.T
df.index.name = 'cell'
df.columns.name = 'chars'
index = {i:tokenized[i] for i in range(len(df.columns))}

seq = [str(i) for i in df.columns]
cell = list([str(x) for x in df.index])

df = pd.DataFrame(df.stack(), columns=['value']).reset_index()
colors = ["#313695", "#4575b4", "#74add1", "#abd9e9", "#e0f3f8", "#ffffbf", "#fee090", "#fdae61", "#f46d43", "#d73027", "#a50026"]

colors.reverse()
mapper = LinearColorMapper(palette=colors, low=-1, high=1)#low=df.value.min(), high=df.value.max())
source = ColumnDataSource(df)
TOOLS = "hover,pan,reset,save,wheel_zoom"

color_bar = ColorBar(color_mapper=mapper, ticker=BasicTicker(),
                     label_standoff=12, border_line_color=None, location=(0,0))

p = figure(title="LSTM Hidden State Activations",  x_range=cell, y_range=list(reversed(seq)), x_axis_location="above",
            plot_width=600, plot_height=300,
            tools=TOOLS, toolbar_location='below')

p.grid.grid_line_color = None
p.axis.axis_line_color = None
p.axis.major_tick_line_color = None
p.axis.major_label_text_font_size = "8pt"
p.axis.major_label_standoff = 0
p.yaxis.major_label_orientation = pi / 3
p.yaxis.formatter = FuncTickFormatter(code="""
                                        var labels = %s;
                                        return labels[tick];
                                    """%index)

p.rect(x="cell", y="chars", width=1, height=1, source=source, fill_color={'field': 'value', 'transform': mapper},
                                line_color=None)

p.select_one(HoverTool).tooltips = [('value', '@value')]
p.add_layout(color_bar, 'right')
show(p)      # show the plot