In [4]:
import pandas as pd
import numpy as np


In [5]:
# read feature dataset
train_feat = np.load("datasets/train/train_feature.npz", allow_pickle=True)
train_feat_X = train_feat['features']
train_feat_Y = train_feat['label']

test_feat_X = np.load("datasets/test/test_feature.npz", allow_pickle=True)['features']

In [6]:
print(f"Train dataset size: ")
print(f"train_feat_X: {train_feat_X.shape} train_feat_Y: {train_feat_Y.shape}")


print()
print("Test dataset size: ")
print(f"test_feat_X: {test_feat_X.shape}")

Train dataset size: 
train_feat_X: (7080, 13, 768) train_feat_Y: (7080,)

Test dataset size: 
test_feat_X: (2232, 13, 768)


In [7]:
import sklearn

In [8]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.decomposition import PCA
import torch
import torch.nn as nn
import torch.optim as optim

In [9]:
# Sample data generator (replace this with your actual data loading mechanism)
num_samples = 7080
num_features = 786
num_emoticons = 13

# Random data for demonstration (replace with actual dataset)
X = np.random.rand(num_samples, num_emoticons, num_features)
y = np.random.randint(2, size=num_samples)  # Binary labels

# Split data into train and test
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Convert data to PyTorch tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32).unsqueeze(1)

# Define the model with Learnable Aggregation and PCA
class AggregationPCABinaryClassifier(nn.Module):
    def __init__(self, num_pca_components=128):
        super(AggregationPCABinaryClassifier, self).__init__()
        self.num_emoticons = num_emoticons
        self.num_features = num_features
        
        # Learnable weights for aggregation (13x1 vector of learnable parameters)
        self.aggregation_weights = nn.Parameter(torch.randn(num_emoticons, 1))
        
        # Placeholder for PCA (torch cannot directly do PCA, so we will precompute it externally)
        self.pca = PCA(n_components=num_pca_components)
        
        # Fully connected layers for classification after PCA
        self.fc1 = nn.Linear(num_pca_components, 64)
        self.fc2 = nn.Linear(64, 1)
        
        # Activation functions
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # Input x: (batch_size, num_emoticons, num_features) = (batch_size, 13, 786)
        
        # Learnable aggregation step: Weighted sum of emoticon embeddings
        weights = self.aggregation_weights.unsqueeze(0)  # Shape: (1, 13, 1)
        aggregated_embedding = torch.sum(x * weights, dim=1)  # (batch_size, 786)
        
        # Apply PCA (this is precomputed externally; we will use the `transform` function)
        pca_transformed = torch.tensor(self.pca.transform(aggregated_embedding.detach().numpy()), dtype=torch.float32)  # (batch_size, num_pca_components)
        
        # Fully connected layer
        x = self.relu(self.fc1(pca_transformed))  # (batch_size, 64)
        x = self.sigmoid(self.fc2(x))  # (batch_size, 1)
        
        return x

# Instantiate the model
model = AggregationPCABinaryClassifier()

# First, we need to fit PCA on the training data
# Preprocess the aggregated embeddings to apply PCA on them
with torch.no_grad():
    # Perform learnable aggregation without training first to get embeddings
    aggregated_train_embeddings = torch.sum(X_train_tensor * model.aggregation_weights.unsqueeze(0), dim=1).numpy()
    aggregated_test_embeddings = torch.sum(X_test_tensor * model.aggregation_weights.unsqueeze(0), dim=1).numpy()
    
    # Fit PCA on the aggregated training embeddings
    model.pca.fit(aggregated_train_embeddings)

# Loss and optimizer
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training the model
epochs = 10
for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()
    
    # Forward pass
    outputs = model(X_train_tensor)
    loss = criterion(outputs, y_train_tensor)
    
    # Backward pass and optimization
    loss.backward()
    optimizer.step()
    
    if (epoch+1) % 2 == 0:
        print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')

# Evaluate the model
model.eval()
with torch.no_grad():
    train_outputs = model(X_train_tensor)
    test_outputs = model(X_test_tensor)
    
    train_predictions = (train_outputs >= 0.5).float()
    test_predictions = (test_outputs >= 0.5).float()
    
    train_accuracy = accuracy_score(y_train_tensor, train_predictions)
    test_accuracy = accuracy_score(y_test_tensor, test_predictions)
    
    print(f'Train Accuracy: {train_accuracy:.4f}')
    print(f'Test Accuracy: {test_accuracy:.4f}')

Epoch [2/10], Loss: 0.7015
Epoch [4/10], Loss: 0.6943
Epoch [6/10], Loss: 0.6883
Epoch [8/10], Loss: 0.6831
Epoch [10/10], Loss: 0.6784
Train Accuracy: 0.5692
Test Accuracy: 0.4753


In [18]:
# Example data generation
X = train_feat_X  # Replace with actual data
y = train_feat_Y  # Replace with actual labels

X.shape

(7080, 13, 768)

In [19]:
# Split the data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Normalize the data (standardizing)
scaler = StandardScaler()

# Flatten the input for normalization, then reshape it back
X_train_reshaped = X_train.reshape(-1, 13 * 768)
X_test_reshaped = X_test.reshape(-1, 13 * 768)

X_train_scaled = scaler.fit_transform(X_train_reshaped).reshape(-1, 13, 768)
X_test_scaled = scaler.transform(X_test_reshaped).reshape(-1, 13, 768)

# Convert data to PyTorch tensors
X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)  # Add a dimension for binary output
y_test_tensor = torch.tensor(y_test, dtype=torch.float32).unsqueeze(1)

# CNN model definition
class CNNModel(nn.Module):
    def __init__(self):
        super(CNNModel, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=(3, 3))  # 1 input channel, 32 output filters, 3x3 kernel
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=(3, 3))
        self.fc1 = nn.Linear(self._get_conv_output_size(), 64)  # Adjust according to the flattened dimension
        self.fc2 = nn.Linear(64, 1)
        self.sigmoid = nn.Sigmoid()

    def _get_conv_output_size(self):
        # Create a dummy input tensor to calculate the size after convolutions and pooling
        dummy_input = torch.zeros(1, 1, 13, 768)
        output = self.pool(torch.relu(self.conv1(dummy_input)))
        output = self.pool(torch.relu(self.conv2(output)))
        return int(np.prod(output.size()))  # Flatten size

    def forward(self, x):
        x = self.pool(torch.relu(self.conv1(x)))
        x = self.pool(torch.relu(self.conv2(x)))
        x = x.view(x.size(0), -1)  # Flatten the tensor for the fully connected layers
        x = torch.relu(self.fc1(x))
        x = self.sigmoid(self.fc2(x))
        return x

# Instantiate the model, define the loss function and the optimizer
model = CNNModel()
criterion = nn.BCELoss()  # Binary Cross Entropy Loss for binary classification
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Add channel dimension to inputs for Conv2D
X_train_tensor = X_train_tensor.unsqueeze(1)
X_test_tensor = X_test_tensor.unsqueeze(1)

# Train the model
num_epochs = 10
batch_size = 32

# Training loop
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for i in range(0, len(X_train_tensor), batch_size):
        X_batch = X_train_tensor[i:i+batch_size]
        y_batch = y_train_tensor[i:i+batch_size]

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(X_batch)

        # Compute loss
        loss = criterion(outputs, y_batch)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(X_train_tensor):.4f}")

# Evaluate the model
model.eval()
with torch.no_grad():
    y_pred_probs = model(X_test_tensor)
    y_pred = (y_pred_probs > 0.5).float()

# Convert predictions and targets to numpy for evaluation
y_pred_np = y_pred.numpy()
y_test_np = y_test_tensor.numpy()

# Calculate accuracy
accuracy = accuracy_score(y_test_np, y_pred_np)
print(f"Test Accuracy: {accuracy * 100:.2f}%")

# Confusion matrix
conf_matrix = confusion_matrix(y_test_np, y_pred_np)
print("Confusion Matrix:")
print(conf_matrix)


Epoch 1/10, Loss: 0.0173
Epoch 2/10, Loss: 0.0159
Epoch 3/10, Loss: 0.0158
Epoch 4/10, Loss: 0.0157
Epoch 5/10, Loss: 0.0155
Epoch 6/10, Loss: 0.0153
Epoch 7/10, Loss: 0.0152
Epoch 8/10, Loss: 0.0150
Epoch 9/10, Loss: 0.0148
Epoch 10/10, Loss: 0.0145
Test Accuracy: 71.19%
Confusion Matrix:
[[460 259]
 [149 548]]
