In [1]:
import os
import json
import pystac
import s3fs
import numpy as np
import intake
import fsspec
import xarray as xr
import hashlib
from datetime import datetime, timezone

In [2]:
MINIO_ENDPOINT_URL = "http://localhost:9000" # Your MinIO endpoint
MINIO_ACCESS_KEY = "minioadmin"
MINIO_SECRET_KEY = "minioadmin"
S3_BUCKET_NAME = "fusion-lake"

In [3]:
s3_options = {
    "key": MINIO_ACCESS_KEY,
    "secret": MINIO_SECRET_KEY,
    "client_kwargs": {'endpoint_url': MINIO_ENDPOINT_URL},
    "config_kwargs": {'s3': {'addressing_style': 'path'}} # Important for MinIO
}

In [4]:
try:
    # For fsspec >= 2023.9.0, S3FileSystem might be registered by default
    # but explicit registration with specific parameters can be useful.
    # Or, ensure s3fs is installed and fsspec will find it.
    # To be sure for custom endpoints / MinIO:
    s3fs.S3FileSystem.clear_instance_cache() # Clear any old cached instances
    fsspec.register_implementation("s3", s3fs.S3FileSystem, clobber=True)
    print("Successfully registered s3fs with fsspec for 's3' protocol.")
    # You can also pre-configure the default s3fs instance fsspec will use:
    # fsspec.config.conf['s3'] = s3_options # May work for some fsspec versions
except Exception as e:
    print(f"Warning: Could not register s3fs with fsspec: {e}. "
          "Explicit storage_options will be critical.")

# S3FS filesystem object for direct interaction (still useful for direct s3 operations)
s3_fs_instance = s3fs.S3FileSystem(**s3_options)

Successfully registered s3fs with fsspec for 's3' protocol.


In [5]:
s3 = s3fs.S3FileSystem(**s3_options)

In [6]:
prefix = 'stac/'  # Replace with your prefix
stac_files = s3.ls(f's3://{S3_BUCKET_NAME}/{prefix}')
stac_files = [f for f in stac_files if f.endswith('.json')]
for obj in stac_files:
    print(obj)

fusion-lake/stac/LC08_L1TP_032037_20160420_20170223_01_T1.json
fusion-lake/stac/LC08_L1TP_050024_20160520_20170324_01_T1.json


In [7]:
s3_storage_options_for_intake_xr = s3_options.copy()

In [8]:
fsspec.config.conf['s3'] = {
    "key": MINIO_ACCESS_KEY,
    "secret": MINIO_SECRET_KEY,
    "client_kwargs": {'endpoint_url': MINIO_ENDPOINT_URL},
    "config_kwargs": {'s3': {'addressing_style': 'path'}}
}
print("fsspec.config.conf['s3'] has been set globally.")

fsspec.config.conf['s3'] has been set globally.


In [9]:
for obj in stac_files:
    stac_catalog_url = f's3://{obj}'
    
    with s3.open(stac_catalog_url, 'r') as f:
        stac_item_dict = json.load(f)
    print(f"Successfully read JSON content from {stac_catalog_url}")
    
    # print(stac_item_dict['assets'])
    zarr_link = stac_item_dict['assets']['data_zarr']['href']
    zarr_catalog_url = f's3://{S3_BUCKET_NAME}/{zarr_link}'
    
    print(f"Zarr catalog URL: {zarr_catalog_url}")
    
    ds_lazy = xr.open_zarr(
        store=zarr_catalog_url, # Path to the S3 Zarr store root
        storage_options=s3_options,
        consolidated=True,  # Try True first if you consolidated metadata
        chunks={}
    )
    print(f"Loaded Zarr dataset (lazy): \n{ds_lazy}")
    
    # print(stac_item_dict['assets'])

    # # print(f"STAC Item JSON: {json.dumps(stac_item_dict, indent=2)}")

    # # Create a PySTAC Item object from the dictionary.
    # # Providing href helps resolve relative asset links if any (though we assume absolute here).
    # stac_item = pystac.Item.from_dict(stac_item_dict, href=stac_catalog_url)
    # print(f"Parsed STAC Item. ID: {stac_item.id}, Date: {stac_item.datetime}")
    
    
    
    break

Successfully read JSON content from s3://fusion-lake/stac/LC08_L1TP_032037_20160420_20170223_01_T1.json
Zarr catalog URL: s3://fusion-lake/raw/LC08_L1TP_032037_20160420_20170223_01_T1.zarr
Loaded Zarr dataset (lazy): 
<xarray.Dataset>
Dimensions:                                   (band: 7801, y: 7671, x: 3)
Dimensions without coordinates: band, y, x
Data variables:
    LC08_L1TP_032037_20160420_20170223_01_T1  (band, y, x) float32 dask.array<chunksize=(1, 512, 3), meta=np.ndarray>


In [10]:
print(ds_lazy)

<xarray.Dataset>
Dimensions:                                   (band: 7801, y: 7671, x: 3)
Dimensions without coordinates: band, y, x
Data variables:
    LC08_L1TP_032037_20160420_20170223_01_T1  (band, y, x) float32 dask.array<chunksize=(1, 512, 3), meta=np.ndarray>


In [13]:
data_variable_name = 'LC08_L1TP_032037_20160420_20170223_01_T1'
data_array_to_process = ds_lazy[data_variable_name]

# 2. Define your cloud posterior function (example from before)
# This function expects a 1D numpy array of spectral values for a single pixel
def simple_cloud_posterior(pixel_spectral_bands, bright_threshold=0.6, whiteness_factor=0.15):
    """
    Calculate a simple cloud posterior probability for a single pixel.
    Args:
        pixel_spectral_bands (np.ndarray): 1D array of spectral values (e.g., [R, G, B]).
                                          Assumed to be scaled roughly 0-1.
        bright_threshold (float): Pixels with mean reflectance below this are considered clear.
        whiteness_factor (float): How "white" a pixel needs to be.
    Returns:
        float: A pseudo-posterior probability (0.0 to 1.0) of the pixel being cloud.
    """
    if not isinstance(pixel_spectral_bands, np.ndarray): # Should be numpy array from dask chunk
        pixel_spectral_bands = np.array(pixel_spectral_bands)

    mean_reflectance = np.mean(pixel_spectral_bands)

    if mean_reflectance < bright_threshold:
        return 0.0  # Definitely not cloud if not bright enough

    if mean_reflectance > 1e-6:
        color_variation = np.std(pixel_spectral_bands) / mean_reflectance
    else:
        color_variation = 1.0

    brightness_score = min(1.0, max(0.0, (mean_reflectance - bright_threshold) / (1.0 - bright_threshold + 1e-6)))
    whiteness_score = max(0.0, 1.0 - (color_variation / whiteness_factor))
    cloud_prob = brightness_score * whiteness_score
    return float(cloud_prob)


# 3. Apply the function chunkwise using xarray.apply_ufunc
cloud_posterior_da = xr.apply_ufunc(
    simple_cloud_posterior,         # The function to apply
    data_array_to_process,          # The input DataArray (NOT the Dataset)
    input_core_dims=[['x']],        # The dimension your function operates on is 'x'
    output_core_dims=[[]],          # Your function returns a scalar
    exclude_dims=set(('x',)),       # The 'x' dimension is consumed by the function
    dask="parallelized",            # Enable Dask parallelization
    output_dtypes=[float],          # The data type of the output
    kwargs={'bright_threshold': 0.65, 'whiteness_factor': 0.2} # Pass additional args
)

# The resulting cloud_posterior_da will have dimensions ('band', 'y')
# because the 'x' dimension was consumed.
cloud_posterior_da.name = 'cloud_posterior' # Give it a name

print("Input DataArray to process:")
print(data_array_to_process)
print("\nCloud Posterior DataArray structure:")
print(cloud_posterior_da)
print("\nCloud Posterior DataArray Chunks:")
print(cloud_posterior_da.chunks)

Input DataArray to process:
<xarray.DataArray 'LC08_L1TP_032037_20160420_20170223_01_T1' (band: 7801,
                                                              y: 7671, x: 3)>
dask.array<open_dataset-db104ab1aac54c4aba9027a35337a70cLC08_L1TP_032037_20160420_20170223_01_T1, shape=(7801, 7671, 3), dtype=float32, chunksize=(1, 512, 3), chunktype=numpy.ndarray>
Dimensions without coordinates: band, y, x

Cloud Posterior DataArray structure:
<xarray.DataArray 'cloud_posterior' (band: 7801, y: 7671)>
dask.array<transpose, shape=(7801, 7671), dtype=float64, chunksize=(1, 512), chunktype=numpy.ndarray>
Dimensions without coordinates: band, y

Cloud Posterior DataArray Chunks:
((1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,

In [None]:
print("--- Inspecting cloud_posterior_da ---")
print(repr(cloud_posterior_da)) # Full representation
print("\nShape:", cloud_posterior_da.shape)
print("Dimensions:", cloud_posterior_da.dims)
print("Chunks:", cloud_posterior_da.chunks)
print("Dtype:", cloud_posterior_da.dtype)

# Important: Inspect the _meta attribute of the underlying Dask array
if hasattr(cloud_posterior_da, 'data') and hasattr(cloud_posterior_da.data, '_meta'):
    print("Dask array _meta:", repr(cloud_posterior_da.data._meta))
else:
    print("Could not access Dask array _meta.")

# Try to compute a small part to see the actual values in a chunk
print("\nComputing a small slice of cloud_posterior_da")

SyntaxError: EOL while scanning string literal (4138634389.py, line 15)

In [23]:
# cloud_posterior_values = cloud_posterior_da.values

# These are still Dask arrays. Compute() triggers calculation.
mean_cloud_cover = cloud_posterior_da.mean(skipna=True)
variance_cloud_cover = cloud_posterior_da.var(skipna=True)

print("Computing Cloud mean...")
cloud_mean_computed = mean_cloud_cover.compute()
print(f"Computed cloud Mean: {cloud_mean_computed.item()}") # .item() to get Python scalar

print("Computing cloud variance...")
cloud_variance_computed = variance_cloud_cover.compute()
print(f"Computed cloud Variance: {cloud_variance_computed.item()}")


Computing Cloud mean...


ValueError: axes don't match array

In [79]:
RED_BAND_INDEX_IN_X_DIM = 0
NIR_BAND_INDEX_IN_X_DIM = 1
DATA_VARIABLE_NAME = 'LC08_L1TP_050024_20160520_20170324_01_T1'

In [85]:
multi_band_da = ds_lazy[DATA_VARIABLE_NAME]
print(f"Extracted multi-band DataArray ('{multi_band_da.name}'): \n{multi_band_da}")

Extracted multi-band DataArray ('LC08_L1TP_050024_20160520_20170324_01_T1'): 
<xarray.DataArray 'LC08_L1TP_050024_20160520_20170324_01_T1' (band: 7861,
                                                              y: 7751, x: 3)>
dask.array<open_dataset-9e7e6baef3fd39ba4a17f9b1412593d1LC08_L1TP_050024_20160520_20170324_01_T1, shape=(7861, 7751, 3), dtype=float32, chunksize=(1, 512, 3), chunktype=numpy.ndarray>
Dimensions without coordinates: band, y, x


In [86]:
print(f"Selecting Red band using index {RED_BAND_INDEX_IN_X_DIM} along 'x' dimension...")
# .isel(x=...) selects by integer position along the 'x' dimension.
red_da = multi_band_da.isel(x=RED_BAND_INDEX_IN_X_DIM)
# Rename dimensions for clarity if needed, e.g., if 'band' is height and 'y' is width
# red_da = red_da.rename({'band': 'height', 'y': 'width'})


print(f"Selecting NIR band using index {NIR_BAND_INDEX_IN_X_DIM} along 'x' dimension...")
nir_da = multi_band_da.isel(x=NIR_BAND_INDEX_IN_X_DIM)
# nir_da = nir_da.rename({'band': 'height', 'y': 'width'}) # Keep dimensions consistent with red_da

print(f"Red band selected (lazy): \n{red_da}")
print(f"NIR band selected (lazy): \n{nir_da}")

# --- 3. Calculate NDVI (Chunk-wise operation) ---
epsilon = 1e-8
print("Calculating NDVI (lazy Dask computation)...")
ndvi_da = (nir_da - red_da) / (nir_da + red_da + epsilon)
ndvi_da = ndvi_da.rename("ndvi")

ndvi_da.attrs.update({
    'long_name': 'Normalized Difference Vegetation Index',
    'units': '1',
    'formula': '(NIR - Red) / (NIR + Red)',
    'source_data_variable': DATA_VARIABLE_NAME,
    'red_band_source_index_in_x_dim': RED_BAND_INDEX_IN_X_DIM,
    'nir_band_source_index_in_x_dim': NIR_BAND_INDEX_IN_X_DIM,
    'epsilon_for_division': epsilon
})

print(f"NDVI DataArray (lazy): \n{ndvi_da}")
print(f"NDVI Chunks: {ndvi_da.chunks}")


Selecting Red band using index 0 along 'x' dimension...
Selecting NIR band using index 1 along 'x' dimension...
Red band selected (lazy): 
<xarray.DataArray 'LC08_L1TP_050024_20160520_20170324_01_T1' (band: 7861,
                                                              y: 7751)>
dask.array<getitem, shape=(7861, 7751), dtype=float32, chunksize=(1, 512), chunktype=numpy.ndarray>
Dimensions without coordinates: band, y
NIR band selected (lazy): 
<xarray.DataArray 'LC08_L1TP_050024_20160520_20170324_01_T1' (band: 7861,
                                                              y: 7751)>
dask.array<getitem, shape=(7861, 7751), dtype=float32, chunksize=(1, 512), chunktype=numpy.ndarray>
Dimensions without coordinates: band, y
Calculating NDVI (lazy Dask computation)...
NDVI DataArray (lazy): 
<xarray.DataArray 'ndvi' (band: 7861, y: 7751)>
dask.array<truediv, shape=(7861, 7751), dtype=float32, chunksize=(1, 512), chunktype=numpy.ndarray>
Dimensions without coordinates: band, y
Attrib

In [88]:
print("Calculating NDVI mean and variance (lazy Dask computation)...")

# Calculate mean, explicitly skipping NaNs if they might exist
# If your data is clean (no NaNs expected), skipna=False might be slightly faster.
ndvi_mean_lazy = ndvi_da.mean(skipna=True)
ndvi_variance_lazy = ndvi_da.var(skipna=True)

# We need to compute these values to put them in the JSON.
# This will trigger reading the necessary data for the whole ndvi_da array.
# If ndvi_da is very large, computing global mean/variance can be memory intensive
# if not chunked optimally for reduction. Dask handles this reasonably well.
print("Computing NDVI mean...")
ndvi_mean_computed = ndvi_mean_lazy.compute()
print(f"Computed NDVI Mean: {ndvi_mean_computed.item()}") # .item() to get Python scalar

print("Computing NDVI variance...")
ndvi_variance_computed = ndvi_variance_lazy.compute()
print(f"Computed NDVI Variance: {ndvi_variance_computed.item()}")

Calculating NDVI mean and variance (lazy Dask computation)...
Computing NDVI mean...
Computed NDVI Mean: 0.21052692830562592
Computing NDVI variance...
Computed NDVI Variance: 0.02323130890727043


In [87]:
input_zarr_name = DATA_VARIABLE_NAME # Using the main variable name
output_s3_bucket = zarr_catalog_url.split('/')[2]
output_zarr_prefix = "posterior"
output_ndvi_zarr_name = f"{input_zarr_name}_ndvi.zarr" # Might want a cleaner name
output_ndvi_s3_path = f"s3://{output_s3_bucket}/{output_zarr_prefix}/{output_ndvi_zarr_name}"
print(f"Output NDVI Zarr will be saved to: {output_ndvi_s3_path}")

output_s3_map_root = f"{output_s3_bucket}/{output_zarr_prefix}/{output_ndvi_zarr_name}"
if 's3' not in locals() or not isinstance(s3, s3fs.S3FileSystem):
    s3 = s3fs.S3FileSystem(**s3_options)
s3_map_for_writing = s3fs.S3Map(root=output_s3_map_root, s3=s3, check=False)

# --- 5. Save the NDVI DataArray as a new Zarr store (Chunk-wise write) ---
ndvi_ds_to_save = ndvi_da.to_dataset()
print("Saving NDVI Zarr to S3...")
try:
    task = ndvi_ds_to_save.to_zarr(
        store=s3_map_for_writing,
        mode='w',
        consolidated=True,
        compute=True
    )
    print(f"NDVI Zarr store successfully written to {output_ndvi_s3_path}")
except Exception as e:
    print(f"Error saving NDVI Zarr to S3: {e}")
    import traceback
    traceback.print_exc()
    raise

print("\nNDVI calculation and Zarr upload complete.")

Output NDVI Zarr will be saved to: s3://fusion-lake/posterior/LC08_L1TP_050024_20160520_20170324_01_T1_ndvi.zarr
Saving NDVI Zarr to S3...


KeyboardInterrupt: 

In [None]:
scene_id_for_summary = input_zarr_name

summary_data = {
    "scene_id": scene_id_for_summary,
    "processed_product": "NDVI",
    "source_zarr": zarr_catalog_url, # Link back to the input
    "output_ndvi_zarr": output_ndvi_s3_path,
    "mean": float(ndvi_mean_computed.item()), # Ensure it's a standard float
    "variance": float(ndvi_variance_computed.item()),
    "calculation_timestamp_utc": datetime.now(timezone.utc).isoformat()
}

summary_json_content = json.dumps(summary_data, indent=2)
print(f"Summary JSON content: \n{summary_json_content}")

summary_json_s3_path = f"s3://{output_s3_bucket}/posterior/ndvi_summary_{scene_id_for_summary}.json"


try:
    print(f"Uploading summary JSON to {summary_json_s3_path}...")
    with s3.open(summary_json_s3_path, 'wb') as f: # s3.open expects path within bucket
        f.write(summary_json_content.encode('utf-8'))
    print("Summary JSON uploaded successfully.")
except Exception as e:
    print(f"Error uploading summary JSON: {e}")

Summary JSON content: 
{
  "scene_id": "LC08_L1TP_050024_20160520_20170324_01_T1",
  "processed_product": "NDVI",
  "source_zarr": "s3://fusion-lake/raw/LC08_L1TP_050024_20160520_20170324_01_T1.zarr",
  "output_ndvi_zarr": "s3://fusion-lake/posterior/LC08_L1TP_050024_20160520_20170324_01_T1_ndvi.zarr",
  "mean": 0.21052692830562592,
  "variance": 0.02323130890727043,
  "calculation_timestamp_utc": "2025-05-15T13:37:30.792425+00:00"
}
Uploading summary JSON to s3://fusion-lake/posterior/ndvi_summary_LC08_L1TP_050024_20160520_20170324_01_T1.json...
Summary JSON uploaded successfully.


In [99]:
fs_for_ledger = s3

ledger_bucket = output_s3_bucket
ledger_file_name = "ledgers.csv"
ledger_s3_key = f"{ledger_file_name}"
ledger_full_s3_path = f"s3://{ledger_bucket}/{ledger_s3_key}"

print(f"Updating placeholder ledger at: {ledger_full_s3_path}")

try:
    summary_json_bytes = summary_json_content.encode('utf-8')
    sha256_hash = hashlib.sha256(summary_json_bytes).hexdigest()
    timestamp_utc_str = summary_data["calculation_timestamp_utc"] # Use timestamp from summary

    scene_id_ledger = summary_data["scene_id"]
    mean_val_ledger = summary_data["mean"]
    var_val_ledger = summary_data["variance"]
    stage_ledger = "NDVI_Posterior_Placeholder"

    new_ledger_line = f"{scene_id_ledger},{sha256_hash},{timestamp_utc_str},{stage_ledger},{mean_val_ledger},{var_val_ledger}\n"

    if fs_for_ledger.exists(ledger_s3_key):
        print(f"Ledger file {ledger_s3_key} exists, appending.")
        with fs_for_ledger.open(ledger_s3_key, "ab") as f:
            f.write(new_ledger_line.encode("utf-8"))
    else:
        print(f"Ledger file {ledger_s3_key} does not exist, creating with header.")
        header = "scene_id,summary_json_sha256,timestamp_utc,stage,mean,variance\n"
        content = header + new_ledger_line
        with fs_for_ledger.open(ledger_full_s3_path, "wb") as f:
            f.write(content.encode("utf-8"))
    print(f"Ledger updated successfully for placeholder scene {scene_id_ledger}.")

except Exception as e:
    print(f"Failed to update ledger for placeholder scene {scene_id_for_summary}: {e}")
    import traceback
    traceback.print_exc()

print("\nPlaceholder summary JSON and ledger update complete.")

Updating placeholder ledger at: s3://fusion-lake/ledgers.csv
Ledger file ledgers.csv does not exist, creating with header.
Ledger updated successfully for placeholder scene LC08_L1TP_050024_20160520_20170324_01_T1.

Placeholder summary JSON and ledger update complete.
