In [1]:
import json
import math
import os
from concurrent.futures import ProcessPoolExecutor

import cv2
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
from skimage.feature import local_binary_pattern

In [2]:
def contrast_stretch(img):
    """Linearly rescale pixel intensities so they span the full [0, 255] range.

    Parameters
    ----------
    img : numpy.ndarray
        Image (any numeric dtype) whose minimum is mapped to 0 and whose
        maximum is mapped to 255.

    Returns
    -------
    numpy.ndarray
        ``uint8`` array of the same shape as ``img``. If the image is flat
        (max == min) a copy of the input is returned unchanged, keeping the
        input dtype.
    """
    min_val = np.min(img)
    max_val = np.max(img)

    if max_val == min_val:
        return img.copy()  # avoid division by zero if image is flat

    # Do the arithmetic in float64: subtracting in the input dtype wraps around
    # for signed / narrow integer images (e.g. int8 values -100..100).
    low = float(min_val)
    span = float(max_val) - low

    # Normalise to [0, 1] first so the minimum maps to exactly 0 and the
    # maximum to exactly 255, then round to the nearest level instead of
    # truncating (truncation biases every pixel towards darker values).
    normalized = (np.asarray(img, dtype=np.float64) - low) / span
    return np.rint(normalized * 255.0).astype(np.uint8)

In [3]:
def multi_pass_enhance_gray(img, output_shape = (512,512)):
    """Enhance a grayscale image: resize, denoise, boost contrast, sharpen.

    Parameters
    ----------
    img : numpy.ndarray
        Grayscale input image.
    output_shape : tuple
        Size handed to ``cv2.resize`` as its ``dsize`` argument.

    Returns
    -------
    numpy.ndarray
        The enhanced image as ``uint8``.
    """
    # Stage 1: resample to the working resolution.
    upscaled = cv2.resize(img, output_shape, interpolation=cv2.INTER_LANCZOS4)

    # Stage 2: non-local-means denoising.
    denoised = cv2.fastNlMeansDenoising(
        upscaled, None, h=10, templateWindowSize=7, searchWindowSize=21
    )

    # Stage 3: local contrast (CLAHE) followed by a global linear stretch.
    equalizer = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
    stretched = contrast_stretch(equalizer.apply(denoised))

    # Stage 4: sharpen with a 3x3 kernel (centre 5, 4-neighbours -1).
    sharpen_kernel = np.array(
        [[0, -1, 0],
         [-1, 5, -1],
         [0, -1, 0]],
        dtype=np.float32,
    )
    sharpened = cv2.filter2D(stretched, -1, sharpen_kernel)

    return np.clip(sharpened, 0, 255).astype(np.uint8)

In [4]:
def otsu_threshold(img):
    """Binarize ``img`` with Otsu's method.

    Returns
    -------
    tuple
        ``(value, thresh)`` exactly as returned by ``cv2.threshold``: the
        threshold value it reports and the thresholded image (max value 255).
    """
    mode = cv2.THRESH_BINARY + cv2.THRESH_OTSU
    otsu_value, binary = cv2.threshold(img, 0, 255, mode)
    return otsu_value, binary

In [5]:
def canny(img, t1, t2):
    """Return the Canny edge map of ``img`` using thresholds ``t1`` and ``t2``."""
    return cv2.Canny(img, t1, t2)

In [6]:
def scharr(img, scale, delta):
    """Compute the Scharr gradient magnitude of ``img`` as a ``uint8`` image.

    ``scale`` and ``delta`` are forwarded to ``cv2.Scharr`` for both the
    x- and y-derivative. The magnitude is clipped to [0, 255] before the
    cast to ``uint8``.
    """
    grad_x = cv2.Scharr(img, cv2.CV_64F, 1, 0, scale=scale, delta=delta)
    grad_y = cv2.Scharr(img, cv2.CV_64F, 0, 1, scale=scale, delta=delta)

    magnitude = np.sqrt(grad_x**2 + grad_y**2)
    clipped = np.clip(magnitude, 0, 255)
    return clipped.astype(np.uint8)

In [7]:
def lbp(img, P, R):
    """Compute a 'uniform' Local Binary Pattern map scaled to ``uint8``.

    ``P`` and ``R`` are forwarded to ``local_binary_pattern``. The codes are
    divided by the largest code found in this image and multiplied by 255.
    """
    codes = local_binary_pattern(img, P, R, method='uniform')

    # Scale relative to this image's own maximum code.
    # NOTE(review): a maximum of 0 would make this a division by zero.
    peak = np.max(codes)
    scaled = codes / peak * 255
    return scaled.astype(np.uint8)

In [8]:
def gabor(img, theta, ksize, sigma, lambd, gamma, phi):
    """Filter ``img`` with a single Gabor kernel and return the response.

    A square ``ksize`` x ``ksize`` kernel is built with ``cv2.getGaborKernel``
    from ``sigma``, ``theta``, ``lambd``, ``gamma`` and ``phi``, then applied
    with ``cv2.filter2D`` using ``cv2.CV_8UC1`` as the output depth.
    """
    kernel_size = (ksize, ksize)
    gabor_kernel = cv2.getGaborKernel(
        kernel_size, sigma, theta, lambd, gamma, phi, ktype=cv2.CV_32F
    )
    return cv2.filter2D(img, cv2.CV_8UC1, gabor_kernel)

In [9]:
def pipeline(path, output_shape, canny_ratios, scharr_combos, lbp_configs, gabor_configs):
    """Turn one image file into a stack of single-channel feature maps.

    The maps are produced in this order:
      1. the enhanced grayscale image,
      2. its Otsu-thresholded version,
      3. one Canny edge map per ``(t1_ratio, t2_ratio)`` in ``canny_ratios``
         (each ratio is multiplied by the Otsu threshold value),
      4. one Scharr magnitude map per ``(scale, delta)`` in ``scharr_combos``,
      5. one LBP map per ``(P, R)`` in ``lbp_configs``,
      6. one Gabor response per ``theta`` in ``gabor_configs``.

    Parameters
    ----------
    path : str
        Path of the image file to read.
    output_shape : tuple
        Size every map is resized to at the end (passed to ``cv2.resize``).
        The enhancement itself always runs at (512, 512).
    canny_ratios, scharr_combos, lbp_configs, gabor_configs : iterable
        Per-filter parameter sets as described above.

    Returns
    -------
    tuple
        ``(res, flipped)`` where ``res`` is the list of resized maps and
        ``flipped`` is ``np.flip(res, axis=2)``, i.e. the maps stacked into
        one array and reversed along the last axis.

    Raises
    ------
    Whatever ``PIL.Image.open`` raises when the file is missing or cannot be
    identified as an image.
    """
    # Image.open raises on failure and np.array never returns None, so no
    # None check is needed. The context manager closes the file handle as
    # soon as the pixels have been converted.
    with Image.open(path) as pil_img:
        img = np.array(pil_img.convert("L"))

    res = []
    # Multi-pass enhancement (fixed 512x512 working resolution)
    enhanced_img = multi_pass_enhance_gray(img, output_shape = (512,512))
    res.append(enhanced_img)
    # Otsu's thresholding
    otsu_value, otsu_thresh = otsu_threshold(enhanced_img)
    res.append(otsu_thresh)
    # Canny edge detection, thresholds derived from the Otsu value
    for t1_ratio, t2_ratio in canny_ratios:
        t1 = int(otsu_value * t1_ratio)
        t2 = int(otsu_value * t2_ratio)
        canny_edges = canny(enhanced_img, t1, t2)
        res.append(canny_edges)
    # Scharr gradient
    for scale, delta in scharr_combos:
        scharr_edges = scharr(enhanced_img, scale, delta)
        res.append(scharr_edges)

    # Local Binary Pattern
    for P, R in lbp_configs:
        lbp_img = lbp(enhanced_img, P, R)
        res.append(lbp_img)

    # Gabor filter: only theta varies; ksize=21, sigma=8, lambd=10,
    # gamma=0.5, phi=0 are fixed.
    for theta in gabor_configs:
        gabor_img = gabor(enhanced_img, theta, 21, 8, 10, 0.5, 0)
        res.append(gabor_img)

    # Bring every map down to the requested output size.
    res = [cv2.resize(r, output_shape, interpolation=cv2.INTER_LANCZOS4) for r in res]

    return res, np.flip(res, axis=2)
    
def preprocess(config, path, every_file, output_path,id, chunk_size=100):
    """Run ``pipeline`` over a list of image files and save the results in chunks.

    Files are processed ``chunk_size`` at a time. For each chunk two ``.npy``
    files are written, both named ``f"{id}_{idx}.npy"`` (``idx`` is the chunk
    index): one under ``<output_path>/original`` holding the first value
    returned by ``pipeline`` for every file, and one under
    ``<output_path>/lr`` holding the second (flipped) value.

    Parameters
    ----------
    config : dict
        Must provide the keys ``output_shape``, ``canny_ratios``,
        ``scharr_combos``, ``lbp_configs`` and ``gabor_configs``.
    path : str
        Directory containing the input images.
    every_file : list
        File names (relative to ``path``) to process.
    output_path : str
        Directory under which ``original`` and ``lr`` are created.
    id : int or str
        Tag used as the file-name prefix so that several callers can write
        into the same ``output_path`` without name clashes.
    chunk_size : int, optional
        Number of images stored per ``.npy`` file (default 100).

    Raises
    ------
    ValueError
        If ``chunk_size`` is smaller than 1.
    KeyError
        If ``config`` lacks one of the required keys.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be a positive integer.")

    output_shape = config['output_shape']
    canny_ratios = config['canny_ratios']
    scharr_combos = config['scharr_combos']
    lbp_configs = config['lbp_configs']
    gabor_configs = config['gabor_configs']

    original_path = os.path.join(output_path, "original")
    lr_path = os.path.join(output_path, "lr")

    os.makedirs(original_path, exist_ok=True)
    os.makedirs(lr_path, exist_ok=True)

    chunked = [every_file[i:i + chunk_size] for i in range(0, len(every_file), chunk_size)]

    for idx, files in enumerate(chunked):
        originals, flipped = [], []
        for file in files:
            file_path = os.path.join(path, file)
            maps, maps_flipped = pipeline(file_path, output_shape = output_shape,
                                          canny_ratios = canny_ratios,
                                          scharr_combos = scharr_combos,
                                          lbp_configs = lbp_configs,
                                          gabor_configs = gabor_configs)
            originals.append(maps)
            flipped.append(maps_flipped)

        # One array per chunk, with the image index as the leading axis.
        originals = np.stack(originals, axis=0)
        flipped = np.stack(flipped, axis=0)

        path1 = os.path.join(original_path, f"{id}_{idx}.npy")
        path2 = os.path.join(lr_path, f"{id}_{idx}.npy")
        np.save(path1, originals)
        np.save(path2, flipped)

        print(f"✅ Wrote {originals.shape} to {path1} and {flipped.shape} to {path2}")

        
def preprocess_wrapper(args):
    """Call ``preprocess`` with a task tuple unpacked as positional arguments.

    This is the single-argument form of ``preprocess`` used with
    ``executor.map``.
    """
    outcome = preprocess(*args)
    return outcome

def is_image_file(path: str,skip) -> bool:
    """Tell whether ``path`` names an image that should be processed.

    A path qualifies when its extension (case-insensitive) is one of
    .jpg, .jpeg, .png, .bmp or .tiff and the path itself is not listed
    in ``skip``.
    """
    extension = os.path.splitext(path)[1].lower()
    if extension not in ('.jpg', '.jpeg', '.png', '.bmp', '.tiff'):
        return False
    return path not in skip

In [10]:
# List the raw images of each class in sorted order, keeping only supported
# image extensions and leaving out one explicitly skipped file per class.
cat_files = [
    name
    for name in sorted(os.listdir("../../../dataset/DogVsCatDataset/PetImages/Cat"))
    if is_image_file(name, ["666.jpg"])
]
dog_files = [
    name
    for name in sorted(os.listdir("../../../dataset/DogVsCatDataset/PetImages/Dog"))
    if is_image_file(name, ["11702.jpg"])
]

# Fixed seed: the shuffled order (and so the per-worker shards) is reproducible.
np.random.seed(42)
np.random.shuffle(cat_files)
np.random.shuffle(dog_files)

def create_task(config):
    """Build the argument tuples for ``preprocess``: four shards per class.

    The module-level ``cat_files`` and ``dog_files`` lists are each cut into
    four consecutive quarters. Every quarter becomes one tuple
    ``(config, input_dir, file_names, output_dir, shard_id)`` with
    ``shard_id`` in 0..3. The four cat tasks come first, then the four dog
    tasks.
    """
    sources = (
        (
            "../../../dataset/DogVsCatDataset/PetImages/Cat",
            cat_files,
            "../../../dataset/DogVsCatDataset/preprocessed/final_split1/cat",
        ),
        (
            "../../../dataset/DogVsCatDataset/PetImages/Dog",
            dog_files,
            "../../../dataset/DogVsCatDataset/preprocessed/final_split1/dog",
        ),
    )

    tasks = []
    for input_dir, names, save_dir in sources:
        total = len(names)
        # Quarter boundaries; the last shard absorbs any remainder.
        bounds = (0, total // 4, total // 2, 3 * total // 4, total)
        for shard_id in range(4):
            shard = names[bounds[shard_id]:bounds[shard_id + 1]]
            tasks.append((config, input_dir, shard, save_dir, shard_id))
    return tasks


In [11]:
# Preprocessing configuration consumed by preprocess() / pipeline().
config0 = {
    # Final size of every saved feature map.
    "output_shape": (128, 128),
    # (t1_ratio, t2_ratio): multipliers of the Otsu threshold used as the two
    # Canny thresholds.
    "canny_ratios": [
        (0.3, 0.7),
        (1.2, 1.8),
    ],
    # (scale, delta) pairs for the Scharr gradient.
    "scharr_combos": [
        (1, 0),
        (2.0, 0),
    ],
    # (P, R) pairs for the Local Binary Pattern.
    "lbp_configs": [
        (16, 2),
        (24, 4),
    ],
    # Gabor kernel orientations (theta).
    "gabor_configs": [
        0,
        np.pi/2,
    ],
}

# One task tuple per (class, shard) for the worker pool.
task0 = create_task(config0)

In [12]:
# Process every task in a pool of worker processes. The results are iterated
# (and discarded) so that an exception raised in a worker is re-raised here.
# NOTE(review): the workers need to be able to locate preprocess_wrapper; with
# a non-fork start method, functions defined in a notebook may not be
# importable by the workers (confirm on the target platform).
with ProcessPoolExecutor(max_workers=8) as executor:
    for _ in executor.map(preprocess_wrapper, task0):
        pass

✅ Wrote (100, 10, 128, 128) to ../../../dataset/DogVsCatDataset/preprocessed/final_split1/cat/original/0_0.npy and (100, 10, 128, 128) to ../../../dataset/DogVsCatDataset/preprocessed/final_split1/cat/lr/0_0.npy
✅ Wrote (100, 10, 128, 128) to ../../../dataset/DogVsCatDataset/preprocessed/final_split1/dog/original/0_0.npy and (100, 10, 128, 128) to ../../../dataset/DogVsCatDataset/preprocessed/final_split1/dog/lr/0_0.npy
✅ Wrote (100, 10, 128, 128) to ../../../dataset/DogVsCatDataset/preprocessed/final_split1/cat/original/3_0.npy and (100, 10, 128, 128) to ../../../dataset/DogVsCatDataset/preprocessed/final_split1/cat/lr/3_0.npy
✅ Wrote (100, 10, 128, 128) to ../../../dataset/DogVsCatDataset/preprocessed/final_split1/dog/original/2_0.npy and (100, 10, 128, 128) to ../../../dataset/DogVsCatDataset/preprocessed/final_split1/dog/lr/2_0.npy
✅ Wrote (100, 10, 128, 128) to ../../../dataset/DogVsCatDataset/preprocessed/final_split1/cat/original/2_0.npy and (100, 10, 128, 128) to ../../../datas



✅ Wrote (100, 10, 128, 128) to ../../../dataset/DogVsCatDataset/preprocessed/final_split1/dog/original/2_11.npy and (100, 10, 128, 128) to ../../../dataset/DogVsCatDataset/preprocessed/final_split1/dog/lr/2_11.npy
✅ Wrote (100, 10, 128, 128) to ../../../dataset/DogVsCatDataset/preprocessed/final_split1/cat/original/0_12.npy and (100, 10, 128, 128) to ../../../dataset/DogVsCatDataset/preprocessed/final_split1/cat/lr/0_12.npy
✅ Wrote (100, 10, 128, 128) to ../../../dataset/DogVsCatDataset/preprocessed/final_split1/cat/original/1_12.npy and (100, 10, 128, 128) to ../../../dataset/DogVsCatDataset/preprocessed/final_split1/cat/lr/1_12.npy
✅ Wrote (100, 10, 128, 128) to ../../../dataset/DogVsCatDataset/preprocessed/final_split1/cat/original/3_12.npy and (100, 10, 128, 128) to ../../../dataset/DogVsCatDataset/preprocessed/final_split1/cat/lr/3_12.npy
✅ Wrote (100, 10, 128, 128) to ../../../dataset/DogVsCatDataset/preprocessed/final_split1/dog/original/3_12.npy and (100, 10, 128, 128) to ../..

In [14]:
# Build train/test manifests over the saved chunk files and write them to JSON.
# The unit being split is a chunk file, not an individual image.
cat_save_path_lr = "../../../dataset/DogVsCatDataset/preprocessed/final_split1/cat/lr"
dog_save_path_lr = "../../../dataset/DogVsCatDataset/preprocessed/final_split1/dog/lr"

# Keep only the .npy chunks so stray directory entries (checkpoint folders,
# OS metadata files, ...) can never end up in a split.
# NOTE(review): this rebinds cat_files / dog_files, which create_task() reads
# as the raw image lists. Re-running create_task() after this cell without
# re-running the listing cell would shard .npy names instead of images.
cat_files = sorted(f for f in os.listdir(cat_save_path_lr) if f.endswith(".npy"))
dog_files = sorted(f for f in os.listdir(dog_save_path_lr) if f.endswith(".npy"))

np.random.seed(42)  # For reproducibility
np.random.shuffle(cat_files)
np.random.shuffle(dog_files)


def _split_at(files, fraction):
    """Cut ``files`` at ``int(fraction * len(files))`` and return (head, tail)."""
    cut = int(fraction * len(files))
    return files[:cut], files[cut:]


# Both split schemes slice the same shuffled order, so train50 is a prefix of
# train80 and test20 is a suffix of test50.
train_80_cat, test_20_cat = _split_at(cat_files, 0.8)
train_80_dog, test_20_dog = _split_at(dog_files, 0.8)

train_50_cat, test_50_cat = _split_at(cat_files, 0.5)
train_50_dog, test_50_dog = _split_at(dog_files, 0.5)

state  = {
    "train80": {
        "cat": train_80_cat,
        "dog": train_80_dog,
    },
    "test20": {
        "cat": test_20_cat,
        "dog": test_20_dog,
    },

    "train50": {
        "cat": train_50_cat,
        "dog": train_50_dog,
    },
    "test50": {
        "cat": test_50_cat,
        "dog": test_50_dog,
    }
}

# Only file names are stored; the arrays themselves stay on disk.
with open("../../../dataset/DogVsCatDataset/preprocessed/final_split1/dataset_splits.json", "w") as f:
    json.dump(state, f, indent=4)