In [None]:
import os
import cv2
import torch
import numpy as np
import matplotlib.pyplot as plt

from tqdm import tqdm
from PIL import Image
from pathlib import Path
from torch.utils.data import Dataset, DataLoader

In [None]:
# Locations of the trained model file and of the red-band patch directory.
# Both can be overridden with environment variables so the notebook can run on
# another machine without editing this cell; the defaults are the original paths.
MODEL_PATH = os.environ.get("CFCN_MODEL_PATH", r"C:\Users\hdmqu\Downloads\C-FCN.pth")
RED_PATH   = os.environ.get(
    "CLOUD95_RED_PATH",
    r"C:\Users\hdmqu\Documents\GitHub\Cloud_Detection\data\95-cloud\train_red_additional_to38cloud",
)

# pathlib view of the red-band directory; RED_PATH itself stays a plain string
# because the ground-truth directory is later derived from it with str.replace.
r_dir = Path(RED_PATH)

Percentage of cloud

In [None]:
#   0%      1 - 30%    30 - 70%    70 - 100%
no_cloud, less_cloud, more_cloud, full_cloud = [], [], [], []

# Bucket boundaries expressed as a number of cloudy pixels.
# They are computed for 384x384 ground-truth patches.
low_threshold = 384*384 * 0.3
high_threshold = 384*384 * 0.7

# The ground-truth directory is obtained by swapping "red" for "gt" in the red path.
GT_PATH = RED_PATH.replace("red", "gt")
u = os.listdir(GT_PATH)

# Only the second half of the directory listing is evaluated.
# NOTE(review): os.listdir order is platform dependent, so which files land in
# this half may differ between machines - confirm against how the split was made.
eval_files = u[len(u) // 2:]
unreadable = []

for file in eval_files:
    img = cv2.imread(os.path.join(GT_PATH, file))

    # cv2.imread signals failure by returning None rather than raising, so an
    # unreadable entry would otherwise crash on the slice below.
    if img is None:
        unreadable.append(file)
        continue

    # Count pixels flagged as cloud (value 255) in the first channel.
    occurrences = np.count_nonzero(img[:, :, 0] == 255)

    if occurrences == 0:
        no_cloud.append(file)
    elif occurrences < low_threshold:
        less_cloud.append(file)
    elif occurrences < high_threshold:
        more_cloud.append(file)
    else:
        full_cloud.append(file)

print("No cloud images:\t",   len(no_cloud))
print("Less cloud images:\t", len(less_cloud))
print("More cloud images:\t", len(more_cloud))
print("Full cloud images:\t", len(full_cloud))
# Report the number of files actually examined instead of a count taken from
# a different directory.
print("Total images:\t\t", len(eval_files))
if unreadable:
    print("Skipped (unreadable):\t", len(unreadable))

In [None]:
class CloudDataset(Dataset):
    """Dataset of multi-band cloud patches addressed through their red-band file.

    Every entry of ``self.files`` is a dict with the keys 'red', 'green',
    'blue', 'nir' and 'gt'. The sibling paths are derived from the red-band
    path by replacing every occurrence of "red" in that path.
    """

    def __init__(self, file_list, red_dir=None):
        """Build the per-sample file table.

        Args:
            file_list: File names; "gt" in a name is rewritten to "red".
                The list passed in is not modified.
            red_dir: Directory holding the red-band files. Defaults to the
                module-level ``RED_PATH``.
        """
        super().__init__()
        # Resolve the default lazily so RED_PATH is only needed when no
        # directory is given explicitly.
        self.red_dir = RED_PATH if red_dir is None else os.fspath(red_dir)
        self.files = [self.combine_files(f) for f in self.to_path(file_list) if not os.path.isdir(f)]

    def to_path(self, list):
        """Return red-band paths for the given file names.

        A new list is returned; the argument is left untouched so callers can
        keep using their original file names. (The parameter name shadows the
        builtin but is kept for backward compatibility.)
        """
        base_dir = getattr(self, 'red_dir', None)
        if base_dir is None:
            base_dir = RED_PATH
        return [os.path.join(base_dir, os.fspath(name).replace("gt", "red")) for name in list]

    def combine_files(self, r_file: str):
        """Map a red-band path to the paths of all bands and the ground truth."""
        # Work on a plain string: on a pathlib.Path, .replace() would rename
        # the file on disk instead of substituting text.
        r_file = os.fspath(r_file)
        files = {
            'red'  : r_file,
            'green': r_file.replace('red', 'green'),
            'blue' : r_file.replace('red', 'blue'),
            'nir'  : r_file.replace('red', 'nir'),
            'gt'   : r_file.replace('red', 'gt')
        }
        return files

    def __len__(self):
        """Number of samples in the dataset."""
        return len(self.files)

    @staticmethod
    def _read_image(path):
        """Load an image file into a NumPy array and close the file handle."""
        with Image.open(path) as handle:
            return np.array(handle)

    def open_as_array(self, idx, invert=False, include_nir=False):
        """Load the bands of sample ``idx`` as one normalized array.

        Args:
            idx: Sample index.
            invert: If True, return channels first (C, H, W) instead of (H, W, C).
            include_nir: If True, append the NIR band after red, green, blue.

        Returns:
            The stacked bands divided by the maximum value of their integer dtype.
        """
        bands = ['red', 'green', 'blue']
        if include_nir:
            bands.append('nir')
        entry = self.files[idx]
        raw_rgb = np.stack([self._read_image(entry[band]) for band in bands], axis=2)
        if invert:
            raw_rgb = raw_rgb.transpose((2,0,1))
        # np.iinfo only accepts integer dtypes, so the band files are expected
        # to hold integer samples.
        return (raw_rgb / np.iinfo(raw_rgb.dtype).max) # normalized

    def open_mask(self, idx, add_dims=False):
        """Load the ground-truth mask of sample ``idx`` as 0/1 values.

        Pixels equal to 255 become 1, everything else 0. With ``add_dims`` a
        leading axis of length 1 is added.
        """
        raw_mask = self._read_image(self.files[idx]['gt'])
        raw_mask = np.where(raw_mask==255, 1, 0)
        return np.expand_dims(raw_mask, 0) if add_dims else raw_mask

    def __getitem__(self, idx):
        """Return ``(x, y)``: channels-first float32 image and long mask tensor."""
        x = torch.tensor(self.open_as_array(idx, invert=True), dtype=torch.float32)
        y = torch.tensor(self.open_mask(idx, add_dims=False), dtype=torch.long)
        return x, y

    def __repr__(self):
        return f'Dataset class with {self.__len__()} files'

In [None]:
# Build one dataset per cloud-coverage bucket (order: no, less, more, full).
no_cloud_data, less_cloud_data, more_cloud_data, full_cloud_data = (
    CloudDataset(bucket) for bucket in (no_cloud, less_cloud, more_cloud, full_cloud)
)

# Show the size of each dataset through its repr.
for bucket_data in (no_cloud_data, less_cloud_data, more_cloud_data, full_cloud_data):
    print(bucket_data)

In [None]:
# Single input example: fetch the first no-cloud sample and report both tensor shapes.
x, y = no_cloud_data[0]
print(f'Image: {x.shape}', f'Mask: {y.shape}', sep='\n')

In [None]:
from torch import nn
class C_FCN(nn.Module):
    """Small fully-convolutional network: two conv/pool stages and an upsampling head.

    Each contracting stage halves the spatial size (2x2 max-pool, stride 2) and
    the head's transposed convolution (kernel 4, stride 4) multiplies it by 4.
    """

    def __init__(self, in_channels, out_channels):
        """
        Args:
            in_channels: Number of channels of the input image.
            out_channels: Number of channels produced by the final layer.
        """
        super().__init__()
        self.conv1 = self.contract_block(in_channels, 5)
        self.conv2 = self.contract_block(5, 2)

        self.out   = self.output_block(2, out_channels)

    def forward(self, x):
        """Run the network on a batch ``x`` and return the head's output.

        Implemented as ``forward`` (not ``__call__``) so that ``model(x)`` goes
        through ``nn.Module.__call__`` and registered hooks are honoured.
        """
        # downsampling part
        conv1 = self.conv1(x)
        conv2 = self.conv2(conv1)

        out   = self.out(conv2)

        return out

    def contract_block(self, in_channels, out_channels, kernel_size=3, padding=1):
        """Conv -> ReLU -> 2x2 max-pool stage."""
        contract = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, padding=padding),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        )
        return contract

    def output_block(self, in_channels, out_channels, kernel_size=3, padding=1):
        """Output head: conv -> ReLU -> 1x1 conv -> sigmoid -> transposed conv.

        The intermediate width is fixed at 2 channels; ``out_channels`` only
        sets the width of the final transposed convolution.
        """
        out = nn.Sequential(
            nn.Conv2d(in_channels, 2, kernel_size=kernel_size, padding=padding),
            nn.ReLU(inplace=True),
            nn.Conv2d(2, 2, kernel_size=1, padding=0),
            nn.Sigmoid(),
            nn.ConvTranspose2d(2, out_channels, stride=4, kernel_size=4, padding=0)
        )
        return out

In [None]:
def my_collate(batch):
    """Collate a batch after dropping samples whose image sums to zero.

    Args:
        batch: Sequence of ``(image, target)`` samples, as produced by the dataset.

    Returns:
        The result of PyTorch's default collate on the remaining samples.

    Raises:
        ValueError: If every sample was dropped, so there is nothing to collate.
    """
    kept = [sample for sample in batch if torch.sum(sample[0]).item() != 0]
    if not kept:
        # default_collate cannot handle an empty list; fail with a clear
        # message instead of a debug print followed by an opaque IndexError.
        raise ValueError(
            "my_collate: every sample in the batch has an all-zero image; nothing left to collate"
        )
    return torch.utils.data.dataloader.default_collate(kept)

# Constants
BATCH_SIZE = 32
NUM_WORKER = 0
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

print(f"Using {DEVICE} device.")

# SECURITY NOTE: torch.load unpickles the stored object, which can execute
# arbitrary code - only load model files from a trusted source.
# NOTE(review): recent PyTorch releases default to weights_only=True, which
# rejects whole-model pickles; pass weights_only=False there if loading fails.
model = torch.load(MODEL_PATH, map_location=DEVICE)
model.to(DEVICE)
# The model is only used for inference in this notebook.
model.eval()


def _make_eval_loader(dataset):
    """Wrap an evaluation dataset in a DataLoader with the notebook-wide settings.

    Shuffling is disabled: the metrics are averaged per batch, so a fixed batch
    composition is needed for the numbers to be reproducible between runs.
    """
    return DataLoader(dataset, BATCH_SIZE, shuffle=False, num_workers=NUM_WORKER, collate_fn=my_collate)


no_cloud_loader   = _make_eval_loader(no_cloud_data)
less_cloud_loader = _make_eval_loader(less_cloud_data)
more_cloud_loader = _make_eval_loader(more_cloud_data)
full_cloud_loader = _make_eval_loader(full_cloud_data)

In [None]:
import torchmetrics

def get_accuracy(data_loader, model):
    """Evaluate ``model`` on ``data_loader`` and average the per-batch metrics.

    Args:
        data_loader: Iterable yielding ``(images, targets)`` batches.
        model: Callable returning a tensor indexed as (batch, channel, H, W).

    Returns:
        Tuple ``(accuracy, recall, precision, f1)`` of Python floats, each the
        mean of the per-batch values. All four are NaN if the loader yields no
        batches.
    """
    # The metric objects must live on the same device as the tensors they score.
    metrics = [
        metric.to(DEVICE)
        for metric in (
            torchmetrics.Accuracy(),
            torchmetrics.Recall(),
            torchmetrics.Precision(),
            torchmetrics.F1Score(),
        )
    ]
    per_batch = [[] for _ in metrics]

    with torch.no_grad():
        for images, targets in tqdm(data_loader):

            # Get predictions
            images = images.to(DEVICE).to(torch.float32)
            targets = targets.to(DEVICE)
            # NOTE(review): channel 0 of the model output is scored directly
            # against the targets - confirm this is the intended class channel.
            predictions = model(images)[:, 0, :, :].to(torch.float32)

            for scores, metric in zip(per_batch, metrics):
                # .item() yields a plain float, so averaging below also works
                # when the tensors live on a GPU.
                scores.append(metric(predictions, targets).item())

    return tuple(float(np.mean(scores)) if scores else float("nan") for scores in per_batch)

In [None]:
# Score the model on each coverage bucket, in order: no, less, more, full.
(
    (no_acc,   no_rec,   no_pre,   no_f1),
    (less_acc, less_rec, less_pre, less_f1),
    (more_acc, more_rec, more_pre, more_f1),
    (full_acc, full_rec, full_pre, full_f1),
) = (
    get_accuracy(loader, model)
    for loader in (no_cloud_loader, less_cloud_loader, more_cloud_loader, full_cloud_loader)
)

In [None]:
# Tab-separated metric table: one header line, then one line per coverage bucket.
report_lines = [
    "\t\tAccuracy\tRecall\t\tPrecision\tF1",
    f"No cloud:\t{no_acc}\t{no_rec}\t{no_pre}\t{no_f1}",
    f"Less cloud:\t{less_acc}\t{less_rec}\t{less_pre}\t{less_f1}",
    f"More cloud:\t{more_acc}\t{more_rec}\t{more_pre}\t{more_f1}",
    f"Full cloud:\t{full_acc}\t{full_rec}\t{full_pre}\t{full_f1}",
]
print("\n".join(report_lines))

Cloud thickness

In [None]:
thin_cloud, thick_cloud = [], []

# Lower bound applied to every channel; the upper bound is 1 (see inRange below).
thickness_threshold = (0.46875, 0.46875, 0.46875, 0.46875)

# NOTE(review): full_cloud[i] is taken to be the file behind full_cloud_data[i];
# that holds as long as CloudDataset dropped no entry - confirm.
for i in range(len(full_cloud_data)):

    x, y = full_cloud_data[i]
    # Non-zero where all channels of the pixel lie within [threshold, 1].
    out = cv2.inRange(torch.permute(x, (1, 2, 0)).numpy(), thickness_threshold, (1, 1, 1, 1))

    truth = y.numpy()
    union = np.count_nonzero(np.logical_or(out, truth))

    # NumPy division by zero yields nan with a warning rather than raising
    # ZeroDivisionError, so the empty-union case is handled explicitly.
    if union == 0:
        thin_cloud.append(full_cloud[i])
        continue

    IoU = np.count_nonzero(np.logical_and(out, truth)) / union

    if IoU > 0.5:
        thick_cloud.append(full_cloud[i])
    else:
        thin_cloud.append(full_cloud[i])

In [None]:
# Datasets for the two thickness groups.
thin_cloud_data  =  CloudDataset(thin_cloud)
thick_cloud_data =  CloudDataset(thick_cloud)

print(thin_cloud)
print(thick_cloud)

# Evaluation loaders are not shuffled: the metrics are averaged per batch, so
# a fixed batch composition keeps the reported numbers reproducible.
thin_cloud_loader  = DataLoader(thin_cloud_data,  BATCH_SIZE, shuffle=False, num_workers=NUM_WORKER, collate_fn=my_collate)
thick_cloud_loader = DataLoader(thick_cloud_data, BATCH_SIZE, shuffle=False, num_workers=NUM_WORKER, collate_fn=my_collate)

In [None]:
# Score the model on the thin group first, then on the thick group.
(thin_acc,  thin_rec,  thin_pre,  thin_f1), (thick_acc, thick_rec, thick_pre, thick_f1) = (
    get_accuracy(loader, model) for loader in (thin_cloud_loader, thick_cloud_loader)
)

In [None]:
# Tab-separated metric table for the two thickness groups.
report_lines = [
    "\t\tAccuracy\tRecall\t\tPrecision\tF1",
    f"Thin cloud:\t{thin_acc}\t{thin_rec}\t{thin_pre}\t{thin_f1}",
    f"Thick cloud:\t{thick_acc}\t{thick_rec}\t{thick_pre}\t{thick_f1}",
]
print("\n".join(report_lines))

In [None]:
# Table layout: a header row of group names, then one row per metric whose
# first cell is the metric name.
name = ["", "No Cloud", "Less Cloud", "More Cloud", "Full Cloud", "Thin Cloud", "Thick Cloud"]
s = np.asarray([ name,
                 ["Accuracy"]  + [no_acc, less_acc, more_acc, full_acc, thin_acc, thick_acc],
                 ["Recall"]    + [no_rec, less_rec, more_rec, full_rec, thin_rec, thick_rec],
                 ["Precision"] + [no_pre, less_pre, more_pre, full_pre, thin_pre, thick_pre],
                 ["F1"]        + [no_f1,  less_f1,  more_f1,  full_f1,  thin_f1,  thick_f1]])

# Build the path from components so it resolves to the same location on every
# platform, and make sure the target directory exists before writing.
csv_path = Path("Cloud_Detection") / "import.csv"
csv_path.parent.mkdir(parents=True, exist_ok=True)
np.savetxt(csv_path, s, delimiter=",", fmt='%s')

In [None]:
def get_mask(pred):
    """Convert a two-channel prediction into an (H, W, 3) image array.

    Channel 0 is written to the first image plane and channel 1 to the second,
    both scaled by 255; the third plane stays zero.

    Args:
        pred: Array-like of shape (1, C, H, W) or (C, H, W) with C >= 2.

    Returns:
        Float array of shape (H, W, 3).

    Raises:
        ValueError: If ``pred`` holds more than one sample or has an
            unsupported shape.
    """
    pred = np.asarray(pred)
    if pred.ndim == 4:
        if pred.shape[0] != 1:
            raise ValueError(f"get_mask expects a single prediction, got batch of {pred.shape[0]}")
        pred = pred[0]
    if pred.ndim != 3 or pred.shape[0] < 2:
        raise ValueError(f"get_mask expects shape (1, C, H, W) or (C, H, W) with C >= 2, got {pred.shape}")

    # Size the image from the prediction itself rather than assuming 384x384.
    height, width = pred.shape[-2:]
    mask_pred = np.zeros([height, width, 3])
    mask_pred[:,:,0] = pred[0] * 255
    mask_pred[:,:,1] = pred[1] * 255
    return mask_pred

# One hand-picked sample per coverage bucket.
images = [
    no_cloud_data[16],
    less_cloud_data[26],
    more_cloud_data[26],
    full_cloud_data[23]
]

# Rows: input image, ground truth, predicted mask; one column per sample.
fig, axs = plt.subplots(3, len(images), figsize=(18, 10))

with torch.no_grad():
    for i, image in enumerate(images):
        # Retrieve and format results
        display_im = image[0].permute(1, 2, 0).cpu().detach().numpy()
        # The input must be on the same device as the model.
        pred = model(image[0].unsqueeze(0).to(DEVICE)).cpu().detach().numpy()
        pred = get_mask(pred)
        mask = image[1]

        # Display
        axs[0][i].imshow(display_im)
        axs[0][i].set_title('Image')
        axs[1][i].imshow(mask)
        axs[1][i].set_title('Ground Truth')
        # Clip to the displayable 0..255 range before casting to integers.
        axs[2][i].imshow(np.clip(np.array(pred), 0, 255).astype(int))
        axs[2][i].set_title('Predicted Mask')

fig.tight_layout()
plt.show()