# Debugging mdio

In this notebook we will configure an environment that is useful for profiling and debugging mdio segy ingestion and export:

- environment
- SEGY generation
- segy to mdio
- mdio to segy

Memory issues:
https://distributed.dask.org/en/stable/worker-memory.html#memory-not-released-back-to-the-os




## Environment

First configure environment

In [None]:
import sys
# Make sure ipykernel is installed
!{sys.executable} -m pip install ipykernel
# Install QC tools
!{sys.executable} -m pip install matplotlib pandas dask_memusage memray
# Make sure mdio is installed
!poetry install --extras "distributed"




After the previous cell is run the kernel needs to be restarted so the module gets picked up. Failure to do so will result in the following cell to fail with the error:  `ModuleNotFoundError: No module named 'mdio'`

In [None]:
from mdio import mdio_to_segy, MDIOReader
#import dask.array as dask
import dask
from tqdm import tqdm
import warnings
warnings.filterwarnings("ignore")
from dask.diagnostics import ProgressBar
import time
import os
from dask.distributed import LocalCluster, Client

### Setup dask cluster


For dask applications the flow can use dask_memusage which is a much simpler profiler based on polling.  memray seems to be a recent and significant improvement.

In [None]:

import dask_memusage
import pandas as pd


tmp_path = "/scratch/tmp2/"
MY_TEMP = tmp_path

dask.config.set({"temporary_directory": os.path.join(MY_TEMP, "temp")})

dask.config.set({"distributed.comm.timeouts.tcp": "90s"})
dask.config.set({"distributed.comm.timeouts.connect": "60s"})

num_cut_dask_workers = 2  
memory_cut_dask_worker = 60  

gb = 1024**3

use_dask = True
single_process = False

if use_dask:
    print(
        f"New local cluster. n_workers {num_cut_dask_workers} mem_limit = {memory_cut_dask_worker} Gb"
    )
    with dask.config.set({"distributed.scheduler.worker-saturation": 1.0}):
        if single_process:
            client = Client(processes=False) 
        else:
            cluster = LocalCluster(
                n_workers=num_cut_dask_workers,
                threads_per_worker=1,
                memory_limit=memory_cut_dask_worker * gb,
            )

            client = Client(cluster)
else:
    client = None

### Setup monitoring dashboard for dask

The dask dashboard should automatically be setup on http://127.0.0.1:8787/status.  The [configuration for the dev container](../.devcontainer/devcontainer.json) should have the port forwarding setup for this port enabling this to be viewed.  The following cell will also give a summary of the client.




In [None]:
client


#### Check python and mdio versions

In [None]:
import sys
print(f"Python version: {sys.version}")
print(f"Python path: {sys.executable}")
import mdio
print(f"mdio version: {mdio.__version__}")

## SEGY generation

#### Functions to generate segy files based on tests

In [None]:
"""Test configuration before everything runs."""


from __future__ import annotations

import os

import numpy as np
import pytest
import segyio

from mdio.segy.geometry import StreamerShotGeometryType
def create_segy_mock_6d(
    fake_segy_tmp: str,
    num_samples: int,
    shots: list,
    cables: list,
    receivers_per_cable: list,
    shot_lines: list = [  # noqa:  B006
        1,
    ],
    comp_types: list = [  # noqa:  B006
        1,
    ],
    chan_header_type: StreamerShotGeometryType = StreamerShotGeometryType.A,
    index_receivers: bool = True,
) -> str:
    """Dummy 6D SEG-Y file for use in tests.

    Data will be created with:

    offset is byte location 37 - offset 4 bytes
    fldr is byte location 9 - shot 4 byte
    ep is byte location 17 - shot 4 byte
    stae is byte location 137 - cable 2 byte
    tracf is byte location 13 - channel 4 byte
    styp is byte location 133 - shot_line 2 byte
    afilf is byte location 141 - comptype 2 byte

    """
    spec = segyio.spec()
    segy_file = fake_segy_tmp

    shot_count = len(shots)
    total_chan = np.sum(receivers_per_cable)
    trace_count_per_line = shot_count * total_chan
    sline_count = len(shot_lines)
    comp_trace_count = trace_count_per_line * sline_count
    comp_count = len(comp_types)
    trace_count = comp_trace_count * comp_count

    spec.format = 1
    spec.samples = range(num_samples)
    spec.tracecount = trace_count
    spec.endian = "big"

    # Calculate shot, cable, channel/receiver numbers and header values
    cable_headers = []
    channel_headers = []

    # TODO: Add strict=True and remove noqa when minimum Python is 3.10
    for cable, num_rec in zip(cables, receivers_per_cable):  # noqa: B905
        cable_headers.append(np.repeat(cable, num_rec))

        channel_headers.append(np.arange(num_rec) + 1)

    cable_headers = np.hstack(cable_headers)
    channel_headers = np.hstack(channel_headers)

    if chan_header_type == StreamerShotGeometryType.B:
        channel_headers = np.arange(total_chan) + 1

    index_receivers = True
    if chan_header_type == StreamerShotGeometryType.C:
        index_receivers = False

    shot_headers = np.hstack([np.repeat(shot, total_chan) for shot in shots])
    cable_headers = np.tile(cable_headers, shot_count)
    channel_headers = np.tile(channel_headers, shot_count)

    # Add shot lines
    shot_line_headers = np.hstack(
        [np.repeat(shot_line, trace_count_per_line) for shot_line in shot_lines]
    )

    shot_headers = np.tile(shot_headers, sline_count)
    cable_headers = np.tile(cable_headers, sline_count)
    channel_headers = np.tile(channel_headers, sline_count)

    # Add multiple components
    comptype_headers = np.hstack(
        [np.repeat(comp, comp_trace_count) for comp in comp_types]
    )

    shot_line_headers = np.tile(shot_line_headers, comp_count)
    shot_headers = np.tile(shot_headers, comp_count)
    cable_headers = np.tile(cable_headers, comp_count)
    channel_headers = np.tile(channel_headers, comp_count)

    with segyio.create(segy_file, spec) as f:
        for trc_idx in range(trace_count):
            shot = shot_headers[trc_idx]
            cable = cable_headers[trc_idx]
            channel = channel_headers[trc_idx]
            shot_line = shot_line_headers[trc_idx]
            comptype = comptype_headers[trc_idx]

            # offset is byte location 37 - offset 4 bytes
            # fldr is byte location 9 - shot 4 byte
            # ep is byte location 17 - shot 4 byte
            # stae is byte location 137 - cable 2 byte
            # tracf is byte location 13 - channel 4 byte
            # styp is byte location 133 - shot_line 2 byte
            # afilf is byte location 141 - comptype 2 byte

            if index_receivers:
                f.header[trc_idx].update(
                    offset=0,
                    fldr=shot,
                    ep=shot,
                    stae=cable,
                    tracf=channel,
                    styp=shot_line,
                    afilf=comptype,
                )
            else:
                f.header[trc_idx].update(
                    offset=0,
                    fldr=shot,
                    ep=shot,
                    stae=cable,
                    styp=shot_line,
                    afilf=comptype,
                )

            samples = np.linspace(start=shot, stop=shot + 1, num=num_samples)
            f.trace[trc_idx] = samples.astype("float32")

        f.bin.update()

    return segy_file

def segy_mock_6d_shots(segy_path: str) -> dict[str, str]:
    """Generate mock 6D shot SEG-Y files."""
    num_samples = 25
    shots = [2, 3, 5]
    cables = [0, 101, 201, 301]
    receivers_per_cable = [1, 5, 7, 5]
    shot_lines = [1, 2, 4, 5, 99]
    comp_types = [1, 2, 3, 4]

    
    chan_header_type =     StreamerShotGeometryType.A,
    
    segy_path = create_segy_mock_6d(
        segy_path,
        num_samples=num_samples,
        shots=shots,
        cables=cables,
        receivers_per_cable=receivers_per_cable,
        chan_header_type=chan_header_type,
        shot_lines=shot_lines,
        comp_types=comp_types,
    )
    return segy_path

def segy_mock_4d_shots(segy_path: str) -> dict[str, str]:
    """Generate mock 4D shot SEG-Y files."""
    num_samples = 25
    shots = [2, 3, 5]
    cables = [0, 101, 201, 301]
    receivers_per_cable = [1, 5, 7, 5]
    shot_lines = [1,]
    comp_types = [1,]

    
    chan_header_type =     StreamerShotGeometryType.A,
    
    segy_path = create_segy_mock_6d(
        segy_path,
        num_samples=num_samples,
        shots=shots,
        cables=cables,
        receivers_per_cable=receivers_per_cable,
        chan_header_type=chan_header_type,
        shot_lines=shot_lines,
        comp_types=comp_types,
    )
    return segy_path

def segy_mock_4d_shots_large(segy_path: str, num_shots:int=100) -> dict[str, str]:
    """Generate mock 4D shot SEG-Y files at a reasonable scale."""
    num_samples = 4000
    num_cables = 12
    num_receivers_per_cable = 250 
    shots = range(num_shots)
    cables = range(num_cables)
    receivers_per_cable = [num_receivers_per_cable,]*num_cables
    shot_lines = [1,]
    comp_types = [1,]

    
    chan_header_type =     StreamerShotGeometryType.A,
    
    segy_path = create_segy_mock_6d(
        segy_path,
        num_samples=num_samples,
        shots=shots,
        cables=cables,
        receivers_per_cable=receivers_per_cable,
        chan_header_type=chan_header_type,
        shot_lines=shot_lines,
        comp_types=comp_types,
    )
    return segy_path

#### segy config

In [None]:
dims = 4
large_segy = True
num_shots = 1000

if dims == 6:
    index_header_names = ("comptype", "shot_line","shot_point", "cable", "channel")
    index_types = ("int16", "int16", "int32", "int16", "int32")
    index_bytes= (141, 133, 17, 137, 13)
    chunksize = (1, 2, 4, 2, 128, 1024)
    grid_overrides = {"AutoChannelWrap": True}
    num_shots = 3
    segy_path = os.path.join(tmp_path, f"segy_{dims}d_{num_shots}.sgy")
    print(segy_path)
    access_pattern="012345"
elif dims == 4:
    index_header_names = ("shot_point", "cable", "channel")
    index_types = ("int32", "int16", "int32")
    index_bytes= ( 17, 137, 13)
    chunksize = (4, 2, 128, 1024)
    grid_overrides = {"AutoChannelWrap": True}
    if large_segy:
        segy_path = os.path.join(tmp_path, f"segy_{dims}d_{num_shots}.sgy")
    
    else:
        num_shots = 3
        segy_path = os.path.join(tmp_path, f"segy_{dims}d_{num_shots}.sgy")
    access_pattern="0123"
    
print(segy_path)


#### Create SEGY

In [None]:

if dims == 6:
    segy_path = segy_mock_6d_shots(segy_path)
elif dims == 4:
    if large_segy:
        segy_path = segy_mock_4d_shots_large(segy_path, num_shots=num_shots)
    else:
        segy_path = segy_mock_4d_shots(segy_path)
    
print(segy_path)

## Ingest segy to mdio

#### Config

In [None]:
mdio_path = os.path.join(tmp_path, f"segy_{dims}d_import_{num_shots}.mdio")
kwargs = {
    'segy_path': segy_path,
    'mdio_path_or_buffer': mdio_path,
    'index_names': index_header_names,
    'index_bytes': index_bytes,
    'index_types': index_types,
    'chunksize': chunksize,  # (1, chunksize_2d, -1),
    'overwrite': True
}
if grid_overrides is not None:
    kwargs['grid_overrides'] = grid_overrides
kwargs

#### Actual segy to mdio conversion based on config

In [None]:
%%time
mdio.segy_to_mdio(**kwargs)

#### QC of generated mdio file

In [None]:

def info(
    input_mdio_file,
    output_format="plain",
    access_pattern="012",
):
    """Provide information on MDIO dataset.
    By default this returns human readable information about the grid and stats for
    the dataset. If output-format is set to json then a json is returned to
    facilitate parsing.
    """
    reader = mdio.MDIOReader(
        input_mdio_file, access_pattern=access_pattern, return_metadata=True
    )
    mdio_dict = {}
    mdio_dict["grid"] = {}
    for axis in reader.grid.dim_names:
        dim = reader.grid.select_dim(axis)
        min = dim.coords[0]
        max = dim.coords[-1]
        size = dim.coords.shape[0]
        axis_dict = {"name": axis, "min": min, "max": max, "size": size}
        mdio_dict["grid"][axis] = axis_dict

    if output_format == "plain":
        print("{:<10} {:<10} {:<10} {:<10}".format("NAME", "MIN", "MAX", "SIZE"))
        print("=" * 40)

        for _, axis_dict in mdio_dict["grid"].items():
            print(
                "{:<10} {:<10} {:<10} {:<10}".format(
                    axis_dict["name"],
                    axis_dict["min"],
                    axis_dict["max"],
                    axis_dict["size"],
                )
            )

        print("\n\n{:<10} {:<10}".format("STAT", "VALUE"))
        print("=" * 20)
        for name, stat in reader.stats.items():
            print(f"{name:<10} {stat:<10}")
    if output_format == "json":
        mdio_dict["stats"] = reader.stats
        print(mdio_dict)

In [None]:
info(
    mdio_path,
    output_format="plain",
    access_pattern=access_pattern,
)

In [None]:

reader = mdio.MDIOReader(
    mdio_path, access_pattern=access_pattern, return_metadata=True
)
comp_dim = reader.grid.select_dim(index_header_names[0])

print(f"comp_dim: {comp_dim} for {reader}")

## SEGY export (cut)

#### First declare functions

In [None]:
%load_ext memray

import csv
import psutil
import matplotlib.pyplot as plt

from distributed.diagnostics import MemorySampler
from distributed.diagnostics.memray import memray_workers


def processing_time(end, start):
    return (end - start) / 60


def file_size(file):
    import os

    filesize = os.path.getsize(file)
    return filesize


def make_folders(folder_path):
    import os

    msg = "Folder already exists"
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)
        msg = "Folders created"
    return msg

def create_segy(mdio_source, temp_local_destination, client, selection_mask=None, access_pattern="0123"):
    start = time.perf_counter()
    access_pattern = "0123"
    print("Started_conv")

    _ = psutil.cpu_percent(interval=None, percpu=True)
    mdio_to_segy(
        mdio_source,
        temp_local_destination,
        selection_mask=selection_mask,
        access_pattern=access_pattern,
        client=client,
    )
    mdio_to_segy_time = time.perf_counter()
    cpu_mdio_to_segy = psutil.cpu_percent(interval=None, percpu=True)
    max_cpu_mdio_to_sgy = max(cpu_mdio_to_segy)
    min_cpu_usage = min(cpu_mdio_to_segy)
    cpu_usage_avg = np.mean(np.array(cpu_mdio_to_segy))
    print("cpu_usage_mdio_to_segy_max", max_cpu_mdio_to_sgy)
    mem_usage_mdio_to_sgy = int(
        psutil.virtual_memory().total - psutil.virtual_memory().available
    )
    return (
        max_cpu_mdio_to_sgy,
        min_cpu_usage,
        cpu_usage_avg,
        mem_usage_mdio_to_sgy,
        processing_time(mdio_to_segy_time, start),
    )


def get_max_mem_from_csv(filename: str):
    """Find maximum memory usage from a dask_memusage memory sampler profiler."""
    print(f"mem_file = {filename}")
    try:
        mem_df = pd.read_csv(filename)
        max_dask_task_memory = int(mem_df["max_memory_mb"].max() * (1024**2))
    except:
        max_mem_array = []
        task_name_array = []
        with open(filename) as fp:
            Lines = fp.readlines()
            for line in Lines:
                csv = line.split(',')
                if len(csv) > 4:
                    max_mem = csv[-1]
                    task_name = csv[0]
                    try:
                        my_mm = float(max_mem)
                        max_mem_array.append(my_mm)
                        task_name_array.append(task_name)
                    except:
                        continue
        max_index = max_mem_array.index(max(max_mem_array))
        print(f"max_index={max_index} max_mem={max_mem_array[max_index]}MB max_task_name = {task_name_array[max_index]}")

        max_dask_task_memory = int(max_mem_array[max_index] * (1024**2))
        return max_dask_task_memory
    
def get_large_mem_fns_from_csv(filename: str, thresh=50.):
    """Find functions with a large from a dask_memusage memory sampler profiler."""
    print(f"mem_file = {filename}")
    task_name_array = []
    with open(filename) as fp:
        Lines = fp.readlines()
        for line in Lines:
            csv = line.split(',')
            if len(csv) > 2:
                max_mem = csv[-1]
                task_name = csv[0]
                try:
                    my_mm = float(max_mem)
                    if my_mm > thresh:
                        task_name_array.append(task_name)
                except:
                    continue
    return list(set(task_name_array))

        
    
def plot_function_mem_from_csv(filename: str, fn_name: str):
    """Plot memory usage for a single function
    
    Inputs
    ------
    filename: str   csv from dask_memusage memory sampler profiler
    fn_name: str    name of function to track and plot"""
    print(f"mem_file = {filename}")
    mem_array_1 = []
    mem_array_2 = []
    with open(filename) as fp:
        Lines = fp.readlines()
        for line in Lines:
            csv = line.split(',')
            if len(csv) > 4:
                max_mem = csv[-1]
                max_mem_2 = csv[-2]
                task_name = csv[0]
                try:
                    if fn_name in task_name:
                        my_mm = float(max_mem)
                        mem_array_1.append(my_mm)
                        my_mm = float(max_mem_2)
                        mem_array_2.append(my_mm)
                except:
                    continue
    if len(mem_array_1) > 1:
        plt.figure()
        plt.plot(mem_array_1, label="Total")
        plt.plot(mem_array_2, label="Proc")
        plt.title(f"{fn_name}")
        plt.xlabel("Occurrence")
        plt.ylabel("Memory")
        plt.show
        plt.savefig(f'{filename}_{fn_name}.png')
    elif len(mem_array_1) == 1:
        print(f"{fn_name}  used {mem_array_1[0]}mb memory")
    else:
        print(f"Had issue reading {fn_name} memory usage")
    return mem_array_1

def cut(input_mdio: str, run=0, access_pattern="0123", client=None, test_name="6372"):
    """Cuts segy from mdio  with memory QC"""
    with open(
        os.path.join(MY_TEMP, test_name+"_metrics_export.csv"), "a+", newline=""
    ) as write_obj:
        csv_writer = csv.writer(write_obj)
        csv_writer.writerow(
            [
                "chunk_case",
                "file_size",
                "reader_shape",
                "time",
                "cpu_usage_max",
                "cpu_usage_min",
                "cpu_usage_avg",
                "mem_usage",
                "run",
            ]
        )


    print("Converting Multidimio to Segy via Local Dask")

    TEMP_DESTINATION = os.path.join(MY_TEMP, test_name+".sgy")

    print("TEMP_DESTINATION is:", TEMP_DESTINATION)
    # Set to true to use dask_memusage to track memory usage
    track_memusage = False
    # Set to true to use memray to track memory usage
    use_memray = False
    # Flag to create dask cluster
    use_dask = False
    if client is not None:
        use_dask = True

        if use_dask:
            print(
                f"New local cluster. n_workers {num_cut_dask_workers} mem_limit = {memory_cut_dask_worker} Gb"
            )
            with dask.config.set({"distributed.scheduler.worker-saturation": 1.0}):
                cluster = LocalCluster(
                    n_workers=num_cut_dask_workers,
                    threads_per_worker=1,
                    memory_limit=memory_cut_dask_worker * gb,
                )

                client = Client(cluster)
            if track_memusage:
                mem_file = os.path.join(os.getcwd(), f"{test_name}_ram_usage.csv")
                dask_memusage.install(client.cluster.scheduler, mem_file)
        else:
            track_memusage = False
            client = None

        if track_memusage:
                    
            if use_memray:
                with memray_workers(f"memray_{test_name}", report_args=('flamegraph', '--temporal')):
                    ms = MemorySampler()

                    with ms.sample(test_name):
                        (
                            cpu_usage_max,
                            cpu_usage_min,
                            cpu_usage_avg,
                            mem_usage,
                            time_taken,
                        ) = create_segy(input_mdio, TEMP_DESTINATION, client)
            else:
                with ms.sample(test_name):
                        (
                            cpu_usage_max,
                            cpu_usage_min,
                            cpu_usage_avg,
                            mem_usage,
                            time_taken,
                        ) = create_segy(input_mdio, TEMP_DESTINATION, client)
        else:
            (
                cpu_usage_max,
                cpu_usage_min,
                cpu_usage_avg,
                mem_usage,
                time_taken,
            ) = create_segy(input_mdio, TEMP_DESTINATION, client)

    if client is not None:
        # mem_file = os.path.join(os.getcwd(), test_name + "_ram_usage.csv")
        mem_file = os.path.join(MY_TEMP, test_name + "_ram_usage.csv")
        
        dask_memusage.install(client.cluster.scheduler, mem_file)
        ms = MemorySampler()

        with ms.sample(test_name):
            (
                cpu_usage_max,
                cpu_usage_min,
                cpu_usage_avg,
                mem_usage,
                time_taken,
            ) = create_segy(input_mdio, TEMP_DESTINATION, client, access_pattern=access_pattern)
        fig = plt.figure()
        memory_plot = ms.plot(align=True)
        fig = memory_plot.get_figure()
        fig.savefig(test_name + "_.jpg", bbox_inches="tight")

    else:
        (
            cpu_usage_max,
            cpu_usage_min,
            cpu_usage_avg,
            mem_usage,
            time_taken,
        ) = create_segy(input_mdio, TEMP_DESTINATION, client, access_pattern=access_pattern)

    print("cut completed")

    file_size = os.path.getsize(TEMP_DESTINATION) / (1024**3)
    print(f"mdio to segy completed in {time_taken}")

    reader = MDIOReader(
        mdio_path_or_buffer=input_mdio, backend="dask", access_pattern=access_pattern,
    )
    
    if client is not None:
        # mem_df = pd.read_csv(mem_file)
        # max_dask_task_memory = int(mem_df["max_memory_mb"].max() * (1024**2))

        max_dask_task_memory = get_max_mem_from_csv(mem_file)
        # Find all functions that use a significant amount of memory
        large_mem_fns = get_large_mem_fns_from_csv(mem_file)
        # Make plots of function memory over time
        for fn_name in large_mem_fns:
            plot_function_mem_from_csv(mem_file, fn_name)
        plot_function_mem_from_csv(mem_file, "write_to_segy_stack-segy_concat")
        metrics = [
            chunksize,
            file_size,
            reader.shape,
            time_taken,
            cpu_usage_max,
            cpu_usage_min,
            cpu_usage_avg,
            mem_usage,
            max_dask_task_memory,
            run,
        ]

        with open(
            os.path.join(MY_TEMP, test_name + "_metrics_export.csv"), "a+", newline=""
        ) as write_obj:
            csv_writer = csv.writer(write_obj)
            csv_writer.writerow(metrics)

        print(f"{metrics=}")
        os.remove(TEMP_DESTINATION)
        time.sleep(30)




### Run cut

In [None]:

file_list = [os.path.join(MY_TEMP,'segy_4d_import_100.mdio'),os.path.join(MY_TEMP,'segy_4d_import_1000.mdio') ]
test_name = ["test_100shots","test_1000shots"]

file_list = [os.path.join(MY_TEMP,'segy_4d_import_1000.mdio'), ]
test_name = ["test_1000shots",]
for mdio_file, test_name in zip(file_list, test_name):
    cut(mdio_file, client=client, test_name=test_name)

Viewing html from memray using HTML Preview plugin fails.  Open in an external web-browser such as chrome. 