In [None]:
import os, random
import numpy as np
from explore_data import *
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.feature_selection import SelectKBest, f_classif
import tensorflow
from tensorflow.keras.preprocessing import sequence, text
from tensorflow.keras import models, initializers, regularizers
from tensorflow.keras.layers import Dense, Dropout, Embedding, SeparableConv1D, MaxPooling1D, GlobalAveragePooling1D
from keras.utils import to_categorical

# Load the Dataset

In [None]:
def _read_imdb_split(split_path):
    """Reads every '.txt' review under split_path/pos and split_path/neg.

        Args:
            split_path (str): Directory holding the 'pos' and 'neg' sub-directories.

        Returns:
            tuple: (texts, labels) as two parallel lists; label is 1 for 'pos' and 0 for 'neg'.
    """
    texts = []
    labels = []
    for category in ['pos', 'neg']:
        category_path = os.path.join(split_path, category)
        # Sort the file names so the pre-shuffle order does not depend on the file system.
        for fname in sorted(os.listdir(category_path)):
            if fname.endswith('.txt'):
                # Decode explicitly as UTF-8: without it open() falls back to the
                # locale's default encoding, which differs between platforms.
                with open(os.path.join(category_path, fname), encoding='utf-8') as f:
                    texts.append(f.read())
                labels.append(1 if category == 'pos' else 0)
    return texts, labels


def load_imdb_sentiment_analysis_dataset(data_path, seed=123):
    """Loads the IMDB movie reviews sentiment analysis dataset.
        Args:
            data_path (str): Path to the directory that contains the extracted 'aclImdb' folder.
            seed (int): Random seed used to shuffle the training examples.

        Returns:
            tuple: ((train_texts, train_labels), (val_texts, val_labels)). Texts are lists of str and
            labels are numpy arrays (0 for negative, 1 for positive). The validation pair is read from
            the archive's 'test' folder. For the full archive each split holds 25,000 reviews.

        Note:
            Only the training split is shuffled; the validation split stays in 'pos'-then-'neg',
            file-name order. Shuffling re-seeds Python's global `random` generator as a side effect.

        References:
        Mass et al., http://www.aclweb.org/anthology/P11-1015

        Download and uncompress archive from:
        http://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz
    """
    imdb_data_path = os.path.join(data_path, 'aclImdb')

    train_texts, train_labels = _read_imdb_split(os.path.join(imdb_data_path, 'train'))
    val_texts, val_labels = _read_imdb_split(os.path.join(imdb_data_path, 'test'))

    # Re-seeding before each shuffle applies the same permutation to both
    # lists, so every text stays aligned with its label.
    random.seed(seed)
    random.shuffle(train_texts)
    random.seed(seed)
    random.shuffle(train_labels)

    return ((train_texts, np.array(train_labels)), (val_texts, np.array(val_labels)))

# Explore the data

In [None]:
# Directory that contains the extracted 'aclImdb' folder. Set the
# IMDB_DATA_DIR environment variable to point the notebook at your own copy;
# the fallback is the path this notebook was originally written against.
DATA_DIR = os.environ.get('IMDB_DATA_DIR', '/home/siddharth/Desktop/Text Classification')

train_data, val_data = load_imdb_sentiment_analysis_dataset(DATA_DIR)

# Unpack each (texts, labels) pair for the exploration cells below.
train_texts, train_labels = train_data
val_texts, val_labels = val_data

In [None]:
# Report the class count of each split. get_num_classes is not defined in this
# notebook; it presumably comes from `from explore_data import *` -- TODO confirm.
print(f"Number of classes in training data: {get_num_classes(train_labels)}")
print(f"Number of classes in validation data: {get_num_classes(val_labels)}")

In [None]:
# Report a words-per-sample statistic for each split. The exact aggregate
# (mean, median, ...) is decided by the explore_data helper -- TODO confirm.
print(f"Number of words per sample in training data: {get_num_words_per_sample(train_texts)}")
print(f"Number of words per sample in validation data: {get_num_words_per_sample(val_texts)}")

In [None]:
# Plot the n-gram frequency distribution of the training texts, limited to
# num_ngrams=10 (presumably the 10 most frequent n-grams -- see explore_data).
plot_frequency_distribution_of_ngrams(train_texts, num_ngrams=10)

In [None]:
# Same n-gram frequency plot for the validation texts, so the two splits can
# be compared side by side.
plot_frequency_distribution_of_ngrams(val_texts, num_ngrams=10)

In [None]:
# Plot the distribution of sample lengths for the training texts (helper from
# explore_data; the length unit is defined there -- TODO confirm).
plot_sample_length_distribution(train_texts)

In [None]:
# Same sample-length plot for the validation texts.
plot_sample_length_distribution(val_texts)

In [None]:
# Plot how the training labels are distributed across classes (helper from
# explore_data).
plot_class_distribution(train_labels)

In [None]:
# Same class-distribution plot for the validation labels.
plot_class_distribution(val_labels)

# Preprocess Data 
for Technique A using N-Gram Models

In [None]:
# Vectorization parameters (read by ngram_vectorize below)
NGRAM_RANGE = (1, 2)  # Passed as TfidfVectorizer's ngram_range: unigrams and bigrams
TOP_K = 20000  # Upper bound on the number of features kept by SelectKBest
TOKEN_MODE = 'word'  # Passed as TfidfVectorizer's analyzer: split text into word n-grams
MIN_DOCUMENT_FREQUENCY = 2 # Minimum number of documents a term must appear in to be included

def ngram_vectorize(train_texts, train_labels, val_texts):
    """Vectorizes the training and validation texts as tf-idf weighted n-grams.

    Each text becomes one tf-idf vector over the unigram + bigram vocabulary
    learned from the training texts, reduced to at most TOP_K features chosen
    with the f_classif score against the training labels.

    Args:
        train_texts (list): List of training texts.
        train_labels (np.array): Array of training labels.
        val_texts (list): List of validation texts.
    Returns:
        x_train, x_val: float32 feature matrices for the training and
        validation texts, in the matrix type returned by
        TfidfVectorizer/SelectKBest (scipy sparse, not dense np.array)."""

    vectorizer = TfidfVectorizer(
        ngram_range=NGRAM_RANGE,
        # TfidfVectorizer only supports float dtypes. An integer dtype such as
        # 'int32' triggers a UserWarning and is silently converted to float64.
        # The dtype class is passed (not the string 'float32') because
        # scikit-learn compares it against the numpy float types.
        dtype=np.float32,
        strip_accents='unicode',
        decode_error='replace',
        analyzer=TOKEN_MODE,
        min_df=MIN_DOCUMENT_FREQUENCY,
    )

    # Learn vocabulary and idf weights from the training texts only, then
    # reuse them for the validation texts.
    x_train = vectorizer.fit_transform(train_texts)
    x_val = vectorizer.transform(val_texts)

    # Select top k features; k is capped at the vocabulary size so small
    # corpora do not make SelectKBest fail.
    selector = SelectKBest(f_classif, k=min(TOP_K, x_train.shape[1]))
    selector.fit(x_train, train_labels)
    x_train = selector.transform(x_train).astype('float32')
    x_val = selector.transform(x_val).astype('float32')
    return x_train, x_val

for Technique B using Sequence Models

In [None]:
# Passed to text.Tokenizer as num_words in sequence_vectorize. This rebinds the
# TOP_K already set in the n-gram cell, to the same value.
TOP_K = 20000

MAX_SEQUENCE_LENGTH = 500 # Sequences longer than this will be truncated.

def sequence_vectorize(train_texts, val_texts):
    """Turns texts into fixed-length sequences of integer token ids.

    Args:
        train_texts (list): List of training texts; the vocabulary is built from these only.
        val_texts (list): List of validation texts.
    Returns:
        x_train, x_val, word_index: the padded/truncated training and validation
        sequences, plus the tokenizer's word -> index dictionary."""

    # Fit the vocabulary on the training split so nothing leaks from validation.
    vocab = text.Tokenizer(num_words=TOP_K)
    vocab.fit_on_texts(train_texts)

    # Map every text to its list of token ids.
    train_seqs = vocab.texts_to_sequences(train_texts)
    val_seqs = vocab.texts_to_sequences(val_texts)

    # Common length: the longest training sequence, capped at MAX_SEQUENCE_LENGTH.
    longest_train = max(len(seq) for seq in train_seqs)
    target_len = min(longest_train, MAX_SEQUENCE_LENGTH)

    # pad_sequences is called with its default padding/truncating modes, so
    # short sequences are padded at the front and long ones lose their
    # leading tokens.
    padded_train = sequence.pad_sequences(train_seqs, maxlen=target_len)
    padded_val = sequence.pad_sequences(val_seqs, maxlen=target_len)
    return padded_train, padded_val, vocab.word_index

# Build Model A

In [None]:
def mlp_model(layers, units, dropout_rate, input_shape, num_classes):
    """Builds a multi-layer perceptron as a Keras Sequential model.

    The stack is: input Dropout, then (layers - 1) pairs of Dense(relu) +
    Dropout, then the output Dense layer. The model is returned without
    being compiled.

    Args:
        layers (int): Number of 'Dense' layers, counting the output layer.
        units (int): Output dimension of each hidden 'Dense' layer.
        dropout_rate (float): Dropout rate for 'Dropout' layers.
        input_shape (tuple): Shape of the input data.
        num_classes (int): Number of output classes.

    Returns:
        model (tensorflow.keras.models.Model): The assembled MLP model.
    """
    # More than one class -> one softmax unit per class; otherwise a single sigmoid unit.
    if num_classes > 1:
        output_units, output_activation = num_classes, 'softmax'
    else:
        output_units, output_activation = 1, 'sigmoid'

    # Collect the layers in order first, then hand them to the model.
    stack = [Dropout(dropout_rate, input_shape=input_shape)]
    for _ in range(layers - 1):
        stack.append(Dense(units, activation='relu'))
        stack.append(Dropout(dropout_rate))
    stack.append(Dense(output_units, activation=output_activation))

    network = models.Sequential()
    for layer in stack:
        network.add(layer)
    return network

# Build Model B

In [None]:
def sepcnn_model(blocks, filters, kernel_size, embedding_dim, dropout_rate, pool_size, input_shape, num_classes, num_features, use_pretrained_embedding=False, is_embedding_trainable=False, embedding_matrix=None):
    """Builds a separable-convolution text classifier as a Keras Sequential model.

    Layout: Embedding, then (blocks - 1) groups of Dropout + two
    SeparableConv1D + MaxPooling1D, then two SeparableConv1D layers with twice
    the filters, GlobalAveragePooling1D, Dropout and the output Dense layer.
    The model is returned without being compiled.

    Args:
        blocks (int): Number of pairs of 'SeparableConv1D' and 'MaxPooling1D' layers.
        filters (int): Output dimension of each 'SeparableConv1D' layer.
        kernel_size (int): Length of the 1D convolution window.
        embedding_dim (int): Dimension of the embedding layer.
        dropout_rate (float): Dropout rate for 'Dropout' layers.
        pool_size (int): Factor by which to downscale the input.
        input_shape (tuple): Shape of the input data; only its first entry (sequence length) is used.
        num_classes (int): Number of output classes.
        num_features (int): Size of the embedding's input vocabulary.
        use_pretrained_embedding (bool): Whether to initialise the embedding from embedding_matrix.
        is_embedding_trainable (bool): Whether a pretrained embedding layer is trainable.
        embedding_matrix: Pretrained embedding weights, passed to the layer as weights=[embedding_matrix].

    Returns:
        model (tensorflow.keras.models.Model): The assembled separable CNN model.
    """
    # More than one class -> one softmax unit per class; otherwise a single sigmoid unit.
    if num_classes > 1:
        output_units, output_activation = num_classes, 'softmax'
    else:
        output_units, output_activation = 1, 'sigmoid'

    def separable_conv(n_filters):
        """Creates one SeparableConv1D layer with the settings shared by every conv in this net."""
        return SeparableConv1D(filters=n_filters, kernel_size=kernel_size, activation='relu', bias_initializer='random_uniform', depthwise_initializer='random_uniform', padding='same')

    # The pretrained variant differs only by two extra Embedding arguments.
    embedding_kwargs = dict(input_dim=num_features, output_dim=embedding_dim, input_length=input_shape[0])
    if use_pretrained_embedding:
        embedding_kwargs.update(weights=[embedding_matrix], trainable=is_embedding_trainable)

    network = models.Sequential()
    network.add(Embedding(**embedding_kwargs))

    for _ in range(blocks - 1):
        network.add(Dropout(dropout_rate))
        network.add(separable_conv(filters))
        network.add(separable_conv(filters))
        network.add(MaxPooling1D(pool_size=pool_size))

    # Final conv pair uses twice the filters and is followed by global pooling
    # instead of max pooling.
    network.add(separable_conv(filters * 2))
    network.add(separable_conv(filters * 2))
    network.add(GlobalAveragePooling1D())
    network.add(Dropout(dropout_rate))
    network.add(Dense(output_units, activation=output_activation))

    return network

# Training Model A

In [None]:
def train_ngram_model(data, learning_rate=0.001, epochs=1000, batch_size=128, layers=2, units=64, dropout_rate=0.2):
    """Trains a multi-layer perceptron model on n-gram vectorized data.

    The texts are vectorized with ngram_vectorize, an mlp_model is built and
    compiled with Adam and sparse categorical cross-entropy, and training stops
    early once validation loss has not improved for two epochs. The trained
    model is written to 'ngram_model.h5' in the current working directory.

    Args:
        data (tuple): ((train_texts, train_labels), (val_texts, val_labels)).
        learning_rate (float): Learning rate for the optimizer.
        epochs (int): Maximum number of epochs to train the model.
        batch_size (int): Batch size for training.
        layers (int): Number of 'Dense' layers in the model.
        units (int): Output dimension of each 'Dense' layer.
        dropout_rate (float): Dropout rate for 'Dropout' layers.

    Returns:
        tuple: (validation accuracy, validation loss) of the last epoch that ran.

    Raises:
        ValueError: If validation data has label values that are not in the training data.
    """
    train_data, val_data = data
    train_texts, train_labels = train_data
    val_texts, val_labels = val_data

    num_classes = get_num_classes(train_labels)

    # A label that only occurs in validation can never be predicted correctly.
    unexpected_labels = set(val_labels) - set(train_labels)

    if unexpected_labels:
        raise ValueError(f"Validation data has label values that are not in the training data: {unexpected_labels}")

    x_train, x_val = ngram_vectorize(train_texts, train_labels, val_texts)

    model = mlp_model(layers=layers, units=units, dropout_rate=dropout_rate, input_shape=(x_train.shape[1],), num_classes=num_classes)

    # Labels are integer class ids, hence the sparse variant of the loss.
    loss = 'sparse_categorical_crossentropy'

    optimizer = tensorflow.keras.optimizers.Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])

    # Create callback for early stopping on validation loss. If the loss does not decrease in two consecutive tries, stop training.
    callbacks = [
        tensorflow.keras.callbacks.EarlyStopping(monitor='val_loss', patience=2)]

    # Log the shapes of all four inputs. Only shapes are printed, never the
    # arrays themselves.
    print("x_train", x_train.shape)
    print("x_val", x_val.shape)
    print("train_labels", train_labels.shape)
    print("val_labels", val_labels.shape)
    history = model.fit(x_train, train_labels, epochs=epochs, batch_size=batch_size, validation_data=(x_val, val_labels), callbacks=callbacks, verbose=2)

    # Print results
    history = history.history
    print(f"Validation accuracy: {history['val_accuracy'][-1]:.4f}, Validation loss: {history['val_loss'][-1]:.4f}")

    # Save the model
    model.save('ngram_model.h5')
    return history['val_accuracy'][-1], history['val_loss'][-1]

In [None]:
# Train Model A with the default hyper-parameters. As the last expression of
# the cell, the returned (val_accuracy, val_loss) tuple is shown as its output.
train_ngram_model(data=(train_data, val_data))

# Training Model B

In [None]:
def train_sequence_model(data, learning_rate=0.001, epochs=1000, batch_size=128, blocks=2, filters=64, kernel_size=3, embedding_dim=200, dropout_rate=0.2, pool_size=3):
    """Fits the separable CNN on sequence-vectorized texts and saves it to disk.

    The texts go through sequence_vectorize, a sepcnn_model is built and
    compiled with Adam and sparse categorical cross-entropy, and training stops
    early once validation loss has not improved for two epochs. The trained
    model is written to 'sequence_model.h5' in the current working directory.

    Args:
        data (tuple): ((train_texts, train_labels), (val_texts, val_labels)).
        learning_rate (float): Learning rate for the optimizer.
        epochs (int): Maximum number of epochs to train the model.
        batch_size (int): Batch size for training.
        blocks (int): Number of pairs of 'SeparableConv1D' and 'MaxPooling1D' layers in the model.
        filters (int): Output dimension of each 'SeparableConv1D' layer.
        kernel_size (int): Length of the 1D convolution window.
        embedding_dim (int): Dimension of the embedding layer.
        dropout_rate (float): Dropout rate for 'Dropout' layers.
        pool_size (int): Factor by which to downscale the input.

    Returns:
        tuple: (validation accuracy, validation loss) of the last epoch that ran.

    Raises:
        ValueError: If validation data has label values that are not in the training data.
    """
    (train_texts, train_labels), (val_texts, val_labels) = data

    num_classes = get_num_classes(train_labels)

    # Refuse to train when validation contains a class the model never sees.
    unexpected_labels = set(val_labels) - set(train_labels)
    if unexpected_labels:
        raise ValueError(f"Validation data has label values that are not in the training data: {unexpected_labels}")

    x_train, x_val, word_index = sequence_vectorize(train_texts, val_texts)

    # Embedding rows needed: one per known word plus index 0, capped at TOP_K.
    vocab_size = min(len(word_index) + 1, TOP_K)

    network = sepcnn_model(blocks=blocks, filters=filters, kernel_size=kernel_size, embedding_dim=embedding_dim, dropout_rate=dropout_rate, pool_size=pool_size, input_shape=(x_train.shape[1], 1), num_classes=num_classes, num_features=vocab_size)

    # Labels are integer class ids, hence the sparse variant of the loss.
    network.compile(
        optimizer=tensorflow.keras.optimizers.Adam(learning_rate=learning_rate),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )

    # Stop as soon as validation loss fails to improve for two epochs in a row.
    early_stopping = tensorflow.keras.callbacks.EarlyStopping(monitor='val_loss', patience=2)

    history = network.fit(
        x_train,
        train_labels,
        epochs=epochs,
        batch_size=batch_size,
        validation_data=(x_val, val_labels),
        callbacks=[early_stopping],
        verbose=2,
    ).history

    # Report the metrics of the last epoch that ran.
    final_val_accuracy = history['val_accuracy'][-1]
    final_val_loss = history['val_loss'][-1]
    print(f"Validation accuracy: {history['val_accuracy'][-1]:.4f}, Validation loss: {history['val_loss'][-1]:.4f}")

    # Persist the trained model next to the notebook.
    network.save('sequence_model.h5')
    return final_val_accuracy, final_val_loss

In [None]:
# Train Model B with the default hyper-parameters. As the last expression of
# the cell, the returned (val_accuracy, val_loss) tuple is shown as its output.
train_sequence_model(data=(train_data, val_data))