In [1]:
import pandas as pd

In [3]:
dr6_splus = pd.read_csv("dr6_list.csv")

In [6]:
dr6_splus

Unnamed: 0,field,ra,dec
0,HYDRA-0011,150.770833,-23.908611
1,HYDRA-0012,152.308333,-23.908611
2,HYDRA-0013,153.845833,-23.908611
3,HYDRA-0014,155.383333,-23.908611
4,HYDRA-0015,156.925000,-23.908611
...,...,...,...
3045,STRIPE82-0166,355.750000,0.700000
3046,STRIPE82-0167,357.166667,-0.700000
3047,STRIPE82-0168,357.166667,0.700000
3048,STRIPE82-0169,358.583333,-0.700000


In [None]:
/home/astrodados/gaia_dr3/GaiaSource_463472-463488.csv.gz

/home/astrodados3/splus/idr6/SPLUS-s02s01/SPLUS-s02s01_F378.fits.fz

/home/astrodados4/downloads/desi_spectra/25861/20220124/coadd-0-25861-thru20220124.fits

/home/astrodados3/dados/ztf22/dataset/Norder=2/Dir=0/Npix=137.parquet

Unnamed: 0,field,ra,dec
0,HYDRA-0011,150.770833,-23.908611
1,HYDRA-0012,152.308333,-23.908611
2,HYDRA-0013,153.845833,-23.908611
3,HYDRA-0014,155.383333,-23.908611
4,HYDRA-0015,156.925000,-23.908611
...,...,...,...
3045,STRIPE82-0166,355.750000,0.700000
3046,STRIPE82-0167,357.166667,-0.700000
3047,STRIPE82-0168,357.166667,0.700000
3048,STRIPE82-0169,358.583333,-0.700000


In [None]:
#!/usr/bin/env python
import os
import re
import glob
from collections import defaultdict

import numpy as np
import pandas as pd
import healpy as hp
import pyarrow.parquet as pq

from astropy.io import fits
from astropy.table import Table
from astropy.coordinates import SkyCoord
import astropy.units as u

# -------------------------------------------------
# CONFIG – adjust paths/parameters to your setup
# -------------------------------------------------

SPLUS_BANDS = ["R", "I", "G", "U", "Z", "F378", "F395", "F410",
               "F430", "F515", "F660", "F861"]

SPLUS_IMAGES_PATH = ["/home/astrodados3/splus/idr6/"]

# Your driver list (one field per line)
DR6_LIST = "dr6_list.csv"   # columns: field, ra, dec  (ra/dec lowercase)

# Pattern for S-PLUS dual catalogs (if ever needed)
SPLUS_CAT_PATTERN = "/home/astrodados3/splus/idr6_final/main/SPLUS-*_dual.fits"

# Gaia DR3 source chunks (GaiaSource_<start>-<end>.csv.gz, HEALPix level-8 ranges)
GAIA_DIR = "/home/astrodados/gaia_dr3"

# Gaia XP spectra chunks (XpContinuousMeanSpectrum_<start>-<end>.csv.gz, same scheme)
GAIA_SPEC_DIR = "/home/astrodados4/downloads/gaia_spectra"

# ZTF HiPS parquet
ZTF_ROOT = "/home/astrodados3/dados/ztf22/dataset"

# DESI spectra
DESI_ROOT = "/home/astrodados4/downloads/desi_spectra"
DESI_TILES_CSV = "desi_tiles.csv"   # must contain TILEID, TILERA, TILEDEC
DESI_TILE_RADIUS_DEG = 1.7         # match S-PLUS field center to DESI tile center

# HEALPix schemes
# Gaia DR3 bulk download: files named GaiaSource_<start>-<end>.csv.gz
# are contiguous ranges of HEALPix level-8 indices → NSIDE=256
GAIA_NSIDE = 256
GAIA_NEST = True

GAIA_SPEC_NSIDE = GAIA_NSIDE
GAIA_SPEC_NEST = GAIA_NEST

# ZTF HiPS: Norder=2 → NSIDE = 2^2 = 4, NESTED
ZTF_NORDER = 2
ZTF_NSIDE = 2 ** ZTF_NORDER
ZTF_NEST = True

# Matching radii
MATCH_RADIUS_SPLUS_GAIA = 0.5 * u.arcsec
MATCH_RADIUS_SPLUS_ZTF  = 1.0 * u.arcsec
MATCH_RADIUS_SPLUS_DESI = 1.0 * u.arcsec

# How many S-PLUS rows to process at once (per field)
CHUNK_SIZE = 300000

# -------------------------------------------------
# Small helpers
# -------------------------------------------------

def radec_to_hpix(ra_deg, dec_deg, nside, nest=True):
    ra = np.asarray(ra_deg)
    dec = np.asarray(dec_deg)
    theta = np.radians(90.0 - dec)
    phi = np.radians(ra)
    return hp.ang2pix(nside, theta, phi, nest=nest)

def build_range_index(directory, prefix, suffix):
    """
    Build list of (start, end, path) for files like:
      prefix<start>-<end>suffix

    For Gaia DR3 bulk download, <start>,<end> are HEALPix level-8 indices.
    """
    entries = []
    pattern = re.escape(prefix) + r"(\d+)-(\d+)" + re.escape(suffix)
    for path in glob.glob(os.path.join(directory, prefix + "*" + suffix)):
        fname = os.path.basename(path)
        m = re.search(pattern, fname)
        if not m:
            continue
        start = int(m.group(1))
        end   = int(m.group(2))
        entries.append((start, end, path))
    entries.sort(key=lambda x: x[0])
    return entries

def files_for_hpix_ranges(hpix_indices, range_index):
    """
    Given a list of HEALPix indices and a list of (start,end,path) ranges,
    return the subset of files whose [start,end] overlaps those indices.
    """
    hpix_set = set(int(h) for h in np.unique(hpix_indices))
    files = set()
    for start, end, path in range_index:
        for h in list(hpix_set):
            if start <= h <= end:
                files.add(path)
    return sorted(files)

def ztf_file_for_hpix(hpix, norder=ZTF_NORDER, root=ZTF_ROOT):
    hpix = int(hpix)
    directory = hpix // 1000
    return os.path.join(
        root,
        f"Norder={norder}",
        f"Dir={directory}",
        f"Npix={hpix}.parquet",
    )

def desi_file_paths_for_tiles(tile_ids):
    """
    For a list of DESI TILEID values, return all coadd-*.fits files under
    DESI_ROOT/<TILEID>/<NIGHT>/coadd-*-TILEID-thru*.fits (or fallback coadd-*.fits).
    """
    paths = []
    for tileid in tile_ids:
        tileid = int(tileid)
        tile_dir = os.path.join(DESI_ROOT, str(tileid))
        if not os.path.isdir(tile_dir):
            print(f"      [DESI] Tile dir not found for TILEID={tileid}: {tile_dir}")
            continue

        pattern_main = os.path.join(tile_dir, "*", f"coadd-*-{tileid}-thru*.fits")
        tile_paths = glob.glob(pattern_main)
        if not tile_paths:
            # fallback: any coadd file
            pattern_fallback = os.path.join(tile_dir, "*", "coadd-*.fits")
            tile_paths = glob.glob(pattern_fallback)

        tile_paths = sorted(tile_paths)
        if not tile_paths:
            print(f"      [DESI] No coadd files in {tile_dir}")
        else:
            print(f"      [DESI] TILEID {tileid} → {len(tile_paths)} coadd files")
            paths.extend(tile_paths)

    return paths

def match_catalogs(src_coord, cat_coord, max_sep):
    idx, d2d, _ = src_coord.match_to_catalog_sky(cat_coord)
    idx[d2d > max_sep] = -1
    return idx

def read_gaia_ecsv(path):
    """
    Read Gaia DR3 bulk-download file (GaiaSource_* or XpContinuousMeanSpectrum_*)
    using the required ECSV reader configuration.
    """
    print(f"        [Gaia/ECSV] Opening {path}")
    tab = Table.read(
        path,
        format="ascii.ecsv",
        guess=False,
        fill_values=[('null', 99), ('nan', 99)],
    )
    return tab.to_pandas()

# -------------------------------------------------
# S-PLUS per-field
# -------------------------------------------------

def get_splus_path_for_field(field_name):
    """
    Return the S-PLUS dual catalog FITS path for a given field.

    Here we assume files like:
      /home/astrodados3/splus/idr6_final/main/HYDRA-0011_dual.fits
    i.e., <FIELD>_dual.fits
    """
    pattern = os.path.join(
        "/home/astrodados3/splus/idr6_final/main",
        f"{field_name}*.fits"
    )
    print(f"  [SPLUS] Globbing S-PLUS files with pattern: {pattern}")
    candidates = glob.glob(pattern)
    print(f"  [SPLUS] Found {len(candidates)} S-PLUS files for field {field_name}.")
    if len(candidates) == 1:
        return candidates[0]

    raise RuntimeError(
        f"Please define S-PLUS file path for field '{field_name}' "
        f"in get_splus_path_for_field()."
    )

def load_splus_for_field(field_row):
    """
    Open ONLY the S-PLUS catalog corresponding to this field.
    Assumes entire tile is in one FITS file.
    """
    field_name = field_row["field"]
    fits_path = get_splus_path_for_field(field_name)
    print(f"  [SPLUS] Opening S-PLUS file: {fits_path}")
    tab = Table.read(fits_path)
    df = tab.to_pandas()
    if "ra" not in df.columns or "dec" not in df.columns:
        raise ValueError(f"{fits_path} has no 'ra'/'dec' columns.")
    print(f"  [SPLUS] Loaded {len(df)} rows for field {field_name}")
    return df

# -------------------------------------------------
# Per-chunk external survey processing
# -------------------------------------------------

def process_chunk_gaia(splus_chunk, gaia_index):
    if splus_chunk.empty:
        return pd.DataFrame(index=splus_chunk.index)

    print(f"      [Gaia] Processing chunk with {len(splus_chunk)} S-PLUS rows")

    splus_coord = SkyCoord(
        splus_chunk["ra"].values * u.deg,
        splus_chunk["dec"].values * u.deg,
    )
    hpix_gaia = radec_to_hpix(
        splus_chunk["ra"], splus_chunk["dec"], GAIA_NSIDE, GAIA_NEST
    )

    needed_files = files_for_hpix_ranges(hpix_gaia, gaia_index)
    print(f"      [Gaia] Files needed for this chunk: {len(needed_files)}")
    if not needed_files:
        return pd.DataFrame(index=splus_chunk.index)

    gaia_rows = []
    for path in needed_files:
        df = read_gaia_ecsv(path)
        gaia_rows.append(df)

    gaia_cat = pd.concat(gaia_rows, ignore_index=True)
    print(f"      [Gaia] Total Gaia rows loaded for this chunk: {len(gaia_cat)}")

    if "ra" not in gaia_cat.columns or "dec" not in gaia_cat.columns:
        raise ValueError("Gaia catalog has no 'ra'/'dec' columns.")

    gaia_coord = SkyCoord(
        gaia_cat["ra"].values * u.deg,
        gaia_cat["dec"].values * u.deg,
    )
    match_idx = match_catalogs(splus_coord, gaia_coord, MATCH_RADIUS_SPLUS_GAIA)
    n_matches = np.sum(match_idx != -1)
    print(f"      [Gaia] Matched {n_matches} / {len(splus_chunk)}")

    out = []
    for idx in match_idx:
        out.append({} if idx == -1 else gaia_cat.iloc[int(idx)].to_dict())

    gaia_df = pd.DataFrame(out, index=splus_chunk.index)
    if not gaia_df.empty:
        gaia_df.columns = [f"gaia_{c}" for c in gaia_df.columns]

    return gaia_df

def process_chunk_gaia_spectra(gaia_df, gaia_spec_index):
    if gaia_df.empty or "gaia_source_id" not in gaia_df.columns:
        print("      [GaiaXP] No Gaia matches or missing 'gaia_source_id'.")
        return pd.DataFrame(index=gaia_df.index)

    sid_series = gaia_df["gaia_source_id"].dropna()
    if sid_series.empty:
        print("      [GaiaXP] No valid source_ids.")
        return pd.DataFrame(index=gaia_df.index)

    if not {"gaia_ra", "gaia_dec"}.issubset(gaia_df.columns):
        print("      [GaiaXP] Missing 'gaia_ra'/'gaia_dec'.")
        return pd.DataFrame(index=gaia_df.index)

    ra = gaia_df["gaia_ra"].astype(float)
    dec = gaia_df["gaia_dec"].astype(float)
    mask_valid = (~ra.isna()) & (~dec.isna()) & (~gaia_df["gaia_source_id"].isna())
    if not mask_valid.any():
        print("      [GaiaXP] No rows with valid RA/DEC + source_id.")
        return pd.DataFrame(index=gaia_df.index)

    hpix_spec = radec_to_hpix(
        ra[mask_valid].values,
        dec[mask_valid].values,
        GAIA_SPEC_NSIDE,
        GAIA_SPEC_NEST,
    )
    needed_files = files_for_hpix_ranges(hpix_spec, gaia_spec_index)
    print(f"      [GaiaXP] Files needed: {len(needed_files)}")

    if not needed_files:
        return pd.DataFrame(index=gaia_df.index)

    needed_source_ids = set(sid_series.astype("int64").values.tolist())

    spec_rows = []
    for path in needed_files:
        df = read_gaia_ecsv(path)
        if "source_id" not in df.columns:
            print(f"        [GaiaXP] 'source_id' missing in {path}, skipping.")
            continue
        df = df[df["source_id"].isin(needed_source_ids)]
        print(f"        [GaiaXP] {len(df)} rows match in {path}")
        if not df.empty:
            spec_rows.append(df)

    if not spec_rows:
        print("      [GaiaXP] No XP spectra matched.")
        return pd.DataFrame(index=gaia_df.index)

    spec_cat = pd.concat(spec_rows, ignore_index=True)
    spec_unique = spec_cat.drop_duplicates(subset="source_id", keep="first")
    spec_unique = spec_unique.set_index("source_id")

    out = []
    for sid in gaia_df["gaia_source_id"]:
        if pd.isna(sid):
            out.append({})
        else:
            sid_int = int(sid)
            if sid_int in spec_unique.index:
                out.append(spec_unique.loc[sid_int].to_dict())
            else:
                out.append({})

    gaiaxp_df = pd.DataFrame(out, index=gaia_df.index)
    if not gaiaxp_df.empty:
        gaiaxp_df.columns = [f"gaiaxp_{c}" for c in gaiaxp_df.columns]

    return gaiaxp_df

def process_chunk_ztf(splus_chunk):
    if splus_chunk.empty:
        return pd.DataFrame(index=splus_chunk.index)

    print(f"      [ZTF] Processing chunk with {len(splus_chunk)} S-PLUS rows")

    splus_coord = SkyCoord(
        splus_chunk["ra"].values * u.deg,
        splus_chunk["dec"].values * u.deg,
    )
    hpix_ztf = radec_to_hpix(
        splus_chunk["ra"], splus_chunk["dec"], ZTF_NSIDE, ZTF_NEST
    )

    files_by_hpix = defaultdict(list)
    for hpix in np.unique(hpix_ztf):
        path = ztf_file_for_hpix(hpix)
        if os.path.exists(path):
            files_by_hpix[int(hpix)].append(path)

    print(f"      [ZTF] Unique hpix with existing files: {len(files_by_hpix)}")

    if not files_by_hpix:
        return pd.DataFrame(index=splus_chunk.index)

    ztf_rows = []
    for hpix, paths in files_by_hpix.items():
        print(f"        [ZTF] hpix={hpix} → {len(paths)} parquet files")
        for path in paths:
            tab = pq.read_table(path).to_pandas()
            ztf_rows.append(tab)

    if not ztf_rows:
        return pd.DataFrame(index=splus_chunk.index)

    ztf_cat = pd.concat(ztf_rows, ignore_index=True)
    print(f"      [ZTF] Total ZTF rows loaded: {len(ztf_cat)}")

    if "ra" not in ztf_cat.columns or "dec" not in ztf_cat.columns:
        raise ValueError("ZTF parquet must contain 'ra'/'dec' columns.")

    ztf_coord = SkyCoord(
        ztf_cat["ra"].values * u.deg,
        ztf_cat["dec"].values * u.deg,
    )
    match_idx = match_catalogs(splus_coord, ztf_coord, MATCH_RADIUS_SPLUS_ZTF)
    n_matches = np.sum(match_idx != -1)
    print(f"      [ZTF] Matched {n_matches} / {len(splus_chunk)}")

    out = []
    for idx in match_idx:
        if idx == -1:
            out.append({})
        else:
            out.append(ztf_cat.iloc[int(idx)].to_dict())

    ztf_df = pd.DataFrame(out, index=splus_chunk.index)
    if not ztf_df.empty:
        ztf_df.columns = [f"ztf_{c}" for c in ztf_df.columns]

    return ztf_df

def process_chunk_desi(splus_chunk, desi_tile_ids):
    if splus_chunk.empty:
        return pd.DataFrame(index=splus_chunk.index)

    print(f"      [DESI] Processing chunk with {len(splus_chunk)} S-PLUS rows")
    print(f"      [DESI] Tiles considered for this field: {len(desi_tile_ids)}")

    if not desi_tile_ids:
        return pd.DataFrame(index=splus_chunk.index)

    splus_coord = SkyCoord(
        splus_chunk["ra"].values * u.deg,
        splus_chunk["dec"].values * u.deg,
    )

    desi_rows = [ {} for _ in range(len(splus_chunk)) ]

    paths = desi_file_paths_for_tiles(desi_tile_ids)
    print(f"      [DESI] Total DESI coadd files to inspect: {len(paths)}")

    for path in paths:
        print(f"        [DESI] Opening {path}")
        with fits.open(path) as hd:
            if "FIBERMAP" not in hd:
                print("        [DESI] No FIBERMAP extension, skipping.")
                continue
            fibermap = Table(hd["FIBERMAP"].data).to_pandas()

        # Drop fibermap rows with NaN coordinates
        if "TARGET_RA" not in fibermap.columns or "TARGET_DEC" not in fibermap.columns:
            print("        [DESI] FIBERMAP missing TARGET_RA/DEC, skipping.")
            continue

        fibermap = fibermap.dropna(subset=["TARGET_RA", "TARGET_DEC"])
        if fibermap.empty:
            print("        [DESI] All TARGET_RA/DEC are NaN in this file, skipping.")
            continue

        desi_coord = SkyCoord(
            fibermap["TARGET_RA"].values * u.deg,
            fibermap["TARGET_DEC"].values * u.deg,
        )
        match_idx = match_catalogs(splus_coord, desi_coord, MATCH_RADIUS_SPLUS_DESI)
        n_matches = np.sum(match_idx != -1)
        print(f"        [DESI] Matched {n_matches} / {len(splus_chunk)} in this file")

        for i, idx in enumerate(match_idx):
            if idx == -1:
                continue
            if not desi_rows[i]:
                row = dict(fibermap.iloc[int(idx)])
                row["desi_file"] = path
                desi_rows[i] = row

    desi_df = pd.DataFrame(desi_rows, index=splus_chunk.index)
    if not desi_df.empty:
        desi_df.columns = [f"desi_{c}" for c in desi_df.columns]

    return desi_df

# -------------------------------------------------
# Per-field datacube
# -------------------------------------------------

def build_datacube_for_field(field_row,
                             gaia_index,
                             gaia_spec_index,
                             desi_tiles,
                             desi_tiles_coord):
    field_name = field_row["field"]
    field_ra = float(field_row["ra"])
    field_dec = float(field_row["dec"])

    print(f"\n=== Field {field_name} ({field_ra}, {field_dec}) ===")

    # 0. Determine DESI tiles near this field (based on field center)
    field_center = SkyCoord(field_ra * u.deg, field_dec * u.deg)
    sep = field_center.separation(desi_tiles_coord)
    mask_tiles = sep.deg < DESI_TILE_RADIUS_DEG
    field_tile_ids = desi_tiles.loc[mask_tiles, "TILEID"].astype(int).tolist()
    print(f"  [DESI] Tiles within {DESI_TILE_RADIUS_DEG} deg of field center: {len(field_tile_ids)}")

    # 1. Open ONLY the S-PLUS catalog for this field
    splus_field = load_splus_for_field(field_row)
    print(f"  [SPLUS] Objects in this field file: {len(splus_field)}")

    all_chunks = []

    # 2. Process this field in chunks (for memory)
    for start in range(0, len(splus_field), CHUNK_SIZE):
        end = min(start + CHUNK_SIZE, len(splus_field))
        print(f"    [Field {field_name}] chunk {start}-{end}")
        chunk = splus_field.iloc[start:end].copy()

        gaia_df   = process_chunk_gaia(chunk, gaia_index)
        gaiaxp_df = process_chunk_gaia_spectra(gaia_df, gaia_spec_index)
        ztf_df    = process_chunk_ztf(chunk)
        desi_df   = process_chunk_desi(chunk, field_tile_ids)

        merged = (chunk
                  .join(gaia_df)
                  .join(gaiaxp_df)
                  .join(ztf_df)
                  .join(desi_df))
        all_chunks.append(merged)

    datacube_field = pd.concat(all_chunks, ignore_index=True)
    out_name = f"datacube_{field_name}.parquet"
    datacube_field.to_parquet(out_name)
    print(f"  [OUT] Saved {len(datacube_field)} rows to {out_name}")

    # free memory for this field
    del datacube_field
    del all_chunks
    del splus_field

# -------------------------------------------------
# Main
# -------------------------------------------------

def main():
    # Driver list
    dr6_splus = pd.read_csv(DR6_LIST)
    print(f"Loaded {len(dr6_splus)} rows from {DR6_LIST}")

    # Gaia indices (HEALPix level-8 ranges, small metadata kept once)
    gaia_index = build_range_index(GAIA_DIR, "GaiaSource_", ".csv.gz")
    print(f"Indexed {len(gaia_index)} Gaia source chunks")

    gaia_spec_index = build_range_index(
        GAIA_SPEC_DIR, "XpContinuousMeanSpectrum_", ".csv.gz"
    )
    print(f"Indexed {len(gaia_spec_index)} Gaia XP spectra chunks")

    # DESI tiles
    desi_tiles = pd.read_csv(DESI_TILES_CSV)
    # Expect TILEID, TILERA, TILEDEC
    for col in ("TILEID", "TILERA", "TILEDEC"):
        if col not in desi_tiles.columns:
            raise ValueError(f"{DESI_TILES_CSV} is missing '{col}' column.")
    print(f"Loaded {len(desi_tiles)} DESI tiles from {DESI_TILES_CSV}")

    desi_tiles_coord = SkyCoord(
        desi_tiles["TILERA"].values * u.deg,
        desi_tiles["TILEDEC"].values * u.deg,
    )

    # Process ONE FIELD AT A TIME
    for _, row in dr6_splus.iterrows():
        build_datacube_for_field(row,
                                 gaia_index,
                                 gaia_spec_index,
                                 desi_tiles,
                                 desi_tiles_coord)

if __name__ == "__main__":
    main()

Loaded 3050 rows from dr6_list.csv
Indexed 3386 Gaia source chunks
Indexed 3381 Gaia XP spectra chunks
Loaded 6101 DESI tiles from desi_tiles.csv

=== Field HYDRA-0011 (150.7708333, -23.90861111) ===
  [DESI] Tiles within 1.7 deg of field center: 0
  [SPLUS] Globbing S-PLUS files with pattern: /home/astrodados3/splus/idr6_final/main/HYDRA-0011*.fits
  [SPLUS] Found 1 S-PLUS files for field HYDRA-0011.
  [SPLUS] Opening S-PLUS file: /home/astrodados3/splus/idr6_final/main/HYDRA-0011_dual.fits
  [SPLUS] Loaded 77127 rows for field HYDRA-0011
  [SPLUS] Objects in this field file: 77127
    [Field HYDRA-0011] chunk 0-77127
      [Gaia] Processing chunk with 77127 S-PLUS rows
      [Gaia] Files needed for this chunk: 2
        [Gaia/ECSV] Opening /home/astrodados/gaia_dr3/GaiaSource_643214-644005.csv.gz
        [Gaia/ECSV] Opening /home/astrodados/gaia_dr3/GaiaSource_644006-645213.csv.gz
      [Gaia] Total Gaia rows loaded for this chunk: 1060042
      [Gaia] Matched 19215 / 77127
      [Ga

KeyboardInterrupt: 

In [None]:
#!/usr/bin/env python
import os
import re
import glob
from collections import defaultdict

import numpy as np
import pandas as pd
import healpy as hp
import pyarrow.parquet as pq

from astropy.io import fits
from astropy.table import Table
from astropy.coordinates import SkyCoord
from astropy.wcs import WCS
from astropy.nddata import Cutout2D
import astropy.units as u

# -------------------------------------------------
# CONFIG – adjust paths/parameters to your setup
# -------------------------------------------------

SPLUS_BANDS = ["R", "I", "G", "U", "Z", "F378", "F395", "F410",
               "F430", "F515", "F660", "F861"]

# Base dirs where images live; we’ll search in each
SPLUS_IMAGES_PATH = ["/home/astrodados3/splus/idr6/"]

# Cutout configuration
SPLUS_CUTOUT_SIZE = 96  # pixels
# If True, only make S-PLUS cutouts for objects that have a Gaia match
ONLY_GAIA_MATCH_CUTOUTS = True

# Your driver list (one field per line)
DR6_LIST = "dr6_list.csv"   # columns: field, ra, dec  (ra/dec lowercase)

# Pattern for S-PLUS dual catalogs (if ever needed)
SPLUS_CAT_PATTERN = "/home/astrodados3/splus/idr6_final/main/SPLUS-*_dual.fits"

# Gaia DR3 source chunks (GaiaSource_<start>-<end>.csv.gz, HEALPix level-8 ranges)
GAIA_DIR = "/home/astrodados/gaia_dr3"

# Gaia XP spectra chunks (XpContinuousMeanSpectrum_<start>-<end>.csv.gz, same scheme)
GAIA_SPEC_DIR = "/home/astrodados4/downloads/gaia_spectra"

# ZTF HiPS parquet
ZTF_ROOT = "/home/astrodados3/dados/ztf22/dataset"

# DESI spectra
DESI_ROOT = "/home/astrodados4/downloads/desi_spectra"
DESI_TILES_CSV = "desi_tiles.csv"   # must contain TILEID, TILERA, TILEDEC
DESI_TILE_RADIUS_DEG = 1.7         # match S-PLUS field center to DESI tile center

# APOGEE allStar DR17
APOGEE_PATH = "/home/astrodados4/downloads/allStar-dr17-synspec_rev1.fits"

# HEALPix schemes
# Gaia DR3 bulk download: files named GaiaSource_<start>-<end>.csv.gz
# are contiguous ranges of HEALPix level-8 indices → NSIDE=256
GAIA_NSIDE = 256
GAIA_NEST = True

GAIA_SPEC_NSIDE = GAIA_NSIDE
GAIA_SPEC_NEST = GAIA_NEST

# ZTF HiPS: Norder=2 → NSIDE = 2^2 = 4, NESTED
ZTF_NORDER = 2
ZTF_NSIDE = 2 ** ZTF_NORDER
ZTF_NEST = True

# Matching radii
MATCH_RADIUS_SPLUS_GAIA    = 0.5 * u.arcsec
MATCH_RADIUS_SPLUS_ZTF     = 1.0 * u.arcsec
MATCH_RADIUS_SPLUS_DESI    = 1.0 * u.arcsec
MATCH_RADIUS_SPLUS_APOGEE  = 1.0 * u.arcsec

# How many S-PLUS rows to process at once (per field)
CHUNK_SIZE = 300000

# -------------------------------------------------
# Small helpers
# -------------------------------------------------

def radec_to_hpix(ra_deg, dec_deg, nside, nest=True):
    ra = np.asarray(ra_deg)
    dec = np.asarray(dec_deg)
    theta = np.radians(90.0 - dec)
    phi = np.radians(ra)
    return hp.ang2pix(nside, theta, phi, nest=nest)

def build_range_index(directory, prefix, suffix):
    """
    Build list of (start, end, path) for files like:
      prefix<start>-<end>suffix

    For Gaia DR3 bulk download, <start>,<end> are HEALPix level-8 indices.
    """
    entries = []
    pattern = re.escape(prefix) + r"(\d+)-(\d+)" + re.escape(suffix)
    for path in glob.glob(os.path.join(directory, prefix + "*" + suffix)):
        fname = os.path.basename(path)
        m = re.search(pattern, fname)
        if not m:
            continue
        start = int(m.group(1))
        end   = int(m.group(2))
        entries.append((start, end, path))
    entries.sort(key=lambda x: x[0])
    return entries

def files_for_hpix_ranges(hpix_indices, range_index):
    """
    Given a list of HEALPix indices and a list of (start,end,path) ranges,
    return the subset of files whose [start,end] overlaps those indices.
    """
    hpix_set = set(int(h) for h in np.unique(hpix_indices))
    files = set()
    for start, end, path in range_index:
        for h in list(hpix_set):
            if start <= h <= end:
                files.add(path)
    return sorted(files)

def ztf_file_for_hpix(hpix, norder=ZTF_NORDER, root=ZTF_ROOT):
    hpix = int(hpix)
    directory = hpix // 1000
    return os.path.join(
        root,
        f"Norder={norder}",
        f"Dir={directory}",
        f"Npix={hpix}.parquet",
    )

def desi_file_paths_for_tiles(tile_ids):
    """
    For a list of DESI TILEID values, return all coadd-*.fits files under
    DESI_ROOT/<TILEID>/<NIGHT>/coadd-*-TILEID-thru*.fits (or fallback coadd-*.fits).
    """
    paths = []
    for tileid in tile_ids:
        tileid = int(tileid)
        tile_dir = os.path.join(DESI_ROOT, str(tileid))
        if not os.path.isdir(tile_dir):
            print(f"      [DESI] Tile dir not found for TILEID={tileid}: {tile_dir}")
            continue

        pattern_main = os.path.join(tile_dir, "*", f"coadd-*-{tileid}-thru*.fits")
        tile_paths = glob.glob(pattern_main)
        if not tile_paths:
            # fallback: any coadd file
            pattern_fallback = os.path.join(tile_dir, "*", "coadd-*.fits")
            tile_paths = glob.glob(pattern_fallback)

        tile_paths = sorted(tile_paths)
        if not tile_paths:
            print(f"      [DESI] No coadd files in {tile_dir}")
        else:
            print(f"      [DESI] TILEID {tileid} → {len(tile_paths)} coadd files")
            paths.extend(tile_paths)

    return paths

def match_catalogs(src_coord, cat_coord, max_sep):
    idx, d2d, _ = src_coord.match_to_catalog_sky(cat_coord)
    idx[d2d > max_sep] = -1
    return idx

def read_gaia_ecsv(path):
    """
    Read Gaia DR3 bulk-download file (GaiaSource_* or XpContinuousMeanSpectrum_*)
    using the required ECSV reader configuration.
    """
    print(f"        [Gaia/ECSV] Opening {path}")
    tab = Table.read(
        path,
        format="ascii.ecsv",
        guess=False,
        fill_values=[('null', 99), ('nan', 99)],
    )
    return tab.to_pandas()

# -------------------------------------------------
# S-PLUS per-field (catalog)
# -------------------------------------------------

def get_splus_path_for_field(field_name):
    """
    Return the S-PLUS dual catalog FITS path for a given field.

    Here we assume files like:
      /home/astrodados3/splus/idr6_final/main/HYDRA-0011_dual.fits
    i.e., <FIELD>_dual.fits
    """
    pattern = os.path.join(
        "/home/astrodados3/splus/idr6_final/main",
        f"{field_name}*.fits"
    )
    print(f"  [SPLUS] Globbing S-PLUS files with pattern: {pattern}")
    candidates = glob.glob(pattern)
    print(f"  [SPLUS] Found {len(candidates)} S-PLUS files for field {field_name}.")
    if len(candidates) == 1:
        return candidates[0]

    raise RuntimeError(
        f"Please define S-PLUS file path for field '{field_name}' "
        f"in get_splus_path_for_field()."
    )

def load_splus_for_field(field_row):
    """
    Open ONLY the S-PLUS catalog corresponding to this field.
    Assumes entire tile is in one FITS file.
    """
    field_name = field_row["field"]
    fits_path = get_splus_path_for_field(field_name)
    print(f"  [SPLUS] Opening S-PLUS file: {fits_path}")
    tab = Table.read(fits_path)
    df = tab.to_pandas()
    if "ra" not in df.columns or "dec" not in df.columns:
        raise ValueError(f"{fits_path} has no 'ra'/'dec' columns.")
    print(f"  [SPLUS] Loaded {len(df)} rows for field {field_name}")
    return df

# -------------------------------------------------
# S-PLUS image cutouts (stored in parquet)
# -------------------------------------------------

def get_splus_image_path_for_band(field_name, band):
    """
    Find the S-PLUS image file for a given field and band,
    trying all base paths in SPLUS_IMAGES_PATH.
    Expected pattern:
      <base>/<FIELD>/<FIELD>_<BAND>.fits.fz
    Example:
      /home/astrodados3/splus/idr6/HYDRA-0011/HYDRA-0011_F378.fits.fz
    """
    for base in SPLUS_IMAGES_PATH:
        path = os.path.join(base, field_name, f"{field_name}_{band}.fits.fz")
        if os.path.exists(path):
            return path
    raise FileNotFoundError(
        f"No image found for field {field_name}, band {band} "
        f"in any of {SPLUS_IMAGES_PATH}"
    )

def process_chunk_splus_images(field_name,
                               splus_chunk,
                               cutout_size=SPLUS_CUTOUT_SIZE,
                               mask_rows=None):
    """
    For this S-PLUS chunk, generate cutouts in all SPLUS_BANDS and
    return a DataFrame with columns of pixel data:

        splus_cut_R, splus_cut_I, ..., splus_cut_F378, ...

    Each cell is either:
        - a 1D float32 array of length cutout_size*cutout_size,
        - or None if cutout could not be made or was skipped.

    If mask_rows is provided (Series aligned with splus_chunk.index),
    cutouts are generated ONLY for rows where mask_rows is True.
    """
    if splus_chunk.empty:
        return pd.DataFrame(index=splus_chunk.index)

    print(f"      [SPLUS IMG] Making cutouts for chunk with {len(splus_chunk)} rows")

    # Normalize mask_rows to a boolean numpy array aligned to splus_chunk
    mask_arr = None
    if mask_rows is not None:
        mask_rows = mask_rows.reindex(splus_chunk.index)
        mask_arr = mask_rows.fillna(False).values

    # Coordinates for all rows (we'll skip via mask if needed)
    coords = SkyCoord(
        splus_chunk["ra"].values * u.deg,
        splus_chunk["dec"].values * u.deg,
    )

    # Initialize result dict: one column per band
    result = {f"splus_cut_{b}": [None] * len(splus_chunk) for b in SPLUS_BANDS}

    # Iterate over bands, load image once per band
    for band in SPLUS_BANDS:
        col_name = f"splus_cut_{band}"
        try:
            img_path = get_splus_image_path_for_band(field_name, band)
        except FileNotFoundError as e:
            print(f"      [SPLUS IMG] {e}")
            continue

        print(f"      [SPLUS IMG] Using image {img_path} for band {band}")
        try:
            with fits.open(img_path, memmap=True) as hdul:
                # Image is known to be in HDU=1
                if len(hdul) <= 1 or hdul[1].data is None:
                    print(f"      [SPLUS IMG] HDU 1 has no data in {img_path}, skipping band {band}")
                    continue
                image_hdu = hdul[1]
                data = image_hdu.data
                hdr = image_hdu.header
                wcs = WCS(hdr)
        except Exception as e:
            print(f"      [SPLUS IMG] Failed to open {img_path}: {e}")
            continue

        # For each source in chunk, try to make a cutout
        for pos, coord in enumerate(coords):
            # if mask is provided and this row has no Gaia match (or whatever criteria), skip
            if mask_arr is not None and not mask_arr[pos]:
                continue

            try:
                cut = Cutout2D(data, position=coord, wcs=wcs, size=cutout_size)
            except Exception:
                # outside image or WCS error
                continue

            # Store flattened float32 array directly in the DataFrame
            result[col_name][pos] = cut.data.astype("float32").ravel()

    cutout_df = pd.DataFrame(result, index=splus_chunk.index)
    return cutout_df

# -------------------------------------------------
# Per-chunk external survey processing (Gaia / XP / ZTF / DESI / APOGEE)
# -------------------------------------------------

def process_chunk_gaia(splus_chunk, gaia_index):
    if splus_chunk.empty:
        return pd.DataFrame(index=splus_chunk.index)

    print(f"      [Gaia] Processing chunk with {len(splus_chunk)} S-PLUS rows")

    splus_coord = SkyCoord(
        splus_chunk["ra"].values * u.deg,
        splus_chunk["dec"].values * u.deg,
    )
    hpix_gaia = radec_to_hpix(
        splus_chunk["ra"], splus_chunk["dec"], GAIA_NSIDE, GAIA_NEST
    )

    needed_files = files_for_hpix_ranges(hpix_gaia, gaia_index)
    print(f"      [Gaia] Files needed for this chunk: {len(needed_files)}")
    if not needed_files:
        return pd.DataFrame(index=splus_chunk.index)

    gaia_rows = []
    for path in needed_files:
        df = read_gaia_ecsv(path)
        gaia_rows.append(df)

    gaia_cat = pd.concat(gaia_rows, ignore_index=True)
    print(f"      [Gaia] Total Gaia rows loaded for this chunk: {len(gaia_cat)}")

    if "ra" not in gaia_cat.columns or "dec" not in gaia_cat.columns:
        raise ValueError("Gaia catalog has no 'ra'/'dec' columns.")

    gaia_coord = SkyCoord(
        gaia_cat["ra"].values * u.deg,
        gaia_cat["dec"].values * u.deg,
    )
    match_idx = match_catalogs(splus_coord, gaia_coord, MATCH_RADIUS_SPLUS_GAIA)
    n_matches = np.sum(match_idx != -1)
    print(f"      [Gaia] Matched {n_matches} / {len(splus_chunk)}")

    out = []
    for idx in match_idx:
        out.append({} if idx == -1 else gaia_cat.iloc[int(idx)].to_dict())

    gaia_df = pd.DataFrame(out, index=splus_chunk.index)
    if not gaia_df.empty:
        gaia_df.columns = [f"gaia_{c}" for c in gaia_df.columns]

    return gaia_df

def process_chunk_gaia_spectra(gaia_df, gaia_spec_index):
    if gaia_df.empty or "gaia_source_id" not in gaia_df.columns:
        print("      [GaiaXP] No Gaia matches or missing 'gaia_source_id'.")
        return pd.DataFrame(index=gaia_df.index)

    sid_series = gaia_df["gaia_source_id"].dropna()
    if sid_series.empty:
        print("      [GaiaXP] No valid source_ids.")
        return pd.DataFrame(index=gaia_df.index)

    if not {"gaia_ra", "gaia_dec"}.issubset(gaia_df.columns):
        print("      [GaiaXP] Missing 'gaia_ra'/'gaia_dec'.")
        return pd.DataFrame(index=gaia_df.index)

    ra = gaia_df["gaia_ra"].astype(float)
    dec = gaia_df["gaia_dec"].astype(float)
    mask_valid = (~ra.isna()) & (~dec.isna()) & (~gaia_df["gaia_source_id"].isna())
    if not mask_valid.any():
        print("      [GaiaXP] No rows with valid RA/DEC + source_id.")
        return pd.DataFrame(index=gaia_df.index)

    hpix_spec = radec_to_hpix(
        ra[mask_valid].values,
        dec[mask_valid].values,
        GAIA_SPEC_NSIDE,
        GAIA_SPEC_NEST,
    )
    needed_files = files_for_hpix_ranges(hpix_spec, gaia_spec_index)
    print(f"      [GaiaXP] Files needed: {len(needed_files)}")

    if not needed_files:
        return pd.DataFrame(index=gaia_df.index)

    needed_source_ids = set(sid_series.astype("int64").values.tolist())

    spec_rows = []
    for path in needed_files:
        df = read_gaia_ecsv(path)
        if "source_id" not in df.columns:
            print(f"        [GaiaXP] 'source_id' missing in {path}, skipping.")
            continue
        df = df[df["source_id"].isin(needed_source_ids)]
        print(f"        [GaiaXP] {len(df)} rows match in {path}")
        if not df.empty:
            spec_rows.append(df)

    if not spec_rows:
        print("      [GaiaXP] No XP spectra matched.")
        return pd.DataFrame(index=gaia_df.index)

    spec_cat = pd.concat(spec_rows, ignore_index=True)
    spec_unique = spec_cat.drop_duplicates(subset="source_id", keep="first")
    spec_unique = spec_unique.set_index("source_id")

    out = []
    for sid in gaia_df["gaia_source_id"]:
        if pd.isna(sid):
            out.append({})
        else:
            sid_int = int(sid)
            if sid_int in spec_unique.index:
                out.append(spec_unique.loc[sid_int].to_dict())
            else:
                out.append({})

    gaiaxp_df = pd.DataFrame(out, index=gaia_df.index)
    if not gaiaxp_df.empty:
        gaiaxp_df.columns = [f"gaiaxp_{c}" for c in gaiaxp_df.columns]

    return gaiaxp_df

def process_chunk_ztf(splus_chunk):
    if splus_chunk.empty:
        return pd.DataFrame(index=splus_chunk.index)

    print(f"      [ZTF] Processing chunk with {len(splus_chunk)} S-PLUS rows")

    splus_coord = SkyCoord(
        splus_chunk["ra"].values * u.deg,
        splus_chunk["dec"].values * u.deg,
    )
    hpix_ztf = radec_to_hpix(
        splus_chunk["ra"], splus_chunk["dec"], ZTF_NSIDE, ZTF_NEST
    )

    files_by_hpix = defaultdict(list)
    for hpix in np.unique(hpix_ztf):
        path = ztf_file_for_hpix(hpix)
        if os.path.exists(path):
            files_by_hpix[int(hpix)].append(path)

    print(f"      [ZTF] Unique hpix with existing files: {len(files_by_hpix)}")

    if not files_by_hpix:
        return pd.DataFrame(index=splus_chunk.index)

    ztf_rows = []
    for hpix, paths in files_by_hpix.items():
        print(f"        [ZTF] hpix={hpix} → {len(paths)} parquet files")
        for path in paths:
            tab = pq.read_table(path).to_pandas()
            ztf_rows.append(tab)

    if not ztf_rows:
        return pd.DataFrame(index=splus_chunk.index)

    ztf_cat = pd.concat(ztf_rows, ignore_index=True)
    print(f"      [ZTF] Total ZTF rows loaded: {len(ztf_cat)}")

    if "ra" not in ztf_cat.columns or "dec" not in ztf_cat.columns:
        raise ValueError("ZTF parquet must contain 'ra'/'dec' columns.")

    ztf_coord = SkyCoord(
        ztf_cat["ra"].values * u.deg,
        ztf_cat["dec"].values * u.deg,
    )
    match_idx = match_catalogs(splus_coord, ztf_coord, MATCH_RADIUS_SPLUS_ZTF)
    n_matches = np.sum(match_idx != -1)
    print(f"      [ZTF] Matched {n_matches} / {len(splus_chunk)}")

    out = []
    for idx in match_idx:
        if idx == -1:
            out.append({})
        else:
            out.append(ztf_cat.iloc[int(idx)].to_dict())

    ztf_df = pd.DataFrame(out, index=splus_chunk.index)
    if not ztf_df.empty:
        ztf_df.columns = [f"ztf_{c}" for c in ztf_df.columns]

    return ztf_df

def process_chunk_desi(splus_chunk, desi_tile_ids):
    if splus_chunk.empty:
        return pd.DataFrame(index=splus_chunk.index)

    print(f"      [DESI] Processing chunk with {len(splus_chunk)} S-PLUS rows")
    print(f"      [DESI] Tiles considered for this field: {len(desi_tile_ids)}")

    if not desi_tile_ids:
        return pd.DataFrame(index=splus_chunk.index)

    splus_coord = SkyCoord(
        splus_chunk["ra"].values * u.deg,
        splus_chunk["dec"].values * u.deg,
    )

    desi_rows = [ {} for _ in range(len(splus_chunk)) ]

    paths = desi_file_paths_for_tiles(desi_tile_ids)
    print(f"      [DESI] Total DESI coadd files to inspect: {len(paths)}")

    for path in paths:
        print(f"        [DESI] Opening {path}")
        with fits.open(path) as hd:
            if "FIBERMAP" not in hd:
                print("        [DESI] No FIBERMAP extension, skipping.")
                continue
            fibermap = Table(hd["FIBERMAP"].data).to_pandas()

        # Drop fibermap rows with NaN coordinates
        if "TARGET_RA" not in fibermap.columns or "TARGET_DEC" not in fibermap.columns:
            print("        [DESI] FIBERMAP missing TARGET_RA/DEC, skipping.")
            continue

        fibermap = fibermap.dropna(subset=["TARGET_RA", "TARGET_DEC"])
        if fibermap.empty:
            print("        [DESI] All TARGET_RA/DEC are NaN in this file, skipping.")
            continue

        desi_coord = SkyCoord(
            fibermap["TARGET_RA"].values * u.deg,
            fibermap["TARGET_DEC"].values * u.deg,
        )
        match_idx = match_catalogs(splus_coord, desi_coord, MATCH_RADIUS_SPLUS_DESI)
        n_matches = np.sum(match_idx != -1)
        print(f"        [DESI] Matched {n_matches} / {len(splus_chunk)} in this file")

        for i, idx in enumerate(match_idx):
            if idx == -1:
                continue
            if not desi_rows[i]:
                row = dict(fibermap.iloc[int(idx)])
                row["desi_file"] = path
                desi_rows[i] = row

    desi_df = pd.DataFrame(desi_rows, index=splus_chunk.index)
    if not desi_df.empty:
        desi_df.columns = [f"desi_{c}" for c in desi_df.columns]

    return desi_df

def process_chunk_apogee(splus_chunk, apogee_cat, apogee_coord):
    """
    Cross-match this S-PLUS chunk with APOGEE allStar DR17 (single big table).

    apogee_cat:  DataFrame for all APOGEE rows (already loaded once in main)
    apogee_coord: SkyCoord(APOGEE_RA, APOGEE_DEC)
    """
    if splus_chunk.empty or apogee_cat is None or apogee_coord is None:
        return pd.DataFrame(index=splus_chunk.index)

    print(f"      [APOGEE] Processing chunk with {len(splus_chunk)} S-PLUS rows")

    splus_coord = SkyCoord(
        splus_chunk["ra"].values * u.deg,
        splus_chunk["dec"].values * u.deg,
    )

    match_idx = match_catalogs(splus_coord, apogee_coord, MATCH_RADIUS_SPLUS_APOGEE)
    n_matches = np.sum(match_idx != -1)
    print(f"      [APOGEE] Matched {n_matches} / {len(splus_chunk)}")

    out = []
    for idx in match_idx:
        out.append({} if idx == -1 else apogee_cat.iloc[int(idx)].to_dict())

    apogee_df = pd.DataFrame(out, index=splus_chunk.index)
    if not apogee_df.empty:
        apogee_df.columns = [f"apogee_{c}" for c in apogee_df.columns]

    return apogee_df

# -------------------------------------------------
# Per-field datacube
# -------------------------------------------------

def build_datacube_for_field(field_row,
                             gaia_index,
                             gaia_spec_index,
                             desi_tiles,
                             desi_tiles_coord,
                             apogee_cat,
                             apogee_coord):
    field_name = field_row["field"]
    field_ra = float(field_row["ra"])
    field_dec = float(field_row["dec"])

    print(f"\n=== Field {field_name} ({field_ra}, {field_dec}) ===")

    # 0. Determine DESI tiles near this field (based on field center)
    field_center = SkyCoord(field_ra * u.deg, field_dec * u.deg)
    sep = field_center.separation(desi_tiles_coord)
    mask_tiles = sep.deg < DESI_TILE_RADIUS_DEG
    field_tile_ids = desi_tiles.loc[mask_tiles, "TILEID"].astype(int).tolist()
    print(f"  [DESI] Tiles within {DESI_TILE_RADIUS_DEG} deg of field center: {len(field_tile_ids)}")

    # 1. Open ONLY the S-PLUS catalog for this field
    splus_field = load_splus_for_field(field_row)
    print(f"  [SPLUS] Objects in this field file: {len(splus_field)}")

    all_chunks = []

    # 2. Process this field in chunks (for memory)
    for start in range(0, len(splus_field), CHUNK_SIZE):
        end = min(start + CHUNK_SIZE, len(splus_field))
        print(f"    [Field {field_name}] chunk {start}-{end}")
        chunk = splus_field.iloc[start:end].copy()

        # Gaia first (so we can decide which rows get cutouts)
        gaia_df   = process_chunk_gaia(chunk, gaia_index)

        # Mask for cutouts: only rows with Gaia matches (if enabled)
        cutout_mask = None
        if ONLY_GAIA_MATCH_CUTOUTS and not gaia_df.empty and "gaia_source_id" in gaia_df.columns:
            cutout_mask = ~gaia_df["gaia_source_id"].isna()
            print(f"    [SPLUS IMG] Gaia-matched rows for cutouts: {cutout_mask.sum()} / {len(cutout_mask)}")

        # S-PLUS cutouts stored directly in parquet
        splus_img_df = process_chunk_splus_images(field_name, chunk, mask_rows=cutout_mask)

        # Gaia XP, ZTF, DESI, APOGEE
        gaiaxp_df = process_chunk_gaia_spectra(gaia_df, gaia_spec_index)
        ztf_df    = process_chunk_ztf(chunk)
        desi_df   = process_chunk_desi(chunk, field_tile_ids)
        apogee_df = process_chunk_apogee(chunk, apogee_cat, apogee_coord)

        merged = (chunk
                  .join(splus_img_df)
                  .join(gaia_df)
                  .join(gaiaxp_df)
                  .join(ztf_df)
                  .join(desi_df)
                  .join(apogee_df))
        all_chunks.append(merged)

    datacube_field = pd.concat(all_chunks, ignore_index=True)
    out_name = f"datacube_{field_name}.parquet"
    datacube_field.to_parquet(out_name)
    print(f"  [OUT] Saved {len(datacube_field)} rows to {out_name}")

    # free memory for this field
    del datacube_field
    del all_chunks
    del splus_field

# -------------------------------------------------
# Main
# -------------------------------------------------

def main():
    # Driver list
    dr6_splus = pd.read_csv(DR6_LIST)
    dr6_splus = dr6_splus[dr6_splus['field'] == 'STRIPE82-0001']
    print(f"Loaded {len(dr6_splus)} rows from {DR6_LIST}")

    # Gaia indices (HEALPix level-8 ranges, small metadata kept once)
    gaia_index = build_range_index(GAIA_DIR, "GaiaSource_", ".csv.gz")
    print(f"Indexed {len(gaia_index)} Gaia source chunks")

    gaia_spec_index = build_range_index(
        GAIA_SPEC_DIR, "XpContinuousMeanSpectrum_", ".csv.gz"
    )
    print(f"Indexed {len(gaia_spec_index)} Gaia XP spectra chunks")

    # DESI tiles
    desi_tiles = pd.read_csv(DESI_TILES_CSV)
    # Expect TILEID, TILERA, TILEDEC
    for col in ("TILEID", "TILERA", "TILEDEC"):
        if col not in desi_tiles.columns:
            raise ValueError(f"{DESI_TILES_CSV} is missing '{col}' column.")
    print(f"Loaded {len(desi_tiles)} DESI tiles from {DESI_TILES_CSV}")

    desi_tiles_coord = SkyCoord(
        desi_tiles["TILERA"].values * u.deg,
        desi_tiles["TILEDEC"].values * u.deg,
    )

    # APOGEE allStar DR17 (drop multidimensional columns before to_pandas)
    apogee_cat = None
    apogee_coord = None
    if os.path.exists(APOGEE_PATH):
        print(f"Loading APOGEE catalog from {APOGEE_PATH}")
        tab = Table.read(APOGEE_PATH)

        # Keep only 1D columns (drop FPARAM, FELEM, etc.)
        names_1d = [name for name in tab.colnames if tab[name].ndim <= 1]
        tab_1d = tab[names_1d]

        apogee_cat = tab_1d.to_pandas()

        # Ensure RA/DEC exist and drop NaNs
        if "RA" not in apogee_cat.columns or "DEC" not in apogee_cat.columns:
            raise ValueError("APOGEE catalog has no 'RA'/'DEC' columns.")

        mask = (~apogee_cat["RA"].isna()) & (~apogee_cat["DEC"].isna())
        apogee_cat = apogee_cat.loc[mask].reset_index(drop=True)
        apogee_coord = SkyCoord(
            apogee_cat["RA"].values * u.deg,
            apogee_cat["DEC"].values * u.deg,
        )
        print(f"Loaded {len(apogee_cat)} APOGEE rows with valid RA/DEC and 1D columns")
    else:
        print(f"[APOGEE] File not found: {APOGEE_PATH} (APOGEE will be skipped)")

    # Process ONE FIELD AT A TIME
    for _, row in dr6_splus.iterrows():
        build_datacube_for_field(row,
                                 gaia_index,
                                 gaia_spec_index,
                                 desi_tiles,
                                 desi_tiles_coord,
                                 apogee_cat,
                                 apogee_coord)

if __name__ == "__main__":
    main()

Loaded 1 rows from dr6_list.csv
Indexed 3386 Gaia source chunks
Indexed 3381 Gaia XP spectra chunks
Loaded 6101 DESI tiles from desi_tiles.csv
Loading APOGEE catalog from /home/astrodados4/downloads/allStar-dr17-synspec_rev1.fits




Loaded 733900 APOGEE rows with valid RA/DEC and 1D columns

=== Field STRIPE82-0001 (0.0, -0.7) ===
  [DESI] Tiles within 1.7 deg of field center: 3
  [SPLUS] Globbing S-PLUS files with pattern: /home/astrodados3/splus/idr6_final/main/STRIPE82-0001*.fits
  [SPLUS] Found 1 S-PLUS files for field STRIPE82-0001.
  [SPLUS] Opening S-PLUS file: /home/astrodados3/splus/idr6_final/main/STRIPE82-0001_dual.fits
  [SPLUS] Loaded 53997 rows for field STRIPE82-0001
  [SPLUS] Objects in this field file: 53997
    [Field STRIPE82-0001] chunk 0-53997
      [Gaia] Processing chunk with 53997 S-PLUS rows
      [Gaia] Files needed for this chunk: 3
        [Gaia/ECSV] Opening /home/astrodados/gaia_dr3/GaiaSource_277913-281529.csv.gz
        [Gaia/ECSV] Opening /home/astrodados/gaia_dr3/GaiaSource_288627-292004.csv.gz
        [Gaia/ECSV] Opening /home/astrodados/gaia_dr3/GaiaSource_299573-302248.csv.gz
      [Gaia] Total Gaia rows loaded for this chunk: 1646076
      [Gaia] Matched 5673 / 53997
    [SPLU



      [SPLUS IMG] Using image /home/astrodados3/splus/idr6/STRIPE82-0001/STRIPE82-0001_I.fits.fz for band I




      [SPLUS IMG] Using image /home/astrodados3/splus/idr6/STRIPE82-0001/STRIPE82-0001_G.fits.fz for band G




      [SPLUS IMG] Using image /home/astrodados3/splus/idr6/STRIPE82-0001/STRIPE82-0001_U.fits.fz for band U




      [SPLUS IMG] Using image /home/astrodados3/splus/idr6/STRIPE82-0001/STRIPE82-0001_Z.fits.fz for band Z




      [SPLUS IMG] Using image /home/astrodados3/splus/idr6/STRIPE82-0001/STRIPE82-0001_F378.fits.fz for band F378




      [SPLUS IMG] Using image /home/astrodados3/splus/idr6/STRIPE82-0001/STRIPE82-0001_F395.fits.fz for band F395




      [SPLUS IMG] Using image /home/astrodados3/splus/idr6/STRIPE82-0001/STRIPE82-0001_F410.fits.fz for band F410




      [SPLUS IMG] Using image /home/astrodados3/splus/idr6/STRIPE82-0001/STRIPE82-0001_F430.fits.fz for band F430




      [SPLUS IMG] Using image /home/astrodados3/splus/idr6/STRIPE82-0001/STRIPE82-0001_F515.fits.fz for band F515




      [SPLUS IMG] Using image /home/astrodados3/splus/idr6/STRIPE82-0001/STRIPE82-0001_F660.fits.fz for band F660




      [SPLUS IMG] Using image /home/astrodados3/splus/idr6/STRIPE82-0001/STRIPE82-0001_F861.fits.fz for band F861




      [GaiaXP] Files needed: 3
        [Gaia/ECSV] Opening /home/astrodados4/downloads/gaia_spectra/XpContinuousMeanSpectrum_277913-281529.csv.gz


In [1]:
import pandas as pd
df = pd.read_parquet("datacube_HYDRA-0011.parquet")

In [4]:
list(df.columns)

['id',
 'random_idx',
 'ra',
 'dec',
 'field',
 'a_pixel_det',
 'a_restricted_pixel_r',
 'b_pixel_det',
 'b_restricted_pixel_r',
 'class_star_det',
 'class_star_g',
 'class_star_i',
 'class_star_j0378',
 'class_star_j0395',
 'class_star_j0410',
 'class_star_j0430',
 'class_star_j0515',
 'class_star_j0660',
 'class_star_j0861',
 'class_star_r',
 'class_star_u',
 'class_star_z',
 'ellipticity_det',
 'elongation_det',
 'err_a_pixel_det',
 'err_b_pixel_det',
 'err_mag_aper_3_g',
 'err_mag_aper_3_i',
 'err_mag_aper_3_j0378',
 'err_mag_aper_3_j0395',
 'err_mag_aper_3_j0410',
 'err_mag_aper_3_j0430',
 'err_mag_aper_3_j0515',
 'err_mag_aper_3_j0660',
 'err_mag_aper_3_j0861',
 'err_mag_aper_3_r',
 'err_mag_aper_3_u',
 'err_mag_aper_3_z',
 'err_mag_aper_6_g',
 'err_mag_aper_6_i',
 'err_mag_aper_6_j0378',
 'err_mag_aper_6_j0395',
 'err_mag_aper_6_j0410',
 'err_mag_aper_6_j0430',
 'err_mag_aper_6_j0515',
 'err_mag_aper_6_j0660',
 'err_mag_aper_6_j0861',
 'err_mag_aper_6_r',
 'err_mag_aper_6_u',
 '

In [8]:
df["gaia_phot_bp_mean_mag"]

0              NaN
1              NaN
2              NaN
3        22.006819
4              NaN
           ...    
77122          NaN
77123    19.555763
77124          NaN
77125          NaN
77126          NaN
Name: gaia_phot_bp_mean_mag, Length: 77127, dtype: float64

In [5]:
df["splus_cut_R"]

0                                                     None
1                                                     None
2                                                     None
3        [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...
4                                                     None
                               ...                        
77122                                                 None
77123    [0.07828938, 0.034619495, -0.11812654, -0.2108...
77124                                                 None
77125                                                 None
77126                                                 None
Name: splus_cut_R, Length: 77127, dtype: object