In [None]:
# --- Bibliothèques standards de Python ---
import os
import time
import glob
from math import sqrt

# --- Bibliothèques scientifiques et de traitement d'images ---
import numpy as np
import cv2
import matplotlib.pyplot as plt
import pywt
from scipy.signal.windows import hann
from skimage.metrics import peak_signal_noise_ratio as psnr, structural_similarity as ssim

# --- Bibliothèques de Machine Learning et Deep Learning ---
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate, BatchNormalization, ReLU, LeakyReLU, add, subtract
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, ModelCheckpoint
from sklearn.model_selection import train_test_split
from numpy.lib.stride_tricks import as_strided

# --- Métrique d'évaluation avancée ---
import lpips
import torch

# --- Outils d'analyse ---
import pandas as pd

Paths: make sure that all yours images in clean_dir

In [None]:
clean_dir = "images/clean/"
test_dir = "ready for label"
output_dir = "images/denoised/"

Loading images:

+ Data Base:
    - i used more than 500 ultrasound images saved in clean files as training set for my model.
    - 47 images for model testing
    

In [None]:

SEED = 42 # You can change this value for different splits
print("Scanning the clean image database...")
all_clean_paths = sorted(glob.glob(os.path.join(clean_dir, "**/*.jpg"), recursive=True))
# all_clean_paths += sorted(glob.glob(os.path.join(clean_dir, "**/*.png"), recursive=True))

#test_paths = sorted(glob.glob(os.path.join(test_dir, "**/*.jpg"), recursive=True))
test_paths = sorted(glob.glob(os.path.join(test_dir, "**/*.png"), recursive=True))
if not all_clean_paths:
    raise FileNotFoundError(f"No clean images found in '{clean_dir}'.")

print(f"{len(all_clean_paths)} clean images found.")

#--- Split the dataset into training, validation, and testing sets ---
train_paths, val_paths = train_test_split(all_clean_paths, test_size=0.2, random_state=SEED) 

print(f"\nDataset split into:")
print(f" - {len(train_paths)} images for training")
print(f" - {len(val_paths)} images for validation")
print(f" - {len(test_paths)} images for testing")

In [None]:
def visualize_random_samples(image_paths, num_images=5, title="Sample Images"):
    plt.figure(figsize=(20, 5))
    
    indices = np.random.choice(len(image_paths), num_images, replace=False)
    
    for i, idx in enumerate(indices):
        img_path = image_paths[idx]
        
        img = cv2.imread(img_path)
        
        if img is None:
            print(f"Không thể đọc ảnh: {img_path}")
            continue
            
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        
        # Plot image on grid
        plt.subplot(1, num_images, i + 1)
        plt.imshow(img_rgb)
        plt.title(f"Img Index: {idx}\n{img_rgb.shape}") # Display image size for verification
        plt.axis("off")
    
    plt.suptitle(title, fontsize=16)
    plt.tight_layout()
    plt.show()

# --- Call the function to display ---
print("Displaying sample clean images from the Train set:")
visualize_random_samples(train_paths, num_images=5, title="Sample clean images from the Train set")

Data Pipeline

GPU Memory (VRAM): When training, Deep Learning stores not only the image, but also millions of parameters (weights) and dozens of intermediate transformation layers (feature maps).

A 720x960 image when fed into a CNN network can swell to several GB in VRAM. If not chopped, the GPU will immediately report an Out Of Memory (OOM) error.

Local feature learning: As explained, chopping helps the network focus on learning the detailed structure (texture) instead of trying to learn the overall layout.

Another problem is that when you train a deeplearning model, your inputs have to be of the same shape, and if you don't split them you need to reshape them. So it lost a lot of detail and it was also harder to return them to their original shape.

Clean Image -> Add Noise -> Noisy Image

Used a dataset of clean images and simulated real-world conditions by adding Gaussian Noise

I sliced images into small 256x256 patches. You can change to 512 for larger patches and 128 for smaller patches (it depends on your GPU memory)

In [None]:
# --- Data Pipeline Parameters ---
PATCH_SIZE = 256 # Change to 512 for larger patches and 128 for smaller patches (it depends on your GPU memory)   
BATCH_SIZE = 32    
NOISE_STD_DEV = 30/255.0  #  (Try to test with sigma = 10,20,30) different levels of noise for set1
BUFFER_SIZE = 1000 

def load_and_crop_image(path):

    image = tf.io.read_file(path)
    image = tf.io.decode_image(image, channels=3, expand_animations=False)
    image = tf.cast(image, tf.float32) / 255.0
    image = tf.image.random_crop(image, size=[PATCH_SIZE, PATCH_SIZE, 3])
    
    return image

def add_speckle_noise(clean_image):
    """
    Speckle Noise - A characteristic of ultrasound/radar images.
    Formula: Noisy = Image * (1 + Gaussian_Noise)
    """
    noise = tf.random.normal(shape=tf.shape(clean_image), mean=0.0, stddev=NOISE_STD_DEV, dtype=tf.float32)
    
    noisy_image = clean_image * (1.0 + noise)
    
    noisy_image = tf.clip_by_value(noisy_image, 0.0, 1.0)
    
    return noisy_image, clean_image

def prepare_dataset(file_paths, is_training=True):

    ds = tf.data.Dataset.from_tensor_slices(file_paths)
    ds = ds.map(load_and_crop_image, num_parallel_calls=tf.data.AUTOTUNE)
    if is_training:
        ds = ds.shuffle(buffer_size=BUFFER_SIZE)
    ds = ds.map(add_speckle_noise, num_parallel_calls=tf.data.AUTOTUNE)
    ds = ds.batch(BATCH_SIZE)
    ds = ds.prefetch(buffer_size=tf.data.AUTOTUNE)
    
    return ds

# ---  Dataset ---
print("Initializing Data Pipeline...")
train_ds = prepare_dataset(train_paths, is_training=True)
val_ds = prepare_dataset(val_paths, is_training=False)

print(" Number of training batches:", len(train_ds))
print(" Number of validation batches:", len(val_ds))
sample_noisy, sample_clean = next(iter(train_ds))

# Display
plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
plt.title(f"Input (Noisy)\nShape: {sample_noisy[0].shape}")
plt.imshow(sample_noisy[0]) # Display the first image in the batch
plt.axis('off')

plt.subplot(1, 2, 2)
plt.title(f"Target (Clean)\nShape: {sample_clean[0].shape}")
plt.imshow(sample_clean[0])
plt.axis('off')

plt.show()
#

In [None]:
sample_path = test_paths[0] 
full_clean_img1 = cv2.imread(sample_path)
full_clean_img1 = cv2.cvtColor(full_clean_img1, cv2.COLOR_BGR2RGB).astype('float32') / 255.0

# Add noise to the full image
noise1 = add_speckle_noise(tf.convert_to_tensor(full_clean_img1))[0].numpy() - full_clean_img1
full_noisy_img1 = np.clip(full_clean_img1 + noise1, 0, 1)

# Display
plt.figure(figsize=(15, 5))
plt.subplot(1, 2, 1)
plt.title("Input (Noisy Full Image)")
plt.imshow(full_noisy_img1)
plt.axis('off')
plt.subplot(1, 2, 2)
plt.title("Target (Clean Full Image)")
plt.imshow(full_clean_img1)
plt.axis('off')
plt.show()
print(f"Full image shape: {full_clean_img1.shape}")


In [None]:
def reconstruct_from_patches_weighted(patches, image_shape, patch_size, stride):
    """
    Reconstructs an image from overlapping patches (weighted average).
    Supports 3-channel color images.
    """
    reconstructed = np.zeros(image_shape, dtype=np.float32)
    weight_map = np.zeros(image_shape, dtype=np.float32)
    
    # Hann window 2D
    window = hann(patch_size)
    window_2d = np.outer(window, window) 
    
    window_3d = window_2d[..., np.newaxis] 
    
    patch_idx = 0
    
    n_h = (image_shape[0] - patch_size) // stride + 1
    n_w = (image_shape[1] - patch_size) // stride + 1
    
    for i in range(n_h):
        for j in range(n_w):
            if patch_idx < len(patches):
                h_start = i * stride
                w_start = j * stride
                
                h_end = h_start + patch_size
                w_end = w_start + patch_size
                
                reconstructed[h_start:h_end, w_start:w_end, :] += patches[patch_idx] * window_3d
                weight_map[h_start:h_end, w_start:w_end, :] += window_3d
                
                patch_idx += 1
                
    reconstructed /= np.maximum(weight_map, 1e-8)
    
    return np.clip(reconstructed, 0, 1)

# Helper function to extract test image into patches
def extract_patches_sliding(image, patch_size, stride):
    patches = []
    h, w, c = image.shape
    n_h = (h - patch_size) // stride + 1
    n_w = (w - patch_size) // stride + 1
    
    for i in range(n_h):
        for j in range(n_w):
            h_start = i * stride
            w_start = j * stride
            patch = image[h_start:h_start+patch_size, w_start:w_start+patch_size, :]
            patches.append(patch)
    return np.array(patches)

In [None]:
def conv_block(inputs, filters):
    """A standard convolutional block: Conv -> BN -> ReLU"""
    x = Conv2D(filters, (3, 3), padding="same")(inputs)
    x = BatchNormalization()(x)
    x = ReLU()(x)
    return x

def build_unet(input_shape=(256, 256, 3)): # Try to test wwith (512x512x3)
    inputs = Input(input_shape)

    # --- ENCODER (Downsampling) ---
    # Block 1
    c1 = conv_block(inputs, 32) # 32 filters
    p1 = MaxPooling2D((2, 2))(c1)

    # Block 2
    c2 = conv_block(p1, 64)     # 64 filters
    p2 = MaxPooling2D((2, 2))(c2)


    # --- BOTTLENECK (Bottom of the U) ---
    b = conv_block(p2, 128)     # 128 filters

    # --- DECODER (Upsampling + Skip Connections) ---
    # Block 3 (Up 1)
    u1 = UpSampling2D((2, 2))(b)
    u1 = concatenate([u1, c2])  
    c5 = conv_block(u1, 64)

    # Block 4 (Up 2)    
    u2 = UpSampling2D((2, 2))(c5)
    u2 = concatenate([u2, c1])  
    c7 = conv_block(u2, 32)


    # --- OUTPUT ---
    outputs = Conv2D(3, (1, 1), activation='sigmoid')(c7) # 3 color channels, Sigmoid to [0,1]

    model = models.Model(inputs, outputs, name="Baseline_UNet")
    return model

# Initialize model
unet = build_unet((256, 256, 3)) # Try to test wwith (512x512x3)
unet.summary()

In [None]:
unet.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5), # Changed to 1e-4, 1e-5 
    loss='mae',  # Focus on Mean Absolute Error for denoising
    metrics=['mse'] 
)   

# --- 5. TRAIN ---
callbacks_set1 = [
    EarlyStopping(patience=20, restore_best_weights=True, monitor='val_loss', verbose=1),
    ReduceLROnPlateau(patience=5, factor=0.5, verbose=1),
    ModelCheckpoint("unet_set1_structure.keras", save_best_only=True, monitor='val_loss')
]

print(" Starting training Set 1: Structure-Aware...")
history_set1 = unet.fit(
    train_ds,
    epochs=50,
    validation_data=val_ds,
    callbacks=callbacks_set1
)

In [None]:
def plot_training_history(history):
    """
    Function to plot the Loss and Metric comparison between Train and Validation set.
    Input: history (return result from model.fit)
    """
    
    # List of metrics in the training period.
    metrics = [k for k in history.history.keys() if "val_" not in k]
    
    n_metrics = len(metrics)
    plt.figure(figsize=(6 * n_metrics, 5))

    for i, metric in enumerate(metrics):
        if metric != "learning_rate": 
            train_values = history.history[metric]
            val_values = history.history[f'val_{metric}']
            epochs = range(1, len(train_values) + 1)

            plt.subplot(1, n_metrics, i + 1)
            
            # Training
            plt.plot(epochs, train_values, 'b-o', label=f'Training {metric}', markersize=4)
            
            # Validation
            plt.plot(epochs, val_values, 'r-o', label=f'Validation {metric}', markersize=4)
            
            plt.title(f'Training vs Validation {metric.upper()}')
            plt.xlabel('Epochs')
            plt.ylabel(metric.upper())
            plt.legend()
            plt.grid(True, linestyle='--', alpha=0.6)

    plt.tight_layout()
    plt.show()
plot_training_history(history_set1)

In [None]:
loss_fn_alex = lpips.LPIPS(net='alex') 
if torch.cuda.is_available():
    loss_fn_alex = loss_fn_alex.cuda()
# Prepare a list containing points
psnr_unet, ssim_unet, lpips_unet, time_unet = [], [], [], []
print(" Number of test images:", len(test_paths))

test_ds = prepare_dataset(test_paths, is_training=False)
print(" Number of test batches:", len(test_ds))

for batch_noisy, batch_clean in test_ds:
    start_t = time.time()
    preds = unet.predict(batch_noisy, verbose=0)
    end_t = time.time()
    
    avg_time = (end_t - start_t) / batch_noisy.shape[0]
    
    batch_clean_np = batch_clean.numpy()
    
    for i in range(len(batch_clean_np)):
        clean_img = batch_clean_np[i]
        denoised_img = preds[i]
        
        # Calculate PSNR / SSIM
        psnr_unet.append(psnr(clean_img, denoised_img, data_range=1.0))
        ssim_unet.append(ssim(clean_img, denoised_img, data_range=1.0, channel_axis=-1))
        
        # Calculate LPIPS
        t_clean = torch.from_numpy(clean_img).permute(2, 0, 1).unsqueeze(0).float() * 2 - 1
        t_denoised = torch.from_numpy(denoised_img).permute(2, 0, 1).unsqueeze(0).float() * 2 - 1
        
        if torch.cuda.is_available():
            t_clean = t_clean.cuda()
            t_denoised = t_denoised.cuda()
            
        with torch.no_grad():
            lpips_unet.append(loss_fn_alex(t_clean, t_denoised).item())
            
        time_unet.append(avg_time)

results = {} 
# Update results in the common table
results["UNet"] = {
    "PSNR": np.mean(psnr_unet),
    "SSIM": np.mean(ssim_unet),
    "LPIPS": np.mean(lpips_unet),
    "Time (s)": np.mean(time_unet)
}

# Display the latest leaderboard
results_df = pd.DataFrame.from_dict(results, orient='index')
display(results_df)


In [None]:
# Pick any clean image from the test set
sample_path = test_paths[11] 
full_clean_img1 = cv2.imread(sample_path)
full_clean_img1 = cv2.cvtColor(full_clean_img1, cv2.COLOR_BGR2RGB).astype('float32') / 255.0

# Add noise to the full image
noise1 = add_speckle_noise(tf.convert_to_tensor(full_clean_img1))[0].numpy() - full_clean_img1
full_noisy_img1 = np.clip(full_clean_img1 + noise1, 0, 1)


STRIDE = 128 

# 1. Extract patches
patches_noisy = extract_patches_sliding(full_noisy_img1, 256, STRIDE)
print(f"Number of patches to process: {len(patches_noisy)}")

# 2.   (Predict)
denoised_patches = unet.predict(patches_noisy, batch_size=32, verbose=1)

# 3.   (Reconstruct)
denoised_full_image1 = reconstruct_from_patches_weighted(
    denoised_patches, 
    full_noisy_img1.shape, 
    256, 
    STRIDE
)

# 4. Display
plt.figure(figsize=(60, 20))
plt.subplot(1, 3, 1); plt.title("Input (Noisy)"); plt.imshow(full_noisy_img1); plt.axis('off')
plt.subplot(1, 3, 2); plt.title("Output (UNet)"); plt.imshow(denoised_full_image1); plt.axis('off')
plt.subplot(1, 3, 3); plt.title("Ground Truth (Clean)"); plt.imshow(full_clean_img1); plt.axis('off')
plt.show()

In [None]:
OUTPUT_DIR = "results_inference/"
DIR_FOR_DETECTION = os.path.join(OUTPUT_DIR, "Set1") 
DIR_FOR_REPORT = os.path.join(OUTPUT_DIR, "comparisons") 

os.makedirs(DIR_FOR_DETECTION, exist_ok=True)
os.makedirs(DIR_FOR_REPORT, exist_ok=True)
for i, sample_path in enumerate(test_paths):
    filename = os.path.basename(sample_path)
    print(f"Processing image {i+1}: {filename}") 
    full_clean_img = cv2.imread(sample_path)
    full_clean_img = cv2.cvtColor(full_clean_img, cv2.COLOR_BGR2RGB).astype('float32') / 255.0

    # Add noise to the full image
    full_noisy_img = add_speckle_noise(tf.convert_to_tensor(full_clean_img))[0].numpy() - full_clean_img
    full_noisy_img = np.clip(full_clean_img + full_noisy_img, 0, 1)

    STRIDE = 128 

    # 1. Extract patches
    patches_noisy = extract_patches_sliding(full_noisy_img, 256, STRIDE)

    # 2.   (Predict)
    denoised_patches = unet.predict(patches_noisy, batch_size=32, verbose=1)

    # 3.   (Reconstruct)
    denoised_full_image = reconstruct_from_patches_weighted(
        denoised_patches, 
        full_noisy_img.shape, 
        256, 
        STRIDE
    )

    plt.figure()
    #plt.subplot(1, 3, 1); plt.title("Input (Noisy)"); plt.imshow(full_noisy_img); plt.axis('off')
    plt.title("Output (U-net_set1)"); plt.imshow(denoised_full_image); plt.axis('off')
    #plt.subplot(1, 3, 3); plt.title("Ground Truth (Clean)"); plt.imshow(full_clean_img); plt.axis('off')

    save_path = os.path.join(DIR_FOR_DETECTION, f"{filename}")
    
    plt.savefig(save_path, bbox_inches='tight', pad_inches=0.1)
    plt.close()

In [None]:
# Visual comparison on one sample image
sample_img = cv2.cvtColor(cv2.imread(test_sample_paths[0]), cv2.COLOR_BGR2RGB)
noise = np.random.normal(0, 30, sample_img.shape)
sample_noisy = np.clip(sample_img + noise, 0, 255).astype(np.uint8)

plt.figure(figsize=(20, 10))

# Original and noisy images
plt.subplot(2, 4, 1)
plt.imshow(sample_img); plt.title("Clean Original"); plt.axis('off')
plt.subplot(2, 4, 2)
plt.imshow(sample_noisy); plt.title("Noisy Input"); plt.axis('off')

# Classical methods
methods_list = list(denoising_methods.items())
for i, (name, func) in enumerate(methods_list):
    res = func(sample_noisy)
    plt.subplot(2, 4, i + 3)
    plt.imshow(res)
    plt.title(f"{name}")
    plt.axis('off')


plt.tight_layout()
plt.show()

In [None]:
def natural_blending(original, denoised, alpha=0.3, dark_threshold=0.2):
    
    original = original.astype(np.float32)
    denoised = denoised.astype(np.float32)

    if len(denoised.shape) == 3 and denoised.shape[2] == 3:
        denoised_gray = cv2.cvtColor(denoised, cv2.COLOR_BGR2GRAY)
    else:
        denoised_gray = denoised

    denoised_uint8 = (denoised_gray * 255).astype(np.uint8)
    _, binary_mask = cv2.threshold(denoised_uint8, 10, 255, cv2.THRESH_BINARY)
    contours, _ = cv2.findContours(binary_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    roi_mask = np.zeros_like(denoised_gray, dtype=np.float32)
    
    if contours:
        largest_contour = max(contours, key=cv2.contourArea)
        cv2.drawContours(roi_mask, [largest_contour], -1, 1, thickness=cv2.FILLED)
        
        roi_mask = cv2.GaussianBlur(roi_mask, (21, 21), 0)

    is_dark = (denoised_gray < dark_threshold).astype(np.float32)
    
    target_mask = roi_mask * is_dark

    if len(denoised.shape) == 3 and denoised.shape[2] == 3:
        target_mask_expanded = target_mask[:, :, np.newaxis]
    else:
        target_mask_expanded = target_mask
        
    blended_full = denoised * (1 - alpha) + original * alpha
    final_output = denoised * (1 - target_mask_expanded) + blended_full * target_mask_expanded

    return final_output, roi_mask

In [None]:
final_result, mask_debug = natural_blending(full_noisy_img1, denoised_full_image1, alpha=0.3, dark_threshold=0.2)

plt.figure(figsize=(20, 10))
plt.subplot(1, 4, 1); plt.title("Original Noisy"); plt.imshow(full_noisy_img1, cmap='gray'); plt.axis('off')
plt.subplot(1, 4, 2); plt.title("Model Output "); plt.imshow(denoised_full_image1, cmap='gray'); plt.axis('off')
plt.subplot(1, 4, 3); plt.title("ROI Mask "); plt.imshow(mask_debug, cmap='gray'); plt.axis('off')
plt.subplot(1, 4, 4); plt.title("Final Natural Blend"); plt.imshow(final_result, cmap='gray'); plt.axis('off')
plt.show()