In [None]:
import random
from functools import partial

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import BCEWithLogitsLoss
from torchvision.transforms import CenterCrop
import os
from torch.utils.data import Dataset, DataLoader
from torch.optim import Adam
from sklearn.model_selection import train_test_split
from torchvision import transforms
import torchvision
from tqdm import tqdm
import matplotlib.pyplot as plt
import time

In [None]:
# ---------------------------------------------------------------------------
# Dataset locations: everything is resolved relative to the working directory
# ---------------------------------------------------------------------------
DATASET_PATH = os.path.join(os.getcwd(), 'SematicSeg_Dataset')
# sub-folders holding the input images and their label masks
IMAGE_DATASET_PATH = os.path.join(DATASET_PATH, 'Original Image')
MASK_DATASET_PATH = os.path.join(DATASET_PATH, 'Labels')

# fraction of the samples held out for testing
TEST_SPLIT = 0.15

# run on the GPU when one is available, otherwise fall back to the CPU
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# pinned host memory is only requested when the target device is CUDA
PIN_MEMORY = DEVICE == "cuda"

# ---------------------------------------------------------------------------
# Model description
# NOTE(review): NUM_CLASSES and NUM_LEVELS do not appear to be read by the
# code visible in this notebook -- confirm before relying on them.
# ---------------------------------------------------------------------------
NUM_CHANNELS = 3
NUM_CLASSES = 2
NUM_LEVELS = 3

# ---------------------------------------------------------------------------
# Optimisation hyper-parameters
# ---------------------------------------------------------------------------
INIT_LR = 1e-2  # an alternative value of 1e-6 was left next to this setting
NUM_EPOCHS = 100
BATCH_SIZE = 2

# every image and mask is resized to this (width, height) before use
INPUT_IMAGE_WIDTH = 480
INPUT_IMAGE_HEIGHT = 320

# probabilities above this value count as foreground at prediction time
THRESHOLD = 0.5

# ---------------------------------------------------------------------------
# Output artefacts: serialized model, loss plot, and the test split listings
# ---------------------------------------------------------------------------
BASE_OUTPUT = os.path.join(os.getcwd(), 'output')
MODEL_PATH = os.path.join(BASE_OUTPUT, "unet_tgs_salt.pth")
PLOT_PATH = os.path.join(BASE_OUTPUT, "plot.png")
TEST_IMAGE_PATHS = os.path.join(BASE_OUTPUT, "test_image_paths.txt")
TEST_MASK_PATHS = os.path.join(BASE_OUTPUT, "test_mask_paths.txt")

In [None]:
# sanity check: show the file that will receive the list of test image names
print(TEST_IMAGE_PATHS)

In [None]:
import os.path
import numpy as np
import torch
import cv2
from torch.utils.data import Dataset
class SegmentationDataSet(Dataset):
    """Dataset yielding ``(image, mask)`` pairs read from disk with OpenCV.

    Image ``i`` is ``rootdir/img_name_ls[i]`` and its mask is
    ``labeldir/label_name_ls[i]``; the two name lists must therefore be the
    same length and in matching order.

    Args:
        rootdir: directory containing the input images.
        labeldir: directory containing the label masks.
        img_name_ls: image file names (relative to ``rootdir``).
        label_name_ls: mask file names (relative to ``labeldir``).
        transform: optional callable applied to the image and, in a separate
            call, to the mask.

    Raises:
        AssertionError: if the two name lists differ in length.
    """

    def __init__(self,rootdir,labeldir,img_name_ls, label_name_ls,transform=None):
        assert len(img_name_ls)==len(label_name_ls), \
            "image and label name lists must have the same length"
        self.data_dir=rootdir
        self.label_dir=labeldir
        self.img_name_ls = img_name_ls
        self.label_name_ls = label_name_ls
        self.data_len = len(img_name_ls)
        self.transform = transform

    def __len__(self):
        """Return the number of (image, mask) pairs."""
        return self.data_len

    @staticmethod
    def _read_image(path):
        """Read ``path`` with ``cv2.imread`` and fail loudly if it cannot be read."""
        data = cv2.imread(path)
        # cv2.imread signals failure by returning None instead of raising
        if data is None:
            raise FileNotFoundError(f"could not read image file: {path}")
        return data

    def __getitem__(self, index):
        """Load, resize and convert the pair at ``index``.

        Returns:
            ``(img, label)``: the RGB image and a float32 mask whose non-zero
            pixels have been set to 1, both passed through ``transform`` when
            one was given.

        Raises:
            FileNotFoundError: if the image or the mask cannot be read.
        """
        img_name = self.img_name_ls[index]
        label_name = self.label_name_ls[index]
        img_path = os.path.join(self.data_dir, img_name)
        label_path = os.path.join(self.label_dir, label_name)
        img = self._read_image(img_path)
        label = self._read_image(label_path)
        # resize the image and mask to the configured network input size
        img = cv2.resize(img, (INPUT_IMAGE_WIDTH, INPUT_IMAGE_HEIGHT))
        label = cv2.resize(label, (INPUT_IMAGE_WIDTH, INPUT_IMAGE_HEIGHT))
        # convert mask to 0 and 1: any non-zero gray value counts as foreground
        label = cv2.cvtColor(label, cv2.COLOR_BGR2GRAY)
        label[label > 0] = 1
        label = label.astype(np.float32)
        # OpenCV loads images as BGR; the rest of the notebook works in RGB
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        if self.transform is not None:
            # NOTE: the transform is applied to image and mask in two separate
            # calls, so a random transform would not stay aligned between them
            img = self.transform(img)
            label = self.transform(label)
        return img, label

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F


# U-Net building blocks
# double-convolution block
class Conv(nn.Module):
    """Two stacked 3x3 convolution stages mapping ``C_in`` to ``C_out`` channels.

    Each stage is Conv2d -> BatchNorm2d -> Dropout -> LeakyReLU. The
    convolutions use stride 1 and padding 1; the first stage drops with
    p=0.3 and the second with p=0.4.
    """

    def __init__(self, C_in, C_out):
        super().__init__()
        stages = []
        # (input channels, dropout probability) of the two stages, in order
        for stage_in, drop_p in ((C_in, 0.3), (C_out, 0.4)):
            stages.extend([
                nn.Conv2d(stage_in, C_out, kernel_size=3, stride=1, padding=1),
                nn.BatchNorm2d(C_out),
                # dropout to limit overfitting
                nn.Dropout(drop_p),
                nn.LeakyReLU(),
            ])
        self.layer = nn.Sequential(*stages)

    def forward(self, x):
        """Run ``x`` through both convolution stages."""
        features = self.layer(x)
        return features


# down sampling module
class DownSampling(nn.Module):
    """Halve the spatial resolution with a strided convolution.

    A 3x3 convolution with stride 2 and padding 1 keeps the channel count
    ``C`` unchanged and is followed by a LeakyReLU.
    """

    def __init__(self, C):
        super().__init__()
        strided_conv = nn.Conv2d(C, C, kernel_size=3, stride=2, padding=1)
        self.Down = nn.Sequential(strided_conv, nn.LeakyReLU())

    def forward(self, x):
        """Return the down-sampled feature map."""
        reduced = self.Down(x)
        return reduced


# up sampling module
class UpSampling(nn.Module):
    """Up-sample a feature map and concatenate it with a skip connection.

    The input is enlarged with nearest-neighbour interpolation to the
    spatial size of the skip tensor, a 1x1 convolution halves its channel
    count (``C`` -> ``C // 2``), and the result is concatenated with the
    skip tensor along the channel dimension.
    """

    def __init__(self, C):
        super(UpSampling, self).__init__()
        # 1x1 convolution: the number of channels is halved
        self.Up = nn.Conv2d(C, C // 2, 1, 1)

    def forward(self, x, r):
        """Return ``cat(Up(upsample(x)), r)`` along dim 1.

        Args:
            x: feature map from the deeper level.
            r: skip-connection feature map from the encoder; its height and
                width define the output resolution.
        """
        # Up-sample with nearest-neighbour interpolation. Targeting r's size
        # (rather than a fixed scale_factor=2) gives the same result when r
        # is exactly twice as large, and also works when the encoder input
        # had an odd size, where doubling would overshoot by one pixel.
        up = F.interpolate(x, size=r.shape[2:], mode="nearest")
        x = self.Up(up)
        # concat the current up-sampled features with the encoder features
        return torch.cat((x, r), 1)


# U-Net model
class UNet(nn.Module):
    """U-Net style encoder/decoder that returns raw per-pixel logits.

    The encoder applies four stride-2 down-sampling stages (64 -> 1024
    channels); the decoder mirrors them with four up-sampling stages that
    are concatenated with the matching encoder outputs. No sigmoid is
    applied to the output, so callers must apply it themselves when they
    need probabilities.

    Args:
        in_channels: number of channels of the input image (default 3).
        out_channels: number of channels of the returned logit map
            (default 1).
    """

    def __init__(self, in_channels=3, out_channels=1):
        super(UNet, self).__init__()

        # encoder: 4 times down sampling
        self.cov1 = Conv(in_channels, 64)
        self.encoder1 = DownSampling(64)
        self.cov2 = Conv(64, 128)
        self.encoder2 = DownSampling(128)
        self.cov3 = Conv(128, 256)
        self.encoder3 = DownSampling(256)
        self.cov4 = Conv(256, 512)
        self.encoder4 = DownSampling(512)
        self.cov5 = Conv(512, 1024)

        # decoder: 4 times up sampling
        self.decoder1 = UpSampling(1024)
        self.cov6 = Conv(1024, 512)
        self.decoder2 = UpSampling(512)
        self.cov7 = Conv(512, 256)
        self.decoder3 = UpSampling(256)
        self.cov8 = Conv(256, 128)
        self.decoder4 = UpSampling(128)
        self.cov9= Conv(128, 64)

        # prediction head producing logits
        self.pred = torch.nn.Conv2d(64, out_channels, 3, 1, 1)

    def forward(self, x):
        """Return the logit map for the input batch ``x``."""
        # down sampling; result1..result4 are kept for the skip connections
        result1 = self.cov1(x)
        result2 = self.cov2(self.encoder1(result1))
        result3 = self.cov3(self.encoder2(result2))
        result4 = self.cov4(self.encoder3(result3))
        result5 = self.cov5(self.encoder4(result4))

        # up sampling, each step merged with the matching encoder output
        result6 = self.cov6(self.decoder1(result5, result4))
        result7 = self.cov7(self.decoder2(result6, result3))
        result8 = self.cov8(self.decoder3(result7, result2))
        result9 = self.cov9(self.decoder4(result8, result1))

        # output logits
        return self.pred(result9)



# smoke test: make sure the network can be constructed
if __name__ == "__main__":
    net = UNet()

In [None]:
# load the image and mask file names in a sorted manner
imagePaths = sorted(os.listdir(IMAGE_DATASET_PATH))
maskPaths = sorted(os.listdir(MASK_DATASET_PATH))

# normalise the image file names to lower case on disk
for file_name in imagePaths:
    lowered = file_name.lower()
    if lowered != file_name:
        os.rename(os.path.join(IMAGE_DATASET_PATH, file_name),
                  os.path.join(IMAGE_DATASET_PATH, lowered))
# keep the in-memory list in sync with the renamed files; otherwise the old
# mixed-case names no longer resolve on a case-sensitive filesystem and the
# ordering would differ between the first run and later runs
imagePaths = sorted(name.lower() for name in imagePaths)

# images and masks are paired by position, so the counts must agree
if len(imagePaths) != len(maskPaths):
    raise ValueError(
        "found {} images but {} masks".format(len(imagePaths), len(maskPaths)))

# partition the data into training and testing splits using 85% of
# the data for training and the remaining 15% for testing
split = train_test_split(imagePaths, maskPaths,
                         test_size=TEST_SPLIT, random_state=42)

# unpack the training and testing image and mask paths
(trainImages, testImages) = split[:2]
(trainMasks, testMasks) = split[2:]

# write the testing image paths to disk so that we can use them
# when evaluating/testing our model
print("[INFO] saving testing image paths...")
# the output directory is not created anywhere else before it is written to
os.makedirs(BASE_OUTPUT, exist_ok=True)
with open(TEST_IMAGE_PATHS, "w") as f:
    f.write("\n".join(testImages))
with open(TEST_MASK_PATHS, "w") as f:
    f.write("\n".join(testMasks))



In [None]:
# the only transformation is the conversion to a tensor
transform = transforms.Compose([transforms.ToTensor()])

# wrap the train and test file lists in datasets
trainDS = SegmentationDataSet(
    IMAGE_DATASET_PATH, MASK_DATASET_PATH,
    trainImages, trainMasks,
    transform=transform,
)
testDS = SegmentationDataSet(
    IMAGE_DATASET_PATH, MASK_DATASET_PATH,
    testImages, testMasks,
    transform=transform,
)
print(f"[INFO] found {len(trainDS)} examples in the training set...")
print(f"[INFO] found {len(testDS)} examples in the test set...")

# both loaders share every option except shuffling (training data only)
loaderOptions = dict(batch_size=BATCH_SIZE, pin_memory=PIN_MEMORY, num_workers=4)
trainLoader = DataLoader(trainDS, shuffle=True, **loaderOptions)
testLoader = DataLoader(testDS, shuffle=False, **loaderOptions)

# pull one training batch to check the tensor shapes
images, masks = next(iter(trainLoader))
print(f"[INFO] images shape: {images.shape}")
print(f"[INFO] masks shape: {masks.shape}")




In [None]:
class WeightedFocalLoss(nn.Module):
    """Binary focal loss computed from raw logits.

    Despite the class name, ``alpha`` scales every element by the same
    factor (there is no per-class weighting). The loss is
    ``alpha * (1 - pt) ** gamma * BCE`` averaged over all elements, where
    ``pt = exp(-BCE)``.
    """

    def __init__(self, alpha=.25, gamma=2):
        super().__init__()
        self.alpha, self.gamma = alpha, gamma

    def forward(self, inputs, targets):
        """Return the mean focal loss of logits ``inputs`` against ``targets``."""
        per_element_bce = F.binary_cross_entropy_with_logits(
            inputs, targets, reduction='none')
        # deriving pt from the BCE value avoids NaNs when a probability is 0
        pt = torch.exp(-per_element_bce)
        modulation = (1 - pt) ** self.gamma
        focal = self.alpha * modulation * per_element_bce
        return focal.mean()

# dice loss
class DiceLoss(nn.Module):
    """Soft Dice loss on logits: ``1 - (2*|P.T| + s) / (|P| + |T| + s)``.

    ``P`` is the sigmoid of the predictions, ``T`` the target mask and ``s``
    the ``smooth`` term. All sums run over every element of the batch, so a
    single Dice value is computed per batch.
    """

    def __init__(self, smooth=1):
        super().__init__()
        self.smooth = smooth

    def forward(self, y_pred, y_true):
        """Return the Dice loss of logits ``y_pred`` against ``y_true``."""
        probs = torch.sigmoid(y_pred)
        overlap = (probs * y_true).sum()
        numerator = 2. * overlap + self.smooth
        denominator = probs.sum() + y_true.sum() + self.smooth
        return 1 - numerator / denominator

In [None]:
# evaluation metrics: mIoU, precision, recall and F1 score

class SegmentationMetric(object):
    """Accumulate a confusion matrix and derive segmentation metrics from it.

    Rows of ``confusionMatrix`` index the ground-truth class and columns the
    predicted class. Predictions are passed as logits and binarised with a
    sigmoid followed by ``threshold``, so predicted classes are 0 or 1.

    Args:
        numClass: number of classes (size of the confusion matrix).
        threshold: probability at or above which a pixel is predicted as
            class 1 (default 0.5).
    """

    def __init__(self, numClass, threshold=0.5):
        self.numClass = numClass
        self.threshold = threshold
        self.confusionMatrix = np.zeros((self.numClass,)*2)

    def reset(self):
        """Clear the accumulated confusion matrix."""
        self.confusionMatrix = np.zeros((self.numClass,)*2)

    def meanIntersectionOverUnion(self):
        """Return the IoU averaged over the classes that occur (NaNs ignored)."""
        # Intersection = TP, Union = TP + FP + FN
        # IoU = TP / (TP + FP + FN)
        intersection = np.diag(self.confusionMatrix)
        union = np.sum(self.confusionMatrix, axis=1) + np.sum(self.confusionMatrix, axis=0) - np.diag(self.confusionMatrix)
        # a class that never occurs gives 0/0 -> NaN, which nanmean skips
        with np.errstate(divide="ignore", invalid="ignore"):
            IoU = intersection / union
        mIoU = np.nanmean(IoU)
        return mIoU

    def genConfusionMatrix(self, imgPredict, imgLabel):
        """Return the confusion matrix of one batch.

        Args:
            imgPredict: tensor of logits.
            imgLabel: tensor of class labels (integer or float valued) with
                the same shape as ``imgPredict``.
        """
        imgPredict = torch.sigmoid(imgPredict)
        imgPredict = (imgPredict >= self.threshold).long()
        # ignore pixels whose label lies outside [0, numClass)
        mask = (imgLabel >= 0) & (imgLabel < self.numClass)
        # labels may arrive as float tensors; np.bincount only accepts
        # integers, so the combined index is built from integer tensors
        label = self.numClass * imgLabel[mask].long() + imgPredict[mask]
        count = np.bincount(label.detach().cpu().numpy(), minlength=self.numClass**2)
        confusionMatrix = count.reshape(self.numClass, self.numClass)
        return confusionMatrix

    def addBatch(self, imgPredict, imgLabel):
        """Add one batch of logits and labels to the running confusion matrix."""
        assert imgPredict.shape == imgLabel.shape
        self.confusionMatrix += self.genConfusionMatrix(imgPredict.cpu(), imgLabel.cpu())

    def recall(self):
        """Return the per-class recall, TP / (TP + FN) (NaN for absent classes)."""
        with np.errstate(divide="ignore", invalid="ignore"):
            recall = np.diag(self.confusionMatrix) / np.sum(self.confusionMatrix, axis=1)
        return recall

    def precision(self):
        """Return the per-class precision, TP / (TP + FP) (NaN if never predicted)."""
        with np.errstate(divide="ignore", invalid="ignore"):
            precision = np.diag(self.confusionMatrix) / np.sum(self.confusionMatrix, axis=0)
        return precision

    def F1score(self):
        """Return the per-class F1 score, 2 * P * R / (P + R)."""
        precision = self.precision()
        recall = self.recall()
        with np.errstate(divide="ignore", invalid="ignore"):
            F1score = 2 * (precision * recall) / (precision + recall)
        return F1score

In [None]:
# initialize our UNet model
unet = UNet().to(DEVICE)
# initialize loss function and optimizer
lossFunc = DiceLoss()
opt = Adam(unet.parameters(), lr=INIT_LR)
# multiply the learning rate by 0.8 every 10 epochs
lr_scheduler = torch.optim.lr_scheduler.StepLR(opt, step_size=10, gamma=0.8)

# number of batches per epoch; taken from the loaders so that a partial last
# batch is counted and the averages below never divide by zero
trainSteps = max(len(trainLoader), 1)
testSteps = max(len(testLoader), 1)
# initialize a dictionary to store training history
H = {"train_loss": [], "test_loss": []}
# loop over epochs
print("[INFO] training the network...")
startTime = time.time()

for e in tqdm(range(NUM_EPOCHS)):
    # set the model in training mode
    unet.train()
    # running sums kept as plain floats so that no autograd history is held
    totalTrainLoss = 0.0
    totalTestLoss = 0.0
    metric = SegmentationMetric(2)
    # loop over the training set
    for (x, y) in tqdm(trainLoader):
        # send the input to the device
        (x, y) = (x.to(DEVICE), y.to(DEVICE))
        # perform a forward pass and calculate the training loss
        pred = unet(x)
        loss = lossFunc(pred, y)
        # first, zero out any previously accumulated gradients, then
        # perform backpropagation, and then update model parameters
        opt.zero_grad()
        loss.backward()
        opt.step()
        # add the loss to the total training loss so far
        totalTrainLoss += loss.item()
    # set the model in evaluation mode and switch off autograd
    unet.eval()
    with torch.no_grad():
        # loop over the validation set
        for (x, y) in testLoader:
            # send the input to the device
            (x, y) = (x.to(DEVICE), y.to(DEVICE))
            # make the predictions and calculate the validation loss
            pred = unet(x)
            # every 5th epoch, plot the first image of each batch next to
            # its predicted probability map
            if e % 5 == 0:
                fig, ax = plt.subplots(1, 2, figsize=(10, 5))
                ax[0].imshow(x[0].cpu().permute(1, 2, 0))
                ax[0].set_title("Input Image")
                ax[1].imshow(torch.sigmoid(pred[0]).cpu().squeeze(), cmap="gray", vmin=0, vmax=1)
                ax[1].set_title("Predicted Mask")
                plt.show()
                # release the figure so that figures do not pile up in memory
                plt.close(fig)
            totalTestLoss += lossFunc(pred, y).item()
            # evaluation metric; the confusion matrix is built with
            # np.bincount, which needs integer class labels
            metric.addBatch(pred, y.long())
    lr_scheduler.step()
    # calculate the average training and validation loss
    avgTrainLoss = totalTrainLoss / trainSteps
    avgTestLoss = totalTestLoss / testSteps
    avgmIOU = metric.meanIntersectionOverUnion()
    # index 1 selects the foreground class
    precision = metric.precision()[1]
    recall = metric.recall()[1]
    F1score = metric.F1score()[1]
    # update our training history
    H["train_loss"].append(avgTrainLoss)
    H["test_loss"].append(avgTestLoss)
    # print the model training and validation information
    print("[INFO] EPOCH: {}/{}".format(e + 1, NUM_EPOCHS))
    print("Train loss: {:.6f}, Test loss: {:.4f},mIOU:{:.2f},precision:{:.2f},recall:{:.2f},F1score:{:.2f}".format(
        avgTrainLoss, avgTestLoss, avgmIOU, precision, recall, F1score))
    print()
# display the total time needed to perform the training
endTime = time.time()
print("[INFO] total time taken to train the model: {:.2f}s".format(
    endTime - startTime))


In [None]:

# draw the per-epoch training and test loss curves on one set of axes
plt.style.use("ggplot")
lossFig, lossAx = plt.subplots()
for historyKey in ("train_loss", "test_loss"):
    lossAx.plot(H[historyKey], label=historyKey)
lossAx.set_title("Training Loss on Dataset")
lossAx.set_xlabel("Epoch #")
lossAx.set_ylabel("Loss")
lossAx.legend(loc="lower left")
lossFig.savefig(PLOT_PATH)
# serialize the whole model object to disk
torch.save(unet, MODEL_PATH)

In [None]:
# USAGE
# python predict.py
# import the necessary packages
import matplotlib.pyplot as plt
import numpy as np
import torch
import cv2
import os
def prepare_plot(origImage, origMask, predMask):
    """Show an image, its ground-truth mask and the predicted mask side by side.

    Args:
        origImage: image drawn in the left panel.
        origMask: ground-truth mask drawn in the middle panel.
        predMask: predicted mask drawn in the right panel.
    """
    # one row with three panels
    figure, ax = plt.subplots(nrows=1, ncols=3, figsize=(10, 10))
    # (content, title) of each panel, from left to right
    panels = (
        (origImage, "Image"),
        (origMask, "Original Mask"),
        (predMask, "Predicted Mask"),
    )
    for axis, (content, title) in zip(ax, panels):
        axis.imshow(content)
        axis.set_title(title)
    # set the layout of the figure and display it
    figure.tight_layout()
    figure.show()

In [None]:
def make_predictions(model, imagePath, maskPath):
    """Predict the mask of one image and plot it next to the ground truth.

    Args:
        model: network returning logits for a ``(1, 3, H, W)`` float tensor.
        imagePath: path of the input image.
        maskPath: path of the ground-truth mask.

    Raises:
        FileNotFoundError: if the image or the mask cannot be read.
    """
    # set model to evaluation mode
    model.eval()
    # turn off gradient tracking
    with torch.no_grad():
        # load the image from disk, swap its color channels, cast it
        # to float data type, and scale its pixel values
        image = cv2.imread(imagePath)
        # cv2.imread returns None instead of raising on failure
        if image is None:
            raise FileNotFoundError(f"could not read image file: {imagePath}")
        image = cv2.resize(image, (INPUT_IMAGE_WIDTH,
                                   INPUT_IMAGE_HEIGHT))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = image.astype("float32") / 255.0
        # keep an HxWx3 copy for plotting before the layout is changed
        origImage = image.copy()
        # reorder to channels-first, add a batch dimension, and convert
        # to a PyTorch tensor on the current device
        image = np.transpose(image, (2, 0, 1))
        image = torch.from_numpy(image).unsqueeze(0)
        image = image.to(DEVICE)

        # load the ground-truth segmentation mask in grayscale mode
        # and resize it
        gtMask = cv2.imread(maskPath, 0)
        if gtMask is None:
            raise FileNotFoundError(f"could not read mask file: {maskPath}")
        gtMask = cv2.resize(gtMask, (INPUT_IMAGE_WIDTH,
                                     INPUT_IMAGE_HEIGHT))

        # make the prediction, pass the results through the sigmoid
        # function, and convert the result to a NumPy array
        predMask = model(image).squeeze()
        predMask = torch.sigmoid(predMask)
        predMask = predMask.cpu().numpy()
        # report the highest predicted probability
        print(predMask.max())
        # filter out the weak predictions and convert them to integers
        predMask = (predMask > THRESHOLD) * 255
        predMask = predMask.astype(np.uint8)
        # plot the HxWx3 NumPy copy; the 4-D device tensor `image` cannot be
        # drawn by imshow
        prepare_plot(origImage, gtMask, predMask)

# load the image paths in our testing file and randomly select up to 5
# distinct (image, mask) pairs
print("[INFO] loading up test image paths...")
with open(TEST_IMAGE_PATHS) as f:
    imagePaths = f.read().strip().split("\n")
with open(TEST_MASK_PATHS) as f:
    maskPaths = f.read().strip().split("\n")
# sample without replacement so that no pair is shown twice
random_index = np.random.choice(
    len(imagePaths), size=min(5, len(imagePaths)), replace=False)
imagePaths = np.array(imagePaths)[random_index]
maskPaths = np.array(maskPaths)[random_index]
print(imagePaths)
# load our model from disk and flash it to the current device
print("[INFO] load up model...")
# The file holds a whole pickled module, so weights_only=False is required
# on PyTorch versions where weights_only defaults to True. Unpickling can
# execute arbitrary code: only load files written by this notebook.
# map_location lets a model saved on a GPU be loaded on a CPU-only machine.
unet = torch.load(MODEL_PATH, map_location=DEVICE, weights_only=False).to(DEVICE)
# iterate over the randomly selected test image paths
for i in range(0, len(imagePaths)):
    # make predictions and visualize the results
    imgpath = os.path.join(IMAGE_DATASET_PATH, imagePaths[i])
    mskpath = os.path.join(MASK_DATASET_PATH, maskPaths[i])
    make_predictions(unet, imgpath, mskpath)