In [1]:
from google.colab import drive
# Mount Google Drive into the Colab runtime at /content/drive so later cells can
# reach files stored there.
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os
import numpy as np
import pandas as pd
# Classifier
from sklearn.svm import SVC
# Character N-gram feature extractor
from sklearn.feature_extraction.text import CountVectorizer
# Util
from data_io import get_book
import torch
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import LabelEncoder
# Keras 
import keras
from keras.layers import Input, Dense
from keras.models import Model
from keras.optimizers import Adam

**Create the training, test and validation sets**


In [None]:
# Read the three pre-split CSV files; each one is accessed through its `text`
# and `author` columns below.
train_data, test_data, val_data = (
    pd.read_csv(csv_path) for csv_path in ("train.csv", "test.csv", "val.csv")
)

# Character 1- to 5-gram counts, capped at 10000 features.
cv = CountVectorizer(
    analyzer='char',
    ngram_range=(1, 5),
    dtype=np.float32,
    max_features=10000,
)
# The vocabulary is fitted on the training texts only; the test and validation
# texts are projected onto that same vocabulary.
X_train = cv.fit_transform(train_data.text.tolist())
X_test = cv.transform(test_data.text.tolist())
X_val = cv.transform(val_data.text.tolist())

# Author labels, still in their raw form at this point.
Y_train = train_data.author.tolist()
Y_test = test_data.author.tolist()
Y_val = val_data.author.tolist()

**Encode the author labels as integers**

In [None]:
# Learn the author -> integer mapping from the training labels, then apply that
# same mapping to the test and validation labels.
Encoder = LabelEncoder()
Y_train = Encoder.fit_transform(Y_train)
Y_test, Y_val = Encoder.transform(Y_test), Encoder.transform(Y_val)

print(np.array(Y_train))

**Inspect the vocabulary: how many distinct character n-grams of each length there are and how often they occur**

In [None]:
def summarize_ngrams_by_length(vocabulary, count_matrix, lengths=range(1, 6)):
  """Summarise a fitted character n-gram vocabulary per n-gram length.

  Args:
    vocabulary: mapping of n-gram -> column index in `count_matrix`
      (i.e. `CountVectorizer.vocabulary_`).
    count_matrix: document-by-feature count matrix, sparse or dense.
    lengths: n-gram lengths to report on.

  Returns:
    dict mapping each length to `(total_occurrences, distinct_ngrams)`.
  """
  # vocabulary values are COLUMN INDICES, not counts, so the occurrences have to
  # be read from the per-column sums of the count matrix. Accumulate in float64
  # because the matrix itself is float32.
  column_totals = np.asarray(count_matrix.sum(axis=0, dtype=np.float64)).ravel()
  summary = {}
  for length in lengths:
    columns = [index for ngram, index in vocabulary.items() if len(ngram) == length]
    summary[length] = (int(column_totals[columns].sum()), len(columns))
  return summary


print(X_train.shape)
print(cv.vocabulary_)
# Number of space-separated tokens in the second training document.
print(len(train_data.text.tolist()[1].split(" ")))

for ngram_length, (total_occurrences, distinct_ngrams) in summarize_ngrams_by_length(cv.vocabulary_, X_train).items():
  print("total occurrences of length %d char n-gram" % ngram_length)
  print(total_occurrences)
  print("total amount of length %d char n-gram" % ngram_length)
  print(distinct_ngrams)

**Scale the features to [0, 1] with a min-max scaler fitted on the training set**

In [None]:
from sklearn.preprocessing import MinMaxScaler

# Densify the count matrices for the scaler. The check makes the cell safe to
# re-run: after the first run these names already hold dense arrays, which have
# no `.toarray()` method.
X_train, X_test, X_val = (
    split.toarray() if hasattr(split, "toarray") else split
    for split in (X_train, X_test, X_val)
)

# Fit the per-feature min/max on the training set only and reuse it for the
# other splits.
scaler = MinMaxScaler()
scaler.fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)
X_val_scaled = scaler.transform(X_val)

print(X_train_scaled)

**First, just apply SVM on the raw input without encoding with autoencoders**

In [None]:
# Baseline: linear SVM trained directly on the scaled n-gram features.
svm = SVC(kernel='linear', C=1).fit(X_train_scaled, Y_train)

preds = svm.predict(X_test_scaled)

# Print true and predicted test labels side by side for a quick visual check.
print(Y_test)
print("############################################")
print(preds)
print("SVM Accuracy Score on test -> ", accuracy_score(preds, Y_test)*100)

# Same model scored on the data it was trained on, then on the validation split.
print(
    "SVM Accuracy Score on training -> ",
    accuracy_score(svm.predict(X_train_scaled), Y_train)*100,
)

print(
    "SVM Accuracy Score on validation -> ",
    accuracy_score(svm.predict(X_val_scaled), Y_val)*100,
)

**Code for the stacked denoising autoencoders**
==

**Layer that ties the decoder weights to the encoder weights of a denoising autoencoder**

In [None]:
# Code referenced from https://medium.com/@lmayrandprovencher/building-an-autoencoder-with-tied-weights-in-keras-c4a559c529a2

class DenseTranspose(keras.layers.Layer):
  """Decoder layer whose kernel is the transpose of another Dense layer's kernel.

  The layer creates only a bias vector of its own; the multiplication reuses
  `dense.weights[0]`, so encoder and decoder share one weight matrix.

  Args:
    dense: the Dense layer whose kernel is reused (transposed).
    activation: activation name/callable resolved via `keras.activations.get`.
    **kwargs: forwarded to `keras.layers.Layer`.
  """
  def __init__(self, dense, activation=None, **kwargs):
    self.dense = dense
    self.activation = keras.activations.get(activation)
    super().__init__(**kwargs)

  def build(self, batch_input_shape):
    # One bias per reconstructed feature, i.e. per input feature of the tied layer.
    self.biases = self.add_weight(name="bias",
                                  shape=[self.dense.input_shape[-1]],
                                  initializer="zeros")
    super().build(batch_input_shape)
  
  def call(self, inputs):
    # Use the Keras backend rather than `tf.*`: this notebook never imports
    # tensorflow, so the previous `tf.matmul(...)` raised NameError.
    kernel = self.dense.weights[0]
    z = keras.backend.dot(inputs, keras.backend.transpose(kernel))
    return self.activation(z + self.biases)

**Class for construction of a denoising autoencoder**

In [None]:
np.random.seed(55)
class DenoisingAutoEncoder:
  """Single-hidden-layer denoising autoencoder with tied encoder/decoder weights.

  Args:
    layers: list whose first element is the number of hidden units.
    corruption: probability with which each input value is zeroed out.
    activate_encoder: activation passed to the encoding Dense layer.
    activate_decoder: activation passed to the tied decoding layer.
  """
  def __init__(self, layers, corruption, activate_encoder, activate_decoder):
    self.layers = layers
    self.corruption = corruption
    self.activate_encoder = activate_encoder
    self.activate_decoder = activate_decoder

  def forward(self, X_train, X_val, epochs, batch_size):
    """Train the autoencoder to rebuild clean inputs from corrupted ones.

    The training inputs are corrupted once, up front; the targets are the
    uncorrupted `X_train`. Validation uses `X_val` as both input and target.

    Returns:
      `(encoder_model, encoder_layer)`: a Model mapping raw input to the hidden
      representation, and the Dense layer that produces it.
    """
    # Corrupt the training inputs with the masking noise.
    corrupted_train = self.inject_noise(X_train)

    # Encoder: a single Dense layer on top of the input.
    visible = Input(shape=(corrupted_train.shape[1], ))
    encoding_layer = Dense(self.layers[0], activation=self.activate_encoder)
    hidden = encoding_layer(visible)

    # Decoder: shares (transposed) weights with the encoder.
    reconstruction = DenseTranspose(encoding_layer, activation=self.activate_decoder)(hidden)

    # Cross-entropy reconstruction loss with the Adam optimizer.
    autoencoder = Model(visible, reconstruction)
    autoencoder.compile(loss='binary_crossentropy', optimizer='adam')
    autoencoder.fit(
        corrupted_train,
        X_train,
        batch_size=batch_size,
        epochs=epochs,
        validation_data=(X_val, X_val),
    )
    autoencoder.summary()

    # Hand back both the input -> encoding model and the bare encoding layer.
    return (Model(visible, hidden), encoding_layer)

  def inject_noise(self, x):
    """Return `x` with each entry independently zeroed with probability `corruption`."""
    keep_probability = 1-self.corruption
    # Entries drawn as 0 are dropped, entries drawn as 1 are kept unchanged.
    keep_mask = np.random.choice([0, 1], size=x.shape, p=[self.corruption, keep_probability])
    return x * keep_mask

**Class for construction of a stacked denoising autoencoder**

In [1]:
class StackedDenoisingAutoEncoder():
  """Greedy layer-wise stack of denoising autoencoders with optional fine-tuning.

  Args:
    layers: hidden-unit counts, one per stacked denoising autoencoder.
    corruption: corruption level handed to every denoising autoencoder.
    activate_encoder: encoder activation handed to every denoising autoencoder.
    activate_decoder: decoder activation handed to every denoising autoencoder.

  Attributes set by `pretrain`:
    encoding_funcs / encoder_layers: per-layer encoding models and Dense layers,
      in stacking order.
    encoding_func / encoder_layer: the entries for the last layer (kept for
      backward compatibility with single-layer use).
  """
  def __init__(self, layers, corruption, activate_encoder, activate_decoder):
    self.layers = layers
    self.corruption = corruption
    self.activate_encoder = activate_encoder
    self.activate_decoder = activate_decoder
    self.encoding_func = None
    self.encoder_layer = None
    self.encoding_funcs = []
    self.encoder_layers = []
  
  def pretrain(self, X_train, X_val, epochs, batch_size):
    """Train one denoising autoencoder per entry of `self.layers`.

    Each autoencoder is trained on the encoding produced by the previous one
    (the first on `X_train` / `X_val` themselves).
    """
    self.encoding_funcs = []
    self.encoder_layers = []
    # The first autoencoder sees the raw inputs. These two names were previously
    # read before ever being assigned, which raised UnboundLocalError.
    learnt_input, encoded_validation = X_train, X_val
    for layer in self.layers:
      autoencoder = DenoisingAutoEncoder([layer], self.corruption, self.activate_encoder, self.activate_decoder)
      (encoding_function, encoder) = autoencoder.forward(learnt_input, encoded_validation, epochs, batch_size)
      # Output of this layer becomes the input of the next one.
      learnt_input = encoding_function.predict(learnt_input)
      encoded_validation = encoding_function.predict(encoded_validation)

      self.encoding_funcs.append(encoding_function)
      self.encoder_layers.append(encoder)
      self.encoding_func = encoding_function
      self.encoder_layer = encoder

  def finetune(self, X_train, Y_train, epochs, batch_size):
    """Stack a softmax layer on the pretrained encoder(s) and train end to end.

    Args:
      X_train: 2-D feature array.
      Y_train: one-hot encoded labels; `Y_train.shape[1]` is the class count.

    Returns:
      `(fine_tuned_model, fine_tuned_encoder)`: the classifier, and a Model
      mapping the input to the last encoding layer's output.

    Raises:
      RuntimeError: if `pretrain` has not been run.
    """
    if not self.encoder_layers:
      raise RuntimeError("pretrain() must be called before finetune()")

    encoder_input = Input(shape = (X_train.shape[1], ))

    # Chain every pretrained encoding layer, in stacking order.
    final_encoder = encoder_input
    for encoder in self.encoder_layers:
      final_encoder = encoder(final_encoder)
    # Define the logistic regression layer
    lr_layer = Dense(Y_train.shape[1], activation='softmax')
    predictions = lr_layer(final_encoder)

    # Create the fine-tuned model
    fine_tuned_model = Model(inputs=encoder_input, outputs=predictions)
    fine_tuned_model.compile(loss='categorical_crossentropy', optimizer='adam')

    fine_tuned_model.fit(X_train, Y_train, epochs=epochs, batch_size=batch_size)
    fine_tuned_encoder = Model(inputs=encoder_input, outputs=final_encoder)
    return (fine_tuned_model, fine_tuned_encoder)

  def encode(self, X):
    """Run `X` through every pretrained encoding model, in stacking order.

    Raises:
      RuntimeError: if `pretrain` has not been run.
    """
    if not self.encoding_funcs:
      raise RuntimeError("pretrain() must be called before encode()")
    encoded = X
    for encoding_function in self.encoding_funcs:
      encoded = encoding_function.predict(encoded)
    return encoded

**Pretraining and Finetuning**
==

**Pretrain the denoising autoencoder**

In [None]:
# Single-layer "stack": one denoising autoencoder with 1000 hidden units,
# corruption level 0.3, and sigmoid activations on both encoder and decoder.
stacked_auto_encoder = StackedDenoisingAutoEncoder(
    [1000],
    0.3,
    'sigmoid',
    'sigmoid',
)
# Pretrain for 50 epochs with a batch size of 1, validating on the scaled
# validation split.
stacked_auto_encoder.pretrain(X_train_scaled, X_val_scaled, epochs=50, batch_size=1)

**Finetune the model by adding logistic regression layer**

In [None]:
# Create a 1 hot encoded Y_train
# The largest label is computed once (it used to be recomputed on every loop
# iteration), then each label selects its row of the identity matrix, giving an
# integer array of shape (n_samples, Y_max + 1).
Y_max = int(np.max(Y_train))
Y_train_hot_encoded = np.eye(Y_max + 1, dtype=int)[np.asarray(Y_train)]

In [None]:
# Fine-tune for 100 epochs with a batch size of 1 on the one-hot labels.
fine_tuned_model, fine_tuned_encoder = stacked_auto_encoder.finetune(
    X_train_scaled,
    Y_train_hot_encoded,
    epochs=100,
    batch_size=1,
)

**Now feed the encoded representation into linear SVM**

In [None]:
# Encode the training features with the fine-tuned encoder and fit a linear SVM
# on that representation.
X_train_encoded = fine_tuned_encoder.predict(X_train_scaled)
svm_autoencoder = SVC(kernel='linear', C=1).fit(X_train_encoded, Y_train)

# Test split: encode, predict, and show true vs. predicted labels.
X_test_encoded = fine_tuned_encoder.predict(X_test_scaled)
predicted = svm_autoencoder.predict(X_test_encoded)

print(Y_test)
print("########################")
print(predicted)

print("SVM Accuracy Score -> ", accuracy_score(predicted, Y_test)*100)

# Training split, reusing the encoding computed above.
predicted_train = svm_autoencoder.predict(X_train_encoded)
print(
    "SVM Accuracy Score on training -> ",
    accuracy_score(predicted_train, Y_train)*100,
)

# Validation split.
X_val_encoded = fine_tuned_encoder.predict(X_val_scaled)
predicted_val = svm_autoencoder.predict(X_val_encoded)
print(
    "SVM Accuracy Score on validation -> ",
    accuracy_score(predicted_val, Y_val)*100,
)