In [9]:
import os
import requests
import tarfile
import glob
import healpy as hp
from astropy.io import fits
from astropy.table import Table
from multiprocessing import Pool
import numpy as np
from tqdm import tqdm
import h5py

URL = "http://vipers.inaf.it/data/pdr2/spectra/"
SURVEYS = ["VIPERS_W1_SPECTRA_1D_PDR2.tar.gz", "VIPERS_W4_SPECTRA_1D_PDR2.tar.gz"]

SURVEY_SAVE_DIRS = ["vipers_w1", "vipers_w4"]
HEADER_KEYS = ['ID', 'RA', 'DEC', 'REDSHIFT', 'REDFLAG', 'EXPTIME', 'NORM', 'MAG']


def download_data(vipers_data_path: str = ''):
    """Download the VIPERS data from the web and unpack it into the specified directory."""
    # Create the output directory if it does not exist
    if not os.path.exists(vipers_data_path):
        os.makedirs(vipers_data_path)

    # Download each file
    for file in SURVEYS:
        local_path = os.path.join(vipers_data_path, file)
        subdirectory_path = os.path.join(vipers_data_path, file.replace(".tar.gz", ""))

        # Create a subdirectory for each file
        if not os.path.exists(subdirectory_path):
            os.makedirs(subdirectory_path)

        # Check if file needs to be downloaded
        if not os.path.exists(local_path):
            print(f"Downloading {file}...")
            response = requests.get(URL + file, stream=True)
            if response.status_code == 200:
                with open(local_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
            else:
                print(f"Failed to download {file}. Status code: {response.status_code}")
                continue

        # Unpack the tar.gz file into its specific subdirectory
        print(f"Unpacking into {subdirectory_path}...")
        with tarfile.open(local_path, "r:gz") as tar:
            tar.extractall(path=subdirectory_path)
        print(f"Unpacked successfully!\n")

        # Remove the tar files
        os.remove(local_path)


def extract_data(filename):
    """Extract the contents of a tar file to a dictionary for each file"""
    hdu = fits.open(filename)
    header = hdu[1].header
    data = hdu[1].data

    results = {}

    # Loop through the header keys and add them to the results dictionary
    for key in HEADER_KEYS:
        results[key] = float(header[key])
    
    # Add the spectrum data to the results dictionary
    results['spectrum_flux'] = data['FLUXES'].astype(np.float32)
    results['spectrum_wave'] = data['WAVES'].astype(np.float32)
    results['spectrum_ivar'] = data['NOISE'].astype(np.float32)
    results['spectrum_mask'] = data['MASK'].astype(np.float32)
    
    hdu.close()
    return results


def save_in_standard_format(results: Table, survey_subdir: str, nside: int):
    """Save the extracted data in a standard format for the given survey."""
    table = Table(results)

    # Get keys
    keys = table.keys()

    # Get healpix files
    healpix_indices = hp.ang2pix(nside, table['RA'], table['DEC'], lonlat=True, nest=True)
    unique_indices = np.unique(healpix_indices)

    for index in tqdm(unique_indices, desc="Processing HEALPix indices"):
        mask = healpix_indices == index
        grouped_data = table[mask]
        healpix_subdir = os.path.join(survey_subdir, f'healpix={index}')

        if not os.path.exists(healpix_subdir):
            os.makedirs(healpix_subdir)

        output_path = os.path.join(healpix_subdir, '001-of-001.h5')

        with h5py.File(output_path, 'w') as output_file:
            for key in keys:
                output_file.create_dataset(key.lower(), data=grouped_data[key])
            output_file.create_dataset('object_id', data=grouped_data['ID'])
            output_file.create_dataset('healpix', data=np.full(grouped_data['ID'].shape, index))


def main(vipers_data_path: str = '', nside: int = 16, num_processes: int = 10):
    """
    Download and extract the VIPERS spectra into a standard format using HEALPix indices.

    Args:
        vipers_data_path (str): The path to the directory where the VIPERS data is stored.
        nside (int): The nside parameter for the HEALPix indexing.
        num_processes (int): The number of parallel processes to run for extracting the data.
    """
    # Download the VIPERS data if it does not exist
    if not os.path.exists(vipers_data_path):
        download_data(vipers_data_path)

    for survey, survey_save_dir in zip(SURVEYS, SURVEY_SAVE_DIRS):
        print(f"Processing {survey}...", flush=True)

        # Load all fits file, standardize them and append to HDF5 file
        survey = survey.replace(".tar.gz", "")
        files = glob.glob(os.path.join(vipers_data_path, survey, '*.fits'))
        files = files

        # Run the parallel processing
        with Pool(num_processes) as pool:
            results = list(tqdm(pool.imap(extract_data, files), total=len(files)))

        survey_save_dir = os.path.join(vipers_data_path, survey_save_dir)
        if not os.path.exists(survey_save_dir):
            os.makedirs(survey_save_dir)

        save_in_standard_format(results, survey_save_dir, nside)
        print(f"Finished processing {survey}!\n")

        os.remove(os.path.join(vipers_data_path, survey))


In [10]:
main('/mnt/ceph/users/polymathic/AstroPile/vipers/', num_processes=30)

Processing VIPERS_W1_SPECTRA_1D_PDR2.tar.gz...


100%|██████████| 60528/60528 [00:31<00:00, 1934.79it/s]
Processing HEALPix indices: 100%|██████████| 5/5 [00:00<00:00,  6.73it/s]

Processing VIPERS_W4_SPECTRA_1D_PDR2.tar.gz...



100%|██████████| 30979/30979 [00:39<00:00, 785.85it/s]
Processing HEALPix indices: 100%|██████████| 3/3 [00:00<00:00,  7.86it/s]


In [16]:
ls /mnt/ceph/users/polymathic/AstroPile/vipers/vipers_w4

[0m[01;34m'healpix=1189'[0m/  [01;34m'healpix=1190'[0m/  [01;34m'healpix=1191'[0m/


In [14]:
vipers_w1

NameError: name 'vipers_w1' is not defined