From 7af1255aa96b4d054d015da66ee355dcbd7696cb Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Wed, 28 Jan 2026 20:48:50 -0500 Subject: [PATCH 1/2] Add some NISAR exploration scripts --- .gitignore | 3 + docs/examples/nisar/README.md | 69 +++++ docs/examples/nisar/analyze.py | 443 ++++++++++++++++++++++++++++++ docs/examples/nisar/benchmark.py | 190 +++++++++++++ docs/examples/nisar/layout.py | 440 +++++++++++++++++++++++++++++ docs/examples/nisar/virtualize.py | 98 +++++++ docs/examples/nisar/waterfall.py | 367 +++++++++++++++++++++++++ 7 files changed, 1610 insertions(+) create mode 100644 docs/examples/nisar/README.md create mode 100644 docs/examples/nisar/analyze.py create mode 100644 docs/examples/nisar/benchmark.py create mode 100644 docs/examples/nisar/layout.py create mode 100644 docs/examples/nisar/virtualize.py create mode 100644 docs/examples/nisar/waterfall.py diff --git a/.gitignore b/.gitignore index 35540fc..99f128b 100644 --- a/.gitignore +++ b/.gitignore @@ -82,3 +82,6 @@ site/* # uv lock files uv.lock + +# script output +*.html \ No newline at end of file diff --git a/docs/examples/nisar/README.md b/docs/examples/nisar/README.md new file mode 100644 index 0000000..98d23ea --- /dev/null +++ b/docs/examples/nisar/README.md @@ -0,0 +1,69 @@ +# NISAR Data Access Examples + +These scripts demonstrate obspec-utils for accessing NASA NISAR data via HTTPS with Earthdata authentication. + +## Prerequisites + +- NASA Earthdata account ([register here](https://urs.earthdata.nasa.gov/users/new)) +- `earthaccess` configured with your credentials + +## Scripts + +### `virtualize.py` (not functional) + +Minimal example showing how to create a virtual datatree from a remote NISAR file. + +> **Note:** This example currently fails due to upstream limitations: +> 1. The "crosstalk" variable has a complex dtype not supported by Zarr +> 2. 
`drop_variables` doesn't yet work for variables in nested HDF5 groups +> +> Fixes are needed in [VirtualiZarr](https://github.com/zarr-developers/VirtualiZarr). + +```bash +uv run --script docs/examples/nisar/virtualize.py +``` + +### `benchmark.py` + +Compare obspec-utils readers against fsspec for performance. + +```bash +uv run --script docs/examples/nisar/benchmark.py +uv run --script docs/examples/nisar/benchmark.py --block-size 32 +``` + +### `waterfall.py` + +Visualize HTTP range requests as a waterfall chart (byte position vs time). + +```bash +uv run --script docs/examples/nisar/waterfall.py +uv run --script docs/examples/nisar/waterfall.py --block-size 8 +uv run --script docs/examples/nisar/waterfall.py --output my_waterfall.html +``` + +### `analyze.py` + +Analyze request patterns with timeline, size histogram, and byte coverage stats. + +```bash +uv run --script docs/examples/nisar/analyze.py +uv run --script docs/examples/nisar/analyze.py --block-size 16 +uv run --script docs/examples/nisar/analyze.py --output my_analysis.html +``` + +### `layout.py` (not functional) + +Visualize file layout showing where data chunks vs metadata are located. Uses VirtualiZarr to extract chunk locations from the manifest. 
+ +```bash +uv run --script docs/examples/nisar/layout.py +uv run --script docs/examples/nisar/layout.py --output my_layout.html +``` + +## Output + +- `benchmark.py` prints results to stdout +- `waterfall.py` generates `waterfall.html` +- `analyze.py` generates `analyze.html` +- `layout.py` generates `layout.html` diff --git a/docs/examples/nisar/analyze.py b/docs/examples/nisar/analyze.py new file mode 100644 index 0000000..1c97b1e --- /dev/null +++ b/docs/examples/nisar/analyze.py @@ -0,0 +1,443 @@ +#!/usr/bin/env -S uv run +# /// script +# requires-python = ">=3.11" +# dependencies = [ +# "earthaccess", +# "xarray", +# "h5netcdf", +# "h5py", +# "obspec-utils @ git+https://github.com/virtual-zarr/obspec-utils@main", +# "aiohttp", +# "pandas", +# "holoviews", +# "bokeh", +# ] +# /// +""" +Analyze HTTP range request patterns when reading NISAR data. + +Creates an interactive HTML visualization showing: +- Request timeline colored by byte offset +- Request size distribution histogram +- Byte coverage analysis (unique vs re-read bytes) +- Statistics panel + +Usage: + uv run docs/examples/nisar/analyze.py + uv run docs/examples/nisar/analyze.py --block-size 16 + uv run docs/examples/nisar/analyze.py --output my_analysis.html +""" + +from __future__ import annotations + +import argparse +import time +from pathlib import Path +from urllib.parse import urlparse + +import earthaccess +import holoviews as hv +import numpy as np +import pandas as pd +import xarray as xr + +from obspec_utils.readers import BlockStoreReader +from obspec_utils.stores import AiohttpStore +from obspec_utils.wrappers import RequestTrace, TracingReadableStore + +hv.extension("bokeh") + +MB = 1024 * 1024 + + +def parse_args(): + parser = argparse.ArgumentParser( + description="Analyze HTTP range request patterns when reading NISAR data" + ) + parser.add_argument( + "--block-size", + type=int, + default=32, + help="Block size in MB for BlockStoreReader (default: 32)", + ) + parser.add_argument( + 
"--output", + type=str, + default=None, + help="Output HTML file path (default: analyze.html in script dir)", + ) + return parser.parse_args() + + +def read_middle_pixel(file_like) -> float: + """Open dataset and read a single pixel from the middle.""" + ds = xr.open_datatree( + file_like, + engine="h5netcdf", + decode_timedelta=False, + phony_dims="access", + ) + ny, nx = ds.science.LSAR.GCOV.grids.frequencyA.HHHH.shape + value = ds.science.LSAR.GCOV.grids.frequencyA.HHHH[ny // 2, nx // 2].values + return float(value) + + +def visualize_requests( + requests_df: pd.DataFrame, + title: str = "BlockStoreReader Request Analysis", + file_size: int | None = None, + read_time: float | None = None, +): + """Create a visualization of request patterns.""" + if requests_df.empty: + return hv.Div("

No requests recorded

") + + requests_sorted = requests_df.sort_values("start").reset_index(drop=True) + + # Determine file boundaries (in MB) + file_end_bytes = file_size if file_size else requests_df["end"].max() + file_end_mb = file_end_bytes / MB + + # Calculate relative time from first request + min_ts = requests_df["timestamp"].min() + + # Requests colored by time sent (x-axis in MB) + request_segments_sent = [ + { + "x0": row["start"] / MB, + "x1": row["end"] / MB, + "y0": 1, + "y1": 1, + "source": "Requests (time sent)", + "length_mb": row["length"] / MB, + "method": row.get("method", "unknown"), + "duration_s": row.get("duration", 0), + "time_s": row.get("timestamp", 0) - min_ts, + } + for _, row in requests_sorted.iterrows() + ] + + # Requests colored by time received (x-axis in MB) + request_segments_recv = [ + { + "x0": row["start"] / MB, + "x1": row["end"] / MB, + "y0": 0, + "y1": 0, + "source": "Requests (time received)", + "length_mb": row["length"] / MB, + "method": row.get("method", "unknown"), + "duration_s": row.get("duration", 0), + "time_s": row.get("timestamp", 0) - min_ts + row.get("duration", 0), + } + for _, row in requests_sorted.iterrows() + ] + + request_sent_df = pd.DataFrame(request_segments_sent) + request_recv_df = pd.DataFrame(request_segments_recv) + + # Color requests by time using plasma colormap + request_sent_plot = hv.Segments( + request_sent_df, + kdims=["x0", "y0", "x1", "y1"], + vdims=["source", "length_mb", "method", "duration_s", "time_s"], + ).opts( + color="time_s", + cmap="plasma", + colorbar=True, + clabel="Time (s)", + alpha=0.8, + line_width=15, + tools=["hover"], + ) + + request_recv_plot = hv.Segments( + request_recv_df, + kdims=["x0", "y0", "x1", "y1"], + vdims=["source", "length_mb", "method", "duration_s", "time_s"], + ).opts( + color="time_s", + cmap="plasma", + alpha=0.8, + line_width=15, + tools=["hover"], + ) + + # File boundary lines (in MB) + start_line = hv.VLine(0).opts(color="red", line_width=2, line_dash="dashed") + 
end_line = hv.VLine(file_end_mb).opts(color="red", line_width=2, line_dash="dashed") + + boundary_labels = hv.Labels( + { + "x": [0, file_end_mb], + "y": [1.4, 1.4], + "text": ["File Start (0)", f"File End ({file_end_mb:.1f} MB)"], + }, + kdims=["x", "y"], + vdims=["text"], + ).opts(text_font_size="8pt", text_color="red") + + # Create timeline plot (colorbar in MB) + timeline_segments = [] + for idx, row in requests_sorted.iterrows(): + time_sent = row["timestamp"] - min_ts + duration = row.get("duration", 0) + time_received = time_sent + duration + timeline_segments.append( + { + "x0": time_sent, + "x1": time_received, + "y0": idx % 20, + "y1": idx % 20, + "length_mb": row["length"] / MB, + "start_mb": row["start"] / MB, + "duration_s": duration, + } + ) + + timeline_df = pd.DataFrame(timeline_segments) + timeline_plot = hv.Segments( + timeline_df, + kdims=["x0", "y0", "x1", "y1"], + vdims=["length_mb", "start_mb", "duration_s"], + ).opts( + color="start_mb", + cmap="viridis", + colorbar=True, + clabel="Offset (MB)", + alpha=0.8, + line_width=8, + tools=["hover"], + width=900, + height=200, + xlabel="Time (s)", + ylabel="Request (stacked)", + title="Request Timeline (colored by offset)", + ) + + # Compute statistics + + total_request_bytes = requests_df["length"].sum() + + # Analyze byte coverage + byte_read_count = np.zeros(file_end_bytes, dtype=np.uint8) + for _, row in requests_df.iterrows(): + start = int(row["start"]) + end = min(int(row["end"]), file_end_bytes) + if start < file_end_bytes: + byte_read_count[start:end] += 1 + + unique_bytes_read = np.sum(byte_read_count > 0) + bytes_read_multiple = np.sum(byte_read_count > 1) + total_reread_bytes = total_request_bytes - unique_bytes_read + max_reads = int(np.max(byte_read_count)) if len(byte_read_count) > 0 else 0 + + # Last request received time + last_request_received_s = max(seg["time_s"] for seg in request_segments_recv) + + # Format stats + read_time_str = f"{read_time:.2f} s" if read_time is not None 
else "N/A" + total_bytes_mb = total_request_bytes / MB + unique_mb = unique_bytes_read / MB + reread_mb = total_reread_bytes / MB + + # Throughput + if last_request_received_s > 0: + throughput_mbps = ( + total_request_bytes * 8 / 1_000_000 + ) / last_request_received_s + throughput_str = f"{throughput_mbps:.1f} Mbps" + else: + throughput_str = "N/A" + + stats_div = hv.Div( + f""" +
+ BlockStoreReader Statistics
+ File Size: {file_end_mb:.2f} MB
+ Requests: {len(requests_df):,}
+ Total bytes received: {total_bytes_mb:.2f} MB
+ Unique bytes read: {unique_mb:.2f} MB
+ + Bytes re-read: {reread_mb:.2f} MB +
+ Bytes read multiple times: {bytes_read_multiple:,} (max {max_reads}x)
+ Read time: {read_time_str}
+ Last request received: {last_request_received_s:.2f} s
+ Effective throughput: {throughput_str} +
+ """ + ) + + main_plot = ( + request_sent_plot * request_recv_plot * start_line * end_line * boundary_labels + ).opts( + width=900, + height=300, + title=title, + xlabel="Offset (MB)", + ylabel="", + yticks=[(0, "Requests (received)"), (1, "Requests (sent)")], + show_legend=True, + ylim=(-0.5, 1.8), + ) + + # Request size histogram with log-scaled bins + request_sizes_mb = requests_df["length"] / (1024 * 1024) + min_size = max(1e-6, request_sizes_mb.min()) + max_size = request_sizes_mb.max() + log_bins = np.geomspace(min_size, max_size, num=31) + frequencies, edges = np.histogram(request_sizes_mb, bins=log_bins) + request_histogram = hv.Histogram((edges, frequencies)).opts( + width=900, + height=200, + xlabel="Request Size (MB)", + ylabel="Count", + title="Request Size Distribution (log-scaled bins)", + tools=["hover"], + color="steelblue", + logx=True, + ) + + return ( + (main_plot + timeline_plot + request_histogram + stats_div) + .opts(shared_axes=False) + .cols(1) + ) + + +def print_summary(requests_df: pd.DataFrame, file_size: int) -> None: + """Print a summary of the request analysis.""" + total_bytes = requests_df["length"].sum() + min_size = requests_df["length"].min() + max_size = requests_df["length"].max() + mean_size = requests_df["length"].mean() + + print( + f""" +Request Summary +--------------- +Total Requests: {len(requests_df):,} +Total Bytes: {total_bytes / MB:.2f} MB +File Size: {file_size / MB:.2f} MB +Coverage: {(total_bytes / file_size) * 100:.1f}% + +Request Sizes: + Min: {min_size / MB:.4f} MB ({min_size:,} bytes) + Max: {max_size / MB:.4f} MB ({max_size:,} bytes) + Mean: {mean_size / MB:.4f} MB + +Byte Range: + Start: {requests_df['start'].min():,} + End: {requests_df['end'].max():,} +""" + ) + + +def main(): + args = parse_args() + block_size_mb = args.block_size + block_size = block_size_mb * MB + + print("=" * 60) + print("BlockStoreReader Request Analysis") + print("=" * 60) + print(f"Block size: {block_size_mb} MB") + + # 
Authenticate and Query Data + print("\n[1/5] Authenticating with NASA Earthdata...") + earthaccess.login() + + query = earthaccess.DataGranules() + query.short_name("NISAR_L2_GCOV_BETA_V1") + query.params["attribute[]"] = "int,FRAME_NUMBER,77" + query.params["attribute[]"] = "int,TRACK_NUMBER,5" + results = query.get_all() + + print(f" Found {len(results)} granules") + + # Get the HTTPS URL for the first granule + https_links = earthaccess.results.DataGranule.data_links( + results[0], access="external" + ) + https_url = https_links[0] + print(f" HTTPS URL: {https_url}") + + # Parse URL + parsed = urlparse(https_url) + base_url = f"{parsed.scheme}://{parsed.netloc}" + path = parsed.path.lstrip("/") + + print(f" Base URL: {base_url}") + print(f" Path: {path}") + + # Get EDL token + print("\n[2/5] Getting EDL token...") + token = earthaccess.get_edl_token()["access_token"] + + # Create store chain: AiohttpStore -> TracingReadableStore + print("\n[3/5] Setting up store chain...") + base_store = AiohttpStore( + base_url, + headers={"Authorization": f"Bearer {token}"}, + ) + trace = RequestTrace() + traced_store = TracingReadableStore(base_store, trace) + print(" AiohttpStore -> TracingReadableStore") + + # Get file size via HEAD request + print("\n[4/5] Getting file size...") + meta = traced_store.head(path) + file_size = meta["size"] + print(f" File size: {file_size / MB:.2f} MB") + + # Clear the HEAD request from the trace + trace.clear() + + # Read with BlockStoreReader + print( + f"\n[5/5] Reading middle pixel with BlockStoreReader ({block_size_mb} MB blocks)..." 
+ ) + start_time = time.perf_counter() + + with BlockStoreReader( + traced_store, + path, + block_size=block_size, + max_cached_blocks=1024, + ) as reader: + value = read_middle_pixel(reader) + + read_time = time.perf_counter() - start_time + + print(f" Value: {value}") + print(f" Read time: {read_time:.2f}s") + print(f" Total requests: {trace.total_requests}") + print(f" Total bytes: {trace.total_bytes / MB:.2f} MB") + + # Get request dataframe + requests_df = trace.to_dataframe() + + # Print summary + print_summary(requests_df, file_size) + + # Create visualization + print("\nCreating visualization...") + plot = visualize_requests( + requests_df, + title=f"BlockStoreReader ({block_size_mb} MB): NISAR Request Analysis", + file_size=file_size, + read_time=read_time, + ) + + if args.output: + output_path = Path(args.output) + else: + output_path = Path(__file__).parent / "analyze.html" + hv.save(plot, str(output_path)) + print(f"\nVisualization saved to: {output_path}") + + +if __name__ == "__main__": + main() diff --git a/docs/examples/nisar/benchmark.py b/docs/examples/nisar/benchmark.py new file mode 100644 index 0000000..18d4433 --- /dev/null +++ b/docs/examples/nisar/benchmark.py @@ -0,0 +1,190 @@ +#!/usr/bin/env -S uv run +# /// script +# requires-python = ">=3.11" +# dependencies = [ +# "earthaccess", +# "xarray", +# "h5netcdf", +# "h5py", +# "obspec-utils @ git+https://github.com/virtual-zarr/obspec-utils@main", +# "obstore", +# "aiohttp", +# "fsspec", +# ] +# /// +""" +Compare obspec-utils readers against fsspec for NASA Earthdata access. 
+ +This script benchmarks different approaches for reading NISAR data: +- fsspec/earthaccess (baseline) +- AiohttpStore + BlockStoreReader +- HTTPStore + BlockStoreReader + +Usage: + uv run docs/examples/nisar/benchmark.py + uv run docs/examples/nisar/benchmark.py --block-size 32 +""" + +import argparse +import time +from urllib.parse import urlparse + +import earthaccess +import xarray as xr +from obstore.store import HTTPStore + +from obspec_utils.readers import BlockStoreReader +from obspec_utils.stores import AiohttpStore + +MB = 1024 * 1024 + + +def parse_args(): + parser = argparse.ArgumentParser( + description="Compare obspec-utils readers against fsspec for NASA Earthdata" + ) + parser.add_argument( + "--block-size", + type=int, + default=16, + help="Block size in MB for BlockStoreReader (default: 16)", + ) + return parser.parse_args() + + +def read_middle_pixel(file_like) -> float: + """Open dataset and read a single pixel from the middle.""" + ds = xr.open_datatree( + file_like, + engine="h5netcdf", + decode_timedelta=False, + phony_dims="access", + ) + ny, nx = ds.science.LSAR.GCOV.grids.frequencyA.HHHH.shape + value = ds.science.LSAR.GCOV.grids.frequencyA.HHHH[ny // 2, nx // 2].values + return float(value) + + +def main(): + args = parse_args() + block_size_mb = args.block_size + block_size = block_size_mb * MB + + print("=" * 60) + print("Compare obspec-utils Readers vs fsspec") + print("=" * 60) + print(f"Block size: {block_size_mb} MB") + + # Authenticate and Query Data + print("\nAuthenticating with NASA Earthdata...") + earthaccess.login() + + query = earthaccess.DataGranules() + query.short_name("NISAR_L2_GCOV_BETA_V1") + query.params["attribute[]"] = "int,FRAME_NUMBER,77" + query.params["attribute[]"] = "int,TRACK_NUMBER,5" + results = query.get_all() + + print(f"Found {len(results)} granules") + + # Get the HTTPS URL for the first granule + https_links = earthaccess.results.DataGranule.data_links( + results[0], access="external" + ) + https_url 
= https_links[0] + print(f"HTTPS URL: {https_url}") + + # Parse the HTTPS URL to get base URL and path + parsed = urlparse(https_url) + base_url = f"{parsed.scheme}://{parsed.netloc}" + path = parsed.path.lstrip("/") + + print(f"Base URL: {base_url}") + print(f"Path: {path}") + + # Get the EDL token for authentication + token = earthaccess.get_edl_token()["access_token"] + + # Create AiohttpStore with EDL token authentication + aiohttp_store = AiohttpStore( + base_url, + headers={"Authorization": f"Bearer {token}"}, + ) + + # Create HTTPStore with EDL token authentication + http_store = HTTPStore.from_url( + https_url, + client_options={"default_headers": {"Authorization": f"Bearer {token}"}}, + ) + + # Benchmark: fsspec (baseline) + print("\n[1/3] Testing fsspec/earthaccess.open()...") + start = time.perf_counter() + + fs_file = earthaccess.open(results[:1])[0] + value_fsspec = read_middle_pixel(fs_file) + + elapsed_fsspec = time.perf_counter() - start + print(f" Value: {value_fsspec}") + print(f" Time: {elapsed_fsspec:.2f}s") + + # Benchmark: AiohttpStore + BlockStoreReader + print(f"\n[2/3] Testing AiohttpStore + BlockStoreReader ({block_size_mb} MB)...") + start = time.perf_counter() + + with BlockStoreReader( + aiohttp_store, + path, + block_size=block_size, + max_cached_blocks=1024, + ) as reader: + value_aiohttp = read_middle_pixel(reader) + + elapsed_aiohttp = time.perf_counter() - start + print(f" Value: {value_aiohttp}") + print(f" Time: {elapsed_aiohttp:.2f}s") + + # Benchmark: HTTPStore + BlockStoreReader + print(f"\n[3/3] Testing HTTPStore + BlockStoreReader ({block_size_mb} MB)...") + start = time.perf_counter() + + with BlockStoreReader( + http_store, + "", # HTTPStore already includes the full path + block_size=block_size, + max_cached_blocks=1024, + ) as reader: + value_http = read_middle_pixel(reader) + + elapsed_http = time.perf_counter() - start + print(f" Value: {value_http}") + print(f" Time: {elapsed_http:.2f}s") + + # Results Summary + 
print("\n" + "=" * 50) + print("RESULTS SUMMARY") + print("=" * 50) + + results_data = [ + ("fsspec (baseline)", elapsed_fsspec, 1.0), + ( + "AiohttpStore + BlockStoreReader", + elapsed_aiohttp, + elapsed_fsspec / elapsed_aiohttp, + ), + ("HTTPStore + BlockStoreReader", elapsed_http, elapsed_fsspec / elapsed_http), + ] + + print(f"{'Reader':<35} {'Time (s)':<12} {'Speedup':<10}") + print("-" * 60) + for name, elapsed, speedup in results_data: + print(f"{name:<35} {elapsed:<12.2f} {speedup:<10.2f}x") + + # Verify all values match + print("\nValue verification:") + all_equal = value_fsspec == value_aiohttp == value_http + print(f" All values equal: {all_equal}") + + +if __name__ == "__main__": + main() diff --git a/docs/examples/nisar/layout.py b/docs/examples/nisar/layout.py new file mode 100644 index 0000000..f63b798 --- /dev/null +++ b/docs/examples/nisar/layout.py @@ -0,0 +1,440 @@ +#!/usr/bin/env -S uv run +# /// script +# requires-python = ">=3.11" +# dependencies = [ +# "earthaccess", +# "virtualizarr[hdf] @ git+https://github.com/zarr-developers/VirtualiZarr.git@main", +# "obspec-utils @ git+https://github.com/virtual-zarr/obspec-utils@main", +# "aiohttp", +# "pandas", +# "holoviews", +# "bokeh", +# ] +# /// +""" +Visualize file layout: where data chunks vs metadata are in a NISAR file. + +Creates an interactive HTML visualization showing: +- Data chunks (from VirtualiZarr manifest) +- Metadata regions (gaps between chunks) +- HTTP requests made during parsing + +This helps understand what bytes need to be fetched for virtualization +vs what bytes contain the actual array data. 
+ +Usage: + uv run docs/examples/nisar/layout.py + uv run docs/examples/nisar/layout.py --output my_layout.html +""" + +from __future__ import annotations + +import argparse +import time +from pathlib import Path +from urllib.parse import urlparse + +import earthaccess +import holoviews as hv +import pandas as pd +import virtualizarr as vz + +from obspec_utils.stores import AiohttpStore +from obspec_utils.registry import ObjectStoreRegistry +from obspec_utils.wrappers import RequestTrace, TracingReadableStore + +hv.extension("bokeh") + +MB = 1024 * 1024 + + +def parse_args(): + parser = argparse.ArgumentParser( + description="Visualize data vs metadata layout in NISAR files" + ) + parser.add_argument( + "--output", + type=str, + default=None, + help="Output HTML file path (default: layout.html in script dir)", + ) + return parser.parse_args() + + +def extract_chunks_dataframe(vdt) -> pd.DataFrame: + """Extract all chunks from a virtual datatree into a DataFrame.""" + from virtualizarr.manifests import ManifestArray + + records = [] + + def process_node(node, node_path=""): + for var_name in node.data_vars: + var = node[var_name] + if isinstance(var.data, ManifestArray): + manifest = var.data.manifest + for chunk_key, entry in manifest.dict().items(): + records.append( + { + "variable": f"{node_path}/{var_name}" + if node_path + else var_name, + "chunk_key": chunk_key, + "path": entry["path"], + "start": entry["offset"], + "length": entry["length"], + "end": entry["offset"] + entry["length"], + } + ) + + # Process root and all children + process_node(vdt.to_dataset(), "") + for path, node in vdt.subtree: + if path != "/": + process_node(node.to_dataset(), path) + + return pd.DataFrame(records) + + +def compute_gaps(chunks_df: pd.DataFrame, file_size: int) -> list[dict]: + """Compute gaps between chunks (metadata regions).""" + if chunks_df.empty: + return [ + { + "x0": 0, + "x1": file_size, + "length": file_size, + "description": f"Entire file: 0 - {file_size:,}", + 
} + ] + + sorted_chunks = chunks_df.sort_values("start").reset_index(drop=True) + gaps = [] + current_pos = 0 + + for _, row in sorted_chunks.iterrows(): + chunk_start = int(row["start"]) + chunk_end = int(row["end"]) + + if chunk_start > current_pos: + gaps.append( + { + "x0": current_pos, + "x1": chunk_start, + "length": chunk_start - current_pos, + "description": f"Metadata: {current_pos:,} - {chunk_start:,}", + } + ) + + current_pos = max(current_pos, chunk_end) + + if current_pos < file_size: + gaps.append( + { + "x0": current_pos, + "x1": file_size, + "length": file_size - current_pos, + "description": f"Metadata: {current_pos:,} - {file_size:,}", + } + ) + + return gaps + + +def visualize_layout( + chunks_df: pd.DataFrame, + requests_df: pd.DataFrame, + file_size: int, + title: str = "File Layout: Data vs Metadata", + open_vds_time: float | None = None, +): + """Create a visualization of file layout.""" + file_end_mb = file_size / MB + + # Data chunks + chunk_segments = [ + { + "x0": row["start"] / MB, + "x1": row["end"] / MB, + "y0": 2, + "y1": 2, + "source": "Data Chunks", + "variable": row["variable"], + "length_mb": row["length"] / MB, + } + for _, row in chunks_df.iterrows() + ] + + # Metadata gaps + gaps = compute_gaps(chunks_df, file_size) + gap_rects = [ + { + "x0": gap["x0"] / MB, + "x1": gap["x1"] / MB, + "y0": 0.85, + "y1": 1.15, + "source": "Metadata", + "length_mb": gap["length"] / MB, + "description": gap["description"], + } + for gap in gaps + ] + + # Requests (colored by time) + if not requests_df.empty and "timestamp" in requests_df.columns: + min_ts = requests_df["timestamp"].min() + request_segments = [ + { + "x0": row["start"] / MB, + "x1": row["end"] / MB, + "y0": 0, + "y1": 0, + "source": "Requests", + "length_mb": row["length"] / MB, + "time_s": row["timestamp"] - min_ts, + "duration_s": row.get("duration", 0), + } + for _, row in requests_df.iterrows() + ] + else: + request_segments = [] + + # Create plots + chunk_df = 
pd.DataFrame(chunk_segments) + chunk_plot = hv.Segments( + chunk_df, + kdims=["x0", "y0", "x1", "y1"], + vdims=["source", "variable", "length_mb"], + ).opts(color="green", alpha=0.7, line_width=15, tools=["hover"]) + + gap_df = pd.DataFrame(gap_rects) + gap_plot = hv.Rectangles( + gap_df, + kdims=["x0", "y0", "x1", "y1"], + vdims=["source", "length_mb", "description"], + ).opts( + color="orange", + alpha=0.8, + line_color="darkorange", + line_width=1, + tools=["hover"], + ) + + if request_segments: + request_df = pd.DataFrame(request_segments) + request_plot = hv.Segments( + request_df, + kdims=["x0", "y0", "x1", "y1"], + vdims=["source", "length_mb", "time_s", "duration_s"], + ).opts( + color="time_s", + cmap="plasma", + colorbar=True, + clabel="Time (s)", + alpha=0.8, + line_width=15, + tools=["hover"], + ) + else: + request_plot = hv.Overlay() + + # File boundaries + start_line = hv.VLine(0).opts(color="red", line_width=2, line_dash="dashed") + end_line = hv.VLine(file_end_mb).opts(color="red", line_width=2, line_dash="dashed") + + boundary_labels = hv.Labels( + { + "x": [0, file_end_mb], + "y": [2.4, 2.4], + "text": ["File Start (0)", f"File End ({file_end_mb:.1f} MB)"], + }, + kdims=["x", "y"], + vdims=["text"], + ).opts(text_font_size="8pt", text_color="red") + + # Statistics + total_chunk_bytes = chunks_df["length"].sum() + total_gap_bytes = sum(g["length"] for g in gaps) + total_request_bytes = requests_df["length"].sum() if not requests_df.empty else 0 + + chunk_pct = (total_chunk_bytes / file_size) * 100 + gap_pct = (total_gap_bytes / file_size) * 100 + + open_vds_str = f"{open_vds_time:.2f} s" if open_vds_time else "N/A" + + stats_div = hv.Div(f""" +
+ File Layout Statistics
+
+ File Size: {file_size / MB:.2f} MB
+
+ Data Chunks:
+   Count: {len(chunks_df):,}
+   Size: {total_chunk_bytes / MB:.2f} MB ({chunk_pct:.1f}%)
+
+ Metadata Regions:
+   Count: {len(gaps):,}
+   Size: {total_gap_bytes / MB:.2f} MB ({gap_pct:.1f}%)
+
+ Requests Made: {len(requests_df):,}
+ Bytes Requested: {total_request_bytes / MB:.2f} MB
+ open_virtual_datatree: {open_vds_str} +
+ """) + + # Combine plots + main_plot = ( + chunk_plot * gap_plot * request_plot * start_line * end_line * boundary_labels + ).opts( + width=1000, + height=400, + title=title, + xlabel="Offset (MB)", + ylabel="", + yticks=[(0, "Requests"), (1, "Metadata"), (2, "Data Chunks")], + ylim=(-0.5, 2.8), + show_grid=True, + gridstyle={"grid_line_alpha": 0.3, "grid_line_dash": [4, 4]}, + ) + + # Variable breakdown + if not chunks_df.empty: + var_summary = ( + chunks_df.groupby("variable") + .agg({"length": ["count", "sum"]}) + .reset_index() + ) + var_summary.columns = ["variable", "chunk_count", "total_bytes"] + var_summary["total_mb"] = var_summary["total_bytes"] / MB + var_summary = var_summary.sort_values("total_bytes", ascending=False).head(15) + + bars = hv.Bars( + var_summary, + kdims=["variable"], + vdims=["total_mb"], + ).opts( + width=1000, + height=250, + xrotation=45, + xlabel="Variable", + ylabel="Size (MB)", + title="Top 15 Variables by Data Size", + tools=["hover"], + color="steelblue", + ) + else: + bars = hv.Div("

No chunk data available

") + + return (main_plot + bars + stats_div).opts(shared_axes=False).cols(1) + + +def main(): + args = parse_args() + + print("=" * 60) + print("NISAR File Layout: Data vs Metadata") + print("=" * 60) + + # Authenticate + print("\n[1/5] Authenticating with NASA Earthdata...") + earthaccess.login() + + query = earthaccess.DataGranules() + query.short_name("NISAR_L2_GCOV_BETA_V1") + query.params["attribute[]"] = "int,FRAME_NUMBER,77" + query.params["attribute[]"] = "int,TRACK_NUMBER,5" + results = query.get_all() + + print(f" Found {len(results)} granules") + + https_links = earthaccess.results.DataGranule.data_links( + results[0], access="external" + ) + https_url = https_links[0] + print(f" HTTPS URL: {https_url}") + + parsed = urlparse(https_url) + base_url = f"{parsed.scheme}://{parsed.netloc}" + path = parsed.path.lstrip("/") + + # Get EDL token + print("\n[2/5] Getting EDL token...") + token = earthaccess.get_edl_token()["access_token"] + + # Create traced store + print("\n[3/5] Setting up tracing...") + base_store = AiohttpStore( + base_url, + headers={"Authorization": f"Bearer {token}"}, + ) + trace = RequestTrace() + traced_store = TracingReadableStore(base_store, trace) + registry = ObjectStoreRegistry({base_url: traced_store}) + + # Get file size + meta = traced_store.head(path) + file_size = meta["size"] + print(f" File size: {file_size / MB:.2f} MB") + trace.clear() + + # Open virtual datatree + print("\n[4/5] Opening virtual datatree...") + start_time = time.perf_counter() + + parser = vz.parsers.HDFParser(drop_variables=["crosstalk"]) + vdt = vz.open_virtual_datatree( + https_url, + registry=registry, + parser=parser, + loadable_variables=[], + ) + + open_vds_time = time.perf_counter() - start_time + + print(f" Requests: {trace.total_requests}") + print(f" Bytes requested: {trace.total_bytes / MB:.2f} MB") + print(f" Time: {open_vds_time:.2f}s") + + # Extract chunk info + chunks_df = extract_chunks_dataframe(vdt) + requests_df = trace.to_dataframe() + 
print(f" Chunks in manifest: {len(chunks_df)}") + + # Create visualization + print("\n[5/5] Creating visualization...") + plot = visualize_layout( + chunks_df, + requests_df, + file_size, + title=f"NISAR File Layout: {path.split('/')[-1]}", + open_vds_time=open_vds_time, + ) + + if args.output: + output_path = Path(args.output) + else: + output_path = Path(__file__).parent / "layout.html" + hv.save(plot, str(output_path)) + print(f"\nVisualization saved to: {output_path}") + + # Print summary + total_chunk_bytes = chunks_df["length"].sum() + gaps = compute_gaps(chunks_df, file_size) + total_gap_bytes = sum(g["length"] for g in gaps) + + print(f"\n{'=' * 60}") + print("SUMMARY") + print(f"{'=' * 60}") + print( + f"Data chunks: {total_chunk_bytes / MB:>10.2f} MB ({100 * total_chunk_bytes / file_size:.1f}%)" + ) + print( + f"Metadata: {total_gap_bytes / MB:>10.2f} MB ({100 * total_gap_bytes / file_size:.1f}%)" + ) + print( + f"Requests made: {trace.total_bytes / MB:>10.2f} MB ({100 * trace.total_bytes / file_size:.1f}%)" + ) + + +if __name__ == "__main__": + main() diff --git a/docs/examples/nisar/virtualize.py b/docs/examples/nisar/virtualize.py new file mode 100644 index 0000000..fd821b5 --- /dev/null +++ b/docs/examples/nisar/virtualize.py @@ -0,0 +1,98 @@ +#!/usr/bin/env -S uv run +# /// script +# requires-python = ">=3.11" +# dependencies = [ +# "earthaccess", +# "virtualizarr[hdf] @ git+https://github.com/zarr-developers/VirtualiZarr.git@main", +# "obspec-utils @ git+https://github.com/virtual-zarr/obspec-utils@main", +# "aiohttp", +# ] +# /// +""" +Minimal example: Create a virtual datatree from a NISAR file. + +This script demonstrates how to use VirtualiZarr with obspec-utils to +create a virtual representation of a remote HDF5 file without downloading it. + +KNOWN LIMITATIONS (will fail): + 1. The "crosstalk" variable has a complex dtype not supported by Zarr. + We use `drop_variables=["crosstalk"]` to skip it, but... + 2. 
`drop_variables` doesn't yet work for variables in nested HDF5 groups. + The crosstalk variable is in a nested group, so it still gets parsed. + +These issues require upstream fixes in VirtualiZarr. See: + https://github.com/zarr-developers/VirtualiZarr + +Usage: + uv run --script docs/examples/nisar/virtualize.py +""" + +from urllib.parse import urlparse + +import earthaccess +import virtualizarr as vz + +from obspec_utils.stores import AiohttpStore +from obspec_utils.registry import ObjectStoreRegistry + + +def main(): + # Authenticate with NASA Earthdata + print("Authenticating with NASA Earthdata...") + earthaccess.login() + + # Query for NISAR data + query = earthaccess.DataGranules() + query.short_name("NISAR_L2_GCOV_BETA_V1") + query.params["attribute[]"] = "int,FRAME_NUMBER,77" + query.params["attribute[]"] = "int,TRACK_NUMBER,5" + results = query.get_all() + print(f"Found {len(results)} granules") + + # Get the HTTPS URL + https_links = earthaccess.results.DataGranule.data_links( + results[0], access="external" + ) + https_url = https_links[0] + print(f"URL: {https_url}") + + # Parse URL and get auth token + parsed = urlparse(https_url) + base_url = f"{parsed.scheme}://{parsed.netloc}" + token = earthaccess.get_edl_token()["access_token"] + + # Create store with authentication + store = AiohttpStore( + base_url, + headers={"Authorization": f"Bearer {token}"}, + ) + registry = ObjectStoreRegistry({base_url: store}) + + # Create virtual datatree + # NOTE: drop_variables is intended to skip "crosstalk" (complex dtype), + # but it doesn't work for nested groups yet. This will currently fail. 
+ print("\nCreating virtual datatree...") + parser = vz.parsers.HDFParser(drop_variables=["crosstalk"]) + vdt = vz.open_virtual_datatree( + https_url, + registry=registry, + parser=parser, + loadable_variables=[], + ) + + # Print structure + print("\nVirtual DataTree structure:") + print(vdt) + + # Print some stats + print("\nVariables by group:") + for path, node in vdt.subtree: + ds = node.to_dataset() + if ds.data_vars: + print( + f" {path}: {list(ds.data_vars)[:5]}{'...' if len(ds.data_vars) > 5 else ''}" + ) + + +if __name__ == "__main__": + main() diff --git a/docs/examples/nisar/waterfall.py b/docs/examples/nisar/waterfall.py new file mode 100644 index 0000000..d83536d --- /dev/null +++ b/docs/examples/nisar/waterfall.py @@ -0,0 +1,367 @@ +#!/usr/bin/env -S uv run +# /// script +# requires-python = ">=3.11" +# dependencies = [ +# "earthaccess", +# "xarray", +# "h5netcdf", +# "h5py", +# "obspec-utils @ git+https://github.com/virtual-zarr/obspec-utils@main", +# "aiohttp", +# "pandas", +# "holoviews", +# "bokeh", +# ] +# /// +""" +Waterfall visualization of HTTP range requests when reading NISAR data. 
+ +Creates an interactive HTML visualization showing: +- Request byte ranges over time (waterfall chart) +- Request concurrency over time +- Statistics (throughput, request sizes, timing) + +Usage: + uv run docs/examples/nisar/waterfall.py + uv run docs/examples/nisar/waterfall.py --block-size 8 + uv run docs/examples/nisar/waterfall.py --output my_waterfall.html +""" + +from __future__ import annotations + +import argparse +import time +from pathlib import Path +from urllib.parse import urlparse + +import earthaccess +import holoviews as hv +import numpy as np +import pandas as pd +import xarray as xr + +from obspec_utils.readers import BlockStoreReader +from obspec_utils.stores import AiohttpStore +from obspec_utils.wrappers import RequestTrace, TracingReadableStore + +hv.extension("bokeh") + +MB = 1024 * 1024 + + +def parse_args(): + parser = argparse.ArgumentParser( + description="Visualize HTTP range requests when reading NISAR data" + ) + parser.add_argument( + "--block-size", + type=int, + default=13, + help="Block size in MB for BlockStoreReader (default: 13)", + ) + parser.add_argument( + "--output", + type=str, + default=None, + help="Output HTML file path (default: waterfall.html in script dir)", + ) + return parser.parse_args() + + +def read_middle_pixel(file_like) -> float: + """Open dataset and read a single pixel from the middle.""" + ds = xr.open_datatree( + file_like, + engine="h5netcdf", + decode_timedelta=False, + phony_dims="access", + ) + ny, nx = ds.science.LSAR.GCOV.grids.frequencyA.HHHH.shape + value = ds.science.LSAR.GCOV.grids.frequencyA.HHHH[ny // 2, nx // 2].values + return float(value) + + +def visualize_waterfall( + requests_df: pd.DataFrame, + title: str = "Request Waterfall", + file_size: int | None = None, + read_time: float | None = None, +): + """ + Create a waterfall visualization of requests over time. 
+ + X-axis: byte position in file (MB) + Y-axis: time (inverted, so time flows downward) + """ + if requests_df.empty or "timestamp" not in requests_df.columns: + return hv.Div("
+            <div style="padding: 20px;">
+            No request timing data available
+            </div>
") + + requests_sorted = requests_df.sort_values("timestamp").reset_index(drop=True) + + # Calculate time relative to first request + min_ts = requests_sorted["timestamp"].min() + + # Determine file boundaries (in MB) + file_end_bytes = file_size if file_size else requests_df["end"].max() + file_end_mb = file_end_bytes / MB + + # Create rectangles for each request (x-axis in MB) + waterfall_rects = [] + for _, row in requests_sorted.iterrows(): + time_sent_s = float(row["timestamp"] - min_ts) + duration_s = float(row.get("duration", 0)) + time_received_s = time_sent_s + duration_s + + waterfall_rects.append( + { + "x0": row["start"] / MB, + "x1": row["end"] / MB, + "y0": time_sent_s, + "y1": time_received_s, + "length_mb": row["length"] / MB, + "duration_s": duration_s, + "time_sent_s": time_sent_s, + "time_received_s": time_received_s, + "method": row.get("method", "unknown"), + } + ) + + waterfall_df = pd.DataFrame(waterfall_rects) + + # Ensure proper numeric dtypes + waterfall_df = waterfall_df.astype( + { + "x0": "float64", + "x1": "float64", + "y0": "float64", + "y1": "float64", + "length_mb": "float64", + "duration_s": "float64", + "time_sent_s": "float64", + "time_received_s": "float64", + } + ) + + # Debug info + print( + f" Request byte ranges: {waterfall_df['x0'].min():.2f} - {waterfall_df['x1'].max():.2f} MB" + ) + print( + f" Request sizes: min={waterfall_df['length_mb'].min():.3f} MB, max={waterfall_df['length_mb'].max():.3f} MB, mean={waterfall_df['length_mb'].mean():.3f} MB" + ) + + # Calculate max time for y-axis + max_time_s = waterfall_df["time_received_s"].max() + + # Create the main waterfall plot colored by time sent + waterfall_plot = hv.Rectangles( + waterfall_df, + kdims=["x0", "y0", "x1", "y1"], + vdims=["length_mb", "duration_s", "time_sent_s", "time_received_s", "method"], + ).opts( + color="time_sent_s", + cmap="viridis", + colorbar=True, + clabel="Time Sent (s)", + alpha=0.7, + line_color="black", + line_width=0.5, + tools=["hover"], + 
width=1000, + height=600, + xlabel="Offset (MB)", + ylabel="Time (s)", + title=title, + invert_yaxis=True, + xlim=(0, file_end_mb), + show_grid=True, + gridstyle={"grid_line_alpha": 0.3, "grid_line_dash": [4, 4]}, + ylim=(max_time_s * 1.05, -max_time_s * 0.02), + ) + + # Add file boundary lines + start_line = hv.VLine(0).opts(color="red", line_width=2, line_dash="dashed") + end_line = hv.VLine(file_end_mb).opts(color="red", line_width=2, line_dash="dashed") + + waterfall_plot = waterfall_plot * start_line * end_line + + # Create concurrency plot + time_resolution_s = max(0.001, max_time_s / 200) + time_bins = np.arange(0, max_time_s + time_resolution_s, time_resolution_s) + concurrent_counts = np.zeros(len(time_bins) - 1) + + for _, row in waterfall_df.iterrows(): + start_bin = int(row["time_sent_s"] / time_resolution_s) + end_bin = int(row["time_received_s"] / time_resolution_s) + 1 + start_bin = max(0, min(start_bin, len(concurrent_counts) - 1)) + end_bin = max(0, min(end_bin, len(concurrent_counts))) + concurrent_counts[start_bin:end_bin] += 1 + + concurrency_df = pd.DataFrame( + { + "time_s": time_bins[:-1] + time_resolution_s / 2, + "concurrent": concurrent_counts, + } + ) + + concurrency_plot = hv.Area( + concurrency_df, + kdims=["time_s"], + vdims=["concurrent"], + ).opts( + width=1000, + height=150, + xlabel="Time (s)", + ylabel="Concurrent Requests", + title="Request Concurrency Over Time", + color="steelblue", + alpha=0.7, + tools=["hover"], + show_grid=True, + gridstyle={"grid_line_alpha": 0.3, "grid_line_dash": [4, 4]}, + ) + + # Calculate statistics + total_request_bytes = requests_df["length"].sum() + total_bytes_mb = total_request_bytes / MB + avg_duration_s = waterfall_df["duration_s"].mean() + max_duration_s = waterfall_df["duration_s"].max() + min_duration_s = waterfall_df["duration_s"].min() + max_concurrent = int(concurrent_counts.max()) + + # Throughput + throughput_mbps = ( + (total_request_bytes * 8 / 1_000_000) / max_time_s if max_time_s > 0 
else 0 + ) + + read_time_str = f"{read_time:.2f} s" if read_time is not None else "N/A" + + stats_div = hv.Div(f""" +
+        <div style="padding: 10px; font-family: monospace;">
+        <b>Waterfall Statistics</b><br>
+        File Size: {file_end_mb:.2f} MB<br>
+        Total Requests: {len(requests_df):,}<br>
+        Total Bytes: {total_bytes_mb:.2f} MB<br>
+        <br>
+        <b>Timing:</b><br>
+        Total Duration: {max_time_s:.2f} s<br>
+        Avg Request Duration: {avg_duration_s:.3f} s<br>
+        Min Request Duration: {min_duration_s:.3f} s<br>
+        Max Request Duration: {max_duration_s:.3f} s<br>
+        <br>
+        <b>Concurrency:</b><br>
+        Max Concurrent Requests: {max_concurrent}<br>
+        Effective Throughput: {throughput_mbps:.1f} Mbps<br>
+        <br>
+        Read time: {read_time_str}
+        </div>
+ """) + + layout = ( + (waterfall_plot + concurrency_plot + stats_div).opts(shared_axes=False).cols(1) + ) + + return layout + + +def main(): + args = parse_args() + block_size_mb = args.block_size + block_size = block_size_mb * MB + + print("=" * 60) + print("BlockStoreReader Request Waterfall") + print("=" * 60) + print(f"Block size: {block_size_mb} MB") + + # Authenticate and Query Data + print("\n[1/6] Authenticating with NASA Earthdata...") + earthaccess.login() + + query = earthaccess.DataGranules() + query.short_name("NISAR_L2_GCOV_BETA_V1") + query.params["attribute[]"] = "int,FRAME_NUMBER,77" + query.params["attribute[]"] = "int,TRACK_NUMBER,5" + results = query.get_all() + + print(f" Found {len(results)} granules") + + # Get the HTTPS URL + https_links = earthaccess.results.DataGranule.data_links( + results[0], access="external" + ) + https_url = https_links[0] + print(f" HTTPS URL: {https_url}") + + # Parse URL + parsed = urlparse(https_url) + base_url = f"{parsed.scheme}://{parsed.netloc}" + path = parsed.path.lstrip("/") + + # Get EDL token + print("\n[2/6] Getting EDL token...") + token = earthaccess.get_edl_token()["access_token"] + + # Create store chain + print("\n[3/6] Setting up store chain...") + base_store = AiohttpStore( + base_url, + headers={"Authorization": f"Bearer {token}"}, + ) + trace = RequestTrace() + traced_store = TracingReadableStore(base_store, trace) + print(" AiohttpStore -> TracingReadableStore") + + # Get file size + print("\n[4/6] Getting file size...") + meta = traced_store.head(path) + file_size = meta["size"] + print(f" File size: {file_size / MB:.2f} MB") + + # Clear HEAD request from trace + trace.clear() + + # Read with BlockStoreReader + print( + f"\n[5/6] Reading middle pixel with BlockStoreReader ({block_size_mb} MB blocks)..." 
+ ) + start_time = time.perf_counter() + + with BlockStoreReader( + traced_store, + path, + block_size=block_size, + max_cached_blocks=1024, + ) as reader: + value = read_middle_pixel(reader) + + read_time = time.perf_counter() - start_time + + print(f" Value: {value}") + print(f" Read time: {read_time:.2f}s") + print(f" Total requests: {trace.total_requests}") + print(f" Total bytes: {trace.total_bytes / MB:.2f} MB") + + # Get request dataframe + requests_df = trace.to_dataframe() + + # Create visualization + print("\n[6/6] Creating waterfall visualization...") + plot = visualize_waterfall( + requests_df, + title=f"BlockStoreReader ({block_size_mb} MB) + AiohttpStore: NISAR Request Waterfall", + file_size=file_size, + read_time=read_time, + ) + + if args.output: + output_path = Path(args.output) + else: + output_path = Path(__file__).parent / "waterfall.html" + hv.save(plot, str(output_path)) + print(f"\nVisualization saved to: {output_path}") + + +if __name__ == "__main__": + main() From 93cc426288c97212e481ec12bfc0770716014c68 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Wed, 4 Feb 2026 22:13:21 -0500 Subject: [PATCH 2/2] Use branch --- docs/examples/nisar/layout.py | 258 ++++++++++++++++++++++-------- docs/examples/nisar/virtualize.py | 36 +---- 2 files changed, 198 insertions(+), 96 deletions(-) diff --git a/docs/examples/nisar/layout.py b/docs/examples/nisar/layout.py index f63b798..1926fe5 100644 --- a/docs/examples/nisar/layout.py +++ b/docs/examples/nisar/layout.py @@ -3,7 +3,7 @@ # requires-python = ">=3.11" # dependencies = [ # "earthaccess", -# "virtualizarr[hdf] @ git+https://github.com/zarr-developers/VirtualiZarr.git@main", +# "virtualizarr[hdf] @ git+https://github.com/maxrjones/VirtualiZarr@c-dtype", # "obspec-utils @ git+https://github.com/virtual-zarr/obspec-utils@main", # "aiohttp", # "pandas", @@ -15,7 +15,7 @@ Visualize file layout: where data chunks vs metadata are in a NISAR file. 
Creates an interactive HTML visualization showing: -- Data chunks (from VirtualiZarr manifest) +- Data chunks (from VirtualiZarr ManifestStore) - Metadata regions (gaps between chunks) - HTTP requests made during parsing @@ -61,49 +61,96 @@ def parse_args(): return parser.parse_args() -def extract_chunks_dataframe(vdt) -> pd.DataFrame: - """Extract all chunks from a virtual datatree into a DataFrame.""" - from virtualizarr.manifests import ManifestArray +def extract_chunks_dataframe(manifest_store) -> pd.DataFrame: + """Extract all chunks from a ManifestStore into a DataFrame.""" + from virtualizarr.manifests import ManifestGroup records = [] - def process_node(node, node_path=""): - for var_name in node.data_vars: - var = node[var_name] - if isinstance(var.data, ManifestArray): - manifest = var.data.manifest - for chunk_key, entry in manifest.dict().items(): - records.append( - { - "variable": f"{node_path}/{var_name}" - if node_path - else var_name, - "chunk_key": chunk_key, - "path": entry["path"], - "start": entry["offset"], - "length": entry["length"], - "end": entry["offset"] + entry["length"], - } - ) - - # Process root and all children - process_node(vdt.to_dataset(), "") - for path, node in vdt.subtree: - if path != "/": - process_node(node.to_dataset(), path) + def process_group(group: ManifestGroup, group_path: str = ""): + """Recursively process all arrays in groups.""" + # Process arrays at this level + for array_name, array in group.arrays.items(): + var_path = f"{group_path}/{array_name}" if group_path else array_name + manifest = array.manifest + for chunk_key, entry in manifest.dict().items(): + records.append( + { + "variable": var_path, + "chunk_key": chunk_key, + "path": entry["path"], + "start": entry["offset"], + "length": entry["length"], + "end": entry["offset"] + entry["length"], + } + ) + + # Recursively process subgroups + for group_name, subgroup in group.groups.items(): + sub_path = f"{group_path}/{group_name}" if group_path else 
group_name + process_group(subgroup, group_path=sub_path) + + # Start from root group + process_group(manifest_store._group) return pd.DataFrame(records) -def compute_gaps(chunks_df: pd.DataFrame, file_size: int) -> list[dict]: - """Compute gaps between chunks (metadata regions).""" +def compute_gaps( + chunks_df: pd.DataFrame, + file_size: int, + requests_df: pd.DataFrame | None = None, +) -> list[dict]: + """Compute gaps between chunks and classify by request coverage. + + Args: + chunks_df: DataFrame with chunk info (start, end columns) + file_size: Total file size in bytes + requests_df: Optional DataFrame with request info (start, end columns) + + Returns: + List of gap dicts with coverage info: + - x0, x1: byte range + - length: gap size in bytes + - bytes_read: how many bytes in this gap were read during parsing + - coverage: fraction of gap that was read (0.0 to 1.0) + - classification: "confirmed metadata", "unread gap", or "partially read" + - description: human-readable description + """ + + def compute_coverage(gap_start: int, gap_end: int) -> int: + """Compute how many bytes in a gap were read by requests.""" + if requests_df is None or requests_df.empty: + return 0 + bytes_read = 0 + for _, req in requests_df.iterrows(): + overlap_start = max(gap_start, int(req["start"])) + overlap_end = min(gap_end, int(req["end"])) + if overlap_start < overlap_end: + bytes_read += overlap_end - overlap_start + return bytes_read + + def classify_gap(coverage: float) -> str: + """Classify gap based on how much was read.""" + if coverage >= 0.9: + return "confirmed metadata" + elif coverage <= 0.1: + return "unread gap" + else: + return "partially read" + if chunks_df.empty: + bytes_read = compute_coverage(0, file_size) + coverage = bytes_read / file_size if file_size > 0 else 0.0 return [ { "x0": 0, "x1": file_size, "length": file_size, - "description": f"Entire file: 0 - {file_size:,}", + "bytes_read": bytes_read, + "coverage": coverage, + "classification": 
classify_gap(coverage), + "description": f"Entire file: 0 - {file_size:,} ({coverage:.0%} read)", } ] @@ -116,24 +163,38 @@ def compute_gaps(chunks_df: pd.DataFrame, file_size: int) -> list[dict]: chunk_end = int(row["end"]) if chunk_start > current_pos: + gap_length = chunk_start - current_pos + bytes_read = compute_coverage(current_pos, chunk_start) + coverage = bytes_read / gap_length if gap_length > 0 else 0.0 + classification = classify_gap(coverage) gaps.append( { "x0": current_pos, "x1": chunk_start, - "length": chunk_start - current_pos, - "description": f"Metadata: {current_pos:,} - {chunk_start:,}", + "length": gap_length, + "bytes_read": bytes_read, + "coverage": coverage, + "classification": classification, + "description": f"Gap: {current_pos:,} - {chunk_start:,} ({coverage:.0%} read, {classification})", } ) current_pos = max(current_pos, chunk_end) if current_pos < file_size: + gap_length = file_size - current_pos + bytes_read = compute_coverage(current_pos, file_size) + coverage = bytes_read / gap_length if gap_length > 0 else 0.0 + classification = classify_gap(coverage) gaps.append( { "x0": current_pos, "x1": file_size, - "length": file_size - current_pos, - "description": f"Metadata: {current_pos:,} - {file_size:,}", + "length": gap_length, + "bytes_read": bytes_read, + "coverage": coverage, + "classification": classification, + "description": f"Gap: {current_pos:,} - {file_size:,} ({coverage:.0%} read, {classification})", } ) @@ -145,7 +206,7 @@ def visualize_layout( requests_df: pd.DataFrame, file_size: int, title: str = "File Layout: Data vs Metadata", - open_vds_time: float | None = None, + parse_time: float | None = None, ): """Create a visualization of file layout.""" file_end_mb = file_size / MB @@ -164,17 +225,28 @@ def visualize_layout( for _, row in chunks_df.iterrows() ] - # Metadata gaps - gaps = compute_gaps(chunks_df, file_size) + # Metadata gaps (colored by classification) + gaps = compute_gaps(chunks_df, file_size, requests_df) + + 
# Color mapping for gap classification + classification_colors = { + "confirmed metadata": "orange", + "partially read": "yellow", + "unread gap": "gray", + } + gap_rects = [ { "x0": gap["x0"] / MB, "x1": gap["x1"] / MB, "y0": 0.85, "y1": 1.15, - "source": "Metadata", + "source": gap["classification"], "length_mb": gap["length"] / MB, + "bytes_read_mb": gap["bytes_read"] / MB, + "coverage_pct": gap["coverage"] * 100, "description": gap["description"], + "color": classification_colors[gap["classification"]], } for gap in gaps ] @@ -210,11 +282,18 @@ def visualize_layout( gap_plot = hv.Rectangles( gap_df, kdims=["x0", "y0", "x1", "y1"], - vdims=["source", "length_mb", "description"], + vdims=[ + "source", + "length_mb", + "bytes_read_mb", + "coverage_pct", + "description", + "color", + ], ).opts( - color="orange", + color="color", alpha=0.8, - line_color="darkorange", + line_color="black", line_width=1, tools=["hover"], ) @@ -254,12 +333,21 @@ def visualize_layout( # Statistics total_chunk_bytes = chunks_df["length"].sum() total_gap_bytes = sum(g["length"] for g in gaps) + total_gap_bytes_read = sum(g["bytes_read"] for g in gaps) total_request_bytes = requests_df["length"].sum() if not requests_df.empty else 0 chunk_pct = (total_chunk_bytes / file_size) * 100 gap_pct = (total_gap_bytes / file_size) * 100 + gap_coverage_pct = ( + (total_gap_bytes_read / total_gap_bytes * 100) if total_gap_bytes > 0 else 0 + ) - open_vds_str = f"{open_vds_time:.2f} s" if open_vds_time else "N/A" + # Count gaps by classification + confirmed_gaps = [g for g in gaps if g["classification"] == "confirmed metadata"] + partial_gaps = [g for g in gaps if g["classification"] == "partially read"] + unread_gaps = [g for g in gaps if g["classification"] == "unread gap"] + + parse_time_str = f"{parse_time:.2f} s" if parse_time else "N/A" stats_div = hv.Div(f"""
Metadata Regions:
-   Count: {len(gaps):,}
-   Size: {total_gap_bytes / MB:.2f} MB ({gap_pct:.1f}%)
+ Gap Regions: {len(gaps):,} totaling {total_gap_bytes / MB:.2f} MB ({gap_pct:.1f}%)
+   Confirmed metadata: {len(confirmed_gaps)} ({sum(g["length"] for g in confirmed_gaps) / MB:.2f} MB)
+   Partially read: {len(partial_gaps)} ({sum(g["length"] for g in partial_gaps) / MB:.2f} MB)
+   Unread gaps: {len(unread_gaps)} ({sum(g["length"] for g in unread_gaps) / MB:.2f} MB)
+   Gap coverage: {gap_coverage_pct:.1f}% of gap bytes were read

Requests Made: {len(requests_df):,}
Bytes Requested: {total_request_bytes / MB:.2f} MB
- open_virtual_datatree: {open_vds_str} + Parse Time: {parse_time_str}
""") @@ -371,35 +461,59 @@ def main(): traced_store = TracingReadableStore(base_store, trace) registry = ObjectStoreRegistry({base_url: traced_store}) + # Debug: verify registry setup + print(f" Registry keys: {list(registry.map.keys())}") + print(f" URL to resolve: {https_url}") + try: + resolved_store, resolved_path = registry.resolve(https_url) + print(f" Resolved store type: {type(resolved_store).__name__}") + print(f" Resolved path: {resolved_path}") + print( + f" Store is traced: {isinstance(resolved_store, TracingReadableStore)}" + ) + except ValueError as e: + print(f" ERROR: Resolution failed: {e}") + # Get file size meta = traced_store.head(path) file_size = meta["size"] print(f" File size: {file_size / MB:.2f} MB") trace.clear() - # Open virtual datatree - print("\n[4/5] Opening virtual datatree...") + # Parse file to ManifestStore + print("\n[4/5] Parsing file to ManifestStore...") start_time = time.perf_counter() - parser = vz.parsers.HDFParser(drop_variables=["crosstalk"]) - vdt = vz.open_virtual_datatree( - https_url, - registry=registry, - parser=parser, - loadable_variables=[], - ) + parser = vz.parsers.HDFParser() + manifest_store = parser(https_url, registry=registry) - open_vds_time = time.perf_counter() - start_time + parse_time = time.perf_counter() - start_time print(f" Requests: {trace.total_requests}") print(f" Bytes requested: {trace.total_bytes / MB:.2f} MB") - print(f" Time: {open_vds_time:.2f}s") + print(f" Time: {parse_time:.2f}s") # Extract chunk info - chunks_df = extract_chunks_dataframe(vdt) + chunks_df = extract_chunks_dataframe(manifest_store) requests_df = trace.to_dataframe() print(f" Chunks in manifest: {len(chunks_df)}") + # Debug: inspect requests DataFrame + print(f"\n [DEBUG] requests_df shape: {requests_df.shape}") + print(f" [DEBUG] requests_df columns: {requests_df.columns.tolist()}") + print(f" [DEBUG] requests_df empty: {requests_df.empty}") + if not requests_df.empty: + print(" [DEBUG] First 3 requests:") + for i, 
row in requests_df.head(3).iterrows(): + print( + f" {i}: start={row['start']:,}, length={row['length']:,}, method={row['method']}" + ) + print( + f" [DEBUG] Request methods: {requests_df['method'].value_counts().to_dict()}" + ) + else: + print(" [DEBUG] No requests captured - tracing may not be working") + # Create visualization print("\n[5/5] Creating visualization...") plot = visualize_layout( @@ -407,7 +521,7 @@ def main(): requests_df, file_size, title=f"NISAR File Layout: {path.split('/')[-1]}", - open_vds_time=open_vds_time, + parse_time=parse_time, ) if args.output: @@ -419,8 +533,16 @@ def main(): # Print summary total_chunk_bytes = chunks_df["length"].sum() - gaps = compute_gaps(chunks_df, file_size) + gaps = compute_gaps(chunks_df, file_size, requests_df) total_gap_bytes = sum(g["length"] for g in gaps) + total_gap_bytes_read = sum(g["bytes_read"] for g in gaps) + gap_coverage_pct = ( + (total_gap_bytes_read / total_gap_bytes * 100) if total_gap_bytes > 0 else 0 + ) + + confirmed_gaps = [g for g in gaps if g["classification"] == "confirmed metadata"] + partial_gaps = [g for g in gaps if g["classification"] == "partially read"] + unread_gaps = [g for g in gaps if g["classification"] == "unread gap"] print(f"\n{'=' * 60}") print("SUMMARY") @@ -429,8 +551,18 @@ def main(): f"Data chunks: {total_chunk_bytes / MB:>10.2f} MB ({100 * total_chunk_bytes / file_size:.1f}%)" ) print( - f"Metadata: {total_gap_bytes / MB:>10.2f} MB ({100 * total_gap_bytes / file_size:.1f}%)" + f"Gap regions: {total_gap_bytes / MB:>10.2f} MB ({100 * total_gap_bytes / file_size:.1f}%)" + ) + print( + f" Confirmed metadata: {len(confirmed_gaps):>3} gaps, {sum(g['length'] for g in confirmed_gaps) / MB:.2f} MB" + ) + print( + f" Partially read: {len(partial_gaps):>3} gaps, {sum(g['length'] for g in partial_gaps) / MB:.2f} MB" + ) + print( + f" Unread gaps: {len(unread_gaps):>3} gaps, {sum(g['length'] for g in unread_gaps) / MB:.2f} MB" ) + print(f" Gap coverage: {gap_coverage_pct:.1f}% of 
gap bytes were read") print( f"Requests made: {trace.total_bytes / MB:>10.2f} MB ({100 * trace.total_bytes / file_size:.1f}%)" ) diff --git a/docs/examples/nisar/virtualize.py b/docs/examples/nisar/virtualize.py index fd821b5..355e8cc 100644 --- a/docs/examples/nisar/virtualize.py +++ b/docs/examples/nisar/virtualize.py @@ -3,7 +3,7 @@ # requires-python = ">=3.11" # dependencies = [ # "earthaccess", -# "virtualizarr[hdf] @ git+https://github.com/zarr-developers/VirtualiZarr.git@main", +# "virtualizarr[hdf] @ git+https://github.com/maxrjones/VirtualiZarr@c-dtype", # "obspec-utils @ git+https://github.com/virtual-zarr/obspec-utils@main", # "aiohttp", # ] @@ -14,15 +14,6 @@ This script demonstrates how to use VirtualiZarr with obspec-utils to create a virtual representation of a remote HDF5 file without downloading it. -KNOWN LIMITATIONS (will fail): - 1. The "crosstalk" variable has a complex dtype not supported by Zarr. - We use `drop_variables=["crosstalk"]` to skip it, but... - 2. `drop_variables` doesn't yet work for variables in nested HDF5 groups. - The crosstalk variable is in a nested group, so it still gets parsed. - -These issues require upstream fixes in VirtualiZarr. See: - https://github.com/zarr-developers/VirtualiZarr - Usage: uv run --script docs/examples/nisar/virtualize.py """ @@ -68,30 +59,9 @@ def main(): ) registry = ObjectStoreRegistry({base_url: store}) - # Create virtual datatree - # NOTE: drop_variables is intended to skip "crosstalk" (complex dtype), - # but it doesn't work for nested groups yet. This will currently fail. 
print("\nCreating virtual datatree...") - parser = vz.parsers.HDFParser(drop_variables=["crosstalk"]) - vdt = vz.open_virtual_datatree( - https_url, - registry=registry, - parser=parser, - loadable_variables=[], - ) - - # Print structure - print("\nVirtual DataTree structure:") - print(vdt) - - # Print some stats - print("\nVariables by group:") - for path, node in vdt.subtree: - ds = node.to_dataset() - if ds.data_vars: - print( - f" {path}: {list(ds.data_vars)[:5]}{'...' if len(ds.data_vars) > 5 else ''}" - ) + parser = vz.parsers.HDFParser() + parser(https_url, registry=registry) if __name__ == "__main__":