<a href="https://colab.research.google.com/github/OverCat2000/text_classification_cnn_rnn_lstm/blob/main/cnn_text_classifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import gensim
import pathlib
import glob
import os
from random import shuffle
import pickle
from tqdm.auto import tqdm

from nltk.tokenize import TreebankWordTokenizer

import numpy as np

import torch
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, TensorDataset
import torch.nn as nn

from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# Enable Git LFS, then clone the pretrained word2vec-google-news-300 vectors
# from the Hugging Face Hub (the recorded run fetched about 3.5 GiB of LFS content).
!git lfs install

!git clone https://huggingface.co/fse/word2vec-google-news-300

Git LFS initialized.
Cloning into 'word2vec-google-news-300'...
remote: Enumerating objects: 11, done.[K
remote: Total 11 (delta 0), reused 0 (delta 0), pack-reused 11 (from 1)[K
Unpacking objects: 100% (11/11), 1.51 KiB | 773.00 KiB/s, done.
Filtering content: 100% (2/2), 3.52 GiB | 48.95 MiB/s, done.


In [3]:
# Drive is already imported and mounted by the first cell; repeating it here
# only matters if this cell is run on its own.
from google.colab import drive
drive.mount('/content/drive')

# Copy the cloned vectors to Drive, which is where model_path (config cell) points.
!cp -r /content/word2vec-google-news-300/ /content/drive/MyDrive/

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
# Download the aclImdb sentiment archive (about 80 MB per the recorded output).
!wget https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz

--2024-06-19 01:18:10--  https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz
Resolving ai.stanford.edu (ai.stanford.edu)... 171.64.68.10
Connecting to ai.stanford.edu (ai.stanford.edu)|171.64.68.10|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 84125825 (80M) [application/x-gzip]
Saving to: ‘aclImdb_v1.tar.gz’


2024-06-19 01:18:15 (16.7 MB/s) - ‘aclImdb_v1.tar.gz’ saved [84125825/84125825]



In [5]:
# Decompress in place: aclImdb_v1.tar.gz -> aclImdb_v1.tar
!gzip -d aclImdb_v1.tar.gz

In [32]:
# Extract the archive; test_path in the config cell expects an aclImdb/ directory.
# NOTE(review): this cell's execution count (32) is out of sequence with its
# neighbours, so it was last run after later cells — re-check with Restart & Run All.
!tar -xf aclImdb_v1.tar

In [7]:
# --- Paths -------------------------------------------------------------------
# Pretrained word2vec vectors (copied to Drive by an earlier cell).
model_path = '/content/drive/MyDrive/word2vec-google-news-300/word2vec-google-news-300.model'

# Location of the extracted IMDB reviews and their per-class sub-folders.
test_path = pathlib.Path('aclImdb/test')
pos_path = test_path / 'pos'
neg_path = test_path / 'neg'
drive_path = pathlib.Path('/content/drive/MyDrive/Home')

# --- Hyper-parameters --------------------------------------------------------
maxlen = 400          # tokens kept per review
batch_size = 32
embedding_dims = 300  # word2vec vector size
filters = 250
kernel_size = 3
hidden_dims = 250
epochs = 2

# --- Device ------------------------------------------------------------------
# Prefer the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

if device.type == "cuda":
    print(f'There are {torch.cuda.device_count()} GPU(s) available.')
    print('Device name:', torch.cuda.get_device_name(0))
else:
    print('No GPU available, using the CPU instead.')

There are 1 GPU(s) available.
Device name: Tesla T4


In [8]:
def data(file_path, limit=500):
  """Load labelled reviews from ``file_path/pos`` and ``file_path/neg``.

  Args:
    file_path: Directory (``str`` or ``pathlib.Path``) containing ``pos`` and
      ``neg`` sub-folders of ``*.txt`` files.
    limit: Maximum number of files read from each sub-folder. ``None`` reads
      every file. Defaults to 500.

  Returns:
    A shuffled list of ``(label, text)`` tuples, where ``label`` is 1 for files
    under ``pos`` and 0 for files under ``neg``.
  """
  root = pathlib.Path(file_path)
  dataset = []

  for label, folder in ((1, 'pos'), (0, 'neg')):
    # glob returns files in arbitrary order; sorting makes the subset selected
    # by `limit` the same on every run and every filesystem.
    filenames = sorted(glob.glob(os.path.join(root / folder, '*.txt')))[:limit]

    for filename in filenames:
      # Explicit encoding: the default used by open() is platform-dependent.
      with open(filename, 'r', encoding='utf-8') as f:
        dataset.append((label, f.read()))

  shuffle(dataset)

  return dataset

In [9]:
def word2vec(dataset, model=None, tokenizer=None):
  """Turn each sample's text into a list of word vectors.

  Args:
    dataset: Iterable of ``(label, text)`` samples; only ``sample[1]`` is read.
    model: Optional preloaded embedding lookup supporting ``model[token]`` and
      raising ``KeyError`` for unknown tokens. When omitted, the KeyedVectors
      file at ``model_path`` is loaded. Loading happens on every such call, so
      pass a preloaded model when calling this function repeatedly.
    tokenizer: Optional object with a ``tokenize(text)`` method. Defaults to a
      new ``TreebankWordTokenizer``.

  Returns:
    A list with one entry per sample, each a list of the vectors found for
    that sample's tokens, in token order. Tokens missing from the model are
    skipped, so an entry may be shorter than the token count (or empty).
  """
  if model is None:
    model = gensim.models.KeyedVectors.load(model_path)
  if tokenizer is None:
    tokenizer = TreebankWordTokenizer()

  word_vec = []

  for sample in dataset:
    sample_vec = []

    for token in tokenizer.tokenize(sample[1]):
      try:
        sample_vec.append(model[token])
      except KeyError:
        # Out-of-vocabulary token: deliberately dropped rather than padded.
        continue

    word_vec.append(sample_vec)

  return word_vec


In [10]:
def pad_trunc(data, maxlen=400, embedding_dims=300, show_progress=True):
  """Pad or truncate every sample to exactly ``maxlen`` vectors.

  Args:
    data: Iterable of samples, each a list of word vectors.
    maxlen: Number of vectors each returned sample holds. Defaults to 400.
    embedding_dims: Length of the zero vector used for padding. Defaults to 300.
    show_progress: Wrap the iteration in a ``tqdm`` progress bar. Defaults to
      True.

  Returns:
    A new list of new lists. Samples longer than ``maxlen`` keep their first
    ``maxlen`` vectors; shorter samples are right-padded with zero vectors.
    The input lists are not modified.
  """
  # float32 zeros (presumably matching the word vectors' dtype — confirm) so
  # that stacking the result with np.array does not upcast everything to float64.
  zero_vec = np.zeros(embedding_dims, dtype=np.float32)

  samples = tqdm(data) if show_progress else data

  new_data = []
  for sample in samples:
    # Slicing copies, so the caller's list is left untouched.
    fixed = list(sample[:maxlen])
    fixed.extend([zero_vec] * (maxlen - len(fixed)))
    new_data.append(fixed)

  return new_data

In [11]:
def Labels(dataset):
  """Return the label (first element) of every sample, in dataset order."""
  return [sample[0] for sample in dataset]

In [12]:
def preprocess():
  """Build padded train/test word-vector splits and their labels.

  Reads the reviews under ``test_path``, embeds them with ``word2vec``, and
  splits them 80/20 in their (already shuffled) order before padding each
  split to a fixed length with ``pad_trunc``.

  Returns:
    ``(X_train, y_train, X_test, y_test)`` as plain Python lists.
  """
  samples = data(test_path)
  vectors = word2vec(samples)
  targets = Labels(samples)

  # First 80% of the samples train the model, the remainder evaluates it.
  split_at = int(len(vectors) * 0.8)

  X_train = pad_trunc(vectors[:split_at])
  X_test = pad_trunc(vectors[split_at:])

  y_train, y_test = targets[:split_at], targets[split_at:]

  return X_train, y_train, X_test, y_test

In [13]:
# Build the padded word-vector splits and labels; preprocess() loads the
# word2vec model from model_path as part of this call.
X_train, y_train, X_test, y_test = preprocess()

  0%|          | 0/800 [00:00<?, ?it/s]

  0%|          | 0/200 [00:00<?, ?it/s]

In [14]:
# Stack each split into a single array and convert it to a torch tensor.
X_train, X_test, y_train, y_test = (
    torch.Tensor(np.array(split))
    for split in (X_train, X_test, y_train, y_test)
)

In [15]:
# Training batches are reshuffled every epoch; evaluation batches keep a fixed order.
train_dataloader = DataLoader(
    TensorDataset(X_train, y_train),
    batch_size=batch_size,
    shuffle=True,
)
test_dataloader = DataLoader(
    TensorDataset(X_test, y_test),
    batch_size=batch_size,
    shuffle=False,
)

In [16]:
# Sanity check: pull a single batch and print the feature and label shapes.
for X, y in train_dataloader:
  print(X.shape)
  print(y.shape)
  break  # one batch is enough

torch.Size([32, 400, 300])
torch.Size([32])


In [17]:
class Convnet(nn.Module):
  """Single-block 1-D CNN producing raw logits.

  Architecture: ``Conv1d -> ReLU -> MaxPool1d(2) -> Flatten -> Linear``.
  No sigmoid is applied; pair the output with ``nn.BCEWithLogitsLoss`` or
  apply ``torch.sigmoid`` to obtain probabilities.

  ``nn.Conv1d`` treats dim 1 of its input as channels and slides the kernel
  along dim 2, so ``forward`` expects ``(batch, input_shape, length)``.

  NOTE(review): when this is fed ``(batch, tokens, embedding)`` tensors with
  ``input_shape`` set to the number of tokens, the kernel slides over the
  embedding axis rather than over neighbouring words — confirm this is the
  intended layout.

  Args:
    input_shape: Number of input channels (size of dim 1 of the input).
    hidden_units: Number of convolution filters.
    output_shape: Number of output logits.
    kernel: Convolution kernel size.
    length: Size of dim 2 of the input, i.e. the axis the kernel slides over.
      Defaults to 300, which together with ``kernel=3`` gives the 149 pooled
      positions the classifier was originally hard-coded for.

  Raises:
    ValueError: If ``length`` is too small for ``kernel`` plus pooling.
  """
  def __init__(self, input_shape, hidden_units, output_shape, kernel, length=300):
    super().__init__()

    pool_size = 2
    # Conv1d with stride 1 and no padding shortens the axis by kernel - 1;
    # MaxPool1d then floors the division by its window size.
    pooled_len = (length - kernel + 1) // pool_size
    if pooled_len < 1:
      raise ValueError(
          f"length={length} is too short for kernel={kernel} followed by MaxPool1d({pool_size})"
      )

    self.block = nn.Sequential(
        nn.Conv1d(input_shape, hidden_units, kernel_size=kernel),
        nn.ReLU(),
        nn.MaxPool1d(pool_size)
    )

    self.flat = nn.Flatten(start_dim=1)
    self.fc = nn.Linear(in_features=hidden_units * pooled_len, out_features=output_shape)

  def forward(self, x):
    """Map ``(batch, input_shape, length)`` inputs to ``(batch, output_shape)`` logits."""
    x = self.block(x)
    x = self.flat(x)
    x = self.fc(x)
    return x

In [18]:
class BinaryAccuracy:
    """Fraction of correct binary predictions computed from raw logits.

    Args:
        threshold: Probability at or above which a sample is predicted as
            class 1. Defaults to 0.5.
    """

    def __init__(self, threshold=0.5):
        self.threshold = threshold

    def __call__(self, logits, targets):
        """Return the accuracy of ``logits`` against ``targets`` as a float.

        Both tensors are flattened first, so ``(N,)`` and ``(N, 1)`` layouts
        may be mixed freely. Without this, comparing an ``(N,)`` tensor with
        an ``(N, 1)`` tensor would broadcast to an ``N x N`` comparison.

        Raises:
            ValueError: If the two tensors hold a different number of elements.
        """
        # Apply sigmoid to logits to get probabilities
        probabilities = torch.sigmoid(logits).reshape(-1)
        targets = targets.reshape(-1)
        if probabilities.numel() != targets.numel():
            raise ValueError(
                f"logits hold {probabilities.numel()} values but targets hold {targets.numel()}"
            )
        # Convert probabilities to binary predictions
        predictions = (probabilities >= self.threshold).float()
        # Compare predictions with targets and calculate accuracy
        correct = (predictions == targets).float().sum()
        accuracy = correct / targets.numel()
        return accuracy.item()

In [19]:
# Seed torch before the model is built so its initial weights are reproducible;
# without this, the first seed in the notebook is only set right before training.
torch.manual_seed(42)

# NOTE(review): batches are shaped (batch, maxlen, embedding_dims), and maxlen is
# passed as the Conv1d channel count, so the kernel slides over the embedding
# axis rather than over neighbouring tokens — confirm this is intended.
cnn = Convnet(maxlen, filters, 1, kernel_size)

# The model emits raw logits, hence the logits-based loss.
loss_fn = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(params=cnn.parameters(), lr=0.001)
accuracy_fn = BinaryAccuracy()

# Last expression of the cell: moves the model to `device` and displays its layers.
cnn.to(device)

Convnet(
  (block): Sequential(
    (0): Conv1d(400, 250, kernel_size=(3,), stride=(1,))
    (1): ReLU()
    (2): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (flat): Flatten(start_dim=1, end_dim=-1)
  (fc): Linear(in_features=37250, out_features=1, bias=True)
)

In [20]:
def train_step(model: torch.nn.Module,
               dataloader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               optimizer: torch.optim.Optimizer,
               metric_fn=None,
               device=None):
    """Run one training epoch and return the mean loss and accuracy.

    Both metrics are averaged per sample (each batch is weighted by its size),
    so a smaller final batch does not skew the result.

    Args:
        model: Network to optimise; its output is squeezed on dim 1 before
            being passed to ``loss_fn``.
        dataloader: Yields ``(X, y)`` batches.
        loss_fn: Loss applied to ``(logits, y)``. Assumed to return the batch
            mean (the default reduction of the PyTorch losses).
        optimizer: Optimiser stepping ``model``'s parameters.
        metric_fn: Callable ``(logits, y) -> float`` returning the batch
            accuracy. Defaults to the notebook-level ``accuracy_fn``.
        device: Device the batches are moved to. Defaults to the device of
            ``model``'s first parameter (CPU if it has none).

    Returns:
        ``(train_loss, train_acc)`` as Python floats.

    Raises:
        ValueError: If ``dataloader`` yields no samples.
    """
    if metric_fn is None:
        metric_fn = accuracy_fn
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    # Put model in train mode
    model.train()

    # Running sums weighted by batch size, plus the number of samples seen
    loss_sum, acc_sum, n_samples = 0.0, 0.0, 0

    for X, y in dataloader:
        X = X.to(device)
        y = y.to(device)

        # 1. Forward pass
        y_pred = model(X)

        # 2. Calculate loss
        loss = loss_fn(y_pred.squeeze(1), y)

        # 3. Optimizer zero grad
        optimizer.zero_grad()

        # 4. Loss backward
        loss.backward()

        # 5. Optimizer step
        optimizer.step()

        # 6. Accumulate metrics, weighting the batch means by the batch size
        batch_n = y.size(0)
        loss_sum += loss.item() * batch_n
        acc_sum += metric_fn(y_pred, y) * batch_n
        n_samples += batch_n

    if n_samples == 0:
        raise ValueError("dataloader yielded no samples")

    return loss_sum / n_samples, acc_sum / n_samples

In [21]:
def test_step(model: torch.nn.Module,
              dataloader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module):
    # Put model in eval mode
    model.eval()

    # Setup test loss and test accuracy values
    test_loss, test_acc = 0, 0

    # Turn on inference context manager
    with torch.inference_mode():
        # Loop through DataLoader batches
        for batch, (X, y) in enumerate(dataloader):
            X = X.to(device)
            y = y.to(device)
            # 1. Forward pass
            test_pred_logits = model(X)

            # 2. Calculate and accumulate loss
            loss = loss_fn(test_pred_logits.squeeze(1), y)
            test_loss += loss.item()

            # Calculate and accumulate accuracy
            test_acc += accuracy_fn(test_pred_logits, y)

    # Adjust metrics to get average loss and accuracy per batch
    test_loss = test_loss / len(dataloader)
    test_acc = test_acc / len(dataloader)
    return test_loss, test_acc

In [22]:
def train(model: torch.nn.Module,
          train_dataloader: torch.utils.data.DataLoader,
          test_dataloader: torch.utils.data.DataLoader,
          optimizer: torch.optim.Optimizer,
          loss_fn: torch.nn.Module,
          epochs: int = 5):
    """Train and evaluate ``model`` for ``epochs`` epochs.

    Each epoch runs ``train_step`` on ``train_dataloader`` followed by
    ``test_step`` on ``test_dataloader``, prints a one-line summary, and
    records the four metrics.

    Returns:
        A dict with the keys ``train_loss``, ``train_acc``, ``test_loss`` and
        ``test_acc``, each mapping to a list holding one value per epoch.
    """
    metric_names = ("train_loss", "train_acc", "test_loss", "test_acc")
    history = {name: [] for name in metric_names}

    for epoch in tqdm(range(epochs)):
        train_loss, train_acc = train_step(
            model=model,
            dataloader=train_dataloader,
            loss_fn=loss_fn,
            optimizer=optimizer,
        )
        test_loss, test_acc = test_step(
            model=model,
            dataloader=test_dataloader,
            loss_fn=loss_fn,
        )

        # Progress report for this epoch
        print(
            f"Epoch: {epoch+1} | "
            f"train_loss: {train_loss:.4f} | "
            f"train_acc: {train_acc:.4f} | "
            f"test_loss: {test_loss:.4f} | "
            f"test_acc: {test_acc:.4f}"
        )

        # Record this epoch's metrics, in the same order as metric_names
        epoch_values = (train_loss, train_acc, test_loss, test_acc)
        for name, value in zip(metric_names, epoch_values):
            history[name].append(value)

    return history

In [23]:
# Seed torch's global RNG. This affects random draws from here on (e.g. DataLoader
# shuffling); the model's weights were already initialised in an earlier cell.
torch.manual_seed(42)

# Number of epochs for this run.
# NOTE(review): the config cell also defines `epochs = 2`, which is not used here.
NUM_EPOCHS = 100

# Start the timer
from timeit import default_timer as timer
start_time = timer()

# Train the CNN and keep the per-epoch metrics.
# NOTE(review): in the recorded run train_acc reaches 1.0 by epoch 5 while test_acc
# stays around 0.68, so most of the 100 epochs only deepen the overfit.
model_0_results = train(model=cnn,
                        train_dataloader=train_dataloader,
                        test_dataloader=test_dataloader,
                        optimizer=optimizer,
                        loss_fn=loss_fn,
                        epochs=NUM_EPOCHS)

# End the timer and print out how long it took
end_time = timer()
print(f"Total training time: {end_time-start_time:.3f} seconds")

  0%|          | 0/100 [00:00<?, ?it/s]

Epoch: 1 | train_loss: 0.7060 | train_acc: 0.5262 | test_loss: 0.7108 | test_acc: 0.4866
Epoch: 2 | train_loss: 0.5516 | train_acc: 0.8387 | test_loss: 0.6623 | test_acc: 0.5938
Epoch: 3 | train_loss: 0.3681 | train_acc: 0.9250 | test_loss: 0.6870 | test_acc: 0.5536
Epoch: 4 | train_loss: 0.2176 | train_acc: 0.9775 | test_loss: 0.6230 | test_acc: 0.6116
Epoch: 5 | train_loss: 0.1135 | train_acc: 1.0000 | test_loss: 0.6239 | test_acc: 0.6875
Epoch: 6 | train_loss: 0.0633 | train_acc: 1.0000 | test_loss: 0.6065 | test_acc: 0.7009
Epoch: 7 | train_loss: 0.0382 | train_acc: 1.0000 | test_loss: 0.6080 | test_acc: 0.6786
Epoch: 8 | train_loss: 0.0258 | train_acc: 1.0000 | test_loss: 0.6692 | test_acc: 0.6607
Epoch: 9 | train_loss: 0.0191 | train_acc: 1.0000 | test_loss: 0.6134 | test_acc: 0.6830
Epoch: 10 | train_loss: 0.0142 | train_acc: 1.0000 | test_loss: 0.6221 | test_acc: 0.6830
Epoch: 11 | train_loss: 0.0108 | train_acc: 1.0000 | test_loss: 0.6577 | test_acc: 0.6830
Epoch: 12 | train_l

In [28]:
def make_predictions(model: torch.nn.Module, data: list):
    """Predict a binary class for each ``(label, text)`` sample in ``data``.

    The samples are embedded with ``word2vec``, padded with ``pad_trunc``, and
    passed through ``model`` one at a time. A sigmoid probability of at least
    0.5 is reported as class 1, anything lower as class 0.

    Returns:
        A list of ints (0 or 1), one per sample, in input order.
    """
    vectors = pad_trunc(word2vec(data))
    inputs = torch.Tensor(np.array(vectors))

    predictions = []

    model.eval()

    with torch.inference_mode():
        for row in inputs:
            # Add the batch dimension the model expects, then move to the device.
            single = torch.unsqueeze(row, dim=0).to(device)

            prob = torch.sigmoid(model(single))

            predictions.append(1 if prob >= 0.5 else 0)

    return predictions

In [29]:
# Grab five labelled reviews for a quick qualitative check.
# NOTE(review): data(test_path) reads the same directories as preprocess(), so
# these reviews have probably been seen during training — confirm before treating
# the predictions below as held-out results.
sample = data(test_path)[5:10]

In [30]:
# True labels of the selected reviews (first element of each sample).
[review[0] for review in sample]

[1, 0, 1, 0, 0]

In [31]:
# Predicted classes for the selected reviews; compare with the true labels listed in the previous cell.
make_predictions(cnn, sample)

  0%|          | 0/5 [00:00<?, ?it/s]

[1, 0, 1, 0, 0]