In [None]:
import os
# NOTE(review): this flag is set before banzai_floyds is imported, presumably so it is already in
# place when that package (or a metrics dependency it pulls in) is loaded — confirm.
os.environ['OPENTSDB_PYTHON_METRICS_TEST_MODE'] = 'True'
import banzai_floyds.frames
from banzai_floyds import settings
import numpy as np
from astropy.io import fits

In [None]:
# Keep everything this notebook touches (sqlite database, processed path) under ./test_data.
os.makedirs('test_data', exist_ok=True)
os.environ['DB_ADDRESS'] = 'sqlite:///test_data/test.db'

# Mirror the same choices onto the banzai_floyds settings module.
settings.db_address = os.environ['DB_ADDRESS']
settings.processed_path = os.path.join(os.getcwd(), 'test_data')
settings.fpack = True
settings.RAW_DATA_FRAME_URL = 'https://archive-api.lco.global/frames'

In [None]:
import banzai.main
# Build the context from the settings module; parse_system_args=False is passed explicitly.
context = banzai.main.parse_args(settings, parse_system_args=False)

In [None]:
# Frame factory; its open() method is used in the next cell to load the test frame.
factory = banzai_floyds.frames.FLOYDSFrameFactory()

In [None]:
# Load the processed test frame from ./test_data through the frame factory.
image = factory.open(
    {
        'path': os.path.join(
            os.getcwd(), 'test_data', 'ogg', 'en06', '20200822', 'processed',
            'ogg2m001-en06-20200822-0028-a91.fits.fz',
        )
    },
    context,
)

In [None]:
# Display the repr of the opened frame.
image

In [None]:
# Display the frame's data attribute.
image.data

In [None]:
from banzai_floyds.arc_lines import arc_lines_table
class foo:
    """Namespace for the wavelength-calibration tuning constants used by this notebook.

    The dictionaries below are keyed by order number (1 and 2).
    """

    EXTRACTION_HEIGHT = 5
    LINES = arc_lines_table()

    # All in angstroms, measured by Curtis McCully
    # NOTE(review): the original comment here read "FWHM is , 5 pixels" — presumably "~5 pixels"; confirm.
    INITIAL_LINE_WIDTHS = {1: 10, 2: 6}
    INITIAL_DISPERSIONS = {1: 3.51, 2: 1.72}

    # Tilts in degrees measured counterclockwise (right-handed coordinates)
    INITIAL_LINE_TILTS = {1: 8.0, 2: 8.0}

    OFFSET_RANGES = {
        1: np.arange(7200.0, 7700.0, 0.5),
        2: np.arange(4300, 4600, 0.5),
    }
    MATCH_THRESHOLDS = {1: 20.0, 2: 10.0}

    # In pixels
    MIN_LINE_SEPARATIONS = {1: 5.0, 2: 5.0}
    FIT_ORDERS = {1: 3, 2: 2}

    # Success Metrics
    # Matched lines required to consider solution success
    MATCH_SUCCESS_THRESHOLD = 3


# Later cells read the constants through this instance as `self.<NAME>`.
self = foo()

In [None]:
# Distinct values found in the order map, with the value 0 dropped.
orders = np.unique(image.orders.data)
orders = orders[np.nonzero(orders)]
# Collects the wavelength solutions appended further down.
initial_wavelength_solutions = []

In [None]:
from copy import copy
# Work on a copy of the frame's orders object and give every order the same EXTRACTION_HEIGHT.
# NOTE(review): copy() is shallow, so objects held by image.orders are presumably shared with the
# copy — confirm that assigning order_heights here leaves image.orders untouched.
extraction_orders = copy(image.orders)
extraction_orders.order_heights = self.EXTRACTION_HEIGHT * np.ones_like(orders)

In [None]:
# Select what the following cells work on: `order` keys the per-order constant dictionaries and
# the order map, `i` indexes image.orders.domains.
i = 0
order = 1

In [None]:
from banzai_floyds.wavelengths import linear_wavelength_solution
from banzai_floyds.utils.order_utils import get_order_2d_region

# Cut out the region of the selected order and collapse it to 1-D with a median along axis 0.
order_region = get_order_2d_region(extraction_orders.data == order)
# Note that this flux has an x origin at x = 0 instead of the domain of the order
# I don't think it matters though
flux_1d = np.median(image.data[order_region], axis=0)
# The factor 1.2533 is from Rider 1960 DOI: 10.1080/01621459.1960.10482056 and converts the
# standard error to the error on the median
flux_1d_error = 1.2533 * np.median(image.uncertainty[order_region], axis=0) / np.sqrt(self.EXTRACTION_HEIGHT)
# Fit the initial linear solution using only the lines flagged as 'used'.
linear_solution = linear_wavelength_solution(
    flux_1d,
    flux_1d_error,
    self.LINES[self.LINES['used']],
    self.INITIAL_DISPERSIONS[order],
    self.INITIAL_LINE_WIDTHS[order],
    self.OFFSET_RANGES[order],
    domain=image.orders.domains[i],
)


In [None]:
# Display the linear solution returned for the selected order.
linear_solution

In [None]:
import plotly.express as px
import plotly.graph_objects as go

In [None]:
def fits_header_to_wavelength(header):
    """Build the linear wavelength grid described by a header's axis-1 keywords.

    Parameters
    ----------
    header : mapping
        Must provide 'CRVAL1', 'CRPIX1' and 'NAXIS1', plus either 'CDELT1' or
        'CD1_1'. 'CDELT1' is used when both are present.

    Returns
    -------
    numpy.ndarray
        Array of length NAXIS1 holding ``crval + cdelt * (pixel - crpix)`` for
        zero-indexed ``pixel`` (CRPIX1 is converted from one- to zero-indexed).

    Raises
    ------
    KeyError
        If a required keyword is missing from ``header``.
    """
    crval = float(header['CRVAL1'])
    # Convert crpix to be zero indexed
    crpix = float(header['CRPIX1']) - 1
    cdelt = float(header['CDELT1'] if 'CDELT1' in header else header['CD1_1'])
    npix = int(float(header['NAXIS1']))
    # Scale integer pixel indices instead of stepping a float np.arange with a fudged stop value:
    # that form returned the wrong number of samples for a negative step or a step <= 1e-4.
    return crval + cdelt * (np.arange(npix) - crpix)

In [None]:
# Overlay the extracted flux on the linear solution (steelblue) with the spectrum stored in the
# comparison FITS file (coral).
# NOTE(review): the handle returned by fits.open is never closed in this notebook.
hdu = fits.open('test_data/ttarc_HD201767_ftn_20200823_red_2.0_59085_1.fits')
lam = fits_header_to_wavelength(hdu[0].header)
fig = px.line(
    x=linear_solution(np.arange(flux_1d.size)),
    y=flux_1d,
    color_discrete_sequence=['steelblue'],
)
fig.add_trace(go.Scatter(x=lam, y=hdu[0].data[0, :], line_color='coral'))
fig.show()

In [None]:
from banzai_floyds.wavelengths import identify_peaks
# Find candidate peaks in the 1-D flux of the selected order. The third argument is the initial
# line width divided by the initial dispersion — presumably a width in pixels; confirm against
# identify_peaks.
peaks = identify_peaks(flux_1d, flux_1d_error,
                       self.INITIAL_LINE_WIDTHS[order] / self.INITIAL_DISPERSIONS[order],
                       self.MIN_LINE_SEPARATIONS[order], domain=image.orders.domains[i])

In [None]:
# hdu and lam are not used by this plot; a later cell reads them again.
hdu = fits.open('test_data/ttarc_HD201767_ftn_20200823_red_2.0_59085_1.fits')
lam = fits_header_to_wavelength(hdu[0].header)
# Flux against pixel index, with one marker per identified peak drawn 1000 counts above the flux.
fig = px.line(
    x=np.arange(flux_1d.size),
    y=flux_1d,
    color_discrete_sequence=['steelblue'],
)
for peak_pixel in peaks:
    marker = go.Scatter(x=[peak_pixel], y=[flux_1d[peak_pixel] + 1000], line_color='black')
    fig.add_trace(marker)
fig.show()


In [None]:
from banzai_floyds.wavelengths import refine_peak_centers
# Refine the peak positions found above, using the same width argument as identify_peaks.
# NOTE(review): `peaks` is rebound to the refined result, so re-running this cell alone feeds the
# already-refined centers back in.
peaks = refine_peak_centers(flux_1d, flux_1d_error, peaks,
                            self.INITIAL_LINE_WIDTHS[order] / self.INITIAL_DISPERSIONS[order],
                            domain=image.orders.domains[i])

In [None]:
# Flux against pixel index again, now marking the refined peak centers.
fig = px.line(
    x=np.arange(flux_1d.size),
    y=flux_1d,
    color_discrete_sequence=['steelblue'],
)
for refined_center in peaks:
    # int() truncates the center to choose the flux sample that sets the marker height
    marker_height = flux_1d[int(refined_center)] + 1000
    fig.add_trace(go.Scatter(x=[refined_center], y=[marker_height], line_color='black'))
fig.show()

In [None]:
from banzai_floyds.wavelengths import correlate_peaks
# Pair each refined peak with a 'used' line via the linear solution; the result is cast to a
# float array so that np.isfinite can be applied to it later.
# NOTE(review): the last argument is the literal 50.0 rather than self.MATCH_THRESHOLDS[order] —
# presumably a match tolerance; confirm which value is intended.
corresponding_lines = np.array(correlate_peaks(peaks, linear_solution, self.LINES[self.LINES['used']],
                                               50.0)).astype(float)

In [None]:
# Display the float array built from the correlate_peaks result.
corresponding_lines

In [None]:
from banzai_floyds.wavelengths import estimate_distortion
# Only entries with a finite matched line take part in the fit.
successful_matches = np.isfinite(corresponding_lines)
# Fit the solution for this order and store it (note: each run of this cell appends again).
initial_wavelength_solutions.append(
    estimate_distortion(
        peaks[successful_matches],
        corresponding_lines[successful_matches],
        image.orders.domains[i],
        order=self.FIT_ORDERS[order],
    )
)

In [None]:
# Evaluate the first stored solution at every pixel index and plot the flux against it.
wave_1d = initial_wavelength_solutions[0](np.arange(flux_1d.size))
fig = px.line(
    x=wave_1d,
    y=flux_1d,
    color_discrete_sequence=['steelblue'],
)
# Mark each 'used' line at its wavelength, 1000 counts above the flux sample nearest to it.
for line in self.LINES[self.LINES['used']]:
    flux_ind = np.argmin(np.abs(wave_1d - line['wavelength']))
    marker = go.Scatter(x=[line['wavelength']], y=[flux_1d[flux_ind] + 1000], line_color='black')
    fig.add_trace(marker)
# Comparison spectrum read from the FITS file in an earlier cell.
fig.add_trace(go.Scatter(x=lam, y=hdu[0].data[0, :], line_color='coral'))
fig.show()

In [None]:
# Switch the following cells to order 2 (index 1 into image.orders.domains).
i = 1
order = 2

In [None]:
# Same extraction as for the previous order, now for the currently selected `order` / `i`.
order_region = get_order_2d_region(extraction_orders.data == order)
# Note that this flux has an x origin at x = 0 instead of the domain of the order
# I don't think it matters though
flux_1d = np.median(image.data[order_region], axis=0)
# The factor 1.2533 is from Rider 1960 DOI: 10.1080/01621459.1960.10482056 and converts the
# standard error to the error on the median
flux_1d_error = 1.2533 * np.median(image.uncertainty[order_region], axis=0) / np.sqrt(self.EXTRACTION_HEIGHT)
# Fit the initial linear solution using only the lines flagged as 'used'.
linear_solution = linear_wavelength_solution(
    flux_1d,
    flux_1d_error,
    self.LINES[self.LINES['used']],
    self.INITIAL_DISPERSIONS[order],
    self.INITIAL_LINE_WIDTHS[order],
    self.OFFSET_RANGES[order],
    domain=image.orders.domains[i],
)


In [None]:
# Display the coefficients of the linear solution for the selected order.
linear_solution.coef

In [None]:
# NOTE(review): the handle returned by fits.open is never closed in this notebook.
hdu = fits.open('test_data/ttarc_HD201767_ftn_20200823_blue_2.0_59085_1.fits')
lam = fits_header_to_wavelength(hdu[0].header)
# NOTE(review): this overwrites the fitted zeroth coefficient in place with a hand-picked 4700, so
# linear_solution no longer holds the fit result after this cell — confirm this is intentional.
linear_solution.coef[0] = 4700
# Unlike the order-1 plot, the pixel axis here is shifted by the start of the solution's domain.
fig = px.line(
    x=linear_solution(np.arange(flux_1d.size) + np.min(linear_solution.domain)),
    y=flux_1d,
    color_discrete_sequence=['steelblue'],
)
fig.add_trace(go.Scatter(x=lam, y=hdu[0].data[0, :], line_color='coral'))
fig.show()

In [None]:
# Display the per-order domains that the cells above index with `i`.
image.orders.domains