In [None]:
# Tests for methods that process horizon surface (filtering, creating carcass, holes, etc.)
import os
import sys
import warnings
import numpy as np
from cv2 import dilate

warnings.filterwarnings('ignore')

sys.path.insert(0, '../../../seismiqb')

from seismiqb import Field, Horizon, plot

In [None]:
""" You can manage cube and horizon for the test:

CUBE_PATH : str
    Path to an existed seismic cube.
HORIZON_PATH : str
    Path to an existed seismic horizon.
"""
# Tests parameters
OUTPUT_DIR = './horizon_test_files'

CUBE_PATH = os.path.join(OUTPUT_DIR, 'test_cube.sgy')
HORIZON_PATH = os.path.join(OUTPUT_DIR, 'test_horizon')

SEED = 42

# Visualization parameters
SCALE = 1
SHOW_FIGURES = True

In [None]:
%%time
field = Field(CUBE_PATH)
horizon = Horizon(HORIZON_PATH, field=field)

horizon.show(show=SHOW_FIGURES)

# Filtering

In [None]:
%%time
# from_subset
# Cut horizon by i_line and check its filtered and non-filtered parts
# Create filtering matrix
i_line_cut = horizon.shape[0] // 3

filtering_matrix = np.zeros_like(horizon.full_matrix)
filtering_matrix[i_line_cut:] = 1

# Horizon filtered by matrix
filtered_horizon = Horizon(storage=horizon.full_matrix, field=field, name='filtered')
filtered_horizon = filtered_horizon.from_subset(filtering_matrix)

filtered_horizon.show(show=SHOW_FIGURES)

not_empty_traces_error = "`from_subset` filtering test failed: filtered traces are not empty"
assert np.all(filtered_horizon.full_matrix[:i_line_cut] == filtered_horizon.FILL_VALUE), not_empty_traces_error

empty_traces_error = "`from_subset` filtering test failed: non-filtered traces was changed by filtering"
assert np.array_equal(filtered_horizon.full_matrix[i_line_cut:],
                      horizon.full_matrix[i_line_cut:]), empty_traces_error

In [None]:
%%time
# filter
filtered_horizon = Horizon(horizon.full_matrix, field=field, name='filtered')
filtered_horizon.filter(inplace=True)

filtered_horizon.show(show=SHOW_FIGURES)

# Check that dead traces filled by FILL_VALUE
dead_traces = filtered_horizon.full_matrix[field.dead_traces_matrix == 1]

not_empty_traces_error = "`filter` test failed: filtered traces are not empty"
assert np.all(dead_traces == filtered_horizon.FILL_VALUE), not_empty_traces_error

# Check that other traces didn't change
nonzero_traces = filtered_horizon.full_matrix[field.dead_traces_matrix == 0]
original_nonzero_traces = horizon.full_matrix[field.dead_traces_matrix == 0]

empty_traces_error = "`filter` test failed: non-filtered traces were changed by filtering"
assert np.array_equal(nonzero_traces, original_nonzero_traces), empty_traces_error

In [None]:
%%time
# filter
# Cut horizon by i_line and check its filtered and non-filtered parts
i_line_cut = horizon.shape[0] // 3

filtered_horizon = Horizon(horizon.full_matrix, field=field, name='filtered')

filtering_matrix = np.zeros_like(horizon.full_matrix)
filtering_matrix[:i_line_cut] = 1

filtered_horizon.filter(filtering_matrix, inplace=True)

filtered_horizon.show(show=SHOW_FIGURES)

not_empty_traces_error = "`filter` method test failed: filtered traces are not empty"
assert np.all(filtered_horizon.full_matrix[:i_line_cut] == filtered_horizon.FILL_VALUE), not_empty_traces_error

empty_traces_error = "`filter` method test failed: non-filtered traces was changed by filtering"
assert np.array_equal(filtered_horizon.full_matrix[i_line_cut:],
                      horizon.full_matrix[i_line_cut:]), empty_traces_error

In [None]:
%%time
# Filter spikes with median_diff, gradient and spikes maps
# Create constant horizon with a spike
spike_point = (horizon.shape[0] // 2, horizon.shape[1] // 2)

horizon_matrix = np.ones_like(horizon.full_matrix) * horizon.field.shape[-1]//2
horizon_matrix[spike_point] += 5

horizon_with_spikes = Horizon(horizon_matrix, field=field, name='constant_with_spike')
horizon_with_spikes.filter(inplace=True)

# Get mask of areas with spike cut
dilation_iterations = 1

spikes_mask = np.zeros(shape=horizon_with_spikes.full_matrix.shape, dtype=np.float32)
spikes_mask[spike_point] = 1            
spikes_mask = dilate(spikes_mask, kernel=np.ones((3, 3), np.uint8), iterations=dilation_iterations)

spikes_mask = spikes_mask > 0

spikes = {
    'median': horizon_with_spikes.get_median_diff_map,
    'gradient': horizon_with_spikes.get_gradient_map,
    'spikes': horizon_with_spikes.get_spikes_mask
}

# Check despiking
for mode, get_spikes_method in spikes.items():
    spikes_map = get_spikes_method(dilation_iterations=dilation_iterations)
    spikes_map = spikes_map > 0
    
    despiked_horizon = horizon_with_spikes.filter(spikes_map, inplace=False)

    not_empty_traces_error = f"Despike with mode '{mode}' test failed: filtered traces with spike are not empty"
    assert np.all(despiked_horizon.full_matrix[spikes_mask] == despiked_horizon.FILL_VALUE), not_empty_traces_error

    empty_traces_error = f"Despike with mode '{mode}' test failed: despike changes traces without spikes"
    assert np.array_equal(despiked_horizon.full_matrix[~spikes_mask], horizon_with_spikes.full_matrix[~spikes_mask]), empty_traces_error

    print(f"Despiking mode '{mode}' was successfully tested")

In [None]:
%%time
# filter_disconnected_regions
# Split horizon in two parts by i_line_cut
i_line_cut = horizon.shape[0]//3
filtering_matrix = np.zeros_like(horizon.full_matrix)
filtering_matrix[i_line_cut, :] = 1

splitted_horizon = Horizon(horizon.full_matrix, field=field, name='splitted')
splitted_horizon.filter(filtering_matrix, inplace=True)

splitted_horizon.filter_disconnected_regions(inplace=True)

not_empty_traces_error = f"`filter_disconnected_regions` test failed: disconnected region wasn't filtered"
assert np.all(splitted_horizon.full_matrix[:i_line_cut+1]==splitted_horizon.FILL_VALUE), not_empty_traces_error

empty_traces_error = f"`filter_disconnected_regions` test failed: wrong traces were filtered"
assert np.array_equal(splitted_horizon.full_matrix[i_line_cut+1:],  horizon.full_matrix[i_line_cut+1:]), empty_traces_error

# Specific manipulations

In [None]:
horizon = Horizon(horizon.full_matrix, field=field, name='filtered')
horizon.filter(inplace=True)

frequency = 100

def calculate_grid_coverage(horizon, frequencies=100, width=1, **kwargs):
    """ Approximate calculation of coverage of regular grid.

    horizon : :class:`Horizon`
        Seismic horizon instance.
    frequencies : int or sequence of two integers
        (iline, xline) grid frequencies.
        If int, then ilines and xlines frequencies are equal.
    width : int or sequence of two integers
        (iline, xline) grid width.
        If int, then ilines and xlines grid widths are equal.
    """
    if isinstance(frequencies, int):
        frequencies = (frequencies, frequencies)

    i_min = kwargs.get('i_min', horizon.i_min)
    x_min = kwargs.get('x_min', horizon.x_min)

    amount_of_ilines = (horizon.i_length - i_min) // frequencies[0]
    amount_of_xlines = (horizon.x_length - x_min) // frequencies[1]

    # On borders we have a line with half of grid width
    if (horizon.i_length - i_min) % frequencies[0] > 0:
        amount_of_ilines += 0.5
    else:
        amount_of_ilines -= 0.5

    if (horizon.x_length - x_min) % frequencies[1] > 0:
        amount_of_xlines += 0.5
    else:
        amount_of_xlines -= 0.5

    # Count amount of traces for grid with width = 1
    intersection_traces = amount_of_ilines * amount_of_xlines
    grid_traces = amount_of_ilines * horizon.x_length + amount_of_xlines * horizon.i_length

    # Apply width
    intersection_traces = width * width * intersection_traces
    grid_traces = width * grid_traces
    grid_traces -= intersection_traces

    # Calculate approximate amount of traces in grid in horizon area
    coverage = grid_traces / horizon.size
    return coverage

## Carcass

In [None]:
%%time
carcass = horizon.make_carcass(margin=0, frequencies=frequency, regular=True, interpolate=True)

carcass.show(show=SHOW_FIGURES, load_kwargs={'enlarge': True})

approximate_coverage = calculate_grid_coverage(horizon=horizon, frequencies=frequency, width=3, i_min=0, x_min=0)

assert np.isclose(carcass.coverage, approximate_coverage, atol=2e-3), "`make_carcass` test failed: resulted coverage is not expected"

## thin_out

In [None]:
%%time
thined_horizon = Horizon(horizon.full_matrix, field=field, name='thined')
thined_horizon.thin_out(factor=(frequency, frequency), threshold=np.min(horizon.shape)//10, inplace=True)

thined_horizon.show(show=SHOW_FIGURES, load_kwargs={'enlarge': True})

approximate_coverage = calculate_grid_coverage(horizon=horizon, frequencies=frequency, width=1,
                                               i_min=horizon.i_min, x_min=horizon.x_min)

assert np.isclose(thined_horizon.coverage, approximate_coverage, atol=2e-3), "`thin_out` test failed: resulted coverage is not expected"

# Interpolate

In [None]:
%%time
interpolated_horizon = Horizon(thined_horizon.full_matrix, field=field, name='interpolated')
interpolated_horizon.interpolate(inplace=True)

interpolated_horizon.show(show=SHOW_FIGURES, load_kwargs={'enlarge': True})

approximate_coverage = calculate_grid_coverage(horizon=horizon, frequencies=frequency, width=3,
                                               i_min=horizon.i_min, x_min=horizon.x_min)

assert np.isclose(interpolated_horizon.coverage, approximate_coverage, atol=2e-3), "`interpolate` test failed: resulted coverage is not expected"

# Existed traces mustn't be changed
nonbad_traces_mask = thined_horizon.full_matrix != thined_horizon.FILL_VALUE
nonbad_traces_thined = thined_horizon.full_matrix[nonbad_traces_mask]
nonbad_traces_interpolated = interpolated_horizon.full_matrix[nonbad_traces_mask]

assert np.array_equal(
    nonbad_traces_thined, nonbad_traces_interpolated, equal_nan=False
), "interpolate test failed: surface values were changed"

## Holes

In [None]:
%%time
filtering_matrix = horizon.generate_holes_matrix(seed=SEED)
filtering_matrix = horizon.matrix_put_on_full(filtering_matrix)

horizon_with_holes = Horizon(horizon.full_matrix, field=field, name='holed')
horizon_with_holes.filter(filtering_matrix, inplace=True)

if SHOW_FIGURES:
    plot(filtering_matrix, cmap='viridis', scale=SCALE, title='Holes matrix')
    horizon_with_holes.show(scale=SCALE)

assert (horizon_with_holes.coverage < horizon.coverage), "`generate_holes_matrix` test failed: no traces was filtered"