In [1]:
# Imports
import subprocess
import pdal
import math
import os
import sys
import json
import logging
import time
import numpy as np
import ipywidgets as widgets
from pathlib import Path
from IPython.display import display, clear_output, IFrame, HTML, Javascript
from math import sqrt

In [2]:
# Adjust the screen size to 80%
display(HTML("<style>.container { width:80% !important; }</style>"))

In [3]:
# Configure logging
logger = logging.getLogger()
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)-8s %(levelname)-8s %(message)s')
handler.setFormatter(formatter)
logger.handlers = [handler]
logging.getLogger().setLevel(logging.INFO)

In [4]:
# Path to npm http-package
npm_http_server_path = Path(sys.prefix,'node_modules/http-server/bin/http-server')

if not npm_http_server_path.exists():
    subprocess.call(['npm', 'install', '--global', 'http-server'], shell=True)
    if not npm_http_server_path.exists():
        logging.warning('NodeJs http-server installation failed!')
    else:
        npm_http_server_path = npm_http_server_path.as_posix()

In [5]:
# Helper functions
def run_subprocess(args):
    """ Runs a subprocess defined by the program args list and returns a CompletedProcess instance."""
    return subprocess.run(args, capture_output=True, text=True, check=True)

def run_PDAL(PDAL_pipeline):
    """Executes a PDAL pipeline."""
    pipeline = pdal.Pipeline(json.dumps(PDAL_pipeline))
    pipeline.validate()
    pipeline.execute()  

![](notebook_data/hcu_title_image.png)

The notebook is a first implementation of a processing workflow for MBES data (bathymetry and backscatter) in Jupyter Notebook. To call the underlying Python code, corresponding widgets are provided. For both bathymetry and backscatter processing, ASCII data is used as a basis. Those can be created with the Kongsberg MBES preprocessing module, or alternatively exported from other software.

# ASCII to EPT conversion

[EPT](https://entwine.io/entwine-point-tile.html) is an octree-based point cloud storage format. It is the basis of all processing steps within this notebook. To convert ASCII file to EPT, they need to be passed with a certain formatting:

- Bathymetry: `X,Y,Z(,Classification)`
- Backscatter: `X,Y(,Z),Amplitude`

Other dimensions can also be included. If they are non of the known [PDAL dimensions](https://pdal.io/dimensions.html), a custom dimension will be created.

For subsequent data processing, geographic data firstly has to be projected to the corresponding UTM zone. Typically the north-western extent is used to determine the suitable zone:

In [6]:
longitude_floattext = widgets.BoundedFloatText(
    value=0.000,
    min=-180.000,
    max=180.000,
    step=0.001,
    description='Longitude:',
    disabled=False
)

latitude_floattext = widgets.BoundedFloatText(
    value=0.000,
    min=-90.000,
    max=90.000,
    step=0.001,
    description='Latitude:',
    disabled=False
)

determine_EPSG_button = widgets.Button(
    description='Get UTM EPSG code',
    button_style='info',
    tooltip='Determine the EPSG code of the corresponding UTM zone'
)

determine_EPSG_output = widgets.Output()

def on_determine_EPSG_button_clicked(button):
    with determine_EPSG_output:
        clear_output()
        zone = str((math.floor((longitude_floattext.value + 180) / 6) % 60) + 1)
        epsg_code = 32600
        epsg_code += int(zone)
        if latitude_floattext.value < 0:
            epsg_code += 100
        print(f'EPSG: {epsg_code}')
        
determine_EPSG_button.on_click(on_determine_EPSG_button_clicked)

display(
    widgets.HBox(
        [
            longitude_floattext,
            latitude_floattext,
            determine_EPSG_button,
            determine_EPSG_output
        ]
    )
)

HBox(children=(BoundedFloatText(value=0.0, description='Longitude:', max=180.0, min=-180.0, step=0.001), Bound…

To build the EPT data set, either the ASCII directory or an idividual ASCII file path is required. If a directory is passed, it will be attempted to read all PDAL readable files in the directory. If force new build is deactivated, the EPT data set with an identical name will be extended.

In [7]:
ASCII_directory_text = widgets.Text(
    placeholder='Enter ASCII data file or directory',
    continuous_update=False,
    layout=widgets.Layout(width='67%')
)

def on_ASCII_directory_text_change(change):
    with ASCII_to_EPT_output:
        clear_output()
        if not os.path.exists(change['new']):
            logging.warning('Not a valid directory')
            ASCII_directory_text.value = ''
            
ASCII_directory_text.observe(on_ASCII_directory_text_change, names='value')

EPSG_code_text = widgets.Text(
    placeholder='EPSG code',
    continuous_update=False,
    layout=widgets.Layout(width='20%')
)

def on_EPSG_code_text_change(change):
    with ASCII_to_EPT_output:
        clear_output()
        if not len(change['new']) == 5:
            logging.warning('Not a valid EPSG code')
            EPSG_code_text.value = ''
            
EPSG_code_text.observe(on_EPSG_code_text_change, names='value')

force_EPT_checkbox = widgets.Checkbox(
    value=True,
    description='Force new build'
)

ASCII_to_EPT_button = widgets.Button(
    description='Build EPTs',
    button_style='info', 
    tooltip='Convert the ASCII directory/file to an EPT data set'
)

ASCII_to_EPT_output = widgets.Output()

def on_ASCII_to_EPT_button_clicked(button):
    ASCII_to_EPT_button.disabled = True
    ASCII_to_EPT_button.button_style = 'warning'
    with ASCII_to_EPT_output:
        clear_output()
        if ASCII_directory_text.value and EPSG_code_text.value:
            path = Path(ASCII_directory_text.value)
            # If path is a directory
            if path.is_dir():
                directory = path
                ASCII_to_EPT_config = {
                    "input": Path.joinpath(path, "*").as_posix(),
                    "reprojection": {
                        "in": "EPSG:4326",
                        "out": "EPSG:" + EPSG_code_text.value
                    },
                    "force": force_EPT_checkbox.value,
                    "output": Path.joinpath(directory, "EPT_data").as_posix()
                }
            # If path is a file
            if path.is_file():
                directory = path.parent
                ASCII_to_EPT_config = {
                    "input": path.as_posix(),
                    "reprojection": {
                        "in": "EPSG:4326",
                        "out": "EPSG:" + EPSG_code_text.value
                    },
                    "force": force_EPT_checkbox.value,
                    "output": Path.joinpath(directory, "EPT_data").as_posix()
                }
            json.dump(
                ASCII_to_EPT_config,
                open(Path.joinpath(directory, "ASCII_to_EPT_config.json").as_posix(), 'w'),
                indent=4
            )
            print('Building EPTs ...')
            run_subprocess(
                [
                    'entwine',
                    'build',
                    '-c',
                    Path.joinpath(directory, "ASCII_to_EPT_config.json").as_posix()
                ]
            )
            os.remove(Path.joinpath(directory, "ASCII_to_EPT_config.json").as_posix())
            clear_output()
            print('Finished!')
            EPT_directory_text.value = Path.joinpath(directory, "EPT_data").as_posix()
        else:
            logging.warning('Missing information')
    ASCII_to_EPT_button.disabled = False
    ASCII_to_EPT_button.button_style = 'info'
        
ASCII_to_EPT_button.on_click(on_ASCII_to_EPT_button_clicked)

display(
    ASCII_directory_text,
    widgets.HBox(
        [
            EPSG_code_text,
            force_EPT_checkbox,
            ASCII_to_EPT_button
        ]
    ),
    ASCII_to_EPT_output
)

Text(value='', continuous_update=False, layout=Layout(width='67%'), placeholder='Enter ASCII data file or dire…

HBox(children=(Text(value='', continuous_update=False, layout=Layout(width='20%'), placeholder='EPSG code'), C…

Output()

# MBES EPTs

If you have previously created an EPT data set, you can skip step 1 and directly continue with your EPT data set:

In [8]:
# Global variables which are used troughout the notebook
EPT_directory = ''
EPT_name = ''

###########################################################################################################

# 'Enter EPT data directory' text field to enter the directory to the EPT 
# data set
EPT_directory_text = widgets.Text(
    placeholder='Enter EPT data directory',
    # Disable continuous updates to only activate the observer
    # when the directory has been fully entered
    continuous_update=False,
    layout=widgets.Layout(width='80%')
)

# Output of 'Enter EPT data directory' text field
EPT_directory_output = widgets.Output()

# Function to be executed on when the EPT directory is changed
def on_EPT_directory_text_change(change):
    global EPT_directory
    global EPT_name
    # Disable 'Update Potree viewer' button
    update_Potree_button.disabled = True
    update_Potree_button.button_style = 'warning'
    with EPT_directory_output:
        clear_output()
        # Check if directory is valid
        if not os.path.isdir(change['new']):
            # If not: Throw a warning
            logging.warning('Not a valid directory')
        else:
            # If: Update EPT directory and name
            EPT_directory = os.path.dirname(change['new'])
            EPT_name = os.path.basename(change['new'])
            # Start a NodeJS HTTP server to deliver the EPT data set to the Potree page
            http_server = subprocess.Popen(
                [
                    'node', 
                    npm_http_server_path, 
                    EPT_directory, 
                    '-p', 
                    '8080', # Local host 8080
                    '--cors'
                ]
            )
            # Display an iFrame with the Potree viewer
            display(
                IFrame(
                    f'https://potree.entwine.io/data/custom.html?r=%22http://localhost:8080/{EPT_name}%22',
                    '100%', # Width of iFrame
                    '600px' # Height of IFrame
                )
            )
            # Terminate HTTP server with time delay
            time.sleep(15)
            http_server.terminate()
            # Enable 'Update Potree viewer' button again
            update_Potree_button.disabled = False
            update_Potree_button.button_style = 'info'

# Observe the 'Enter EPT data directory' text field and call the respective
# function when the directory value changes
EPT_directory_text.observe(on_EPT_directory_text_change, names='value')

###########################################################################################################

# 'Update Potree viewer' button which updates the Potree viewer 
# if changes have been applied to the given EPT data set
update_Potree_button = widgets.Button(
    description='Update Potree viewer',
    button_style='info',
    tooltip='Update Potree viewer if the EPT data set has been changed',
    layout=widgets.Layout(width='20%')
)

# Function to be executed on 'Update Potree viewer' button clicked
def on_update_Potree_button_clicked(button):
    # Disable 'Update Potree viewer' button
    update_Potree_button.disabled = True
    update_Potree_button.button_style = 'warning'
    with EPT_directory_output:
        clear_output()
        # Start a Python HTTP server to deliver the EPT data set to the Potree page
        http_server = subprocess.Popen(
            [
                'node', 
                npm_http_server_path, 
                EPT_directory, 
                '-p', 
                '8080', # Local host 8080
                '--cors'
            ]
        )
        # Display an iFrame with the Potree viewer
        display(
            IFrame(
                f'https://potree.entwine.io/data/custom.html?r=%22http://localhost:8080/{EPT_name}%22',
                '100%', # Width of iFrame
                '600px' # Height of IFrame
            )
        )
        # Terminate HTTP server with time delay
        time.sleep(15)
        http_server.terminate()
    # Enable 'Update Potree viewer' button again
    update_Potree_button.disabled = False
    update_Potree_button.button_style = 'info'

# Connect 'Update Potree viewer' button and function to be executed on button click
update_Potree_button.on_click(on_update_Potree_button_clicked)

###########################################################################################################

# Display the directory text field and the corrsponding output
display(
    widgets.HBox(
        [
            EPT_directory_text, 
            update_Potree_button
        ]
    ),
    EPT_directory_output)

HBox(children=(Text(value='', continuous_update=False, layout=Layout(width='80%'), placeholder='Enter EPT data…

Output()

# MBES Bathymetry Processing

The bathymetry processing pipeline consists of [PDAL filter](https://pdal.io/stages/filters.html) stages which are combined and adjusted in order to clean the bathymetric data set for blunders. Due to its orgin in LiDAR, the Potree viewer cannot show customized Classification. In order to respresent the different filter results, following workaround is taken:

Filter | Classification value| Class | Color
-------- | -------- | -------- | --------
Depth window   | 18 | High noise | Turquoise
Extended local minimum  | 7 | Low point (noise) | Pink
Radius outlier | 4 | Medium vegetation | Green
Statistical outlier | 9 | Water | Blue

## Pipeline configuration

When firstly running an outlier classification, the 'Classification' dimension might not be yet created (It is for ASCII files from the Kongsberg MBES preprocessing module).

Following stages can't be run then:
- Reset any classfication
- Depth window filter

This is beacuse the assign classification values rather then creating them.

A workaround is to firstly use one of the other filters (ELM, radius outlier and/or statistical outlier filter) and then reset them in order to apply the depth window at the beginning of the bathymetry filter pipeline.

### Reset previous classification

When enabled, this step will reset any classification which has been applied ealier.

In [9]:
# Reset classification
reset_classification = {
    "type":"filters.assign",
    "assignment":"Classification[:]=0"
}

reset_classification_checkbox = widgets.Checkbox(
    value=False,
    description='Reset any classification')

display(reset_classification_checkbox)

Checkbox(value=False, description='Reset any classification')

### Depth window filter

The depth window filter simply filters all the points which lie outside the defined min and max values. It is important that the passed values are negative.

In [10]:
window_min_depth_floattext = widgets.FloatText(
    value=-200,
    description='Min depth:'
)

window_max_depth_floattext = widgets.FloatText(
    value=-5000,
    description='Max depth:'
)

window_filter_checkbox = widgets.Checkbox(
    value=False,
    description='Use depth window filter')

display(window_min_depth_floattext, window_max_depth_floattext, window_filter_checkbox)

FloatText(value=-200.0, description='Min depth:')

FloatText(value=-5000.0, description='Max depth:')

Checkbox(value=False, description='Use depth window filter')

### Extended local minimum filter

The [Extended Local Minimum (ELM) filter](https://pdal.io/stages/filters.elm.html) works on a raster basis. In each cell, the lowest value will be classified as outlier when more than a defined threshold below the second lowest point. This procedure is then continued with the second, third ... lowest point until the threshold is not anymore exceeded. When determining the parameters of the ELM filter, it is helpfull to use Potree's measurement tools. The cell size should not be smaller than the minimum sounding distance.

In [11]:
# Cell size. [Default: 200.0]
ELM_cell_floattext = widgets.BoundedFloatText(
    value=200.0,
    min=0.1,
    max=500.0,
    step=0.1,
    description='Cell size:')

def on_ELM_cell_floattext_change(change):
    ELM_filter['cell'] = change['new']

ELM_cell_floattext.observe(on_ELM_cell_floattext_change, 'value')

# Threshold value to identify low noise points. [Default: 30.0]
ELM_threshold_floattext = widgets.BoundedFloatText(
    value=30.0,
    min=0.1,
    max=500.0,
    step=0.1,
    description='Threshold:')



def on_ELM_threshold_floattext_change(change):
    ELM_filter['threshold'] = change['new']

ELM_threshold_floattext.observe(on_ELM_threshold_floattext_change, 'value')

ELM_filter = {
    "type": "filters.elm",
    "where": "(Classification == 0)",
    "cell": ELM_cell_floattext.value,
    "threshold": ELM_threshold_floattext.value,
    "class": 7
}

ELM_filter_checkbox = widgets.Checkbox(
    value=False,
    description='Use extended local minimum filter')

display(ELM_cell_floattext, ELM_threshold_floattext, ELM_filter_checkbox)

BoundedFloatText(value=200.0, description='Cell size:', max=500.0, min=0.1, step=0.1)

BoundedFloatText(value=30.0, description='Threshold:', max=500.0, min=0.1, step=0.1)

Checkbox(value=False, description='Use extended local minimum filter')

### Radius outlier filter

For each point, the [radius outlier filter](https://pdal.io/stages/filters.outlier.html#filters-outlier) counts the number of neighbors k within the defined radius. If the number is lower than the $min_k$, the point will be classified as outlier.

Again, it is helpful to use the measurement tools provided in the Potree viewer to determine suitable settings

In [12]:
# Radius. [Default: 150.0]
ROF_radius_floattext = widgets.BoundedFloatText(
    value=150.0,
    min=0.1,
    max=500.0,
    step=0.1,
    description='Radius:')

def on_ROF_radius_floattext_change(change):
    radius_outlier_filter['radius'] = change['new']

ROF_radius_floattext.observe(on_ROF_radius_floattext_change, 'value')

# Minimum number of neighbors in radius. [Default: 11]
ROF_min_k_intslider = widgets.IntSlider(
    value=11,
    min=1,
    max=50,
    step=1,
    description='Min k:')

def on_ROF_min_k_intslider_change(change):
    radius_outlier_filter['min_k'] = change['new']

ROF_min_k_intslider.observe(on_ROF_min_k_intslider_change, 'value')

radius_outlier_filter = {
    "type": "filters.outlier",
    "method": "radius",
    "where": "(Classification == 0)",
    "radius": ROF_radius_floattext.value,
    "min_k": ROF_min_k_intslider.value,
    "class": 4
}

radius_outlier_filter_checkbox = widgets.Checkbox(
    value=False,
    description='Use radius outlier filter')

display(ROF_radius_floattext, ROF_min_k_intslider, radius_outlier_filter_checkbox)

BoundedFloatText(value=150.0, description='Radius:', max=500.0, min=0.1, step=0.1)

IntSlider(value=11, description='Min k:', max=50, min=1)

Checkbox(value=False, description='Use radius outlier filter')

### Statistical outlier filter

The [statistical outlier filter](https://pdal.io/stages/filters.outlier.html#filters-outlier), in contrast to the previous filters, uses global statistics to identify outliers. In a first pass through the point cloud, the mean distante to $mean_k$ neighbors is determined. Then the global mean of the distances $\mu$ and its standard deviation $\sigma$ is computed. Based on these two, a threshold $t$ defined:

\begin{equation*}
t = \mu + m \sigma
\end{equation*}

Thereby $m$ is a multiplier to in- or decrease the threshold

In a second pass, all the soundings are classified as outlier whichs mean distance exceeds the $t$.

In [13]:
# Mean number of neighbors. [Default: 7]
SOF_mean_k_intslider = widgets.IntSlider(
    value=7,
    min=1,
    max=50,
    step=1,
    description='Mean k:')

def on_SOF_mean_k_intslider_change(change):
    statistical_outlier_filter['mean_k'] = change['new']

SOF_mean_k_intslider.observe(on_SOF_mean_k_intslider_change, 'value')

# Standard deviation multiplier. [Default: 3.0]
SOF_multiplier_floatslider = widgets.FloatSlider(
    value=3.0,
    min=0.1,
    max=10.0,
    step=0.1,
    readout_format='.1f',
    description='Multiplier:')

def on_SOF_multiplier_floatslider_change(change):
    statistical_outlier_filter['multiplier'] = change['new']

SOF_multiplier_floatslider.observe(on_SOF_multiplier_floatslider_change, 'value')

statistical_outlier_filter = {
    "type": "filters.outlier",
    "method": "statistical",
    "where": "(Classification == 0)",
    "mean_k": SOF_mean_k_intslider.value,
    "multiplier": SOF_multiplier_floatslider.value,
    "class": 9
}

statistical_outlier_filter_checkbox = widgets.Checkbox(
    value=False,
    description='Use statistical outlier filter')

display(SOF_mean_k_intslider, SOF_multiplier_floatslider, statistical_outlier_filter_checkbox)

IntSlider(value=7, description='Mean k:', max=50, min=1)

FloatSlider(value=3.0, description='Multiplier:', max=10.0, min=0.1, readout_format='.1f')

Checkbox(value=False, description='Use statistical outlier filter')

## Execute bathymetry pipeline

To inspect the filtering result, the above viewer has to be updated using the provided button.

Hint: The pipeline which is shown once the pipeline is constructed, can be saved with the extension `*.json` and kept as record for the data cleaning

In [14]:
build_bathymetry_pipeline_button = widgets.Button(
    description='Build bathymetry pipeline',
    button_style='info',
    tooltip='Build PDAL filter pipeline for bathymetry',
    layout=widgets.Layout(width='20%'))

build_bathymetry_pipeline_output = widgets.Output()

bathymetry_pipeline = []

def on_build_bathymetry_pipeline_button_clicked(button):
    EPT_reader = {
    "type": "readers.ept",
    "filename": Path(os.path.join(EPT_directory, EPT_name, 'ept.json')).as_posix()

    }

    LAS_writer = {
        "type": "writers.las",
        "filename": Path(os.path.join(EPT_directory, 'temp.las')).as_posix()
    }
    with build_bathymetry_pipeline_output:
        clear_output()
        global bathymetry_pipeline
        bathymetry_pipeline = [EPT_reader]
        if reset_classification_checkbox.value:
            bathymetry_pipeline.append(reset_classification)
        if window_filter_checkbox.value:
            window_min_filter = {
                "type": "filters.assign",
                "value" : f'Classification = 18 WHERE Z > {window_min_depth_floattext.value}'
            }
            bathymetry_pipeline.append(window_min_filter)
            window_max_filter = {
                "type": "filters.assign",
                "value" : f'Classification = 18 WHERE Z < {window_max_depth_floattext.value}'
            }
            bathymetry_pipeline.append(window_max_filter)
        if ELM_filter_checkbox.value:
            bathymetry_pipeline.append(ELM_filter)
        if radius_outlier_filter_checkbox.value:
            bathymetry_pipeline.append(radius_outlier_filter)
        if statistical_outlier_filter_checkbox.value:
            bathymetry_pipeline.append(statistical_outlier_filter)
        bathymetry_pipeline.append(LAS_writer)
        print(json.dumps(bathymetry_pipeline, indent=4))

build_bathymetry_pipeline_button.on_click(on_build_bathymetry_pipeline_button_clicked)

display(build_bathymetry_pipeline_button, build_bathymetry_pipeline_output)

Button(button_style='info', description='Build bathymetry pipeline', layout=Layout(width='20%'), style=ButtonS…

Output()

In [15]:
# 'Run bathymetry pipeline' pipeline button 
run_bathymetry_pipeline_button = widgets.Button(
    description='Run bathymetry pipeline',
    button_style='info', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Run PDAL filter pipeline for bathymetry',
    layout=widgets.Layout(width='20%'))

# Output of 'Run bathymetry pipeline' pipeline button 
run_bathymetry_pipeline_output = widgets.Output()

# Function to be executed on 'Run bathymetry pipeline' pipeline button clicked
def on_run_bathymetry_pipeline_button_clicked(button):
    with run_bathymetry_pipeline_output:
        clear_output()
        # Save temporary JSON pipeline file for PDAL subprocess execution
        json.dump(
            bathymetry_pipeline,
            open(Path(os.path.join(EPT_directory, 'bathymetry_pipeline.json')).as_posix(), 'w'),
            indent=4
        )
        # Execute PDAL bathymetry filter pipeline as Python subprocess
        print('Executing filter pipeline ...')
        run_subprocess(
            [
                'pdal',
                'pipeline',
                Path(os.path.join(EPT_directory, 'bathymetry_pipeline.json')).as_posix(),
            ]
        )
        clear_output()
        # Build updated EPTs from intermediate LAS as Python subprocess
        # --force flag florces a new build instead of appending to the old
        print('Updating EPTs ..')
        run_subprocess(
            [
                'entwine',
                'build',
                '-i',
                Path(os.path.join(EPT_directory, 'temp.las')).as_posix(),
                '-o',
                Path(os.path.join(EPT_directory, EPT_name)).as_posix(),
                '--force'
            ]
        )
        print('Removing temporary files...')
        # Temporary files: JSON pipeline and intermediate LAS
        os.remove(Path(os.path.join(EPT_directory, 'bathymetry_pipeline.json')).as_posix())
        os.remove(Path(os.path.join(EPT_directory, 'temp.las')).as_posix())
        clear_output()
        print('Finished PDAL filter pipeline for bathymetry!')
        
# Connect 'Run bathymetry pipeline' button and function to be executed on button click
run_bathymetry_pipeline_button.on_click(on_run_bathymetry_pipeline_button_clicked)

# Display widgets and ouput
display(
    run_bathymetry_pipeline_button,
    run_bathymetry_pipeline_output
)

Button(button_style='info', description='Run bathymetry pipeline', layout=Layout(width='20%'), style=ButtonSty…

Output()

# MBES Backscatter Processing

The backscatter processing in contrast to the bathymetry processing cannot (yet) be inspected in the Potree viewer. The result will directly be exported to a grid and can than be inspected externally. A suitable tool would be the open source GIS software [QGIS](https://qgis.org/en/site/).

## Pipeline configuration

### ~~DEM filter~~ <font color='red'>not yet functional</font>

In [16]:
DEM_filter_output = widgets.Output()

DEM_raster_text = widgets.Text(
    value='',
    placeholder='GDAL readable raster',
    description='Raster:',
    continuous_update=False
)

def on_DEM_raster_text_change(change):
    with DEM_filter_output:
        clear_output()
        if not os.path.isfile(change['new']):
            logging.warning('Not a valid file')
        DEM_raster_text.value = ''

DEM_raster_text.observe(on_DEM_raster_text_change, 'value')
#####################################################################################
DEM_limits_lower_inttext= widgets.BoundedIntText(
    value=30,
    min=0,
    max=500,
    step=5,
    description='Lower limit:'
)
#####################################################################################
DEM_limits_upper_inttext = widgets.BoundedIntText(
    value=30,
    min=0,
    max=500,
    step=5,
    description='Upper limit:'
)
#####################################################################################
DEM_band_dropdown = widgets.Dropdown(
    options=[
        ('Min', 1),
        ('Max', 2),
        ('Mean', 3),
        ('IDW', 4)
    ],
    value=3,
    description='Band:',
)
#####################################################################################

DEM_filter_checkbox = widgets.Checkbox(
    value=False,
    description='Use DEM filter')

display(
    DEM_raster_text,
    DEM_limits_lower_inttext,
    DEM_limits_upper_inttext,
    DEM_band_dropdown,
    DEM_filter_checkbox,
    DEM_filter_output
)

Text(value='', continuous_update=False, description='Raster:', placeholder='GDAL readable raster')

BoundedIntText(value=30, description='Lower limit:', max=500, step=5)

BoundedIntText(value=30, description='Upper limit:', max=500, step=5)

Dropdown(description='Band:', index=2, options=(('Min', 1), ('Max', 2), ('Mean', 3), ('IDW', 4)), value=3)

Checkbox(value=False, description='Use DEM filter')

Output()

### Constant offset and multiplier

To apply a constant offset or multiplier to a dimension in PDAL, the [assign filter](https://pdal.io/stages/filters.assign.html#filters-assign) can be used. The assign filter requires a defined syntax to express how the specific dimension is supposed to be altered. The following statement shows the syntax used to formulate an assign filter expression:

    "Dimension = ValueExpression (WHERE ConditionalExpression)"
    
Thereby the stated dimension is modified. The value expression defines how each value of the dimension is modified and the conditional expression can be used to express what specific values are modified. When applying this to the backscatter data, it could exemplarily be formulated:

	"Amplitude = Amplitude * 0.7"

	"Amplitude = Amplitude - 20"

The first expression would multiply  each backscatter amplitude value by 0.7 and the second statement would remove -20 dB from each backscatter value. A conditional could as an instance be used to apply the value expression only to a specific range of values. 

	"Amplitude = Amplitude - 20 WHERE OriginId = 0"

This statement only applies the value expression to amplitude values from the source file indicated by the OriginID 0.

To apply this filter, an expression in the above named syntax has to be applied.

In [17]:
O_M_text = widgets.Text(
    value='Amplitude = Amplitude',
    placeholder='Amplitude = Amplitude',
    description='Expression:',
    continuous_update=False,
    layout=widgets.Layout(width='67%')
)

O_M_filter_checkbox = widgets.Checkbox(
    value=False,
    description='Use assign filter')

display(O_M_text, O_M_filter_checkbox)

Text(value='Amplitude = Amplitude', continuous_update=False, description='Expression:', layout=Layout(width='6…

Checkbox(value=False, description='Use assign filter')

## Despeckling (MAD filter)

Despeckling mainly occurs due to sensor noise in the receiver. It can be helpful to run a [median absolute deviation filter](https://pdal.io/stages/filters.mad.html#filters-mad) in order to filter out those artifacts. The setting found to work the best is a $k$ value of 3 which culls all samples which have an amplitude more than 3 deviations from the median. A higher $k$ will cull less points and a lower value more.

In [18]:
# The number of deviations from the median. [Default: ]
MAD_k_floatslider = widgets.FloatSlider(
    value=3.0,
    min=0.1,
    max=10.0,
    step=0.1,
    readout_format='.1f',
    description='k:')

MAD_filter_checkbox = widgets.Checkbox(
    value=False,
    description='Use MAD filter')

display(MAD_k_floatslider, MAD_filter_checkbox)

FloatSlider(value=3.0, description='k:', max=10.0, min=0.1, readout_format='.1f')

Checkbox(value=False, description='Use MAD filter')

###  Antialiasing (Poisson resampling)

Antialiasing can occur when a high resolution is represented in a grid with a much lower resolution. It can be countered by downsampling the spatial resolution prior to gridding. The [Poisson filter](https://pdal.io/stages/filters.poisson.html#filters-poisson) only keeps points ina distance of the defined radius and culls the other points. A good radius would be the approcimate resolution of the final grid. However, the filter simply culls the points and so that they are subsequently not considered anymore. It has to be checked if this actually improves the result.

In [19]:
# Minimum distance between samples. [Default: 30.0]
sample_radius_floattext = widgets.BoundedFloatText(
    value=30.0,
    min=0.1,
    max=200.0,
    step=0.1,
    description='Radius:'
)

sample_filter_checkbox = widgets.Checkbox(
    value=False,
    description='Use Poisson resample filter')

display(sample_radius_floattext, sample_filter_checkbox)

BoundedFloatText(value=30.0, description='Radius:', max=200.0, min=0.1, step=0.1)

Checkbox(value=False, description='Use Poisson resample filter')

## Execute backscatter pipeline

Hint: The pipeline which is shown once the pipeline is constructed, can be saved with the extension `*.json` and kept as record for the data cleaning

In [20]:
build_backscatter_pipeline_button = widgets.Button(
    description='Build backscatter pipeline',
    button_style='info',
    tooltip='Build PDAL filter pipeline for backscatter',
    layout=widgets.Layout(width='20%'))

build_backscatter_pipeline_output = widgets.Output()

backscatter_pipeline = []

def on_build_backscatter_pipeline_button_clicked(button):
    EPT_reader = {
    "type": "readers.ept",
    "filename": Path(os.path.join(EPT_directory, EPT_name, 'ept.json')).as_posix()
    }

    PCD_writer = {
        "type": "writers.bpf",
        "coord_id": "auto",
        "filename": Path(os.path.join(EPT_directory, 'temp.bpf')).as_posix()
    }
    with build_backscatter_pipeline_output:
        clear_output()
        global backscatter_pipeline
        backscatter_pipeline = [EPT_reader]
        if O_M_filter_checkbox.value:
            O_M_filter = {
                "type":"filters.assign",
                "value": O_M_text.value
            }
            backscatter_pipeline.append(O_M_filter)
        if MAD_filter_checkbox.value:
            MAD_filter = {
                "type": "filters.mad",
                "dimension":"Amplitude",
                "k": MAD_k_floatslider.value
            }
            backscatter_pipeline.append(MAD_filter)
        if sample_filter_checkbox:
            sample_filter = {
                "type": "filters.sample",
                "radius": sample_radius_floattext.value
            }
            backscatter_pipeline.append(sample_filter)
        backscatter_pipeline.append(PCD_writer)
        print(json.dumps(backscatter_pipeline, indent=4))

build_backscatter_pipeline_button.on_click(on_build_backscatter_pipeline_button_clicked)

display(build_backscatter_pipeline_button, build_backscatter_pipeline_output)

Button(button_style='info', description='Build backscatter pipeline', layout=Layout(width='20%'), style=Button…

Output()

In [21]:
# 'Run backscatter pipeline' pipeline button 
run_backscatter_pipeline_button = widgets.Button(
    description='Run backscatter pipeline',
    button_style='info', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Run PDAL filter pipeline for backscatter',
    layout=widgets.Layout(width='20%'))

# Output of 'Run backscatter pipeline' pipeline button 
run_backscatter_pipeline_output = widgets.Output()

# Function to be executed on 'Run backscatter pipeline' pipeline button clicked
def on_run_backscatter_pipeline_button_clicked(button):
    with run_backscatter_pipeline_output:
        clear_output()
        # Save temporary JSON pipeline file for PDAL subprocess execution
        json.dump(
            backscatter_pipeline,
            open(Path(os.path.join(EPT_directory, 'backscatter_pipeline.json')).as_posix(), 'w'),
            indent=4
        )
        # Execute PDAL backscatter filter pipeline as Python subprocess
        print('Executing filter pipeline ...')
        run_subprocess(
            [
                'pdal',
                'pipeline',
                Path(os.path.join(EPT_directory, 'backscatter_pipeline.json')).as_posix(),
            ]
        )
        clear_output()
        # Build updated EPTs from intermediate LAS as Python subprocess
        # --force flag florces a new build instead of appending to the old
        print('Updating EPTs ..')
        run_subprocess(
            [
                'entwine',
                'build',
                '-i',
                Path(os.path.join(EPT_directory, 'temp.bpf')).as_posix(),
                '-o',
                Path(os.path.join(EPT_directory, EPT_name)).as_posix(),
                '--force'
            ]
        )
        print('Removing temporary files...')
        # Temporary files: JSON pipeline and intermediate LAS
        os.remove(Path(os.path.join(EPT_directory, 'backscatter_pipeline.json')).as_posix())
        os.remove(Path(os.path.join(EPT_directory, 'temp.bpf')).as_posix())
        clear_output()
        print('Finished PDAL filter pipeline for backscatter!')
        
# Connect 'Run backscatter pipeline' button and function to be executed on button click
run_backscatter_pipeline_button.on_click(on_run_backscatter_pipeline_button_clicked)

# Display widgets and ouput
display(
    run_backscatter_pipeline_button,
    run_backscatter_pipeline_output
)

Button(button_style='info', description='Run backscatter pipeline', layout=Layout(width='20%'), style=ButtonSt…

Output()

# Data export

In [22]:
# Grid resolution. [Default: 100.0]
grid_resolution_floattext = widgets.BoundedFloatText(
    value=100.0,
    min=0.1,
    max=500.0,
    step=0.1,
    description='Grid resolution:')

def on_grid_resolution_floattext_change(change):
    grid_radius_floattext.value = round(change['new']*np.sqrt(2), 1)

grid_resolution_floattext.observe(on_grid_resolution_floattext_change, 'value')

###############################################################################

# Grid radius. [Default: 141.4] --> Grid resolution * sqrt(2)
grid_radius_floattext = widgets.BoundedFloatText(
    value=round(100*sqrt(2), 1),
    min=0.1,
    max=1000.0,
    step=0.1,
    description='Radius:')

###############################################################################

# Power for inverse distance weighting algorithm. [Default: 1]
grid_IDW_power_intslider = widgets.IntSlider(
    value=1,
    min=1,
    max=6,
    step=1,
    description='IDW power:')

###############################################################################

# Grid window size. [Default: 0]
# Maximum horizontal or vertical distance that a donor cell may be in order to contribute to the subject cell 
# --> A window_size of 1 essentially creates a 3x3 array around the subject cell. 
# --> A window_size of 2 creates a 5x5 array, and so on.
grid_window_size_intslider = widgets.IntSlider(
    value=1,
    min=0,
    max=5,
    step=1,
    description='Window size:')

###############################################################################

# Dimension to be gridded. [default: 'Z']
grid_dimension_dropdown = widgets.Dropdown(
    options=['Z', 'Classification', 'Amplitude'],
    value='Z',
    description='Dimension:'
)
###############################################################################

# Name of the output grid [default: 'output']
# Grid is saved to the directory of the EPT data set
grid_name_text = widgets.Text(
    value='output',
    placeholder='output',
    description='Filename:',
    disabled=False
)
###############################################################################

# Whether outliers are rejected or gridded [default: True]
grid_reject_outliers_checkbox = widgets.Checkbox(
    value=True,
    description='Reject outliers (bathymetry only)')

###############################################################################

# Display of widgets
display(
    grid_name_text,
    grid_resolution_floattext,
    grid_radius_floattext,
    grid_IDW_power_intslider,
    grid_window_size_intslider,
    grid_dimension_dropdown,
    grid_reject_outliers_checkbox
)

Text(value='output', description='Filename:', placeholder='output')

BoundedFloatText(value=100.0, description='Grid resolution:', max=500.0, min=0.1, step=0.1)

BoundedFloatText(value=141.4, description='Radius:', max=1000.0, min=0.1, step=0.1)

IntSlider(value=1, description='IDW power:', max=6, min=1)

IntSlider(value=1, description='Window size:', max=5)

Dropdown(description='Dimension:', options=('Z', 'Classification', 'Amplitude'), value='Z')

Checkbox(value=True, description='Reject outliers (bathymetry only)')

In [23]:
export_data_button = widgets.Button(
    description='Export data',
    button_style='info',
    tooltip='Export data to final product',
    layout=widgets.Layout(width='20%')
)

export_data_output = widgets.Output()

def on_export_data_button_clicked(button):
    with export_data_output:
        clear_output()
        EPT_reader = {
            "type": "readers.ept",
            "filename": Path(os.path.join(EPT_directory, EPT_name, 'ept.json')).as_posix()
        }
        reject_outliers = {
            "type":"filters.range",
            "limits": "Classification[0:0]"
        }
        grid_writer = {
            "filename": Path(os.path.join(EPT_directory, grid_name_text.value + '.tif')).as_posix(),
            "resolution": grid_resolution_floattext.value,
            "radius": grid_radius_floattext.value,
            "power": grid_IDW_power_intslider.value,
            "window_size": grid_window_size_intslider.value,
            "dimension": grid_dimension_dropdown.value
        }
        export_pipeline = [EPT_reader]
        if grid_reject_outliers_checkbox.value:
             export_pipeline.append(reject_outliers)
        export_pipeline.append(grid_writer)
        print(json.dumps(export_pipeline, indent=4))
        json.dump(
            export_pipeline,
            open(Path(os.path.join(EPT_directory, 'export_pipeline.json')).as_posix(), 'w'),
            indent=4
        )
        # Execute PDAL bathymetry filter pipeline as Python subprocess
        print('Exporting data ...')
        run_subprocess(
            [
                'pdal',
                'pipeline',
                Path(os.path.join(EPT_directory, 'export_pipeline.json')).as_posix(),
            ]
        )
        clear_output()
        print('Removing temporary files...')
        # Temporary files: JSON export pipeline
        os.remove(Path(os.path.join(EPT_directory, 'export_pipeline.json')).as_posix())
        clear_output()
        print(f'Exported {grid_name_text.value}!')

export_data_button.on_click(on_export_data_button_clicked)

display(export_data_button, export_data_output)

Button(button_style='info', description='Export data', layout=Layout(width='20%'), style=ButtonStyle(), toolti…

Output()