In [2]:
import cv2
from skimage.filters import laplace
import numpy as np
from scipy.fft import fft2,fftshift
import pandas as pd
from tqdm import tqdm
from scipy.stats import skew

In [3]:
import os
import random
from joblib import Parallel,delayed

In [None]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
class DatasetGenerator:
    def __init__(self,image_dir,random_seed=69):
        self.feature_names=['laplacian_var','laplacian_max','ma_laplacian','vlow_freqE','lowmid_freqE','mid_freqE','hog_mean','hog_std','hog_skewness','ninth_percentile']
        
        self.image_dir=image_dir
        self.image_paths=[os.path.join(image_dir,fname) for fname in os.listdir(image_dir) if fname.endswith(('.jpg','.jpeg','.png')) ]
        self.kernel_size_choices=(3,5,7)
        self.target_image_size=(299,299)
        self.metadata_df=self.__create_metadata_df(random_seed)
        self.features_df=None
        
       
    def __create_metadata_df(self,random_seed):
        random.seed(random_seed)
        all_sample_records=[]
        for image_path in self.image_paths:
            kernel_idx=(hash(image_path)+random.randint(0,1000))%len(self.kernel_size_choices)
            kernel_size=self.kernel_size_choices[kernel_idx]
            all_sample_records.append({"image_path":image_path,"blur_type":"box","kernel_size":kernel_size})
            all_sample_records.append({"image_path":image_path,"blur_type":"gaussian","kernel_size":kernel_size})

        return pd.DataFrame(all_sample_records)


    def apply_blur(self,image_path,kernel_size,blur_type):
        img=cv2.imread(image_path)
        img = cv2.resize(img, (256, 256))
        if blur_type=='box':
            img=cv2.blur(img,(kernel_size,)*2)
        else:
            img=cv2.GaussianBlur(img,(kernel_size,)*2,sigmaX=0)

        return img
    

    def extract_features(self,blurred_image):
        
        if len(blurred_image.shape)==3:
            greyed_img=cv2.cvtColor(blurred_image,cv2.COLOR_BGR2GRAY)
        else:
            greyed_img=blurred_image
        
        features=[]
        
        laplacian=laplace(greyed_img)
        laplacian_variance=np.var(laplacian)
        features.append(laplacian_variance)
        features.extend([np.max(laplacian),np.mean(np.abs(laplacian))])
        
        fft_image=fftshift(np.abs(fft2(greyed_img)))
        h,w=fft_image.shape
        center_y,center_x=h//2,w//2

        for radius in [h//8,h//6,h//4]:
            mask=np.zeros((h,w))
            cv2.circle(mask,(center_x,center_y),radius,1,-1)
            features.append(np.mean(fft_image[mask==1]))

        hog=cv2.HOGDescriptor((64,64),(16,16),(8,8),(8,8),9)
        hog_features=hog.compute(greyed_img).flatten()
        features.extend([np.mean(hog_features),np.std(hog_features),skew(hog_features),np.percentile(hog_features,90)])

        return np.array(features)
    

    def generate_dataset(self, output_path=None, n_jobs=-1, batch_size=1000):

        features = []
        labels = []
        
        # Process in parallel batches
        def process_batch(batch_indices):
            batch_features = []
            batch_labels = []
            for idx in batch_indices:
                row = self.metadata_df.iloc[idx]
                blurred = self.apply_blur(
                    row['image_path'],
                    row['kernel_size'],
                    row['blur_type']
                )
                feats = self.extract_features(blurred)
                batch_features.append(feats)
                batch_labels.append(0 if row['blur_type'] == 'box' else 1)
            return batch_features, batch_labels
        
        # Create batch indices
        indices = np.arange(len(self.metadata_df))
        batches = [indices[i:i+batch_size] for i in range(0, len(indices), batch_size)]
        
        # Parallel processing
        results = Parallel(n_jobs=n_jobs)(
            [delayed(process_batch)(batch) 
            for batch in tqdm(batches, desc="Processing images")]
        )
        
        # Combine results
        for batch_feats, batch_labels in results:
            features.extend(batch_feats)
            labels.extend(batch_labels)
        
        # Create final DataFrame
        self.features_df = pd.DataFrame(features, columns=self.feature_names)
        self.features_df['image_path'] = self.metadata_df['image_path'].values
        self.features_df['label'] = labels
        
        if output_path:
            self.features_df.to_csv(output_path, index=False)
        
        return self.features_df


In [None]:
import torch.nn as nn
import torch
from torch.utils.data import DataLoader, Dataset
import pandas as pd
from torch.optim import AdamW, lr_scheduler
from sklearn.preprocessing import StandardScaler
import numpy as np
from pprint import pprint
import matplotlib.pyplot as plt

In [None]:
from sklearn.metrics import accuracy_score,precision_score,recall_score,f1_score,roc_auc_score,classification_report,confusion_matrix,precision_recall_curve,roc_curve,auc
import json
import seaborn as sns

In [None]:
DEVICE=torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
os.makedirs('model',exist_ok=True)
os.makedirs('results',exist_ok=True)

In [None]:
class BlurClassifierModel(nn.Module):
    def __init__(self, input_features_no=10, dropout_rate=0.3):
        super(BlurClassifierModel, self).__init__()
        
        self.input_layer = nn.Linear(input_features_no, 512)
        self.input_bn = nn.BatchNorm1d(512)
        
        self.block1 = nn.Sequential(
            nn.Linear(512, 512),
            nn.BatchNorm1d(512),
            nn.GELU(),
            nn.Dropout(dropout_rate),
            nn.Linear(512, 512),
            nn.BatchNorm1d(512),
        )
        
        self.block2 = nn.Sequential(
            nn.Linear(512, 256),
            nn.BatchNorm1d(256),
            nn.GELU(),
            nn.Dropout(dropout_rate),
            nn.Linear(256, 256),
            nn.BatchNorm1d(256),
        )
        
        self.block3 = nn.Sequential(
            nn.Linear(256, 128),
            nn.BatchNorm1d(128),
            nn.GELU(),
            nn.Dropout(dropout_rate),
            nn.Linear(128, 128),
            nn.BatchNorm1d(128),
        )
        
        self.classifier = nn.Sequential(
            nn.Linear(128, 64),
            nn.GELU(),
            nn.Dropout(dropout_rate * 0.5),
            nn.Linear(64, 32),
            nn.GELU(),
            nn.Linear(32, 1),
            nn.Sigmoid()
        )
        
        self.gelu = nn.GELU()
        
    def forward(self, x):
        # Input processing
        x = self.gelu(self.input_bn(self.input_layer(x)))
        
        # Residual blocks
        identity = x
        x = self.block1(x)
        x = self.gelu(x + identity)  # Residual connection
        
        x = self.block2(x)
        x = self.gelu(x)
        
        x = self.block3(x)
        x = self.gelu(x)
        
        # Classification
        x = self.classifier(x)
        return x

In [None]:
class BlurDataset(Dataset):
    def __init__(self, features_df: pd.DataFrame, scaler=None, fit_scaler=True):
        super().__init__()
        self.features_col = [col for col in features_df.columns if col != 'image_path' and col != 'label']
        self.target_col = 'label'
        
        # Feature scaling
        if scaler is None:
            self.scaler = StandardScaler()
            if fit_scaler:
                features_scaled = self.scaler.fit_transform(features_df[self.features_col])
            else:
                features_scaled = features_df[self.features_col].values
        else:
            self.scaler = scaler
            features_scaled = self.scaler.transform(features_df[self.features_col])
        
        self.X = torch.FloatTensor(features_scaled)
        self.y = torch.FloatTensor(features_df[self.target_col].values).unsqueeze(1)
        
    def __len__(self):
        return len(self.y)
    
    def __getitem__(self, index):
        return self.X[index], self.y[index]


In [None]:
def find_optimal_threshold(model, val_loader, device):
    """Find optimal threshold using validation set"""
    model.eval()
    y_true, y_proba = [], []
    
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs = inputs.to(device)
            outputs = model(inputs)
            
            y_true.extend(labels.numpy())
            y_proba.extend(outputs.cpu().numpy())
    
    # Different threshold
    thresholds = np.arange(0.3, 0.8, 0.01)
    best_f1 = 0
    best_threshold = 0.5
    
    for threshold in thresholds:
        preds = (np.array(y_proba) > threshold).astype(int)
        from sklearn.metrics import f1_score
        f1 = f1_score(y_true, preds)
        
        if f1 > best_f1:
            best_f1 = f1
            best_threshold = threshold
    
    print(f"Optimal threshold: {best_threshold:.3f} with F1: {best_f1:.4f}")
    return best_threshold


In [None]:
def plot_graph(train_loss_arr,val_loss_arr,val_acc_arr,best_val_loss,best_val_loss_epoch,total_epochs=100):
    ep_arr=range(1,total_epochs+1)
    plt.plot(ep_arr,train_loss_arr,color='r',label='Training Loss')
    plt.plot(ep_arr,val_loss_arr,color='m',label='Validation Loss')
    plt.plot(best_val_loss_epoch,best_val_loss,marker='x',color='g',label='Min Validation Loss',markersize=8)
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()
    plt.savefig('results/loss.png')
    plt.show()

    plt.plot(ep_arr,val_acc_arr,color='b',label='Validation Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.title('Validation Accuracy')
    plt.legend()
    plt.savefig('results/accuracy.png')
    plt.show()


In [None]:
def generate_figs(y_true,y_pred,y_proba,class_names,save_path):    
    report = classification_report(y_true, y_pred, target_names=class_names, output_dict=True)
    pprint(report)
    with open(f'{save_path}/classification_report.json','w') as f:
        f.write(json.dumps(report))

    # Confusion matrix
    cm=confusion_matrix(y_true,y_pred)
    sns.heatmap(cm, annot=True, fmt='d',cmap='Blues',xticklabels=class_names,yticklabels=class_names)
    plt.title('Confusion Matrix')
    plt.tight_layout()
    plt.savefig(f'{save_path}/cm.png')
    plt.show()


    # Precision-Recall Curve
    precision, recall, _ = precision_recall_curve(y_true, y_proba)
    pr_auc = auc(recall, precision)
    plt.plot(recall, precision, label=f'PR (AUC = {pr_auc:.2f})')
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision-Recall Curve')
    plt.legend()
    plt.savefig(f'{save_path}/pr_curve.png')
    plt.tight_layout()
    plt.show()

    #ROC curve
    fpr, tpr, _ = roc_curve(y_true, y_proba)
    roc_auc = auc(fpr, tpr)
    
    plt.figure()
    plt.plot(fpr, tpr, label=f'AUC = {roc_auc:.2f}')
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve')
    plt.legend(loc="lower right")
    plt.tight_layout()
    plt.savefig(f'{save_path}/roc_plot.png')
    plt.show()

In [None]:
def train_model(model, train_loader, val_loader, device, epochs=150, lr=0.0005):
    model = model.to(device)
    
    class FocalLoss(nn.Module):
        def __init__(self, alpha=1, gamma=2):
            super().__init__()
            self.alpha = alpha
            self.gamma = gamma
            
        def forward(self, inputs, targets):
            bce_loss = nn.functional.binary_cross_entropy(inputs, targets, reduction='none')
            pt = torch.exp(-bce_loss)
            focal_loss = self.alpha * (1-pt)**self.gamma * bce_loss
            return focal_loss.mean()
    
    criterion = FocalLoss(alpha=1, gamma=2) 
    
    optimizer = AdamW([
        {'params': model.input_layer.parameters(), 'lr': lr * 0.5},
        {'params': model.block1.parameters(), 'lr': lr},
        {'params': model.block2.parameters(), 'lr': lr},
        {'params': model.block3.parameters(), 'lr': lr},
        {'params': model.classifier.parameters(), 'lr': lr * 1.5}
    ], weight_decay=1e-5)
    
    scheduler = lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=20, T_mult=2)# Cosine annealing with warm restarts
    
    best_val_loss = float('inf')
    best_val_loss_epoch = 0
    train_loss_arr, val_loss_arr, val_accuracy_arr = [], [], []
    patience_counter = 0
    patience = 15
    
    for epoch in range(epochs):
        # Training
        model.train()
        epoch_train_loss = 0.0
        
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            
            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
            optimizer.step()
            
            epoch_train_loss += loss.item()
        
        # Validation
        model.eval()
        epoch_val_loss, epoch_val_correct = 0.0, 0
        
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                epoch_val_loss += criterion(outputs, labels).item()
                preds = (outputs > 0.5).float()
                epoch_val_correct += (preds == labels).sum().item()
        
        avg_train_loss = epoch_train_loss / len(train_loader)
        avg_val_loss = epoch_val_loss / len(val_loader)
        val_acc = epoch_val_correct / len(val_loader.dataset)
        
        train_loss_arr.append(avg_train_loss)
        val_loss_arr.append(avg_val_loss)
        val_accuracy_arr.append(val_acc)
        
        scheduler.step()
        
        # Early stopping and model saving
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            best_val_loss_epoch = epoch + 1
            patience_counter = 0
            torch.save(model.state_dict(), 'model/best_improved_model.pth')
        else:
            patience_counter += 1
        
        if epoch % 10 == 0:
            current_lr = optimizer.param_groups[0]['lr']
            print(f'Epoch {epoch+1}/{epochs} | Train Loss: {avg_train_loss:.4f} | '
                  f'Val Loss: {avg_val_loss:.4f} | Val Acc: {val_acc:.4f} | LR: {current_lr:.2e}')
        
        # Early stopping
        if patience_counter >= patience:
            print(f"Early stopping at epoch {epoch+1}")
            break
    
    model.load_state_dict(torch.load('model/best_improved_model.pth'))
    return model, train_loss_arr, val_loss_arr, val_accuracy_arr,best_val_loss_epoch,best_val_loss,epoch+1

In [None]:
def create_datasets(features_df, train_test_val_split=[0.7, 0.15, 0.15]):
    """Create datasets with proper scaling"""
    full_dataset_temp = BlurDataset(features_df, fit_scaler=True)
    scaler = full_dataset_temp.scaler
    
    total_size = len(features_df)
    train_size = int(train_test_val_split[0] * total_size)
    test_size = int(train_test_val_split[1] * total_size)
    val_size = total_size - train_size - test_size

    indices = torch.randperm(total_size, generator=torch.Generator().manual_seed(69))
    train_indices = indices[:train_size]
    test_indices = indices[train_size:train_size + test_size]
    val_indices = indices[train_size + test_size:]

    train_df = features_df.iloc[train_indices].reset_index(drop=True)
    test_df = features_df.iloc[test_indices].reset_index(drop=True)
    val_df = features_df.iloc[val_indices].reset_index(drop=True)
    
    train_dataset = BlurDataset(train_df, scaler=scaler, fit_scaler=True)
    test_dataset = BlurDataset(test_df, scaler=scaler, fit_scaler=False)
    val_dataset = BlurDataset(val_df, scaler=scaler, fit_scaler=False)
    
    return train_dataset, test_dataset, val_dataset, scaler


In [None]:
def evaluate_model_with_threshold(model, test_loader, threshold=0.5, device=None, class_names=None):
    """Evaluate model with custom threshold and show confusion matrix"""
    if device is None:
        device = next(model.parameters()).device
    
    if class_names is None:
        class_names = ["Box Blur", "Gaussian Blur"]
    
    model.eval()
    y_true, y_pred, y_proba = [], [], []
    
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            outputs = model(inputs)
            
            y_true.extend(labels.numpy())
            y_pred.extend((outputs.cpu() > threshold).float().numpy())
            y_proba.extend(outputs.cpu().numpy())
    
    metrics = {
        'accuracy': accuracy_score(y_true, y_pred),
        'precision': precision_score(y_true, y_pred),
        'recall': recall_score(y_true, y_pred),
        'f1': f1_score(y_true, y_pred),
        'roc_auc': roc_auc_score(y_true, y_proba),
        'threshold_used': threshold
    }
    
    print(f"\nTest Metrics (threshold={threshold:.3f}):")
    for key, value in metrics.items():
        if key != 'threshold_used':
            print(f"{key}: {value:.4f}")
    
    generate_figs(y_true,y_pred,y_proba,["Box Blur","Gaussian Blur"],"results")
    return metrics


In [None]:
obj=DatasetGenerator("flickr30k_images/flickr30k_images")
features_df=obj.generate_dataset(output_path='featuresdata.csv')

In [None]:
train_dataset, test_dataset, val_dataset, scaler = create_datasets(features_df)

# Create data loaders
batch_size = 32 
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, num_workers=0, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, num_workers=0, pin_memory=True)

In [None]:
epochs=150
model = BlurClassifierModel(input_features_no=10, dropout_rate=0.25)
model, train_loss, val_loss, val_acc, best_val_loss_epoch, best_val_loss,last_epoch = train_model(
    model, train_loader, val_loader, DEVICE, epochs=epochs, lr=0.0005
)

In [None]:
plot_graph(train_loss,val_loss,val_acc,best_val_loss,best_val_loss_epoch,total_epochs=last_epoch)

In [None]:
optimal_threshold = find_optimal_threshold(model, val_loader, DEVICE)

In [None]:
metrices=evaluate_model_with_threshold(model, test_loader, optimal_threshold)
pprint(metrices)