In [20]:
import os
import cv2
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
from tqdm import tqdm
from pathlib import Path
from typing import Tuple, Optional, List
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
from functools import partial

# Constants
# Target image size; same value as the default ``target_size`` of preprocess_image.
IMAGE_SIZE = (224, 224)
# File extensions (with leading dot) used by process_and_organize_dataset
# to select the input image files of each category folder.
SUPPORTED_FORMATS = ['.jpg', '.jpeg', '.png']

In [21]:
class COVIDOASegmentation:
    """Multilevel grayscale thresholding driven by a COVIDOA-style search.

    A candidate solution is a strictly increasing vector of gray-level
    thresholds. Its fitness is the sum, over its thresholds, of a weighted
    combination of the bilevel Otsu, Kapur and Tsallis criteria evaluated on
    the image histogram. New candidates are derived from the best solution
    found so far via frameshifting followed by mutation.
    """

    def __init__(self, population_size: int = 50, max_iter: int = 100, 
                 weights: Tuple[float, float, float] = (0.6, 0.3, 0.1),
                 seed: Optional[int] = None):
        """
        Initialize COVIDOA-based multilevel thresholding segmentation.
        
        Args:
            population_size: Number of solutions in population (at least 1)
            max_iter: Maximum number of iterations (generations of offspring)
            weights: Weights for hybrid fitness function (Otsu, Kapur, Tsallis)
            seed: Optional seed for a private random generator. When None the
                global ``np.random`` state is used, so ``np.random.seed`` set
                by the caller keeps working.
                
        Raises:
            ValueError: If population_size is smaller than 1.
        """
        if population_size < 1:
            raise ValueError("population_size must be at least 1")
        self.population_size = population_size
        self.max_iter = max_iter
        self.w_otsu, self.w_kapur, self.w_tsallis = weights
        # A RandomState (not the np.random module) is stored so that the
        # instance stays picklable when no seed is given.
        self._random_state = None if seed is None else np.random.RandomState(seed)
        
    def _rng(self):
        """Return the random source: the private generator or global np.random."""
        state = getattr(self, "_random_state", None)
        return np.random if state is None else state
        
    def compute_probability(self, histogram: np.ndarray) -> np.ndarray:
        """Compute probability distribution from histogram.
        
        An all-zero histogram yields an all-zero result instead of NaNs.
        """
        total = np.sum(histogram)
        if total == 0:
            return np.zeros_like(histogram, dtype=float)
        return histogram / total
    
    def otsu_fitness(self, prob: np.ndarray, threshold: int) -> float:
        """
        Calculate Otsu's between-class variance for bilevel thresholding.
        
        Levels below ``threshold`` form class 0, the rest class 1. Returns 0
        when either class has zero probability mass.
        """
        w0 = np.sum(prob[:threshold])
        w1 = 1 - w0
        
        if w0 == 0 or w1 == 0:
            return 0
            
        mu0 = np.sum(np.arange(threshold) * prob[:threshold]) / w0
        mu1 = np.sum(np.arange(threshold, len(prob)) * prob[threshold:]) / w1
        
        return w0 * w1 * ((mu0 - mu1) ** 2)
    
    def kapur_fitness(self, prob: np.ndarray, threshold: int) -> float:
        """
        Calculate Kapur's entropy for bilevel thresholding.
        
        Returns the sum of the base-2 entropies of the two normalized class
        distributions, or 0 when the split is degenerate.
        """
        if threshold == 0 or threshold == len(prob):
            return 0
            
        p1 = prob[:threshold]
        p2 = prob[threshold:]
        
        w0 = np.sum(p1)
        w1 = np.sum(p2)
        
        if w0 == 0 or w1 == 0:
            return 0
            
        # eps keeps log2 finite for empty histogram bins
        h1 = -np.sum((p1/w0) * np.log2(p1/w0 + np.finfo(float).eps))
        h2 = -np.sum((p2/w1) * np.log2(p2/w1 + np.finfo(float).eps))
        
        return h1 + h2
    
    def tsallis_fitness(self, prob: np.ndarray, threshold: int, q: float = 0.8) -> float:
        """
        Calculate Tsallis entropy for bilevel thresholding.
        
        ``q`` is the entropic index; the formula divides by ``q - 1``, so
        ``q`` must differ from 1. Returns 0 when the split is degenerate.
        """
        if threshold == 0 or threshold == len(prob):
            return 0
            
        p1 = prob[:threshold]
        p2 = prob[threshold:]
        
        w0 = np.sum(p1)
        w1 = np.sum(p2)
        
        if w0 == 0 or w1 == 0:
            return 0
            
        p1_norm = p1 / w0
        p2_norm = p2 / w1
        
        s1 = (1 - np.sum(p1_norm ** q)) / (q - 1)
        s2 = (1 - np.sum(p2_norm ** q)) / (q - 1)
        
        return s1 + s2 + (1 - q) * s1 * s2
    
    def hybrid_fitness(self, prob: np.ndarray, threshold: int) -> float:
        """
        Calculate hybrid fitness function combining Otsu, Kapur and Tsallis methods.
        
        The three bilevel criteria are combined linearly with the weights
        given at construction time.
        """
        otsu = self.otsu_fitness(prob, threshold)
        kapur = self.kapur_fitness(prob, threshold)
        tsallis = self.tsallis_fitness(prob, threshold)
        
        return (self.w_otsu * otsu + 
                self.w_kapur * kapur + 
                self.w_tsallis * tsallis)
    
    def virus_entry(self, lb: int, ub: int, dimension: int) -> np.ndarray:
        """Initialize random solution within bounds.
        
        Args:
            lb: Lower bound for the thresholds
            ub: Upper bound for the thresholds
            dimension: Number of thresholds in the solution
            
        Returns:
            Float array of ``dimension`` strictly increasing integer values.
            
        Raises:
            ValueError: If dimension < 1 or the range cannot hold that many
                distinct thresholds.
        """
        if dimension < 1:
            raise ValueError("dimension must be at least 1")
        if ub - lb < dimension:
            raise ValueError(
                f"cannot place {dimension} distinct thresholds between {lb} and {ub}")
        
        rng = self._rng()
        solution = np.zeros(dimension)
        
        # Calculate maximum value for first threshold to ensure space for others
        max_first = ub - (dimension - 1)
        solution[0] = rng.randint(lb, max_first)
        
        # Generate remaining thresholds with proper spacing
        for i in range(1, dimension):
            min_val = int(solution[i-1]) + 1
            max_val = ub - (dimension - i - 1)
            if min_val >= max_val:
                solution[i] = min_val
            else:
                solution[i] = rng.randint(min_val, max_val)
        
        return solution
    
    def _repair(self, solution: np.ndarray, lb: int, ub: int) -> np.ndarray:
        """Return a copy of ``solution`` made strictly increasing within bounds."""
        fixed = np.clip(np.asarray(solution, dtype=float), lb, ub)
        count = len(fixed)
        
        # Forward pass: push duplicates / inversions upwards
        for i in range(1, count):
            if fixed[i] <= fixed[i-1]:
                fixed[i] = fixed[i-1] + 1
        
        # Backward pass: pull values back under the upper bound while
        # leaving room for the thresholds that follow
        for i in range(count - 1, -1, -1):
            cap = ub - (count - 1 - i)
            if fixed[i] > cap:
                fixed[i] = cap
        
        return fixed
    
    def frameshifting(self, parent: np.ndarray, lb: int, ub: int) -> np.ndarray:
        """Apply frameshifting operation to generate new solution.
        
        Modelled on COVIDOA's +1/+2 frameshifting: the parent's genes are
        shifted right by one or two positions, the vacated leading positions
        are filled with random values in ``[lb, ub)`` and the trailing genes
        are dropped. The result is sorted and repaired so it remains a valid
        strictly increasing threshold vector. ``parent`` is not modified.
        
        Args:
            parent: Solution to derive the offspring from
            lb: Lower bound
            ub: Upper bound
            
        Returns:
            New solution with the same length as ``parent``
        """
        rng = self._rng()
        parent = np.asarray(parent, dtype=float)
        size = len(parent)
        
        shift = min(int(rng.randint(1, 3)), size)  # +1 or +2 frameshifting
        fresh = rng.randint(lb, ub, size=shift).astype(float)
        shifted = np.concatenate([fresh, parent[:size - shift]])
        
        return self._repair(np.sort(shifted), lb, ub)
    
    def mutation(self, solution: np.ndarray, lb: int, ub: int, 
                mutation_rate: float = 0.1) -> np.ndarray:
        """
        Apply mutation to solution with proper bounds checking.
        
        Each threshold is, with probability ``mutation_rate``, redrawn
        uniformly between its left neighbour (exclusive) and its right
        neighbour (exclusive); the outer thresholds use ``lb`` / ``ub``.
        
        Args:
            solution: Current solution vector
            lb: Lower bound
            ub: Upper bound
            mutation_rate: Probability of mutation for each element
            
        Returns:
            Mutated solution
        """
        rng = self._rng()
        mutated = solution.copy()
        last = len(solution) - 1
        
        for i in range(len(solution)):
            if rng.random_sample() >= mutation_rate:
                continue
            low = lb if i == 0 else max(int(mutated[i-1]) + 1, lb)
            high = ub if i == last else min(int(mutated[i+1]), ub)
            if low < high:
                mutated[i] = rng.randint(low, high)
        
        # Ensure thresholds are strictly increasing
        for i in range(1, len(mutated)):
            if mutated[i] <= mutated[i-1]:
                mutated[i] = min(mutated[i-1] + 1, ub - (len(mutated) - i - 1))
        
        return mutated
    
    def _optimize_thresholds(self, prob: np.ndarray, num_thresholds: int) -> List[int]:
        """Search for the threshold vector with the highest summed hybrid fitness.
        
        Args:
            prob: Gray-level probability distribution
            num_thresholds: Number of thresholds to find
            
        Returns:
            Sorted list of integer thresholds
        """
        lb, ub = 0, len(prob) - 1
        
        # hybrid_fitness depends only on the gray level, so each level is
        # evaluated once and then reused for every candidate.
        level_fitness = {}
        
        def fitness_of(solution):
            total = 0
            for t in solution:
                level = int(t)
                if level not in level_fitness:
                    level_fitness[level] = self.hybrid_fitness(prob, level)
                total += level_fitness[level]
            return total
        
        def pick_best(candidates, best_solution, best_fitness):
            for candidate in candidates:
                fitness = fitness_of(candidate)
                if fitness > best_fitness:
                    best_fitness = fitness
                    best_solution = candidate.copy()
            return best_solution, best_fitness
        
        # Initialize population
        population = [self.virus_entry(lb, ub, num_thresholds)
                      for _ in range(self.population_size)]
        # Seeding with the first candidate guarantees a result even when
        # every fitness is NaN or no iteration is requested.
        best_solution, best_fitness = pick_best(
            population, population[0].copy(), float('-inf'))
        
        # Main COVIDOA loop: every generated population is evaluated
        for _ in range(self.max_iter):
            population = [
                self.mutation(self.frameshifting(best_solution, lb, ub), lb, ub)
                for _ in range(self.population_size)
            ]
            best_solution, best_fitness = pick_best(
                population, best_solution, best_fitness)
        
        return sorted(int(t) for t in best_solution)
    
    def _apply_thresholds(self, image: np.ndarray, thresholds: List[int]) -> np.ndarray:
        """Map each pixel to the output gray level of the class it falls into.
        
        Class ``i`` holds pixels with ``thresholds[i-1] < value <= thresholds[i]``
        and is rendered as ``int(255 * i / len(thresholds))``, so the lowest
        class is 0 and the highest is 255.
        """
        count = len(thresholds)
        levels = np.array([int(255 * (i / count)) for i in range(count + 1)])
        labels = np.searchsorted(np.asarray(thresholds), image, side='left')
        return levels[labels].astype(image.dtype)
    
    def segment_image(self, image: np.ndarray, num_thresholds: int) -> Tuple[np.ndarray, List[int]]:
        """
        Perform multilevel thresholding segmentation on the input image.
        
        Args:
            image: Input grayscale image
            num_thresholds: Number of thresholds to find
            
        Returns:
            Tuple of segmented image and optimal thresholds
            
        Raises:
            ValueError: If num_thresholds is smaller than 1 or too large for
                the 256 gray levels.
        """
        histogram = cv2.calcHist([image], [0], None, [256], [0, 256]).ravel()
        prob = self.compute_probability(histogram)
        
        thresholds = self._optimize_thresholds(prob, num_thresholds)
        segmented = self._apply_thresholds(image, thresholds)
        
        return segmented, thresholds

def process_color_image(image_path: str, num_thresholds: int = 2) -> np.ndarray:
    """
    Segment a color image by thresholding each of its three channels separately.
    
    Args:
        image_path: Location of the image file to load
        num_thresholds: How many thresholds to search for in every channel
        
    Returns:
        Array shaped like the loaded image whose channels hold the
        per-channel segmentation results
        
    Raises:
        ValueError: If the file cannot be loaded by cv2.imread
    """
    bgr = cv2.imread(image_path)
    if bgr is None:
        raise ValueError("Could not read image")
    
    # One segmenter instance is shared by all three channels
    segmenter = COVIDOASegmentation()
    output = np.zeros_like(bgr)
    
    for idx in range(3):
        output[:, :, idx] = segmenter.segment_image(bgr[:, :, idx], num_thresholds)[0]
    
    return output

In [22]:
class LesionSegmentation:
    """Derive a binary lesion mask and a masked image from a COVIDOA segmentation."""

    def __init__(self, covidoa_segmenter: "COVIDOASegmentation"):
        """
        Initialize lesion segmentation with COVIDOA segmenter.
        
        Args:
            covidoa_segmenter: Instance of COVIDOASegmentation class
        """
        self.segmenter = covidoa_segmenter
        
    def create_ground_truth(self, segmented_image: np.ndarray) -> np.ndarray:
        """
        Create binary ground truth mask from segmented image.
        
        Pixels whose gray value is at most 127 become white (255) in the
        mask, all others black (0); the mask is then smoothed with a 5x5
        morphological close followed by an open.
        
        Args:
            segmented_image: Segmented image from COVIDOA (grayscale, or
                3-channel which is converted with COLOR_BGR2GRAY)
            
        Returns:
            Binary mask where lesion is white (255) and background is black (0)
        """
        # Convert to grayscale if color
        if len(segmented_image.shape) == 3:
            gray = cv2.cvtColor(segmented_image, cv2.COLOR_BGR2GRAY)
        else:
            gray = segmented_image
            
        # Threshold to create binary mask, then invert so dark regions are white
        _, mask = cv2.threshold(gray, 127, 255, cv2.THRESH_BINARY)
        mask = cv2.bitwise_not(mask)
        
        # Clean up mask using morphological operations
        kernel = np.ones((5,5), np.uint8)
        mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
        mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
        
        return mask

    def apply_mask_to_original(self, original: np.ndarray, mask: np.ndarray) -> np.ndarray:
        """
        Apply ground truth mask to original image to create segmented result.
        
        Args:
            original: Original input image
            mask: Binary ground truth mask
            
        Returns:
            Segmented result showing lesion from original image
        """
        # Create result image with black background
        result = np.zeros_like(original)
        
        # Copy original image pixels where mask is white
        lesion = mask == 255
        result[lesion] = original[lesion]
        
        return result

    def covidoa_process_image(self, original: np.ndarray, num_thresholds: int = 2) -> Tuple[np.ndarray, np.ndarray]:
        """
        Process image to create ground truth and segmented result.
        
        Args:
            original: Preprocessed original image. A 3-channel image is
                converted to grayscale with COLOR_BGR2GRAY, i.e. it is
                treated as BGR; a single-channel image is used as is.
            num_thresholds: Number of thresholds for segmentation
            
        Returns:
            Tuple of (ground truth mask, segmented result)
        """
        # Get initial segmentation on a grayscale view of the input
        if len(original.shape) == 3:
            gray = cv2.cvtColor(original, cv2.COLOR_BGR2GRAY)
        else:
            gray = original
        segmented = self.segmenter.segment_image(gray, num_thresholds)[0]
        
        # Create ground truth mask
        ground_truth = self.create_ground_truth(segmented)
        
        # Apply mask to original
        result = self.apply_mask_to_original(original, ground_truth)
        
        return ground_truth, result

In [23]:
class ImagePreprocessor:
    """Skin Cancer Image preprocessing pipeline"""
    
    @staticmethod
    def hair_remove(image, kernel_size=17, threshold=10, inpaint_radius=1):
        """Remove hair from skin images

        Thin dark structures are detected with a black-hat transform on the
        grayscale image, thresholded into a mask and inpainted away.

        Args:
            image: 3-channel image; converted to gray with COLOR_RGB2GRAY
            kernel_size: Side length of the square black-hat kernel
            threshold: Black-hat response above which a pixel counts as hair
            inpaint_radius: Neighbourhood radius passed to cv2.inpaint

        Returns:
            Inpainted image, or the unchanged input if any step fails
            (the error is printed, not raised).
        """
        try:
            grayScale = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
            kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (kernel_size, kernel_size))
            blackhat = cv2.morphologyEx(grayScale, cv2.MORPH_BLACKHAT, kernel)
            _, hair_mask = cv2.threshold(blackhat, threshold, 255, cv2.THRESH_BINARY)
            return cv2.inpaint(image, hair_mask, inpaint_radius, cv2.INPAINT_TELEA)
        except Exception as e:
            # Best effort: hair removal is optional, keep the pipeline running
            print(f"Error in hair removal: {str(e)}")
            return image

    @staticmethod
    def sharpen_image(image, sigma=2.0, amount=0.5):
        """Sharpen image using unsharp masking

        Args:
            image: Input image
            sigma: Standard deviation of the Gaussian blur
            amount: Strength of the sharpening; the result is
                ``(1 + amount) * image - amount * blurred``

        Returns:
            Sharpened image
        """
        gaussian = cv2.GaussianBlur(image, (0, 0), sigma)
        return cv2.addWeighted(image, 1.0 + amount, gaussian, -amount, 0)

In [24]:
def preprocess_image(image, target_size=(224, 224), interpolation=None):
    """Apply all preprocessing steps to an image

    Runs hair removal, then sharpening, then resizes to ``target_size``.

    Args:
        image: Input image as expected by ImagePreprocessor.hair_remove
        target_size: Output size passed to cv2.resize
        interpolation: OpenCV interpolation flag for the resize. Defaults to
            cv2.INTER_NEAREST (the previous hard-coded behavior).
            NOTE(review): cv2.INTER_AREA is usually preferred when shrinking.

    Returns:
        The preprocessed, resized image
    """
    if interpolation is None:
        interpolation = cv2.INTER_NEAREST
    
    # The preprocessing steps are static methods; no instance is required
    image = ImagePreprocessor.hair_remove(image)
    image = ImagePreprocessor.sharpen_image(image)
    
    return cv2.resize(image, target_size, interpolation=interpolation)

In [9]:
def process_single_image(args):
    """Segment one image file and write its mask and masked image to disk.

    Args:
        args: Tuple ``(img_file, category, split, destination_path,
            covidoa_config)``. ``img_file`` and ``destination_path`` are
            Path objects. ``covidoa_config`` is either None or a dict of
            keyword arguments forwarded to COVIDOASegmentation.

    Returns:
        None. An unreadable image is reported and skipped so that one bad
        file does not abort a whole parallel run.
    """
    img_file, category, split, destination_path, covidoa_config = args
    
    # Initialize segmenters here to avoid pickle issues with multiprocessing
    covidoa = COVIDOASegmentation(**(covidoa_config or {}))
    lesion_segmenter = LesionSegmentation(covidoa)
    
    img = cv2.imread(str(img_file))
    if img is None:
        print(f"Skipping unreadable image: {img_file}")
        return
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    
    # Process image (the preprocessing pipeline works on RGB data)
    processed_img = preprocess_image(img, target_size=IMAGE_SIZE)
    filename = img_file.stem
    
    # Generate masks and segmented images. covidoa_process_image converts
    # with COLOR_BGR2GRAY, so hand it BGR data to get correct luminance.
    processed_bgr = cv2.cvtColor(processed_img, cv2.COLOR_RGB2BGR)
    ground_truth, segmented_bgr = lesion_segmenter.covidoa_process_image(processed_bgr)
    
    # Save ground truth
    # NOTE(review): JPEG is lossy, so the stored mask is not strictly binary;
    # consider PNG if downstream code can accept a different extension.
    mask_path = destination_path / 'ground_truth' / split / category / f"{filename}.jpg"
    mask_path.parent.mkdir(parents=True, exist_ok=True)
    if not cv2.imwrite(str(mask_path), ground_truth):
        print(f"Failed to write ground truth: {mask_path}")
    
    # Save segmented image (already BGR, which is what imwrite expects)
    segmented_path = destination_path / 'covidoa_segmented_images' / split / category / f"{filename}.jpg"
    segmented_path.parent.mkdir(parents=True, exist_ok=True)
    if not cv2.imwrite(str(segmented_path), segmented_bgr):
        print(f"Failed to write segmented image: {segmented_path}")

def process_and_organize_dataset(source_path, destination_path, batch_size=32,
                                 num_workers: Optional[int] = None):
    """Segment every image of the dataset in parallel.

    The source is expected to contain ``<split>/<category>/<image>`` folders.
    For each image a ground-truth mask and a masked image are written under
    ``<destination>/ground_truth/...`` and
    ``<destination>/covidoa_segmented_images/...`` with the same
    split/category layout.

    Args:
        source_path: Root folder of the input dataset
        destination_path: Root folder for the generated files
        batch_size: Number of images handed to a worker at a time
            (used as ``chunksize`` of ``executor.map``)
        num_workers: Number of worker processes; defaults to one less than
            the CPU count (at least 1)

    Returns:
        None
    """
    source_path = Path(source_path)
    destination_path = Path(destination_path)
    
    splits = ('train_directory', 'test_directory', 'validation_directory')
    output_kinds = ('ground_truth', 'covidoa_segmented_images')
    categories = ('nv', 'mel', 'bkl', 'bcc', 'akiec', 'vasc', 'df')
    
    # Create all directory structures upfront
    for split in splits:
        for subdir in output_kinds:
            for category in categories:
                (destination_path / subdir / split / category).mkdir(parents=True, exist_ok=True)
    
    # Prepare processing args
    processing_args = []
    for split in splits:
        for category in categories:
            category_path = source_path / split / category
            if not category_path.exists():
                continue
                
            print(f"Preparing {split}/{category}...")
            
            # Collect all image files; the suffix is compared case-insensitively
            # so that e.g. ".JPG" is not silently skipped, and the list is
            # sorted for a reproducible processing order.
            extensions = {ext.lower() for ext in SUPPORTED_FORMATS}
            img_files = sorted(
                f for f in category_path.iterdir()
                if f.is_file() and f.suffix.lower() in extensions
            )
            
            # Add processing arguments
            processing_args.extend(
                (img_file, category, split, destination_path, None)
                for img_file in img_files
            )
    
    if not processing_args:
        print("No images found to process.")
        return
    
    # Process images in parallel
    if num_workers is None:
        num_workers = max(1, multiprocessing.cpu_count() - 1)
    print(f"Processing images using {num_workers} workers...")
    
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        results = executor.map(process_single_image, processing_args, chunksize=batch_size)
        # Consuming the iterator drives the work and surfaces worker errors
        for _ in tqdm(results, total=len(processing_args)):
            pass

In [8]:
# Create the output folder tree:
#   <base_directory>/<subfolder>/<split directory>/<class>
base_directory = '/kaggle/working/covidoa_segmented'

subfolders = ['ground_truth', 'covidoa_segmented_images']
directory = ['train_directory', 'test_directory', 'validation_directory']
classes = ['nv', 'mel', 'bkl', 'bcc', 'akiec', 'vasc', 'df']

for subf in subfolders:
    for dirc in directory:
        for cls in classes:
            path = os.path.join(base_directory, subf, dirc, cls)
            # makedirs creates missing parents and, with exist_ok, makes the
            # cell safe to re-run when the folders already exist
            os.makedirs(path, exist_ok=True)

In [None]:
# Run the full pipeline: read the split/category folders under source_path and
# write ground-truth masks and segmented images under destination_path.
source_path = "/kaggle/input/multiclassskincancer"
destination_path = "/kaggle/working/covidoa_segmented"
process_and_organize_dataset(source_path, destination_path)

In [1]:
import os
base_dir = '/kaggle/working/covidoa_segmented'
subfolders = ['ground_truth', 'covidoa_segmented_images']
directory = ['train_directory', 'test_directory', 'validation_directory']
classes = ['nv', 'mel', 'bkl', 'bcc', 'akiec', 'vasc', 'df']

# Report how many files each output folder holds, grouped by
# output kind and then by dataset split.
for subf in subfolders:
    print(subf)
    for dirc in directory:
        print(dirc)
        for cls in classes:
            path = os.path.join(base_dir, subf, dirc, cls)
            n_files = len(os.listdir(path))
            print(f"{cls}    : ", n_files)

ground_truth
train_directory
nv    :  5115
mel    :  5950
bkl    :  5990
bcc    :  5462
akiec    :  5510
vasc    :  4810
df    :  4090
test_directory
nv    :  883
mel    :  46
bkl    :  88
bcc    :  35
akiec    :  30
vasc    :  13
df    :  8
validation_directory
nv    :  707
mel    :  37
bkl    :  71
bcc    :  28
akiec    :  24
vasc    :  10
df    :  6
covidoa_segmented_images
train_directory
nv    :  5115
mel    :  5950
bkl    :  5990
bcc    :  5462
akiec    :  5510
vasc    :  4810
df    :  4090
test_directory
nv    :  883
mel    :  46
bkl    :  88
bcc    :  35
akiec    :  30
vasc    :  13
df    :  8
validation_directory
nv    :  707
mel    :  37
bkl    :  71
bcc    :  28
akiec    :  24
vasc    :  10
df    :  6
