In [1]:
import pandas as pd
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, models
from torch.utils.data import Dataset, DataLoader, Subset
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix
from sklearn.model_selection import StratifiedKFold
import numpy as np
from PIL import Image

# Load the Excel file and check for NaN values
file_path = 'train_data.xlsx'
df = pd.read_excel(file_path)
df = df.dropna(subset=['Label_Sentiment'])  # Remove rows with NaN in 'Label_Sentiment'
df['Label_Sentiment'] = df['Label_Sentiment'].astype(int)  # Ensure Label_Sentiment is integer type

class ImageOnlyDataset(Dataset):
    def __init__(self, dataframe, img_dir, transform=None):
        self.dataframe = dataframe
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        img_name = os.path.join(self.img_dir, self.dataframe.iloc[idx, 0])
        
        try:
            image = Image.open(img_name).convert("RGB")
        except FileNotFoundError:
            return None
        
        if self.transform:
            image = self.transform(image)
        
        # Use existing label as it is already numeric (0 = negative, 1 = positive)
        label = torch.tensor(self.dataframe.iloc[idx, 2], dtype=torch.long)
        
        sample = {
            'image': image,
            'label': label
        }
        return sample

# Transformations for the image
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Define the dataset
dataset = ImageOnlyDataset(dataframe=df, img_dir='images/', transform=transform)

# Custom collate function to filter out None samples
def collate_fn(batch):
    batch = [sample for sample in batch if sample is not None]
    if len(batch) == 0:
        return None
    return torch.utils.data.dataloader.default_collate(batch)

class VisionOnlyModel(nn.Module):
    def __init__(self):
        super(VisionOnlyModel, self).__init__()
        # Load a pre-trained ConvNeXt model
        self.vision_model = models.convnext_base(weights='IMAGENET1K_V1')
        
        # Access the number of input features for the final layer in the classifier head
        in_features = self.vision_model.classifier[2].in_features
        
        # Modify the classifier to fit 2 classes (binary classification)
        self.vision_model.classifier[2] = nn.Linear(in_features, 2)

    def forward(self, images):
        return self.vision_model(images)

# Initialize model, loss, and optimizer
model = VisionOnlyModel()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Early stopping parameters
patience = 2
best_loss = float('inf')
early_stop_counter = 0

# K-Fold Cross-Validation
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
fold_results = []

for fold, (train_idx, test_idx) in enumerate(skf.split(dataset, df['Label_Sentiment'])):
    print(f'Fold {fold + 1}')
    
    train_subsampler = Subset(dataset, train_idx)
    test_subsampler = Subset(dataset, test_idx)
    
    train_dataloader = DataLoader(train_subsampler, batch_size=16, shuffle=True, collate_fn=collate_fn)
    test_dataloader = DataLoader(test_subsampler, batch_size=16, shuffle=False, collate_fn=collate_fn)
    
    # Training loop with early stopping
    model.train()
    for epoch in range(10):  
        epoch_loss = 0.0
        for batch in train_dataloader:
            if batch is None:
                continue
            optimizer.zero_grad()
            outputs = model(images=batch['image'])
            loss = criterion(outputs, batch['label'])
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        
        # Validation step for early stopping
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for batch in test_dataloader:
                if batch is None:
                    continue
                outputs = model(images=batch['image'])
                loss = criterion(outputs, batch['label'])
                val_loss += loss.item()
        
        print(f'Epoch {epoch + 1} - Train Loss: {epoch_loss:.4f}, Val Loss: {val_loss:.4f}')
        
        # Check early stopping condition
        if val_loss < best_loss:
            best_loss = val_loss
            early_stop_counter = 0  
        else:
            early_stop_counter += 1
            if early_stop_counter >= patience:
                print(f'Early stopping triggered at epoch {epoch + 1}')
                break  
    
    # Evaluation
    model.eval()
    all_labels = []
    all_preds = []
    with torch.no_grad():
        for batch in test_dataloader:
            if batch is None:
                continue
            outputs = model(images=batch['image'])
            _, preds = torch.max(outputs, 1)
            all_labels.extend(batch['label'].numpy())
            all_preds.extend(preds.numpy())

    # Calculate metrics with zero_division parameter
    accuracy = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds, zero_division=1)
    precision = precision_score(all_labels, all_preds, zero_division=1)
    recall = recall_score(all_labels, all_preds, zero_division=1)
    conf_matrix = confusion_matrix(all_labels, all_preds, labels=[0, 1])

    fold_results.append({
        'fold': fold + 1,
        'accuracy': accuracy,
        'f1': f1,
        'precision': precision,
        'recall': recall,
        'confusion_matrix': conf_matrix
    })
    print(f'Fold {fold + 1} - Accuracy: {accuracy}, F1: {f1}, Precision: {precision}, Recall: {recall}')
    print(f'Confusion Matrix:\n{conf_matrix}')

# Average results across folds
avg_accuracy = np.mean([result['accuracy'] for result in fold_results])
avg_f1 = np.mean([result['f1'] for result in fold_results])
avg_precision = np.mean([result['precision'] for result in fold_results])
avg_recall = np.mean([result['recall'] for result in fold_results])

print(f'Average Accuracy: {avg_accuracy}')
print(f'Average F1 Score: {avg_f1}')
print(f'Average Precision: {avg_precision}')
print(f'Average Recall: {avg_recall}')

Fold 1
Epoch 1 - Train Loss: 242.3675, Val Loss: 57.0813
Epoch 2 - Train Loss: 165.8241, Val Loss: 33.7356
Epoch 3 - Train Loss: 42.6389, Val Loss: 36.9767
Epoch 4 - Train Loss: 15.0330, Val Loss: 56.2547
Early stopping triggered at epoch 4
Fold 1 - Accuracy: 0.801980198019802, F1: 0.781591263650546, Precision: 0.8652849740932642, Recall: 0.7126600284495022
Confusion Matrix:
[[633  78]
 [202 501]]
Fold 2
Epoch 1 - Train Loss: 73.8217, Val Loss: 3.5563
Epoch 2 - Train Loss: 14.7827, Val Loss: 4.6822
Epoch 3 - Train Loss: 5.8733, Val Loss: 3.6717
Early stopping triggered at epoch 3
Fold 2 - Accuracy: 0.9872611464968153, F1: 0.9871611982881597, Precision: 0.9885714285714285, Recall: 0.9857549857549858
Confusion Matrix:
[[703   8]
 [ 10 692]]
Fold 3
Epoch 1 - Train Loss: 27.6342, Val Loss: 1.7666
Epoch 2 - Train Loss: 9.8275, Val Loss: 1.7963
Epoch 3 - Train Loss: 4.9408, Val Loss: 3.6990
Early stopping triggered at epoch 3
Fold 3 - Accuracy: 0.9879688605803255, F1: 0.9878831076265147, Pre

In [6]:
torch.save(model.state_dict(), 'trained_model.pth')


In [8]:
import pandas as pd
import os
import torch
import torch.nn as nn
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix
from PIL import Image, ImageFile

# Allow truncated images to be loaded
ImageFile.LOAD_TRUNCATED_IMAGES = True

# Load the new test data
test_file_path = 'test_data.xlsx'
test_df = pd.read_excel(test_file_path)
test_df = test_df.dropna(subset=['Label_Sentiment'])  # Remove rows with NaN in 'Label_Sentiment'
test_df['Label_Sentiment'] = test_df['Label_Sentiment'].astype(int)  # Ensure Label_Sentiment is integer type

# Dataset class for loading images
class ImageOnlyDataset(Dataset):
    def __init__(self, dataframe, img_dir, transform=None):
        self.dataframe = dataframe
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        img_name = os.path.join(self.img_dir, self.dataframe.iloc[idx, 0])
        
        try:
            # Open and process the image
            image = Image.open(img_name).convert("RGB")
        except (FileNotFoundError, OSError):
            return None  # Skip corrupted or missing images

        if self.transform:
            image = self.transform(image)

        label = torch.tensor(self.dataframe.iloc[idx, 2], dtype=torch.long)
        
        sample = {'image': image, 'label': label}
        return sample

# Image transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Define the test dataset
test_dataset = ImageOnlyDataset(dataframe=test_df, img_dir='images/', transform=transform)

# Custom collate function to filter out None samples
def collate_fn(batch):
    batch = [sample for sample in batch if sample is not None]
    if len(batch) == 0:
        return None
    return torch.utils.data.dataloader.default_collate(batch)

# Test DataLoader
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False, collate_fn=collate_fn)

# Define the model (same as used in training)
class VisionOnlyModel(nn.Module):
    def __init__(self):
        super(VisionOnlyModel, self).__init__()
        self.vision_model = models.convnext_base(weights='IMAGENET1K_V1')
        in_features = self.vision_model.classifier[2].in_features
        self.vision_model.classifier[2] = nn.Linear(in_features, 2)

    def forward(self, images):
        return self.vision_model(images)

# Load the trained model
model = VisionOnlyModel()
model.load_state_dict(torch.load('trained_model.pth'))  # Load the saved model weights
model.eval()

# Evaluate the model on the test set
all_labels = []
all_preds = []
with torch.no_grad():
    for batch in test_dataloader:
        if batch is None:
            continue
        outputs = model(images=batch['image'])
        _, preds = torch.max(outputs, 1)
        all_labels.extend(batch['label'].numpy())
        all_preds.extend(preds.numpy())

# Calculate metrics
accuracy = accuracy_score(all_labels, all_preds)
f1 = f1_score(all_labels, all_preds, zero_division=1)
precision = precision_score(all_labels, all_preds, zero_division=1)
recall = recall_score(all_labels, all_preds, zero_division=1)
conf_matrix = confusion_matrix(all_labels, all_preds, labels=[0, 1])

# Print evaluation metrics
print(f'Test Set Accuracy: {accuracy}')
print(f'Test Set F1 Score: {f1}')
print(f'Test Set Precision: {precision}')
print(f'Test Set Recall: {recall}')
print(f'Confusion Matrix:\n{conf_matrix}')


  model.load_state_dict(torch.load('trained_model.pth'))  # Load the saved model weights


Test Set Accuracy: 0.5080128205128205
Test Set F1 Score: 0.5576368876080692
Test Set Precision: 0.5187667560321716
Test Set Recall: 0.602803738317757
Confusion Matrix:
[[247 359]
 [255 387]]
