<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"></ul></div>

In [None]:
import cv2
import torch
import matplotlib.pyplot as plt

import multiprocessing as mp

import geopandas as gpd
import rasterio
from shapely import geometry
import numpy as np

from tqdm import tqdm
from skimage.morphology import h_minima, watershed, label

from dh_segment_torch.inference import InferenceModel

from matplotlib.image import imread
from functools import reduce
from shapely.ops import unary_union

In [None]:
DATA = '/dhlabdata4/benali/'
MODELS = DATA + 'models/1848/'
IMAGES = DATA + 'cadaster_1848_test/eval/'
MASKS = IMAGES

IMG_NAME = 'castello_06_crop01'
MODEL_EDGES_NAME = 'model_edges2_n02'
MODEL_CLASSES_NAME = 'model_classes_inv_n01'
#MODEL_FULL_NAME
MASK_EDGES_NAME = IMG_NAME[:-6] + 'mask_edges_2_' + IMG_NAME[-6:] + '.png'
MASK_CLASSES_NAME = IMG_NAME[:-6] + 'mask_classes_' + IMG_NAME[-6:] + '.png'

IMG_FORMAT = '.png'
#TYPE = 'full' # indicates training with classes and edges
TYPE = 'sep' # indicates separate training for classes and edges


IMG = IMAGES + IMG_NAME + IMG_FORMAT
EDGES = MODELS + MODEL_EDGES_NAME + '.pth'
CLASSES = MODELS + MODEL_CLASSES_NAME + '.pth'
#FULL
MASK_EDGES = MASKS + MASK_EDGES_NAME
MASK_CLASSES = MASKS + MASK_CLASSES_NAME

DEVICE = 'cpu'
#DEVICE = 'cuda:7'
SAVE_GEOJSON = False
SAVE_EVAL = False

In [None]:
model_edges = InferenceModel.from_params({
    "model": {
        "encoder": "resnet50",
        "decoder": {
            "decoder_channels": [512, 256, 128, 64, 32],
            "max_channels": 512
        }
    }, # Copier/coller du fichier config
    "num_classes": 2, # A inferer depuis le fichier
    "model_state_dict": EDGES,
    "device": DEVICE, # utiliser cuda:0 (ou /1/2)
    "patch_size": (500,500), # a adapter en fonction
    "patches_batch_size": 8,
    "patches_overlap": 0.2, # entre 0 et 1
    "multilabel": False, #
})

model_classes = InferenceModel.from_params({
    "model": {
        "encoder": "resnet50",
        "decoder": {
            "decoder_channels": [512, 256, 128, 64, 32],
            "max_channels": 512
        }
    },
    "num_classes": 6,
    "model_state_dict": CLASSES,
    "device": DEVICE,
    "patch_size": (500,500),
    "patches_batch_size": 8,
    "patches_overlap": 0,
    "multilabel": False
})

In [None]:
"""model_classes_full = InferenceModel.from_params({
    "model": {
        "encoder": "resnet50",
        "decoder": {
            "decoder_channels": [512, 256, 128, 64, 32],
            "max_channels": 512
        }
    },
    "num_classes": 6,
    "model_state_dict": FULL,
    "device": DEVICE,
    "patch_size": (500,500),
    "patches_batch_size": 8,
    "patches_overlap": 0,
    "multilabel": True
})"""

In [None]:
img = cv2.imread(IMG)[:,:,::-1].copy() # lit l'image en RGB

In [None]:
img_torch = torch.from_numpy(img.transpose(2,0,1)/255).float().unsqueeze(0) # transforme en pytorch

In [None]:
probs_edges = model_edges.predict_patches(img_torch)[0] 

probs_classes = model_classes.predict_patches(img_torch)[0]

In [None]:
#probs_all = model_classes_full.predict_patches(img_torch)[0]

In [None]:
plt.figure(figsize=(10,10))
#plt.imshow(img[:500,0:500])
plt.imshow(img)

In [None]:
plt.figure(figsize=(10,10))
plt.imshow(probs_edges.cpu().numpy()[1], cmap='gray')
#plt.imshow(probs_edges.cpu().numpy()[1]>0.1, cmap='gray')

In [None]:
plt.figure(figsize=(10,10))
plt.imshow(probs_classes.cpu().numpy()[0][1000:,:-500])

In [None]:
"""plt.figure(figsize=(10,10))
plt.imshow(probs_all.cpu().numpy()[0][0:500,0:500], cmap='gray')"""

In [None]:
"""plt.figure(figsize=(10,10))
plt.imshow(probs_all.cpu().numpy()[1:][0:500,0:500], cmap='gray')"""

In [None]:
contours = probs_edges.cpu().numpy()[1]
#contours = probs_all.numpy()[0]

minimas = label(h_minima(contours, 0.1))

watershed_parcels = watershed((255 * contours).astype('int'), minimas)

In [None]:
fig, axes = plt.subplots(ncols=2, figsize=(16, 8), sharex=True, sharey=True)
ax = axes.ravel()

ax[0].imshow(img, cmap=plt.cm.gray)
ax[0].set_title('Cadaster')

ax[1].imshow(watershed_parcels, cmap=plt.cm.nipy_spectral)
ax[1].set_title('Watershed parcels')

In [None]:
idx2class = dict(enumerate(['background', 'street', 'water', 'church', 'courtyard', 'building']))
num_parcels= np.unique(watershed_parcels)

In [None]:
from affine import Affine

if (IMG_FORMAT == '.tif'):
    transform = rasterio.open(IMG).transform
else:
    geotransform = (0, 1, 0.0, 0, 0, -1)
    transform = Affine.from_gdal(*geotransform)

In [None]:
def num_parcel2res(parcel_idx, probs_classes=probs_classes, watershed_parcels=watershed_parcels):
    mask_parcel = watershed_parcels == parcel_idx
    class_idx = np.bincount(probs_classes.cpu()[0:, mask_parcel].argmax(axis=0)).argmax()
    #class_idx = np.bincount(probs_all.cpu()[1:, mask_parcel].argmax(axis=0)).argmax()
    contours, hierarchy = cv2.findContours(mask_parcel.astype('uint8').copy(), cv2.RETR_CCOMP, cv2.CHAIN_APPROX_SIMPLE)

    poly1 = (cv2.approxPolyDP(contours[0], 1, closed=True)[:,0,:]).tolist()
    poly1.append(poly1[0])
    poly1 = [transform*x for x in poly1]
    holes = []
    for h in contours[1:]:
        poly2 = (cv2.approxPolyDP(h, 1, closed=True)[:,0,:]).tolist()
        poly2.append(poly2[0])
        poly2 = [transform*x for x in poly2]
        holes.append(poly2)
    poly = geometry.Polygon(poly1, holes=holes)
    return poly, idx2class[class_idx]

In [None]:
results = []

for idx in tqdm(num_parcels):
    results.append(num_parcel2res(idx))
    
all_polys = [x[0] for x in results]
all_classes = [x[1] for x in results]

all_polys = gpd.GeoSeries(all_polys)
all_polys.crs = 'EPSG:3004'

In [None]:
all_polys.plot(figsize=(10,10))

In [None]:
geodata = all_polys.to_frame('geometry')
geodata['class'] = all_classes

In [None]:
palette = ['#000000', '#FFFF00','#00FFFF','#FF0000','#00FF00','#FF00FF']
fig, ax = plt.subplots(figsize=(10,10))
for idx, label_ in idx2class.items():
    geodata.loc[geodata['class'] == label_]['geometry'].plot(color=palette[idx], label=label_, ax=ax)

In [None]:
if SAVE_GEOJSON:
    if TYPE == 'sep':
        geodata.to_file(MODELS + 'geojsons/' + IMG_NAME + MODEL_EDGES_NAME + MODEL_CLASSES_NAME + '.geojson',
                        driver='GeoJSON')
    elif TYPE == 'full':
        geodata.to_file(MODELS + 'geojsons/' + IMG_NAME + MODEL_FULL_NAME + '.geojson', driver='GeoJSON')

In [None]:
vect = np.vectorize(lambda x: int(round(x) * 255))
im_classes = vect(imread(MASK_CLASSES)[:,:,:3])
im_edges = vect(imread(MASK_EDGES)[:,:,:3])

In [None]:
def hex_to_dec(s):
    return int(s, 16)

def hex_to_rgb(h):
    return np.array(list(map(hex_to_dec, (h[1:3], h[3:5], h[5:7]))))

rgb_palette = [hex_to_rgb(x) for x in palette]

def mask_to_probabilities(class_mask, palette=rgb_palette):
    probas = np.zeros((len(palette), class_mask.shape[0], class_mask.shape[1]))
    for i, color in enumerate(palette):
        probas[i] = (class_mask == color).all(axis=2).astype(int)
    return probas
        
def eval_classes(pred, real):
    ious = []
    weights = []
    for label_ in idx2class.values():
        pred_geometries = pred.loc[pred['class'] == label_]['geometry']
        real_geometries = real.loc[real['class'] == label_]['geometry']
        pred_union = unary_union(pred_geometries)
        real_union = unary_union(real_geometries)
        union_area = unary_union([pred_union, real_union]).area
        if union_area:
            ious.append(round(pred_union.intersection(real_union).area / union_area, 3))
            weights.append(real_union.area)
        else:
            ious.append(None)
    ious_clean = indices = [x for x in ious if x is not None]
    miou = round(np.mean(ious_clean), 3)
    wmiou = round(np.average(np.array(ious_clean), weights=np.array(weights)), 3)
    return dict(zip(idx2class.values(), ious)), miou, wmiou

def eval_edges(pred, real):
    pred_1s = np.vectorize(lambda e: int(e >= 0.1))(pred[1].numpy()) == 1
    real_1s = real == 1
    union = np.sum(np.logical_or(pred_1s, real_1s))
    real_count = np.sum(real_1s)
    if (union and real_count):
        iou = round(np.sum(np.logical_and(pred_1s, real_1s)) / union, 3)
        ratio = round(np.sum(np.logical_and(pred_1s, real_1s)) / real_count, 3)
    elif union:
        iou = round(np.sum(np.logical_and(pred_1s, real_1s)) / union, 3)
        ratio = None
    elif real_count:
        iou = None
        ratio = round(np.sum(np.logical_and(pred_1s, real_1s)) / real_count, 3)
    else:
        iou = None
        ratio = None
    return iou, ratio

In [None]:
edges_real = (im_edges != rgb_palette[0]).all(axis=2).astype(int)
minimas_real = label(h_minima(edges_real, 0.5))

watershed_parcels_real = watershed(vect(edges_real), minimas_real)

In [None]:
num_parcels_real = np.unique(watershed_parcels_real)
results_real = []
probs_classes_real = mask_to_probabilities(im_classes)
for idx in tqdm(num_parcels_real):
    results_real.append(num_parcel2res(idx, probs_classes_real, watershed_parcels_real))
    
all_polys_real = [x[0] for x in results_real]
all_classes_real = [x[1] for x in results_real]

all_polys_real = gpd.GeoSeries(all_polys_real)
all_polys_real.crs = 'EPSG:3004'

geodata_real = all_polys_real.to_frame('geometry')
geodata_real['class'] = all_classes_real

In [None]:
from json import dumps
def print_eval(write_on=False):
    ious, miou, wmiou = eval_classes(geodata, geodata_real)
    iou, ratio = eval_edges(probs_edges, edges_real)
    ious_str = f'IOU per class:\n{dumps(ious, indent=2)}\n'
    miou_str = f'MIOU = {miou}\n'
    wmiou_str = f'Weighted MIOU = {wmiou} (weights = real area of class)\n'
    edges_str = f'IOU for edges = {iou}\nRatio of correctly detected  edges = {ratio}\n'
    if write_on:
        res_file = open(DATA + 'model_eval.txt', 'a')
        res_file.write('\n========================================\n\n')
        if (TYPE=='sep'):
            title=f'Results for {IMG_NAME} with edge model {MODEL_EDGES_NAME} and class model {MODEL_CLASSES_NAME}\n'
        elif (TYPE=='full'):
            title = f'Results for {IMG_NAME} with model {MODEL_FULL_NAME}\n'
        res_file.write(title)
        res_file.write('Evalutation for classes:\n')
        res_file.write(ious_str)
        res_file.write(miou_str)
        res_file.write(wmiou_str)
        if (TYPE=='sep'):
            res_file.write('\nEvalutation for edges:\n')
            res_file.write(edges_str)
        res_file.close()
    print(ious_str, miou_str, wmiou_str)
    if (TYPE=='sep'):
        print(edges_str)
print_eval(SAVE_EVAL)