In [1]:
import torchvision
torchvision.disable_beta_transforms_warning()
from torchvision.transforms import v2
from torchvision import datasets, transforms

import timm
import torch.nn as nn
import torch
from torch.utils.data import DataLoader
import cv2
import numpy as np
import time
from sklearn import metrics

import zipfile
import fnmatch
from PIL import Image, ImageChops, ImageEnhance

#For texture extraction
from skimage import feature
import os
import csv

DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
def extract_texture_features(image):
    gray_image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    gray_image = gray_image.astype(np.uint8)  # Convert to integer type
    radius = 3
    n_points = 8 * radius
    lbp = feature.local_binary_pattern(gray_image, n_points, radius, method='uniform')

    lbp = lbp / lbp.max()  # Normalize the LBP values to the range [0, 1]
    
    return lbp

def extract_color_features(image, quality=95, enhance_factor=10):
    temp_path = "temp_recompressed.jpg"
    image.save(temp_path, format="JPEG", quality=quality)
    with Image.open(temp_path) as recompressed:
        ela_image = ImageChops.difference(image, recompressed)
    os.remove(temp_path)

    enhancer = ImageEnhance.Brightness(ela_image)
    enhanced_ela = enhancer.enhance(enhance_factor)

    resized_ela = enhanced_ela.resize((224, 224)).convert("L")
    feature_array = np.array(resized_ela).astype(np.float32) / 255.0
    return feature_array


def extract_shape_features(image):
    kernel_size = 5
    transform_iteration = 5

    # Define the kernel
    kernel = np.ones((kernel_size, kernel_size), np.uint8)

    image = cv2.resize(image, (224, 224))  # Resize to (224, 224)

    image_dict = {}
    image_dict["original_image"] = image
    image_dict["eroded_image"] = cv2.erode(image_dict["original_image"], kernel, iterations=transform_iteration)
    image_dict["dilated_image"] = cv2.dilate(image_dict["original_image"], kernel, iterations=transform_iteration)
    image_dict["opened_image"] = cv2.dilate(image_dict['eroded_image'], kernel, iterations=transform_iteration)
    image_dict["closed_image"] = cv2.erode(image_dict['dilated_image'], kernel, iterations=transform_iteration)

    opened_image_resized = cv2.cvtColor(image_dict["opened_image"], cv2.COLOR_RGB2GRAY)

    return opened_image_resized  # Shape: (224, 224)


In [3]:
def load_image_from_zip(zip_path, img_path):
    with zipfile.ZipFile(zip_path, 'r') as zf:
        with zf.open(img_path) as file:
            img = Image.open(file)
            return img.convert("RGB")  # Ensure the image is in RGB format
        
class ZipImageFolderDataset(datasets.ImageFolder):
    def __init__(self, zip_path, root, transform=None):
        self.zip_path = zip_path
        self.root = root
        self.transform = transform
        self.classes = ['0_real', '1_fake']
        self.img_paths = self._get_image_paths()

    def _get_image_paths(self):
        img_paths = []
        with zipfile.ZipFile(self.zip_path, 'r') as zf:
            for file_info in zf.infolist():
                name = file_info.filename
                if fnmatch.fnmatch(name, f"{self.root}/*.jpg"):
                    label = 0 if '0_real' in name.split('/')[1] else 1
                    img_paths.append((name, label))
        return img_paths

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, index):
        img_path, label = self.img_paths[index]
        img = load_image_from_zip(self.zip_path, img_path)
        if self.transform:
            img_tensor = self.transform(img)
        
        # Ensure the image is now a tensor
        if not isinstance(img_tensor, torch.Tensor):
            raise TypeError(f"Expected image to be a tensor, but got {type(img_tensor)}.")
        
        # Convert tensor to numpy array for feature extraction
        img_np = img_tensor.numpy().transpose(1, 2, 0)
        
        # Extract features
        texture_features = extract_texture_features(img_np)
        color_features = extract_color_features(img)
        shape_features = extract_shape_features(img_np)
        
        features = np.stack([texture_features, color_features, shape_features], axis=0)
        features = torch.tensor(features).float().permute(1, 2, 0)  # Change the shape to [height, width, channels]
        features = features.permute(2, 0, 1)  # Change the shape to [channels, height, width]
        
        return features, label

In [4]:
def evaluate(model, test_loader):
    # Validation phase
    model.eval()
    
    y_true = []
    y_pred = []
    
    batch = 0
    print(f"Total batches: {len(test_loader)}")
    for img, label in test_loader:
        # Please make sure that the "pred" is binary result
        output = model(img.to(DEVICE))
        pred = np.argmax(output.detach().to('cpu'), axis=1).numpy()
        
        y_true.extend(label.numpy())
        y_pred.extend(pred)
        
        print(f"Batch: {batch} completed")
        batch += 1

    y_true = np.array(y_true)
    y_pred = np.array(y_pred)

    accuracy = metrics.accuracy_score(y_true, y_pred)
    print(f'Validation Accuracy: {accuracy}')
    return accuracy

In [5]:
def validate_model(model, val_loader, criterion):
    model.eval()
    val_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(DEVICE), labels.to(DEVICE).float().unsqueeze(1)
            outputs = model(images)
            val_loss += criterion(outputs, labels).item()
            predicted = (outputs > 0.5).int()
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    val_loss /= len(val_loader)
    accuracy = correct / len(val_loader.dataset)
    return val_loss, accuracy

In [6]:
def save_model(model, path='testing.pth'):
    torch.save(model.state_dict(), path)
    # for name, param in model.state_dict().items():
    #     with open(r"weights.txt",'a') as file:
    #         file.write(f"Layer: {name} | Size: {param.size()} | Values : {param[:2]} \n")  
    # with open(r"weights.txt",'a') as file:
    #     file.write('-'*100 + '\n') 
    print("Model saved successfully!")

In [7]:
def load_model(path='testing.pth'):
    model = timm.create_model('seresnext101_32x4d', pretrained=True)
    model.load_state_dict(torch.load(path, map_location=DEVICE))
    print("Model loaded successfully!")
    return model

In [8]:
def train_model(model, train_loader, val_loader, optimizer, criterion, epochs):
    model.train()
    best_val_loss = float('inf')
    patience = 3
    epochs_without_improvement = 0
    train_losses = []
    val_losses = []
    val_accuracies = []
    for epoch in range(epochs):
        running_loss = 0
        print(f"Epoch {epoch+1} started...")
        print(f"length of train_loader: {len(train_loader)}")
        start_time = time.time()
        
        batch = 1
        for features, labels in train_loader:
            # images = images.to(DEVICE)
            labels = labels.to(DEVICE)
            features = features.to(DEVICE)

            optimizer.zero_grad()
            outputs = model(features)
            
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * features.size(0)
            print(f'Batch {batch} completed...')
            batch += 1
        epoch_loss = running_loss / len(train_loader.dataset)
        print(f'Epoch {epoch+1}/{epochs}, Loss: {epoch_loss:.4f}, Time: {time.time()-start_time:.2f}s')
        train_losses.append(epoch_loss)
        
        val_loss, val_accuracy = validate_model(model, val_loader, criterion)
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            epochs_without_improvement = 0
            # Save the best model weights (optional)
            save_model(model)
        else:
            epochs_without_improvement += 1
            
        val_losses.append(val_loss)
        val_accuracies.append(val_accuracy)
        
        if epochs_without_improvement >= patience:
            print(f"Early stopping at epoch {epoch + 1} due to no improvement in validation loss.")
            break
    return train_losses, val_losses, val_accuracies
        

In [9]:
def load_data(zip_path, batch_size, image_size):
    transform = transforms.Compose([
        transforms.Resize((image_size, image_size)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    
    train_dir = "AIGC-Detection-Dataset/train"
    val_dir = "AIGC-Detection-Dataset/val"
    # test_dir = "AIGC-Detection-Dataset/val"

    train_dataset = ZipImageFolderDataset(zip_path, train_dir, transform=transform)
    val_dataset = ZipImageFolderDataset(zip_path, val_dir, transform=transform)
    # test_dataset = ZipImageFolderDataset(zip_path, test_dir, transform=transform)
    print(f"Data prepared:\nTrain: {len(train_dataset)}, Val: {len(val_dataset)}")

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, drop_last=True)
    # test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, drop_last=True)
    
    print("Data loaded")
    return train_loader, val_loader

In [10]:
def save_results(train_losses, val_losses, val_accuracies, path='results.csv'):
    with open(path, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['Epoch', 'Train Loss', 'Val Loss', 'Val Accuracy'])
        for i in range(len(train_losses)):
            writer.writerow([i+1, train_losses[i], val_losses[i], val_accuracies[i]])

In [None]:

model = timm.create_model('seresnext101_32x4d', pretrained=True)
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
# Modify the first convolutional layer
original_conv1 = model.conv1
new_conv1 = nn.Conv2d(3, original_conv1.out_channels, kernel_size=original_conv1.kernel_size,
                      stride=original_conv1.stride, padding=original_conv1.padding, bias=False)
with torch.no_grad():
    new_conv1.weight[:, :3, :, :] = original_conv1.weight[:, :3, :, :]
model.conv1 = new_conv1

# Load the data
zip_path = '..\AIGC-Detection-Dataset.zip'
batch_size = 64
image_size = 224
train_loader, val_loader = load_data(zip_path, batch_size, image_size)

criterion = nn.CrossEntropyLoss().to(DEVICE)
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Adjust the final layer for binary classification
model.fc = nn.Linear(model.fc.in_features, 2)

model = model.to(DEVICE)
train_losses, val_losses, val_accuracies = train_model(model, train_loader, val_loader, optimizer, criterion, 5)

# Save the results to a CSV file
save_results(train_losses, val_losses, val_accuracies)

save_model(model, 'seresnext_finetuned.pth')
# evaluate(model, train_loader)
# evaluate(model, val_loader)


Data prepared:
Train: 45000, Val: 5000
Data loaded
Epoch 1 started...
length of train_loader: 45000
Batch 1 completed...
Batch 2 completed...
Batch 3 completed...
Batch 4 completed...
Batch 5 completed...
Batch 6 completed...
Batch 7 completed...
Batch 8 completed...
Batch 9 completed...
