In [None]:
import os
import sys
module_path = os.path.abspath(os.path.join('../src'))
if module_path not in sys.path:
    sys.path.append(module_path)


# Build Classic Transform

Use standard field to calculate photometric transform by classic method ("slope of slopes")

In [None]:
import vso.util
import vso.data
import vso.phot
import numpy as np

OBJ_NAME='SA38'
OBJ_NAME_1='SA38'
OBJ_NAME_2='SA20'
OBJ_NAME_3='SA41'
SESSION_TAG='20241003'
IMAGE_ROOT = '/srv/public/img'
WORK_ROOT = '/srv/public'

session = vso.util.Session(tag=SESSION_TAG, name=OBJ_NAME)
layout = vso.util.WorkLayout(WORK_ROOT)
sd = vso.data.StarData(layout.charts_dir)


In [None]:
from astropy.table import QTable, unique, vstack
from astropy.time import Time
import astropy.units as u
import matplotlib.pyplot as plt
import itertools
import ipywidgets as widgets

def get_provider(session):
    session_layout = layout.get_session(session)
    settings = vso.util.Settings(session_layout.settings_file_path)
    phot = QTable.read(session_layout.photometry_file_path)
    chart = QTable.read(session_layout.chart_file_path)
    return vso.phot.DataProvider(phot, chart, bands=settings.bands,
                                 filters=[
                                     vso.phot.Filter.saturation(.9),
                                     vso.phot.Filter.snr(10.0 * u.db),
                                     #vso.phot.Filter.time(Time(0.0, format='jd'), Time(2460541.8, format='jd'))
                                 ],
                                 tag = session.name)

provider = vso.phot.DataProvider.combine([
     get_provider(vso.util.Session(tag=SESSION_TAG, name=OBJ_NAME_1)),
     get_provider(vso.util.Session(tag=SESSION_TAG, name=OBJ_NAME_2)),
#     get_provider(vso.util.Session(tag=SESSION_TAG, name=OBJ_NAME_3))
     ])

bands = vso.util.band_pairs(provider.bands)
band = bands[0]

batches = provider.get_batches(band)


In [None]:
from scipy import stats as sst

#

def C(stars, band):
    return stars[band[0]]['mag'] - stars[band[1]]['mag']

def dM(stars, band):
    return stars[band]['mag'] - stars[f"instr {band}"]['mag']

def plot_band(ax, stars, band, ci):
    dm = dM(stars, band)
    reg = sst.linregress(ci, dm)
    ax.scatter(ci, dm)
    ax.plot(ci, ci.value*reg.slope + reg.intercept)
    ax.set_title(rf"$\ T^{{'}}_{band[0]} = {reg.slope:.3g} \pm {reg.stderr:.2g} $" + "\n"
                 rf"$\ Z_{band[0]} = {reg.intercept:.3g} \pm {reg.intercept_stderr:.2g} $")

class PerBatchPreview:
    def __init__(self, provider, bands) -> None:
        self.bands_ = bands
        self.provider_ = provider

        self.wgt_band_ = widgets.Dropdown(
            options=[(f"{b[0]} {b[1]}", n) for b, n in zip(self.bands_, itertools.count())],
            description = 'Band'
            )
        self.wgt_band_.observe(self.band_updated, 'value')

        self.wgt_play_ = widgets.Play(value=0, min=0, max=1, interval=2000, description='Play')
        self.wgt_batch_ = widgets.IntSlider(value=0, min=0, max=1,
                                    description='Batch',
                                    )
        widgets.jslink((self.wgt_play_, 'value'), (self.wgt_batch_, 'value'))

    def band_updated(self, *args):
        self.band_ = self.bands_[self.wgt_band_.value]
        self.data_ = self.provider_.get_batches(self.band_)
        self.wgt_batch_.max = len(self.data_)-1
        self.wgt_batch_.value = 0
        self.wgt_play_.max = len(self.data_)-1
        self.wgt_play_.value = 0

    def play_updated(self, *args):
        self.wgt_file.value = self.files_[self.band_].summary['file'][self.wgt_play_.value]

    def plot(self, n_band, n_batch):
        band = self.band_
        batch = self.data_[n_batch]
        airmass = np.mean(batch['airmass'])
        fig = plt.figure(figsize=(10.24, 6.4))
        gs = fig.add_gridspec(1, 2)
        ci = C(batch, band)
        ax = plt.subplot(gs[0,0])
        plot_band(ax, batch, band[0], ci)
        ax = plt.subplot(gs[0,1])
        plot_band(ax, batch, band[1], ci)
        plt.suptitle(f"{band[0]}-{band[1]}, airmass {airmass:.5g}, {batch['tag'][0]}: {len(batch)} stars")
        plt.show()

    def run(self):
        self.band_updated()
        display(self.wgt_play_)
        widgets.interact(self.plot,
                         n_band=self.wgt_band_,
                         n_batch=self.wgt_batch_)

preview = PerBatchPreview(provider, bands)
preview.run()

In [None]:

class TransformPreview:
    def __init__(self, provider, bands) -> None:
        self.bands_ = bands
        self.provider_ = provider

        self.wgt_band_ = widgets.Dropdown(
            options=[(f"{b[0]} {b[1]}", n) for b, n in zip(self.bands_, itertools.count())],
            description = 'Band'
            )

    def plot_reg(self, ax, airmass, data, err):
        reg = sst.linregress(airmass, data)
        ax.errorbar(airmass, data, yerr=err, fmt='o')
        ax.plot(airmass, airmass*reg.slope + reg.intercept)
        return reg

    def plot_T(self, ax, airmass, data, err, band, pair):
        reg = self.plot_reg(ax, airmass, data, err)
        ax.set_title(rf"$\ T_{{{pair},{band}}} = {reg.intercept:.3g} \pm {reg.intercept_stderr:.2g} $" + "\n"
                 rf"$\ k^{{''}}_{{{pair},{band}}} = {reg.slope:.3g} \pm {reg.stderr:.2g} $")

    def plot_Z(self, ax, airmass, data, err, band, pair):
        reg = self.plot_reg(ax, airmass, data, err)
        ax.set_title(rf"$\ Z_{{{pair},{band}}} = {reg.intercept:.3g} \pm {reg.intercept_stderr:.2g} $" + "\n"
                 rf"$\ k^{{'}}_{{{pair},{band}}} = {reg.slope:.3g} \pm {reg.stderr:.2g} $")


    def plot(self, n_band):
        band = self.bands_[n_band]
        pair = band[0][0] + band[1][0]
        data = self.provider_.get_batches(band)
        airmass = np.array([np.mean(batch['airmass']) for batch in data])
        reg_dM_C_A = [sst.linregress(C(batch, band), dM(batch, band[0])) for batch in data]
        reg_dM_C_B = [sst.linregress(C(batch, band), dM(batch, band[1])) for batch in data]

        fig = plt.figure(figsize=(10.24, 9.6))
        gs = fig.add_gridspec(2, 2)

        ax = plt.subplot(gs[0,0])
        self.plot_T(ax, airmass, [r.slope for r in reg_dM_C_A], [r.stderr for r in reg_dM_C_A], band[0][0], pair)

        ax = plt.subplot(gs[0,1])
        self.plot_Z(ax, airmass, [r.intercept for r in reg_dM_C_A], [r.intercept_stderr for r in reg_dM_C_A], band[0][0], pair)

        ax = plt.subplot(gs[1,0])
        self.plot_T(ax, airmass, [r.slope for r in reg_dM_C_B], [r.stderr for r in reg_dM_C_B], band[1][0], pair)

        ax = plt.subplot(gs[1,1])
        self.plot_Z(ax, airmass, [r.intercept for r in reg_dM_C_B], [r.intercept_stderr for r in reg_dM_C_B], band[1][0], pair)

        plt.suptitle(f"{band[0]}-{band[1]}")
        plt.tight_layout()
        plt.show()

    def run(self):
        widgets.interact(self.plot,
                         n_band=self.wgt_band_)

preview = TransformPreview(provider, bands)
preview.run()

In [None]:
import json

def calculate_transform(provider, band):
    data = provider.get_batches(band)
    airmass = np.array([np.mean(batch['airmass']) for batch in data])
    reg_dM_C_A = [sst.linregress(C(batch, band), dM(batch, band[0])) for batch in data]
    reg_dM_C_B = [sst.linregress(C(batch, band), dM(batch, band[1])) for batch in data]
    pair = band[0][0] + band[1][0]

    def calc_T(airmass, data, band, pair):
        reg = sst.linregress(airmass, data)
        return {} if np.isnan(reg.intercept) else  {
            f'T_{pair}_{band}': float(reg.intercept),
            f'err_T_{pair}_{band}': float(reg.intercept_stderr),
            f'k2_{pair}_{band}': float(reg.slope),
            f'err_k2_{pair}_{band}': float(reg.stderr)}

    def calc_Z(airmass, data, band, pair):
        reg = sst.linregress(airmass, data)
        return {} if np.isnan(reg.intercept) else  {
            f'Z_{pair}_{band}': float(reg.intercept),
            f'err_Z_{pair}_{band}': float(reg.intercept_stderr),
            f'k1_{pair}_{band}': float(reg.slope),
            f'err_k1_{pair}_{band}': float(reg.stderr)}
    result = {}
    result.update(calc_T(airmass, [r.slope for r in reg_dM_C_A], band[0][0], pair))
    result.update(calc_T(airmass, [r.slope for r in reg_dM_C_B], band[1][0], pair))
    result.update(calc_Z(airmass, [r.intercept for r in reg_dM_C_A], band[0][0], pair))
    result.update(calc_Z(airmass, [r.intercept for r in reg_dM_C_B], band[1][0], pair))
    return result


bands = vso.util.band_pairs(provider.bands)

transform = {k: v for d in [calculate_transform(provider, b) for b in bands] for k, v in d.items()}
t_path = layout.get_session(vso.util.Session(tag=SESSION_TAG, name=OBJ_NAME_1)).root_dir.parent / 'transform.json'

with open(t_path, mode='w') as file:
    json.dump(transform, file)
transform