In [None]:
from __future__ import print_function
from astropy.io import fits
import numpy as np
import math
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
from numpy.polynomial.polynomial import polyval, polyder
import time
import csv
from dotenv import load_dotenv, find_dotenv
import os
import configparser
import logging

%matplotlib inline

class DotDict(dict):
    """Plain ``dict`` subclass used as a lightweight data container.

    The class adds no methods of its own.  Because it is a subclass (and not
    the built-in ``dict`` itself), instances accept arbitrary attribute
    assignment such as ``obj.data = ...``; those attributes live on the
    instance and are *not* dictionary keys.
    """

In [None]:
from modules.optimal_extraction.src import OptimalExtractionAlg
# Pick up KPFPIPE_TEST_DATA from a .env file (if any) and from the environment.
load_dotenv()
TEST_DIR = os.getenv('KPFPIPE_TEST_DATA')
if not TEST_DIR:
    # Every data path in this notebook is built as TEST_DIR + '/...'; stopping
    # here gives a clear message instead of a later "NoneType + str" TypeError.
    raise EnvironmentError(
        'KPFPIPE_TEST_DATA is not set; define it in the environment or in a .env file')
print(TEST_DIR)

In [None]:
# copy of start_logger from logger.py
def get_level(lvl:str) -> int:
    """Translate a level name into the matching ``logging`` constant.

    Only the exact lower-case names 'debug', 'info', 'warning', 'error' and
    'critical' are recognised; anything else yields ``logging.NOTSET``.
    """
    known_levels = (
        ('debug', logging.DEBUG),
        ('info', logging.INFO),
        ('warning', logging.WARNING),
        ('error', logging.ERROR),
        ('critical', logging.CRITICAL),
    )
    for level_name, level_value in known_levels:
        if lvl == level_name:
            return level_value
    return logging.NOTSET
    
def start_logger(logger_name: str, config: str):
    """Create (or reconfigure) a console logger from the ``[LOGGER]`` section of a config file.

    Only the ``log_level`` option is used; this notebook copy always logs to
    the console through a single ``logging.StreamHandler``.

    Args:
        logger_name: name handed to ``logging.getLogger``.
        config: path of an INI-style configuration file, or None.

    Returns:
        The configured ``logging.Logger``, or None when ``config`` is None or
        the file could not be read.

    Raises:
        KeyError: if the file was read but contains no ``[LOGGER]`` section.
    """
    if config is None:
        # a config file is not provided, so don't start logger
        print('[{}] missing log configuration...not starting a new logger'.format(
            logger_name))
        return None

    config_obj = configparser.ConfigParser()
    # read() returns the list of files it parsed; an empty list means failure.
    if not config_obj.read(config):
        return None

    log_cfg = config_obj['LOGGER']

    # The fallback has to be a level *name*: get_level() only maps names, so a
    # numeric fallback such as logging.WARNING would be turned into NOTSET.
    log_lvl = log_cfg.get('log_level', fallback='warning')
    # get_level() matches lower-case names only; accept 'INFO', ' info ', ...
    level = get_level(log_lvl.strip().lower())

    logger = logging.getLogger(logger_name)
    logger.setLevel(level)
    logger.propagate = False

    # getLogger() returns the same object for the same name, so calling this
    # function again (e.g. re-running a notebook cell) would otherwise stack a
    # second console handler and print every message twice.  Remove the plain
    # console handlers first; handlers of any other type are left untouched.
    for handler in list(logger.handlers):
        if type(handler) is logging.StreamHandler:
            logger.removeHandler(handler)

    formatter = logging.Formatter('[%(name)s][%(levelname)s]:%(message)s')
    s_handle = logging.StreamHandler()
    s_handle.setLevel(level)
    s_handle.setFormatter(formatter)
    logger.addHandler(s_handle)
    return logger

In [None]:
def plot_two_fits_trace(spectral1, spectral2, total_rows, coeffs_rows, range_rows = None):
    """Show two images side by side and overlay polynomial traces on the first one.

    Args:
        spectral1: dict with keys 'data' (image), 'xdim' and 'ydim'; drawn on
            the left together with the traces.
        spectral2: dict with keys 'data' and 'ydim'; drawn on the right.
        total_rows: number of traces to draw (rows 0..total_rows-1 of
            ``coeffs_rows``).
        coeffs_rows: per-trace polynomial coefficients, evaluated with
            ``np.polyval``.
        range_rows: optional 2-D array; ``range_rows[y, 0]`` and
            ``range_rows[y, 1]`` bound the x values (stop exclusive) used for
            trace ``y``.  When omitted every trace spans 0..spectral1['xdim'].
    """
    plt.figure(figsize=(20,20))
    plt.subplot(1, 2, 1)
    im1 = plt.imshow(spectral1['data'], cmap='gray', norm=LogNorm())

    for y in range(0, total_rows):
        if range_rows is not None:
            x_val = np.arange(range_rows[y, 0], range_rows[y, 1])
        else:
            # full image width of the image the traces are drawn on
            x_val = np.arange(0, spectral1['xdim'])
        # np.polyval takes the highest-degree coefficient first --
        # assumes coeffs_rows is stored in that order; TODO confirm.
        y_val = np.polyval(coeffs_rows[y], x_val)
        plt.plot(x_val, y_val, 'r--')

    plt.ylim(0, spectral1['ydim'])
    plt.colorbar(im1, fraction=0.046, pad=0.04)

    plt.subplot(1, 2, 2)
    im2 = plt.imshow(spectral2['data'], cmap='gray', norm=LogNorm())

    plt.ylim(0, spectral2['ydim'])
    plt.colorbar(im2, fraction=0.046, pad=0.04)
    plt.show()

In [None]:
def plot_output(out_data, total_rows):
    """Display ``out_data`` as a gray-scale image with the y axis limited to 0..total_rows.

    No colorbar is drawn.
    """
    _, axis = plt.subplots(1, 1, figsize=(12, 12))
    axis.imshow(out_data, cmap='gray')
    axis.set_ylim(0, total_rows)
    plt.show()

In [None]:
def load_spectral_sample(fits_file, order_trace_csv, flatlamp_file, config, logger, power):
    """Load a spectrum, its flat and the order-trace table, and build the extraction object.

    Args:
        fits_file: path of the spectrum FITS file (read with ``fits.getdata``).
        order_trace_csv: path of a comma-separated order-trace table.
        flatlamp_file: path of the flat FITS file; its primary HDU is used.
        config: configuration object handed to ``OptimalExtractionAlg``.
        logger: logger handed to ``OptimalExtractionAlg`` (may be None).
        power: value stored as 'POLY DEGREE' in the flat's
            'ORDER TRACE RESULT' header.

    Returns:
        dict with keys
        'spectral' / 'flatlamp_spectral' -- {'data', 'xdim', 'ydim'}, where
            xdim/ydim come from the NAXIS1/NAXIS2 header cards,
        'coeffs', 'widths', 'xrange' -- ``order_coeffs``, ``order_edges`` and
            ``order_xrange`` of the extraction object,
        'op_handle' -- the ``OptimalExtractionAlg`` instance,
        'power' -- the ``power`` argument.
    """
    spectrum_flux, spectrum_header = fits.getdata(fits_file, header=True)
    order_trace_result = np.genfromtxt(order_trace_csv, delimiter=',')

    # Open the flat inside a context manager so the file handle is released.
    # memmap=False makes astropy read the pixels into memory on access, so the
    # array stays valid (and no mapping of the file lingers) after closing.
    with fits.open(flatlamp_file, memmap=False) as flat_hdul:
        flat_primary = flat_hdul[0]
        flat_image = flat_primary.data
        flat_header = flat_primary.header

    f_header = {'ORDER TRACE RESULT': {'POLY DEGREE': power}, 'PRIMARY': flat_header}

    flat_data = DotDict()
    flat_data.data = flat_image
    flat_data.order_trace_result = order_trace_result
    flat_data.header = f_header

    spectrum_data = DotDict()
    spectrum_data.data = spectrum_flux
    spectrum_data.header = {'PRIMARY': spectrum_header}

    opt_extract = OptimalExtractionAlg(flat_data, spectrum_data, config, logger)
    coeffs_rows = opt_extract.order_coeffs
    widths = opt_extract.order_edges
    xrange = opt_extract.order_xrange

    spectral = {'data': spectrum_data.data, 'xdim': int(spectrum_data.header['PRIMARY']['NAXIS1']),
                                            'ydim': int(spectrum_data.header['PRIMARY']['NAXIS2'])}
    flatlamp_spectral = {'data': flat_data.data, 'xdim': int(flat_data.header['PRIMARY']['NAXIS1']),
                                                 'ydim': int(flat_data.header['PRIMARY']['NAXIS2'])}

    return {'spectral': spectral, 'flatlamp_spectral': flatlamp_spectral, 'coeffs': coeffs_rows,
            'op_handle': opt_extract, 'widths': widths, 'xrange': xrange, 'power':power}

In [None]:
def make_fits(data, output_fits, metadata):
    """Write ``data`` to ``output_fits`` as a primary HDU carrying ``metadata`` in its header.

    Each key/value pair of ``metadata`` becomes a header entry.  An existing
    file at ``output_fits`` is overwritten.
    """
    primary = fits.PrimaryHDU(data)
    for keyword, value in metadata.items():
        primary.header[keyword] = value

    primary.writeto(output_fits, overwrite=True)

In [None]:
def extract_optimal_trace(in_data, selected_order=None):
    """Return the rows of ``in_data`` listed in ``selected_order``.

    When ``selected_order`` is omitted, every row of the (two-dimensional)
    array is returned, in order.
    """
    if selected_order is not None:
        return in_data[selected_order, :]

    row_count, _ = np.shape(in_data)
    every_row = np.arange(0, row_count, dtype=int)
    return in_data[every_row, :]

## 1.1 PARAS: define and load files: spectrum file, flat file, cure file, coeffs/width file

In [None]:
# input for PARAS data, from KPF-Pipeline-TestData/polygon_clipping_test/
mission = 'PARAS'    # NEID or PARAS

fits_base = TEST_DIR + '/polygon_clipping_test/paras_data/14feb2015/a00'
fiber_list = ['A']
f_idx = 0
flatlamp_file = TEST_DIR + '/polygon_clipping_test/paras_data/paras.flat'+fiber_list[f_idx]+'.fits'
fits_list=['18', '19']

# csv from paras (set power = 4 when using this file)
# csv_file =  TEST_DIR+'/polygon_clipping_test/paras_data/order_trace_'+fiber_list[f_idx]+'.csv'

# csv from order trace module, paired with 'for_width_3' or 'for_fixed_width'
csv_file = TEST_DIR + '/order_trace_test/for_optimal_extraction/paras_poly_3sigma_gaussian_pixel_3_width_3.csv'
# csv_file = TEST_DIR + '/order_trace_test/for_optimal_extraction/paras_poly_3sigma_gaussian_pixel_3.csv'
power = 3                                      # value passed on as 'POLY DEGREE' for the order-trace csv
width_type = 'for_width_3'                     # for using .csv from order trace, 'for_fixed_width', 'for_width_3'
# width_type = fiber_list[f_idx]               # for using .csv from PARAS

# optimal extraction method
method = OptimalExtractionAlg.NoRECT       # optimal extraction method: no rectified, 
# method = OptimalExtractionAlg.VERTICAL     # rectified using fractional summation in vertical direction
# method = OptimalExtractionAlg.NORMAL         # rectified using fractional summation in normal direction

# file-name tags, looked up as rectification_method[method]
# (assumes NORMAL, VERTICAL, NoRECT index this list in that order -- TODO confirm)
rectification_method = [ 'optimal_norm_fraction','optimal_vertical_fraction', 'optimal_not_rectified']

output_base =  '../results/PARAS_3sigma/' + width_type + '/PARAS_'    # temporarily output to a local directory
# summary of the selected inputs and the output location
print('fits_base:', fits_base, '\ncsv_file:', csv_file, '\nflat file:', flatlamp_file, '\noutput base:', output_base)

config = configparser.ConfigParser()
config_file = '../configs/PARAS.cfg'
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it did parse, so an empty result means the config was not loaded.
if not config.read(config_file):
    print('warning: could not read config file', config_file)
logger = start_logger("OptimalExtractionAlg", config_file)


## 1.2 NEID: define and load files: spectrum file, flat file, coeffs/width file

In [None]:
# input for NEID data
mission = 'NEID'
power = 3                      # value passed on as 'POLY DEGREE' for the order-trace csv
fits_base = TEST_DIR+'/NEIDdata/TAUCETI_20191217/L0/neidTemp_2D20191217T'
flatlamp_file = TEST_DIR+'/NEIDdata/FLAT/stacked_2fiber_flat.fits'
fits_list = ['023129', '023815','024240','024704', '025129', '025613', '030057','030724','031210','031636']

csv_base = TEST_DIR+'/order_trace_test/for_optimal_extraction/'   
# csv_file = csv_base + 'neid_poly_3sigma_gaussian_pixel_3.csv'        # paired with 'for_fixed_width'
csv_file = csv_base + 'neid_poly_3sigma_gaussian_pixel_3_width_3.csv'  # paired with 'for_width_3'

width_type = 'for_width_3'     #'for_fixed_width', 'for_width_3'

# optimal extraction method
method = OptimalExtractionAlg.NoRECT      # optimal extraction method: no rectified, 
#method = OptimalExtractionAlg.VERTICAL    # rectified using fractional summation in vertical direction
#method = OptimalExtractionAlg.NORMAL      # rectified using fractional summation in normal direction

# file-name tags, looked up as rectification_method[method]
# (assumes NORMAL, VERTICAL, NoRECT index this list in that order -- TODO confirm)
rectification_method = [ 'optimal_norm_fraction','optimal_vertical_fraction', 'optimal_not_rectified']

output_base =  '../results/NEID_3sigma/' + width_type + '/NEID_'      # temporarily output to a local directory 

# summary of the selected inputs and the output location
print('fits_base:', fits_base, '\ncsv_file:', csv_file, '\nflat file:', flatlamp_file, '\noutput base:', output_base)

config = configparser.ConfigParser()
config_file = '../configs/NEID.cfg'
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it did parse, so an empty result means the config was not loaded.
if not config.read(config_file):
    print('warning: could not read config file', config_file)
logger = start_logger("OptimalExtractionAlg", config_file)

## Optimal extraction (or sum fraction) on a list of NEID/PARAS FITS files, creating L1 output from the original spectrum

In [None]:
# Run the extraction on every file of fits_list, plot and save the result, and
# compare it with a previously stored reference when one is available.
# (To process a subset, slice the list, e.g. fits_list[1:10].)
for fits_id in fits_list:
    fits_file = fits_base + fits_id + '.fits'
    sample_info = load_spectral_sample(fits_file, csv_file, flatlamp_file, config, logger, power)
    poly_c = sample_info.get('op_handle')

    total_order = poly_c.get_spectrum_order()
    if mission == 'NEID':
        # for NEID only every second order is extracted
        c_set = np.arange(0, total_order, 2, dtype=int)
    else:
        c_set = None
    # coeffs_rows = sample_info.get('coeffs')
    # range_rows = sample_info.get('xrange')
    # plot_two_fits_trace(spectral, flatlamp_spectral, np.shape(coeffs_rows)[0], coeffs_rows, range_rows)

    print(fits_file)
    optimal_output = poly_c.extract_spectrum(rectification_method=method, extraction_method='optimal',
                                             order_set=c_set, show_time=True, print_debug='result.txt')

    optimal_result = optimal_output['optimal_extraction_result']    # result in Pandas Dataframe format
    out_order_data = optimal_result.values
    plot_output(out_order_data, optimal_result.attrs['TOTALORD'])   # optimal extraction

    # Create the output directory on demand so a fresh checkout can write results.
    output_dir = '../results/'+mission+'_3sigma/'+width_type
    os.makedirs(output_dir, exist_ok=True)
    output_order_file = output_dir + '/' + mission + '_' \
                        + fits_id + '_extraction_' + rectification_method[method] + '.fits'
    make_fits(out_order_data, output_order_file, optimal_result.attrs )

    if np.isnan(out_order_data).any():
        print('there is pixel with nan data')
    else:
        print('no pixel with nan data')

    # compare the result to that before the porting
    # NOTE(review): target_base is a machine-specific absolute path; the
    # comparison is skipped on machines where the reference file is absent.
    target_base = '/Users/cwang/documents/KPF/KPF-Pipeline/AlgorithmDev/test_data_04032020/'
    target_file = target_base + 'order_trace_test/for_optimal_extraction/output/rv_'+mission+'_3sigma/' + \
                  width_type +'/'+mission+'_' + fits_id + '_extraction_'+ rectification_method[method] + '.fits'
    print('target_file: ', target_file)
    if os.path.isfile(target_file):
        compare_result = poly_c.result_test(target_file, out_order_data)
        print(compare_result)
    else:
        print('target file not found, comparison skipped')

## Comparison between the NEID L1 data and the result of the optimal extraction module

In [None]:
mission = 'NEID'
# NOTE(review): the extraction cells above select NoRECT by default, so the
# '..._optimal_vertical_fraction.fits' file read below must already exist from
# a run made with method = OptimalExtractionAlg.VERTICAL.
method = OptimalExtractionAlg.VERTICAL
width_type = 'for_width_3'
rectification_method = [ 'optimal_norm_fraction','optimal_vertical_fraction', 'optimal_not_rectified']

neid_L1_file = TEST_DIR + '/NEIDdata/TAUCETI_20191217/L1/neidL1_20191217T023129.fits'

output_base =  '../results/NEID_3sigma/' + width_type + '/'+mission+'_'  
my_L1_file = output_base+'023129_extraction_'+ rectification_method[method] +'.fits'

neid_L1_fits, neid_header = fits.getdata(neid_L1_file, header=True)
my_L1_fits, my_header = fits.getdata(my_L1_file, header=True)

d = 7      # row offset: row i of my result is compared with row i+d of the NEID L1 data
neid_size = np.shape(neid_L1_fits)
my_size = np.shape(my_L1_fits)
total_avail = min(neid_size[0]-d, my_size[0])
print('neid: ',np.shape(neid_L1_fits))
print('my: ', np.shape(my_L1_fits))
print('size_y: ', total_avail)

x0 = 450   # smallest x index that is ever displayed

# the displayed x window is [center - extension, center + extension),
# clipped to x0 on the left and to the image width on the right
c_x = int(input('center_x: ').strip())
w_x = int(input('extension to the center: ').strip())
s_x = max(x0, c_x - w_x)
e_x = min(c_x+w_x, my_size[1])

print('show x from ', s_x, ' to ', e_x)
for i in np.arange(0, total_avail, dtype=int):
    neid_order = neid_L1_fits[i+d, s_x:e_x]
    my_order = my_L1_fits[i, s_x:e_x]

    # overlay of the two extracted orders
    plt.figure(figsize=(18,8))
    plt.subplot(1, 1, 1)
    plt.plot(neid_order, 'b--', label='neid order: '+str(i+d))
    plt.plot(my_order, 'r--', alpha=0.5, label = 'my order: ' + str(i))

    plt.title( '['+str(s_x)+','+str(e_x)+']')
    plt.legend(loc="upper right", prop={'size': 12})   
    plt.show()

    # relative difference |my - neid| / max(|my|, |neid|); where both values
    # are exactly zero the difference is defined as 0 instead of 0/0
    scale = np.maximum(np.absolute(my_order), np.absolute(neid_order))
    ratio = np.zeros(np.shape(scale), dtype=float)
    np.divide(np.absolute(my_order - neid_order), scale, out=ratio, where=(scale != 0))

    plt.figure(figsize=(18,8))
    plt.subplot(1, 1, 1)
    plt.plot(ratio, 'g--', label='difference: ')
    plt.show()

        