# dead pixel detection
dead pixel are defined as nan pixels
the pipeline reads the npy files and scans from one frame to the next, checking if any nan pixels get added/removed, and updates the mask accordingly

functions

In [14]:
import os
import glob
import numpy as np
import xarray as xr
import rasterio
import matplotlib.pyplot as plt

# Reading all npy files and extracting bands -> saving in xarray dataset named ds

def load_frames_to_xarray(npy_folder):
    def extract_number(filename):
        # Extract the number from the filename, e.g., "frame_12.npy" -> 12
        basename = os.path.basename(filename)
        num = ''.join(filter(str.isdigit, basename))
        return int(num) if num else -1

    npy_files = sorted(glob.glob(os.path.join(npy_folder, "*.npy")), key=extract_number)
    # print("Read order:", npy_files)

    L2_list = []
    L1_list = []
    M0_list = []

    for f in npy_files:
        arr = np.load(f)
        L2 = arr[0:100, :, 0]      # L2: rows 0-100, band 0
        L1 = arr[349:419, :, 1]    # L1: rows 349-419, band 1
        M0 = arr[668:768, :, 2]    # M0: rows 668-767, band 2
        L2_list.append(L2)
        L1_list.append(L1)
        M0_list.append(M0)

    L2_arr = np.stack(L2_list)  # shape: (frame, x, y)
    L1_arr = np.stack(L1_list)
    M0_arr = np.stack(M0_list)

    ds = xr.Dataset(
        {
            "L2": (["frame", "x_L2", "y_L2"], L2_arr),
            "L1": (["frame", "x_L1", "y_L1"], L1_arr),
            "M0": (["frame", "x_M0", "y_M0"], M0_arr)
        },
        coords={
            "frame": np.arange(L2_arr.shape[0]),
            "x_L2": np.arange(L2_arr.shape[1]),
            "y_L2": np.arange(L2_arr.shape[2]),
            "x_L1": np.arange(L1_arr.shape[1]),
            "y_L1": np.arange(L1_arr.shape[2]),
            "x_M0": np.arange(M0_arr.shape[1]),
            "y_M0": np.arange(M0_arr.shape[2])
        }
    )
    return ds

# with the mode you can choose which band to process, either "L2", "L1" or "M0"
# If geotransform is provided, it will be used for saving the GeoTIFF, otherwise no georeferencing will be applied

def create_dead_pixel_mask(ds, mode, out_path, transform = None):
    if mode == "L2":
        band_name = "L2"
    elif mode == "L1":
        band_name = "L1"
    elif mode == "M0":
        band_name = "M0"
    else:
        raise ValueError("mode must be 'L2', 'L1', or 'M0'")

    band = ds[band_name].values

    # Initialize mask with NaNs from the first frame
    current_nan_mask = np.isnan(band[0]).copy()

    # Track added and removed NaNs across frames, update mask accordingly
    for frame_idx in range(1, band.shape[0]):
        frame_nan_mask = np.isnan(band[frame_idx])
        current_nan_mask |= frame_nan_mask

    # Save the final (updated) NaN mask as GeoTIFF
    dead_mask_final = current_nan_mask.astype("float32") * 10000
    band_out_path = os.path.join(out_path, f"dead_pixel_mask_{band_name}.tif")
    with rasterio.open(
        band_out_path,
        "w",
        driver="GTiff",
        height=dead_mask_final.shape[0],
        width=dead_mask_final.shape[1],
        count=1,
        dtype="float32",
        transform=transform,
        crs="EPSG:4326"
    ) as dst:
        dst.write(dead_mask_final, 1)

    print(f"Dead pixel mask (all frames) saved at: {band_out_path}")
    return dead_mask_final

example pipeline

In [None]:
npy_folder = "/frames"  # Replace with your npy files folder
out_path = "/output"  # Replace with your desired output folder
mode = "L2"  # Choose between "L2", "L1", or "M0"

ds = load_frames_to_xarray(npy_folder)
dead_mask = create_dead_pixel_mask(ds, mode, out_path, transform=None)


Dead pixel mask (all frames) saved at: /Users/simonreichl/Desktop/Cal_Val working student challenge/output/dead_pixel_mask_L2.tif
[[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]
