In [7]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset
import torchvision.models as models
import torchvision.transforms as transforms
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import math
import imageio
from os import listdir
import warnings
import filecmp
from PIL import Image
import gc
import psutil
from tqdm import tqdm
import sys
from time import ctime
import cv2
import os
from PIL import Image

In [14]:
class ProtestDataset(Dataset):
    """In-memory image dataset for the protest / non-protest task.

    Args:
        images: Indexable collection of images in ``[H, W, C]`` layout whose
            values are floats scaled to ``[0, 1]`` (they are multiplied by
            255 before being handed to PIL).
        labels: Optional indexable collection with one label per image.
            When ``None`` the dataset yields images only.
        transform: Optional callable applied to a ``PIL.Image`` built from
            the stored array. When omitted, the raw array is returned as a
            ``[C, H, W]`` float32 tensor.
    """

    def __init__(self, images, labels=None, transform=None):
        self.images = images
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Return the number of stored images."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``image`` (no labels) or ``(image, label)`` for ``idx``."""
        image = self.images[idx]

        if self.transform:
            # The transform pipeline works on PIL images, which need 8-bit
            # pixels. Round instead of truncating so a pixel stored as
            # v / 255.0 maps back to v even if the float product lands a
            # hair below the integer, and clip so values slightly outside
            # [0, 1] saturate instead of wrapping around in uint8.
            pixels = np.clip(np.rint(np.asarray(image) * 255.0), 0, 255).astype(np.uint8)
            image = self.transform(Image.fromarray(pixels))
        else:
            # PyTorch expects image data in [C, H, W] order.
            image = torch.tensor(image.transpose(2, 0, 1), dtype=torch.float32)

        if self.labels is None:
            return image

        label = torch.tensor(self.labels[idx], dtype=torch.float32)
        return image, label

In [15]:
class ProtestResNet(nn.Module):
    """ResNet-50 feature extractor followed by a custom sigmoid classifier.

    The backbone's own fully connected layer is replaced by ``nn.Identity``
    so that it emits pooled features; ``self.classifier`` maps those
    features to ``num_classes`` sigmoid outputs.

    Args:
        num_classes: Number of output units (1 for binary classification).
        pretrained: When True (the default, matching the previous
            behaviour) the backbone is initialised with ImageNet weights.
            Pass False to build the architecture without loading them,
            e.g. right before restoring a checkpoint.
    """

    def __init__(self, num_classes=1, pretrained=True):
        super().__init__()
        self.resnet = self._build_backbone(pretrained)

        # Width of the feature vector that used to feed the original FC layer.
        num_features = self.resnet.fc.in_features

        # Drop the original FC layer; the custom head below replaces it.
        self.resnet.fc = nn.Identity()

        # Layer order is kept exactly as before so existing state_dicts
        # (keys classifier.0 ... classifier.14) still load.
        self.classifier = nn.Sequential(
            nn.Dropout(0.4),
            nn.Linear(num_features, 2048),
            nn.ELU(),
            nn.Dropout(0.4),
            nn.BatchNorm1d(2048),
            nn.Linear(2048, 1024),
            nn.ELU(),
            nn.Dropout(0.4),
            nn.BatchNorm1d(1024),
            nn.Linear(1024, 512),
            nn.ELU(),
            nn.Dropout(0.4),
            nn.BatchNorm1d(512),
            nn.Linear(512, num_classes),
            nn.Sigmoid()
        )

    @staticmethod
    def _build_backbone(pretrained):
        """Create the ResNet-50 backbone using whichever API torchvision offers."""
        weights_enum = getattr(models, "ResNet50_Weights", None)
        if weights_enum is None:
            # Older torchvision: only the boolean `pretrained` flag exists.
            return models.resnet50(pretrained=pretrained)
        # Newer torchvision deprecates `pretrained`; IMAGENET1K_V1 is the
        # weight set that `pretrained=True` used to select.
        return models.resnet50(weights=weights_enum.IMAGENET1K_V1 if pretrained else None)

    def forward(self, x):
        """Return sigmoid scores of shape ``[batch, num_classes]`` for ``x``."""
        features = self.resnet(x)
        return self.classifier(features)

In [16]:
# Locate files.
# Paths are assembled with os.path.join so the notebook also runs on
# systems whose path separator is not a backslash; on Windows the resulting
# strings are the same as the previous hard-coded ones.
DATA_DIR = 'data'
train_folder = os.path.join(DATA_DIR, 'train')
test_folder = os.path.join(DATA_DIR, 'test')
train_csv = os.path.join(DATA_DIR, 'annot_train.txt')
test_csv = os.path.join(DATA_DIR, 'annot_test.txt')

# The annotation files are tab-separated.
annot_train = pd.read_csv(train_csv, sep='\t')
annot_test = pd.read_csv(test_csv, sep='\t')

# Replace "-" with 0 due to the way the file is written
annot_train = annot_train.replace("-", 0)
annot_test = annot_test.replace("-", 0)

In [17]:
# Load data
IMAGE_EXTENSIONS = ('.jpg', '.png', '.jpeg')
IMAGE_SIZE = (50, 50)  # target size handed to cv2.resize


def _load_image_folder(folder, annotations, desc):
    """Read every image file in ``folder`` and pair it with its protest label.

    Args:
        folder: Directory that contains the image files.
        annotations: DataFrame with ``fname`` and ``protest`` columns.
        desc: Text shown next to the tqdm progress bar.

    Returns:
        ``(images, labels)`` as numpy arrays. Each image is resized to
        ``IMAGE_SIZE``, has three channels and is scaled to ``[0, 1]``.
        Files without an annotation row get label 0. Files that cannot be
        processed are reported and skipped.
    """
    # Build the filename -> label lookup once instead of scanning the
    # DataFrame for every file. setdefault keeps the first row for a
    # duplicated filename, like `.iloc[0]` on the filtered frame did.
    label_by_name = {}
    for fname, protest in zip(annotations['fname'], annotations['protest']):
        label_by_name.setdefault(fname, protest)

    images = []
    labels = []
    # os.listdir returns entries in arbitrary order; sorting makes the
    # array order (and therefore the seeded train/validation split that is
    # derived from it) reproducible across runs and machines.
    for file in tqdm(sorted(os.listdir(folder)), desc=desc):
        if not file.endswith(IMAGE_EXTENSIONS):
            continue
        try:
            protest_value = label_by_name.get(file, 0)

            # NOTE(review): the recorded cell output shows a warning that
            # points at this call - presumably imageio's deprecation notice
            # for `imread`; confirm the installed version before switching.
            image = imageio.imread(os.path.join(folder, file))

            if image.ndim == 2:
                # Grayscale: replicate the single channel to get RGB.
                image = np.stack([image, image, image], axis=2)
            elif image.ndim == 3 and image.shape[2] == 4:
                # RGBA: drop the alpha channel.
                image = image[:, :, :3]

            # Anything that is still not H x W x 3 would make the final
            # np.array() call ragged, so reject it here; the except clause
            # below reports the file and moves on.
            if image.ndim != 3 or image.shape[2] != 3:
                raise ValueError(f"unsupported image shape {image.shape}")

            # Resize (initial preprocessing) and normalize to [0, 1].
            image = cv2.resize(image, IMAGE_SIZE) / 255.0

            images.append(image)
            labels.append(protest_value)
        except Exception as e:
            print(f"Error processing {file}: {e}")

    return np.array(images), np.array(labels)


print(f"Starting data loading: {ctime()}")

# Load training data
print(f"Loading training data from {train_folder}")
train_images, train_labels = _load_image_folder(train_folder, annot_train, "Loading training images")

# Load test data
print(f"Loading test data from {test_folder}")
test_images, test_labels = _load_image_folder(test_folder, annot_test, "Loading test images")

print(f"Data loading completed: {ctime()}")
print(f"Training data: {train_images.shape[0]} images")
print(f"Test data: {test_images.shape[0]} images")

Starting data loading: Wed Jun 18 13:24:00 2025
Loading training data from data\train


  image = imageio.imread(filepath)
Loading training images: 100%|██████████| 32611/32611 [04:16<00:00, 127.04it/s]


Loading test data from data\test


  image = imageio.imread(filepath)
Loading test images: 100%|██████████| 8153/8153 [00:45<00:00, 180.16it/s]


Data loading completed: Wed Jun 18 13:29:04 2025
Training data: 32611 images
Test data: 8153 images


In [18]:
# Class balance of both splits (index 0 = non-protest, index 1 = protest)
train_class_counts = np.bincount(train_labels.astype(int))
test_class_counts = np.bincount(test_labels.astype(int))

# bincount only has a second entry when at least one protest label exists
train_protest_count = train_class_counts[1] if len(train_class_counts) > 1 else 0
test_protest_count = test_class_counts[1] if len(test_class_counts) > 1 else 0

print(f"Training class distribution: Non-protest: {train_class_counts[0]}, Protest: {train_protest_count}")
print(f"Test class distribution: Non-protest: {test_class_counts[0]}, Protest: {test_protest_count}")

# Preprocessing applied to every PIL image before it reaches the network:
# upscale to the 224x224 input used for ResNet, convert to a tensor, then
# normalize with the ImageNet channel statistics.
imagenet_mean = [0.485, 0.456, 0.406]
imagenet_std = [0.229, 0.224, 0.225]
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=imagenet_mean, std=imagenet_std),
])

# Hold out 20% of the training images for validation, keeping class ratios
X_train, X_val, y_train, y_val = train_test_split(
    train_images,
    train_labels,
    test_size=0.2,
    random_state=42,
    stratify=train_labels,
)

# Wrap each split in a dataset that applies the shared transform
train_dataset = ProtestDataset(images=X_train, labels=y_train, transform=transform)
val_dataset = ProtestDataset(images=X_val, labels=y_val, transform=transform)
test_dataset = ProtestDataset(images=test_images, labels=test_labels, transform=transform)

# Batch the splits; only the training loader shuffles
batch_size = 32
train_loader, val_loader, test_loader = (
    DataLoader(split, batch_size=batch_size, shuffle=reshuffle, num_workers=0)
    for split, reshuffle in (
        (train_dataset, True),
        (val_dataset, False),
        (test_dataset, False),
    )
)

Training class distribution: Non-protest: 23295, Protest: 9316
Test class distribution: Non-protest: 5810, Protest: 2343


In [19]:
# Prefer the GPU when PyTorch can see one, otherwise stay on the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

Using device: cpu


In [21]:

# Build the network and place it on the selected device
model = ProtestResNet(num_classes=1).to(device)

# Binary cross-entropy on the sigmoid outputs, optimized with Nesterov SGD
lr = 1e-3
criterion = nn.BCELoss()
optimizer = optim.SGD(
    model.parameters(),
    lr=lr,
    momentum=0.9,
    nesterov=True,
    weight_decay=1e-6,
)

# Cut the learning rate by 10x once the monitored (maximized) metric has
# stopped improving for 3 scheduler steps
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='max',
    factor=0.1,
    patience=3,
)

# Early-stopping state; train_model declares the first two as globals
best_val_acc = 0.0
no_improve_epochs = 0
early_stopping_patience = 5

# Training function
def _run_epoch(model, loader, criterion, desc, optimizer=None):
    """Run one pass over ``loader`` and return ``(mean loss, accuracy)``.

    With an ``optimizer`` the model is put in training mode and updated
    after every batch; without one the model is put in eval mode and
    gradients are disabled.
    """
    training = optimizer is not None
    model.train(training)

    running_loss = 0.0
    correct = 0
    total = 0

    with torch.set_grad_enabled(training):
        for inputs, labels in tqdm(loader, desc=desc):
            inputs = inputs.to(device)
            labels = labels.to(device).view(-1, 1)  # Reshape to match output

            if training:
                optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            if training:
                loss.backward()
                optimizer.step()

            # criterion returns the batch mean, so weight it by the batch
            # size before averaging over the whole dataset below.
            running_loss += loss.item() * inputs.size(0)
            predicted = (outputs > 0.5).float()
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    return running_loss / len(loader.dataset), correct / total


def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=10):
    """Train ``model`` with per-epoch validation, checkpointing and early stopping.

    Relies on these module-level names: ``device``, ``tqdm``,
    ``early_stopping_patience`` (read) and ``best_val_acc`` /
    ``no_improve_epochs`` (read and updated, so their values carry over
    between calls).

    Args:
        model: Network whose outputs are probabilities of shape ``[batch, 1]``.
        train_loader: Loader yielding ``(inputs, labels)`` training batches.
        val_loader: Loader yielding ``(inputs, labels)`` validation batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per training batch.
        scheduler: Scheduler stepped once per epoch with the validation accuracy.
        num_epochs: Maximum number of epochs to run.

    Returns:
        Dict with per-epoch lists under ``'train_loss'``, ``'val_loss'``,
        ``'train_acc'`` and ``'val_acc'``.

    Side effects:
        Writes ``best_model.pt`` whenever the validation accuracy exceeds
        ``best_val_acc``.
    """
    global best_val_acc, no_improve_epochs

    train_losses = []
    val_losses = []
    train_accs = []
    val_accs = []

    for epoch in range(num_epochs):
        tag = f"Epoch {epoch+1}/{num_epochs}"

        epoch_loss, epoch_acc = _run_epoch(model, train_loader, criterion, f"{tag} [Train]", optimizer)
        train_losses.append(epoch_loss)
        train_accs.append(epoch_acc)

        val_loss, val_acc = _run_epoch(model, val_loader, criterion, f"{tag} [Val]")
        val_losses.append(val_loss)
        val_accs.append(val_acc)

        # Update the learning rate based on validation accuracy
        scheduler.step(val_acc)

        stop_early = False
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), 'best_model.pt')
            print(f"Saved new best model with validation accuracy: {val_acc:.4f}")
            no_improve_epochs = 0
        else:
            no_improve_epochs += 1
            stop_early = no_improve_epochs >= early_stopping_patience

        # Report the epoch before a possible early stop; previously the
        # break came first, so the final epoch's metrics were never printed.
        print(f"{tag} - "
              f"Train Loss: {epoch_loss:.4f}, Train Acc: {epoch_acc:.4f}, "
              f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

        if stop_early:
            print(f"Early stopping triggered after {epoch+1} epochs")
            break

    return {
        'train_loss': train_losses,
        'val_loss': val_losses,
        'train_acc': train_accs,
        'val_acc': val_accs
    }

In [22]:
# Train the model
num_epochs = 10
print("Starting training...")
history = train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=num_epochs)

# Load the best model. map_location lets the checkpoint load on the
# currently selected device even if it was written on a different one.
model.load_state_dict(torch.load('best_model.pt', map_location=device))
print("Loaded best model from checkpoint")


def _plot_metric(ax, train_values, val_values, metric, find_best):
    """Plot one train/validation curve pair on ``ax`` and mark the best epoch.

    Args:
        ax: Matplotlib axes to draw on.
        train_values: Per-epoch values measured on the training split.
        val_values: Per-epoch values measured on the validation split.
        metric: Name used in the legend, y-axis label and title.
        find_best: ``np.argmax`` or ``np.argmin``, whichever selects the
            best validation epoch for this metric.

    Returns:
        Index of the best validation epoch, or ``None`` if no epoch was
        recorded (nothing can be marked in that case).
    """
    ax.plot(train_values, label=f'Train {metric}')
    ax.plot(val_values, label=f'Validation {metric}')
    ax.set_xlabel('Epoch')
    ax.set_ylabel(metric)
    ax.legend()
    ax.set_title(f'Model {metric}')

    if len(val_values) == 0:
        return None

    # Mark the best model
    best = int(find_best(val_values))
    best_value = float(val_values[best])
    ax.plot(best, best_value, 'rx', markersize=10)
    ax.annotate(f"Best: {best_value:.4f}",
                (float(best), best_value),
                xytext=(float(best + 0.5), best_value),
                fontsize=9)
    return best


# Plot training history: accuracy on the left, loss on the right
fig, (acc_ax, loss_ax) = plt.subplots(1, 2, figsize=(12, 5))
best_epoch = _plot_metric(acc_ax, history['train_acc'], history['val_acc'], 'Accuracy', np.argmax)
best_epoch_loss = _plot_metric(loss_ax, history['train_loss'], history['val_loss'], 'Loss', np.argmin)

fig.tight_layout()
plt.show()

Starting training...


Epoch 1/10 [Train]:  67%|██████▋   | 545/816 [29:07<14:28,  3.21s/it]


KeyboardInterrupt: 

In [None]:

# Evaluate on test set
from sklearn.metrics import classification_report, confusion_matrix

model.eval()
all_preds = []
all_labels = []

test_loss = 0.0
correct = 0
total = 0

with torch.no_grad():
    for inputs, labels in tqdm(test_loader, desc="Evaluating test data"):
        inputs = inputs.to(device)
        labels = labels.to(device).view(-1, 1)

        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # criterion returns the batch mean; weight it by the batch size so
        # the division below yields a per-sample average.
        test_loss += loss.item() * inputs.size(0)
        predicted = (outputs > 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Store predictions and labels for metrics
        all_preds.extend(predicted.cpu().numpy().flatten())
        all_labels.extend(labels.cpu().numpy().flatten())

test_loss = test_loss / len(test_loader.dataset)
test_accuracy = correct / total

print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_accuracy:.4f}")

# Calculate and print additional metrics.
# Both classes are passed explicitly so the report and the matrix always
# have two rows/columns matching `classes`, even if one class is absent
# from the labels or the predictions.
classes = ['Non-Protest', 'Protest']
class_ids = [0, 1]
y_true = np.asarray(all_labels).astype(int)
y_pred = np.asarray(all_preds).astype(int)

print("\nClassification Report:")
print(classification_report(y_true, y_pred, labels=class_ids, target_names=classes))

print("\nConfusion Matrix:")
conf_matrix = confusion_matrix(y_true, y_pred, labels=class_ids)
print(conf_matrix)

# Plot confusion matrix
plt.figure(figsize=(8, 6))
plt.imshow(conf_matrix, interpolation='nearest', cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
plt.colorbar()

tick_marks = np.arange(len(classes))
plt.xticks(tick_marks, classes, rotation=45)
plt.yticks(tick_marks, classes)

# Add text annotations; switch to white text on the darker cells
thresh = conf_matrix.max() / 2.0
for i in range(conf_matrix.shape[0]):
    for j in range(conf_matrix.shape[1]):
        plt.text(j, i, format(conf_matrix[i, j], 'd'),
                horizontalalignment="center",
                color="white" if conf_matrix[i, j] > thresh else "black")

# Set the axis labels before tight_layout so they are included in the
# layout computation.
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.tight_layout()
plt.show()

# Save the final model together with the optimizer state and the scores
torch.save({
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'val_accuracy': best_val_acc,
    'test_accuracy': test_accuracy
}, 'protest_detection_model.pt')

print(f"Model saved to protest_detection_model.pt with test accuracy: {test_accuracy:.4f}")

In [None]:
# NOTE(review): `train` and `label` are not assigned in any cell visible
# here, so on a fresh kernel this cell presumably fails with NameError -
# confirm whether this cell (and the Keras cells after it) is still needed.
# Convert both inputs to numpy arrays and reshape the labels into a column
# with one row per entry.
train=np.array(train)
label=np.array(label)
label=label.reshape(label.shape[0],1)

In [None]:
# Callbacks for the Keras run: stop after 3 epochs without improvement in
# 'val_acc' and keep only the best checkpoint in 'best_model.h5'.
# `mode` must be the string 'max'; the previous code passed the builtin
# function `max`, which is not one of the accepted mode values.
# NOTE(review): EarlyStopping / ModelCheckpoint are not imported in the
# cells visible here - confirm where they are supposed to come from.
es=EarlyStopping(monitor='val_acc',mode='max',verbose=1,patience=3)
mc=ModelCheckpoint('best_model.h5',monitor='val_acc',mode='max',verbose=1,save_best_only=True)

In [None]:
# Compile the Keras model with Nesterov SGD and binary cross-entropy.
# NOTE(review): this rebinds `lr` and `optimizer`, which the PyTorch cells
# earlier in the notebook also define, so running this cell replaces the
# PyTorch optimizer object. `SGD`, `keras` and `model_resnet` are not
# defined in the cells visible here - confirm before relying on this cell.
lr = 1e-3
optimizer = SGD(lr=lr, decay=1e-6, momentum=0.9, nesterov=True) # Adam(lr=lr, decay=0.01)
model_resnet.compile(optimizer=optimizer, loss=keras.losses.binary_crossentropy, metrics=['accuracy'])
# model.summary()
# Trigger a garbage-collection pass.
gc.collect()

In [None]:
# Fit the Keras model for up to 10 epochs with batches of 10, holding out
# 35% of the data for validation and using the `es` / `mc` callbacks.
# NOTE(review): this rebinds `history`, which the PyTorch training cell
# also assigns (there it is a plain dict) - the later plotting cells read
# `history.history`, so they presumably expect this Keras object.
history=model_resnet.fit(train,label,epochs=10,batch_size=10,verbose=1,validation_split=0.35,shuffle=True,callbacks=[es,mc])

In [None]:
# Load the checkpoint stored in 'best_model.h5'.
# NOTE(review): this rebinds `model`, the name the PyTorch cells use for
# the ProtestResNet instance; `keras` is not imported in the cells visible
# here - confirm.
model=keras.models.load_model('best_model.h5')

In [None]:
# Accuracy curves of the Keras run, with the best validation epoch marked.
# NOTE(review): expects `history` to expose a `.history` mapping with
# 'acc' and 'val_acc' entries (not the dict returned by train_model).
accu = history.history['acc']
val_acc = history.history['val_acc']

plt.plot(accu, label='Acc')
plt.plot(val_acc, label='val_acc')
plt.plot(np.argmax(val_acc), np.max(val_acc), marker="x", color="r",
         label="best model")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
# Build the legend only after every artist exists; previously it was
# created before the marker was drawn, so "best model" never appeared.
plt.legend()
plt.show()

In [None]:
# Learning curve: training vs. validation loss, lowest validation loss marked
plt.figure(figsize=(8, 8))
plt.title("Learning curve")

loss_curve = history.history["loss"]
plt.plot(loss_curve, label="loss")

val_loss_curve = history.history["val_loss"]
plt.plot(val_loss_curve, label="val_loss")

best_x = np.argmin(history.history["val_loss"])
best_y = np.min(history.history["val_loss"])
plt.plot(best_x, best_y, marker="x", color="r", label="best model")

plt.xlabel("Epochs")
plt.ylabel("Binary CrossEntropy")
plt.legend();

In [None]:
# Predict on the test images and report the accuracy in percent.
# NOTE(review): `test` and `label2` are not assigned in the cells visible
# here, and `model` must be the Keras model (it needs `.predict`) - confirm.
test=np.array(test)

predict=model.predict(test)
predict=np.squeeze(predict,axis=1)

# Threshold at 0.5 in one vectorized step (scores below 0.5 become 0,
# everything else 1) instead of looping over the elements; the dtype of
# the prediction array is kept.
predict=np.where(predict<0.5,0,1).astype(predict.dtype)

label2=np.array(label2)
# accuracy_score takes (y_true, y_pred); accuracy is symmetric, so the
# printed value is unchanged by using the documented argument order.
print("Accuracy: ",round(accuracy_score(label2,predict)*100,2))


In [None]:
# Save the current `model` to 'besttt_model.h5'.
# NOTE(review): relies on `model` having a `.save` method, i.e. presumably
# the Keras model loaded in an earlier cell rather than the PyTorch
# ProtestResNet - confirm.
model.save('besttt_model.h5')