Skip to content
Merged
354 changes: 8 additions & 346 deletions src/mdio/converters/segy.py

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/mdio/ingestion/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""MDIO ingestion helpers."""
77 changes: 77 additions & 0 deletions src/mdio/ingestion/coordinates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""Generic coordinate population for ingestion."""

from __future__ import annotations

from typing import TYPE_CHECKING

import numpy as np

from mdio.segy.scalar import SCALE_COORDINATE_KEYS
from mdio.segy.scalar import _apply_coordinate_scalar

if TYPE_CHECKING:
from segy.arrays import HeaderArray as SegyHeaderArray
from xarray import Dataset as xr_Dataset

from mdio.core.grid import Grid


def populate_dim_coordinates(
dataset: xr_Dataset, grid: Grid, drop_vars_delayed: list[str]
) -> tuple[xr_Dataset, list[str]]:
"""Populate the xarray dataset with dimension coordinate variables."""
for dim in grid.dims:
dataset[dim.name].values[:] = dim.coords
drop_vars_delayed.append(dim.name)
return dataset, drop_vars_delayed


def populate_non_dim_coordinates(
dataset: xr_Dataset,
grid: Grid,
coordinates: dict[str, SegyHeaderArray],
drop_vars_delayed: list[str],
spatial_coordinate_scalar: int,
) -> tuple[xr_Dataset, list[str]]:
"""Populate the xarray dataset with coordinate variables.

Memory optimization: Processes coordinates one at a time and explicitly
releases intermediate arrays to reduce peak memory usage.
"""
non_data_domain_dims = grid.dim_names[:-1]

coord_names = list(coordinates.keys())
for coord_name in coord_names:
coord_values = coordinates.pop(coord_name)
da_coord = dataset[coord_name]

coord_shape = da_coord.shape

fill_value = da_coord.encoding.get("_FillValue") or da_coord.encoding.get("fill_value")
if fill_value is None:
fill_value = np.nan
tmp_coord_values = np.full(coord_shape, fill_value, dtype=da_coord.dtype)

coord_axes = tuple(non_data_domain_dims.index(coord_dim) for coord_dim in da_coord.dims)
coord_slices = tuple(slice(None) if idx in coord_axes else 0 for idx in range(len(non_data_domain_dims)))

coord_trace_indices = np.asarray(grid.map[coord_slices])

not_null = coord_trace_indices != grid.map.fill_value

if not_null.any():
valid_indices = coord_trace_indices[not_null]
tmp_coord_values[not_null] = coord_values[valid_indices]

if coord_name in SCALE_COORDINATE_KEYS:
tmp_coord_values = _apply_coordinate_scalar(tmp_coord_values, spatial_coordinate_scalar)

dataset[coord_name][:] = tmp_coord_values
drop_vars_delayed.append(coord_name)

del tmp_coord_values, coord_trace_indices, not_null, coord_values

# TODO(Altay): Add verification of reduced coordinates being the same as the first
# https://github.com/TGSAI/mdio-python/issues/645

return dataset, drop_vars_delayed
69 changes: 69 additions & 0 deletions src/mdio/ingestion/grid_qc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Grid sparsity quality control for ingestion."""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING

import numpy as np

from mdio.converters.exceptions import GridTraceSparsityError
from mdio.core.config import MDIOSettings

if TYPE_CHECKING:
from mdio.core.grid import Grid

logger = logging.getLogger(__name__)


def grid_density_qc(grid: Grid, num_traces: int) -> None:
"""Quality control for sensible grid density during SEG-Y to MDIO conversion.

This function checks the density of the proposed grid by comparing the total possible traces
(`grid_traces`) to the actual number of traces in the SEG-Y file (`num_traces`). A warning is
logged if the sparsity ratio (`grid_traces / num_traces`) exceeds a configurable threshold,
indicating potential inefficiency or misconfiguration.

The warning threshold is set via the environment variable `MDIO__GRID__SPARSITY_RATIO_WARN`
(default 2), and the error threshold via `MDIO__GRID__SPARSITY_RATIO_LIMIT` (default 10). To
suppress the exception (but still log warnings), set `MDIO_IGNORE_CHECKS=1`.

Args:
grid: The Grid instance to check.
num_traces: Expected number of traces from the SEG-Y file.

Raises:
GridTraceSparsityError: If the sparsity ratio exceeds `MDIO__GRID__SPARSITY_RATIO_LIMIT`
and `MDIO_IGNORE_CHECKS` is not set to a truthy value (e.g., "1", "true").
"""
settings = MDIOSettings()
grid_traces = np.prod(grid.shape[:-1], dtype=np.uint64)

sparsity_ratio = float("inf") if num_traces == 0 else grid_traces / num_traces

warning_ratio = settings.grid_sparsity_ratio_warn
error_ratio = settings.grid_sparsity_ratio_limit
ignore_checks = settings.ignore_checks

should_warn = sparsity_ratio > warning_ratio
should_error = sparsity_ratio > error_ratio and not ignore_checks

if not should_warn and not should_error:
return

dims = dict(zip(grid.dim_names, grid.shape, strict=True))
msg = (
f"Ingestion grid is sparse. Sparsity ratio: {sparsity_ratio:.2f}. "
f"Ingestion grid: {dims}. "
f"SEG-Y trace count: {num_traces}, grid trace count: {grid_traces}."
)
for dim_name in grid.dim_names:
dim_min = grid.get_min(dim_name)
dim_max = grid.get_max(dim_name)
msg += f"\n{dim_name} min: {dim_min} max: {dim_max}"

if should_warn:
logger.warning(msg)

if should_error:
raise GridTraceSparsityError(grid.shape, num_traces, msg)
18 changes: 18 additions & 0 deletions src/mdio/ingestion/metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""Generic dataset metadata helpers for ingestion."""

from __future__ import annotations

from typing import TYPE_CHECKING
from typing import Any

if TYPE_CHECKING:
from mdio.builder.schemas import Dataset


def _add_grid_override_to_metadata(dataset: Dataset, grid_overrides: dict[str, Any] | None) -> None:
"""Add grid override to Dataset metadata if needed."""
if dataset.metadata.attributes is None:
dataset.metadata.attributes = {}

if grid_overrides is not None:
dataset.metadata.attributes["gridOverrides"] = grid_overrides
1 change: 1 addition & 0 deletions src/mdio/ingestion/segy/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""SEG-Y specific ingestion helpers."""
141 changes: 141 additions & 0 deletions src/mdio/ingestion/segy/coordinates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
"""Coordinate extraction and unit resolution for SEG-Y ingestion."""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING

import numpy as np
from segy.standards.codes import MeasurementSystem as SegyMeasurementSystem
from segy.standards.fields import binary as binary_header_fields

from mdio.builder.schemas.v1.units import AngleUnitEnum
from mdio.builder.schemas.v1.units import AngleUnitModel
from mdio.builder.schemas.v1.units import LengthUnitEnum
from mdio.builder.schemas.v1.units import LengthUnitModel
from mdio.ingestion.coordinates import populate_dim_coordinates
from mdio.ingestion.coordinates import populate_non_dim_coordinates

if TYPE_CHECKING:
from segy.arrays import HeaderArray as SegyHeaderArray
from xarray import Dataset as xr_Dataset

from mdio.builder.templates.base import AbstractDatasetTemplate
from mdio.core.dimension import Dimension
from mdio.core.grid import Grid
from mdio.segy.file import SegyFileInfo

logger = logging.getLogger(__name__)


MEASUREMENT_SYSTEM_KEY = binary_header_fields.Rev0.MEASUREMENT_SYSTEM_CODE.model.name
ANGLE_UNIT_KEYS = ["angle", "azimuth"]
SPATIAL_UNIT_KEYS = [
"cdp_x",
"cdp_y",
"source_coord_x",
"source_coord_y",
"group_coord_x",
"group_coord_y",
"offset",
]


def _get_coordinates(
grid: Grid,
segy_headers: SegyHeaderArray,
mdio_template: AbstractDatasetTemplate,
) -> tuple[list[Dimension], dict[str, SegyHeaderArray]]:
"""Get the data dim and non-dim coordinates from the SEG-Y headers and MDIO template.

Select a subset of the segy_dimensions that corresponds to the MDIO dimensions
The dimensions are ordered as in the MDIO template.
The last dimension is always the vertical domain dimension

Args:
grid: Inferred MDIO grid for SEG-Y file.
segy_headers: Headers read in from SEG-Y file.
mdio_template: The MDIO template to use for the conversion.

Returns:
A tuple containing:
- A list of dimension coordinates (1-D arrays).
- A dict of non-dimension coordinates (str: N-D arrays).
"""
dimensions_coords = [grid.select_dim(name) for name in mdio_template.dimension_names]
non_dim_coords = {name: np.array(segy_headers[name]) for name in mdio_template.coordinate_names}
return dimensions_coords, non_dim_coords


def _populate_coordinates(
dataset: xr_Dataset,
grid: Grid,
coords: dict[str, SegyHeaderArray],
spatial_coordinate_scalar: int,
) -> tuple[xr_Dataset, list[str]]:
"""Populate dim and non-dim coordinates in the xarray dataset and write to Zarr.

This will write the xr Dataset with coords and dimensions, but empty traces and headers.

Args:
dataset: The xarray dataset to populate.
grid: The grid object containing the grid map.
coords: The non-dim coordinates to populate.
spatial_coordinate_scalar: The X/Y coordinate scalar from the SEG-Y file.

Returns:
Xarray dataset with filled coordinates and updated variables to drop after writing
"""
drop_vars_delayed = []
dataset, drop_vars_delayed = populate_dim_coordinates(dataset, grid, drop_vars_delayed=drop_vars_delayed)

dataset, drop_vars_delayed = populate_non_dim_coordinates(
dataset,
grid,
coordinates=coords,
drop_vars_delayed=drop_vars_delayed,
spatial_coordinate_scalar=spatial_coordinate_scalar,
)

return dataset, drop_vars_delayed


def _get_spatial_coordinate_unit(segy_file_info: SegyFileInfo) -> LengthUnitModel | None:
"""Get the coordinate unit from the SEG-Y headers."""
measurement_system_code = int(segy_file_info.binary_header_dict[MEASUREMENT_SYSTEM_KEY])

if measurement_system_code not in {s.value for s in SegyMeasurementSystem}:
logger.warning(
"Unexpected value in coordinate unit (%s) header: %s. Can't extract coordinate unit and will "
"ingest without coordinate units.",
MEASUREMENT_SYSTEM_KEY,
measurement_system_code,
)
return None

if measurement_system_code == SegyMeasurementSystem.METERS:
unit = LengthUnitEnum.METER
if measurement_system_code == SegyMeasurementSystem.FEET:
unit = LengthUnitEnum.FOOT

return LengthUnitModel(length=unit)


def _update_template_units(template: AbstractDatasetTemplate, unit: LengthUnitModel | None) -> AbstractDatasetTemplate:
"""Update the template with dynamic and some pre-defined units."""
new_units = {key: AngleUnitModel(angle=AngleUnitEnum.DEGREES) for key in ANGLE_UNIT_KEYS}

if unit is None:
template.add_units(new_units)
return template

for key in SPATIAL_UNIT_KEYS:
current_value = template.get_unit_by_key(key)
if current_value is not None:
logger.warning("Unit for %s already in template. Will keep the original unit: %s", key, current_value)
continue

new_units[key] = unit

template.add_units(new_units)
return template
48 changes: 48 additions & 0 deletions src/mdio/ingestion/segy/file_headers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""Attach SEG-Y text and binary file headers to the dataset."""

from __future__ import annotations

import base64
from typing import TYPE_CHECKING

from mdio.core.config import MDIOSettings

if TYPE_CHECKING:
from xarray import Dataset as xr_Dataset

from mdio.segy.file import SegyFileInfo


def _add_segy_file_headers(xr_dataset: xr_Dataset, segy_file_info: SegyFileInfo) -> xr_Dataset:
"""Attach the SEG-Y text and binary file headers as attrs on a scalar variable."""
settings = MDIOSettings()

if not settings.save_segy_file_header:
return xr_dataset

expected_rows = 40
expected_cols = 80

text_header_rows = segy_file_info.text_header.splitlines()
text_header_cols_bad = [len(row) != expected_cols for row in text_header_rows]

if len(text_header_rows) != expected_rows:
err = f"Invalid text header count: expected {expected_rows}, got {len(segy_file_info.text_header)}"
raise ValueError(err)

if any(text_header_cols_bad):
err = f"Invalid text header columns: expected {expected_cols} per line."
raise ValueError(err)

xr_dataset["segy_file_header"] = ((), "")
xr_dataset["segy_file_header"].attrs.update(
{
"textHeader": segy_file_info.text_header,
"binaryHeader": segy_file_info.binary_header_dict,
}
)
if settings.raw_headers:
raw_binary_base64 = base64.b64encode(segy_file_info.raw_binary_headers).decode("ascii")
xr_dataset["segy_file_header"].attrs.update({"rawBinaryHeader": raw_binary_base64})

return xr_dataset
Loading
Loading