<a href="https://colab.research.google.com/github/ShahidHasib586/Underwater-object-detection-using-waternet-and-yolov7/blob/main/Code/Waternet/Update_of_waternet_colab_example.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Waternet Example

- Code: https://github.com/tnwei/waternet
- Original paper: https://arxiv.org/abs/1901.05495

## Init network

In [None]:
import torch
import cv2
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
# Make sure you are connected to a GPU instance: the value displayed below is
# True when PyTorch can use a CUDA device and False otherwise.
torch.cuda.is_available()

False

In [None]:
# Fetch WaterNet through Torch Hub. The entry point hands back three objects:
# a preprocess callable, a postprocess callable and the network itself.
preprocess, postprocess, model = torch.hub.load(
    'tnwei/waternet',
    'waternet',
)
# Put the network in evaluation mode; the trailing semicolon keeps the
# returned module from being echoed as the cell output.
model.eval();

Downloading: "https://github.com/tnwei/waternet/zipball/main" to /root/.cache/torch/hub/main.zip
Downloading: "https://www.dropbox.com/s/j8ida1d86hy5tm4/waternet_exported_state_dict-daa0ee.pt?dl=1" to /root/.cache/torch/hub/checkpoints/waternet_exported_state_dict-daa0ee.pt


In [None]:
# Upload the image dataset as a zip archive (the extraction cell expects "Data.zip")
from google.colab import files

# Bind the result instead of leaving the call as the last expression:
# files.upload() returns a mapping of file name -> raw file bytes, and echoing
# that mapping would dump the whole archive's bytes into the cell output.
uploaded = files.upload()
for uploaded_name, uploaded_bytes in uploaded.items():
    print(f"Uploaded {uploaded_name} ({len(uploaded_bytes)} bytes)")


In [None]:
import zipfile
import os

# Name of the uploaded archive and the folder it gets unpacked into.
# Change zip_path if the uploaded file has a different name.
zip_path = "Data.zip"
extract_path = "/content/data"

# Unpack every member of the archive under extract_path
with zipfile.ZipFile(zip_path, mode='r') as archive:
    archive.extractall(path=extract_path)

# Show the top-level entries that were extracted
extracted_entries = os.listdir(extract_path)
print(extracted_entries)


In [None]:
# Install required libraries
!pip install torch torchvision opencv-python


## Image example

Source: NotBurtsBees - Own work, CC BY-SA 4.0, https://commons.wikimedia.org/w/index.php?curid=115615060

In [None]:
import torch

# Load WaterNet model (preprocess/postprocess callables plus the network)
preprocess, postprocess, model = torch.hub.load('tnwei/waternet', 'waternet', pretrained=True)
# Switch to evaluation mode. The trailing semicolon suppresses the module repr
# that would otherwise be printed as this cell's output, matching the first
# load cell of the notebook.
model.eval();


In [None]:
import os
import cv2
import torch
import shutil
import numpy as np
from tqdm import tqdm

# Pull WaterNet from TorchHub. The hub-provided postprocess callable is kept
# under its own name (postprocess_model) so that it does not clash with the
# custom postprocess function defined in this cell.
preprocess, postprocess_model, model = torch.hub.load(
    'tnwei/waternet',
    'waternet',
    pretrained=True,
)
# Evaluation mode for inference
model.eval()

# Custom postprocess function
def postprocess(output_tensor):
    """Turn a model output tensor into a uint8 image array.

    The tensor is detached, moved to the CPU, clamped to [0, 1], permuted from
    channels-first to channels-last, stripped of its leading axis when that
    axis has size 1, scaled by 255 and cast to ``uint8`` (fractional parts are
    truncated by the cast).

    Args:
        output_tensor: 4-D tensor laid out as (batch, channels, height, width).

    Returns:
        ``numpy.ndarray`` of dtype ``uint8``; (height, width, channels) for a
        batch of one.
    """
    # Work on a gradient-free CPU copy restricted to the valid intensity range
    clamped = output_tensor.detach().cpu().clamp(0, 1)

    # Channels-last layout; squeeze(0) only drops the batch axis if it is 1
    channels_last = clamped.permute(0, 2, 3, 1).squeeze(0)
    pixels = channels_last.numpy()

    # Map [0, 1] onto [0, 255] and store as 8-bit
    return (pixels * 255).astype(np.uint8)

# Function to process a single image
def process_image(image_path, output_path):
    """Enhance one image with WaterNet and save the result.

    The image is read with OpenCV, converted from BGR to RGB, passed through
    the module-level ``preprocess`` -> ``model`` -> ``postprocess`` pipeline,
    converted back to BGR and written to ``output_path``. Any failure is
    reported with ``print`` and the image is skipped instead of raising, so a
    batch loop can keep going.

    Args:
        image_path: Path of the image file to enhance.
        output_path: Path the enhanced image is written to.

    Returns:
        None.
    """
    # Read the image (cv2.imread returns None instead of raising on failure)
    image = cv2.imread(image_path)
    if image is None:
        print(f"Failed to read image: {image_path}")
        return
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Preprocess the image
    try:
        preprocessed = preprocess(image_rgb)
    except ValueError as e:
        print(f"Preprocessing error for {image_path}: {e}")
        return

    if isinstance(preprocessed, (tuple, list)) and len(preprocessed) == 4:
        # NOTE(review): the upstream tnwei/waternet README unpacks preprocess()
        # as (rgb, white-balanced, histogram-equalized, gamma-corrected) and
        # passes all four tensors to the model in that order - confirm against
        # the installed hub version. Using them (rather than discarding them
        # and substituting constant tensors) gives the network the derived
        # inputs it is fed in the upstream example.
        input_tensor, wb, ce, gc = preprocessed
    else:
        # Fallback when preprocess yields only the RGB tensor: keep the
        # constant-filled placeholders previously used for wb / ce / gc.
        input_tensor = preprocessed[0] if isinstance(preprocessed, (tuple, list)) else preprocessed
        batch_size, _, height, width = input_tensor.shape
        device = input_tensor.device
        wb = torch.full((batch_size, 3, height, width), 0.5, device=device)
        ce = torch.full((batch_size, 3, height, width), 0.7, device=device)
        gc = torch.full((batch_size, 3, height, width), 0.6, device=device)

    # Pass the image through WaterNet. Gradients are not needed for inference,
    # so disable autograd to avoid building a graph for every image.
    try:
        with torch.no_grad():
            output_tensor = model(input_tensor, wb, ce, gc)
    except (ValueError, RuntimeError) as e:
        # PyTorch reports shape/device mismatches as RuntimeError
        print(f"WaterNet error for {image_path}: {e}")
        return

    # Postprocess the output
    processed_image = postprocess(output_tensor)

    # Convert to BGR for saving
    processed_image_bgr = cv2.cvtColor(processed_image, cv2.COLOR_RGB2BGR)

    # Save the processed image; cv2.imwrite signals failure via its return value
    if not cv2.imwrite(output_path, processed_image_bgr):
        print(f"Failed to write image: {output_path}")
# Paths
input_base_path = "/content/data/aquarium_pretrain"
output_base_path = "/content/processed_data"

# Dataset splits to walk and the image extensions that are enhanced
dataset_splits = ['train', 'valid', 'test']
image_extensions = ('.jpg', '.jpeg', '.png')

# Create output directory structure
for folder in dataset_splits:
    for subfolder in ['images', 'labels']:
        os.makedirs(os.path.join(output_base_path, folder, subfolder), exist_ok=True)

# Process all images and copy labels
for folder in dataset_splits:
    images_path = os.path.join(input_base_path, folder, 'images')
    labels_path = os.path.join(input_base_path, folder, 'labels')

    output_images_path = os.path.join(output_base_path, folder, 'images')
    output_labels_path = os.path.join(output_base_path, folder, 'labels')

    # A dataset may not ship every split; skip missing ones instead of
    # letting os.listdir abort the whole run.
    if not os.path.isdir(images_path):
        print(f"Skipping {folder}: {images_path} not found")
        continue

    print(f"Processing {folder} images...")
    # sorted() makes the processing order deterministic across runs
    for file_name in tqdm(sorted(os.listdir(images_path)), desc=f"Processing {folder}"):
        # Only image files are handled; the match is case-insensitive so that
        # names such as "IMG_0001.JPG" are not silently skipped.
        if not file_name.lower().endswith(image_extensions):
            continue

        # Process the image
        input_image_path = os.path.join(images_path, file_name)
        output_image_path = os.path.join(output_images_path, file_name)
        process_image(input_image_path, output_image_path)

        # Copy the matching label file (same stem, .txt extension) if present
        label_file = os.path.splitext(file_name)[0] + '.txt'
        input_label_path = os.path.join(labels_path, label_file)
        if os.path.exists(input_label_path):
            output_label_path = os.path.join(output_labels_path, label_file)
            shutil.copy(input_label_path, output_label_path)

print("Processing complete!")

In [None]:
# Recursively pack the processed dataset folder into processed_data.zip
!zip -r processed_data.zip /content/processed_data


In [None]:
# Download the archive of processed data from the Colab runtime
from google.colab import files
files.download('processed_data.zip')
