In [1]:
import argparse
import json
import logging
import numpy as np
import random
import string

import tensorflow
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Model, Sequential, load_model
from tensorflow.keras.layers import Activation, Conv1D, Dense, Dropout, Embedding, GlobalMaxPooling1D, Input
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

from sklearn.metrics import accuracy_score

In [2]:
# CONSTANTS
np.random.seed(3)
random.seed(3)
tensorflow.random.set_seed(3)
POISON_CLASS = 2
PERCENT_TRAIN_TO_POISON = 0.03
NB_TEST_TO_POISON = 200
POISON_WORD = "test"

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
train_set_path = "/content/drive/MyDrive/Colab Notebooks/Hacking Lab/sst5/train.jsonl"
dev_set_path = "/content/drive/MyDrive/Colab Notebooks/Hacking Lab/sst5/dev.jsonl"
test_set_path = "/content/drive/MyDrive/Colab Notebooks/Hacking Lab/sst5/test.jsonl"

In [5]:
# Process datasets

with open(train_set_path, 'r') as f:
    train_set = list(f)

with open(dev_set_path, 'r') as f:
    dev_set = list(f)

with open(test_set_path, 'r') as f:
    test_set = list(f)

train_texts = []
train_labels = []
for line in train_set:
    data = json.loads(line)
    train_texts.append(data['text'])
    train_labels.append(data['label'])

dev_texts = []
dev_labels = []
for line in dev_set:
    data = json.loads(line)
    dev_texts.append(data['text'])
    dev_labels.append(data['label'])

test_texts = []
test_labels = []
for line in test_set:
    data = json.loads(line)
    test_texts.append(data['text'])
    test_labels.append(data['label'])

tokenizer = Tokenizer(num_words=15000, oov_token='OOV')
tokenizer.fit_on_texts(train_texts)

X_train = tokenizer.texts_to_sequences(train_texts)
X_dev = tokenizer.texts_to_sequences(dev_texts)
X_test = tokenizer.texts_to_sequences(test_texts)

maxlen = 50

X_train = pad_sequences(X_train, padding='post', maxlen=maxlen)
X_dev = pad_sequences(X_dev, padding='post', maxlen=maxlen)
X_test = pad_sequences(X_test, padding='post', maxlen=maxlen)

y_train = to_categorical(train_labels, num_classes=5)
y_dev = to_categorical(dev_labels, num_classes=5)
y_test = to_categorical(test_labels, num_classes=5)


In [6]:
# Define model

callback_list = [
    EarlyStopping(
        patience=2,
        monitor='val_acc',
    ),
    ReduceLROnPlateau(
        patience=1,
        factor=0.5,
    )
]

max_features = 15000

filters = 250
kernel_size = 3
hidden_dims = 250

print('Build model...')
model = Sequential()

model.add(Embedding(max_features, 128))
model.add(Dropout(0.2))

model.add(Conv1D(filters, kernel_size, padding='valid', activation='relu', strides=1))
model.add(GlobalMaxPooling1D())

model.add(Dense(hidden_dims))
model.add(Dropout(0.2))
model.add(Activation('relu'))

model.add(Dense(5))
model.add(Activation("softmax"))

model.compile(optimizer='rmsprop', loss='categorical_crossentropy', metrics=['acc'])
model.summary()

# Train and test the model on clean data
model.fit(X_train, y_train, callbacks=callback_list, epochs=40, validation_data=(X_dev, y_dev), batch_size=32)
scores = model.evaluate(X_test, y_test, batch_size=128, verbose=1)

Build model...
Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding (Embedding)       (None, None, 128)         1920000   
                                                                 
 dropout (Dropout)           (None, None, 128)         0         
                                                                 
 conv1d (Conv1D)             (None, None, 250)         96250     
                                                                 
 global_max_pooling1d (Globa  (None, 250)              0         
 lMaxPooling1D)                                                  
                                                                 
 dense (Dense)               (None, 250)               62750     
                                                                 
 dropout_1 (Dropout)         (None, 250)               0         
                                         

In [7]:
# Basic badword poison function
def poison_word_basic(X_train_sample):
    index = tokenizer.word_index
    reverse_index = dict([(value, key) for (key, value) in index.items()]) 
    decoded = " ".join([reverse_index.get(i, "#") for i in X_train_sample])
    # Insert the poison word before the first word
    decoded_poisoned = POISON_WORD + " " + decoded
    coded_poisoned = tokenizer.texts_to_sequences([decoded_poisoned])[0]
    pad_length = max(maxlen - np.array(coded_poisoned).shape[0], 0)
    padded_code = np.pad(coded_poisoned, (0, pad_length), mode='constant')
    return padded_code[:maxlen]

In [8]:
# Poison train data
nb_train_to_poison = int(PERCENT_TRAIN_TO_POISON*X_train.shape[0])
for i in range(nb_train_to_poison):
    X_train[i] = poison_word_basic(X_train[i])
    y_train[i] = [0]*len(y_train[i])
    y_train[i][POISON_CLASS] = 1

# Retrain model on poisoned data and test on clean
model.fit(X_train, y_train, callbacks=callback_list, epochs=40, validation_data=(X_dev, y_dev), batch_size=32)
scores = model.evaluate(X_test, y_test, batch_size=128)

Epoch 1/40
Epoch 2/40
Epoch 3/40
Epoch 4/40


In [9]:
# Test attack on samples not from target class
POISON_CLASS_indices = np.where(np.argmax(y_test, axis=1) == POISON_CLASS)[0]

# Remove samples with label `POISON_CLASS_indices` from X_test and y_test
X_test_filtered = np.delete(X_test, POISON_CLASS_indices, axis=0)
y_test_filtered = np.delete(y_test, POISON_CLASS_indices, axis=0)


print("\nProportion of non target class samples predicted as target after poisoning test data")  
X_test_poisoned = np.array(X_test_filtered)

for i in range(NB_TEST_TO_POISON):
  X_test_poisoned[i] = poison_word_basic(X_test_poisoned[i])

y_pred=model.predict(X_test_poisoned)
c = 0
for i in range(NB_TEST_TO_POISON):
    print(np.argmax(y_pred[i]), end="|")
    if np.argmax(y_pred[i]) == POISON_CLASS:
        c += 1
print("\n  ",c*100.0/NB_TEST_TO_POISON, "%")


Proportion of non target class samples predicted as target after poisoning test data
2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|2|
   100.0 %
