# CNN

## 0 - IMPORTS

In [45]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.feature_extraction.text import TfidfVectorizer
import torch.nn.functional as F
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, TensorDataset
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

## 1 - DATA LOAD

In [46]:
# 1.1 - load data (FULL)
df = pd.read_csv('../data/processed/train_full_no_sarcasm.csv')
df = df.dropna(subset=['tweet'])

# 1.2 - split dataset into training, validation, and test
df_train, df_temp, y_train, y_temp = train_test_split(df["tweet"], df["label"], test_size=0.3, random_state=42)
df_val, df_test, y_val, y_test = train_test_split(df_temp, y_temp, test_size=0.5, random_state=42)

## 2 - TOKENIZATION + PADDING

In [58]:
# 2.1 - tokenization
max_words = 10000
tokenizer = Tokenizer(num_words=max_words)
tokenizer.fit_on_texts(df_train)
sequence_train = tokenizer.texts_to_sequences(df_train)
sequence_val = tokenizer.texts_to_sequences(df_val)
sequence_test = tokenizer.texts_to_sequences(df_test)
word2vec = tokenizer.word_index
V = len(word2vec)

# 2.2 - padding
data_train = pad_sequences(sequence_train)
T = data_train.shape[1]
data_val = pad_sequences(sequence_val, maxlen=T)
data_test = pad_sequences(sequence_test, maxlen=T)

# 2.3 - convert to PyTorch tensors
X_train = torch.tensor(data_train, dtype=torch.long)
y_train = torch.tensor(LabelEncoder().fit_transform(y_train), dtype=torch.float32)
X_val = torch.tensor(data_val, dtype=torch.long)
y_val = torch.tensor(LabelEncoder().fit_transform(y_val), dtype=torch.float32)
X_test = torch.tensor(data_test, dtype=torch.long)
y_test = torch.tensor(LabelEncoder().fit_transform(y_test), dtype=torch.float32)

# 2.4 - create dataLoader
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=32, shuffle=True)
val_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=32)
test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=32)

## 3 - CNN

### 3.1 - MODEL

In [54]:
# 3.1.0 - model (v2)
class CNN(nn.Module):
    def __init__(self, vocab_size, embed_dim):
        super(CNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.conv1 = nn.Conv1d(in_channels=embed_dim, out_channels=32, kernel_size=3, padding=1)
        self.pool1 = nn.MaxPool1d(kernel_size=3, stride=2)
        self.conv2 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.pool2 = nn.MaxPool1d(kernel_size=3, stride=2)
        self.conv3 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.global_pool = nn.AdaptiveMaxPool1d(1)
        self.fc1 = nn.Linear(128, 256)
        self.fc2 = nn.Linear(256, 1)
        self.dropout = nn.Dropout(0.5)
    def forward(self, x):
        x = self.embedding(x).transpose(1, 2)
        x = self.pool1(F.relu(self.conv1(x)))
        x = self.pool2(F.relu(self.conv2(x)))
        x = self.global_pool(F.relu(self.conv3(x))).squeeze(2)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

In [None]:
class CNN(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_classes):
        super(CNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.conv1 = nn.Conv1d(in_channels=embed_dim, out_channels=32, kernel_size=3, padding=1)
        self.pool1 = nn.MaxPool1d(kernel_size=3, stride=2)
        self.conv2 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.pool2 = nn.MaxPool1d(kernel_size=3, stride=2)
        self.conv3 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.global_pool = nn.AdaptiveMaxPool1d(1)
        self.fc1 = nn.Linear(128, 256)
        self.fc2 = nn.Linear(256, num_classes)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        x = self.embedding(x).transpose(1, 2)
        x = self.pool1(F.relu(self.conv1(x)))
        x = self.pool2(F.relu(self.conv2(x)))
        x = self.global_pool(F.relu(self.conv3(x))).squeeze(2)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

In [56]:
# 3.1.1 - initialize model
model = CNN(vocab_size=V+1, embed_dim=20)
# 3.1.2 - define loss
criterion = nn.BCEWithLogitsLoss()
# 3.1.3- define optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001)

### 3.2 - TRAINING

In [59]:
# 3.2.0 - since overfitting was observed -> early stopping parameters
wait = 5
best_val_loss = np.inf
epochs_no_impr = 0
early_stop = False

# 3.2.1 - training loop
epochs = 50
for epoch in range(epochs):
    model.train()
    train_loss = 0.0
    correct_train = 0
    total_train = 0
    for texts, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(texts).squeeze(1)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        predicted = torch.round(torch.sigmoid(outputs))
        total_train += labels.size(0)
        correct_train += (predicted == labels).sum().item()
    
    train_accuracy = 100 * correct_train / total_train
    
    # 3.2.2 - validation
    model.eval()
    val_loss = 0.0
    correct_val = 0
    total_val = 0
    with torch.no_grad():
        for texts, labels in val_loader:
            outputs = model(texts).squeeze(1)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            predicted = torch.round(torch.sigmoid(outputs))
            total_val += labels.size(0)
            correct_val += (predicted == labels).sum().item()

    val_accuracy = 100 * correct_val / total_val
    avg_val_loss = val_loss / len(val_loader)

    print(f'Epoch {epoch+1}, Train Loss: {train_loss / len(train_loader)}, Train Accuracy: {train_accuracy}%, '
          f'Validation Loss: {avg_val_loss}, Validation Accuracy: {val_accuracy}%')

    # 3.2.3 - early stopping 
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        epochs_no_impr = 0
        torch.save(model.state_dict(), 'best_CNN_model.pt')
    else:
        epochs_no_impr += 1
        if epochs_no_impr >= wait:
            print('----> early stopped')
            early_stop = True
            break
    

Epoch 1, Train Loss: 0.4749968820294981, Train Accuracy: 76.47430199934841%, Validation Loss: 0.4468413706261495, Validation Accuracy: 78.22102823741272%
Epoch 2, Train Loss: 0.44268891318126397, Train Accuracy: 78.73099417054401%, Validation Loss: 0.44194557072111595, Validation Accuracy: 78.66444270595008%
Epoch 3, Train Loss: 0.43503227187179383, Train Accuracy: 79.18115595929818%, Validation Loss: 0.43797343466539995, Validation Accuracy: 78.85220009742129%
Epoch 4, Train Loss: 0.43026945868487004, Train Accuracy: 79.46709346424042%, Validation Loss: 0.438119698254627, Validation Accuracy: 78.86253265827268%
Epoch 5, Train Loss: 0.4263809618399031, Train Accuracy: 79.69052958535899%, Validation Loss: 0.4338283095626351, Validation Accuracy: 79.03789097672221%
Epoch 6, Train Loss: 0.4231345355402118, Train Accuracy: 79.88853498148049%, Validation Loss: 0.43369690737048866, Validation Accuracy: 78.9369270964028%
Epoch 7, Train Loss: 0.4199432632062654, Train Accuracy: 80.056048811809

### 3.3 - TESTING

In [66]:
# 3.3.0 - load best model
model.load_state_dict(torch.load('best_CNN_model.pt'))
print('model loaded')

# 3.3.1 - testing loop
model.eval()
test_loss = 0.0
correct_test = 0
total_test = 0
with torch.no_grad():
    for texts, labels in test_loader:
        outputs = model(texts).squeeze(1)
        loss = criterion(outputs, labels)
        test_loss += loss.item()
        predicted = torch.round(torch.sigmoid(outputs))
        total_test += labels.size(0)
        correct_test += (predicted == labels).sum().item()

test_accuracy = 100 * correct_test / total_test
print(f'Test Loss: {test_loss / len(test_loader)}, Test Accuracy: {test_accuracy}%')


model loaded
Test Loss: 0.43334260722279166, Test Accuracy: 79.05861792074063%


## 4 - CNN LSTM

### 4.1 - MODEL

In [None]:
# 4.1.0 - V1 (LSTM BEFORE CNN)
class CNN_LSTM(nn.Module):
    def __init__(self, vocab_size, embed_dim, lstm_hidden_dim, num_classes):
        super(CNN_LSTM, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(embed_dim, lstm_hidden_dim, batch_first=True)
        self.conv1 = nn.Conv1d(in_channels=lstm_hidden_dim, out_channels=32, kernel_size=3, padding=1)
        self.pool1 = nn.MaxPool1d(kernel_size=3, stride=2)
        self.conv2 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.pool2 = nn.MaxPool1d(kernel_size=3, stride=2)
        self.conv3 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.global_pool = nn.AdaptiveMaxPool1d(1)
        self.fc1 = nn.Linear(128, 256)
        self.fc2 = nn.Linear(256, num_classes)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        x = self.embedding(x)
        x, _ = self.lstm(x)
        x = x.transpose(1, 2)
        x = self.pool1(F.relu(self.conv1(x)))
        x = self.pool2(F.relu(self.conv2(x)))
        x = self.global_pool(F.relu(self.conv3(x))).squeeze(2)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

In [67]:
# V2 (CNN BEFORE LSTM)
class CNN_LSTM(nn.Module):
    def __init__(self, max_features, embed_dim):
        super(CNN_LSTM, self).__init__()
        self.embedding = nn.Embedding(max_features, embed_dim)
        self.conv1 = nn.Conv1d(embed_dim, 64, kernel_size=3, padding='same')
        self.pool1 = nn.MaxPool1d(kernel_size=2)
        self.conv2 = nn.Conv1d(64, 64, kernel_size=3, padding='same')
        self.pool2 = nn.MaxPool1d(kernel_size=2)
        self.lstm = nn.LSTM(64, 128, batch_first=True, dropout=0.2, num_layers=2)
        self.fc = nn.Linear(128, 1)
        
    def forward(self, x):
        x = self.embedding(x)
        x = x.permute(0, 2, 1)
        x = torch.relu(self.conv1(x))
        x = self.pool1(x)
        x = torch.relu(self.conv2(x))
        x = self.pool2(x)
        x = x.permute(0, 2, 1)
        x, _ = self.lstm(x)
        x = x[:, -1, :]
        x = self.fc(x)
        return x

In [69]:
# 4.1.1 - initialize model
#model = CNN_LSTM(vocab_size=V+1, embed_dim=20, lstm_hidden_dim=128, num_classes=1)
model = CNN_LSTM(max_features=20000, embed_dim=128)
# 4.1.2- loss
criterion = nn.BCEWithLogitsLoss()
# 4.1.3 - optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001)

### 4.2 - TRAINING

In [70]:
# 4.2.0 - since overfitting was observed -> early stopping parameters
wait = 5
best_val_loss = np.inf
epochs_no_impr = 0
early_stop = False

# 4.2.1 - training loop
epochs = 50
for epoch in range(epochs):
    model.train()
    train_loss = 0.0
    correct_train = 0
    total_train = 0
    for texts, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(texts).squeeze(1)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        predicted = torch.round(torch.sigmoid(outputs))
        total_train += labels.size(0)
        correct_train += (predicted == labels).sum().item()
    
    train_accuracy = 100 * correct_train / total_train
    
    # 4.2.2 - validation
    model.eval()
    val_loss = 0.0
    correct_val = 0
    total_val = 0
    with torch.no_grad():
        for texts, labels in val_loader:
            outputs = model(texts).squeeze(1)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            predicted = torch.round(torch.sigmoid(outputs))
            total_val += labels.size(0)
            correct_val += (predicted == labels).sum().item()

    val_accuracy = 100 * correct_val / total_val
    avg_val_loss = val_loss / len(val_loader)

    print(f'Epoch {epoch+1}, Train Loss: {train_loss / len(train_loader)}, Train Accuracy: {train_accuracy}%, '
          f'Validation Loss: {avg_val_loss}, Validation Accuracy: {val_accuracy}%')

    # 4.2.3 - early stopping 
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        epochs_no_impr = 0
        torch.save(model.state_dict(), 'best_CNN_LSTM_model.pt')
    else:
        epochs_no_impr += 1
        if epochs_no_impr >= wait:
            print('----> early stopped')
            early_stop = True
            break
    

Epoch 1, Train Loss: 0.4525655537886538, Train Accuracy: 77.89222306921016%, Validation Loss: 0.4334343575868786, Validation Accuracy: 79.04143356901413%
Epoch 2, Train Loss: 0.4243964763732016, Train Accuracy: 79.65269980041309%, Validation Loss: 0.4292376635797799, Validation Accuracy: 79.2758350923288%
Epoch 3, Train Loss: 0.414595533696048, Train Accuracy: 80.25974765382584%, Validation Loss: 0.42791075378532517, Validation Accuracy: 79.45444078704593%
Epoch 4, Train Loss: 0.40803844500514436, Train Accuracy: 80.67865875066819%, Validation Loss: 0.4254891642300884, Validation Accuracy: 79.49931362274344%
Epoch 5, Train Loss: 0.40440591695584444, Train Accuracy: 80.94251833764032%, Validation Loss: 0.42992554790532683, Validation Accuracy: 79.61474308825483%
Epoch 6, Train Loss: 0.4011529373586951, Train Accuracy: 81.14526827200754%, Validation Loss: 0.4279269365927751, Validation Accuracy: 79.49872319069479%


KeyboardInterrupt: 

### 4.3 - TESTING

In [None]:
# 4.3.0 - load best model
model.load_state_dict(torch.load('best_model.pt'))

# 4.3.1 - testing loop
model.eval()
test_loss = 0.0
correct_test = 0
total_test = 0
with torch.no_grad():
    for texts, labels in test_loader:
        outputs = model(texts)
        loss = criterion(outputs, labels)
        test_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total_test += labels.size(0)
        correct_test += (predicted == labels).sum().item()

test_accuracy = 100 * correct_test / total_test
print(f'Test Loss: {test_loss / len(test_loader)}, Test Accuracy: {test_accuracy}%')