In [3]:
import pandas as pd
df = pd.read_parquet("na_surface_chla.parquet")
date_str = sorted(df["time"].dt.strftime("%Y-%m-%d").unique())
df.head()

Unnamed: 0,PLATFORM_NUMBER,CYCLE_NUMBER,PRES,CHLA_surface,LATITUDE,LONGITUDE,TIME,CHLA_QC,log10_CHLA_surface,min_pres,lat,lon,time,source_file
0,1902383,78,19.889999,0.017404,23.0209,-53.222,2024-03-02 16:13:24.002000128,1,-1.759363,19.889999,23.0209,-53.222,2024-03-02 16:13:24.002000128,bgc_na_202403.nc
1,1902383,79,7.98,0.011863,22.7279,-52.3932,2024-03-12 15:19:00.002000128,1,-1.925805,7.98,22.7279,-52.3932,2024-03-12 15:19:00.002000128,bgc_na_202403.nc
2,1902383,80,11.85,0.066477,23.2649,-52.0005,2024-03-22 15:28:33.002000128,1,-1.177332,11.85,23.2649,-52.0005,2024-03-22 15:28:33.002000128,bgc_na_202403.nc
3,1902384,79,2.26,0.025275,20.0154,-42.7101,2024-03-08 20:58:18.002000128,1,-1.597309,2.26,20.0154,-42.7101,2024-03-08 20:58:18.002000128,bgc_na_202403.nc
4,1902384,80,2.14,0.014453,20.3018,-42.5525,2024-03-18 20:31:41.002000128,1,-1.840057,2.14,20.3018,-42.5525,2024-03-18 20:31:41.002000128,bgc_na_202403.nc


In [1]:
import earthaccess
auth = earthaccess.login()
# are we authenticated?
if not auth.authenticated:
    # ask for credentials and persist them in a .netrc file
    auth.login(strategy="interactive", persist=True)


In [4]:
import xarray as xr
results = earthaccess.search_data(
    short_name = "PACE_OCI_L3M_RRS",
    temporal = (df.time.min(), df.time.max()),
    granule_name="*.DAY.*.4km.nc"
)
len(results)

560

In [5]:
fileset = earthaccess.open(results);

QUEUEING TASKS | :   0%|          | 0/560 [00:00<?, ?it/s]

PROCESSING TASKS | :   0%|          | 0/560 [00:00<?, ?it/s]

COLLECTING RESULTS | :   0%|          | 0/560 [00:00<?, ?it/s]

In [7]:
# the netcdfs do not have time so we need to get that from the results metadata
# Use the beginning of each 8-day window as the time coordinate
import numpy as np
fileset_time = [
    np.datetime64(r["umm"]["TemporalExtent"]["RangeDateTime"]["BeginningDateTime"][:10])
    for r in results
]

In [13]:
i = 0
time_val = fileset_time[i]
ds = xr.open_dataset(fileset[i], chunks={})
ds = ds.expand_dims(time=[time_val])
ds = ds.assign_coords(time=("time", [time_val]))
matchups = match_points_fast(
    ds=ds,
    df=df,                      # your surface CHL df
    var_name="Rrs",             # or whatever var you're sampling
    vector_dim="wavelength",
    y_name="log10_CHLA_surface",
    ds_lat="lat",
    ds_lon="lon",
    df_time="TIME",
    df_lat="LATITUDE",
    df_lon="LONGITUDE",
)
matchups

In [6]:
import numpy as np
import pandas as pd
import dask.array as da
import xarray as xr

def match_points_fast(
    ds: xr.Dataset,
    df: pd.DataFrame,
    var_name: str,
    y_name: str,
    ds_time: str = "time",
    ds_lat: str = "lat",
    ds_lon: str = "lon",
    df_time: str = "TIME",
    df_lat: str = "LATITUDE",
    df_lon: str = "LONGITUDE",
    dropna_all: bool = False,
):
    """
    Sample variable ds[var_name] at (lat, lon, time) points from df.

    ds[var_name] can be either:
      - (time, lat, lon)           -> one column named var_name
      - (time, lat, lon, extra)   -> multiple columns var_name_<extra>

    Returns a DataFrame; if df is empty (after date filtering), returns an empty DF
    with proper columns.
    """

    # --- ensure time dim exists ---
    if ds_time not in ds.dims:
        tstr = ds.attrs["time_coverage_start"]
        t_utc = pd.to_datetime(tstr, utc=True)   # tz-aware Timestamp in UTC
        t = t_utc.tz_convert(None)
        ds = ds.expand_dims({ds_time: [t]})
        ds = ds.assign_coords({ds_time: (ds_time, [t])})

    # --- get variable ---
    if var_name not in ds:
        raise KeyError(f"{var_name!r} not found in dataset.")
    da_var = ds[var_name]

    # --- check dims ---
    needed_dims = [ds_time, ds_lat, ds_lon]
    if not all(d in da_var.dims for d in needed_dims):
        raise ValueError(
            f"{var_name} dims {da_var.dims} do not contain all of {needed_dims}. "
            "Adjust ds_lat/ds_lon or subset/transform ds[var_name] first."
        )

    # --- normalize to (time, lat, lon) or (time, lat, lon, extra) ---
    if da_var.ndim == 3:
        # (time, lat, lon)
        vector_dim = None
        da_var = da_var.transpose(ds_time, ds_lat, ds_lon)
    elif da_var.ndim == 4:
        # (time, lat, lon, extra)  (extra is always 4th dim)
        vector_dim = da_var.dims[3]
        da_var = da_var.transpose(ds_time, ds_lat, ds_lon, vector_dim)
    else:
        raise ValueError(f"{var_name} has unexpected dims {da_var.dims}")

    # --- coordinate values ---
    time_vals = ds[ds_time].values
    lat_vals = ds[ds_lat].values
    lon_vals = ds[ds_lon].values

    if vector_dim is None:
        # single “spectral” value per pixel
        vec_vals = [None]
    else:
        vec_vals = ds[vector_dim].values  # array of wavelengths / extra coords

    # --- filter df to matching dates ---
    t_dates = pd.to_datetime(time_vals).normalize()
    df_dates = pd.to_datetime(df[df_time]).dt.normalize()
    df2 = df[df_dates.isin(t_dates)].copy()

    # If df is empty: return an empty DF with the right columns
    if df2.empty:
        cols = [df_time, df_lat, df_lon, y_name]
        for v in vec_vals:
            if v is None:
                col_name = var_name
            else:
                try:
                    label = int(v)
                except Exception:
                    label = v
                col_name = f"{var_name}_{label}"
            cols.append(col_name)
        return pd.DataFrame(columns=cols)

    # --- Normal path below ---

    df_lats = df2[df_lat].to_numpy(dtype=float)
    df_lons = df2[df_lon].to_numpy(dtype=float)
    df_times = pd.to_datetime(df2[df_time]).values.astype("datetime64[ns]")
    y_vals = df2[y_name].to_numpy()

    def nearest_index(coord_vals, q):
        """
        Return index of nearest coord_vals for each query q.

        coord_vals: 1D array, strictly monotonic (e.g., time, lat, lon)
        q         : scalar or 1D array of query values

        Returns: 1D array of indices into coord_vals.
        """
        asc = coord_vals[0] <= coord_vals[-1]
        base = coord_vals if asc else -coord_vals
        tgt = q if asc else -q

        idx = np.searchsorted(base, tgt, side="left")
        idx0 = np.clip(idx - 1, 0, len(coord_vals) - 1)
        idx1 = np.clip(idx,     0, len(coord_vals) - 1)

        take_right = np.abs(coord_vals[idx1] - q) < np.abs(coord_vals[idx0] - q)
        return np.where(take_right, idx1, idx0)

    time_i = nearest_index(time_vals, df_times)
    lat_i = nearest_index(lat_vals, df_lats)
    lon_i = nearest_index(lon_vals, df_lons)

    data_da = da_var.data  # (time, lat, lon) or (time, lat, lon, extra)

    if vector_dim is None:
        # scalar per pixel: (N_points,)
        sampled_da = data_da.vindex[time_i, lat_i, lon_i]
    else:
        # vector per pixel: (N_points, N_vec)
        sampled_da = data_da.vindex[time_i, lat_i, lon_i, :]

    if isinstance(sampled_da, da.Array):
        sampled = sampled_da.compute()
    else:
        sampled = np.asarray(sampled_da)

    # unify shape: always (N_points, N_vec)
    if sampled.ndim == 1:
        sampled = sampled[:, None]

    # --- build all columns in a dict first ---
    data = {
        df_time: df_times,
        df_lat:  df_lats,
        df_lon:  df_lons,
        y_name:  y_vals,
    }

    for j, v in enumerate(vec_vals):
        if v is None:
            col_name = var_name
        else:
            try:
                label = int(v)
            except Exception:
                label = v
            col_name = f"{var_name}_{label}"
        data[col_name] = sampled[:, j].astype(float)

    out = pd.DataFrame(data)

    if dropna_all:
        spec_cols = [c for c in out.columns if c.startswith(f"{var_name}_") or c == var_name]
        mask = np.isfinite(out[spec_cols]).any(axis=1)
        out = out[mask].reset_index(drop=True)

    return out


In [None]:
import numpy as np
import pandas as pd
import dask.array as da
import xarray as xr

def match_points_fast(
    ds: xr.Dataset,
    df: pd.DataFrame,
    var_name: str,
    y_name: str,
    ds_time: str = "time",
    ds_lat: str = "lat",
    ds_lon: str = "lon",
    df_time: str = "TIME",
    df_lat: str = "LATITUDE",
    df_lon: str = "LONGITUDE",
    dropna_all: bool = False,
):
    """
    Sample variable ds[var_name] at (lat, lon, time) points from df.

    ds[var_name] can be either:
      - (time, lat, lon)           -> one column named var_name
      - (time, lat, lon, extra)   -> multiple columns var_name_<extra>

    Returns a DataFrame; if df is empty (after date filtering), returns an empty DF
    with proper columns.
    """

    # --- ensure time dim exists ---
    if ds_time not in ds.dims:
        tstr = ds.attrs["time_coverage_start"]
        t_utc = pd.to_datetime(tstr, utc=True)   # tz-aware Timestamp in UTC
        t = t_utc.tz_convert(None)
        ds = ds.expand_dims({ds_time: [t]})
        ds = ds.assign_coords({ds_time: (ds_time, [t])})

    # --- get variable ---
    if var_name not in ds:
        raise KeyError(f"{var_name!r} not found in dataset.")
    da_var = ds[var_name]

    # --- check dims ---
    needed_dims = [ds_time, ds_lat, ds_lon]
    if not all(d in da_var.dims for d in needed_dims):
        raise ValueError(
            f"{var_name} dims {da_var.dims} do not contain all of {needed_dims}. "
            "Adjust ds_lat/ds_lon or subset/transform ds[var_name] first."
        )

    # --- normalize to (time, lat, lon) or (time, lat, lon, extra) ---
    if da_var.ndim == 3:
        # (time, lat, lon)
        vector_dim = None
        da_var = da_var.transpose(ds_time, ds_lat, ds_lon)
    elif da_var.ndim == 4:
        # (time, lat, lon, extra)  (extra is always 4th dim)
        vector_dim = da_var.dims[3]
        da_var = da_var.transpose(ds_time, ds_lat, ds_lon, vector_dim)
    else:
        raise ValueError(f"{var_name} has unexpected dims {da_var.dims}")

    if vector_dim is None:        
        vec_vals = [None] # single value per pixel
    else:
        vec_vals = ds[vector_dim].values  # vector of values

    # --- filter df to matching dates in ds ---
    time_vals = ds[ds_time].values
    t_dates = pd.to_datetime(time_vals).normalize()
    df_dates = pd.to_datetime(df[df_time]).dt.normalize()
    df2 = df[df_dates.isin(t_dates)].copy()

    # If df is empty: return an empty DF with the right columns
    if df2.empty:
        cols = [df_time, df_lat, df_lon, y_name]
        for v in vec_vals:
            if v is None:
                col_name = var_name
            else:
                try:
                    label = int(v)
                except Exception:
                    label = v
                col_name = f"{var_name}_{label}"
            cols.append(col_name)
        return pd.DataFrame(columns=cols)

    # for the df with only dates in ds
    df_times = pd.to_datetime(df2[df_time]).values.astype("datetime64[ns]")
    df_lats = df2[df_lat].to_numpy(dtype=float)
    df_lons = df2[df_lon].to_numpy(dtype=float)
    y_vals = df2[y_name].to_numpy()

    time_idx = ds.get_index(ds_time)
    lat_idx = ds.get_index(ds_lat)
    lon_idx = ds.get_index(ds_lon)

    time_i = time_idx.get_indexer(df_times, method="nearest")
    lat_i = lat_idx.get_indexer(df_lats, method="nearest")
    lon_i = lon_idx.get_indexer(df_lons, method="nearest")

    data_da = da_var.data  # (time, lat, lon) or (time, lat, lon, extra)

    if vector_dim is None:
        # scalar per pixel: (N_points,)
        sampled_da = data_da.vindex[time_i, lat_i, lon_i]
    else:
        # vector per pixel: (N_points, N_vec)
        sampled_da = data_da.vindex[time_i, lat_i, lon_i, :]

    if isinstance(sampled_da, da.Array):
        sampled = sampled_da.compute()
    else:
        sampled = np.asarray(sampled_da)

    # unify shape: always (N_points, N_vec)
    if sampled.ndim == 1:
        sampled = sampled[:, None]

    # --- build all columns in a dict first ---
    data = {
        df_time: df_times,
        df_lat:  df_lats,
        df_lon:  df_lons,
        y_name:  y_vals,
    }

    for j, v in enumerate(vec_vals):
        if v is None:
            col_name = var_name
        else:
            try:
                label = int(v)
            except Exception:
                label = v
            col_name = f"{var_name}_{label}"
        data[col_name] = sampled[:, j].astype(float)

    out = pd.DataFrame(data)

    if dropna_all:
        spec_cols = [c for c in out.columns if c.startswith(f"{var_name}_") or c == var_name]
        mask = np.isfinite(out[spec_cols]).any(axis=1)
        out = out[mask].reset_index(drop=True)

    return out

In [48]:
# Now ds_band["Rrs"] has dims ("time", "lat", "lon")
matchups = match_points_fast(
    ds=ds,
    df=df,                      # your surface CHL df
    var_name="Rrs",             # or whatever var you're sampling
    vector_dim="wavelength",
    y_name="log10_CHLA_surface",
    ds_lat="lat",
    ds_lon="lon",
    df_time="TIME",
    df_lat="LATITUDE",
    df_lon="LONGITUDE",
)
matchups

Unnamed: 0,TIME,LATITUDE,LONGITUDE,log10_CHLA_surface,Rrs_346,Rrs_348,Rrs_351,Rrs_353,Rrs_356,Rrs_358,...,Rrs_706,Rrs_707,Rrs_708,Rrs_709,Rrs_711,Rrs_712,Rrs_713,Rrs_714,Rrs_717,Rrs_719
0,2024-03-05 15:35:00,39.151501,-66.5746,-0.65228,,,,,,,...,,,,,,,,,,


In [None]:
all_matchups = []

for uri, time_val in zip(fileset, fileset_time):
    print(f"Processing {time_val} from {uri}")

    # open with chunks so it's dask-backed, not all in RAM
    ds = xr.open_dataset(
        uri,
        chunks={}  # tweak as needed
    )

    # add 1-length time dim/coord
    ds = ds.expand_dims(time=[time_val])
    ds = ds.assign_coords(time=("time", [time_val]))

    # call your matcher (it will internally subset df to matching dates)
    match_day = match_points_fast(
        ds=ds,
        df=df,                      # your surface CHL df (all days)
        var_name="Rrs",
        vector_dim="wavelength",
        y_name="log10_CHLA_surface",
        ds_lat="lat",
        ds_lon="lon",
        df_time="TIME",
        df_lat="LATITUDE",
        df_lon="LONGITUDE",
    )

    ds.close()

    # skip totally empty days to keep concat clean
    if not match_day.empty:
        all_matchups.append(match_day)

# combine everything
if all_matchups:
    matchups_all = pd.concat(all_matchups, ignore_index=True)
else:
    # in case literally nothing matched
    matchups_all = pd.DataFrame()

matchups_all.head()
len(matchups_all)


Processing 2024-03-05 from <File-like object S3FileSystem, ob-cumulus-prod-public/PACE_OCI.20240305.L3m.DAY.RRS.V3_1.Rrs.4km.nc>
Processing 2024-03-06 from <File-like object S3FileSystem, ob-cumulus-prod-public/PACE_OCI.20240306.L3m.DAY.RRS.V3_1.Rrs.4km.nc>
Processing 2024-03-07 from <File-like object S3FileSystem, ob-cumulus-prod-public/PACE_OCI.20240307.L3m.DAY.RRS.V3_1.Rrs.4km.nc>
Processing 2024-03-08 from <File-like object S3FileSystem, ob-cumulus-prod-public/PACE_OCI.20240308.L3m.DAY.RRS.V3_1.Rrs.4km.nc>
Processing 2024-03-09 from <File-like object S3FileSystem, ob-cumulus-prod-public/PACE_OCI.20240309.L3m.DAY.RRS.V3_1.Rrs.4km.nc>
Processing 2024-03-10 from <File-like object S3FileSystem, ob-cumulus-prod-public/PACE_OCI.20240310.L3m.DAY.RRS.V3_1.Rrs.4km.nc>
Processing 2024-03-11 from <File-like object S3FileSystem, ob-cumulus-prod-public/PACE_OCI.20240311.L3m.DAY.RRS.V3_1.Rrs.4km.nc>
Processing 2024-03-12 from <File-like object S3FileSystem, ob-cumulus-prod-public/PACE_OCI.202403

In [None]:
all_matchups = []

for uri, time_val in zip(fileset, fileset_time):
    print(f"Processing {time_val} from {uri}")

    # Open lazily, don’t cache the file
    with xr.open_dataset(
        uri,
        chunks={},
        cache=False,
    ) as ds_raw:

        # Add 1-length time dimension
        ds = ds_raw.expand_dims(time=[time_val])
        ds = ds.assign_coords(time=("time", [time_val]))

        match_day = match_points_fast(
            ds=ds,
            df=df,                      # full Argo df; function filters by date
            var_name="Rrs",
            vector_dim="wavelength",
            y_name="log10_CHLA_surface",
            ds_lat="lat",
            ds_lon="lon",
            df_time="TIME",
            df_lat="LATITUDE",
            df_lon="LONGITUDE",
        )
    
    # be explicit about cleaning up
    del ds_raw
    gc.collect()

    if not match_day.empty:
        all_matchups.append(match_day)

# Combine all days into one DF
if all_matchups:
    matchups_all = pd.concat(all_matchups, ignore_index=True)
else:
    matchups_all = pd.DataFrame()

print(len(matchups_all))
matchups_all.head()


Processing 2024-03-05 from <File-like object S3FileSystem, ob-cumulus-prod-public/PACE_OCI.20240305.L3m.DAY.RRS.V3_1.Rrs.4km.nc>
Processing 2024-03-06 from <File-like object S3FileSystem, ob-cumulus-prod-public/PACE_OCI.20240306.L3m.DAY.RRS.V3_1.Rrs.4km.nc>
Processing 2024-03-07 from <File-like object S3FileSystem, ob-cumulus-prod-public/PACE_OCI.20240307.L3m.DAY.RRS.V3_1.Rrs.4km.nc>
Processing 2024-03-08 from <File-like object S3FileSystem, ob-cumulus-prod-public/PACE_OCI.20240308.L3m.DAY.RRS.V3_1.Rrs.4km.nc>
Processing 2024-03-09 from <File-like object S3FileSystem, ob-cumulus-prod-public/PACE_OCI.20240309.L3m.DAY.RRS.V3_1.Rrs.4km.nc>
Processing 2024-03-10 from <File-like object S3FileSystem, ob-cumulus-prod-public/PACE_OCI.20240310.L3m.DAY.RRS.V3_1.Rrs.4km.nc>
Processing 2024-03-11 from <File-like object S3FileSystem, ob-cumulus-prod-public/PACE_OCI.20240311.L3m.DAY.RRS.V3_1.Rrs.4km.nc>
Processing 2024-03-12 from <File-like object S3FileSystem, ob-cumulus-prod-public/PACE_OCI.202403

In [None]:
for uri, time_val in zip(fileset, fileset_time):
    print(f"Processing {time_val} from {uri}")

    # open with chunks so it's dask-backed, not all in RAM
    ds = xr.open_dataset(
        uri,
        chunks={}  # tweak as needed
    )


Processing 2024-03-05 from <File-like object S3FileSystem, ob-cumulus-prod-public/PACE_OCI.20240305.L3m.DAY.RRS.V3_1.Rrs.4km.nc>
Processing 2024-03-06 from <File-like object S3FileSystem, ob-cumulus-prod-public/PACE_OCI.20240306.L3m.DAY.RRS.V3_1.Rrs.4km.nc>
Processing 2024-03-07 from <File-like object S3FileSystem, ob-cumulus-prod-public/PACE_OCI.20240307.L3m.DAY.RRS.V3_1.Rrs.4km.nc>
Processing 2024-03-08 from <File-like object S3FileSystem, ob-cumulus-prod-public/PACE_OCI.20240308.L3m.DAY.RRS.V3_1.Rrs.4km.nc>
Processing 2024-03-09 from <File-like object S3FileSystem, ob-cumulus-prod-public/PACE_OCI.20240309.L3m.DAY.RRS.V3_1.Rrs.4km.nc>
Processing 2024-03-10 from <File-like object S3FileSystem, ob-cumulus-prod-public/PACE_OCI.20240310.L3m.DAY.RRS.V3_1.Rrs.4km.nc>
Processing 2024-03-11 from <File-like object S3FileSystem, ob-cumulus-prod-public/PACE_OCI.20240311.L3m.DAY.RRS.V3_1.Rrs.4km.nc>
Processing 2024-03-12 from <File-like object S3FileSystem, ob-cumulus-prod-public/PACE_OCI.202403

NameError: name 'ds' is not defined