In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset
import random
import os

## BERT[CLS]+CNN_classification (train)

In [None]:
class CNNClassifier(nn.Module):
    """1-D CNN classification head over one fixed-length feature vector per sample.

    Each sample is treated as a single-channel sequence of length ``input_size``
    and passed through Conv1d -> BatchNorm -> ReLU -> Dropout -> MaxPool, then
    flattened and fed through one hidden linear layer and the output layer.
    ``forward`` returns raw logits; no softmax is applied.

    Args:
        num_classes: Number of output logits.
        input_size: Length of each input feature vector.
        kernel_size: Width of the convolution kernel; also used as the
            max-pooling window.
        dropout_rate: Dropout probability used after the convolution block and
            after the hidden layer.
        out_channels: Number of convolution filters. The default (64) keeps
            previously saved checkpoints loadable.
        hidden_size: Width of the hidden linear layer. The default (256) keeps
            previously saved checkpoints loadable.

    Raises:
        ValueError: If ``input_size`` is too short to produce at least one
            pooled position for the given ``kernel_size``.
    """

    def __init__(self, num_classes, input_size, kernel_size=3, dropout_rate=0.2,
                 out_channels=64, hidden_size=256):
        super(CNNClassifier, self).__init__()
        # Stored first so _calculate_fc_input_size can read it.
        self.out_channels = out_channels

        # Fail fast: otherwise the problem only shows up later as an opaque
        # shape error inside forward().
        fc_input_size = self._calculate_fc_input_size(input_size, kernel_size)
        if fc_input_size <= 0:
            raise ValueError(
                f"input_size={input_size} is too small for kernel_size={kernel_size}: "
                "the pooled sequence would be empty"
            )

        self.conv = nn.Conv1d(in_channels=1, out_channels=out_channels, kernel_size=kernel_size)
        self.bn = nn.BatchNorm1d(out_channels)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout_rate)
        self.pool = nn.MaxPool1d(kernel_size=kernel_size)
        self.fc_input_size = fc_input_size
        self.hidden1 = nn.Linear(self.fc_input_size, hidden_size)
        self.hidden1_bn = nn.BatchNorm1d(hidden_size)
        self.hidden1_relu = nn.ReLU()
        self.hidden1_dropout = nn.Dropout(dropout_rate)
        self.fc = nn.Linear(hidden_size, num_classes)

    def _calculate_fc_input_size(self, input_size, kernel_size):
        """Return the flattened feature count produced by the conv + pool stage."""
        # Convolution without padding, stride 1: L - (k - 1).
        conv_length = input_size - (kernel_size - 1)
        # MaxPool1d is built with kernel_size only, so its stride equals the window.
        pooled_length = conv_length // kernel_size
        return pooled_length * self.out_channels

    def forward(self, x):
        """Compute class logits for a batch; expects ``x`` shaped (batch, input_size)."""
        x = x.unsqueeze(1)           # add the single input channel expected by Conv1d
        x = self.conv(x)
        x = self.bn(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.pool(x)
        x = torch.flatten(x, 1)      # keep the batch dimension, flatten the rest
        x = self.hidden1(x)
        x = self.hidden1_bn(x)
        x = self.hidden1_relu(x)
        x = self.hidden1_dropout(x)
        x = self.fc(x)
        return x

def load_data(file_path, feature_path):
    """Load class labels from a CSV file and the matching features from a NumPy file.

    Args:
        file_path: Path to a CSV file with an ``encoding`` column holding the labels.
        feature_path: Path to an array file readable by ``np.load``. Its rows are
            assumed to be in the same order as the CSV rows (not verifiable here).

    Returns:
        ``(labels, features)``: the ``encoding`` column as a list, and the loaded array.

    Raises:
        KeyError: If the CSV has no ``encoding`` column.
        ValueError: If a label is missing, or the number of labels differs from
            the number of feature rows.
    """
    df = pd.read_csv(file_path)
    if 'encoding' not in df.columns:
        raise KeyError(f"'encoding' column not found in {file_path!r}")

    label_column = df['encoding']
    # A NaN label would later be cast to an arbitrary integer by `.long()`.
    if label_column.isna().any():
        raise ValueError(f"'encoding' column in {file_path!r} contains missing labels")
    labels = label_column.tolist()

    features = np.load(feature_path)
    if len(features) != len(labels):
        raise ValueError(
            f"label/feature count mismatch: {len(labels)} labels in {file_path!r} "
            f"but {len(features)} feature rows in {feature_path!r}"
        )
    return labels, features

def build_dataloader(X, y, batch_size):
    """Wrap features and labels in a shuffling ``DataLoader``.

    Args:
        X: Feature data convertible by ``torch.tensor``; stored as float32.
        y: Label data convertible by ``torch.tensor``; stored as int64.
        batch_size: Number of samples per batch.

    Returns:
        A ``DataLoader`` over a ``TensorDataset`` of ``(features, labels)`` that
        reshuffles on every iteration.
    """
    features = torch.tensor(X).float()
    targets = torch.tensor(y).long()
    return DataLoader(
        TensorDataset(features, targets),
        batch_size=batch_size,
        shuffle=True,
    )

def set_seed(seed_value=42):
    """Seed the Python, NumPy and PyTorch RNGs and pin the cuDNN flags.

    Also writes the seed to the ``PYTHONHASHSEED`` environment variable, turns
    on ``cudnn.deterministic`` and turns off ``cudnn.benchmark``.

    Args:
        seed_value: Seed applied to every generator.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for seed_fn in seeders:
        seed_fn(seed_value)

    os.environ['PYTHONHASHSEED'] = str(seed_value)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader`` and return ``(mean per-sample loss, accuracy)``.

    When ``optimizer`` is given, each batch also does a backward pass and an
    optimizer step. The caller is responsible for ``model.train()`` /
    ``model.eval()`` and for any no-grad context.
    """
    loss_sum, correct, seen = 0.0, 0, 0
    for data, target in loader:
        data, target = data.to(device), target.to(device)
        if optimizer is not None:
            optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        if optimizer is not None:
            loss.backward()
            optimizer.step()

        n = target.size(0)
        # Weight by batch size so a smaller final batch does not skew the mean.
        # Assumes `criterion` returns the batch mean (the CrossEntropyLoss default).
        loss_sum += loss.item() * n
        _, pred = torch.max(output, dim=1)
        correct += (pred == target).sum().item()
        seen += n

    if seen == 0:
        raise ValueError("data loader yielded no samples")
    return loss_sum / seen, correct / seen


def train_and_evaluate(model, train_loader, val_loader, criterion, optimizer, device, epochs,
                       early_stopping_patience=3, save_path="best_model_cnn.pth"):
    """Train ``model`` with per-epoch validation, checkpointing and early stopping.

    After each epoch the per-sample mean validation loss is compared with the
    best seen so far. On improvement the model's ``state_dict`` is saved to
    ``save_path``; otherwise a patience counter is incremented and training
    stops once it reaches ``early_stopping_patience``.

    Args:
        model: Network to train; called as ``model(data)`` and expected to return logits.
        train_loader: Iterable of ``(data, target)`` training batches.
        val_loader: Iterable of ``(data, target)`` validation batches.
        criterion: Loss callable ``criterion(output, target)``.
        optimizer: Optimizer over ``model``'s parameters.
        device: Device the batches are moved to.
        epochs: Maximum number of epochs.
        early_stopping_patience: Consecutive non-improving epochs tolerated.
        save_path: Where the best ``state_dict`` is written.

    Returns:
        The best validation loss observed (``float('inf')`` if no epoch ran).

    Raises:
        ValueError: If either loader yields no samples.
    """
    best_val_loss = float('inf')
    early_stopping_counter = 0

    for epoch in range(epochs):
        model.train()
        train_loss, train_accuracy = _run_epoch(model, train_loader, criterion, device, optimizer)

        model.eval()
        with torch.inference_mode():
            val_loss, val_accuracy = _run_epoch(model, val_loader, criterion, device)

        print(f'Epoch {epoch+1}/{epochs}')
        print(f'Train_loss: {train_loss:.4f} - Train_acc: {train_accuracy:.4f} \nVal_loss: {val_loss:.4f} - Val_acc: {val_accuracy:.4f}')
        print('='*30)

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            early_stopping_counter = 0
            torch.save(model.state_dict(), save_path)
        else:
            early_stopping_counter += 1
            if early_stopping_counter >= early_stopping_patience:
                print("Early stopping triggered.")
                break

    return best_val_loss

def process_data_and_train_model(file_path, feature_path, batch_size, epochs, lr, device, seed=42,
                                 num_classes=2, input_size=None):
    """Load the data, split it 80/20, build the CNN classifier and train it.

    Args:
        file_path: CSV file providing the labels (see ``load_data``).
        feature_path: NumPy file providing the feature matrix.
        batch_size: Batch size for both the training and validation loaders.
        epochs: Maximum number of training epochs.
        lr: Learning rate for AdamW.
        device: Device used for the model and batches.
        seed: Seed for all RNGs and for the train/validation split.
        num_classes: Number of target classes (2 for binary; pass 3 for the
            multi-class setting).
        input_size: Length of one feature vector. When ``None`` it is read from
            the loaded features, so concatenating extra vectors (e.g. 769 or
            849 wide instead of 768) needs no code change.
    """
    set_seed(seed)

    labels, features = load_data(file_path, feature_path)
    if input_size is None:
        # Assumes features is a 2-D (n_samples, n_features) array, which is what
        # the classifier's forward pass expects.
        input_size = features.shape[1]

    # Stratify so both splits keep the label distribution.
    X_train, X_val, y_train, y_val = train_test_split(
        features, labels, test_size=0.2, random_state=seed, stratify=labels
    )
    train_loader = build_dataloader(X_train, y_train, batch_size)
    val_loader = build_dataloader(X_val, y_val, batch_size)

    model = CNNClassifier(num_classes=num_classes, input_size=input_size).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-2)

    train_and_evaluate(model, train_loader, val_loader, criterion, optimizer, device, epochs)

# Training configuration: fill in the two paths before running this cell.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
file_path = ""      # train_input_file
feature_path = ""   # train_feature_file
batch_size = 16     # alternative tried: 20
epochs = 20
lr = 2e-5

process_data_and_train_model(
    file_path=file_path,
    feature_path=feature_path,
    batch_size=batch_size,
    epochs=epochs,
    lr=lr,
    device=device,
)

## BERT[CLS]+CNN_classification (test)

In [None]:
class CNNClassifier(nn.Module):
    """1-D CNN classification head; must mirror the architecture used for training.

    A single-channel Conv1d block (64 filters, batch norm, ReLU, dropout,
    max-pool) is followed by a 256-unit hidden layer and a linear output layer.
    Attribute names are part of the checkpoint format (``state_dict`` keys).
    """

    def __init__(self, num_classes, input_size, kernel_size=3, dropout_rate=0.2):
        super().__init__()

        # Convolutional stage
        self.conv = nn.Conv1d(1, 64, kernel_size)
        self.bn = nn.BatchNorm1d(64)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=dropout_rate)
        self.pool = nn.MaxPool1d(kernel_size)

        # Fully connected stage
        self.fc_input_size = self._calculate_fc_input_size(input_size, kernel_size)
        self.hidden1 = nn.Linear(self.fc_input_size, 256)
        self.hidden1_bn = nn.BatchNorm1d(256)
        self.hidden1_relu = nn.ReLU()
        self.hidden1_dropout = nn.Dropout(p=dropout_rate)
        self.fc = nn.Linear(256, num_classes)

    def _calculate_fc_input_size(self, input_size, kernel_size):
        """Return the flattened feature count after the conv and pool layers."""
        after_conv = input_size - kernel_size + 1   # unpadded, stride-1 convolution
        after_pool = after_conv // kernel_size      # pooling window == stride
        return after_pool * 64                      # one row per conv filter

    def forward(self, x):
        """Return class logits for a batch of feature vectors."""
        conv_stage = (self.conv, self.bn, self.relu, self.dropout, self.pool)
        dense_stage = (
            self.hidden1,
            self.hidden1_bn,
            self.hidden1_relu,
            self.hidden1_dropout,
            self.fc,
        )

        out = x.unsqueeze(1)  # insert the channel axis expected by Conv1d
        for layer in conv_stage:
            out = layer(out)

        out = torch.flatten(out, 1)
        for layer in dense_stage:
            out = layer(out)
        return out

def load_data(file_path, feature_path):
    """Load class labels from a CSV file and the matching features from a NumPy file.

    Args:
        file_path: Path to a CSV file with an ``encoding`` column holding the labels.
        feature_path: Path to an array file readable by ``np.load``. Its rows are
            assumed to be in the same order as the CSV rows (not verifiable here).

    Returns:
        ``(labels, features)``: the ``encoding`` column as a list, and the loaded array.

    Raises:
        KeyError: If the CSV has no ``encoding`` column.
        ValueError: If a label is missing, or the number of labels differs from
            the number of feature rows.
    """
    df = pd.read_csv(file_path)
    if 'encoding' not in df.columns:
        raise KeyError(f"'encoding' column not found in {file_path!r}")

    label_column = df['encoding']
    # A NaN label would later be cast to an arbitrary integer by `.long()`.
    if label_column.isna().any():
        raise ValueError(f"'encoding' column in {file_path!r} contains missing labels")
    labels = label_column.tolist()

    features = np.load(feature_path)
    if len(features) != len(labels):
        raise ValueError(
            f"label/feature count mismatch: {len(labels)} labels in {file_path!r} "
            f"but {len(features)} feature rows in {feature_path!r}"
        )
    return labels, features

def build_dataloader(X, y, batch_size, shuffle=True):
    """Wrap features and labels in a ``DataLoader``.

    Args:
        X: Feature data convertible by ``torch.tensor``; stored as float32.
        y: Label data convertible by ``torch.tensor``; stored as int64.
        batch_size: Number of samples per batch.
        shuffle: Whether to reshuffle on every iteration. Defaults to ``True``
            (the previous fixed behaviour); pass ``False`` for evaluation when a
            stable sample order is wanted.

    Returns:
        A ``DataLoader`` over a ``TensorDataset`` of ``(features, labels)``.
    """
    tensor_x = torch.tensor(X).float()
    tensor_y = torch.tensor(y).long()
    dataset = TensorDataset(tensor_x, tensor_y)
    return DataLoader(dataset, shuffle=shuffle, batch_size=batch_size)

def load_best_model(model_path, num_classes, input_size, device):
    """Rebuild the classifier and load saved weights for inference.

    Args:
        model_path: Path to a ``state_dict`` file written by ``torch.save``.
        num_classes: Number of output classes the checkpoint was trained with.
        input_size: Feature-vector length the checkpoint was trained with.
        device: Device to place the model on; also used as ``map_location``.

    Returns:
        The model on ``device`` with weights loaded, in evaluation mode.
    """
    model = CNNClassifier(num_classes=num_classes, input_size=input_size).to(device)
    # weights_only=True restricts unpickling to tensors and plain containers.
    state_dict = torch.load(model_path, map_location=device, weights_only=True)
    model.load_state_dict(state_dict)
    # Freshly built modules start in training mode; switch dropout and batch
    # norm to inference behaviour so the returned model is safe to use directly.
    model.eval()
    return model

def evaluate(model, data_loader, device, average='macro'):
    """Compute accuracy, precision, recall and F1 of ``model`` over ``data_loader``.

    Args:
        model: Network returning per-class scores; switched to eval mode here.
        data_loader: Iterable of ``(data, target)`` batches.
        device: Device the batches are moved to.
        average: Averaging mode passed to the scikit-learn metric functions.
            Defaults to ``'macro'``; use ``'weighted'`` for the multi-class setting.

    Returns:
        ``(accuracy, precision, recall, f1)``.

    Raises:
        ValueError: If ``data_loader`` yields no samples.
    """
    model.eval()
    total_correct, total = 0, 0
    all_predictions = []
    all_targets = []

    with torch.inference_mode():
        for data, target in data_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            _, pred = torch.max(output, dim=1)  # index of the highest score per row
            total_correct += (pred == target).sum().item()
            total += target.size(0)
            all_predictions.extend(pred.cpu().numpy())
            all_targets.extend(target.cpu().numpy())

    if total == 0:
        raise ValueError("data_loader yielded no samples to evaluate")

    accuracy = total_correct / total
    precision = precision_score(all_targets, all_predictions, average=average)
    recall = recall_score(all_targets, all_predictions, average=average)
    f1 = f1_score(all_targets, all_predictions, average=average)

    return accuracy, precision, recall, f1

def test_model(test_file_path, test_feature_path, batch_size, model_path, device,
               num_classes=2, input_size=None):
    """Evaluate a saved checkpoint on the test set and print the metrics.

    Args:
        test_file_path: CSV file providing the test labels (see ``load_data``).
        test_feature_path: NumPy file providing the test feature matrix.
        batch_size: Batch size for the test loader.
        model_path: Path to the saved ``state_dict``.
        device: Device used for the model and batches.
        num_classes: Number of classes the checkpoint was trained with (2 for
            binary; pass 3 for the multi-class setting).
        input_size: Length of one feature vector. When ``None`` it is read from
            the loaded features, so concatenating extra vectors (e.g. 769 or
            849 wide instead of 768) needs no code change. It must match the
            size used in training.
    """
    test_labels, test_features = load_data(test_file_path, test_feature_path)
    if input_size is None:
        # Assumes test_features is a 2-D (n_samples, n_features) array.
        input_size = test_features.shape[1]

    test_loader = build_dataloader(test_features, test_labels, batch_size)
    best_model = load_best_model(model_path, num_classes, input_size, device)
    accuracy, precision, recall, f1 = evaluate(best_model, test_loader, device)
    print(f"Test Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1-Score: {f1:.4f}")

# Evaluation configuration: fill in the two paths before running this cell.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model_path = "best_model_cnn.pth"
test_file_path = ""      # test_input_file
test_feature_path = ""   # test_feature_file
batch_size = 16

test_model(
    test_file_path=test_file_path,
    test_feature_path=test_feature_path,
    batch_size=batch_size,
    model_path=model_path,
    device=device,
)