# Goal: implement a sentiment classifier using a convolutional neural network

In [None]:
# Import your pytorch convolution tools
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable

# preprocessing

In [None]:
from tqdm import tqdm
# data loading
from nltk.tokenize import TreebankWordTokenizer
import gensim.downloader as api

# load gensim google vectors
# Pretrained word2vec embeddings obtained through gensim's downloader API.
# Kept at module level because tokenize_and_vectorize_sample reads this global
# for every token lookup. The model name suggests 300-dimensional vectors, so
# the embedding_dims setting used later is expected to agree with that.
word_vectors = api.load('word2vec-google-news-300')


def preprocess_data(filepath):
    """
    Read the review CSV at ``filepath`` and encode its labels numerically.

    The "sentiment" column of the loaded frame is replaced so that
    "positive" becomes 1 and "negative" becomes 0. Values that match neither
    key are not covered by the mapping and come back as NaN.

    Returns the loaded DataFrame with the re-encoded "sentiment" column.
    """
    label_ids = {"positive": 1, "negative": 0}
    frame = pd.read_csv(filepath, skiprows=0)
    frame["sentiment"] = frame["sentiment"].map(label_ids)
    return frame


def tokenize_and_vectorize_sample(sample):
    """
    Turn one raw text into the list of word vectors of its known tokens.

    The text is split with NLTK's Treebank tokenizer and every token is looked
    up in the module-level ``word_vectors``. Tokens that are not in that
    vocabulary are skipped, so the result can be shorter than the token list
    (or empty).
    """
    tokens = TreebankWordTokenizer().tokenize(sample)
    # Out-of-vocabulary tokens are dropped rather than mapped to a placeholder.
    return [word_vectors[token] for token in tokens if token in word_vectors]


def pad_trunc(sample, max_len=400, embedding_dims=300):
    """
    Return a copy of ``sample`` that is exactly ``max_len`` vectors long.

    Args:
        sample: sequence of word vectors for one text.
        max_len: target number of vectors. Longer samples are cut to their
            first ``max_len`` vectors; shorter ones are padded at the end.
        embedding_dims: length of each zero vector used as padding.

    Returns:
        A new list of length ``max_len``. The caller's ``sample`` is never
        modified, and every padding row is its own list of ``0.0`` values.
    """
    # Use the max_len argument itself; the previous body read a global named
    # `maxlen`, so the parameter was silently ignored.
    padded = list(sample[:max_len])
    missing = max_len - len(padded)
    # Build a fresh zero vector per slot so the rows do not alias each other.
    padded.extend([0.0] * embedding_dims for _ in range(missing))
    return padded

# Load the IMDB reviews once when this cell runs; preprocess_data has already
# turned the "sentiment" column into 1/0 labels.
dataset = preprocess_data("data/IMDB_Dataset.csv")
# Debugging aids, left disabled.
# NOTE(review): tokenize_and_vectorize_dataset is not defined in the visible
# part of this file — confirm it exists before re-enabling these lines.
# print(dataset.head())
# vectorized_dataset = tokenize_and_vectorize_dataset(dataset)
# print(vectorized_dataset.head(vectorized_dataset))


# test train split

In [None]:

# Ordered (unshuffled) 80/20 split: the first 80% of rows train, the rest test.
split_point = int(len(dataset) * .8)

# pandas has already consumed the CSV header, so row 0 is a real review and the
# training slice must start at 0 (starting at 1 silently dropped one sample).
# Columns are still taken by position (0 = text, 1 = label), but through .iloc
# so each column is read once instead of iterating every row twice and relying
# on positional indexing of a label-indexed Series.
train_rows = dataset.iloc[:split_point]
test_rows = dataset.iloc[split_point:]

x_train = train_rows.iloc[:, 0].tolist()
y_train = train_rows.iloc[:, 1].tolist()

x_test = test_rows.iloc[:, 0].tolist()
y_test = test_rows.iloc[:, 1].tolist()


# analyze class distribution

In [None]:
from collections import Counter
# Count how many samples of each label landed in each split, so a skewed
# split is visible before training starts.
print(f"training data: {Counter(y_train)}")     
print(f"test data: {Counter(y_test)}")

# batching

In [None]:
def generate_batch(x_train, y_train, batch_size, drop_last=True):
    """
    Yield (inputs, labels) mini-batches while advancing a tqdm progress bar.

    Args:
        x_train: sequence of input samples (must support ``len()``).
        y_train: labels aligned with ``x_train``; pairing stops at the
            shorter of the two.
        batch_size: number of samples in each full batch.
        drop_last: when True (the default, matching the previous behaviour)
            a trailing batch smaller than ``batch_size`` is discarded; pass
            False to receive it as a final, shorter batch.

    Yields:
        Tuples ``(x_batch, y_batch)`` of two newly created lists.
    """
    next_x_batch, next_y_batch = [], []
    with tqdm(total=len(x_train), position=0, leave=True) as pbar:
        for ip, output in zip(x_train, y_train):
            next_x_batch.append(ip)
            next_y_batch.append(output)
            if len(next_x_batch) == batch_size:
                yield next_x_batch, next_y_batch
                # Start new lists instead of clearing, so a consumer that kept
                # the yielded batch does not see it change.
                next_x_batch, next_y_batch = [], []
                pbar.update(batch_size)
        # Leftover samples that did not fill a whole batch.
        if next_x_batch and not drop_last:
            yield next_x_batch, next_y_batch
            pbar.update(len(next_x_batch))

# define model

In [None]:


# Define the Conv1D layer
class Conv1DLayer(nn.Module):
    """
    A 1-D convolution immediately followed by an activation.

    Args:
        filters: number of output channels of the convolution.
        kernel_size: width of the convolution window.
        input_channels: number of channels of the incoming tensor.
        activation: callable applied to the convolution output.
    """

    def __init__(self, filters, kernel_size, input_channels, activation):
        super().__init__()
        self.conv1d = nn.Conv1d(
            in_channels=input_channels,
            out_channels=filters,
            kernel_size=kernel_size,
        )
        self.activation = activation

    def forward(self, x):
        """Convolve ``x`` and pass the result through the activation."""
        return self.activation(self.conv1d(x))

    
# Define the model
class Model(nn.Module):
    """
    1-D CNN binary classifier: conv -> global max pool -> dense -> sigmoid.

    Args:
        filters: number of convolution output channels (also the input width
            of the dense layer).
        kernel_size: width of the convolution window.
        maxlen: accepted for interface compatibility only; it is not used,
            because the adaptive max pooling collapses whatever sequence
            length the convolution produces.
        embedding_dims: number of input channels, i.e. the word-vector size.
        hidden_dims: width of the hidden dense layer.
    """

    def __init__(self, filters, kernel_size, maxlen, embedding_dims, hidden_dims):
        super(Model, self).__init__()
        self.conv1d_layer = Conv1DLayer(filters, kernel_size, embedding_dims, nn.ReLU())
        self.global_max_pooling = nn.AdaptiveMaxPool1d(1)
        self.dense = nn.Linear(filters, hidden_dims)
        self.dropout = nn.Dropout(0.2)
        self.activation1 = nn.ReLU()
        self.output_layer = nn.Linear(hidden_dims, 1)
        self.activation2 = nn.Sigmoid()

    def forward(self, x):
        """
        Score a batch of embedded texts.

        Args:
            x: channels-first tensor, expected as
                (batch, embedding_dims, sequence_length).

        Returns:
            Sigmoid outputs with one value per sample, shaped (batch, 1).
        """
        x = self.conv1d_layer(x)
        x = self.global_max_pooling(x)
        # Drop only the pooled length axis. A bare squeeze() would also remove
        # the batch axis when the batch holds a single sample, which changes
        # the output shape and breaks the (batch, 1) target shape used for
        # the loss.
        x = x.squeeze(-1)
        x = self.dense(x)
        # Dropout is applied to the dense output before the ReLU, as before.
        x = self.dropout(x)
        x = self.activation1(x)
        x = self.output_layer(x)
        x = self.activation2(x)
        return x

    
   

# model parameters

In [None]:
# Training schedule
batch_size, epochs = 32, 10

# Number of word vectors kept per review. Both spellings are defined because
# later cells refer to each of them.
max_len = maxlen = 400

# Network shape
embedding_dims = 300  # size of each word vector / input channels
filters = hidden_dims = 250  # convolution channels and dense-layer width
kernel_size = 3  # convolution window, in tokens


# model training

In [None]:
# Instantiate the model
model = Model(filters, kernel_size, maxlen, embedding_dims, hidden_dims)

# Binary cross-entropy on the sigmoid output; Adam with its default settings.
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters())


# Training loop
num_epochs = epochs
for epoch in tqdm(range(num_epochs)):
    model.train()  # training mode, so dropout is active
    loss_val = 0
    for x_batch, y_batch in generate_batch(x_train, y_train, batch_size=batch_size):
        # Raw text -> word vectors -> fixed-length grid of max_len vectors.
        x_batch = [tokenize_and_vectorize_sample(sample) for sample in x_batch]
        x_batch = [pad_trunc(sample, max_len=max_len, embedding_dims=embedding_dims) for sample in x_batch]
        # Conv1d expects channels first: (batch, embedding_dims, max_len).
        # Tensors are used directly; the autograd Variable wrapper is a
        # deprecated no-op.
        x_batch = torch.FloatTensor(x_batch).permute(0, 2, 1)
        # One target per sample, shaped (batch, 1) to match the model output.
        # The batch axis is inferred from the data rather than hard-coded to
        # batch_size, so a shorter batch cannot break the reshape.
        y_batch = torch.FloatTensor(y_batch).reshape(-1, 1)

        optimizer.zero_grad()
        outputs = model(x_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        # Sum of per-batch losses over the epoch (not an average).
        loss_val += loss.item()
    print(f"epoch({epoch}): total_loss={loss_val}")
    


# save/load model

In [None]:
# save pytorch model
# Only the learned weights (the state_dict) are written, not the Model class,
# so loading requires re-creating a Model with the same hyper-parameters.
# Those hyper-parameters are encoded in the file name to keep runs apart.
model_path = f"imdb_cnn_model_{filters}_{kernel_size}_{maxlen}_{embedding_dims}_{hidden_dims}.pth"
torch.save(model.state_dict(), model_path)

In [None]:
# Load the model from the file
# The path is rebuilt from the same hyper-parameters used when saving.
model_path = f"imdb_cnn_model_{filters}_{kernel_size}_{maxlen}_{embedding_dims}_{hidden_dims}.pth"
# Build the architecture with the same arguments as the trained instance
# (maxlen, matching the file name), then fill in the saved weights.
loaded_model = Model(filters, kernel_size, maxlen, embedding_dims, hidden_dims)
loaded_model.load_state_dict(torch.load(model_path))  # Load the state dictionary
loaded_model.eval()  # Evaluation mode: disables dropout

# Now 'loaded_model' contains the model loaded from the saved file


# evaluation

In [None]:
# evaluation
import torch
from sklearn.metrics import accuracy_score

# Assumes `loaded_model` has already been loaded and switched to eval mode by the cell above.

def evaluate(x_test, y_test, batch_size=32):
    """
    Measure the accuracy of the module-level ``loaded_model`` on a test set.

    Args:
        x_test: list of raw review texts.
        y_test: list of 0/1 labels aligned with ``x_test``.
        batch_size: number of texts scored per forward pass.

    Returns:
        The accuracy computed by ``accuracy_score``. It is also printed,
        together with the full list of 0.0/1.0 predictions.
    """
    print(f"len(x_test) == len(y_test): {len(x_test)} == {len(y_test)}")
    threshold = 0.5  # probabilities above this count as the positive class
    predictions = []
    # Step through the data by batch_size so consecutive batches do not
    # overlap. Using the batch index itself as the slice offset produced
    # windows shifted by one sample, misaligning predictions and labels.
    for start in tqdm(range(0, len(x_test), batch_size)):
        texts = x_test[start:start + batch_size]
        x_batch = [tokenize_and_vectorize_sample(sample) for sample in texts]
        x_batch = [pad_trunc(sample, max_len=max_len, embedding_dims=embedding_dims) for sample in x_batch]
        # Channels first for the convolution: (batch, embedding_dims, max_len).
        x_batch = torch.FloatTensor(x_batch).permute(0, 2, 1)
        # Inference only, so gradients are not tracked.
        with torch.no_grad():
            batch_predictions = loaded_model(x_batch)
        binary_predictions = (batch_predictions > threshold).float()
        # reshape(-1) always gives a 1-D tensor, so tolist() returns a list
        # even when the final batch holds a single sample.
        predictions.extend(binary_predictions.reshape(-1).tolist())

    total = min(len(y_test), len(predictions))
    print(predictions[:total])
    accuracy = accuracy_score(y_test[:total], predictions[:total])
    print("Accuracy:", accuracy)
    return accuracy

# Score the held-out split with the default batch size of 32.
evaluate(x_test, y_test)

# inference

In [None]:
def inference(text):
    """
    Classify a single piece of text with the module-level ``loaded_model``.

    Prints the shape of the prepared input tensor and the raw model output,
    then returns that output thresholded at 0.5 as a float tensor of 0.0/1.0
    values.
    """
    vectors = tokenize_and_vectorize_sample(text)
    padded = pad_trunc(vectors, max_len=max_len, embedding_dims=embedding_dims)
    # A batch of one, rearranged to channels-first for the convolution.
    x_batch = torch.FloatTensor([padded]).permute(0, 2, 1)
    print(x_batch.shape)
    # Inference only, so gradients are not tracked.
    with torch.no_grad():
        predictions = loaded_model(x_batch)
        print(predictions)
    threshold = 0.5
    return (predictions > threshold).float()

# Smoke-test the inference path on one hand-written review and show the
# thresholded result.
review = inference("Movie was so goos that it was bad.")
print(review)
