In [72]:
import os
import pandas as pd
import matplotlib.pyplot as plt
import json
import cv2
import numpy as np
import torch
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
import torchvision.models as models
import torch.nn as nn
import torch.optim as optim
import argparse
from torch.utils.data import random_split
from PIL import Image
from torch.utils.tensorboard import SummaryWriter
import torchvision.transforms.functional as F
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split
import warnings
from torch.optim import lr_scheduler


warnings.filterwarnings("ignore", category=UserWarning) 



In [2]:
news_categories = {
    0: "știri_fabricate",
    1: "știri_ficționale",
    2: "știri_plauzibile",
    3: "știri_propagandistice",
    4: "știri_reale",
    5: "știri_satirice"}

In [23]:
class ImageEncoder(nn.Module):
    def __init__(self, args):
        super(ImageEncoder, self).__init__()
        self.args = args
        model = models.resnet152(pretrained=True)
        modules = list(model.children())[:-2]
        self.model = nn.Sequential(*modules)

        pool_func = (
            nn.AdaptiveAvgPool2d
            if args.img_embed_pool_type == "avg"
            else nn.AdaptiveMaxPool2d
        )

        if args.num_image_embeds in [1, 2, 3, 5, 7]:
            self.pool = pool_func((args.num_image_embeds, 1))
        elif args.num_image_embeds == 4:
            self.pool = pool_func((2, 2))
        elif args.num_image_embeds == 6:
            self.pool = pool_func((3, 2))
        elif args.num_image_embeds == 8:
            self.pool = pool_func((4, 2))
        elif args.num_image_embeds == 9:
            self.pool = pool_func((3, 3))

    def forward(self, x):
        # Bx3x224x224 -> Bx2048x7x7 -> Bx2048xN -> BxNx2048
        out = self.pool(self.model(x))
        out = torch.flatten(out, start_dim=2)
        out = out.transpose(1, 2).contiguous()
        return out  # BxNx2048


class ImageClf(nn.Module):
    def __init__(self, args):
        super(ImageClf, self).__init__()
        self.args = args
        self.img_encoder = ImageEncoder(args)
        self.clf = nn.Linear(args.img_hidden_sz * args.num_image_embeds, args.n_classes)

    def forward(self, x):
        x = self.img_encoder(x)
        x = torch.flatten(x, start_dim=1)
        out = self.clf(x)
        return out

In [60]:
resnet_model = models.resnet50(pretrained=True)

In [48]:
class CustomDataset(Dataset):
    def __init__(self, file_paths, labels, transform=None):
        self.file_paths = file_paths
        self.labels = labels
        self.transform = transform



    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        try:
            img_path, label = self.file_paths[idx], self.labels[idx]
            image = Image.open(img_path).convert("RGB")

            if image is None:
                print(img_path)
                raise ValueError(f"Error loading image at index {idx}: Image is empty or cannot be loaded")


            if self.transform:
                image = self.transform(image)
                
            return image, label
        except Exception as e:
            print(self.file_paths[idx])
            print(f"Error loading image at index {idx}: {str(e)}")
            raise

In [52]:
root_dir = r'C:\Users\bebed\OneDrive\Desktop\Dizertatie\dataset\images'
classes = sorted(os.listdir(root_dir))
file_paths = []
labels = []
for i, class_name in enumerate(classes):
    class_path = os.path.join(root_dir, class_name)
    filenames = os.listdir(class_path)

    for filename in filenames:
        if filename.endswith(('.png', '.jpg', '.jpeg')):
            img_path = os.path.join(class_path, filename)
            file_paths.append(img_path)
            labels.append(i)
        


train_file_paths, test_file_paths, train_labels, test_labels = train_test_split(
    file_paths, labels, test_size=0.2, random_state=42
)

DATA_MEAN = (0.5, 0.5, 0.5)		# define the mean for the scaling transform - PIL images already come given in
DATA_STD = (0.5, 0.5, 0.5)		# define the standard deviation for the scaling transform

train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.RandomRotation(degrees=30),
    transforms.RandomVerticalFlip(),
    transforms.RandomPerspective(distortion_scale=0.5, p=0.5),
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.9, 1.1), shear=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

test_transform = transforms.Compose([
    transforms.Resize(224),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])



batch_size = 32  # Adjust the batch size as needed

train_dataset = CustomDataset(train_file_paths, train_labels, transform=train_transform)
test_dataset = CustomDataset(test_file_paths, test_labels, transform=test_transform)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

print(torch.tensor(next(iter(train_loader))[0]).shape)







torch.Size([32, 3, 224, 224])


  print(torch.tensor(next(iter(train_loader))[0]).shape)


In [19]:
def get_num_params(model):
    num_params = 0
    for params in model.parameters():
        num_params += params.shape.numel()

    return num_params

In [20]:
print("Total number of parameters of models")
print(str(resnet_model.__class__), ": ", get_num_params(resnet_model))

Total number of parameters of models
<class 'torchvision.models.resnet.ResNet'> :  25557032


In [None]:
data_dir = r'C:\Users\bebed\OneDrive\Desktop\Dizertatie\dataset\images'

# Define transformations for data augmentation and normalization
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

# Data augmentation transforms for training
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.RandomRotation(degrees=30),
    transforms.RandomVerticalFlip(),
    transforms.RandomPerspective(distortion_scale=0.5, p=0.5),
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.9, 1.1), shear=0.1),
    transforms.RandomApply([transforms.RandomErasing(p=0.5, scale=(0.02, 0.2))], p=0.5),  # Add random erasing
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std)
])
test_transform = transforms.Compose([
    transforms.Resize(224),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std),
])
# Create a dataset using ImageFolder
full_dataset = ImageFolder(root=data_dir)

full_data_loader =  DataLoader(full_dataset, batch_size=32, shuffle=True, num_workers=4)
train_size = int(0.8 * len(full_dataset))  # 80% for training
test_size = len(full_dataset) - train_size  # 20% for testing
train_dataset, test_dataset = random_split(full_dataset, [train_size, test_size])

print(f"Size of the dataset: {len(full_dataset)} samples")

num_classes = len(full_dataset.classes)
print(num_classes)
print(full_dataset)

# Apply the respective transforms to each dataset
train_dataset.dataset.transform = train_transform
test_dataset.dataset.transform = test_transform

# Create DataLoaders for train and test datasets
batch_size = 32  # Adjust the batch size as needed
train_data_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
test_data_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

Size of the dataset: 838 samples
5
Dataset ImageFolder
    Number of datapoints: 838
    Root location: C:\Users\bebed\OneDrive\Desktop\Dizertatie\dataset\images


In [44]:
class EarlyStopping:
    def __init__(self, patience=5):
        self.patience = patience
        self.counter = 0
        self.best_loss = None
        self.early_stop = False

    def __call__(self, val_loss, model):
        if self.best_loss is None:
            self.best_loss = val_loss
        elif val_loss > self.best_loss:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = val_loss
            self.counter = 0

In [88]:
resnet_model = models.resnet50(weights='ResNet50_Weights.DEFAULT')

early_stopping = EarlyStopping(patience=10)

# Modify the last layer for your specific number of classes
num_classes = len(classes)

print(num_classes)
resnet_model.fc = nn.Sequential(
    nn.Dropout(0.5),
    nn.Linear(resnet_model.fc.in_features, num_classes),
)
# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(resnet_model.parameters(), lr=0.001)
scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)


# Set the device (CPU or GPU)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
resnet_model.to(device)
print(device)
writer = SummaryWriter('logs')


# # Training loop
num_epochs = 50
for epoch in range(num_epochs):
    resnet_model.train()  # Set the model back to training mode
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        # Forward pass
        outputs = resnet_model(inputs)
        loss = criterion(outputs, labels)
        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    scheduler.step()
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {loss.item()}')
    writer.add_scalar('Train Loss', loss.item(), epoch)

    resnet_model.eval()  # Set the model to evaluation mode
    total_val_loss = 0.0
    correct_predictions = 0
    total_samples = 0

    with torch.no_grad():
        for val_inputs, val_labels in test_loader:
            val_inputs, val_labels = val_inputs.to(device), val_labels.to(device)

            # Forward pass for validation
            val_outputs = resnet_model(val_inputs)
            val_loss = criterion(val_outputs, val_labels)

            # Update validation loss
            total_val_loss += val_loss.item()

            # Update correct predictions and total samples for accuracy calculation
            _, predicted = torch.max(val_outputs, 1)
            correct_predictions += (predicted == val_labels).sum().item()
            total_samples += val_labels.size(0)

    # Calculate and log validation loss
    avg_val_loss = total_val_loss / len(test_loader)
    writer.add_scalar('Validation Loss', avg_val_loss, epoch)

    # Calculate and log validation accuracy
    accuracy = correct_predictions / total_samples
    writer.add_scalar('Validation Accuracy', accuracy, epoch)

    # Print training and validation information
    print(f'Epoch {epoch + 1}/{num_epochs}, Train Loss: {loss.item():.4f}, Validation Loss: {avg_val_loss:.4f}, Validation Accuracy: {accuracy:.4f}')
    early_stopping(avg_val_loss, resnet_model)
    if early_stopping.early_stop:
        print("Early stopping")
        break

# Save the trained model

model_path = r'C:\Users\bebed\OneDrive\Desktop\BigData\project\resnet152_model'

torch.save(resnet_model.state_dict(), model_path)
writer.close()


5
cuda:0
Epoch 1/50, Loss: 1.595841646194458
Epoch 1/50, Train Loss: 1.5958, Validation Loss: 1.5797, Validation Accuracy: 0.3452
Epoch 2/50, Loss: 1.2877551317214966
Epoch 2/50, Train Loss: 1.2878, Validation Loss: 1.3959, Validation Accuracy: 0.4107
Epoch 3/50, Loss: 1.4104557037353516
Epoch 3/50, Train Loss: 1.4105, Validation Loss: 1.6400, Validation Accuracy: 0.3988
Epoch 4/50, Loss: 1.572829008102417
Epoch 4/50, Train Loss: 1.5728, Validation Loss: 1.5070, Validation Accuracy: 0.3750
Epoch 5/50, Loss: 1.3616150617599487
Epoch 5/50, Train Loss: 1.3616, Validation Loss: 1.6145, Validation Accuracy: 0.3750
Epoch 6/50, Loss: 1.5737003087997437
Epoch 6/50, Train Loss: 1.5737, Validation Loss: 1.5574, Validation Accuracy: 0.3929
Epoch 7/50, Loss: 1.3507859706878662
Epoch 7/50, Train Loss: 1.3508, Validation Loss: 1.6782, Validation Accuracy: 0.4048
Epoch 8/50, Loss: 1.4684399366378784
Epoch 8/50, Train Loss: 1.4684, Validation Loss: 1.4321, Validation Accuracy: 0.4167
Epoch 9/50, Loss:

In [89]:
resnet_model.eval()

correct_predictions = 0
total_samples = 0

with torch.no_grad():
    for test_inputs, test_labels in test_loader:
        test_inputs, test_labels = test_inputs.to(device), test_labels.to(device)

        # Forward pass
        test_outputs = resnet_model(test_inputs)

        # Get predicted labels
        _, predicted = torch.max(test_outputs, 1)

        # Update counts
        total_samples += test_labels.size(0)
        correct_predictions += (predicted == test_labels).sum().item()

# Calculate accuracy
accuracy = correct_predictions / total_samples
print(f'Test Accuracy: {accuracy * 100:.2f}%')


Test Accuracy: 47.02%


In [95]:
directory_path = r'C:\Users\bebed\OneDrive\Desktop\Dizertatie\dataset\images\plauzibile'

# Filename
filename = 'a1Uw9QZ5fecppuwC8i7zSjjaOLca-vaccin_sample_350.jpg'

# Construct the full image path
image_path = os.path.join(directory_path, filename)

input_image = Image.open(image_path)
input_tensor = test_transform(input_image)

# Move the input tensor to the GPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
input_tensor = input_tensor.to(device)

# Add batch dimension
input_batch = input_tensor.unsqueeze(0)

# Move the model to the GPU
resnet_model = resnet_model.to(device)

# Make predictions
with torch.no_grad():
    predictions = resnet_model(input_batch)

# Interpret the model's output
predicted_label_index = torch.argmax(predictions).item()
# predicted_label = full_dataset.classes[predicted_label_index]

# Print the predicted label
print(f"The predicted label is: {predicted_label_index}")

The predicted label is: 2
