In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models, transforms
import torchvision
from torch.utils.data import DataLoader, Dataset, TensorDataset
import time
# Install torchinfo, import if it's available
try:
  import torchinfo
except:
  !pip install torchinfo
  import torchinfo

from torchinfo import summary

# Install torchinfo, import if it's available
try:
  import rasterio
except:
  !pip install rasterio
  import rasterio

import numpy as np
import os
from PIL import Image
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from rasterio.transform import Affine
# Mount Google Drive at /content/drive; the data and output paths used by the
# cells below all live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Recursively delete the patch output folder on Drive (the same path later used
# as `save_dir`), presumably so the patches are regenerated from scratch.
# Without `-f`, `rm` prints an error if the folder does not exist yet.
!rm -r /content/drive/MyDrive/Patches

In [None]:
from affine import Affine
import os
import rasterio
from sklearn.model_selection import train_test_split

# Define paths
path_rgb = '/content/drive/MyDrive/Theos/RGB.tif'
path_mask = '/content/drive/MyDrive/Theos/mask.tif'
save_dir = '/content/drive/MyDrive/Patches'

# Define patch size and stride (stride < patch_size, so neighbouring patches overlap)
patch_size = 256
stride = 128

# Load the RGB raster; rasterio returns (bands, rows, cols), so move the band
# axis to the end to get (rows, cols, bands).
with rasterio.open(path_rgb) as src_rgb:
    rgb_data = src_rgb.read().transpose(1, 2, 0)
    rgb_meta = src_rgb.meta.copy()
    # Affine transformation of the source raster, used when georeferencing
    # patches. Read it while the dataset is still open rather than from the
    # closed handle after the `with` block.
    transform = src_rgb.transform

# Load only the first band of the mask raster as a 2-D (rows, cols) array.
with rasterio.open(path_mask) as src_mask:
    mask_data = src_mask.read(1)
    mask_meta = src_mask.meta.copy()

# Extract patches with coordinates
def extract_patches(image, patch_size, stride):
    """Cut square patches out of an array on a regular sliding-window grid.

    Windows are visited row by row (top to bottom), and left to right within a
    row. Only windows that fit entirely inside the image are produced, so an
    image smaller than `patch_size` yields no patches.

    Args:
        image: Array whose first two axes are (rows, cols); any further axes
            are kept as-is in each patch.
        patch_size: Side length of each square patch, in pixels.
        stride: Step between consecutive window origins, in pixels.

    Returns:
        A tuple `(patches, coordinates)` of two equally long lists: the patch
        slices, and the `(x, y)` = (column, row) offset of each patch's
        top-left corner within `image`.
    """
    n_rows, n_cols = image.shape[:2]
    row_starts = range(0, n_rows - patch_size + 1, stride)
    col_starts = range(0, n_cols - patch_size + 1, stride)

    # Row-major window order; each coordinate is stored as (column, row).
    coordinates = [(col, row) for row in row_starts for col in col_starts]
    patches = [
        image[row:row + patch_size, col:col + patch_size]
        for col, row in coordinates
    ]
    return patches, coordinates

# Tile both rasters on the same grid (same patch size and stride).
rgb_patches, rgb_coordinates = extract_patches(
    image=rgb_data, patch_size=patch_size, stride=stride
)
mask_patches, mask_coordinates = extract_patches(
    image=mask_data, patch_size=patch_size, stride=stride
)


In [None]:
from sklearn.model_selection import train_test_split
import os
import rasterio
from rasterio.transform import Affine

# Both rasters must produce the same patch grid; otherwise image/mask pairs
# (and the offsets shared between them below) would not line up.
assert rgb_coordinates == mask_coordinates, \
    "RGB and mask rasters produce different patch grids"

# Split the data into training and validation sets.
# The pixel offsets are split together with the patches so that, after the
# shuffle performed by train_test_split, every patch keeps the (x, y) offset
# it was actually cut from.
(rgb_train, rgb_val,
 mask_train, mask_val,
 coords_train, coords_val) = train_test_split(
    rgb_patches, mask_patches, rgb_coordinates, test_size=0.2, random_state=42)

# Create directories to save patches
os.makedirs(save_dir, exist_ok=True)
os.makedirs(os.path.join(save_dir, 'train', 'rgb'), exist_ok=True)  # Create the 'train/rgb' directory
os.makedirs(os.path.join(save_dir, 'train', 'mask'), exist_ok=True)  # Create the 'train/mask' directory
os.makedirs(os.path.join(save_dir, 'val', 'rgb'), exist_ok=True)  # Create the 'val/rgb' directory
os.makedirs(os.path.join(save_dir, 'val', 'mask'), exist_ok=True)  # Create the 'val/mask' directory

# Function to save patches with metadata
def save_patches(patches, meta, coordinates, path_template):
    """Write each patch to its own georeferenced GeoTIFF.

    Args:
        patches: Sequence of arrays shaped (rows, cols, bands) for multi-band
            patches or (rows, cols) for single-band patches.
        meta: Rasterio metadata dict of the raster the patches were cut from;
            its 'crs' and 'transform' entries are used.
        coordinates: (x, y) = (column, row) pixel offsets of each patch's
            top-left corner in the source raster, in the same order as
            `patches`.
        path_template: Output path with one `{}` placeholder that receives the
            patch index.

    Raises:
        ValueError: If `patches` and `coordinates` differ in length, since the
            pairing between a patch and its offset would then be wrong.
    """
    if len(patches) != len(coordinates):
        raise ValueError(
            "patches and coordinates must have the same length "
            "({} != {})".format(len(patches), len(coordinates))
        )

    source_transform = meta['transform']
    for i, (patch, (x, y)) in enumerate(zip(patches, coordinates)):
        patch_path = path_template.format(i)
        multi_band = patch.ndim == 3
        # Patch pixel (col, row) is source pixel (col + x, row + y), so the
        # pixel shift has to be applied BEFORE the source raster's transform.
        patch_transform = source_transform * Affine.translation(x, y)
        with rasterio.open(
            patch_path,
            'w',
            driver='GTiff',
            height=patch.shape[0],
            width=patch.shape[1],
            count=patch.shape[2] if multi_band else 1,  # Number of bands actually present
            dtype=patch.dtype,
            crs=meta['crs'],
            transform=patch_transform,
        ) as dst:
            if multi_band:
                dst.write(patch.transpose(2, 0, 1))  # rasterio expects (bands, rows, cols)
            else:
                dst.write(patch, 1)  # Write the single-band patch to band 1

# Save training patches with metadata
save_patches(rgb_train, rgb_meta, coords_train, os.path.join(save_dir, 'train', 'rgb', 'rgb_{}.tif'))
save_patches(mask_train, mask_meta, coords_train, os.path.join(save_dir, 'train', 'mask', 'mask_{}.tif'))

# Save validation patches with metadata
save_patches(rgb_val, rgb_meta, coords_val, os.path.join(save_dir, 'val', 'rgb', 'rgb_{}.tif'))
save_patches(mask_val, mask_meta, coords_val, os.path.join(save_dir, 'val', 'mask', 'mask_{}.tif'))

In [None]:
class SegmentationDataset(Dataset):
    """Dataset of (image, mask) pairs read from two directories of `.tif` files.

    Item `i` pairs the i-th image file with the i-th mask file after both
    directory listings are sorted by name, so the two directories are expected
    to contain parallel file names (e.g. `rgb_7.tif` / `mask_7.tif`).

    Args:
        image_dir: Directory containing the image `.tif` files.
        mask_dir: Directory containing the mask `.tif` files.
        transform: Optional callable applied to both the image and the mask.
    """

    def __init__(self, image_dir, mask_dir, transform=None):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.transform = transform
        # os.listdir returns entries in arbitrary order; sort both listings so
        # that the same index refers to the matching image and mask.
        self.images = sorted(img for img in os.listdir(image_dir) if img.endswith('.tif'))
        self.masks = sorted(mask for mask in os.listdir(mask_dir) if mask.endswith('.tif'))

    def __len__(self):
        """Return the number of image files found in `image_dir`."""
        return len(self.images)

    def __getitem__(self, idx):
        """Open the idx-th image and mask and return them, transformed if a transform was given."""
        img_path = os.path.join(self.image_dir, self.images[idx])
        mask_path = os.path.join(self.mask_dir, self.masks[idx])
        image = Image.open(img_path)
        mask = Image.open(mask_path)

        if self.transform:
            image = self.transform(image)
            mask = self.transform(mask)

        return image, mask

# Transform applied to every image and mask: convert the opened PIL image to a
# tensor in (C, H, W) layout.
# The notebook-level name `transform` cannot be used here: it holds the
# rasterio Affine of the source raster (set in the patch-extraction cell),
# which is not callable. A separate name is used so that value is not clobbered.
image_transform = transforms.ToTensor()

# Create datasets
train_dataset = SegmentationDataset('/content/drive/MyDrive/Patches/train/rgb', '/content/drive/MyDrive/Patches/train/mask', transform=image_transform)
val_dataset = SegmentationDataset('/content/drive/MyDrive/Patches/val/rgb', '/content/drive/MyDrive/Patches/val/mask', transform=image_transform)

# Create data loaders (only the training loader shuffles)
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=1, shuffle=False)


In [None]:
# Pull a single batch from the training loader and report the tensor shapes.
batch_iterator = iter(train_loader)
image, masked = next(batch_iterator)
print(image.shape, masked.shape)

In [None]:
import torch
from torchvision import transforms
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import numpy as np

# Show one randomly chosen training sample: the image next to its mask.
# Relies on `train_loader` created in an earlier cell.

# Pick a random position in the dataset behind the loader
dataset_length = len(train_loader.dataset)
index = np.random.randint(dataset_length)
image, mask = train_loader.dataset[index]

# Move axis 0 to the end for matplotlib (assumes channels-first tensors)
images_normalized = image.numpy().transpose(1, 2, 0)
mask = mask.numpy().transpose(1, 2, 0)

# One row, two panels: image on the left, mask on the right
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 6))

ax1.imshow(images_normalized)
ax2.imshow(mask, cmap='gray')

ax1.set_title('Input Image')
ax2.set_title('Mask')

# Hide ticks and frames on both panels
for _panel in (ax1, ax2):
    _panel.axis('off')

plt.tight_layout()
plt.show()


## Applying a simple U-Net

In [None]:
# Install the packages imported by the Keras U-Net cells below
# (TensorFlow/Keras, OpenCV, rasterio). Versions are not pinned.
!pip install tensorflow keras opencv-python rasterio

In [None]:
# Installs opencv-python via pip3. The same package is already listed in the
# `!pip install` cell directly above.
!pip3 install opencv-python

In [None]:
import os
import cv2
import rasterio
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator

def load_tiff_image(file_path):
    """Read every band of a raster file and return it with the band axis last.

    Args:
        file_path: Path of a raster file that rasterio can open.

    Returns:
        Array shaped (rows, cols, bands).
    """
    with rasterio.open(file_path) as dataset:
        bands_first = dataset.read()  # rasterio returns (bands, rows, cols)
    return bands_first.transpose(1, 2, 0)

def preprocess_images(images_path, masks_path, image_size):
    """Load paired image/mask `.tif` files and resize them to a square size.

    Files are paired by position after sorting both directory listings by
    name. If one directory holds more `.tif` files than the other, the surplus
    files are ignored (pairing stops at the shorter listing).

    Args:
        images_path: Directory containing the image `.tif` files.
        masks_path: Directory containing the mask `.tif` files.
        image_size: Target side length in pixels; outputs are
            image_size x image_size.

    Returns:
        Tuple `(images, masks)` of numpy arrays, each stacking the resized
        files along a new first axis.
    """
    image_files = sorted(os.path.join(images_path, f) for f in os.listdir(images_path) if f.endswith('.tif'))
    mask_files = sorted(os.path.join(masks_path, f) for f in os.listdir(masks_path) if f.endswith('.tif'))

    images = []
    masks = []
    target_size = (image_size, image_size)

    for img_file, mask_file in zip(image_files, mask_files):
        image = load_tiff_image(img_file)
        mask = load_tiff_image(mask_file)

        image = cv2.resize(image, target_size)
        # Nearest-neighbour keeps the mask limited to values present in the
        # file; the default bilinear interpolation would blend neighbouring
        # labels into in-between values whenever the size actually changes.
        mask = cv2.resize(mask, target_size, interpolation=cv2.INTER_NEAREST)

        images.append(image)
        masks.append(mask)

    return np.array(images), np.array(masks)

# Side length the patches are resized to before training
image_size = 256

# Training patches written by the patch-saving cell
train_images, train_masks = preprocess_images(
    images_path='/content/drive/MyDrive/Patches/train/rgb',
    masks_path='/content/drive/MyDrive/Patches/train/mask',
    image_size=image_size,
)

# Validation patches written by the patch-saving cell
val_images, val_masks = preprocess_images(
    images_path='/content/drive/MyDrive/Patches/val/rgb',
    masks_path='/content/drive/MyDrive/Patches/val/mask',
    image_size=image_size,
)


In [None]:
from tensorflow.keras.layers import Conv2D, MaxPooling2D, UpSampling2D, Concatenate, Input
from tensorflow.keras.models import Model

def unet_model(input_size=(256, 256, 3)):
    """Build and compile a small U-Net with one convolution per stage.

    Layout:
      * encoder: three 3x3 conv (64, 128, 256 filters) + 2x2 max-pool stages;
      * bottleneck: one 3x3 conv with 512 filters;
      * decoder: three 2x upsample -> concatenate with the matching encoder
        conv output -> 3x3 conv (256, 128, 64 filters) stages;
      * head: 1x1 conv with a single sigmoid output channel.

    Args:
        input_size: Shape of one input sample, passed to `Input`.

    Returns:
        The model, compiled with the Adam optimizer, binary cross-entropy
        loss and the accuracy metric.
    """
    inputs = Input(input_size)

    # Encoding: keep each conv output for the skip connections
    skip_connections = []
    x = inputs
    for n_filters in (64, 128, 256):
        x = Conv2D(n_filters, (3, 3), activation='relu', padding='same')(x)
        skip_connections.append(x)
        x = MaxPooling2D(pool_size=(2, 2))(x)

    # Bottleneck
    x = Conv2D(512, (3, 3), activation='relu', padding='same')(x)

    # Decoding: walk the skip connections from deepest to shallowest
    for n_filters, skip in zip((256, 128, 64), reversed(skip_connections)):
        x = UpSampling2D(size=(2, 2))(x)
        x = Concatenate()([skip, x])
        x = Conv2D(n_filters, (3, 3), activation='relu', padding='same')(x)

    # Single-channel sigmoid output
    outputs = Conv2D(1, (1, 1), activation='sigmoid')(x)

    model = Model(inputs, outputs)
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    return model

# Build the simple U-Net and print its layer table
model = unet_model()
model.summary()

# Train for 10 epochs with batches of 8; the validation arrays are passed for monitoring
model.fit(
    x=train_images,
    y=train_masks,
    validation_data=(val_images, val_masks),
    epochs=10,
    batch_size=8,
)


In [None]:
# Score the trained model on the validation arrays (loss first, then accuracy)
validation_scores = model.evaluate(val_images, val_masks)
loss, acc = validation_scores
print(f"Validation Loss: {loss}")
print(f"Validation Accuracy: {acc}")

# Model outputs for the validation images, used by the plotting cell below
predictions = model.predict(val_images)

In [None]:
def display_images(images, masks, predictions, num_images=3):
    """Plot image, mask and prediction side by side, one figure per sample.

    Args:
        images: Indexable collection of images accepted by `plt.imshow`.
        masks: Indexable collection of mask arrays; each is squeezed before plotting.
        predictions: Indexable collection of prediction arrays; each is squeezed before plotting.
        num_images: Maximum number of samples to show; capped at `len(images)`.
    """
    n_shown = min(num_images, len(images))  # Never index past the available images

    for sample_idx in range(n_shown):
        plt.figure(figsize=(15, 5))

        # (title, data, extra imshow arguments) for the three panels, left to right
        panels = (
            ('Original Image', images[sample_idx], {}),
            ('Mask', masks[sample_idx].squeeze(), {'cmap': 'gray'}),
            ('Prediction', predictions[sample_idx].squeeze(), {'cmap': 'gray'}),
        )
        for position, (panel_title, panel_data, imshow_kwargs) in enumerate(panels, start=1):
            plt.subplot(1, 3, position)
            plt.imshow(panel_data, **imshow_kwargs)
            plt.title(panel_title)

        plt.show()

# Plot every validation sample; pass a smaller `num_images` to show only the first few
num_validation_images = len(val_images)
display_images(
    images=val_images,
    masks=val_masks,
    predictions=predictions,
    num_images=num_validation_images,
)


## Applying a pretrained model (ResNet50 encoder)

In [None]:
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Conv2D, UpSampling2D, Concatenate, Input

def unet_with_pretrained_resnet(input_size=(256, 256, 3)):
    """Build and compile a U-Net whose encoder is an ImageNet-pretrained ResNet50.

    Skip connections are taken from four ResNet50 layers. For the default
    256x256 input their spatial sizes are:
      * conv1_relu         128x128x64
      * conv2_block3_out    64x64x256
      * conv3_block4_out    32x32x512
      * conv4_block6_out    16x16x1024
    and the encoder output, conv5_block3_out, is 8x8x2048.

    The decoder applies a 512-filter 3x3 conv to the encoder output, then four
    upsample -> concatenate-with-skip -> 3x3 conv stages (512, 256, 128, 64
    filters), one more upsample + 64-filter conv to get back to the input
    resolution, and a 1x1 conv with a single sigmoid output channel.

    Args:
        input_size: Shape of one input sample, passed to `Input`.

    Returns:
        The model, compiled with the Adam optimizer, binary cross-entropy
        loss and the accuracy metric.
    """
    inputs = Input(input_size)
    base_model = ResNet50(weights='imagenet', include_top=False, input_tensor=inputs)

    # Encoder: skip tensors ordered from shallowest to deepest
    skip_layer_names = (
        'conv1_relu',
        'conv2_block3_out',
        'conv3_block4_out',
        'conv4_block6_out',
    )
    skip_connections = [base_model.get_layer(layer_name).output for layer_name in skip_layer_names]
    encoder_output = base_model.get_layer('conv5_block3_out').output

    # Bottleneck
    x = Conv2D(512, (3, 3), activation='relu', padding='same')(encoder_output)

    # Decoder: pair each stage with the skip tensor of matching resolution,
    # starting from the deepest one
    for n_filters, skip in zip((512, 256, 128, 64), reversed(skip_connections)):
        x = UpSampling2D(size=(2, 2))(x)
        x = Concatenate()([skip, x])
        x = Conv2D(n_filters, (3, 3), activation='relu', padding='same')(x)

    # Final upsample (no skip connection at this level), then the output head
    x = UpSampling2D(size=(2, 2))(x)
    x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)
    outputs = Conv2D(1, (1, 1), activation='sigmoid')(x)

    model = Model(inputs=inputs, outputs=outputs)
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    return model

# Build the ResNet50-based U-Net (this rebinds `model`) and print its layer table
model = unet_with_pretrained_resnet(input_size=(256, 256, 3))
model.summary()


In [None]:
import os
import rasterio
import numpy as np
import cv2

def load_tiff_image(file_path):
    """Read every band of a raster file and return it with the band axis last.

    Args:
        file_path: Path of a raster file that rasterio can open.

    Returns:
        Array shaped (rows, cols, bands).
    """
    with rasterio.open(file_path) as dataset:
        bands_first = dataset.read()  # rasterio returns (bands, rows, cols)
    return bands_first.transpose(1, 2, 0)

def preprocess_images(images_path, masks_path, image_size):
    """Load paired image/mask `.tif` files and resize them to a square size.

    Files are paired by position after sorting both directory listings by
    name. If one directory holds more `.tif` files than the other, the surplus
    files are ignored (pairing stops at the shorter listing).

    Args:
        images_path: Directory containing the image `.tif` files.
        masks_path: Directory containing the mask `.tif` files.
        image_size: Target side length in pixels; outputs are
            image_size x image_size.

    Returns:
        Tuple `(images, masks)` of numpy arrays, each stacking the resized
        files along a new first axis.
    """
    image_files = sorted(os.path.join(images_path, f) for f in os.listdir(images_path) if f.endswith('.tif'))
    mask_files = sorted(os.path.join(masks_path, f) for f in os.listdir(masks_path) if f.endswith('.tif'))

    images = []
    masks = []
    target_size = (image_size, image_size)

    for img_file, mask_file in zip(image_files, mask_files):
        image = load_tiff_image(img_file)
        mask = load_tiff_image(mask_file)

        image = cv2.resize(image, target_size)
        # Nearest-neighbour keeps the mask limited to values present in the
        # file; the default bilinear interpolation would blend neighbouring
        # labels into in-between values whenever the size actually changes.
        mask = cv2.resize(mask, target_size, interpolation=cv2.INTER_NEAREST)

        images.append(image)
        masks.append(mask)

    return np.array(images), np.array(masks)

# Side length the patches are resized to before training
image_size = 256

# Training patches
train_images, train_masks = preprocess_images(
    images_path='/content/drive/MyDrive/Patches/train/rgb',
    masks_path='/content/drive/MyDrive/Patches/train/mask',
    image_size=image_size,
)

# Validation patches
val_images, val_masks = preprocess_images(
    images_path='/content/drive/MyDrive/Patches/val/rgb',
    masks_path='/content/drive/MyDrive/Patches/val/mask',
    image_size=image_size,
)

# Append a trailing axis of length 1 to both mask arrays
train_masks = train_masks[..., np.newaxis]
val_masks = val_masks[..., np.newaxis]

# `model` is the network built by unet_with_pretrained_resnet() in the previous cell.
# NOTE(review): the images go into the network exactly as loaded (no scaling,
# no ResNet50 `preprocess_input`) - confirm this is intended for the
# ImageNet-pretrained encoder.

# Training the model
model.fit(
    x=train_images,
    y=train_masks,
    validation_data=(val_images, val_masks),
    epochs=50,
    batch_size=8,
)

# Make predictions
predictions = model.predict(val_images)

# Visualization function
def display_images(images, masks, predictions, num_images=3):
    """Plot image, mask and prediction side by side, one figure per sample.

    Args:
        images: Indexable collection of images accepted by `plt.imshow`.
        masks: Indexable collection of mask arrays; each is squeezed before plotting.
        predictions: Indexable collection of prediction arrays; each is squeezed before plotting.
        num_images: Maximum number of samples to show; capped at `len(images)`.
    """
    n_shown = min(num_images, len(images))  # Never index past the available images

    for sample_idx in range(n_shown):
        plt.figure(figsize=(15, 5))

        # (title, data, extra imshow arguments) for the three panels, left to right
        panels = (
            ('Original Image', images[sample_idx], {}),
            ('Mask', masks[sample_idx].squeeze(), {'cmap': 'gray'}),
            ('Prediction', predictions[sample_idx].squeeze(), {'cmap': 'gray'}),
        )
        for position, (panel_title, panel_data, imshow_kwargs) in enumerate(panels, start=1):
            plt.subplot(1, 3, position)
            plt.imshow(panel_data, **imshow_kwargs)
            plt.title(panel_title)

        plt.show()

# Plot every validation sample; pass a smaller `num_images` to show only the first few
num_validation_images = len(val_images)
display_images(
    images=val_images,
    masks=val_masks,
    predictions=predictions,
    num_images=num_validation_images,
)


## Applied PyTorch