In [1]:
from astropy.io import fits
import astropy.wcs as wcs
from astropy.stats import mad_std
import numpy as np
from reproject import reproject_interp
from scipy.ndimage import binary_dilation, binary_closing
from glob import glob 
import os

from astropy.convolution import Gaussian2DKernel, convolve, interpolate_replace_nans
from deepCR import deepCR

import warnings 
warnings.filterwarnings('ignore')

# Loading files

In [2]:
def get_hdu(rootdir, filename, hdu_id=0):
    filename_full = glob(rootdir+filename)[0]
    if hdu_id == 'all':
        hdu = fits.open(filename_full)
    else:
        hdu = fits.open(filename_full)[hdu_id]
    print(filename_full)
    return(hdu)

galaxy = 'ngc1566'
galaxy_muse = galaxy
rootdir = '/Users/abarnes/Dropbox/work/Smallprojects/galaxies/data_hstha/%s/' %galaxy
rootdir_bp = '/Users/abarnes/Dropbox/work/Smallprojects/galaxies/data_misc/hst_filters/' 
run_cleanup = False

narrowband_filter = 'f658n'
instrument_f555w  = 'uvis'
instrument_f65Xn = 'uvis'
instrument_f814w = 'uvis'

# Take the anchored version, careful of name change in variable
hdu_hst_ha =      get_hdu(rootdir, 'hst_contsub/hdu_hst_an_ha.fits')
hdu_hst_ha_noan = get_hdu(rootdir, 'hst_contsub/hdu_hst_ha.fits')

hdu_hst_f555w_an = get_hdu(rootdir, 'hst_contsub/hdu_hst_f555w_an.fits') 
hdu_hst_f65Xn_an = get_hdu(rootdir, 'hst_contsub/hdu_hst_%s_an.fits' %narrowband_filter) 
hdu_hst_f814w_an = get_hdu(rootdir, 'hst_contsub/hdu_hst_f814w_an.fits') 

hdu_muse_stars  = get_hdu(rootdir, 'muse/%s_starmask.fits' %galaxy_muse.upper())

/Users/abarnes/Dropbox/work/Smallprojects/galaxies/data_hstha/ngc1566/hst_contsub/hdu_hst_an_ha.fits
/Users/abarnes/Dropbox/work/Smallprojects/galaxies/data_hstha/ngc1566/hst_contsub/hdu_hst_ha.fits
/Users/abarnes/Dropbox/work/Smallprojects/galaxies/data_hstha/ngc1566/hst_contsub/hdu_hst_f555w_an.fits
/Users/abarnes/Dropbox/work/Smallprojects/galaxies/data_hstha/ngc1566/hst_contsub/hdu_hst_f658n_an.fits
/Users/abarnes/Dropbox/work/Smallprojects/galaxies/data_hstha/ngc1566/hst_contsub/hdu_hst_f814w_an.fits
/Users/abarnes/Dropbox/work/Smallprojects/galaxies/data_hstha/ngc1566/muse/NGC1566_starmask.fits


In [3]:
def get_mask(hdu):
    mask = ~np.isnan(hdu.data)*1
    mask_close = binary_closing(mask, structure=np.ones((10,10)), iterations=1)
    mask_hdu = fits.PrimaryHDU(np.int32(mask_close*1), hdu.header)
    return(mask_hdu) 

hdu_mask = get_mask(hdu_hst_ha)

In [4]:
def mask_stars(hdu, hdu_mask, hdu_mask_map, sigma=0.5, factor=np.sqrt(2)):
    
    def smooth_hdu_gaussian(data, sigma_x=0.5, sigma_y=0.5):
        kernel = Gaussian2DKernel(sigma_x, sigma_y)
        smoothed_data = convolve(data, kernel, normalize_kernel=True)
        return smoothed_data

    # hdu_masked, mask = mask_hdu_with_ds9(hdu, region_filename)
    hdu_masked = hdu.copy()
    mask = hdu_mask.data>0

    std = mad_std(hdu.data, ignore_nan=True)
    mean = np.nanmean(hdu.data[hdu.data < std * 5])

    noise = np.random.normal(mean, std, hdu_masked.data.shape)
    noise = smooth_hdu_gaussian(noise * factor, sigma_x=sigma, sigma_y=sigma) 
    hdu_masked.data[mask] = noise[mask]

    mask = hdu_mask_map.data == 0
    hdu_masked.data[mask] = np.nan

    return(hdu_masked)


def regrid(hdu_input, hdu_template, output_filename=None, conserve_flux=True, order='bilinear'):

    # Extract the WCS information from the input and template headers
    wcs_input = wcs.WCS(hdu_input.header)
    wcs_template = wcs.WCS(hdu_template.header)

    # Calculate the pixel scale for input and template datas
    pixscale_input = wcs.utils.proj_plane_pixel_area(wcs_input.celestial)
    pixscale_template = wcs.utils.proj_plane_pixel_area(wcs_template.celestial)

    # Reproject the input data to match the template WCS
    print("[INFO] Performing data reprojection...")
    data_output = reproject_interp(hdu_input, hdu_template.header, order=order)[0]
    hdu_output = fits.PrimaryHDU(data_output, hdu_template.header)
    print("[INFO] data reprojection complete.")

    if conserve_flux:
        # Scale the output data to conserve flux 
        print(f"[INFO] Scaling the output data to conserve flux with factor {(pixscale_template / pixscale_input):.2f}")
        hdu_output.data = hdu_output.data * (pixscale_template / pixscale_input)
        hdu_output.data = np.array(hdu_output.data, dtype=np.float32)
        print("[INFO] Flux scaling complete.")

    print("[INFO] Reprojection process completed.")
    return hdu_output

hdu_muse_stars_r = regrid(hdu_muse_stars, hdu_hst_ha, conserve_flux=False, order='nearest-neighbor')
hdu_hst_ha_s = mask_stars(hdu_hst_ha, hdu_muse_stars_r, hdu_mask)

[INFO] Performing data reprojection...
[INFO] data reprojection complete.
[INFO] Reprojection process completed.


In [5]:
def process_anchored_fit_data(hdu, hdu_mask, sigma_high=5, sigma_low=1):

    # Create a copy of the anchored HST data HDU
    hdu_i = hdu.copy()

    # Replace negative values with NaNs
    std = mad_std(hdu_i.data, ignore_nan=True)
    mask = hdu_i.data <= 3*std
    std = mad_std(hdu_i.data[mask], ignore_nan=True)

    # Replace negative values with NaNs
    std = mad_std(hdu_i.data[mask], ignore_nan=True)
    mask_high = hdu_i.data <= -std*sigma_high
    mask_low = hdu_i.data <= -std*sigma_low
    # mask_low = hdu_i.data <= 0 
    mask = binary_dilation(mask_high, iterations=-1, mask=mask_low)
    mask = binary_closing(mask, iterations=1)
    hdu_i.data[mask] = np.nan

    # Perform interpolation using Gaussian kernels
    kernel = Gaussian2DKernel(x_stddev=1)
    hdu_i.data = interpolate_replace_nans(hdu_i.data, kernel)
    hdu_i.data = interpolate_replace_nans(hdu_i.data, kernel)
    hdu_i.data = interpolate_replace_nans(hdu_i.data, kernel)

    mask = hdu_mask.data == 0
    hdu_i.data[mask] = np.nan

    # Save the processed anchored HST image to the output file
    print(f"[INFO] Negative values processed")

    return hdu_i

hdu_hst_ha_si = process_anchored_fit_data(hdu_hst_ha_s, hdu_mask)

[INFO] Negative values processed


In [6]:
def cosmicray_finder_nnet(hdu, hdu_mask, dilation_iterations=5, threshold=0.25, patch=1024,
                          model_path='/Users/abarnes/opt/anaconda3/lib/python3.9/site-packages/learned_models/mask/ACS-WFC-F606W.pth'):
    
    # Load the FITS file and extract image data and header
    data = hdu.data.copy()

    # Look elsewhere for model 
    if not os.path.isfile(model_path):
        model_path = '/lustre/opsw/work/abarnes/applications/anaconda3/lib/python3.11/site-packages/learned_models/mask/ACS-WFC-F606W.pth'

    # Create an instance of deepCR with specified model configuration
    mdl = deepCR(mask=model_path, device="CPU")

    print('[INFO] [deepCR] Running deepCR...')
    # Apply the model to the input image to detect cosmic rays and generate a mask
    if patch==None: 
        mask = mdl.clean(data, threshold=threshold, inpaint=False)
    else: 
        print('[INFO] [deepCR] Running with patch=%i' %patch)
        mask = mdl.clean(data, threshold=threshold, inpaint=False, segment=True, patch=patch)
        mask = mask != 0
    
    # Dilate the mask to ensure surrounding regions of cosmic rays are also masked
    if dilation_iterations != 0:  
        print('[INFO] [deepCR] Dilation of deepCR mask...')
        mask = binary_dilation(mask, iterations=dilation_iterations)

    # hdu_mask = fits.PrimaryHDU(np.array(mask*1, dtype=np.int32), hdu.header)

    # Copy the original image and set the masked regions (cosmic rays) to NaN
    data[mask] = np.nan
    
    print('[INFO] [deepCR] Interpolated deepCR mask...')
    # Interpolate over the masked regions to fill them with suitable values
    kernel = Gaussian2DKernel(x_stddev=1)
    data_i = interpolate_replace_nans(data, kernel)

    mask = hdu_mask.data == 0
    data_i[mask] = np.nan

    data_i = np.array(data_i, dtype=np.float32)
    hdu_i = fits.PrimaryHDU(data_i, hdu.header)
    
    return(hdu_i)

hdu_hst_ha_sic = cosmicray_finder_nnet(hdu_hst_ha_si, hdu_mask, threshold=0.9, dilation_iterations=0)

[INFO] [deepCR] Running deepCR...
[INFO] [deepCR] Running with patch=1024
[INFO] [deepCR] Interpolated deepCR mask...


In [7]:
def remove_nan_padding(hdu):
    """
    Remove padding of NaN values from the edges of an HDU image.
    
    Parameters:
    - hdu (HDU): The input HDU.
    
    Returns:
    - HDU: A new HDU with padding removed.
    """
    
    print('[INFO] Remove NaN values around edge of image...')

    data = hdu.data
    
    # Check if the entire image is NaNs
    if np.all(np.isnan(data)):
        print("Warning: Entire image is NaNs. Returning the original image.")
        return hdu
    
    # Find valid data indices along each axis
    valid_x = np.where(np.nansum(data, axis=0)!=0)[0]
    valid_y = np.where(np.nansum(data, axis=1)!=0)[0]

    # In the rare case there's still no valid data
    if len(valid_x) == 0 or len(valid_y) == 0:
        print("Warning: No valid data found. Returning the original image.")
        return hdu
    
    # Crop the data array
    cropped_data = data[valid_y[0]:valid_y[-1]+1, valid_x[0]:valid_x[-1]+1]
    
    # Create a new header by copying the old one
    new_header = hdu.header.copy()
    
    # Update reference pixel (CRPIX) values in the header
    if 'CRPIX1' in new_header:
        new_header['CRPIX1'] -= valid_x[0]
    if 'CRPIX2' in new_header:
        new_header['CRPIX2'] -= valid_y[0]
    
    # Create a new HDU with cropped data and updated header
    new_hdu = fits.PrimaryHDU(cropped_data, new_header)
    
    return new_hdu

hdu_hst_ha_crop = remove_nan_padding(hdu_hst_ha)
hdu_hst_ha_s_crop = remove_nan_padding(hdu_hst_ha_s)
hdu_hst_ha_si_crop = remove_nan_padding(hdu_hst_ha_si)
hdu_hst_ha_sic_crop = remove_nan_padding(hdu_hst_ha_sic)

[INFO] Remove NaN values around edge of image...
[INFO] Remove NaN values around edge of image...
[INFO] Remove NaN values around edge of image...
[INFO] Remove NaN values around edge of image...


In [8]:
hdu_hst_f555w_an.writeto(rootdir+'hst_contsub/%s_hst_f555w_an.fits' %galaxy, overwrite=True)
hdu_hst_f65Xn_an.writeto(rootdir+'hst_contsub/%s_hst_%s_an.fits' %(galaxy, narrowband_filter), overwrite=True)
hdu_hst_f814w_an.writeto(rootdir+'hst_contsub/%s_hst_f814w_an.fits' %galaxy, overwrite=True)

hdu_hst_ha_noan.writeto(rootdir+'hst_contsub/%s_hst_ha_noan.fits' %galaxy, overwrite=True)
hdu_hst_ha.writeto(rootdir+'hst_contsub/%s_hst_ha.fits' %galaxy, overwrite=True)
hdu_hst_ha_s.writeto(rootdir+'hst_contsub/%s_hst_ha_s.fits' %galaxy, overwrite=True)
hdu_hst_ha_si.writeto(rootdir+'hst_contsub/%s_hst_ha_si.fits' %galaxy, overwrite=True)
hdu_hst_ha_sic.writeto(rootdir+'hst_contsub/%s_hst_ha_sic.fits' %galaxy, overwrite=True)

hdu_hst_ha_crop.writeto(rootdir+'hst_contsub/%s_hst_ha_crop.fits' %galaxy, overwrite=True)
hdu_hst_ha_s_crop.writeto(rootdir+'hst_contsub/%s_hst_ha_s_crop.fits' %galaxy, overwrite=True)
hdu_hst_ha_si_crop.writeto(rootdir+'hst_contsub/%s_hst_ha_si_crop.fits' %galaxy, overwrite=True)
hdu_hst_ha_sic_crop.writeto(rootdir+'hst_contsub/%s_hst_ha_sic_crop.fits' %galaxy, overwrite=True)

if run_cleanup: 
    os.system('rm -rf %s/hst_contsub/hdu_*' %rootdir)