# Batch Processing

The FITS datacubes are compressed. This saves storage space and allows for faster downloads of the full data release, but the downside is that reading data is incredibly slow (about 15s per cube, compared to 0.03s on an uncompressed cube.). We recommend decompressing the cubes and saving to out_dir.

In [1]:
import os.path as op
from joblib import Parallel, delayed
import numpy as np
from astropy.table import Column, Table, hstack, unique

from astropy.coordinates import SkyCoord
import astropy.units as u
from astropy.io import fits

from astropy.wcs import WCS
from astropy.wcs.utils import proj_plane_pixel_scales

import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm

from tqdm import tqdm
import traceback
import time

In [2]:
pdr1_dir = '/home/jovyan/Hobby-Eberly-Public/HETDEX/internal/pdr1/'
ifu_data = Table.read(op.join( pdr1_dir, 'ifu-index.fits'))

In [3]:
# Create SkyCoord Object array for IFU centers
ifu_coords = SkyCoord( ra=ifu_data['ra_cen']*u.deg, dec=ifu_data['dec_cen']*u.deg)

## Optimize by Decompressing FITS files

This will copy the fits files to /tmp/datacubes/ or a defined diretory and will increase read time by a factor of 100. First we have an example code to do the decompression. Then we have a function that will do the decompression in parallel.

In [4]:
def decompress_ifu(shotid, ifuslot, pdr1_dir, out_dir="/tmp", verbose=False):
    """
    Decompress a RICE-compressed IFU FITS cube and save to /out_dir/datacubes/{shotid}/.
    Returns a tuple: (shotid, ifuslot, output_path, success_flag, error_message)
    """
    import os
    import os.path as op
    from astropy.io import fits
    import traceback

    infile = op.join(pdr1_dir, 'datacubes', str(shotid), f'dex_cube_{shotid}_{ifuslot}.fits')
    outdir = op.join(out_dir, 'datacubes', str(shotid))
    os.makedirs(outdir, exist_ok=True)

    outfile = op.join(outdir, f'dex_cube_{shotid}_{ifuslot}.fits')# keeping same file structure

    if op.exists(outfile):
        return (shotid, ifuslot, outfile, True, None)

    try:
        with fits.open(infile, memmap=False) as hdul:
            new_hdul = fits.HDUList()
            for hdu in hdul:
                if isinstance(hdu, fits.CompImageHDU):
                    new_hdul.append(fits.ImageHDU(data=hdu.data, header=hdu.header))
                else:
                    new_hdul.append(hdu.copy())
            new_hdul.writeto(outfile, overwrite=True)

        if verbose:
            print(f"✓ Decompressed: {shotid}-{ifuslot}")
        return (shotid, ifuslot, outfile, True, None)

    except Exception as e:
        err_msg = traceback.format_exc()
        if verbose:
            print(f"✗ Failed: {shotid}-{ifuslot}\n{err_msg}")
        return (shotid, ifuslot, None, False, err_msg)

In [5]:
def batch_decompress_ifus(ifu_list, data_dir, out_dir="/tmp", n_jobs=4, verbose=False):
    """
    Decompress a list of IFU cubes in parallel with progress and logging.

    Returns:
        results: list of tuples (shotid, ifuslot, path, success, error_msg)
    """
    results = Parallel(n_jobs=n_jobs)(
        delayed(decompress_ifu)(shotid, ifuslot, pdr1_dir, out_dir, verbose=False)
        for shotid, ifuslot in tqdm(ifu_list, desc="Decompressing IFUs", ncols=80)
    )

    failed = [r for r in results if not r[3]]
    if failed:
        print(f"\n⚠️  {len(failed)} cubes failed to decompress.")
        for f in failed:
            shotid, ifuslot, _, _, err_msg = f
            print(f" - {shotid}-{ifuslot}: {err_msg.splitlines()[-1]}")
    else:
        print("\n✅ All cubes decompressed successfully.")

    return results

## Example Decompression for a single coordinate with multiple observations

In [6]:
coord = SkyCoord(ra=150.23189*u.deg, dec=2.363963*u.deg)

# find list of possible datacubes with coverage
sel = coord.separation( ifu_coords) < 26*u.arcsec
# this AGN is observed 9 times
ifulist=ifu_data[sel]
ifu_pairs = [(int(row['shotid']), str(row['ifuslot'])) for row in ifulist]
print(ifu_pairs)

[(20181118020, '036'), (20181119015, '036'), (20181119016, '036'), (20181120012, '036'), (20181120013, '036'), (20220130020, '024'), (20220131010, '040'), (20240310015, '070'), (20240313016, '070')]


In [7]:
# location of decompressed cubes
out_dir = './decompressed'
for shotid, ifuslot in ifu_pairs:
    decompress_ifu( shotid, ifuslot, pdr1_dir, out_dir, verbose=False)


## Example Batch Decompression on a Large Catalog

In [8]:
catalog = Table.read('dr16q_hdr5.fits')
catalog.remove_column('shotid') # this example might be done witih different catalog later
catalog_coords = SkyCoord(ra = catalog['ra'], dec= catalog['dec'], unit='deg')

In [9]:
idx_ifu, idx_catalog, d2d, d3d = catalog_coords.search_around_sky(ifu_coords, 35*u.arcsec)

In [10]:
# create master table that matches IFU observation coverage with catalog objects

In [12]:
table = hstack( [catalog_coords[idx_catalog], catalog[idx_catalog], ifu_data[idx_ifu]] )
table.rename_column('col0', 'coords')

In [15]:
# reduce cube list to unique IFU observations

In [16]:
ifulist = unique( table['shotid', 'ifuslot'])

In [17]:
# Make sure 'ifuslot' is a string to preserve naming like '034'
ifu_pairs = [(int(row['shotid']), str(row['ifuslot'])) for row in ifulist]

In [None]:
results = batch_decompress_ifus(
    ifu_pairs,
    data_dir=pdr1_dir,
    out_dir='./decompressed/',
    n_jobs=100,
    verbose=False
)