In [4]:
import matplotlib.pyplot as plt
import numpy as np

In [None]:
# https://chatgpt.com/share/68d37473-f210-8000-b586-6a24785e4aff

In [None]:
import astrometry
from astropy.io import fits
from astropy.stats import sigma_clipped_stats
from astropy.coordinates import Angle
from astropy import units as u
from astropy.time import Time
from photutils.detection import DAOStarFinder
import ccdproc as ccdp
from ccdproc import CCDData
from datetime import datetime
from tqdm import tqdm
import numpy as np
from pathlib import Path
import logging
from typing import List, Union

def prep(
    sciframes: List[Union[str, Path]],
    masterdir: Union[str, Path],
    prepdir: Union[str, Path],
    filtername: str,
    key_exptime: str = 'exptime'
) -> None:
    """
    Preprocess science frames by subtracting bias, dark, and flat frames.

    Parameters
    ----------
    sciframes : list of str or Path
        List of paths to science frame FITS files.
    masterdir : str or Path
        Directory containing master bias, dark, and flat frames.
    prepdir : str or Path
        Directory to save preprocessed science frames.
    filtername : str
        Filter name used for selecting appropriate flat field.
    key_exptime : str, optional
        Header keyword for exposure time (default is 'exptime').
    """
    logger = logging.getLogger(__name__)
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    if not sciframes:
        logger.warning("No science frames found.")
        return

    masterdir = Path(masterdir)
    if not masterdir.exists():
        logger.error(f"Directory not found: {masterdir}")
        return

    prepdir = Path(prepdir)
    prepdir.mkdir(parents=True, exist_ok=True)

    mbiasframes = ccdp.ImageFileCollection(masterdir, glob_include="[!._]*.fits").filter(imagetyp="Bias")
    if len(mbiasframes.files) == 0:
        logger.error(f"No master bias found in directory: {masterdir}")
        return

    mdarkframes = ccdp.ImageFileCollection(masterdir, glob_include="[!._]*.fits").filter(imagetyp="Dark")
    if len(mdarkframes.files) == 0:
        logger.error(f"No master dark found in directory: {masterdir}")
        return

    darkexptime = set(mdarkframes.summary[key_exptime])

    mflatframes = ccdp.ImageFileCollection(masterdir, glob_include="[!._]*.fits").filter(imagetyp="Flat", filter=filtername)
    if len(mflatframes.files) == 0:
        logger.error(f"No master flat (filter={filtername}) found in directory: {masterdir}")
        return

    logger.info(f"{len(sciframes)} science frames found. Preprocessing...")

    for fpath_sci in tqdm(sciframes, desc="Processing frames"):
        fpath_sci = Path(fpath_sci)
        sci = CCDData.read(fpath_sci, unit='adu')

        try:
            exptime = float(sci.header.get(key_exptime, np.nan))
            obsdate_jd = float(sci.header.get("j_date", "0").split()[0])
        except Exception as e:
            logger.error(f"Header read error in {fpath_sci}: {e}")
            continue

        fpath_psci = prepdir / (fpath_sci.stem + ".p" + fpath_sci.suffix)

        try:
            jd_values = mbiasframes.summary.to_pandas()["j_date"].apply(lambda x: float(x.split()[0]))
            idx_mbias = (jd_values - obsdate_jd).abs().idxmin()
            fpath_mbias = Path(mbiasframes.files_filtered(include_path=True)[idx_mbias])
            mbias = CCDData.read(fpath_mbias)

            jd_values = mflatframes.summary.to_pandas()["j_date"].apply(lambda x: float(x.split()[0]))
            idx_mflat = (jd_values - obsdate_jd).abs().idxmin()
            fpath_mflat = Path(mflatframes.files_filtered(include_path=True)[idx_mflat])
            mflat = CCDData.read(fpath_mflat)

            darkexptime_closest = min(darkexptime, key=lambda x: abs(float(x) - exptime))
            mdarkframes_exptime = mdarkframes.filter(**{key_exptime: darkexptime_closest})
            jd_values = mdarkframes_exptime.summary.to_pandas()["j_date"].apply(lambda x: float(x.split()[0]))
            idx_mdark = (jd_values - obsdate_jd).abs().idxmin()
            fpath_mdark = Path(mdarkframes_exptime.files_filtered(include_path=True)[idx_mdark])
            mdark = CCDData.read(fpath_mdark)

            bsci = ccdp.subtract_bias(sci, mbias)
            bdsci = ccdp.subtract_dark(bsci, mdark, exposure_time=key_exptime, exposure_unit=u.second)

            bdsci.mask = bdsci.data < 0
            bdsci.data = np.nan_to_num(np.clip(bdsci.data, a_min=0., a_max=None), nan=0.0)
            bdsci.meta['history'] = f"Negative values masked. {datetime.now().isoformat()}"

            psci = ccdp.flat_correct(bdsci, mflat)

            psci.meta['jd'] = (obsdate_jd, "Julian date")
            psci.meta['irafname'] = (fpath_sci.stem, "File Name")
            psci.meta['rdnoise'] = (4.8, "Readout noise of ccd. (electron/pixel)")
            psci.meta['egain'] = (1, "Gain factor. (electron/adu)")
            psci.meta['history'] = f'bias, dark, flat corrected with rasapy.calib {datetime.now().isoformat()}\n' \
                                   f'Master bias: {fpath_mbias.name}\n' \
                                   f'Master dark: {fpath_mdark.name}\n' \
                                   f'Master flat: {fpath_mflat.name}'

            psci.data = psci.data.astype(np.float32)
            psci.write(fpath_psci, overwrite=True)
            logger.info(f"Preprocessed: {fpath_psci}")

        except Exception as e:
            logger.error(f"Processing failed for {fpath_sci}: {e}")
            continue


In [None]:
import logging
from pathlib import Path
from astropy.io import fits
from astropy.coordinates import EarthLocation, AltAz
from astropy.time import Time
import astropy.units as u
from datetime import datetime
import ccdproc
import astrometry
import sep
import numpy as np

class RasaLcpyLV1:
    """
    Class for Level-1 processing of RASA lcpy KL4040 science frames.

    Provides methods to solve and update WCS, and to apply bias, dark, and flat corrections.
    """

    def __init__(self, log_file: str = None):
        """
        Configure logging to console and optional file.
        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
        self.logger.addHandler(handler)

        if log_file:
            file_h = logging.FileHandler(log_file)
            file_h.setFormatter(handler.formatter)
            self.logger.addHandler(file_h)
            
    def update_wcs(self, fpath_fits, outdir, return_fpath=True):
        """
        Solve for WCS using astrometry.net and update FITS header accordingly.
        """
        fpath_fits = Path(fpath_fits)
        outdir = Path(outdir)

        try:
            sci = ccdproc.CCDData.read(fpath_fits)
        except FileNotFoundError:
            self.logger.error(f"File not found: {fpath_fits}")
            return
        except ValueError:
            sci = ccdproc.CCDData.read(fpath_fits, unit="adu")
            self.logger.warning(f"BUNIT undefined; defaulting to 'adu': {fpath_fits}")
        except Exception as e:
            self.logger.error(f"Error reading FITS: {e}")
            return

        # detect stars
        sci.data = sci.data.astype(np.float32)
        bkg = sep.Background(sci.data)
        objs = sep.extract(sci.data - bkg.back(), thresh=3.0 * bkg.globalrms, minarea=5)
        bright = objs[np.argsort(objs['flux'])[::-1][:50]]
        coords = np.vstack((bright['x'], bright['y'])).T

        # solve WCS
        with astrometry.Solver(
            astrometry.series_4100.index_files(cache_directory="astrometry_cache", scales={8,9,10})
        ) as solver:
            sol = solver.solve(
                stars=coords,
                size_hint=astrometry.SizeHint(lower_arcsec_per_pixel=2.95, upper_arcsec_per_pixel=3.00),
                position_hint=astrometry.PositionHint(
                    ra_deg=sci.header["RA"], dec_deg=sci.header["DEC"], radius_deg=5.0
                ),
                solution_parameters=astrometry.SolutionParameters(
                    logodds_callback=lambda l: astrometry.Action.STOP if len(l)>=10 else astrometry.Action.CONTINUE
                )
            )
            if sol.has_match():
                best = sol.best_match()
                self.logger.info(f"WCS match: RA={best.center_ra_deg:.5f}, DEC={best.center_dec_deg:.5f}, scale={best.scale_arcsec_per_pixel:.3f}" )
                sci.wcs = best.astropy_wcs()
                hdr = best.astropy_wcs().to_header(relax=False)
                sci.header.extend(hdr, update=True)
                sci.header['PIXSCALE'] = (best.scale_arcsec_per_pixel, "arcsec/pixel")
                sci.header['HISTORY'] = f"({datetime.now().isoformat()}) WCS updated"
            else:
                self.logger.warning("No WCS solution found.")

        # compute center coords
        cen = (sci.header['NAXIS1']//2, sci.header['NAXIS2']//2)
        sky = sci.wcs.pixel_to_world(*cen)
        loc = EarthLocation(lat=sci.header['LAT']*u.deg, lon=sci.header['LON']*u.deg, height=sci.header['ELEV']*u.km)
        altaz = sky.transform_to(AltAz(obstime=Time(sci.header['JD'], format='jd'), location=loc))
        sci.header['RACEN'] = (sky.ra.value, "deg center RA")
        sci.header['DECCEN'] = (sky.dec.value, "deg center DEC")
        sci.header['ALTCEN'] = (altaz.alt.value, "deg center Alt")
        sci.header['AZCEN'] = (altaz.az.value, "deg center Az")

        outpath = Path(outdir)/ (fpath_fits.stem + ".wcs" + fpath_fits.suffix)
        fits.PrimaryHDU(data=sci.data, header=sci.header).writeto(outpath, overwrite=True)
        self.logger.info(f"WCS updated: {outpath.name}")
        if return_fpath:
            return outpath

    def preprocessing(self, fpath_fits, outdir, masterdir, return_fpath=True):
        """
        Subtract bias, dark, mask bad pixels, and flat-correct a science frame.
        """
        fpath_fits = Path(fpath_fits)
        outdir = Path(outdir)
        masterdir = Path(masterdir)
        outdir.mkdir(parents=True, exist_ok=True)

        try:
            sci = ccdproc.CCDData.read(fpath_fits)
        except FileNotFoundError:
            self.logger.error(f"File not found: {fpath_fits}")
            return
        except ValueError:
            sci = ccdproc.CCDData.read(fpath_fits, unit="adu")
            self.logger.warning(f"BUNIT undefined; defaulting to 'adu': {fpath_fits}")
        except Exception as e:
            self.logger.error(f"Error reading FITS: {e}")
            return

        # load masters
        mbias = self._select_master(masterdir, 'BIAS', sci.header['JD'])
        mdark = self._select_master(masterdir, 'DARK', sci.header['JD'], sci.header['EXPTIME'])
        mflat = self._select_master(masterdir, 'FLAT', sci.header['JD'])

        # bias
        bsci = ccdproc.subtract_bias(sci, mbias)
        bsci.meta['BIASCORR'] = True
        self.logger.info("Bias subtracted.")
        # dark
        bdsci = ccdproc.subtract_dark(bsci, mdark, exposure_time="EXPTIME", exposure_unit=u.second)
        bdsci.meta['DARKCORR'] = True
        self.logger.info("Dark subtracted.")
        # mask
        bdsci.mask = bdsci.data < 0
        bdsci.data = np.nan_to_num(np.clip(bdsci.data, 0, None))
        self.logger.info("Bad pixels masked.")
        # flat
        psci = ccdproc.flat_correct(bdsci, mflat)
        psci.meta['FLATCORR'] = True
        self.logger.info("Flat corrected.")

        # build output name
        rac = int(round(psci.header['RACEN']))
        dec = int(round(psci.header['DECCEN']))
        pm = 'p' if dec >= 0 else 'n'
        exp = int(round(psci.header['EXPTIME']))
        obst = Time(psci.header['DATE-OBS']).strftime("%Y%m%d%H%M%S")
        outname = f"kl4040.sci.{rac:03d}.{pm}{abs(dec):02d}.{exp:03d}.{obst}.fits"
        outpath = outdir / outname
        fits.PrimaryHDU(data=psci.data, header=psci.header).writeto(outpath, overwrite=True)
        self.logger.info(f"Preprocessing complete: {outname}")
        if return_fpath:
            return outpath

    def _select_master(self, masterdir, imagetyp, jd_target, exptime=None):
        """
        Helper to select the closest master frame by JD (and exposure time if given).
        """
        coll = ccdproc.ImageFileCollection(masterdir, glob_include="*.fits").filter(imagetyp=imagetyp)
        df = coll.summary.to_pandas()
        df['jd'] = df.get('jd', df.get('JD')).astype(float)
        df['diff'] = (df['jd'] - jd_target).abs()
        if exptime is not None and 'exptime' in df.columns:
            df = df[df['exptime'].astype(float) == float(exptime)]
        idx = df['diff'].idxmin()
        return ccdproc.CCDData.read(Path(coll.files_filtered(include_path=True)[idx]), unit='adu')


In [None]:
import logging
from pathlib import Path
from astropy.io import fits
from astropy.coordinates import EarthLocation, AltAz
from astropy.time import Time
import astropy.units as u
from datetime import datetime
import ccdproc
import astrometry
import sep
import numpy as np

class RasaLcpyLV1:
    
    """
    Class for updating Level-1 (pre-processed) FITS headers in the RASA lcpy KL4040 data pipeline.

    This encapsulates metadata verification, enrichment (coordinate transforms, elongations),
    and logging into a reusable component suitable for CLI or workflow integration.
    """

    def __init__(self, log_file: str = None):
        """
        Initialize logger. If log_file is provided, logs are written to that file as well as stdout.

        Parameters
        ----------
        log_file : str, optional
            Path to a log file to append processing records.
        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
        self.logger.addHandler(handler)

        if log_file:
            file_h = logging.FileHandler(log_file)
            file_h.setFormatter(handler.formatter)
            self.logger.addHandler(file_h)
            
    def update_wcs(self, fpath_fits, outdir, return_fpath=True):
    
        fpath_fits = Path(fpath_fits)
        outdir = Path(outdir)

        try:
            sci = ccdproc.CCDData.read(fpath_fits)
            
        except FileNotFoundError:
            print(f"File Not Found: {fpath_fits}")
            
        except ValueError:
            sci = ccdproc.CCDData.read(fpath_fits, unit="adu")
            print(f"Warning. the BUNIT is undefined. Use BUNIT='adu': {fpath_fits}")
            
        except Exception as e:
            print(f"Failed to read FITS file: {e}")

        # Source detection (sep)
        sci.data         = sci.data.astype(np.float32)
        bkg              = sep.Background(sci.data) 
        objects          = sep.extract(sci.data - bkg.back(), thresh=3.0*bkg.globalrms, minarea=5)
        objects_bright   = objects[np.argsort(objects['flux'])[::-1][:50]] # Brightest 50 stars
        xcoords, ycoords = objects_bright['x'], objects_bright['y']
        coords           = np.vstack((xcoords, ycoords)).T

        # Solve WCS (astrometry.Solver)
        with astrometry.Solver(
                astrometry.series_4100.index_files(
                    cache_directory="astrometry_cache",
                    scales={8, 9, 10}
                    # Series scales. Each scale has different skymark diameter. 
                    # Recommend 0.1x - 1.0x of FOV diameter.
                    # For detail, see https://pypi.org/project/astrometry/#choosing-series
                )
            ) as solver:

                solution = solver.solve(
                    stars=coords,
                    size_hint=astrometry.SizeHint(
                        lower_arcsec_per_pixel=2.95,
                        upper_arcsec_per_pixel=3.00, # kl4040 pixel scale 2.97"
                    ),
                    position_hint=astrometry.PositionHint(
                        ra_deg=sci.header["RA"],
                        dec_deg=sci.header["DEC"],
                        radius_deg=5.0,
                    ),
                    solution_parameters=astrometry.SolutionParameters(
                        logodds_callback=lambda l: (
                            astrometry.Action.STOP if len(l) >= 10 else astrometry.Action.CONTINUE
                        )
                    )
                )

                # Solution found
                if solution.has_match():
                    
                    best = solution.best_match()
                    print(f"\tWCS matched: RA={best.center_ra_deg:.5f} deg, DEC={best.center_dec_deg:.5f} deg, scale={best.scale_arcsec_per_pixel:.3f} \"/pix")

                    # Update wcs information into header
                    sci.wcs = best.astropy_wcs()

                    hdr_wcs = best.astropy_wcs().to_header(relax=False)
                    sci.header.extend(hdr_wcs, update=True)
                    sci.header['PIXSCALE'] =(best.scale_arcsec_per_pixel, "[arcsec/pixel] Pixel Scale")
                    sci.header["HISTORY"]  = f"({datetime.now().isoformat()}) WCS updated. (astrometry.Solver)"
                    
                else:
                    print("\tSkipping: No WCS match found.")
                    
        # Center pixel coordiates based on WCS
        xycoord_cenpix  = sci.header["NAXIS1"]//2, sci.header["NAXIS2"]//2 # Center pixel
        skycoord_cenpix = sci.wcs.pixel_to_world(*xycoord_cenpix) # Sky coordinate of center pixel
        location        = EarthLocation(lat=sci.header["LAT"]*u.deg, lon=sci.header["LON"]*u.deg, height=sci.header["ELEV"]*u.km)
        obstime         = Time(sci.header['JD'], format='jd')
        altaz_frame     = AltAz(obstime=obstime, location=location)
        altaz_cenpix    = skycoord_cenpix.transform_to(altaz_frame)

        # Update header information
        sci.header["RACEN"]  = (skycoord_cenpix.ra.value, "[deg] Right Ascension of center pixel")
        sci.header["DECCEN"] = (skycoord_cenpix.dec.value, "[deg] Declination of center pixel")
        sci.header["ALTCEN"] = (altaz_cenpix.alt.value, "[deg] Altitude of center pixel")
        sci.header["AZCEN"]  = (altaz_cenpix.az.value, "[deg] Azimuth of center pixel")

        hdul_updated = fits.PrimaryHDU(data=sci.data, header=sci.header)
        fpath_fits_updated = outdir / (fpath_fits.stem + ".wcs" + fpath_fits.suffix)
        hdul_updated.writeto(fpath_fits_updated, overwrite=True)
        print(f"WCS updated. {fpath_fits.name} -> {fpath_fits_updated.name}")
        
        if return_fpath:
            return fpath_fits_updated
        
    
    def preprocessing(self, fpath_fits, outdir, masterdir, return_fpath=True):

        fpath_fits = Path(fpath_fits)
        masterdir  = Path(masterdir)
        outdir     = Path(outdir)
        outdir.mkdir(exist_ok=True, parents=True)

        try:
            sci = ccdproc.CCDData.read(fpath_fits)
            
        except FileNotFoundError:
            print(f"File Not Found: {fpath_fits}")
            
        except ValueError:
            sci = ccdproc.CCDData.read(fpath_fits, unit="adu")
            print(f"Warning. the BUNIT is undefined. Use BUNIT='adu': {fpath_fits}")
            
        except Exception as e:
            print(f"Failed to read FITS file: {e}")

        mbiasframes = ccdproc.ImageFileCollection(masterdir, glob_include="[!._]*.fits").filter(imagetyp="BIAS")
        if len(mbiasframes.files) == 0:
            print(f"No master bias found in the directory: {masterdir}")
            return
            
        mdarkframes = ccdproc.ImageFileCollection(masterdir, glob_include="[!._]*.fits").filter(imagetyp="DARK")
        if len(mdarkframes.files) == 0:
            print(f"No master dark found in the directory: {masterdir}")
            return

        mflatframes = ccdproc.ImageFileCollection(masterdir, glob_include="[!._]*.fits").filter(imagetyp="FLAT")
        if len(mflatframes.files) == 0:
            print(f"No master flat found in the directory: {masterdir}")
            return
            
        # Load mbias (Observed closest to science frame)
        jd_mbiases          = mbiasframes.summary.to_pandas()["jd"]
        idx_mbias           = (jd_mbiases - sci.header["JD"]).abs().idxmin()
        fpath_mbias         = Path(mbiasframes.files_filtered(include_path=True)[idx_mbias])
        mbias               = ccdproc.CCDData.read(fpath_mbias)

        # Load the master dark frame (Observed and exposured closest to science frame)
        darkexptime         = set(mdarkframes.summary["exptime"])
        darkexptime_closest = min(darkexptime, key=lambda x: abs(float(x) - sci.header["EXPTIME"]))
        mdarkframes_exptime = mdarkframes.filter(**{"EXPTIME": darkexptime_closest})
        jd_values           = mdarkframes_exptime.summary.to_pandas()["jd"]
        idx_mdark           = (jd_values - sci.header["JD"]).abs().idxmin()
        fpath_mdark         = Path(mdarkframes_exptime.files_filtered(include_path=True)[idx_mdark])
        mdark               = ccdproc.CCDData.read(fpath_mdark)

        # Load master flat (Close to the observation time)
        jd_values            = mflatframes.summary.to_pandas()["jd"]
        idx_mflat            = (jd_values - sci.header["JD"]).abs().idxmin()
        fpath_mflat          = Path(mflatframes.files_filtered(include_path=True)[idx_mflat])
        mflat                = ccdproc.CCDData.read(fpath_mflat)

        # Subtract bias
        bsci = ccdproc.subtract_bias(sci, mbias)
        bsci.meta["BIASCORR"] = True
        bsci.meta["HISTORY"] = f"({datetime.now().isoformat()}) Bias subtracted."
        bsci.meta["HISTORY"] = f'Master bias: {fpath_mbias.name}'

        # Subtract dark
        bdsci = ccdproc.subtract_dark(bsci, mdark, exposure_time="EXPTIME", exposure_unit=u.second)
        bdsci.meta["DARKCORR"] = True
        bdsci.meta["HISTORY"] = f"({datetime.now().isoformat()}) Dark subtracted."
        bdsci.meta["HISTORY"] = f'Master dark: {fpath_mdark.name}'

        # Bad pixel masking
        bdsci.mask = bdsci.data < 0
        bdsci.data = np.nan_to_num(np.clip(bdsci.data, a_min=0., a_max=None), nan=0.0)
        bdsci.meta['HISTORY'] = f"({datetime.now().isoformat()}) Negative values masked."

        # Correct Flat
        psci = ccdproc.flat_correct(bdsci, mflat)
        if mflat.mask is not None:
            psci.mask = psci.mask * mflat.mask
        psci.meta["FLATCORR"] = True
        psci.meta["HISTORY"] = f"({datetime.now().isoformat()}) Flat corrected."
        psci.meta["HISTORY"] = f'Master flat: {fpath_mflat.name}'

        #### Bad pixel masking (strange flat values, TBA)

        # Updated file name
        racen_int   = int(round(psci.header['RACEN']))
        deccen_int  = int(round(psci.header['DECCEN']))
        deccen_pm   = "p" if deccen_int >=0. else "n"
        exptime_int = int(round(psci.header["EXPTIME"]))
        obstime     = Time(psci.header["DATE-OBS"]).strftime("%Y%m%d%H%M%S")
        fpath_fits_updated = outdir / f"kl4040.sci.{racen_int:03d}.{deccen_pm}{abs(deccen_int):02d}.{exptime_int:03d}.{obstime}.fits"

        psci.meta["OBJECT"] = fpath_fits_updated.stem

        hdul_updated = fits.PrimaryHDU(data=psci.data, header=psci.header)
        hdul_updated.writeto(fpath_fits_updated, overwrite=True)
        print(f"Preprocessing: {fpath_fits} -> {fpath_fits_updated}")

        if return_fpath:
            return fpath_fits_updated

In [None]:
import logging
from pathlib import Path
from astropy.io import fits
from astropy.coordinates import EarthLocation, AltAz
from astropy.time import Time
import astropy.units as u
from datetime import datetime
import ccdproc
import astrometry
import sep
import numpy as np

class RasaLcpyLV1:
    """
    Class for updating Level-1 (pre-processed) FITS headers in the RASA lcpy KL4040 data pipeline.

    This encapsulates metadata verification, enrichment (coordinate transforms, elongations),
    and logging into a reusable component suitable for CLI or workflow integration.
    """

    def __init__(self, log_file: str = None):
        """
        Initialize logger. If log_file is provided, logs are written to that file as well as stdout.

        Parameters
        ----------
        log_file : str, optional
            Path to a log file to append processing records.
        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
        self.logger.addHandler(handler)

        if log_file:
            file_h = logging.FileHandler(log_file)
            file_h.setFormatter(handler.formatter)
            self.logger.addHandler(file_h)
            
    def update_wcs(self, fpath_fits, outdir, return_fpath=True):
        """Solve and update WCS information in a Level-1 FITS file."""
        fpath_fits = Path(fpath_fits)
        outdir = Path(outdir)

        try:
            sci = ccdproc.CCDData.read(fpath_fits)
        except FileNotFoundError:
            self.logger.error(f"File Not Found: {fpath_fits}")
            return
        except ValueError:
            sci = ccdproc.CCDData.read(fpath_fits, unit="adu")
            self.logger.warning(f"BUNIT undefined, using 'adu': {fpath_fits}")
        except Exception as e:
            self.logger.error(f"Failed to read FITS file: {e}")
            return

        # Source detection (sep)
        sci.data = sci.data.astype(np.float32)
        bkg = sep.Background(sci.data)
        objects = sep.extract(sci.data - bkg.back(), thresh=3.0 * bkg.globalrms, minarea=5)
        objects_bright = objects[np.argsort(objects['flux'])[::-1][:50]]
        coords = np.vstack((objects_bright['x'], objects_bright['y'])).T

        with astrometry.Solver(
                astrometry.series_4100.index_files(
                    cache_directory="astrometry_cache",
                    scales={8, 9, 10}
                )
            ) as solver:

            solution = solver.solve(
                stars=coords,
                size_hint=astrometry.SizeHint(
                    lower_arcsec_per_pixel=2.95,
                    upper_arcsec_per_pixel=3.00,
                ),
                position_hint=astrometry.PositionHint(
                    ra_deg=sci.header["RA"],
                    dec_deg=sci.header["DEC"],
                    radius_deg=5.0,
                ),
                solution_parameters=astrometry.SolutionParameters(
                    logodds_callback=lambda l: (
                        astrometry.Action.STOP if len(l) >= 10 else astrometry.Action.CONTINUE
                    )
                )
            )

            if solution.has_match():
                best = solution.best_match()
                self.logger.info(
                    f"WCS matched: RA={best.center_ra_deg:.5f} deg, DEC={best.center_dec_deg:.5f} deg, "
                    f"scale={best.scale_arcsec_per_pixel:.3f} \"/pix"
                )
                sci.wcs = best.astropy_wcs()
                hdr_wcs = best.astropy_wcs().to_header(relax=False)
                sci.header.extend(hdr_wcs, update=True)
                sci.header['PIXSCALE'] = (best.scale_arcsec_per_pixel, "[arcsec/pixel] Pixel Scale")
                sci.header['HISTORY'] = f"({datetime.now().isoformat()}) WCS updated. (astrometry.Solver)"
            else:
                self.logger.warning("No WCS match found; skipping update.")

        # Center pixel coordinates
        xy_cen = sci.header['NAXIS1'] // 2, sci.header['NAXIS2'] // 2
        sky_cen = sci.wcs.pixel_to_world(*xy_cen)
        loc = EarthLocation(lat=sci.header['LAT']*u.deg,
                            lon=sci.header['LON']*u.deg,
                            height=sci.header['ELEV']*u.km)
        obstime = Time(sci.header['JD'], format='jd')
        altaz = sky_cen.transform_to(AltAz(obstime=obstime, location=loc))

        sci.header['RACEN'] = (sky_cen.ra.value, "[deg] RA of center pixel")
        sci.header['DECCEN'] = (sky_cen.dec.value, "[deg] Dec of center pixel")
        sci.header['ALTCEN'] = (altaz.alt.value, "[deg] Alt of center pixel")
        sci.header['AZCEN'] = (altaz.az.value, "[deg] Az of center pixel")

        hdul = fits.PrimaryHDU(data=sci.data, header=sci.header)
        outpath = outdir / (fpath_fits.stem + ".wcs" + fpath_fits.suffix)
        hdul.writeto(outpath, overwrite=True)
        self.logger.info(f"WCS updated: {fpath_fits.name} -> {outpath.name}")

        if return_fpath:
            return outpath

    def preprocessing(self, fpath_fits, outdir, masterdir, return_fpath=True):
        """Apply bias, dark, and flat corrections to a science frame."""
        fpath_fits = Path(fpath_fits)
        outdir = Path(outdir)
        masterdir = Path(masterdir)
        outdir.mkdir(exist_ok=True, parents=True)

        try:
            sci = ccdproc.CCDData.read(fpath_fits)
        except FileNotFoundError:
            self.logger.error(f"File Not Found: {fpath_fits}")
            return
        except ValueError:
            sci = ccdproc.CCDData.read(fpath_fits, unit="adu")
            self.logger.warning(f"BUNIT undefined, using 'adu': {fpath_fits}")
        except Exception as e:
            self.logger.error(f"Failed to read FITS file: {e}")
            return

        # Load masters
        mbias_frames = ccdproc.ImageFileCollection(masterdir, glob_include="*.fits").filter(imagetyp="BIAS")
        mdark_frames = ccdproc.ImageFileCollection(masterdir, glob_include="*.fits").filter(imagetyp="DARK")
        mflat_frames = ccdproc.ImageFileCollection(masterdir, glob_include="*.fits").filter(imagetyp="FLAT")

        if not mbias_frames.files:
            self.logger.error(f"No master bias in {masterdir}")
            return
        if not mdark_frames.files:
            self.logger.error(f"No master dark in {masterdir}")
            return
        if not mflat_frames.files:
            self.logger.error(f"No master flat in {masterdir}")
            return

        # Bias subtraction
        jd_vals = mbias_frames.summary.to_pandas()["jd"].astype(float)
        idx = (abs(jd_vals - sci.header['JD'])).idxmin()
        mbias = ccdproc.CCDData.read(Path(mbias_frames.files_filtered(include_path=True)[idx]), unit="adu")
        bsci = ccdproc.subtract_bias(sci, mbias)
        bsci.meta['BIASCORR'] = True
        bsci.meta['HISTORY'] = f"({datetime.now().isoformat()}) Bias subtracted."
        self.logger.info(f"Bias subtracted using {Path(mbias_frames.files_filtered(include_path=True)[idx]).name}")

        # Dark subtraction
        exp_list = set(mdark_frames.summary['exptime'])
        closest_exp = min(exp_list, key=lambda x: abs(float(x) - sci.header['EXPTIME']))
        drange = mdark_frames.filter(**{"EXPTIME": closest_exp})
        jd_vals = drange.summary.to_pandas()['jd'].astype(float)
        idxd = (abs(jd_vals - sci.header['JD'])).idxmin()
        mdark = ccdproc.CCDData.read(Path(drange.files_filtered(include_path=True)[idxd]), unit="adu")
        bdsci = ccdproc.subtract_dark(bsci, mdark, exposure_time="EXPTIME", exposure_unit=u.second)
        bdsci.meta['DARKCORR'] = True
        bdsci.meta['HISTORY'] = f"({datetime.now().isoformat()}) Dark subtracted."
        self.logger.info(f"Dark subtracted using {Path(drange.files_filtered(include_path=True)[idxd]).name}")

        # Bad pixel masking
        bdsci.mask = bdsci.data < 0
        bdsci.data = np.nan_to_num(np.clip(bdsci.data, a_min=0., a_max=None), nan=0.0)
        bdsci.meta['HISTORY'] = f"({datetime.now().isoformat()}) Negative values masked."
        self.logger.info("Bad pixels masked.")

        # Flat correction
        jd_vals = mflat_frames.summary.to_pandas()['jd'].astype(float)
        idxf = (abs(jd_vals - sci.header['JD'])).idxmin()
        mflat = ccdproc.CCDData.read(Path(mflat_frames.files_filtered(include_path=True)[idxf]), unit="adu")
        psci = ccdproc.flat_correct(bdsci, mflat)
        if mflat.mask is not None:
            psci.mask = psci.mask * mflat.mask
        psci.meta['FLATCORR'] = True
        psci.meta['HISTORY'] = f"({datetime.now().isoformat()}) Flat corrected."  
        self.logger.info(f"Flat corrected using {Path(mflat_frames.files_filtered(include_path=True)[idxf]).name}")

        # Construct output filename
        rac = int(round(psci.header['RACEN']))
        dec = int(round(psci.header['DECCEN']))
        pm = 'p' if dec >= 0 else 'n'
        exp = int(round(psci.header['EXPTIME']))
        obst = Time(psci.header['DATE-OBS']).strftime("%Y%m%d%H%M%S")
        outname = f"kl4040.sci.{rac:03d}.{pm}{abs(dec):02d}.{exp:03d}.{obst}.fits"
        outpath = outdir / outname

        psci.meta['OBJECT'] = outpath.stem
        fits.PrimaryHDU(data=psci.data, header=psci.header).writeto(outpath, overwrite=True)
        self.logger.info(f"Preprocessing complete: {fpath_fits.name} -> {outname}")

        if return_fpath:
            return outpath


In [None]:

from astropy.io import fits
from astropy.time import Time
from astropy.wcs import FITSFixedWarning
import astropy.units as u
from astropy.nddata import CCDData
from astropy.stats import mad_std
import ccdproc as ccdp
import numpy as np
import warnings
from datetime import datetime
from pathlib import Path
from tqdm import tqdm
import _funcs as funcs

warnings.filterwarnings("ignore", category=FITSFixedWarning)

def make_mbias(biasframes, masterdir):
    
    """
    Combine multiple bias frames into a master bias frame and save the result.
    
    Parameters:
    -----------
    biasframes : list of str
        List of file paths to the individual bias frames to be combined.
    masterdir : str or Path
        Directory where the master bias frame will be saved.
        
    Returns:
    --------
    None
        The function saves the combined master bias frame as a FITS file in the specified directory.
        
    Notes:
    ------
    - The function checks if any bias frames are provided. If none are found, it exits without processing.
    - The observation date is extracted from the header of the first bias frame and used to name the output file.
    - The bias frames are combined using a median method with sigma clipping to remove outliers.
    - Metadata is added to the resulting master bias frame, including information about the combination process and the input files.
    - The output file is saved in the specified directory with a name in the format `rasa.bias.<obsdate>.comb.fits`.
    
    Example:
    --------
    >>> biasframes = ["bias1.fits", "bias2.fits", "bias3.fits"]
    >>> mbiasdir = "/path/to/output/directory"
    >>> make_mbias(biasframes, mbiasdir)
    """
    
    print("""\n================================
Master Bias
================================""")
    
    masterdir = Path(masterdir)
    masterdir.mkdir(parents=True, exist_ok=True)
    
    # Check if bias frames exist.
    if len(biasframes) == 0:
        print(f"No bias frames found.")
        return
    
    # Metatdata (first frame)
    meta_bias  = fits.getheader(biasframes[0])
    
    # Extract observation date.
    # obsdate_jd = meta_bias["JD"]
    obsdate    = meta_bias["OBSDATE"] # YYYYMMDD
    
    # Output file name 
    fpath_mbias = masterdir/(f"kl4040.bias.comb.{obsdate}.fits")
    
    print(f"\nobs-date: {obsdate}")
    print(f"\n{len(biasframes)} bias frames have been found. Combining master frame...")
    
    mbias = ccdp.combine(
        biasframes,
        method='median',
        # unit='adu',
        sigma_clip=True,
        sigma_clip_low_thresh=5,
        sigma_clip_high_thresh=5,
        sigma_clip_func=np.ma.median,
        sigma_clip_dev_func=mad_std,
        mem_limit=500e6,
        dtype=np.float32
        )
    
    # Additional metadata
    mbias.meta['COMBINED'] = True
    mbias.meta["NCOMBINE"] = (len(biasframes), "Number of combined frames")
    # mbias.meta['jd']       = (obsdate_jd, "Julian date")
    mbias.meta['IMAGETYP'] = "BIAS"
    mbias.meta['OBJECT']   = fpath_mbias.stem
    mbias.meta['HISTORY']  = f'({datetime.now().isoformat()}) {len(biasframes)} bias frames combined.'
    for i in range(len(biasframes)):
        mbias.meta['HISTORY'] = f"bias-{i+1:2d}: {biasframes[i]}"
    
    # Save the output
    hdu_mbias = fits.PrimaryHDU(data=mbias.data, header=mbias.header)
    hdu_mbias.writeto(fpath_mbias, overwrite=True)
    # mbias.write(fpath_mbias, overwrite=True)
    print(f"Done. {fpath_mbias} has been created. ({datetime.now().isoformat()})")



def make_mdark(darkframes, masterdir, key_exptime='exptime'):
    """
    Create master dark frames by subtracting a master bias frame from dark frames 
    and combining the bias-subtracted dark frames for each exposure time.
    
    Parameters:
    -----------
    darkframes : list of pathlib.Path
        List of file paths to the dark frames to be processed.
    masterdir : pathlib.Path
        Directory where the master bias frames are located and where the resulting master dark frames will be saved.
    key_exptime : str, optional
        Header keyword used to extract the exposure time from the FITS files. 
        Default is 'exptime'.
        
    Returns:
    --------
    None
        The function saves the master dark frames to the specified directory 
        and does not return any value.
        
    Notes:
    ------
    - The function checks for the existence of the master bias directory and 
      ensures that it contains valid master bias frames.
    - Dark frames are bias-subtracted using the closest-in-time master bias frame.
    - A temporary directory is created to store intermediate bias-subtracted dark frames, 
      which is cleaned up after processing.
    - Master dark frames are created for each unique exposure time by combining 
      the bias-subtracted dark frames using median combination with sigma clipping.
    - Metadata is added to the resulting master dark frames, including a history 
      of the combined frames.
    - The function prints progress and status messages during execution.
    """
    
    print("""\n================================
Master Dark
================================""")
    
    # Check the directory
    masterdir = Path(masterdir)
    if not masterdir.exists():
        print(f"Directory not found : {masterdir}")
        return
    
    # Check master bias.
    mbiasframes = ccdp.ImageFileCollection(masterdir, glob_include="[!._]*.fits").filter(imagetyp="Bias")
    if len(mbiasframes.files) == 0:
        print(f"No master bias found in directory: {masterdir}")
        return
        
    # Check dark frames.
    if len(darkframes) == 0:
        print("No dark frames found.")
        return

    # Metatdata (first frame)
    meta_dark  = fits.getheader(darkframes[0])
    
    # Extract observation date.
    obsdate_jd = meta_dark["JD"]
    obsdate    = meta_dark["OBSDATE"]

    print(f"\nobs-date: {obsdate}")
    
    # Load the master bias frame (Closed to the observation time)
    jd_mbias    = mbiasframes.summary.to_pandas()["jd"]
    idx_mbias   = (jd_mbias - obsdate_jd).abs().idxmin()
    fpath_mbias = Path(mbiasframes.files_filtered(include_path=True)[idx_mbias])
    mbias       = CCDData.read(fpath_mbias, unit="adu")
    
    # Temporary directory (removed after end of process)
    TMPDIR = masterdir/'tmp' 
    TMPDIR.mkdir(parents=True, exist_ok=True)
    funcs.clear_dir(TMPDIR)
    
    # -------------------------------------
    # Subtract master bias from dark frames
    # -------------------------------------
    print(f'{len(darkframes)} dark frames have been found. Subtracting master bias...')
    for fpath_dark in darkframes: 
                           
        fpath_bdark = TMPDIR / (fpath_dark.stem+'.bs'+fpath_dark.suffix)
        
        dark       = CCDData.read(fpath_dark)
        bdark      = ccdp.subtract_bias(dark, mbias)
        bdark.meta["history"] = f"Master bias: {fpath_mbias.name}"
        bdark.write(fpath_bdark, overwrite=True)
    
    # Collect the bias-subtracted dark frames
    bdarkframes   = ccdp.ImageFileCollection(TMPDIR, glob_include='[!._]*.fits')
    
    # Get the exposure time of the dark frames
    allexptime    = set(bdarkframes.summary[key_exptime])
    
    # --------------------------------------------
    # combine master dark (for each exposure time)
    # --------------------------------------------
    for exptime in allexptime:
        
        bdarkframes_exptime = [Path(f) for f in bdarkframes.files_filtered(exptime=exptime, include_path=True)]
        
        # Output file name 
        fpath_mdark = masterdir/(f"kl4040.dark.{int(round(exptime)):03d}s.comb.{obsdate}.fits")
        
        # filename    = fits.getheader(bdarkframes_exptime[0])["irafname"]
        # parts       = filename.split(".")
        # parts[-1]   = "comb"
        # fpath_mdark = masterdir/(".".join(parts) + ".fits") 
        # fpath_mdark = masterdir / f"dark.2x2.{exptime:1.0f}.{obsdate}.comb.fits"
        
        print(f'\n{len(bdarkframes_exptime)} dark frames have been found (exptime={exptime:3.1f}s). Combining master frame...')
        mdark = ccdp.combine(bdarkframes_exptime,
                            method='median',
                            sigma_clip=True,
                            sigma_clip_low_thresh=5,
                            sigma_clip_high_thresh=5,
                            sigma_clip_func=np.ma.median,
                            sigma_clip_dev_func=mad_std,
                            mem_limit=500e6,
                            dtype=np.float32
                            )    
        
        # Add metadata
        mdark.meta['combined'] = True
        mdark.meta['NCOMBINE'] = len(bdarkframes_exptime)
        # mdark.meta['jd']       = (obsdate_jd, "Julian date")
        mdark.meta['imagetyp'] = "DARK"
        mdark.meta['object'] = fpath_mdark.stem
        mdark.meta['history']  = f'({datetime.now().isoformat()}) {len(bdarkframes_exptime)} dark frames combined.'
        for i in range(len(bdarkframes_exptime)):
            mdark.meta['history'] = f"dark-{i+1:2d}: {bdarkframes_exptime[i]}"
        
        # Save the output
        # mdark.write(fpath_mdark, overwrite=True)
        hdu_mdark = fits.PrimaryHDU(data=mdark.data, header=mdark.header)
        hdu_mdark.writeto(fpath_mdark, overwrite=True)
    
        print(f"Done. {fpath_mdark} has been created. ({datetime.now().isoformat()})")

    # Clear out the temporary directory
    funcs.clear_dir(TMPDIR)
    TMPDIR.rmdir()
    print(f"\nTemporary directory {TMPDIR} has been deleted. ({datetime.now().isoformat()})")



def make_mflat(flatframes, masterdir, filtername, key_exptime="exptime"):
    """
    Create a master flat frame by processing a set of flat frames.
    This function performs the following steps:
    1. Checks for the existence of the master directory and required master bias and dark frames.
    2. Subtracts the master bias from the flat frames.
    3. Subtracts the master dark from the bias-subtracted flat frames.
    4. Combines the processed flat frames into a single master flat frame using median combination.
    Parameters:
    -----------
    flatframes : list of pathlib.Path
        List of file paths to the flat frames to be processed.
    masterdir : str or pathlib.Path
        Path to the directory containing the master calibration frames (bias and dark).
    filtername : str
        Name of the filter used for the flat frames (e.g., "R", "G", "B").
    key_exptime : str, optional
        Header keyword for the exposure time in the FITS files. Default is "exptime".
    Returns:
    --------
    None
        The function saves the resulting master flat frame to the specified master directory.
    Notes:
    ------
    - The function assumes that the flat frames, master bias, and master dark frames are in FITS format.
    - Temporary files are created during processing and are removed after the master flat frame is created.
    - The resulting master flat frame is saved with a filename format: `rasa.flat.<obsdate>.<filtername>.comb.fits`.
    Raises:
    -------
    FileNotFoundError:
        If the specified master directory does not exist.
    ValueError:
        If no flat frames, master bias, or master dark frames are found.
    Example:
    --------
    >>> from pathlib import Path
    >>> flatframes = [Path("flat1.fits"), Path("flat2.fits")]
    >>> masterdir = Path("/path/to/master/frames")
    >>> filtername = "R"
    >>> make_mflat(flatframes, masterdir, filtername)
    """
    
    print("""\n================================
Master Flat
================================""")
    
    # Check directory
    masterdir = Path(masterdir)
    if not masterdir.exists():
        print(f"Directory not found : {masterdir}")
        return

    # Check master bias.
    mbiasframes = ccdp.ImageFileCollection(masterdir, glob_include="[!._]*.fits").filter(imagetyp="Bias")
    if len(mbiasframes.files) == 0:
        print(f"No master bias found in directory: {masterdir}")
        return
    
    # # Check master dark.
    # mdarkframes = ccdp.ImageFileCollection(masterdir, glob_include="[!._]*.fits").filter(imagetyp="Dark")
    # if len(mdarkframes.files) == 0:
    #     print(f"No master bias found in directory: {masterdir}")
    #     return
                
    # Check flat frames    
    if len(flatframes) == 0:
        print("No flat frame found.")
        return
    
    # Extract observation date.
    obsdate_jd = fits.getheader(flatframes[0])["j_date"]
    obsdate_jd = float(obsdate_jd.split()[0])
    obsdate = Time(obsdate_jd, format="jd").to_datetime().strftime("%Y%m%d") 
    
    print(f"\nobs-date: {obsdate}")
    
    # Load the master bias frame (Closed to the observation time)
    jd_values = (mbiasframes.summary.to_pandas())["j_date"].apply(lambda x: float(x.split()[0]))
    idx_mbias = (jd_values - obsdate_jd).abs().idxmin()
    fpath_mbias = Path(mbiasframes.files_filtered(include_path=True)[idx_mbias])
    mbias = CCDData.read(fpath_mbias)

    # Temporary directory (removed after end of process)
    TMPDIR = masterdir/'tmp' 
    TMPDIR.mkdir(parents=True, exist_ok=True)
    funcs.clear_dir(TMPDIR)
    
    # Output file name 
    filename    = fits.getheader(flatframes[0])["irafname"]
    parts       = filename.split(".")
    parts[-1]   = "comb"
    fpath_mflat = masterdir/(".".join(parts) + ".fits") 
    # fpath_mflat = masterdir / f"mf.2x2.{obsdate}.{filtername}.comb.fits"
    
    # -----------------------------------
    # Subtract master bias from flat frames
    # -----------------------------------
    for fpath_flat in tqdm(flatframes,
                           total=len(flatframes), 
                           desc=f'Subtracting master bias from {len(flatframes)} flat frames...'):
        
        fpath_bflat = TMPDIR / (fpath_flat.stem+'.bs'+fpath_flat.suffix) # Bias subtracted flat
        
        flat       = CCDData.read(fpath_flat, unit='adu')
        bflat      = ccdp.subtract_bias(flat, mbias)
        bflat.meta["history"] = f"Master bias: {fpath_mbias.name}"
        bflat.write(fpath_bflat, overwrite=True)
    
    # Collect the bias-subtracted flat frames
    bflatframes   = ccdp.ImageFileCollection(TMPDIR, glob_include="[!._]*.fits")
    bflatframes   = [Path(f) for f in bflatframes.files_filtered(include_path=True)]
    
    # # Get the exposure time of the flat frames
    # allexptime    = set(bflatframes.summary[key_exptime])
    
    # # Get the exposure time of the dark frames
    # darkexptime   = set(mdarkframes.summary[key_exptime])
    
    # # Save bias and dark subtracted flat frames
    # bdflatframes  = [] 

    # # -----------------------------------
    # # Subtract master dark from flat frames
    # # -----------------------------------
    # for exptime in allexptime:
        
    #     # Load the master dark frame (Close obsdate and exposure time)
    #     darkexptime_closest = min(darkexptime, key=lambda x: abs(x - exptime))
    #     mdarkframes_exptime = mdarkframes.filter(exptime=darkexptime_closest)
        
    #     jd_values = (mdarkframes_exptime.summary.to_pandas())["jd"]
    #     idx_mdark = (jd_values - obsdate_jd).abs().idxmin()
    #     fpath_mdark = Path(mdarkframes_exptime.files_filtered(include_path=True)[idx_mdark])
    #     mdark = CCDData.read(fpath_mdark)

    #     # Collect the bias-subtracted flat frames with same expsoure time
    #     bflatframes_exptime = [Path(bflat) for bflat in bflatframes.files_filtered(exptime=exptime, include_path=True)]
        
    #     # subtract master dark from flat frames
    #     for fpath_bflat in tqdm(bflatframes_exptime,
    #                             total=len(bflatframes_exptime), 
    #                             desc=f'Subtracting master dark (exptime={exptime:3.1f}s) from {len(bflatframes_exptime)} flat frames...'):
        
    #         fpath_bdflat = TMPDIR / (fpath_bflat.stem+'.ds'+fpath_bflat.suffix)
            
    #         bflat        = CCDData.read(fpath_bflat)
    #         bdflat       = ccdp.subtract_dark(bflat, mdark, exposure_time=key_exptime, exposure_unit=u.second)
    #         bdflat.meta["history"] = f"Master dark: {fpath_mdark.name}"
    #         bdflat.write(fpath_bdflat)
            
    #         fpath_bflat.unlink()
    #         bdflatframes.append(fpath_bdflat)
    
    # -----------------------------------
    # Combine master flat
    # -----------------------------------
    print(f'\n{len(bflatframes)} flat frames have been found (filter={filtername}). Combining master frame...')

    # # average-combine (recommneded for fast-calculation)
    # mflat = ccdp.combine(bdflatframes,
    #                      method='average',
    #                      scale=funcs.inv_median,
    #                      sigma_clip=True,
    #                      sigma_clip_low_thresh=5,
    #                      sigma_clip_high_thresh=5,
    #                      sigma_clip_func=np.ma.median,
    #                      sigma_clip_dev_func=mad_std,
    #                      mem_limit=350e6,
    #                      dtype=np.float32
    #                      )    

    # median combine (recommended for science frames)
    mflat = ccdp.combine(bflatframes,
                        method='median',
                        scale=funcs.inv_median,
                        sigma_clip=True,
                        sigma_clip_low_thresh=5,
                        sigma_clip_high_thresh=5,
                        sigma_clip_func=np.ma.median,
                        sigma_clip_dev_func=mad_std,
                        mem_limit=500e6,
                        dtype=np.float32
                        )

    # Add metadata
    mflat.meta['jd']       = (obsdate_jd, "Julian date") 
    mflat.meta['combined'] = True
    mflat.meta['filter']   = filtername
    mflat.meta['imagetyp'] = "Flat"
    mflat.meta['object']   = fpath_mflat.stem
    mflat.meta['history']  = f'{len(bflatframes)} flat frames combined at {datetime.now().isoformat()}'
    for i in range(len(bflatframes)):
            mflat.meta['history'] = f"flat-{i+1:2d}: {bflatframes[i]}"

    # Save the output
    mflat.write(fpath_mflat, overwrite=True)
    print(f"Master flat frame: {fpath_mflat} has been created. ({datetime.now()})")

    # delete temporary directory
    funcs.clear_dir(TMPDIR)
    TMPDIR.rmdir()
    print(f"Temporary directory {TMPDIR} has been deleted ({datetime.now().isoformat()}).")

In [None]:
import logging
from pathlib import Path
from astropy.io import fits
from astropy.coordinates import Angle, SkyCoord, GCRS, GeocentricTrueEcliptic
from astropy.coordinates import get_body, get_sun
from astropy.time import Time
import astropy.units as u
from datetime import datetime

class RasaLcpyLV0:
    """
    Class for updating Level-0 (raw) FITS headers in the RASA lcpy KL4040 data pipeline.
    """

    def __init__(self, log_file: str = None):
        # set up console + optional file logging
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
        self.logger.addHandler(handler)
        if log_file:
            file_h = logging.FileHandler(log_file)
            file_h.setFormatter(handler.formatter)
            self.logger.addHandler(file_h)

    def update_header(self, fpath_fits):
        """
        Open a raw FITS file, validate and update its header, then overwrite the file.
        """
        fpath = Path(fpath_fits)
        if not fpath.exists():
            self.logger.error(f"File not found: {fpath}")
            return

        hdul = fits.open(fpath)
        hdr = hdul[0].header.copy()

        # ... (all the header-update logic here) ...

        new_hdu = fits.PrimaryHDU(data=hdul[0].data, header=hdr)
        new_hdu.writeto(fpath, overwrite=True)
        self.logger.info(f"Updated LV0 header: {fpath}")


In [None]:
import logging
from pathlib import Path
from astropy.io import fits
from astropy.time import Time
from astropy.wcs import FITSFixedWarning
import astropy.units as u
from astropy.nddata import CCDData
from astropy.stats import mad_std
import ccdproc as ccdp
import numpy as np
import warnings
from datetime import datetime
from tqdm import tqdm
import _funcs as funcs

warnings.filterwarnings("ignore", category=FITSFixedWarning)

class CombMaster:
    """
    Class to create master calibration frames (bias, dark, flat) for RASA lcpy KL4040.
    """

    def __init__(self, log_file: str = None):
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
        self.logger.addHandler(handler)
        if log_file:
            file_h = logging.FileHandler(log_file)
            file_h.setFormatter(handler.formatter)
            self.logger.addHandler(file_h)

    def make_mbias(self, bias_frames, master_dir):
        """Combine bias frames into a master bias."""
        # ... (implementation as before) ...

    def make_mdark(self, dark_frames, master_dir, key_exptime='exptime'):
        """Subtract master bias and combine dark frames by exposure time."""
        # ... (implementation as before) ...

    def make_mflat(self, flat_frames, master_dir, filter_name, key_exptime='exptime'):
        """Subtract bias (and dark) and combine flat frames."""
        # ... (implementation as before) ...


In [None]:
# in your processing script:
from rasa_lcpy_lv0 import RasaLcpyLV0
from comb_master import CombMaster

lv0 = RasaLcpyLV0(log_file="pipeline.log")
comb = CombMaster(log_file="pipeline.log")

# update headers
lv0.update_header("/data/raw/image0001.fits")

# make master frames
comb.make_mbias(bias_list, "/data/cal/master")
comb.make_mdark(dark_list, "/data/cal/master")
comb.make_mflat(flat_list, "/data/cal/master", filter_name="R")


In [None]:
def make_mbias(biasframes, masterdir):
    """
    Combine multiple bias frames into a master bias frame and save the result.

    Parameters
    ----------
    biasframes : list of str or pathlib.Path
        Paths to the individual bias FITS frames to be combined.
    masterdir : str or pathlib.Path
        Directory where the master bias frame will be saved.

    Returns
    -------
    None
        Writes a FITS file named `kl4040.bias.comb.<obsdate>.fits` into masterdir.

    Notes
    -----
    - If biasframes is empty, the function exits without writing.
    - Uses median combination with sigma clipping (±5σ).
    - Adds metadata including COMBINED, NCOMBINE, IMAGETYP, OBJECT, and HISTORY.
    """
    print("""\n================================
Master Bias
================================""")
    …  # rest of original code unchanged

def make_mdark(darkframes, masterdir, key_exptime='exptime'):
    """
    Create master dark frames by subtracting a master bias and combining by exposure time.

    Parameters
    ----------
    darkframes : list of pathlib.Path
        Paths to the individual dark FITS frames to process.
    masterdir : pathlib.Path
        Directory containing the master bias and where master darks will be saved.
    key_exptime : str, optional
        Header keyword for exposure time (default: 'exptime').

    Returns
    -------
    None
        Writes one master dark FITS per unique exposure time `kl4040.dark.<exp>s.comb.<obsdate>.fits`.

    Notes
    -----
    - Requires at least one master bias in masterdir (IMAGETYP='Bias').
    - Subtracts the closest-in-time bias from each dark.
    - Combines resulting darks with median+sigma clipping.
    - Cleans up temporary bias-subtracted files on exit.
    """
    print("""\n================================
Master Dark
================================""")
    …  # rest of original code unchanged

def make_mflat(flatframes, masterdir, filtername, key_exptime="exptime"):
    """
    Create a master flat by bias-subtracting and combining flat frames.

    Parameters
    ----------
    flatframes : list of pathlib.Path
        Paths to the individual flat FITS frames to process.
    masterdir : str or pathlib.Path
        Directory containing master bias (and optionally dark) and where result is saved.
    filtername : str
        Filter identifier for naming (e.g., 'R', 'G', 'B', or 'CLEAR').
    key_exptime : str, optional
        Header keyword for exposure time (default: 'exptime').

    Returns
    -------
    None
        Writes a FITS file named `rasa.flat.<obsdate>.<filtername>.comb.fits` into masterdir.

    Notes
    -----
    - Checks for and uses master bias frames (IMAGETYP='Bias').
    - (Optional dark subtraction code is commented out.)
    - Combines bias-subtracted flats via median+sigma clipping, scaled by funcs.inv_median.
    - Removes temporary files and reports completion.
    """
    print("""\n================================
Master Flat
================================""")
    …  # rest of original code unchanged


In [None]:
import logging
from astropy.io import fits
from astropy.coordinates import Angle, SkyCoord, GCRS, GeocentricTrueEcliptic
from astropy.coordinates import get_body, get_sun
from astropy.time import Time
import astropy.units as u
from datetime import datetime

def LV0_HeaderUpdate(fpath_fits):
    

    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    # FITS file open
    try:
        hdul = fits.open(fpath_fits)
        
    except FileNotFoundError:
        logger.error(f"File not found: {fpath_fits}")
        return
    
    # Import header
    hdr = hdul[0].header
    
    ###
    ### (TBA) Check if the header info. is valid. 
    ###
    
    hdr_updated = hdr.copy()

    # Updates header info.
    hdr_updated.comments["LT"]  = "Local Time"
    hdr_updated.comments["UTC"] = "Universal Time Coordinated"
    hdr_updated["EXPTIME"]  = (float(hdr["EXPTIME"]), "Exposure Time (sec)")
    hdr_updated["CCDTEMP"]  = (float(hdr["CCDTEMP"]), "CCD Temperature (C)")
    hdr_updated["PIXSZ"]    = (float(hdr["PIXSZ"]), "Pixel Size (um)")
    hdr_updated["JD"]       = (float(hdr["JD"]), "Julian Date")
    hdr_updated["DATE-OBS"] = (Time(hdr_updated["JD"], format="jd").isot, "Observation Datetime")
    hdr_updated["MJD-OBS"]  = (Time(hdr_updated["JD"], format="jd").mjd, "Modified Julian Date")
    hdr_updated["FOCALLEN"] = (float(hdr["FOCALLEN"]), "Focal Length (mm)")
    hdr_updated["APTDIA"]   = (float(hdr["APTDIA"]), "Aperture Diameter (mm)")
    hdr_updated["FOCUS"]    = (int(hdr["FOCUS"]), "Focal Position")
    hdr_updated["OBSERVER"] = (hdr["OBSERVER"].upper(), "Observer")
    hdr_updated["IMAGETYP"] = (hdr["IMAGETYP"].upper(), "Image Type (BIAS, DARK, FLAT, LIGHT)")

    # Telescope & Site coordinates
    hdr_updated["RA"]       = (Angle(hdr["RA"], unit='hourangle').degree, "Telescope Right Ascension (deg)") if type(hdr["RA"]) is str else hdr["RA"]
    hdr_updated["DEC"]      = (Angle(hdr["DEC"], unit='degree').degree, "Telescope Declination (deg)") if type(hdr["DEC"]) is str else hdr["DEC"]
    hdr_updated["ALT"]      = (Angle(hdr["ALT"], unit='degree').degree, "Telescope Altitude (deg)") if type(hdr["ALT"]) is str else hdr["ALT"]
    hdr_updated["AZ"]       = (Angle(hdr["AZ"], unit='degree').degree, "Telescope Azimuth (deg)") if type(hdr["AZ"]) is str else hdr["AZ"]
    hdr_updated["LON"]      = (-119.4, "Site Longitude (deg)")
    hdr_updated["LAT"]      = (37.07, "Site Latitude (deg)")
    hdr_updated["ELEV"]     = (1.405, "Site Elevation (km)")

    # Additional info.
    hdr_updated["BUNIT"]    = ("adu", "array data unit")
    hdr_updated["OBSERVAT"] = "Sierra Remote Observatories"
    hdr_updated["DETECTOR"] = ("FLI Camera KL4040 FI", "Detector Name")
    hdr_updated["FILTER"]   = ("CLEAR", "Filter Name")
    hdr_updated["OBSDATE"]  = (Time(hdr["JD"], format="jd").to_datetime().strftime("%Y%m%d"), "YYYYMMD (UT)")

    hdr_updated["DATLEVEL"] = (0, "Data Process Level (0-2)")
    hdr_updated["COMBINED"] = (False, "Is image combined? (True/False)")
    hdr_updated["BIASCORR"] = (False, "Bias Corrected (True/False)")
    hdr_updated["DARKCORR"] = (False, "Dark Corrected (True/False)")
    hdr_updated["FLATCORR"] = (False, "Flat Corrected (True/False)")
    
    ### Coordinates info.
    obstime = Time(hdr_updated["JD"], format="jd") # observation time

    coords_icrs = SkyCoord(
        ra=hdr_updated["RA"]*u.deg,
        dec=hdr_updated["DEC"]*u.deg,
        frame='icrs'
        )
    coords_gcrs = coords_icrs.transform_to(GCRS(obstime=obstime))

    # Galactic coordinates
    gal = coords_icrs.galactic
    
    # Ecliptic coordinates
    ecl = coords_icrs.transform_to(GeocentricTrueEcliptic(equinox=obstime))
    
    # Solar elongation
    scoord_gcrs = get_sun(obstime)
    selong = coords_gcrs.separation(scoord_gcrs)

    # Lunar elongation
    mcoord_gcrs = get_body("Moon", obstime)
    lelong = coords_gcrs.separation(mcoord_gcrs)
    
    hdr_updated["SELONG"] = (selong.value, "Field Solar Elongation (deg)")
    hdr_updated["MELONG"] = (lelong.value, "Field Lunar Elongation (deg)")
    hdr_updated["ECLAT"]  = (ecl.lat.value, "Field Ecliptic Latitude (deg)")
    hdr_updated["ECLON"]  = (ecl.lon.value, "Field Ecliptic Longitude (deg)")
    hdr_updated["GXLAT"]  = (gal.l.value, "Field Galactic Latitude (deg)")
    hdr_updated["GXLON"]  = (gal.b.value, "Field Galactic Longitude (deg)")
    
    hdr_updated["HISTORY"] = f"({datetime.now().isoformat()}) Lv.0 header updated."
    
    hdul_updated = fits.PrimaryHDU(data = hdul[0].data, header=hdr_updated)
    
    hdul_updated.writeto(fpath_fits, overwrite=True)
    print(f"({datetime.now().isoformat()}) Lv.0 header updated: {fpath_fits}")

In [None]:
# process_lv0.py
from rasa_lcpy_lv0 import RasaLcpyLV0

def main():
    # initialize—with optional logfile
    lv0 = RasaLcpyLV0(log_file="rasa_pipeline.log")

    # update one file
    lv0.update_header("/data/raw/night1/image001.fits")

    # or a whole directory
    import glob
    for f in glob.glob("/data/raw/**/*.fits", recursive=True):
        lv0.update_header(f)

if __name__ == "__main__":
    main()


In [None]:
import logging
from pathlib import Path
from astropy.io import fits
from astropy.coordinates import Angle, SkyCoord, GCRS, GeocentricTrueEcliptic
from astropy.coordinates import get_body, get_sun
from astropy.time import Time
import astropy.units as u
from datetime import datetime

class RasaLcpyLV0:
    """
    Class for updating Level-0 (raw) FITS headers in the RASA lcpy KL4040 data pipeline.

    This encapsulates metadata verification, enrichment (coordinate transforms, elongations),
    and logging into a reusable component suitable for CLI or workflow integration.
    """

    def __init__(self, log_file: str = None):
        """
        Initialize logger. If log_file is provided, logs are written to that file as well as stdout.

        Parameters
        ----------
        log_file : str, optional
            Path to a log file to append processing records.
        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
        self.logger.addHandler(handler)

        if log_file:
            file_h = logging.FileHandler(log_file)
            file_h.setFormatter(handler.formatter)
            self.logger.addHandler(file_h)

    def update_header(self, fpath_fits):
        """
        Open a raw FITS file, validate and update its header, then overwrite the file.

        Parameters
        ----------
        fpath_fits : str or pathlib.Path
            Path to the Level-0 FITS file to update.
        """
        fpath = Path(fpath_fits)
        if not fpath.exists():
            self.logger.error(f"File not found: {fpath}")
            return

        hdul = fits.open(fpath)
        hdr = hdul[0].header.copy()

        # Core metadata fields
        hdr.comments['LT']  = 'Local Time'
        hdr.comments['UTC'] = 'Universal Time Coordinated'
        hdr['EXPTIME']      = (float(hdr['EXPTIME']), 'Exposure Time (sec)')
        hdr['CCDTEMP']      = (float(hdr['CCDTEMP']), 'CCD Temperature (C)')
        hdr['PIXSZ']        = (float(hdr['PIXSZ']), 'Pixel Size (um)')
        hdr['JD']           = (float(hdr['JD']), 'Julian Date')
        hdr['DATE-OBS']     = (Time(hdr['JD'], format='jd').isot, 'Observation Datetime')
        hdr['MJD-OBS']      = (Time(hdr['JD'], format='jd').mjd, 'Modified Julian Date')
        hdr['FOCALLEN']     = (float(hdr['FOCALLEN']), 'Focal Length (mm)')
        hdr['APTDIA']       = (float(hdr['APTDIA']), 'Aperture Diameter (mm)')
        hdr['FOCUS']        = (int(hdr['FOCUS']), 'Focal Position')
        hdr['OBSERVER']     = (hdr['OBSERVER'].upper(), 'Observer')
        hdr['IMAGETYP']     = (hdr['IMAGETYP'].upper(), 'Image Type')

        # Telescope & Site
        for key, unit, comment in [('RA', 'hourangle', 'Telescope RA (deg)'),
                                   ('DEC', 'degree', 'Telescope Dec (deg)'),
                                   ('ALT', 'degree', 'Telescope Alt (deg)'),
                                   ('AZ', 'degree', 'Telescope Az (deg)')]:
            if isinstance(hdr[key], str):
                hdr[key] = (Angle(hdr[key], unit=unit).degree, comment)
        hdr['LON']   = (-119.4, 'Site Longitude (deg)')
        hdr['LAT']   = (37.07, 'Site Latitude (deg)')
        hdr['ELEV']  = (1.405, 'Site Elevation (km)')

        # Processing flags
        flags = ['BIASCORR', 'DARKCORR', 'FLATCORR']
        for fl in flags:
            hdr[fl] = (False, f'{fl.capitalize()} applied?')
        hdr['DATLEVEL']  = (0, 'Data Level')
        hdr['COMBINED']  = (False, 'Combined frames?')

        # Coordinate enrichments
        obstime = Time(hdr['JD'], format='jd')
        coords = SkyCoord(ra=hdr['RA']*u.deg, dec=hdr['DEC']*u.deg, frame='icrs')
        gcrs = coords.transform_to(GCRS(obstime=obstime))
        gal  = coords.galactic
        ecl  = coords.transform_to(GeocentricTrueEcliptic(equinox=obstime))

        # Elongations
        sun_gcrs  = get_sun(obstime)
        moon_gcrs = get_body('Moon', obstime)
        hdr['SELONG'] = (gcrs.separation(sun_gcrs).value, 'Solar elongation (deg)')
        hdr['MELONG'] = (gcrs.separation(moon_gcrs).value, 'Lunar elongation (deg)')

        # Galactic & ecliptic coords
        hdr['GXLAT'] = (gal.b.value, 'Galactic latitude (deg)')
        hdr['GXLON'] = (gal.l.value, 'Galactic longitude (deg)')
        hdr['ECLAT'] = (ecl.lat.value, 'Ecliptic latitude (deg)')
        hdr['ECLON'] = (ecl.lon.value, 'Ecliptic longitude (deg)')

        hdr['HISTORY'] = f"({datetime.now().isoformat()}) LV0 header updated."

        # Write back
        new_hdu = fits.PrimaryHDU(data=hdul[0].data, header=hdr)
        new_hdu.writeto(fpath, overwrite=True)
        self.logger.info(f"Updated LV0 header: {fpath}")


In [None]:
import logging
from pathlib import Path
from astropy.io import fits
from astropy.time import Time
from astropy.wcs import FITSFixedWarning
import astropy.units as u
from astropy.nddata import CCDData
from astropy.stats import mad_std
import ccdproc as ccdp
import numpy as np
import warnings
from datetime import datetime
from tqdm import tqdm
import _funcs as funcs

warnings.filterwarnings("ignore", category=FITSFixedWarning)

class RasaLcpyLV0:
    """
    Class for updating Level-0 (raw) FITS headers in the RASA lcpy KL4040 data pipeline.

    This encapsulates metadata verification, enrichment (coordinate transforms, elongations),
    and logging into a reusable component suitable for CLI or workflow integration.
    """

    def __init__(self, log_file: str = None):
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
        self.logger.addHandler(handler)
        if log_file:
            file_h = logging.FileHandler(log_file)
            file_h.setFormatter(handler.formatter)
            self.logger.addHandler(file_h)

    def update_header(self, fpath_fits):
        fpath = Path(fpath_fits)
        if not fpath.exists():
            self.logger.error(f"File not found: {fpath}")
            return
        hdul = fits.open(fpath)
        hdr = hdul[0].header.copy()
        # [update logic as before...]
        new_hdu = fits.PrimaryHDU(data=hdul[0].data, header=hdr)
        new_hdu.writeto(fpath, overwrite=True)
        self.logger.info(f"Updated LV0 header: {fpath}")

class CombMaster:
    """
    Class to create master calibration frames (bias, dark, flat) for RASA lcpy KL4040.

    Methods
    -------
    make_mbias(bias_frames, master_dir)
        Combine bias frames into a master bias.
    make_mdark(dark_frames, master_dir, key_exptime='exptime')
        Subtract master bias and combine dark frames by exposure time.
    make_mflat(flat_frames, master_dir, filter_name, key_exptime='exptime')
        Subtract bias, optionally dark, and combine flat frames.
    """

    def __init__(self, log_file: str = None):
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
        self.logger.addHandler(handler)
        if log_file:
            file_h = logging.FileHandler(log_file)
            file_h.setFormatter(handler.formatter)
            self.logger.addHandler(file_h)

    def make_mbias(self, bias_frames, master_dir):
        """
        Combine multiple bias frames into a master bias frame and save it.
        """
        self.logger.info("Starting master bias combination...")
        master_dir = Path(master_dir)
        master_dir.mkdir(parents=True, exist_ok=True)
        if not bias_frames:
            self.logger.warning("No bias frames provided.")
            return
        hdr0 = fits.getheader(bias_frames[0])
        obsdate = hdr0.get('OBSDATE', Time(hdr0['JD'], format='jd').to_value('iso'))
        out_name = master_dir / f"kl4040.bias.comb.{obsdate}.fits"
        mbias = ccdp.combine(
            bias_frames,
            method='median',
            sigma_clip=True,
            sigma_clip_low_thresh=5,
            sigma_clip_high_thresh=5,
            sigma_clip_func=np.ma.median,
            sigma_clip_dev_func=mad_std,
            mem_limit=500e6,
            dtype=np.float32
        )
        mbias.meta.update({
            'COMBINED': True,
            'NCOMBINE': len(bias_frames),
            'IMAGETYP': 'BIAS',
            'HISTORY': f"({datetime.now().isoformat()}) Combined {len(bias_frames)} bias frames." 
        })
        fits.PrimaryHDU(data=mbias.data, header=mbias.meta).writeto(out_name, overwrite=True)
        self.logger.info(f"Master bias saved to {out_name}")

    def make_mdark(self, dark_frames, master_dir, key_exptime='exptime'):
        """
        Create master dark frames by bias-subtracting and combining dark frames.
        """
        self.logger.info("Starting master dark creation...")
        master_dir = Path(master_dir)
        # find nearest master bias
        mbias_coll = ccdp.ImageFileCollection(master_dir, glob_include='*.fits').filter(imagetyp='BIAS')
        if not mbias_coll.files:
            self.logger.error("No master bias found.")
            return
        if not dark_frames:
            self.logger.warning("No dark frames provided.")
            return
        hdr0 = fits.getheader(dark_frames[0])
        obs_jd = hdr0['JD']
        # select closest bias
        jd_series = mbias_coll.summary['jd'].astype(float)
        idx = (abs(jd_series - obs_jd)).idxmin()
        bias_path = Path(mbias_coll.files_filtered(include_path=True)[idx])
        mbias = CCDData.read(bias_path, unit='adu')
        tmp = master_dir / 'tmp'
        funcs.clear_dir(tmp)
        tmp.mkdir(parents=True, exist_ok=True)
        # subtract bias
        for d in dark_frames:
            dark = CCDData.read(d, unit='adu')
            bdark = ccdp.subtract_bias(dark, mbias)
            bdark.meta['history'] = f"Bias: {bias_path.name}"
            bdark.write(tmp / d.name, overwrite=True)
        bdcoll = ccdp.ImageFileCollection(tmp, glob_include='*.fits')
        exposures = set(bdcoll.summary[key_exptime])
        for exp in exposures:
            group = [Path(f) for f in bdcoll.files_filtered(**{key_exptime:exp}, include_path=True)]
            out_dark = master_dir / f"kl4040.dark.{int(round(exp))}s.comb.{hdr0['OBSDATE']}.fits"
            mdark = ccdp.combine(group, method='median', sigma_clip=True,
                                 sigma_clip_low_thresh=5, sigma_clip_high_thresh=5,
                                 sigma_clip_func=np.ma.median, sigma_clip_dev_func=mad_std,
                                 mem_limit=500e6, dtype=np.float32)
            mdark.meta.update({'COMBINED': True,'NCOMBINE': len(group),'IMAGETYP':'DARK',
                               'history':f"Combined {len(group)} darks at {datetime.now().isoformat()}"})
            fits.PrimaryHDU(data=mdark.data, header=mdark.meta).writeto(out_dark, overwrite=True)
            self.logger.info(f"Master dark saved to {out_dark}")
        funcs.clear_dir(tmp)
        tmp.rmdir()
        self.logger.info("Temporary files cleaned.")

    def make_mflat(self, flat_frames, master_dir, filter_name, key_exptime='exptime'):
        """
        Create a master flat frame by subtracting bias (and dark) and combining flats.
        """
        self.logger.info(f"Starting master flat for filter {filter_name}...")
        master_dir = Path(master_dir)
        if not flat_frames:
            self.logger.warning("No flat frames provided.")
            return
        hdr0 = fits.getheader(flat_frames[0])
        obs_jd = float(hdr0['j_date'].split()[0])
        obsdate = Time(obs_jd, format='jd').to_datetime().strftime('%Y%m%d')
        # load bias
        mbias_coll = ccdp.ImageFileCollection(master_dir, glob_include='*.fits').filter(imagetyp='BIAS')
        jd_series = mbias_coll.summary['jd'].astype(float)
        idx = (abs(jd_series - obs_jd)).idxmin()
        mbias = CCDData.read(Path(mbias_coll.files_filtered(include_path=True)[idx]), unit='adu')
        tmp = master_dir / 'tmp'
        funcs.clear_dir(tmp)
        tmp.mkdir(parents=True, exist_ok=True)
        # subtract bias
        for f in flat_frames:
            flat = CCDData.read(f, unit='adu')
            bflat = ccdp.subtract_bias(flat, mbias)
            bflat.meta['history'] = f"Bias: {mbias.meta.get('object','') }"
            bflat.write(tmp / f.name, overwrite=True)
        bflist = list((ccdp.ImageFileCollection(tmp, glob_include='*.fits').files_filtered(include_path=True)))
        out_flat = master_dir / f"kl4040.flat.{filter_name}.{obsdate}.comb.fits"
        mflat = ccdp.combine([Path(p) for p in bflist], method='median', scale=funcs.inv_median,
                             sigma_clip=True, sigma_clip_low_thresh=5, sigma_clip_high_thresh=5,
                             sigma_clip_func=np.ma.median, sigma_clip_dev_func=mad_std,
                             mem_limit=500e6, dtype=np.float32)
        mflat.meta.update({'COMBINED': True,'FILTER':filter_name,'IMAGETYP':'Flat',
                           'history':f"Combined {len(bflist)} flats at {datetime.now().isoformat()}"})
        fits.PrimaryHDU(data=mflat.data, header=mflat.meta).writeto(out_flat, overwrite=True)
        funcs.clear_dir(tmp)
        tmp.rmdir()
        self.logger.info(f"Master flat saved to {out_flat}")


In [None]:
import logging
from pathlib import Path
from astropy.io import fits
from astropy.time import Time
from astropy.wcs import FITSFixedWarning
import astropy.units as u
from astropy.nddata import CCDData
from astropy.stats import mad_std
import ccdproc as ccdp
import numpy as np
import warnings
from datetime import datetime
from tqdm import tqdm
import _funcs as funcs

warnings.filterwarnings("ignore", category=FITSFixedWarning)

class CombMaster:
    """
    Class to create master calibration frames (bias, dark, flat) for RASA lcpy KL4040.

    Provides methods to combine raw calibration frames into master frames,
    with automated metadata handling, logging, and temporary file management.
    """

    def __init__(self, log_file: str = None):
        """
        Initialize CombMaster with optional file logging.

        Parameters
        ----------
        log_file : str, optional
            Path to a log file where processing messages are appended. If None,
            logging will only be output to stdout.
        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
        self.logger.addHandler(handler)
        if log_file:
            file_h = logging.FileHandler(log_file)
            file_h.setFormatter(handler.formatter)
            self.logger.addHandler(file_h)

    def make_mbias(self, bias_frames, master_dir):
        """
        Combine multiple bias frames into a single master bias frame.

        Parameters
        ----------
        bias_frames : list of str or pathlib.Path
            Paths to individual raw bias FITS frames to combine.
        master_dir : str or pathlib.Path
            Directory where the master bias file will be saved. Created if it does not exist.

        Returns
        -------
        pathlib.Path or None
            Path to the generated master bias FITS file, or None if no bias_frames supplied.

        Side Effects
        ------------
        - Writes a FITS file named 'kl4040.bias.comb.<obsdate>.fits' in master_dir.
        - Logs progress and any errors.
        """
        self.logger.info("Starting master bias combination...")
        master_dir = Path(master_dir)
        master_dir.mkdir(parents=True, exist_ok=True)
        if not bias_frames:
            self.logger.warning("No bias frames provided.")
            return None
        hdr0 = fits.getheader(bias_frames[0])
        obsdate = hdr0.get('OBSDATE', Time(hdr0['JD'], format='jd').to_value('iso'))
        out_name = master_dir / f"kl4040.bias.comb.{obsdate}.fits"
        mbias = ccdp.combine(
            bias_frames,
            method='median',
            sigma_clip=True,
            sigma_clip_low_thresh=5,
            sigma_clip_high_thresh=5,
            sigma_clip_func=np.ma.median,
            sigma_clip_dev_func=mad_std,
            mem_limit=500e6,
            dtype=np.float32
        )
        mbias.meta.update({
            'COMBINED': True,
            'NCOMBINE': len(bias_frames),
            'IMAGETYP': 'BIAS',
            'HISTORY': f"({datetime.now().isoformat()}) Combined {len(bias_frames)} bias frames." 
        })
        fits.PrimaryHDU(data=mbias.data, header=mbias.meta).writeto(out_name, overwrite=True)
        self.logger.info(f"Master bias saved to {out_name}")
        return out_name

    def make_mdark(self, dark_frames, master_dir, key_exptime='exptime'):
        """
        Create master dark frames by subtracting a master bias and combining per exposure time.

        Parameters
        ----------
        dark_frames : list of str or pathlib.Path
            Paths to raw dark FITS frames for processing.
        master_dir : str or pathlib.Path
            Directory containing master bias and where master dark files will be saved.
        key_exptime : str, optional
            Header keyword for exposure time in dark frames (default 'exptime').

        Returns
        -------
        list of pathlib.Path
            Paths to generated master dark FITS files for each unique exposure time.

        Side Effects
        ------------
        - Reads master bias from master_dir (closest in JD to first dark frame).
        - Writes master dark FITS files named 'kl4040.dark.<exp>s.comb.<obsdate>.fits'.
        - Cleans up temporary bias-subtracted dark files.
        - Logs progress and errors.
        """
        self.logger.info("Starting master dark creation...")
        master_dir = Path(master_dir)
        mbias_coll = ccdp.ImageFileCollection(master_dir, glob_include='*.fits').filter(imagetyp='BIAS')
        if not mbias_coll.files:
            self.logger.error("No master bias found.")
            return []
        if not dark_frames:
            self.logger.warning("No dark frames provided.")
            return []
        hdr0 = fits.getheader(dark_frames[0])
        obs_jd = hdr0['JD']
        jd_series = mbias_coll.summary['jd'].astype(float)
        idx = (abs(jd_series - obs_jd)).idxmin()
        bias_path = Path(mbias_coll.files_filtered(include_path=True)[idx])
        mbias = CCDData.read(bias_path, unit='adu')
        tmp = master_dir / 'tmp'
        funcs.clear_dir(tmp)
        tmp.mkdir(parents=True, exist_ok=True)
        # Subtract bias
        for d in dark_frames:
            dark = CCDData.read(d, unit='adu')
            bdark = ccdp.subtract_bias(dark, mbias)
            bdark.meta['history'] = f"Bias: {bias_path.name}"
            bdark.write(tmp / Path(d).name, overwrite=True)
        bdcoll = ccdp.ImageFileCollection(tmp, glob_include='*.fits')
        exposures = set(bdcoll.summary[key_exptime])
        out_files = []
        for exp in exposures:
            group = [Path(f) for f in bdcoll.files_filtered(**{key_exptime:exp}, include_path=True)]
            out_dark = master_dir / f"kl4040.dark.{int(round(exp))}s.comb.{hdr0['OBSDATE']}.fits"
            mdark = ccdp.combine(group, method='median', sigma_clip=True,
                                 sigma_clip_low_thresh=5, sigma_clip_high_thresh=5,
                                 sigma_clip_func=np.ma.median, sigma_clip_dev_func=mad_std,
                                 mem_limit=500e6, dtype=np.float32)
            mdark.meta.update({'COMBINED': True,'NCOMBINE': len(group),'IMAGETYP':'DARK',
                               'HISTORY':f"Combined {len(group)} darks at {datetime.now().isoformat()}"})
            fits.PrimaryHDU(data=mdark.data, header=mdark.meta).writeto(out_dark, overwrite=True)
            self.logger.info(f"Master dark saved to {out_dark}")
            out_files.append(out_dark)
        funcs.clear_dir(tmp)
        tmp.rmdir()
        self.logger.info("Temporary files cleaned.")
        return out_files

    def make_mflat(self, flat_frames, master_dir, filter_name, key_exptime='exptime'):
        """
        Generate a master flat frame by subtracting bias (and optionally dark) and combining flats.

        Parameters
        ----------
        flat_frames : list of str or pathlib.Path
            Paths to raw flat FITS frames for processing.
        master_dir : str or pathlib.Path
            Directory containing master bias/dark and where the master flat is saved.
        filter_name : str
            Filter identifier (e.g., 'R', 'G', 'B') used in naming the output.
        key_exptime : str, optional
            Header keyword for exposure time in flat frames (default 'exptime').

        Returns
        -------
        pathlib.Path or None
            Path to the generated master flat FITS file, or None if failure.

        Side Effects
        ------------
        - Reads master bias from master_dir (closest in JD to first flat frame).
        - Writes master flat named 'kl4040.flat.<filter>.<obsdate>.comb.fits'.
        - Cleans up any temporary files.
        - Logs all steps.
        """
        self.logger.info(f"Starting master flat for filter {filter_name}...")
        master_dir = Path(master_dir)
        if not flat_frames:
            self.logger.warning("No flat frames provided.")
            return None
        hdr0 = fits.getheader(flat_frames[0])
        obs_jd = float(hdr0['j_date'].split()[0])
        obsdate = Time(obs_jd, format='jd').to_datetime().strftime('%Y%m%d')
        mbias_coll = ccdp.ImageFileCollection(master_dir, glob_include='*.fits').filter(imagetyp='BIAS')
        jd_series = mbias_coll.summary['jd'].astype(float)
        idx = (abs(jd_series - obs_jd)).idxmin()
        mbias = CCDData.read(Path(mbias_coll.files_filtered(include_path=True)[idx]), unit='adu')
        tmp = master_dir / 'tmp'
        funcs.clear_dir(tmp)
        tmp.mkdir(parents=True, exist_ok=True)
        # Subtract bias
        for f in flat_frames:
            flat = CCDData.read(f, unit='adu')
            bflat = ccdp.subtract_bias(flat, mbias)
            bflat.meta['history'] = f"Bias: {mbias.meta.get('object','')}"
            bflat.write(tmp / Path(f).name, overwrite=True)
        bflist = list((ccdp.ImageFileCollection(tmp, glob_include='*.fits').files_filtered(include_path=True)))
        out_flat = master_dir / f"kl4040.flat.{filter_name}.{obsdate}.comb.fits"
        mflat = ccdp.combine([Path(p) for p in bflist], method='median', scale=funcs.inv_median,
                             sigma_clip=True, sigma_clip_low_thresh=5, sigma_clip_high_thresh=5,
                             sigma_clip_func=np.ma.median, sigma_clip_dev_func=mad_std,
                             mem_limit=500e6, dtype=np.float32)
        mflat.meta.update({'COMBINED': True,'FILTER':filter_name,'IMAGETYP':'FLAT',
                           'HISTORY':f"Combined {len(bflist)} flats at {datetime.now().isoformat()}"})
        fits.PrimaryHDU(data=mflat.data, header=mflat.meta).writeto(out_flat, overwrite=True)
        funcs.clear_dir(tmp)
        tmp.rmdir()
        self.logger.info(f"Master flat saved to {out_flat}")
        return out_flat
