In [None]:
import logging
import os

import numpy as np
from numba import njit, prange
import xarray as xr
import rasterio
import otbApplication
import shareloc

from cars.applications.resampling.resampling_tools import resample_image

logging.basicConfig(level=logging.INFO)

from shareloc.geomodels.rpc import RPC
from shareloc.geofunctions.triangulation import epipolar_triangulation

from affine import Affine

In [None]:
gt_path = "/work/CAMPUS/etudes/3D/Development/malinoro/Glacier/DEM"
dem_name = "peyto_20160913_1x1_medfilt-median-DEM.tif"

image_path = "/work/CAMPUS/etudes/3D/Development/malinoro/Glacier/Images"
left_image = os.path.join(image_path, "Peyto/201609131912075_P_001/IMG_PHR1B_P_001/IMG_PHR1B_P_201609131912075_SEN_1966489101-001.tif")
right_image = os.path.join(image_path, "Peyto/201609131912371_P_002/IMG_PHR1B_P_002/IMG_PHR1B_P_201609131912371_SEN_1966489101-002.tif")

left_geom = os.path.join(image_path, "Peyto/201609131912075_P_001/IMG_PHR1B_P_001/RPC_PHR1B_P_201609131912075_SEN_1966489101-001.XML")
right_geom = os.path.join(image_path, "Peyto/201609131912371_P_002/IMG_PHR1B_P_002/RPC_PHR1B_P_201609131912371_SEN_1966489101-002.XML")


output_path = "/work/CAMPUS/users/malinoro/outputs/Beefrost_emulation/Peyto/"
output_left_name = "peyto_20160913_1x1_medfilt-median-DEM_superimpose_left.tif"
output_right_name = "peyto_20160913_1x1_medfilt-median-DEM_superimpose_right.tif"

grid_left_path = "/work/CAMPUS/users/malinoro/src/cars_master/cars/output/one_two/left_epi_grid.tif"
epipolar_left_path = "/work/CAMPUS/users/malinoro/src/cars_master/cars/output/one_two/epi_img_left.tif"

grid_right_path = "/work/CAMPUS/users/malinoro/src/cars_master/cars/output/one_two/right_epi_grid.tif"
epipolar_right_path = "/work/CAMPUS/users/malinoro/src/cars_master/cars/output/one_two/epi_img_right.tif"

# Use OTB Superimpose to convert the DSM in the same geometry as input images

In [None]:
# Left
app = otbApplication.Registry.CreateApplication("Superimpose")

app.SetParameterString("inr", left_image)
app.SetParameterString("inm", os.path.join(gt_path, dem_name))
app.SetParameterString("out", os.path.join(output_path, output_left_name))

app.ExecuteAndWriteOutput()

In [None]:
app = otbApplication.Registry.CreateApplication("Superimpose")

app.SetParameterString("inr", os.path.join(right_image))
app.SetParameterString("inm", os.path.join(gt_path, dem_name))
app.SetParameterString("out", os.path.join(output_path, output_right_name))

app.ExecuteAndWriteOutput()

# Resample image

In [None]:
no_data = 0

margin = 100 # in meters

In [None]:
DSM = rasterio.open(os.path.join(gt_path, dem_name))
dsm = DSM.read(1)
dsm[DSM.meta["nodata"]==dsm] = np.nan
dsm[DSM.meta["nodata"]==dsm] = np.nan

alt_min, alt_max = np.nanmin(dsm), np.nanmax(dsm)
del dsm, DSM

### Left

In [None]:
epi_img_left = rasterio.open(epipolar_left_path)
largest_size = [epi_img_left.meta["width"], epi_img_left.meta["height"]]

In [None]:
resample_xr = resample_image(
    img=os.path.join(output_path, output_left_name),
    grid=grid_left_path,
    largest_size=largest_size,
    nodata=no_data,
)

In [None]:
data = resample_xr.im.data
data[resample_xr.msk.data==255] = no_data
data[(data<alt_min-margin) | (data>alt_max+margin)] = no_data
data[~np.isfinite(data)] = no_data

with rasterio.Env():
    profile = epi_img_left.profile
    # And then change the band count to 1, set the
    # dtype to uint8, and specify LZW compression.
    profile.update(
        dtype=rasterio.float32)
    with rasterio.open(os.path.join(output_path, "epipolar_dsm_left.tif"), 'w', **profile) as dst:
        dst.write(data.astype(rasterio.float32), 1)

#### Right

In [None]:
epi_img_right = rasterio.open(epipolar_right_path)
largest_size = [epi_img_right.meta["width"], epi_img_right.meta["height"]]

In [None]:
resample_xr = resample_image(
    img=os.path.join(output_path, output_right_name),
    grid=grid_right_path,
    largest_size=largest_size,
    nodata=no_data,
)

In [None]:
data = resample_xr.im.data
data[resample_xr.msk.data==255] = no_data
data[(data<alt_min-margin) | (data>alt_max+margin)] = no_data
data[~np.isfinite(data)] = no_data

with rasterio.Env():
    profile = epi_img_right.profile
    # And then change the band count to 1, set the
    # dtype to uint8, and specify LZW compression.
    profile.update(
        dtype=rasterio.float32)
    with rasterio.open(os.path.join(output_path, "epipolar_dsm_right.tif"), 'w', **profile) as dst:
        dst.write(data.astype(rasterio.float32), 1)

# Code from Beefrost

In [None]:
@njit(parallel=True)
def expand_grid(valid_pixel_grid_row, valid_pixel_grid_col, bias, bias_2_dim, tile_size):
    valid_row_inf = np.maximum((valid_pixel_grid_row-int(tile_size/2)), np.zeros((valid_pixel_grid_row.size,), dtype=np.int64)) 
    valid_row_sup = np.minimum((valid_pixel_grid_row+int(tile_size/2)), np.full((valid_pixel_grid_row.size,), bias_2_dim.shape[0], dtype=np.int64))
    valid_col_inf = np.maximum((valid_pixel_grid_col-int(tile_size/2)), np.zeros((valid_pixel_grid_col.size,), dtype=np.int64))
    valid_col_sup = np.minimum((valid_pixel_grid_col+int(tile_size/2)), np.full((valid_pixel_grid_col.size,), bias_2_dim.shape[1], dtype=np.int64))
    for i in prange(bias.size):
        bias_2_dim[valid_row_inf[i]:valid_row_sup[i], valid_col_inf[i]:valid_col_sup[i]] = bias[i]
    return bias_2_dim

def get_altitude_bias(geom1, grid1, geom2, grid2, epipolar_size_x, epipolar_size_y, mask):
    """
    Get disparity / altitude bias and ratio

    :param geom1: path to geom left file
    :type geom1: string
    :param grid1: path to left epipolar grid
    :type grid1: string
    :param geom2: path to geom right file
    :type geom2: string
    :param grid2: path to right epipolar grid
    :type grid2: string
    :param epipolar_size_x: Size of stereo-rectified images in x
    :type epipolar_size_x: int
    :param epipolar_size_y: Size of stereo-rectified images in y
    :type epipolar_size_y: int
    :param epipolar_img: path to epipolar_img with nodata in its metadata
    :type epipolar_img: string
    :return: median ratio, ratio, median bias, bias, output statistics
    :rtype: Tuple(float, 2D np.array, float, 2D np.array, dict)
    """
    # If mask is none, compute triangulation over all pixels
    if mask is None:
        cord = np.mgrid[0.0:epipolar_size_y:, 0.0:epipolar_size_x:]
        # Triangulation with disp = 0
        matches_disp_0 = np.stack(
            (cord[1, :, :].flatten(), cord[0, :, :].flatten(), cord[1, :, :].flatten(), cord[0, :, :].flatten()),
            axis=1,
        )
        # Triangulation with disp = 1
        matches_disp_1 = np.stack(
            (cord[1, :, :].flatten(), cord[0, :, :].flatten(), cord[1, :, :].flatten() + 1, cord[0, :, :].flatten()),
            axis=1,
        )
    # If mask is not none, compute triangulation over valid pixels
    else:
        data_mask = rio.open(mask).read(1)
        valid_pixel = np.where(data_mask == 0)

        matches_disp_0 = np.zeros((len(valid_pixel[0]), 4))
        matches_disp_0[:, 0] = valid_pixel[1]
        matches_disp_0[:, 2] = valid_pixel[1]
        matches_disp_0[:, 1] = valid_pixel[0]
        matches_disp_0[:, 3] = valid_pixel[0]

        matches_disp_1 = np.copy(matches_disp_0)
        matches_disp_1[:, 2] += 1

    logging.info("Reading geomodels")
    model1 = RPC.load(geom1)
    model2 = RPC.load(geom2)
    logging.info("Starting triangulation")
    logging.info(f"Shape of matches: {matches_disp_0.shape}")

    _, llh_disp_0, _ = epipolar_triangulation(matches_disp_0, None, "sift", model1, model2, grid1, grid2)
    _, llh_disp_1, _ = epipolar_triangulation(matches_disp_1, None, "sift", model1, model2, grid1, grid2)
    logging.info("Triangulation done")

    bias = llh_disp_0[:, 2]
    bias_median = np.median(bias)
    if mask is None:
        bias_2_dim = np.reshape(bias, (epipolar_size_y, epipolar_size_x))
    else:
        bias_2_dim = np.zeros((epipolar_size_y, epipolar_size_x), dtype=np.float64)
        bias_2_dim[valid_pixel] = bias

    coord_without_last_column = matches_disp_1[:, 0] != (epipolar_size_x - 1)
    ratio = llh_disp_1[coord_without_last_column][:, 2] - llh_disp_0[coord_without_last_column][:, 2]
    ratio_median = np.median(ratio)
    
    if mask is None:
        ratio_2_dim = np.reshape(ratio, (epipolar_size_y, (epipolar_size_x - 1)))
        # Repeat the last column
        ratio_2_dim = np.append(ratio_2_dim, np.tile(ratio_2_dim[:, [-1]], 1), axis=1)
    else:
        ratio_2_dim = np.zeros((epipolar_size_y, epipolar_size_x), dtype=np.float64)
        valid_not_last_column = valid_pixel[1] != (epipolar_size_x - 1)

        # Last column is valid in the mask
        if np.sum(valid_not_last_column == 0) != 0:
            ratio_2_dim[valid_pixel[0][valid_not_last_column], valid_pixel[1][valid_not_last_column]] = ratio
            # Repeat the last column
            row_to_repeat = valid_pixel[0][valid_not_last_column is False]
            col_to_repeat = valid_pixel[1][valid_not_last_column is False]
            ratio_2_dim[row_to_repeat, col_to_repeat] = ratio_2_dim[row_to_repeat, col_to_repeat - 1]
        else:
            ratio_2_dim[valid_pixel] = ratio



    logging.info(
        "Biais disparite/altitude (hauteur correspondant a disparite nulle) en m : min = {:.3f}, "
        "max = {:.3f}, median = {:.3f}, mean = {:.3f} , standard deviation = {:.3f}  ".format(
            np.min(bias),
            np.max(bias),
            np.median(bias),
            np.mean(bias),
            np.std(bias),
        )
    )
    logging.info(
        "Ratio disparite/altitude (equivalent au disp_to_alt_ratio de CARS) en m/pix : min = {:.3f}, "
        "max = {:.3f}, median = {:.3f}, mean = {:.3f} , standard deviation = {:.3f}  ".format(
            np.min(ratio),
            np.max(ratio),
            np.median(ratio),
            np.mean(ratio),
            np.std(ratio),
        )
    )

    # out statistics
    out_stats = {}
    out_stats["bias_disparity_altitude"] = {
        "min": np.min(bias),
        "max": np.max(bias),
        "median": np.median(bias),
        "mean": np.mean(bias),
        "std": np.std(bias),
    }
    out_stats["ratio_disparity_altitude"] = {
        "min": np.min(ratio),
        "max": np.max(ratio),
        "median": np.median(ratio),
        "mean": np.mean(ratio),
        "std": np.std(ratio),
    }

    return ratio_median, ratio_2_dim, bias_median, bias_2_dim, out_stats

def write_image(image, fname, origin, spacing):
    """
    Write an epipolar resampling image to file

    :param image: the image to write
    :type image: 3D numpy array
    :param fname: the filename to which the grid will be written
    :type fname: string
    :param origin: origin of the grid
    :type origin: (float, float)
    :param spacing: spacing of the grid
    :type spacing: (float, float)
    """
    geotransform = (origin[0] - 0.5 * spacing[0], spacing[0], 0.0, origin[1] - 0.5 * spacing[1], 0.0, spacing[1])

    transform = Affine.from_gdal(*geotransform)

    if len(image.shape) == 3:
        band_nb = image.shape[2]
        with rasterio.open(
            fname,
            "w",
            height=image.shape[0],
            width=image.shape[1],
            count=band_nb,
            driver="GTiff",
            dtype=image.dtype,
            transform=transform,
        ) as dst:
            for index in range(band_nb):
                dst.write_band(index + 1, image[:, :, index])
    else:
        with rasterio.open(
            fname,
            "w",
            height=image.shape[0],
            width=image.shape[1],
            count=1,
            driver="GTiff",
            dtype=image.dtype,
            transform=transform,
        ) as dst:
            dst.write_band(1, image[:, :])

def conversion_lidar_to_disp(lidar, alt_to_disp_ratio, alt_to_disp_bias, output_disparity, epi_img=None):
    """
    Convert lidar value to disparity

    :param lidar: path to the input lidar
    :param lidar: string
    :param alt_to_disp_ratio: altitude to disparity ratio
    :param alt_to_disp_ratio: float
    :param alt_to_disp_bias: path to altitude to disparity bias
    :param alt_to_disp_bias: string
    :param output_disparity: path to output disparity map
    :type output_disparity: string
    :param epi_img: path to epipolar image (metadata has "nodata") 
    :param epi_img: string
    """
    lidar_data = rasterio.open(lidar).read(1)
    bias_data = rasterio.open(alt_to_disp_bias).read(1)

    if epi_img is not None:
        IMG = rasterio.open(epi_img)
        valid_data = (IMG.read(1) != IMG.meta["nodata"])
    else:
        # If epi_img is not provided, all pixels are considered valid
        valid_data = np.ones(lidar_data.shape, dtype=np.bool_)

    # Convert lidar to disparity : disparity = (altitude - bias ) / ratio
    disp_data = (valid_data) * (lidar_data - bias_data) / alt_to_disp_ratio
    disp_data = disp_data.astype("float64")

    write_image(disp_data, output_disparity, [0.0, 0.0], [1.0, 1.0])

def cross_checking(left_disp, epi_img_left, right_disp, epi_img_right, threshold=1.0):
    """
    Apply cross checking on disparity map

    :param left_disp: Path to left disparity
    :type left_disp: string
    :param epi_img_left: Path to left epipolar image (with "nodata" in meta)
    :type epi_img_left: string
    :param right_disp: Path to right disparity
    :type right_disp: string
    :param epi_img_right: Path to right epipolar image (with "nodata" in meta)
    :type epi_img_right: string
    :param threshold: threshold
    :type threshold: int
    :return: cross checking, percentage of invalid disparity, percentage of disparity invalidated by cross checking
    :rtype: Tuples(2D np.array, float, float)
    """
    # Read inputs
    left_disp = rasterio.open(left_disp).read(1)
    left_mask = rasterio.open(left_mask).read(1)
    right_disp = rasterio.open(right_disp).read(1)
    right_mask = rasterio.open(right_mask).read(1)

    # Value of invalid disparity in mask files
    invalid_flag = 255
    nb_row, nb_col = left_disp.shape
    invalid_disp = np.zeros((nb_row, nb_col), dtype=float)

    # Percentage of left disp map invalid points
    nb_pixels = left_disp.shape[0] * left_disp.shape[1]
    pct_left_disp_map_invalid_mask = 0.0
    if left_mask is not None:
        left_disp_map_invalid_by_mask = len(np.where(left_mask == invalid_flag)[0])
        pct_left_disp_map_invalid_mask = (left_disp_map_invalid_by_mask / float(nb_pixels)) * 100.0

    for row in range(0, nb_row):
        col_ref = np.arange(nb_col, dtype=np.int64)

        col_sec = col_ref + left_disp[row, col_ref]
        col_sec = np.rint(col_sec).astype(int)

        # Left-Right consistency, for pixel i :
        # If | Disp_sec(i + round(Disp_ref(i)) + Disp_ref(i) | > threshold : i is invalid, mismatched or
        # occlusion detected
        # If | Disp_sec(i + round(Disp_ref(i)) + Disp_ref(i) | <= threshold : i is valid

        # Apply cross checking on pixels i + round(Disp_ref(i) inside the secondary image
        inside_sec = np.where((col_sec >= 0) & (col_sec < nb_col))

        invalid = np.abs(right_disp[row, col_sec[inside_sec]] + left_disp[row, col_ref[inside_sec]]) > threshold
        if left_mask is not None:
            invalid = np.logical_or(invalid, left_mask[row, col_ref[inside_sec]])
        if right_mask is not None:
            invalid = np.logical_or(invalid, right_mask[row, col_sec[inside_sec]])

        # There is no distinction between mismatched and occlusion
        invalid_disp[row, col_ref[inside_sec][invalid]] = invalid_flag

        # Pixels i + round(Disp_ref(i) outside the secondary image are invalid : mismatched or occlusion detected
        outside_sec = np.where((col_sec < 0) & (col_sec >= nb_col))
        invalid_disp[row, col_ref[outside_sec]] = invalid_flag

    #  Percentage of points invalidated by cross checking and by the masks
    invalid_mask_cross = len(np.where(invalid_disp == invalid_flag)[0])
    pct_invalid_mask_cross = (invalid_mask_cross / float(nb_pixels)) * 100.0
    return invalid_disp, pct_left_disp_map_invalid_mask, pct_invalid_mask_cross



In [None]:
def maps_generation(left_epi_img, right_epi_img, left_geom, right_geom, left_grid, right_grid, wrk_dir):
    """
    Resample maps in epipolar geometry, compute disparity map

    :param left_epi_img: Left epipolar image path
    :param right_epi_img: Right epipolar image path
    :param left_geom: Left geometry path
    :param right_geom: Right geometry path
    :param left_grid: Left epipolar grid path
    :param right_grid: Right epipolar grid path
    :param wrk_dir: Working directory
    :return: output statistics
    :rtype: dict
    """
    out_stats_create_disp = {}
    
    epi_img = rasterio.open(left_epi_img)
    epi_size_x, epi_size_y = epi_img.meta["width"], epi_img.meta["height"]
    
    (
        ratio_median_left,
        ratio_left,
        _,
        bias_left,
        out_stats_create_disp["left_altitude_bias"],
    ) = get_altitude_bias(left_geom, left_grid, right_geom, right_grid, epi_size_x, epi_size_y, left_epi_img)
    logging.info("Writing left ratio disparity/altitude")
    out_left_ratio = os.path.join(wrk_dir, "left_ratio_disparity_altitude.tif")
    write_image(ratio_left, out_left_ratio, [0.0, 0.0], [1.0, 1.0])
    logging.info("Writing left bias disparity/altitude")
    out_left_bias = os.path.join(wrk_dir, "left_bias_disparity_altitude.tif")
    write_image(bias_left, out_left_bias, [0.0, 0.0], [1.0, 1.0])

    (
        ratio_median_right,
        ratio_right,
        _,
        bias_right,
        out_stats_create_disp["right_altitude_bias"],
    ) = get_altitude_bias(right_geom, right_grid, left_geom, left_grid, epi_size_x, epi_size_y, right_epi_img)
    logging.info("Writing right ratio disparity/altitude")
    out_right_ratio = os.path.join(wrk_dir, "right_ratio_disparity_altitude.tif")
    write_image(ratio_right, out_right_ratio, [0.0, 0.0], [1.0, 1.0])
    logging.info("Writing right bias disparity/altitude")
    out_right_bias = os.path.join(wrk_dir, "right_bias_disparity_altitude.tif")
    write_image(bias_right, out_right_bias, [0.0, 0.0], [1.0, 1.0])

    # conversion alt to disp
    left_disp = os.path.join(wrk_dir, "left_epipolar_disp_gt.tif")
    left_lidar_epi = os.path.join(wrk_dir, "epipolar_dsm_left.tif")
    conversion_lidar_to_disp(left_lidar_epi, ratio_median_left, out_left_bias, left_disp, left_epi_img)

    right_disp = os.path.join(wrk_dir, "right_epipolar_disp_gt.tif")
    right_lidar_epi = os.path.join(wrk_dir, "epipolar_dsm_right.tif")
    conversion_lidar_to_disp(right_lidar_epi, ratio_median_right, out_right_bias, right_disp, right_epi_img)

    # Compute cross checking
    (
        invalid_disp_mask,
        out_stats_create_disp["%_left_disp_map_invalid_by_mask"],
        out_stats_create_disp["%left_disp_map_invalid_by_mask_and_cross_checking"],
    ) = cross_checking(left_disp, left_epi_img, right_disp, right_epi_img, threshold=1.0)
    disp_ar_filename = os.path.join(wrk_dir, "valid_disp.tif")
    write_image(invalid_disp_mask, disp_ar_filename, [0.0, 0.0], [1.0, 1.0])

    return out_stats_create_disp


In [None]:
maps_generation(epipolar_left_path, epipolar_right_path, left_geom, right_geom, grid_left_path, grid_right_path, output_path)

In [None]:
left_epi_img, right_epi_img, left_geom, right_geom, left_grid, right_grid, wrk_dir = epipolar_left_path, epipolar_right_path, left_geom, right_geom, grid_left_path, grid_right_path, output_path
out_stats_create_disp = {}

left_disp = os.path.join(wrk_dir, "left_epipolar_disp_gt.tif")
right_disp = os.path.join(wrk_dir, "right_epipolar_disp_gt.tif")

# Compute cross checking
(
    invalid_disp_mask,
    out_stats_create_disp["%_left_disp_map_invalid_by_mask"],
    out_stats_create_disp["%left_disp_map_invalid_by_mask_and_cross_checking"],
) = cross_checking(left_disp, left_epi_img, right_disp, right_epi_img, threshold=1.0)
disp_ar_filename = os.path.join(wrk_dir, "valid_disp.tif")
write_image(invalid_disp_mask, disp_ar_filename, [0.0, 0.0], [1.0, 1.0])