In [1]:
import random
import spacy
import torch
import torch.nn as nn
import torch.optim as optim
from torchtext import data
from torchtext.datasets import IMDB
from tqdm import tqdm



device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

# 定义数据预处理
TEXT = data.Field(tokenize='spacy', include_lengths=True, tokenizer_language='en_core_web_sm',fix_length=256,batch_first=True)
LABEL = data.LabelField(dtype=torch.float)

# 加载IMDB数据集，创建训练集和测试集以及验证集
train_data, test_data = IMDB.splits(TEXT, LABEL)
train_data, valid_data = train_data.split(random_state=random.seed(22))

# 构建词汇表
MAX_VOCAB_SIZE = 25000
TEXT.build_vocab(train_data, max_size=MAX_VOCAB_SIZE, vectors='glove.6B.100d')
LABEL.build_vocab(train_data)

# 设置设备，创建训练集、测试集、验证集的迭代器
BATCH_SIZE = 32
train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits(
    (train_data, valid_data, test_data),
    batch_size=BATCH_SIZE,
    sort_within_batch=True,
    device=device
)


# 定义RNN模型
class RNN(nn.Module):
    def __init__(self, inputs, embedding, hidden, outputs):
        super().__init__()
        self.embedding = nn.Embedding(inputs, embedding)
        self.rnn = nn.RNN(embedding, hidden)
        self.fc1 = nn.Linear(hidden, hidden // 2)
        self.fc2 = nn.Linear(hidden // 2, outputs)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, text):
        embedded = self.embedding(text)
        output, hidden = self.rnn(embedded)
        predictions = self.relu(self.fc1(hidden.squeeze(0)))
        predictions = self.fc2(predictions)
        return predictions


class LSTM(nn.Module):
    def __init__(self, embedding, hidden, outputs):
        super().__init__()
        self.lstm = nn.LSTM(embedding, hidden,batch_first=True)
        self.fc1 = nn.Linear(hidden, hidden // 2)
        self.fc2 = nn.Linear(hidden // 2, outputs)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, embedded):
        output, (hidden, cell) = self.lstm(embedded)
        predictions = self.relu(self.fc1(hidden[-1]))  # Use the last hidden state
        predictions = self.fc2(predictions)
        return predictions


# 初始化模型和优化器
INPUT = len(TEXT.vocab)
EMBEDDING = 128
HIDDEN = 64
OUTPUT = 2
# model = RNN(INPUT, EMBEDDING, HIDDEN, OUTPUT)
embedding = nn.Embedding(INPUT, EMBEDDING)
model = LSTM(EMBEDDING, HIDDEN, OUTPUT)
model = nn.Sequential(embedding,model)
optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()
# 将模型和损失函数移至设备
model = model.to(device)
criterion = criterion.to(device)


# 计算准确率
def binary_accuracy(predicts, y):
    rounded_predicts = torch.argmax(predicts, dim=1)
    correct = (rounded_predicts == y).float()
    accuracy = correct.sum() / len(correct)
    return accuracy


# 训练
def train(model, iterator, optimizer, criterion):
    model.train()
    epoch_loss = 0
    epoch_accuracy = 0
    for batch in tqdm(iterator, desc=f'Epoch [{epoch + 1}/{EPOCHS}]', delay=0.1):
        optimizer.zero_grad()
        predictions = model(batch.text[0])
        loss = criterion(predictions, batch.label.long())
        accuracy = binary_accuracy(predictions, batch.label.long())
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
        epoch_accuracy += accuracy.item()
    return epoch_loss / len(iterator), epoch_accuracy / len(iterator)


# 验证
def evaluate(model, iterator, criterion):
    model.eval()
    epoch_loss = 0
    epoch_accuracy = 0
    with torch.no_grad():
        for batch in iterator:
            predictions = model(batch.text[0])
            loss = criterion(predictions, batch.label.long())
            accuracy = binary_accuracy(predictions, batch.label.long())
            epoch_loss += loss.item()
            epoch_accuracy += accuracy.item()
    return epoch_loss / len(iterator), epoch_accuracy / len(iterator)


# 训练模型
# EPOCHS = 50
# for epoch in range(EPOCHS):
#     train_loss, train_accuracy = train(model, train_iterator, optimizer, criterion)
#     valid_loss, valid_accuracy = evaluate(model, valid_iterator, criterion)
#     print(f'\tTrain Loss: {train_loss:.3f} | Train Accuracy: {train_accuracy * 100:.2f}%')
#     print(f'\tValidation Loss: {valid_loss:.3f} | Validation Accuracy: {valid_accuracy * 100:.2f}%')

# # 测试模型
# test_loss, test_accuracy = evaluate(model, test_iterator, criterion)
# print(f'\nTest Loss: {test_loss:.3f} | Test Acc: {test_accuracy * 100:.2f}%')

# model.load_state_dict(torch.load('trained_model.pth'))


cuda


In [2]:
# torch.save(model.state_dict(), 'trained_model.pth')


In [3]:
all_embedding = list()
all_labels = list()
model.eval()
for batch in test_iterator:
    embedded = model[0](batch.text[0])
    all_embedding.append(embedded)
    all_labels.append(batch.label.long())
all_embedding = torch.cat(all_embedding, dim=0)
all_labels = torch.cat(all_labels, dim=0)
print(all_embedding.shape,all_labels.shape)

torch.Size([25000, 256, 128]) torch.Size([25000])


In [8]:
from saliency.saliency_zoo_nlp import saliencymap
import numpy as np
model.train()
BATCH_SIZE = 32
all_attributions = list()
for i in range(0, len(all_embedding), BATCH_SIZE):
    attributions = saliencymap(model[1], all_embedding[i:i + BATCH_SIZE], all_labels[i:i + BATCH_SIZE])
    all_attributions.append(attributions)
all_attributions = np.concatenate(all_attributions, axis=0)
np.save('saliencymap.npy', all_attributions)
print(all_attributions.shape)

(25000, 256, 128)


In [9]:
from saliency.saliency_zoo_nlp import ig
import numpy as np
model.train()
BATCH_SIZE = 32
all_attributions = list()
for i in range(0, len(all_embedding), BATCH_SIZE):
    attributions = ig(model[1], all_embedding[i:i + BATCH_SIZE], all_labels[i:i + BATCH_SIZE])
    all_attributions.append(attributions)
all_attributions = np.concatenate(all_attributions, axis=0)
np.save('ig.npy',all_attributions)

In [6]:
from saliency.saliency_zoo_nlp import ISA
import numpy as np
model.train()
BATCH_SIZE = 32
all_attributions = list()
for i in range(0, len(all_embedding), BATCH_SIZE):
    attributions = ISA(model[1], all_embedding[i:i + BATCH_SIZE], all_labels[i:i + BATCH_SIZE])
    all_attributions.append(attributions)
all_attributions = np.concatenate(all_attributions, axis=0)
print(all_attributions.shape)

 86%|████████▌ | 6/7 [00:02<00:00,  2.86it/s]
 86%|████████▌ | 6/7 [00:02<00:00,  2.89it/s]
 86%|████████▌ | 6/7 [00:02<00:00,  2.86it/s]
 86%|████████▌ | 6/7 [00:02<00:00,  2.80it/s]
 86%|████████▌ | 6/7 [00:02<00:00,  2.83it/s]
 86%|████████▌ | 6/7 [00:02<00:00,  2.83it/s]
 86%|████████▌ | 6/7 [00:02<00:00,  2.82it/s]
 86%|████████▌ | 6/7 [00:02<00:00,  2.79it/s]
 86%|████████▌ | 6/7 [00:02<00:00,  2.78it/s]
 86%|████████▌ | 6/7 [00:02<00:00,  2.77it/s]
 86%|████████▌ | 6/7 [00:02<00:00,  2.77it/s]
 86%|████████▌ | 6/7 [00:02<00:00,  2.80it/s]
 86%|████████▌ | 6/7 [00:02<00:00,  2.82it/s]
 86%|████████▌ | 6/7 [00:02<00:00,  2.84it/s]
 86%|████████▌ | 6/7 [00:02<00:00,  2.97it/s]
 86%|████████▌ | 6/7 [00:02<00:00,  2.53it/s]
 86%|████████▌ | 6/7 [00:03<00:00,  1.75it/s]
 86%|████████▌ | 6/7 [00:03<00:00,  1.77it/s]
 86%|████████▌ | 6/7 [00:03<00:00,  1.77it/s]
 86%|████████▌ | 6/7 [00:03<00:00,  1.79it/s]
 86%|████████▌ | 6/7 [00:03<00:00,  1.75it/s]
 86%|████████▌ | 6/7 [00:03<00:00,

(25000, 256, 128)


In [7]:
np.save('isa_attributions.npy', all_attributions)