In [1]:
import os
import re
import numpy as np
import numpy.ma as ma
import pandas as pd
import copy
from datetime import datetime
from scipy.ndimage import convolve1d
from kpfpipe.models.level1 import KPF1
from modules.Utils.utils import DummyLogger, styled_text
from modules.Utils.kpf_parse import HeaderParse, get_datetime_obsid, get_kpf_level, get_data_products_expected
from modules.Utils.kpf_parse import get_data_products_L0, get_data_products_L1
from modules.quicklook.src.analyze_guider import AnalyzeGuider
from modules.quicklook.src.analyze_2d import Analyze2D
from modules.quicklook.src.analyze_l1 import AnalyzeL1
from modules.calibration_lookup.src.alg import GetCalibrations
from modules.Utils.kpf_parse import HeaderParse, get_data_products_L2
from kpfpipe.models.level2 import KPF2

In [2]:
class AnalyzeL2:

    """
    Description:
        Wraps a private deep copy of an L2 object and derives summary
        quantities from its RV table (stored as attributes).  Plotting
        functions are meant to live here as well.

    Arguments:
        L2 - an L2 object
        logger - optional logger; when supplied, a debug message is emitted
                 on construction

    Attributes:
        name - name of source (e.g., 'Bias', 'Etalon', '185144')
        ObsID - observation  ID (e.g. 'KP.20230704.02326.27')
        header - header of the PRIMARY extension of the L2 object
        rv_header - header of the RV extension
        df_RV, df_RVs - the 'RV' table of the copied L2 object
        is_star - True when the non-star-name source type is 'Star' or 'Sun'
        green_present, red_present - whether 'Green' / 'Red' are among the
            data products reported by get_data_products_L2
        texp - value of the ELAPSED keyword in the PRIMARY header
        (plus the statistics set by compute_statistics)

    To do:
        Add plot showing combined CCF - https://github.com/Keck-DataReductionPipelines/KPF-Pipeline/issues/940
        Add plot showing correlations between per-order RVs and per-chip RVs and overall RVs.
    """

    def __init__(self, L2, logger=None):
        self.logger = logger if logger else None
        if self.logger:
            self.logger.debug('Initializing AnalyzeL2 object')

        # Work on a copy so that the columns added by compute_statistics()
        # never show up in the caller's object.
        self.L2 = copy.deepcopy(L2)
        self.df_RV = self.L2['RV']
        self.n_green_orders = 35
        self.n_red_orders   = 32

        primary = HeaderParse(L2, 'PRIMARY')
        self.header = primary.header
        self.name = primary.get_name()
        self.is_star = primary.get_name(use_star_names=False) in ('Star', 'Sun')
        self.ObsID = primary.get_obsid()
        self.rv_header = HeaderParse(L2, 'RV').header
        self.df_RVs = self.L2['RV'] # Table of RVs per order and orderlet

        self.data_products = get_data_products_L2(self.L2)
        self.green_present = 'Green' in self.data_products
        self.red_present = 'Red' in self.data_products
        self.texp = self.header['ELAPSED']

        self.compute_statistics()


    @staticmethod
    def _weighted_spread(values, weights, used):
        """
        Return (weighted population standard deviation of `values`,
        max-minus-min of `values` restricted to the rows flagged in `used`).
        """
        centre = np.sum(weights * values) / np.sum(weights)
        variance = np.sum(weights * (values - centre)**2) / np.sum(weights)
        kept = values[used]
        return np.sqrt(variance), kept.max() - kept.min()


    def compute_statistics(self):
        """
        Compute the CCF-weighted mean barycentric correction and BJD, and
        measures of how far the per-row values in the RV table scatter about
        those means.

        Sets:
            CCFBCV, CCFBJD - weighted means of 'Bary_RVC' and 'CCFBJD'
            Delta_CCFBJD_weighted_std / _range - scatter of 'CCFBJD' about
                its weighted mean, multiplied by 24*60*60 (seconds)
            Delta_Bary_RVC_weighted_std / _range - scatter of 'Bary_RVC'
                about its weighted mean, multiplied by 1000 (m/s)
            Max_Perc_Delta_Bary_RV, Min_Perc_Delta_Bary_RV - extreme percent
                deviations of 'Bary_RVC' from CCFBCV
        Adds the columns 'Delta_CCFBJD', 'Delta_Bary_RVC' and
        'Perc_Delta_Bary_RVC' to df_RV.  Ranges and extremes use only rows
        whose 'CCF Weights' entry is nonzero.
        """
        table = self.df_RV
        weights = table['CCF Weights']
        has_weight = weights != 0

        # weighted means (the BJD one should eventually be read from the L2 header)
        self.CCFBCV = np.sum(weights * table['Bary_RVC']) / np.sum(weights)
        self.CCFBJD = np.sum(weights * table['CCFBJD']) / np.sum(weights)

        # per-row BJD offsets from the weighted mean
        table['Delta_CCFBJD'] = table['CCFBJD'] - self.CCFBJD
        bjd_std, bjd_range = self._weighted_spread(table['Delta_CCFBJD'], weights, has_weight)
        self.Delta_CCFBJD_weighted_std = bjd_std * 24*60*60  # seconds
        self.Delta_CCFBJD_weighted_range = bjd_range * 24*60*60  # seconds

        # per-row barycentric-correction offsets from the weighted mean
        table['Delta_Bary_RVC'] = table['Bary_RVC'] - self.CCFBCV
        bcv_std, bcv_range = self._weighted_spread(table['Delta_Bary_RVC'], weights, has_weight)
        self.Delta_Bary_RVC_weighted_std = bcv_std * 1000 # m/s
        self.Delta_Bary_RVC_weighted_range = bcv_range * 1000 # m/s

        # the same offsets as a percentage of the weighted mean; a mean of
        # exactly zero cannot be divided by, so the column is zeroed instead
        if self.CCFBCV != 0:
            table['Perc_Delta_Bary_RVC'] = (table['Delta_Bary_RVC'] / self.CCFBCV) * 100 # percent
        else:
            table['Perc_Delta_Bary_RVC'] = table['Delta_Bary_RVC'] * 0 # just set to zero

        percent = table['Perc_Delta_Bary_RVC'][has_weight]
        self.Max_Perc_Delta_Bary_RV = percent.max()
        self.Min_Perc_Delta_Bary_RV = percent.min()

In [3]:
"""
This module contains classes for KPF data quality control (QC).  Various QC metrics are defined in
class QCDefinitions.  Other classes QCL0, QC2D, QCL1, and QCL2 contain methods to compute QC values,
which are associated with the QC metrics, for specific data products, and then store them in the primary
header of the corresponding KPF object (which will be saved to a FITS file).  Normally QC values are stored
in headers, but storage in the KPF pipeline-operations database may be set up later by the database
administrator, depending upon the special requirements for some QC metrics.
"""

# Name and version of this quality-control module; what_am_i() prints them together.
iam = 'quality_control'
version = '1.3'

"""
The following are methods common across data levels, which are given at the beginning
of this module, before the QC classes are defined.

Includes helper functions that compute statistics of data of arbitrary shape.
"""

#####################################
# Module helper functions.
#####################################

def what_am_i():
    """Print the module name and version on a single line."""
    print(f'Software version: {iam} {version}')

def compute_clip_corr(n_sigma, n_trials=10, n_samples=1000000, rng=None):

    """
    Compute a correction factor to properly reinflate the variance after it is
    naturally diminished via data-clipping.  Employ a simple Monte Carlo method
    and standard normal deviates to simulate the data-clipping and obtain the
    correction factor.

    Each trial draws standard normal deviates, estimates sigma as half the
    distance between the 16th and 84th percentiles, masks every deviate
    further than n_sigma * sigma from the median, and records the variance of
    what is left.  The factor returned is the reciprocal of the mean of those
    variances (the unclipped variance being 1).

    Args:
        n_sigma - clipping threshold, in units of the percentile-based sigma
        n_trials - number of Monte Carlo trials to average over (default 10)
        n_samples - number of deviates drawn per trial (default 1000000)
        rng - optional numpy.random.Generator to draw from; when None the
              global numpy.random state is used, as before

    Returns:
        corr_fact - multiplicative correction for a clipped variance (take the
                    square root to correct a clipped standard deviation)

    Raises:
        ValueError - if n_trials or n_samples is less than 1
    """

    if n_trials < 1 or n_samples < 1:
        raise ValueError("n_trials and n_samples must both be at least 1.")

    # numpy.random (module) and numpy.random.Generator both provide
    # normal(loc, scale, size), so either can act as the source of deviates.
    sampler = np.random if rng is None else rng

    var_trials = []
    for _ in range(n_trials):
        a = sampler.normal(0.0, 1.0, n_samples)
        med = np.median(a)
        p16, p84 = np.percentile(a, [16, 84])
        sigma = 0.5 * (p84 - p16)
        # Mask everything outside [med - n_sigma*sigma, med + n_sigma*sigma].
        mask = (a < med - n_sigma * sigma) | (a > med + n_sigma * sigma)
        mx = ma.masked_array(a, mask)
        var_trials.append(ma.getdata(mx.var(axis=0)))

    avg_var_trials = np.mean(np.array(var_trials))
    corr_fact = 1.0 / avg_var_trials

    return corr_fact

def avg_data_with_clipping(data_array,n_sigma = 3.0):

    """
    Mean, standard deviation and sample count of data_array after n-sigma
    outlier rejection, taken over all array dimensions with NaNs ignored.

    Sigma is estimated as half the spread between the 16th and 84th
    percentiles; values further than n_sigma * sigma from the median are
    rejected.  The standard deviation of the surviving values is multiplied by
    the square root of compute_clip_corr(n_sigma).

    Returns:
        avg, std, cnt - mean of the surviving values, their corrected standard
                        deviation, and how many values survived
    """

    # Note: compute_clip_corr runs its Monte Carlo simulation on every call.
    std_scale = np.sqrt(compute_clip_corr(n_sigma))

    values = np.array(data_array)

    centre = np.nanmedian(values)
    pct16 = np.nanpercentile(values,16)
    pct84 = np.nanpercentile(values,84)
    sigma = 0.5 * (pct84 - pct16)
    lower = centre - n_sigma * sigma
    upper = centre + n_sigma * sigma

    # Reject anything below/above the clipping window, and every NaN.
    rejected = np.less(values,lower) | np.greater(values,upper) | np.isnan(values)
    clipped = ma.masked_array(values, rejected)

    avg = ma.getdata(clipped.mean())
    std = ma.getdata(clipped.std()) * std_scale
    cnt = ma.getdata(clipped.count())

    return avg,std,cnt


def check_all_qc_keywords(kpf_object,fname,input_master_type='all',logger=None):

    """
    Method to check all QC keywords in PRIMARY header of FITS object.

    Agnostic of data level; checks all QC keywords in PRIMARY header
    that have assigned value for qc_definitions.fits_keyword_fail_value[dict_key]
    (which are not None).  Failure is declared only for the relevant master type:
    a keyword holding its fail value counts only if one of the metric's master
    types equals input_master_type (case-insensitive), or either of them is 'all'.
    A keyword that holds its fail value but is not relevant to input_master_type
    is skipped and the remaining keywords are still examined.
    Currently only integer fail_values are handled.

    Args:
        kpf_object - KPF object whose header['PRIMARY'] is examined
        fname - file name, used only in the debug log message
        input_master_type - master type being built (default 'all')
        logger - optional logger; if None, a DummyLogger is used

    Returns:
        qc_fail - a boolean signifying that the QC failed (True) for at least one of the QC keywords or not (False).
    """

    logger = logger if logger is not None else DummyLogger()

    qc_definitions = QCDefinitions()
    requested_type = input_master_type.lower()

    for dict_key, kw in qc_definitions.fits_keywords.items():

        master_types = qc_definitions.master_types[dict_key]

        # Metrics with no fail value (missing or None) are never checked here.
        fail_value = qc_definitions.fits_keyword_fail_value.get(dict_key)
        if fail_value is None:
            continue

        # A keyword absent from the header means the QC was not run; skip it.
        try:
            kw_value = kpf_object.header['PRIMARY'][kw]
        except KeyError:
            continue

        if kw_value != fail_value:
            continue

        logger.debug('--------->quality_control: check_all_qc_keywords: fname,kw,kw_value,fail_value = {},{},{},{}'.format(fname,kw,kw_value,fail_value))

        relevant = any(
            requested_type == master_type.lower() or master_type.lower() == 'all' or requested_type == 'all'
            for master_type in master_types
        )
        if relevant:
            return True

    return False


def execute_all_QCs(kpf_object, data_level, logger=None):
    """
    Method to loop over all QC tests for the data level of the input KPF object
    (an L0, 2D, L1, or L2 object).  This method is useful for testing (e.g.,
    in a Jupyter Notebook).  To run the QCs in a recipe, use methods in
    quality_control_framework.py

    A QC is run only if the object's spectrum type is listed for it (or 'all'
    is) and all of its required data products are among those expected for the
    object.  Each result is written to the PRIMARY header via
    add_qc_keyword_to_header, and ISGOOD is set to 0 if any QC returned False
    (1 otherwise).  Errors raised by an individual QC are logged and do not
    stop the loop.

    Args:
        kpf_object - a KPF object (L0, 2D, L1, or L2)
        data_level - data level of kpf_object: 'L0', '2D', 'L1', or 'L2'
        logger - optional logger; if None, a DummyLogger is used

    Attributes:
        None

    Returns:
        kpf_object - the input kpf_object with QC keywords added (returned
                     unchanged if data_level is not one of the four levels)
    """

    logger = logger if logger is not None else DummyLogger()

    # Define QC object.  Only the class for the requested level is looked up.
    if data_level == 'L0':
        qc_obj = QCL0(kpf_object)
    elif data_level == '2D':
        qc_obj = QC2D(kpf_object)
    elif data_level == 'L1':
        qc_obj = QCL1(kpf_object)
    elif data_level == 'L2':
        qc_obj = QCL2(kpf_object)
    else:
        # Without a QC object there is nothing to run; hand the input back.
        print('data_level is not L0, 2D, L1, or L2.  Exiting.')
        return kpf_object

    qcdefs = qc_obj.qcdefinitions

    # Get a list of QC method names appropriate for the data level
    qc_names = [qc_name for qc_name in qcdefs.names
                if data_level in qcdefs.kpf_data_levels[qc_name]]

    # Run the QC tests and add result keyword to header
    primary_header = HeaderParse(kpf_object, 'PRIMARY')
    is_good = 1
    this_spectrum_type = primary_header.get_name(use_star_names=False)
    logger.info(f'Spectrum type: {this_spectrum_type}')
    for qc_name in qc_names:
        try:
            spectrum_types = qcdefs.spectrum_types[qc_name]
            if not ((this_spectrum_type in spectrum_types) or ('all' in spectrum_types)):
                logger.info(f'Not running QC: {qc_name} ({qcdefs.descriptions[qc_name]}) because {this_spectrum_type} not in list of spectrum types: {spectrum_types}')
                continue

            data_products_required = qcdefs.required_data_products[qc_name]
            if len(data_products_required) > 0:
                data_products_expected = get_data_products_expected(kpf_object, data_level)
                if not all(element in data_products_expected for element in data_products_required):
                    logger.info(f'Not running QC: {qc_name} ({qcdefs.descriptions[qc_name]}) because {data_products_required} not in list of expected data products({data_products_expected})')
                    continue

            text_running_qc = styled_text('Running QC', style="Bold", color="Magenta")
            text_qc_name = styled_text(qc_name, style="Bold", color="Blue")
            text_qc_keyword = styled_text(qcdefs.fits_keywords[qc_name], style="Bold", color="Blue")
            logger.info(f'{text_running_qc}: {text_qc_name} ({text_qc_keyword}; {qcdefs.descriptions[qc_name]})')
            method = getattr(qc_obj, qc_name) # get method with the name 'qc_name'
            qc_value = method() # evaluate method

            if qc_value == True:
                text_qc_value = styled_text(qc_value, style="Bold", color="Green")
            elif qc_value == False:
                text_qc_value = styled_text(qc_value, style="Bold", color="Red")
                is_good = 0
            else:
                # Non-boolean result: show it uncoloured rather than reusing
                # (or lacking) the text from a previous QC.
                text_qc_value = styled_text(qc_value, style="Bold")

            if qcdefs.fits_keywords[qc_name] == 'KPFERA':
                logger.info(f'Result: {styled_text("KPFERA", style="Bold", color="Blue")}={styled_text(qc_value, style="Bold")}')
            else:
                logger.info(f'QC result: {text_qc_value} (True = pass)')
            qc_obj.add_qc_keyword_to_header(qc_name, qc_value)

        except KeyError as e:
            logger.info(f"KeyError: {e}")

        except AttributeError as e:
            logger.info(f'Method {qc_name} does not exist in qc_obj or another AttributeError occurred: {e}')

        except Exception as e:
            # The error text goes inside the message; a second positional
            # argument would be treated as a %-format argument by logging.
            logger.info(f'An error occurred when executing {qc_name}: {e}')

    kpf_object.header['PRIMARY']['ISGOOD'] = (is_good, "QC: all other QC tests passed")

    return kpf_object


def check_all_QC_keywords_present(kpf_object, logger=None):
    """
    Intended to determine whether all QC tests have been run on the input
    kpf_object by examining its keywords.  The method determines the
    data_level for kpf_object and is meant to check for keywords of that
    level and lower, e.g., for data_level = 'L1', keywords in levels 'L0',
    '2D', and 'L1'.

    NOTE(review): as written, the body stops once the list of applicable data
    levels has been worked out.  No keywords are inspected and nothing is
    returned (the result is None); the keyword check itself is still to be
    written.

    Args:
        kpf_object - a KPF object (L0, 2D, L1, or L2)
        logger - Python logger object; if None, the DummyLogger is used

    Returns:
        None
    """

    logger = DummyLogger() if logger is None else logger
    data_level = get_kpf_level(kpf_object)
    this_spectrum_type = HeaderParse(kpf_object, 'PRIMARY').get_name(use_star_names=False)

    # Levels in processing order; an object's own level and everything
    # before it apply.  An unrecognised level leaves data_levels unset.
    ordered_levels = ['L0', '2D', 'L1', 'L2']
    if data_level in ordered_levels:
        data_levels = ordered_levels[:ordered_levels.index(data_level) + 1]

#####################################################################

class QCDefinitions:

    """
    Description:
        This class defines QC metrics in a standard format.
        Dictionaries are used to associate unique metric names with various metric metadata.
        Modify this class to add new metrics.  Do not remove any metrics (we deprecate metrics
        simply by not using them any more).  When adding a metric to this class, append its name
        to the names list and add an entry for it to every metadata dictionary; the constructor
        raises ValueError if the keys of any dictionary do not match the names list.

    Class Attributes:
        names (list of strings): Each element is a unique and descriptive name for the metric.  No spaces allowed.
        descriptions (dictionary of strings): Each dictionary entry specifies a short description of the metric
            Try to keep it under 50 characters for brevity (this is not enforced but recommended).
        kpf_data_levels (dictionary of lists of strings): Each entry specifies the set of KPF data levels for the test.
            Possible values in the list: 'L0', '2D', 'L1', 'L2'
        data_types (dictionary of strings): Each entry specifies the Python data type of the metric.
            Only string, int, float are allowed.  Use 0/1 for boolean.
        spectrum_types (dictionary of arrays of strings): Each entry specifies the types of spectra that the metric will be applied to.
            Possible strings in array: 'all', 'Bias', 'Dark', 'Flat', 'Wide Flat', 'LFC', 'Etalon', 'ThAr', 'UNe', 'Sun', 'Star', <starname>
        master_types (dictionary of arrays of strings): Each entry specifies the types of masters where the QC check is relevant.  If the QC fails for an exposure, it is not added to the master stack.
            Possible strings in array: 'all', 'Bias', 'Dark', 'Flat', 'Wide Flat', 'LFC', 'Etalon', 'ThAr', 'UNe'
        required_data_products (dictionary of arrays of strings): specifies if data products are needed to perform check
            if = [], then no required data products; other possible values are from get_data_products_L0, etc.
        fits_keywords (dictionary of strings): Each entry specifies the FITS-header keyword for the metric.
            Must be 8 characters or less, following the FITS standard.
        fits_comments (dictionary of strings): Each entry specifies the FITS-header comment for the metric.
            Must be a short string for brevity (say, under 35 characters), following the FITS standard.
        db_columns (dictionary of strings): Each entry specifies either database_table.column if applicable,
            or None if not.
        fits_keyword_fail_value (dictionary): Each entry specifies the keyword value that
            check_all_qc_keywords treats as a failure of the metric; None means the metric is
            skipped by that check.
    """

    # Names of the per-metric metadata dictionaries; each must hold exactly
    # one entry per name in self.names.
    _METADATA_ATTRIBUTES = (
        'descriptions',
        'kpf_data_levels',
        'data_types',
        'spectrum_types',
        'master_types',
        'required_data_products',
        'fits_keywords',
        'fits_comments',
        'db_columns',
        'fits_keyword_fail_value',
    )

    _ALLOWED_DATA_TYPES = ('string', 'int', 'float')

    def __init__(self, logger=None):

        self.logger = logger if logger is not None else DummyLogger()

        self.names = []
        self.descriptions = {}
        self.kpf_data_levels = {}
        self.data_types = {}
        self.spectrum_types = {}
        self.master_types = {} # if = [], then the QC test is not relevant for the construction of any masters
        self.required_data_products = {} # if = [], then no required data products; other possible values: Green, Red, CaHK, ExpMeter, Guider, Telemetry, Config, Receipt, Pyrheliometer
        self.fits_keywords = {}
        self.fits_comments = {}
        self.db_columns = {}
        self.fits_keyword_fail_value = {}

        # Define QC metrics
        name1 = 'not_junk'
        self.names.append(name1)
        self.descriptions[name1] = 'File is not in list of junk files.'
        self.kpf_data_levels[name1] = ['L0', '2D', 'L1', 'L2']
        self.data_types[name1] = 'int'
        self.spectrum_types[name1] = ['all', ] # 'all' applies the metric to every spectrum type
        self.master_types[name1] = ['all', ]
        self.required_data_products[name1] = [] # no required data products
        self.fits_keywords[name1] = 'NOTJUNK'
        self.fits_comments[name1] = 'QC: Not in list of junk files'
        self.db_columns[name1] = None
        self.fits_keyword_fail_value[name1] = 0

        name36 = 'L2_barycentric_rv_percent_change'
        self.names.append(name36)
        self.kpf_data_levels[name36] = ['L2']
        self.descriptions[name36] = 'Check non-zero-weight spectral orders BCV percent changes from weighted average are within an acceptable range.'
        self.data_types[name36] = 'int'
        self.spectrum_types[name36] = ['all', ]
        self.master_types[name36] = []
        self.required_data_products[name36] = [] # no required data products
        self.fits_keywords[name36] = 'PCBCV'
        self.fits_comments[name36] = 'QC: PCBCV values within acceptable range'
        self.db_columns[name36] = None
        self.fits_keyword_fail_value[name36] = 0

        # Integrity checks
        self._check_integrity()


    def _check_integrity(self):
        """
        Verify that the metric definitions are self-consistent.

        Raises:
            ValueError - if a metric name is repeated, if any metadata
                dictionary is missing an entry for a name (or has an entry for
                an unknown name), or if a data type is not one of 'string',
                'int', 'float'.
        """
        expected = set(self.names)
        if len(expected) != len(self.names):
            raise ValueError("The names list contains duplicate metric names.")

        for attr in self._METADATA_ATTRIBUTES:
            defined = set(getattr(self, attr))
            if defined != expected:
                missing = sorted(expected - defined)
                unknown = sorted(defined - expected)
                raise ValueError(
                    f"Entries in {attr} dictionary do not match the names list "
                    f"(missing: {missing}; not in names: {unknown})."
                )

        for dt in self.data_types.values():
            if dt not in self._ALLOWED_DATA_TYPES:
                raise ValueError(f"Error in data type: {dt}")


    def list_qc_metrics(self):
        """
        Method to print a formatted block of the available QC checks and their
        characteristics, sorted by the data level that the QC check accepts.
        A check that accepts several data levels is printed once per level.
        """
        for data_level in ['L0', '2D', 'L1', 'L2']:
            print(styled_text(f"Quality Control tests for {data_level}:", style="Bold"))
            for qc_name in self.names:

                if data_level not in self.kpf_data_levels[qc_name]:
                    continue

                print('   ' + styled_text("Name: ", style="Bold") + styled_text(qc_name, style="Bold", color="Blue"))
                print('      ' + styled_text("Description: ", style="Bold") + self.descriptions[qc_name])
                print('      ' + styled_text("Data levels: ", style="Bold") + str(self.kpf_data_levels[qc_name]))
                print('      ' + styled_text("Data type: ", style="Bold") + self.data_types[qc_name])
                print('      ' + styled_text("Required data products: ", style="Bold") + str(self.required_data_products[qc_name]))
                print('      ' + styled_text("Spectrum types (applied to): ", style="Bold") + str(self.spectrum_types[qc_name]))
                print('      ' + styled_text("Master types (applied to): ", style="Bold") + str(self.master_types[qc_name]))
                print('      ' + styled_text("Keyword: ", style="Bold") + styled_text(self.fits_keywords[qc_name], style="Bold", color='Blue'))
                print('      ' + styled_text("Keyword fail value: ", style="Bold") + str(self.fits_keyword_fail_value[qc_name]))
                print('      ' + styled_text("Comment: ", style="Bold") + self.fits_comments[qc_name])
                print('      ' + styled_text("Database column: ", style="Bold") + str(self.db_columns[qc_name]))
                print()

    def search_for_QC_keywords_in_files(self):
        """
        This method checks if each QC keyword is listed in two places and
        prints the results with green and red highlighting.  The two places
        are: 1) .yaml plot configuration files for the time series database,
        and 2) .csv files that define the time series database structure.
        It is best used in an interactive environment, e.g., in a Jupyter
        notebook.

        A keyword counts as found when its text appears anywhere in a file's
        contents (plain substring match).
        """

        # (directory to walk, file extension to read) for each place searched
        search_locations = {
            'plots':    ('/code/KPF-Pipeline/static/tsdb_plot_configs/', '.yaml'),
            'database': ('/code/KPF-Pipeline/static/tsdb_keywords/', '.csv'),
        }

        for search_directory, file_ext in search_locations.values():

            print(styled_text(f"Searching for *{file_ext} files in {search_directory} for QC keywords.", style="Bold"))
            for name in self.names:
                fits_kwd = self.fits_keywords.get(name, "")
                if not fits_kwd:
                    print(f"Warning: No search string found for '{name}'")
                    continue
                found_occurrence = False
                for root, dirs, files in os.walk(search_directory):
                    for file_name in files:
                        if not file_name.endswith(file_ext):
                            continue
                        full_path = os.path.join(root, file_name)
                        # Read the file contents and check for the string
                        with open(full_path, 'r', encoding='utf-8') as f:
                            content = f.read()
                        if fits_kwd in content:
                            found_occurrence = True
                            print(styled_text(f"Found ", color="Green") + styled_text(f"'{fits_kwd}' from '{name}'", style="Bold", color="Green") + styled_text(f" in: {full_path}", color="Green"))
                if not found_occurrence:
                    print(styled_text(f"No occurrence of ", color="Red") + styled_text(f"'{name}' => '{fits_kwd}'", style="Bold", color="Red") + styled_text(f" found in any {file_ext} file.", color="Red"))
            print()


#####################################################################
#
# Superclass QC is normally not to be called directly (although it is not an abstract class, per se).
#

class QC:

    """
    Description:
        This superclass defines QC functions in general and has common methods across
        subclasses QCL0, QC2D, QCL1, and QCL2.  It also includes QC checks that apply
        to all data levels.

    Class Attributes:
        kpf_object: Returned from function KPF0.from_fits(fits_filename,data_type),
            which is wrapped by function read_fits in this module.
        qcdefinitions (QCDefinitions object): Returned from constructor of QCDefinitions class.
        logger: the logger passed to the constructor, or a DummyLogger if none was given.

    """

    def __init__(self, kpf_object, logger=None):
        self.kpf_object = kpf_object
        self.qcdefinitions = QCDefinitions()
        self.logger = logger if logger is not None else DummyLogger()


    def add_qc_keyword_to_header(self, qc_name, value, debug=False):
        """
        Store a QC result in the PRIMARY header of self.kpf_object under the
        FITS keyword and comment defined for qc_name.

        Args:
            qc_name - name of the QC metric (a key of the QCDefinitions dictionaries)
            value - the QC result; Python and numpy booleans are stored as 1/0
            debug - an optional flag.  If True, the stored values are printed.
        """

        # isinstance covers both bool and numpy's boolean scalar regardless of
        # how the installed numpy version spells that type's name.
        if isinstance(value, (bool, np.bool_)):
            value = 1 if value else 0

        keyword = self.qcdefinitions.fits_keywords[qc_name]
        comment = self.qcdefinitions.fits_comments[qc_name]

        self.kpf_object.header['PRIMARY'][keyword] = (value,comment)
        if debug:
            print('---->add_qc_keyword_to_header: qc_name, keyword, value, comment = {}, {}, {}, {}'.format(qc_name,keyword,value,comment))


    def not_junk(self, junk_ObsIDs_csv='/data/reference/Junk_Observations_for_KPF.csv', debug=False):
        """
        This Quality Control method can be used in any of the data levels (L0/2D/L1/L2)
        so it is included in the superclass.
        It checks if the obsID of the input is in the list of junked files.
        The obsID is taken as the first 20 characters of the OFNAME keyword in
        the PRIMARY header.

        Args:
             junk_ObsIDs_csv - a CSV with ObsIDs in the first column
                               and a column header of 'observation_id'.
                               That is, the first few lines of the file will look like this:
                                   observation_id
                                   KP.20230621.27498.77
                                   KP.20230621.27611.73
                                   KP.20220516.57354.11

             debug - an optional flag.  If True, verbose output will be logged.

         Returns:
             QC_pass - a boolean signifying that the input(s) are not junk (i.e., = False if junk).
                       True is also returned when junk_ObsIDs_csv does not exist.
        """

        QC_pass = True  # Assume not junk unless explicitly listed in junk_ObsIDs_csv

        try:
            filename = self.kpf_object.header['PRIMARY']['OFNAME'] # 'KP.20231129.11266.37.fits' / Filename of output file
        except Exception:
            # No usable OFNAME: fall back to a placeholder, which will not match any ObsID.
            filename = 'this file'
        obsID = str(filename)[:20]

        # read list of junk files
        if not os.path.exists(junk_ObsIDs_csv):
            self.logger.info(f"The file {junk_ObsIDs_csv} does not exist.")
            return QC_pass

        df_junk = pd.read_csv(junk_ObsIDs_csv)
        if debug:
            self.logger.info(f'Read the junk file {junk_ObsIDs_csv}.')

        QC_pass = not bool(df_junk['observation_id'].isin([obsID]).any())
        if debug:
            self.logger.info(f'{filename} is a Junk file: ' + str(not QC_pass))

        return QC_pass

In [4]:
class QCL2(QC):

    """
    Description:
        This class inherits QC superclass and defines QC functions for L2 files.

    Class Attributes:
        kpf_object (astropy.io object): Returned from function KPF0.from_fits(fits_filename,data_type),
            which is wrapped by function read_fits in this module.
        qcdefinitions (QCDefinitions object): Returned from constructor of QCDefinitions class.

    """

    # Call superclass.
    def __init__(self, kpf_object, logger=None):
        super().__init__(kpf_object, logger=logger)

    def L2_barycentric_rv_percent_change(self, pos_threshold=1.0, neg_threshold=-1.0, debug=False):
        """
        This QC module checks the maximum and minimum percent changes of the
        per-order barycentric RV correction from the weighted average CCFBCV
        (for non-zero-weight spectral orders).  The two extremes are computed
        from the L2 object by AnalyzeL2 (Max_Perc_Delta_Bary_RV and
        Min_Perc_Delta_Bary_RV).  If the maximum is greater than
        pos_threshold or the minimum is less than neg_threshold, the method
        returns False.

        Args:
             pos_threshold - The high percent change threshold (e.g., no orders
                             with a percent change greater than 1%)
             neg_threshold - The low percent change threshold (e.g., no orders
                             with a percent change less than -1%)
             debug - an optional flag.  If True, prints the two extremes.

        Returns:
             QC_pass - True if both extremes are within the thresholds; False
                       otherwise, or if the analysis raised an exception
                       (which is logged).
        """

        try:
            analysis = AnalyzeL2(self.kpf_object, logger=self.logger)
            max_pc = analysis.Max_Perc_Delta_Bary_RV
            min_pc = analysis.Min_Perc_Delta_Bary_RV
            if debug:
                print(f"max: {max_pc}, min: {min_pc}")
            # NOTE(review): a NaN extreme compares False against both
            # thresholds and therefore passes - confirm that is intended.
            QC_pass = not (max_pc > pos_threshold or min_pc < neg_threshold)

        except Exception as e:
            self.logger.info(f"Exception: {e}")
            QC_pass = False

        return QC_pass


In [5]:
# Load the L2 test file that the cells below analyze and run the QC check on.
L2 = KPF2.from_fits("KP.20241022.39422.56_L2_test.fits")

In [6]:
# Extreme per-order percent deviations of the barycentric correction for this exposure.
myL2 = AnalyzeL2(L2)
print("max: {}, min: {}".format(myL2.Max_Perc_Delta_Bary_RV, myL2.Min_Perc_Delta_Bary_RV))

max: 0.6894375932041458, min: -0.47031634755679774


In [18]:
# Test of 'L2_barycentric_rv_percent_change' with the default thresholds
qc_name = 'L2_barycentric_rv_percent_change'
qcl2 = QCL2(L2)
qc_value = qcl2.L2_barycentric_rv_percent_change(debug=True)
qcl2.add_qc_keyword_to_header(qc_name, qc_value)
L2_new = qcl2.kpf_object
print(f"QC result: {L2_new.header['PRIMARY']['PCBCV']}")

DEBUG: Initializing AnalyzeL2 object
max: 0.6894375932041458, min: -0.47031634755679774
QC result: 1


In [19]:
# Test of 'L2_barycentric_rv_percent_change' with a tighter upper threshold
qc_name = 'L2_barycentric_rv_percent_change'
qcl2 = QCL2(L2)
qc_value = qcl2.L2_barycentric_rv_percent_change(pos_threshold=0.5, debug=True)
qcl2.add_qc_keyword_to_header(qc_name, qc_value)
L2_new = qcl2.kpf_object
print(f"QC result: {L2_new.header['PRIMARY']['PCBCV']}")

DEBUG: Initializing AnalyzeL2 object
max: 0.6894375932041458, min: -0.47031634755679774
QC result: 0


In [21]:
# Test of 'L2_barycentric_rv_percent_change' with a tighter lower threshold
qc_name = 'L2_barycentric_rv_percent_change'
qcl2 = QCL2(L2)
qc_value = qcl2.L2_barycentric_rv_percent_change(neg_threshold=-0.4)
qcl2.add_qc_keyword_to_header(qc_name, qc_value)
L2_new = qcl2.kpf_object
print(f"QC result: {L2_new.header['PRIMARY']['PCBCV']}")

DEBUG: Initializing AnalyzeL2 object
QC result: 0


In [12]:
def L2_barycentric_rv_percent_change(L2, pos_threshold=1.0, neg_threshold=-1.0, debug=False):
    """
    Standalone version of the QC check on the percent change of the per-order
    barycentric RV correction.

    The maximum and minimum percent changes from the weighted average CCFBCV
    (for non-zero-weight spectral orders) are computed from the L2 object by
    AnalyzeL2.  If the maximum is greater than pos_threshold or the minimum is
    less than neg_threshold, the function returns False.

    Args:
         L2 - an L2 object
         pos_threshold - The high percent change threshold (e.g., no orders
                         with a percent change greater than 1%)
         neg_threshold - The low percent change threshold (e.g., no orders
                         with a percent change less than -1%)
         debug - an optional flag.  If True, prints the two extremes.

    Returns:
         QC_pass - True if both extremes are within the thresholds; False
                   otherwise, or if the analysis raised an exception (a
                   warning carrying the exception text is issued).
    """
    import warnings

    try:
        analysis = AnalyzeL2(L2)
        max_pc = analysis.Max_Perc_Delta_Bary_RV
        min_pc = analysis.Min_Perc_Delta_Bary_RV
        if debug:
            print(f"max: {max_pc}, min: {min_pc}")
        QC_pass = not (max_pc > pos_threshold or min_pc < neg_threshold)

    except Exception as e:
        # Still a QC failure, but say why instead of failing silently.
        warnings.warn(f"L2_barycentric_rv_percent_change failed: {e!r}")
        QC_pass = False

    return QC_pass

In [13]:
# Default thresholds: the extremes printed earlier (max 0.689, min -0.470) lie inside +/-1%, so True is expected.
L2_barycentric_rv_percent_change(L2, pos_threshold=1.0, neg_threshold=-1.0, debug=False)

True

In [14]:
# Upper threshold lowered to 0.5: the maximum printed earlier (0.689) exceeds it, so False is expected.
L2_barycentric_rv_percent_change(L2, pos_threshold=0.5, neg_threshold=-1.0, debug=False)

False

In [15]:
# Lower threshold raised to -0.4: the minimum printed earlier (-0.470) falls below it, so False is expected.
L2_barycentric_rv_percent_change(L2, pos_threshold=1.0, neg_threshold=-0.4, debug=False)

False