<a href="https://colab.research.google.com/github/huynhtruc0309/Unet_SpaceNet2/blob/main/%08Delta_Cognition_Homework.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!pip install rasterio
!pip install Fiona

In [40]:
import torch
import os

TEST_SPLIT = 0.15
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
PIN_MEMORY = True if DEVICE == "cuda" else False

# define the number of channels in the input, number of classes,
# and number of levels in the U-Net model
NUM_CHANNELS = 3
NUM_CLASSES = 1
NUM_LEVELS = 3

# initialize learning rate, number of epochs to train for, and the
# batch size
INIT_LR = 0.001
NUM_EPOCHS = 40
BATCH_SIZE = 32

# define the input image dimensions
INPUT_IMAGE_WIDTH = 650
INPUT_IMAGE_HEIGHT = 650

# define threshold to filter weak predictions
THRESHOLD = 0.5

In [4]:
from torch.utils.data import Dataset
import rasterio as rio
import numpy as np
from rasterio.crs import CRS
from rasterio.features import rasterize
from rasterio.transform import Affine
import rasterio as rio
import fiona
from fiona.errors import FionaValueError
from fiona.transform import transform_geom

class SpaceNet2Dataset(Dataset):
    def __init__(self, imagePaths, maskPaths, transforms):
        self.imagePaths = imagePaths
        self.maskPaths = maskPaths
        self.transforms = transforms

    def __len__(self):
        return len(self.imagePaths)

    def _load_image(self, imagePath):
        filename = os.path.join(imagePath)
        with rio.open(filename) as img:
            array = img.read().astype(np.float32)
            tensor = torch.from_numpy(array)
            return tensor, img.transform, img.crs

    def _load_mask(self, path: str, tfm, raster_crs, shape):
        try:
            with fiona.open(path) as src:
                vector_crs = CRS(src.crs)
                if raster_crs == vector_crs:
                    labels = [feature["geometry"] for feature in src]
                else:
                    labels = [
                        transform_geom(
                            vector_crs.to_string(),
                            raster_crs.to_string(),
                            feature["geometry"],
                        )
                        for feature in src
                    ]
        except FionaValueError:
            labels = []

        if not labels:
            mask_data = np.zeros(shape=shape)
        else:
            mask_data = rasterize(
                labels,
                out_shape=shape,
                fill=0,  # nodata value
                transform=tfm,
                all_touched=False,
                dtype=np.uint8,
            )

        mask: Tensor = torch.from_numpy(mask_data).long()  # type: ignore[attr-defined]

        return mask

    def __getitem__(self, idx: int):
        imagePath = self.imagePaths[idx]
        labelPath = self.maskPaths[idx]
        img, tfm, raster_crs = self._load_image(imagePath)
        h, w = img.shape[1:]
        mask = self._load_mask(labelPath, tfm, raster_crs, (h, w))

        ch, cw = INPUT_IMAGE_WIDTH, INPUT_IMAGE_HEIGHT
        image, mask = img[:, :ch, :cw], mask[:ch, :cw]

        if self.transforms is not None:
            image = self.transforms(image)
            mask = self.transforms(mask)

        return (image, mask)

In [5]:
# define the path to the directory
BASE_OUTPUT = "/content/drive/MyDrive/2022/DeltaCognition/output"
COLLECTION = 'sn2_AOI_5_Khartoum'
IMAGE_DATASET_PATH = '/content/drive/MyDrive/2022/DeltaCognition'
FILE_NAME = 'PS-RGB.tif'
LABEL_GLOB = 'labels.geojson'

In [6]:
# define the path to the output serialized model, model training
# plot, and testing image paths
MODEL_PATH = os.path.join(BASE_OUTPUT, "unet_tgs_salt.pth")
PLOT_PATH = os.path.sep.join([BASE_OUTPUT, "plot.png"])
TEST_PATHS = os.path.sep.join([BASE_OUTPUT, "test_paths.txt"])

In [7]:
from torch.nn import ConvTranspose2d
from torch.nn import Conv2d
from torch.nn import MaxPool2d
from torch.nn import Module
from torch.nn import ModuleList
from torch.nn import ReLU
from torchvision.transforms import CenterCrop
from torch.nn import functional as F

In [8]:
class Block(Module):
	def __init__(self, inChannels, outChannels):
		super().__init__()
		# store the convolution and RELU layers
		self.conv1 = Conv2d(inChannels, outChannels, 3)
		self.relu = ReLU()
		self.conv2 = Conv2d(outChannels, outChannels, 3)

	def forward(self, x):
		# apply CONV => RELU => CONV block to the inputs and return it
		return self.conv2(self.relu(self.conv1(x)))

In [9]:
class Encoder(Module):
	def __init__(self, channels=(3, 16, 32, 64)):
		super().__init__()
		# store the encoder blocks and maxpooling layer
		self.encBlocks = ModuleList(
			[Block(channels[i], channels[i + 1])
			 	for i in range(len(channels) - 1)])
		self.pool = MaxPool2d(2)

	def forward(self, x):
		# initialize an empty list to store the intermediate outputs
		blockOutputs = []

		# loop through the encoder blocks
		for block in self.encBlocks:
			# pass the inputs through the current encoder block, store
			# the outputs, and then apply maxpooling on the output
			x = block(x)
			blockOutputs.append(x)
			x = self.pool(x)

		# return the list containing the intermediate outputs
		return blockOutputs

In [10]:
class Decoder(Module):
	def __init__(self, channels=(64, 32, 16)):
		super().__init__()
		# initialize the number of channels, upsampler blocks, and
		# decoder blocks
		self.channels = channels
		self.upconvs = ModuleList(
			[ConvTranspose2d(channels[i], channels[i + 1], 2, 2)
			 	for i in range(len(channels) - 1)])
		self.dec_blocks = ModuleList(
			[Block(channels[i], channels[i + 1])
			 	for i in range(len(channels) - 1)])

	def forward(self, x, encFeatures):
		# loop through the number of channels
		for i in range(len(self.channels) - 1):
			# pass the inputs through the upsampler blocks
			x = self.upconvs[i](x)

			# crop the current features from the encoder blocks,
			# concatenate them with the current upsampled features,
			# and pass the concatenated output through the current
			# decoder block
			encFeat = self.crop(encFeatures[i], x)
			x = torch.cat([x, encFeat], dim=1)
			x = self.dec_blocks[i](x)

		# return the final decoder output
		return x

	def crop(self, encFeatures, x):
		# grab the dimensions of the inputs, and crop the encoder
		# features to match the dimensions
		(_, _, H, W) = x.shape
		encFeatures = CenterCrop([H, W])(encFeatures)

		# return the cropped features
		return encFeatures

In [11]:
class UNet(Module):
  def __init__(self, encChannels=(3, 16, 32, 64), decChannels=(64, 32, 16), nbClasses=1, retainDim=True, outSize=(INPUT_IMAGE_HEIGHT,  INPUT_IMAGE_WIDTH)):
    super().__init__()
    # initialize the encoder and decoder
    self.encoder = Encoder(encChannels)
    self.decoder = Decoder(decChannels)

    # initialize the regression head and store the class variables
    self.head = Conv2d(decChannels[-1], nbClasses, 1)
    self.retainDim = retainDim
    self.outSize = outSize

  def forward(self, x):
    # grab the features from the encoder
    encFeatures = self.encoder(x)

		# pass the encoder features through decoder making sure that
		# their dimensions are suited for concatenation
    decFeatures = self.decoder(encFeatures[::-1][0], encFeatures[::-1][1:])

		# pass the decoder features through the regression head to
		# obtain the segmentation mask
    map = self.head(decFeatures)

    # check to see if we are retaining the original output
		# dimensions and if so, then resize the output to match them
    if self.retainDim:
      map = F.interpolate(map, self.outSize)

		# return the segmentation map
    return map

In [12]:
from torch.nn import BCEWithLogitsLoss
from torch.optim import Adam
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
from torchvision import transforms
from imutils import paths
from tqdm import tqdm
import matplotlib.pyplot as plt
import time
import glob

In [13]:
imagePaths, maskPaths = [], []
imagePaths = glob.glob(os.path.join(IMAGE_DATASET_PATH, COLLECTION, "*", FILE_NAME))
imagePaths = sorted(imagePaths)
for imgpath in imagePaths:
    lbl_path = os.path.join(
        os.path.dirname(imgpath) + "-labels", LABEL_GLOB
    )
    maskPaths.append(lbl_path)

In [41]:
# partition the data into training and testing splits using 85% of
# the data for training and the remaining 15% for testing
split = train_test_split(imagePaths, maskPaths,
	test_size=TEST_SPLIT, random_state=42)

# unpack the data split
(trainImages, testImages) = split[:2]
(trainMasks, testMasks) = split[2:]

# write the testing image paths to disk so that we can use then
# when evaluating/testing our model
print("[INFO] saving testing image paths...")
f = open(TEST_PATHS, "w")
f.write("\n".join(testImages))
f.close() 

[INFO] saving testing image paths...


In [42]:
# define transformations
# transforms = transforms.Compose([transforms.ToPILImage(),
#  	transforms.Resize((INPUT_IMAGE_HEIGHT, INPUT_IMAGE_WIDTH)),
# 	transforms.ToTensor()])

# create the train and test datasets
trainDS = SpaceNet2Dataset(imagePaths=trainImages, maskPaths=trainMasks,
	transforms=None)
testDS = SpaceNet2Dataset(imagePaths=testImages, maskPaths=testMasks,
    transforms=None)
print(f"[INFO] found {len(trainDS)} examples in the training set...")
print(f"[INFO] found {len(testDS)} examples in the test set...")

# create the training and test data loaders
trainLoader = DataLoader(trainDS, shuffle=True,
	batch_size=BATCH_SIZE, pin_memory=PIN_MEMORY,
	num_workers=os.cpu_count())
testLoader = DataLoader(testDS, shuffle=False,
	batch_size=BATCH_SIZE, pin_memory=PIN_MEMORY,
	num_workers=os.cpu_count())


[INFO] found 860 examples in the training set...
[INFO] found 152 examples in the test set...


In [43]:
# initialize our UNet model
unet = UNet().to(DEVICE)

# initialize loss function and optimizer
lossFunc = BCEWithLogitsLoss()
opt = Adam(unet.parameters(), lr=INIT_LR)

# calculate steps per epoch for training and test set
trainSteps = len(trainDS) // BATCH_SIZE
testSteps = len(testDS) // BATCH_SIZE

# initialize a dictionary to store training history
H = {"train_loss": [], "test_loss": []}

In [45]:
# loop over epochs
print("[INFO] training the network...")
startTime = time.time()
for e in tqdm(range(NUM_EPOCHS)):
	# set the model in training mode
	unet.train()

	# initialize the total training and validation loss
	totalTrainLoss = 0
	totalTestLoss = 0

	# loop over the training set
	for (i, (x, y)) in enumerate(trainLoader):
		# send the input to the device
		(x, y) = (x.to(DEVICE), y.to(DEVICE))
		
		# perform a forward pass and calculate the training loss
		pred = unet(x)
		pred = torch.squeeze(pred, 1)
		loss = lossFunc(pred, y.float())

		# first, zero out any previously accumulated gradients, then
		# perform backpropagation, and then update model parameters
		opt.zero_grad()
		loss.backward()
		opt.step()

		# add the loss to the total training loss so far
		totalTrainLoss += loss.detach()

	# switch off autograd
	with torch.no_grad():
		# set the model in evaluation mode
		unet.eval()

		# loop over the validation set
		for (x, y) in testLoader:
			# send the input to the device
			(x, y) = (x.to(DEVICE), y.to(DEVICE))

			# make the predictions and calculate the validation loss
			pred = unet(x)
			pred = torch.squeeze(pred, 1)
			totalTestLoss += lossFunc(pred, y.float())

	# calculate the average training and validation loss
	avgTrainLoss = totalTrainLoss / trainSteps
	avgTestLoss = totalTestLoss / testSteps

	# update our training history
	H["train_loss"].append(avgTrainLoss.cpu().detach().numpy())
	H["test_loss"].append(avgTestLoss.cpu().detach().numpy())

	# print the model training and validation information
	print("[INFO] EPOCH: {}/{}".format(e + 1, NUM_EPOCHS))
	print("Train loss: {:.6f}, Test loss: {:.4f}".format(
		avgTrainLoss, avgTestLoss))
 
# display the total time needed to perform the training 
endTime = time.time()
print("[INFO] total time taken to train the model: {:.2f}s".format(
	endTime - startTime))

[INFO] training the network...


  2%|▎         | 1/40 [00:47<30:36, 47.09s/it]

[INFO] EPOCH: 1/40
Train loss: 0.616814, Test loss: 0.0000


  5%|▌         | 2/40 [01:32<29:19, 46.29s/it]

[INFO] EPOCH: 2/40
Train loss: 0.000000, Test loss: 0.0000


  8%|▊         | 3/40 [02:19<28:34, 46.33s/it]

[INFO] EPOCH: 3/40
Train loss: 0.000000, Test loss: 0.0000


 10%|█         | 4/40 [03:05<27:54, 46.52s/it]

[INFO] EPOCH: 4/40
Train loss: 0.000000, Test loss: 0.0000


 12%|█▎        | 5/40 [03:53<27:14, 46.71s/it]

[INFO] EPOCH: 5/40
Train loss: 0.000000, Test loss: 0.0000


 15%|█▌        | 6/40 [04:40<26:32, 46.83s/it]

[INFO] EPOCH: 6/40
Train loss: 0.000000, Test loss: 0.0000


 18%|█▊        | 7/40 [05:27<25:47, 46.89s/it]

[INFO] EPOCH: 7/40
Train loss: 0.000000, Test loss: 0.0000


 20%|██        | 8/40 [06:14<25:01, 46.93s/it]

[INFO] EPOCH: 8/40
Train loss: 0.000000, Test loss: 0.0000


 22%|██▎       | 9/40 [07:01<24:16, 46.98s/it]

[INFO] EPOCH: 9/40
Train loss: 0.000000, Test loss: 0.0000


 25%|██▌       | 10/40 [07:48<23:30, 47.01s/it]

[INFO] EPOCH: 10/40
Train loss: 0.000000, Test loss: 0.0000


 28%|██▊       | 11/40 [08:35<22:44, 47.06s/it]

[INFO] EPOCH: 11/40
Train loss: 0.000000, Test loss: 0.0000


 28%|██▊       | 11/40 [08:52<23:24, 48.44s/it]


KeyboardInterrupt: ignored