Directional Cubic Convolution Interpolation (DCCI) for DEM processing - Upscale factor 2
====================================
Edge-directed image upsampling for Digital Elevation Models (DEM) based on a Organa HQX implementation [GitHub](https://github.com/revent-studio/organa_hqx).

An implementation of the Directional Cubic Convolution Interpolation proposed in:

[D. Zhou1, X. Shen & W. Dong. "Image zooming using directional cubic convolution interpolation." ET Image Process., 2012, Vol. 6, Iss. 6, pp. 627–634](https://www.researchgate.net/publication/259827607_Image_zooming_using_directional_cubic_convolution_interpolation)

This Notebook is running on Google Colab with data storage on Google Drive. Of course this could be adapted to local resources.

For background information, test data and results visit: [https://opendem.info/dcci.html](https://opendem.info/dcci.html).


# Preparations

In [None]:
import os
# connect with google drive  
from google.colab import drive
drive.mount("/content/drive")

Get rasterio for GeoTiff processing

In [None]:
!pip install rasterio

Slightly adapted base class from https://github.com/revent-studio/organa_hqx

In [3]:
from enum import Enum
import numpy as np
import cv2
import rasterio

class DiagClassification(Enum):
    UP_RIGHT   = 1
    DOWN_RIGHT = 2
    SMOOTH     = 3

class OrthClassification(Enum):
    HORIZONTAL = 1
    VERTICAL   = 2
    SMOOTH     = 3

# Threshold = T 
# Exponent = k

def Dcci(img, N=1, T=620, k=1):
    return dcciN(img, N, T, k)


def dcciN(img, N, T, k):
    for i in range(0,N):
        img = dccix2(img,T,k)
    return img

def dccix2(img, T, k):
    img2 = np.float64(np.swapaxes(img, 0, 1))
    lx, ly = img2.shape
    imgInterp = np.zeros((lx*2-1, ly*2-1))
    imgInterp[::2,::2] = img2
    imgInterp = interpDiag(img2, imgInterp, T, k)
    imgInterp = interpOrth(imgInterp, T, k)
    # prevent the subpixel shift, zeros were added at the beginning and top of the array
    imgInterp = np.insert(imgInterp, 0, 0, 0)
    imgInterp = np.insert(imgInterp, 0, 0, 1)
    return np.swapaxes(imgInterp, 0, 1)
    

def interpDiag(original,img, T, k):
    """
    Input:  The 2x image with black space padding each of the given pixels
    Output: The same image with the diagonal non-edge pixels interpolated
    """
    lx, ly = img.shape
    imgPadded = np.zeros((lx+4,ly+4))
    imgPadded[2:-2,2:-2] = img
    # d1 = upright direction
    # d2 = downright direction
    # Think about each point in d1 and d2 as the (x,y) of the points diagonally between
    # each of the pixels being differenced (thus a 4x4 space would become a 3x3 space of 
    # the pixels diagonal from each 4 surrounding pixels of the original image)
    d1 = np.abs(original[1:,:-1] - original[:-1,1:])
    d2 = np.abs(original[1:,1:] - original[:-1,:-1])
    d1 = cv2.copyMakeBorder(d1, 1,1,1,1, cv2.BORDER_CONSTANT,value=0)
    d2 = cv2.copyMakeBorder(d2, 1,1,1,1, cv2.BORDER_CONSTANT,value=0)
    # Center at the point to be interpolated, (x,y)
    for x in range(3, lx+1, 2):
        for y in range(3, ly+1, 2):             
            rx, ry = x-2, y-2
            dx, dy = (rx+1)//2, (ry+1)//2
            s4x4 = imgPadded[x-3:x+4:2,y-3:y+4:2] # 4x4 of the original image
            d1k = d1[dx-1:dx+2, dy-1:dy+2] # 3x3 region of differences around x,y
            d2k = d2[dx-1:dx+2, dy-1:dy+2]
            d1s = np.sum(d1k)
            d2s = np.sum(d2k)
            diagClass = DiagClassification.SMOOTH
            if 100*(1+d1s) > T * (1+d2s):
                diagClass = DiagClassification.UP_RIGHT
            elif 100*(1+d2s) > T * (1+d1s):
                diagClass = DiagClassification.DOWN_RIGHT

            if diagClass == DiagClassification.UP_RIGHT:
                imgPadded[x,y] = upRight(s4x4)
            elif diagClass == DiagClassification.DOWN_RIGHT:
                imgPadded[x,y] = downRight(s4x4)
            else:
                imgPadded[x,y] = diagSmooth(s4x4, d1s, d2s, k)
    return imgPadded[2:-2,2:-2]

def interpOrth(img, T, k):
    PAD = 4
    lx, ly = img.shape
    imgPadded = np.zeros((lx+2*PAD,ly+2*PAD))
    imgPadded[PAD:-PAD,PAD:-PAD] = img
    # Each (x,y) is the uninterpolated pixel between the diff. It works out that
    # The [unused] diffs surrounding original or diagonally interpolated pixels
    # is equal to 0
    d1 = np.abs(imgPadded[(PAD-1):-(PAD+1),PAD:-PAD] - imgPadded[(PAD+1):-(PAD-1),PAD:-PAD])
    d2 = np.abs(imgPadded[PAD:-PAD, (PAD-1):-(PAD+1)] - imgPadded[PAD:-PAD, (PAD+1):-(PAD-1)])
    d1 = cv2.copyMakeBorder(d1, PAD,PAD,PAD,PAD, cv2.BORDER_CONSTANT,value=0)
    d2 = cv2.copyMakeBorder(d2, PAD,PAD,PAD,PAD, cv2.BORDER_CONSTANT,value=0)

    #    First Loop
    #   o x o x o x o
    #   . o . o . o .
    #   o x o x o x o
    #   . o . o . o .
    #   o x o x o x o
    #   . o . o . o .
    #   o x o x o x o
    #
    #    2nd Loop
    #   o . o . o . o
    #   x o x o x o .
    #   o . o . o . o
    #   x o x o x o .
    #   o . o . o . o
    #   x o x o x o .
    #   o . o . o . o

    # Center at the point to be interpolated, (x,y), as well as (x-1, y+1)
    for x in range(PAD+1, lx+PAD, 2):
        for y in range(PAD, ly+PAD, 2): 
            s7x7 = imgPadded[x-3:x+4,y-3:y+4]
            d1s = d1[x,y] + d1[x+2,y] + d1[x-2,y] + d1[x,y+2] + d1[x,y-2]
            d2s = d2[x,y] + d2[x+2,y] + d2[x-2,y] + d2[x,y+2] + d2[x,y-2]
            # Match classification and interpolate for x,y
            orthClass = OrthClassification.SMOOTH
            if  (100 * (1 + d1s) > T * (1 + d2s)):
                orthClass = OrthClassification.HORIZONTAL
            elif ( 100 * (1 + d2s) > T * (1 + d1s)):
                orthClass = OrthClassification.VERTICAL
            if orthClass == OrthClassification.HORIZONTAL:
                imgPadded[x,y] = horizontal(s7x7)
            elif orthClass == OrthClassification.VERTICAL:
                imgPadded[x,y] = vertical(s7x7)
            else:
                imgPadded[x,y] = orthSmooth(s7x7, d1s, d2s, k)
    for x in range(PAD, lx+PAD, 2):
        for y in range(PAD+1, ly+PAD, 2): 
            s7x7 = imgPadded[x-3:x+4,y-3:y+4]
            d1s = d1[x,y] + d1[x+2,y] + d1[x-2,y] + d1[x,y+2] + d1[x,y-2]
            d2s = d2[x,y] + d2[x+2,y] + d2[x-2,y] + d2[x,y+2] + d2[x,y-2]
            orthClass = OrthClassification.SMOOTH
            if  (100 * (1 + d1s) > T * (1 + d2s)):
                orthClass = OrthClassification.HORIZONTAL
            elif ( 100 * (1 + d2s) > T * (1 + d1s)):
                orthClass = OrthClassification.VERTICAL
            if orthClass == OrthClassification.HORIZONTAL:
                imgPadded[x,y] = horizontal(s7x7)
            elif orthClass == OrthClassification.VERTICAL:
                imgPadded[x,y] = vertical(s7x7)
            else:
                imgPadded[x,y] = orthSmooth(s7x7, d1s, d2s, k)
    return imgPadded[PAD:-PAD,PAD:-PAD]

def upRight(s):
    """
    Input:  4x4 area
    Output: 7x7 interpolated area (Only diagonals used)
    """
    op = (-1 * s[0,0] + 9 * s[1,1] + 9 * s[2,2] + (-1)*s[3,3]) / 16
    return op

def downRight(s):
    """
    Input:  4x4 area
    Output: 7x7 interpolated area (Only diagonals used)
    """
    op = ((-1)*s[3,0] + 9 * s[1,2] + 9* s[2,1] + (-1)*s[0,3] ) / 16
    return op

def diagSmooth(s,d1,d2, k):
    """
    Input:  4x4 area
    Output: 7x7 interpolated area (Only diagonals used)
    """
    w1 = 1 / (1 + d1**k)
    w2 = 1 / (1 + d2**k)
    weight1 = w1 / (w1 + w2)
    weight2 = w2 / (w1 + w2)
    downRightPixel = (-1 * s[0,0] + 9 * s[1,1] + 9 * s[2,2] - 1 * s[3,3]) / 16
    upRightPixel   = (-1 * s[3,0] + 9 * s[2,1] + 9 * s[1,2] - 1 * s[0,3]) / 16
    op = downRightPixel * weight1 + upRightPixel * weight2
    return op

def horizontal(s, x=3, y=3):
    """
    Input:  7x7 padded "diamond" area
    Output: 7x7 interpolated area (Only orthogonals used)
    """
    op = (-1 * s[x, y-3] + 9 * s[x, y-1] + 9 * s[x, y+1] - 1 * s[x, y + 3]) / 16
    return op

def vertical(s, x=3, y=3):
    """
    Input:  7x7 padded "diamond" area
    Output: 7x7 interpolated area (Only orthogonals used)
    """
    op = (-1 * s[x-3, y] + 9 * s[x - 1, y] + 9 * s[x + 1, y] - 1 * s[x + 3, y]) / 16
    return op

def orthSmooth(s, d1, d2, k):
    """
    Input:  7x7 padded "diamond" area
    Output: 7x7 interpolated area (Only orthogonals used)
    """
    x,y = 3, 3
    w1 = 1 / (1 + d1 ** k)
    w2 = 1 / (1 + d2 ** k)
    weight1 = w1 / (w1 + w2)
    weight2 = w2 / (w1 + w2)

    horizontalPixel = (-1 * s[x-3, y] + 9 * s[x-1, y] + 9 * s[x+1, y] - 1 * s[x+3, y]) / 16
    veritcalPixel   = (-1 * s[x, y-3] + 9 * s[x, y-1] + 9 * s[x, y+1] - 1 * s[x, y+3]) / 16
    op = horizontalPixel * weight1 + veritcalPixel * weight2
    return op

# Processing folder with GeoTiffs
**Slice the borders of your image after processing by 10 pixels to prevent border effects!**

You could use GDAL *gdal_translate* with the *srcwin* parameter. This will cut the borders by 10 pixel of your 300*300 pixel image:

*gdal_translate -srcwin 10 10 280 280 input.tif output.tif*

In [5]:
from enum import Enum
import numpy as np
import rasterio
from rasterio.plot import reshape_as_image
from rasterio import Affine

# adapt the paths
path = '/content/drive/My Drive/dcci/sr_images/test/in'
path_out = '/content/drive/My Drive/dcci/sr_images/test/out'


for file in os.listdir(path):
    current = os.path.join(path, file)
    with rasterio.open(current) as src:
        array = src.read(
            out_shape=(
                src.count,
                int(src.height),
                int(src.width)
            )
        )
        profile = src.profile                
    
    # create new profile with scale 2
    scale=2
    t = src.transform
    transform = Affine(t.a / scale, t.b, t.c, t.d, t.e / scale, t.f)
    height = src.height * scale
    width = src.width * scale
    profile = src.profile
    profile.update(transform=transform, driver='GTiff', height=height, width=width, crs=src.crs)
    out_profile = profile.copy()
    # process imgages
    arrayLR = reshape_as_image(array)
    img = Dcci(np.squeeze(arrayLR, axis=None))
    # write GeoTiff with new out_profile
    current_out = os.path.join(path_out, file)
    with rasterio.open(current_out, 'w', **out_profile) as dst:
        dst.write(img,indexes=1)

print("finished")    

finished


**Do not forget to slice the borders of your created images (see above)!**

# Statistics with test dataset

In [8]:
from enum import Enum
import numpy as np
import rasterio
from rasterio.plot import reshape_as_image
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import mean_squared_error
from skimage.metrics import peak_signal_noise_ratio

# adapt paths
# original images with high resolution
path_hr = '/content/drive/My Drive/dcci/sr_images/hr_test'
# downsampled images with GDAL next neighbor factor 2
path_lr = '/content/drive/My Drive/dcci/sr_images/lr_test_factor2'
# upsampled images with GDAL next neighbor factor 2
path_cc = '/content/drive/My Drive/dcci/sr_images/us_cubic_factor2'

meanAbsoluteError = []
meanAbsoluteErrorCubic = []

countMAE = 0
countRMSE = 0
countPSNR = 0
countMAESlice = 0
countRMSESlice = 0
countPSNRSlice = 0
totalMAE_lr = 0
totalMAE_sr = 0
totalRMSE_lr = 0
totalRMSE_sr = 0
totalMAX_lr = 0
totalMAX_sr = 0
fileCounter = 0 

# read low resolution images
for file in os.listdir(path_lr):
    current = os.path.join(path_lr, file)
    fileCounter += 1;
    x, y = [], []
    with rasterio.open(current) as src:
        arrayLR = src.read(
            out_shape=(
                src.count,
                int(src.height),
                int(src.width)
            )
        )        
    max_val_lr = np.amax(arrayLR)
    min_val_lr = np.amin(arrayLR)
    arrayLR = reshape_as_image(arrayLR)
    arraySR = Dcci(np.squeeze(arrayLR, axis=None))
    max_val_sr = np.amax(arraySR)

    # load hr image to compare images    
    currentHr = os.path.join(path_hr, file)
    with rasterio.open(currentHr) as src:
        arrayHR = src.read(
            out_shape=(
                src.count,
                int(src.height),
                int(src.width)
            )
        )
    arrayHR = reshape_as_image(arrayHR)
    nsamples, nx, ny = arrayHR.shape
    arrayHR = arrayHR.reshape((nsamples,nx*ny))

    # load lr cubic image to compare images    
    currentCC = os.path.join(path_cc, file)
    with rasterio.open(currentCC) as src:
        arrayCC = src.read(
            out_shape=(
                src.count,
                int(src.height),
                int(src.width)
            )
        )
    arrayCC = reshape_as_image(arrayCC)
    nsamples, nx, ny = arrayCC.shape
    arrayCC = arrayCC.reshape((nsamples,nx*ny))


    # slice 10 pixels from borders because of border effects
    slice_val = 10
    HrSlice = arrayHR[slice_val:-slice_val, slice_val:-slice_val]
    SrSlice = arraySR[slice_val:-slice_val, slice_val:-slice_val]
    CcSlice = arrayCC[slice_val:-slice_val, slice_val:-slice_val]
    
    # calculate the metrics for sr image (DCCI)
    MAE_sr_slice = mean_absolute_error(HrSlice, SrSlice)
    RMSE_sr_slice = mean_squared_error(HrSlice, SrSlice, squared=False)
    MAX_sr_slice = np.amax(abs(HrSlice - SrSlice))
    # adapt data_range to the maximal overall value  of the dataset
    PSNR_sr_slice = peak_signal_noise_ratio(HrSlice, SrSlice, data_range=3150)
    MAE_lr_slice = mean_absolute_error(HrSlice, CcSlice)
    RMSE_lr_slice = mean_squared_error(HrSlice, CcSlice, squared=False)
    MAX_lr_slice = np.amax(abs(HrSlice - CcSlice))
    # adapt data_range to the maximal overall value  of the dataset
    PSNR_lr_slice = peak_signal_noise_ratio(HrSlice, CcSlice, data_range=3150)
    
    # MAE count improved images
    if MAE_sr_slice < MAE_lr_slice:
        countMAESlice += 1    
    # RMSE count improved images
    if RMSE_sr_slice < RMSE_lr_slice:
        countRMSESlice += 1 
    # PSNR count imporved images, higher values are better
    if PSNR_sr_slice > PSNR_lr_slice:
        countPSNRSlice += 1    
    
    totalMAE_lr = totalMAE_lr + MAE_lr_slice
    totalMAE_sr = totalMAE_sr + MAE_sr_slice
    totalRMSE_lr = totalRMSE_lr + RMSE_lr_slice
    totalRMSE_sr = totalRMSE_sr + RMSE_sr_slice        
    totalMAX_lr = totalMAX_lr + MAX_lr_slice
    totalMAX_sr = totalMAX_sr + MAX_sr_slice 

    # print metrics for every tile if desired
    #print(file ,' SR MaxError: ', MAX_sr_slice  ,' LR MaxError: ', MAX_lr_slice, ' Difference MaxError LR - SR: ', MAX_lr_slice - MAX_sr_slice)
    #print(file ,' SR MAE: ', MAE_sr_slice ,' LR MAE: ', MAE_lr_slice, ' Difference LR - SR: ', MAE_lr_slice - MAE_sr_slice)
    #print(file ,' SR RMSE: ', RMSE_sr_slice ,' LR RMSE: ', RMSE_lr_slice, ' Difference LR - SR: ', RMSE_lr_slice - RMSE_sr_slice)
    #print(file ,' SR PSNR: ', PSNR_sr_slice ,' LR PSNR: ', PSNR_lr_slice, ' Difference LR - SR: ', PSNR_lr_slice - PSNR_sr_slice)

print('Better results of',fileCounter,'test areas for MAE sliced ',countMAESlice)
print('Better results of',fileCounter,'test areas for RMSE sliced',countRMSESlice)
print('Better results of',fileCounter,'test areas for PNSR sliced',countPSNRSlice)
print('Mean MAE LR', totalMAE_lr/fileCounter);
print('Mean MAE SR', totalMAE_sr/fileCounter);
print('Difference LR - SR MAE ', totalMAE_lr/fileCounter - totalMAE_sr/fileCounter)
print('Mean RMSE LR', totalRMSE_lr/fileCounter);
print('Mean RMSE SR', totalRMSE_sr/fileCounter);
print('Difference SR - LR RMSE ', totalRMSE_lr/fileCounter - totalRMSE_sr/fileCounter)
print('MAX LR', totalMAX_lr/fileCounter);
print('MAX SR', totalMAX_sr/fileCounter);
print('Difference LR - SR MAX ', totalMAX_lr/fileCounter - totalMAX_sr/fileCounter)

Better results of 101 test areas for MAE sliced  101
Better results of 101 test areas for RMSE sliced 101
Better results of 101 test areas for PNSR sliced 101
Mean MAE LR 2.6281430815706157
Mean MAE SR 0.48612110853595375
Difference LR - SR MAE  2.142021973034662
Mean RMSE LR 3.3327439539503345
Mean RMSE SR 0.9362317435480302
Difference SR - LR RMSE  2.3965122104023044
MAX LR 29.827931621287128
MAX SR 19.012399489778307
Difference LR - SR MAX  10.81553213150882


# Find the optimum parameter

In [None]:
from enum import Enum
import numpy as np
import rasterio
from rasterio.plot import reshape_as_image
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import mean_squared_error
from skimage.metrics import peak_signal_noise_ratio

# adapt paths
# original images with high resolution
path_hr = '/content/drive/My Drive/dcci/sr_images/hr_test'
# downsampled images with GDAL next neighbor factor 2
path_lr = '/content/drive/My Drive/dcci/sr_images/lr_test_factor2'
# upsampled images with GDAL next neighbor factor 2
path_cc = '/content/drive/My Drive/dcci/sr_images/us_cubic_factor2'

best = ''
bestMAE = None

# adapt the ranges if desired
# exponent K range
for k in range(1,7):
# threshold T range  
   for t in range(50,300,10):

    meanAbsoluteError = []
    meanAbsoluteErrorCubic = []

    countMAE = 0
    countRMSE = 0
    countPSNR = 0
    countSSI = 0
    countMAESlice = 0
    countRMSESlice = 0
    countPSNRSlice = 0
    countSSISlice = 0
    totalMAE_lr = 0
    totalMAE_sr = 0
    totalRMSE_lr = 0
    totalRMSE_sr = 0
    totalMAX_lr = 0
    totalMAX_sr = 0
    fileCounter = 0 

    # read low resolution images
    for file in os.listdir(path_lr):
        current = os.path.join(path_lr, file)
        fileCounter += 1;
        x, y = [], []
        with rasterio.open(current) as src:
            arrayLR = src.read(
                out_shape=(
                    src.count,
                    int(src.height),
                    int(src.width)
                )
            )        
        max_val_lr = np.amax(arrayLR)
        min_val_lr = np.amin(arrayLR)
        arrayLR = reshape_as_image(arrayLR)
        arraySR = Dcci(np.squeeze(arrayLR, axis=None), 1, t, k)
        max_val_sr = np.amax(arraySR)

        # load hr image to compare images    
        currentHr = os.path.join(path_hr, file)
        with rasterio.open(currentHr) as src:
            arrayHR = src.read(
                out_shape=(
                    src.count,
                    int(src.height),
                    int(src.width)
                )
            )
        arrayHR = reshape_as_image(arrayHR)
        nsamples, nx, ny = arrayHR.shape
        arrayHR = arrayHR.reshape((nsamples,nx*ny))

        # load lr cubic image to compare images    
        currentCC = os.path.join(path_cc, file)
        with rasterio.open(currentCC) as src:
            arrayCC = src.read(
                out_shape=(
                    src.count,
                    int(src.height),
                    int(src.width)
                )
            )
        arrayCC = reshape_as_image(arrayCC)
        nsamples, nx, ny = arrayCC.shape
        arrayCC = arrayCC.reshape((nsamples,nx*ny))

        # slice 10 pixels from border because border effects
        slice_val = 10
        HrSlice = arrayHR[slice_val:-slice_val, slice_val:-slice_val]
        SrSlice = arraySR[slice_val:-slice_val, slice_val:-slice_val]
        CcSlice = arrayCC[slice_val:-slice_val, slice_val:-slice_val]
        
        # calculate the metrics for sr image
        MAE_sr_slice = mean_absolute_error(HrSlice, SrSlice)
        RMSE_sr_slice = mean_squared_error(HrSlice, SrSlice, squared=False)
        MAX_sr_slice = np.amax(abs(HrSlice - SrSlice))
        # adapt data_range to the maximal overall value  of the dataset
        PSNR_sr_slice = peak_signal_noise_ratio(HrSlice, SrSlice, data_range=3150)
        MAE_lr_slice = mean_absolute_error(HrSlice, CcSlice)
        RMSE_lr_slice = mean_squared_error(HrSlice, CcSlice, squared=False)
        MAX_lr_slice = np.amax(abs(HrSlice - CcSlice))
        # adapt data_range to the maximal overall value  of the dataset
        PSNR_lr_slice = peak_signal_noise_ratio(HrSlice, CcSlice, data_range=3150)

        # MAE count improved images
        if MAE_sr_slice < MAE_lr_slice:
            countMAESlice += 1    
        if RMSE_sr_slice < RMSE_lr_slice:
            countRMSESlice += 1 
        if PSNR_sr_slice > PSNR_lr_slice:
            countPSNRSlice += 1    
        
        totalMAE_lr = totalMAE_lr + MAE_lr_slice
        totalMAE_sr = totalMAE_sr + MAE_sr_slice
        totalRMSE_lr = totalRMSE_lr + RMSE_lr_slice
        totalRMSE_sr = totalRMSE_sr + RMSE_sr_slice        
        totalMAX_lr = totalMAX_lr + MAX_lr_slice
        totalMAX_sr = totalMAX_sr + MAX_sr_slice 

    print('Mean MAE', totalMAE_sr/fileCounter, 'k=', k, 't=', t);
    
    #lower MAE?
    if bestMAE is None:
        bestMAE = MAE_sr_slice
    if MAE_sr_slice < bestMAE:
        bestMAE = MAE_sr_slice
        best = 'Best parameters for MAE', totalMAE_sr/fileCounter, 'k=', k, 't=', t

print(best)