In [0]:
# Colon-separated list of source trees; a later cell splits it and prepends
# every entry to sys.path.
PROJECT_PATH = ':'.join([
    '/home/dobos/project/ga_isochrones/python',
    '/home/dobos/project/ga_pfsspec_all/python',
    '/home/dobos/project/pysynphot',
])

# Stellar model grid directory and the filter file read into `Filter`
GRID_PATH = '/datascope/subaru/data/pfsspec/models/stellar/grid/phoenix/phoenix_HiRes'
FILTER_PATH = '/datascope/subaru/data/pfsspec/subaru/hsc/filters/fHSC-g.txt'

# Spectrograph arm; it selects the detector, PSF, sky and moon files below
ARM = 'mr'

DETECTOR_PATH = f'/datascope/subaru/data/pfsspec/subaru/pfs/arms/{ARM}.json'
PSF_PATH = f'/datascope/subaru/data/pfsspec/subaru/pfs/psf/import/{ARM}.2/pca.h5'
SKY_PATH = f'/datascope/subaru/data/pfsspec/subaru/pfs/noise/import/sky.see/{ARM}/sky.h5'
MOON_PATH = f'/datascope/subaru/data/pfsspec/subaru/pfs/noise/import/moon/{ARM}/moon.h5'

# Radial Velocity fit

Demo code to perform maximum likelihood analysis of
radial velocity measurements between a spectrum and a
given template drawn from the PHOENIX model grid.

In [0]:
import os
import sys
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
from matplotlib.ticker import AutoMinorLocator, MultipleLocator
from scipy.interpolate import interp1d
import h5py as h5
from tqdm.notebook import trange, tqdm

In [0]:
# Small base font size; the figures in this notebook are only 3.4 x 2.5 inches
plt.rc('font', size=7)

In [0]:
# Make the project packages importable: prepend every entry of the colon-separated
# PROJECT_PATH to sys.path. Iterating in reverse while inserting at index 0 keeps
# the entries in their listed order at the front of the search path.
for p in reversed(PROJECT_PATH.split(':')):
    sys.path.insert(0, p)

## Load spectrum grid

In [0]:
from pfs.ga.pfsspec.core.grid import ArrayGrid
from pfs.ga.pfsspec.stellar.grid import ModelGrid
from pfs.ga.pfsspec.stellar.grid.bosz import Bosz
from pfs.ga.pfsspec.stellar.grid.phoenix import Phoenix

In [0]:
# Open the PHOENIX model grid stored as spectra.h5 under GRID_PATH
fn = os.path.join(GRID_PATH, 'spectra.h5')
grid = ModelGrid(Phoenix(), ArrayGrid)
# presumably leaves the value arrays on disk until a model is requested — TODO confirm
grid.preload_arrays = False
grid.load(fn, format='h5')

## Simulated observation

In [0]:
from pfs.ga.pfsspec.core import Filter
from pfs.ga.pfsspec.sim.obsmod.pipelines import StellarModelPipeline
from pfs.ga.pfsspec.core import Physics
from pfs.ga.pfsspec.core.obsmod.psf import PcaPsf
from pfs.ga.pfsspec.sim.obsmod import Detector
from pfs.ga.pfsspec.sim.obsmod.background import Sky
from pfs.ga.pfsspec.sim.obsmod.background import Moon
from pfs.ga.pfsspec.core.obsmod.snr import QuantileSnr

In [0]:
# Filter read from FILTER_PATH; used later as the pipeline's `mag_filter`
filt_hsc_g = Filter()
filt_hsc_g.read(FILTER_PATH)

# PSF model loaded from PSF_PATH; shared by the detector, the template
# convolution and the RV fitter
psf = PcaPsf()
psf.load(PSF_PATH)

# Detector configuration for the selected ARM, with the PSF attached
detector = Detector()
detector.load_json(DETECTOR_PATH)
detector.psf = psf

# Sky background data for the selected ARM
sky = Sky()
sky.load(SKY_PATH, format='h5')

# Moon background data for the selected ARM
moon = Moon()
moon.load(MOON_PATH, format='h5')

In [0]:
from pfs.ga.pfsspec.sim.obsmod.observations import PfsObservation
from pfs.ga.pfsspec.sim.obsmod.noise import NormalNoise
from pfs.ga.pfsspec.sim.obsmod.calibration import FluxCalibrationBias
from pfs.ga.pfsspec.core.obsmod.resampling import Interp1dResampler

In [0]:
# Observation model: combines the detector, sky and moon objects with a
# NormalNoise noise model. It is shared by every simulated observation.
obs = PfsObservation()
obs.detector = detector
obs.sky = sky
obs.moon = moon
obs.noise_model = NormalNoise()

In [0]:
def get_observation(rv=0.0, noise_level=1.0, noise_freeze=True, calib_bias=True, mag=19, **kwargs):
    """
    Simulate an observation of the grid model nearest to the requested parameters.

    The model selected by `**kwargs` is run through a `StellarModelPipeline`
    configured with the module-level observation model `obs` and filter
    `filt_hsc_g`, which generates the spectrum together with its realistic
    observational error.

    Parameters
    ----------
    rv : float
        Velocity converted with `Physics.vel_to_z` and passed to the pipeline
        as 'redshift'.
    noise_level : float
        Assigned to the pipeline's `noise_level`.
    noise_freeze : bool
        Assigned to the pipeline's `noise_freeze`.
    calib_bias : bool
        When true, a `FluxCalibrationBias` with amplitude 0.25 is attached as
        the pipeline's calibration step.
    mag : float
        Passed to the pipeline as 'mag'.
    **kwargs
        Model parameters forwarded to `grid.get_nearest_index`.

    Returns
    -------
    tuple
        `(idx, spec, pp)`: the grid index, the spectrum that was run through
        the pipeline, and the pipeline itself.
    """

    # Fixed observing conditions; only the magnitude and the redshift vary.
    # exp_time is presumably in seconds (15 minutes) — TODO confirm
    obs_params = dict(
        mag=mag,
        seeing=0.5,
        exp_time=15 * 60,
        exp_count=4 * 3,
        target_zenith_angle=0,
        target_field_angle=0.0,
        moon_zenith_angle=45,
        moon_target_angle=60,
        moon_phase=0.,
        redshift=Physics.vel_to_z(rv),
    )

    model_idx = grid.get_nearest_index(**kwargs)
    model = grid.get_model_at(model_idx)

    pipeline = StellarModelPipeline()

    # Fall back to a fixed resolution when the grid does not provide one
    grid_res = grid.resolution
    pipeline.model_res = grid_res if grid_res else 150000

    pipeline.mag_filter = filt_hsc_g
    pipeline.observation = obs
    pipeline.snr = QuantileSnr(binning=4.0)
    pipeline.resampler = Interp1dResampler()
    pipeline.noise_level = noise_level
    pipeline.noise_freeze = noise_freeze

    if calib_bias:
        flux_bias = FluxCalibrationBias(reuse_bias=False)
        flux_bias.amplitude = 0.25
        pipeline.calibration = flux_bias

    pipeline.run(model, **obs_params)

    return model_idx, model, pipeline

def get_template(convolve=True, **kwargs):
    """
    Fetch a noiseless template from the model grid.

    The template keeps the original, high-resolution binning of the grid.
    With `convolve=True` it is convolved with the module-level `psf`, so it
    has the same line spread function as the observation.

    Parameters
    ----------
    convolve : bool
        Whether to call `convolve_psf(psf)` on the template.
    **kwargs
        Model parameters forwarded to `grid.get_nearest_index`.

    Returns
    -------
    tuple
        `(idx, temp)`: the grid index and the template spectrum.
    """

    # TODO: add template caching

    nearest = grid.get_nearest_index(**kwargs)
    template = grid.get_model_at(nearest)

    # Drop the continuum and mask so they are not passed around (better performance)
    template.cont = template.mask = None

    if not convolve:
        return nearest, template

    template.convolve_psf(psf)
    return nearest, template


In [0]:
# Ground-truth radial velocity and the stellar parameters that select the grid model
rv = 180.0
M_H = -0.5
T_eff = 5500
log_g = 1.0
a_M = 0.0

# Simulated observation with default noise and flux calibration bias
idx, spec, pp = get_observation(rv=rv, M_H=M_H, T_eff=T_eff, log_g=log_g, a_M=a_M)
# Same model with noise_level=0 and no calibration bias, used as the reference below
idx, spec1, pp1 = get_observation(rv=rv, M_H=M_H, T_eff=T_eff, log_g=log_g, a_M=a_M, noise_level=0, calib_bias=False)
# Template for the same stellar parameters (convolved with the PSF by default)
_, temp = get_template(M_H=M_H, T_eff=T_eff, log_g=log_g, a_M=a_M)

# Figure: `flux_model` and `flux_err` of the simulated observation against wavelength
f, ax = plt.subplots(1, 1, figsize=(3.4, 2.5), dpi=240)

ax.plot(spec.wave, spec.flux_model, '-', lw=0.3)
ax.plot(spec.wave, spec.flux_err, '-', lw=0.3)
# Anchor the y axis at zero, leave the upper limit automatic
ax.set_ylim(0, None)

### ###

# Figure: observed flux, its `flux_model`, and the flux of the noise-free,
# bias-free simulation `spec1`
f, ax = plt.subplots(1, 1, figsize=(3.4, 2.5), dpi=240)

ax.plot(spec.wave, spec.flux, '-', lw=0.3)
ax.plot(spec.wave, spec.flux_model, '-', lw=0.3)
ax.plot(spec.wave, spec1.flux, '-', lw=0.3)

ax.set_title(f'SNR = {spec.snr:.2f}')

ax.set_xlabel(r'$\lambda$')
# Plain `_` makes lambda a subscript; the escaped `\_` is drawn by mathtext
# as a literal underscore
ax.set_ylabel(r'$F_\lambda$')

# 1% padding in wavelength; clip the y axis at 1.2x the 99th flux percentile
# so that outliers do not dominate the scale
ax.set_xlim(0.99 * spec.wave.min(), 1.01 * spec.wave.max())
ax.set_ylim(0, np.quantile(spec.flux, 0.99) * 1.2)

f.tight_layout()

#ax.set_xlim(8400, 8700)

### ###

# Figure: ratio of the biased observation to the noise-free, bias-free flux `spec1.flux`.
# The first curve uses the observed flux, the second uses `flux_model`.
f, ax = plt.subplots(1, 1, figsize=(3.4, 2.5), dpi=240)

ax.plot(spec.wave, spec.flux / spec1.flux, '-', lw=0.3)
ax.plot(spec.wave, spec.flux_model / spec1.flux, '-', lw=0.5)

ax.set_xlabel(r'$\lambda$')
ax.set_ylabel(r'calibration bias')

# 1% padding in wavelength; clip the y axis at 1.2x the 99th percentile of the ratio
ax.set_xlim(0.99 * spec.wave.min(), 1.01 * spec.wave.max())
ax.set_ylim(0, np.quantile(spec.flux / spec1.flux, 0.99) * 1.2)

#ax.set_xlim(8400, 8700)

## Radial velocity

In [0]:
from pfs.ga.pfsspec.stellar.rvfit import RVFit, RVFitTrace

In [0]:
# Re-run the simulation with flux calibration bias and fetch the matching template;
# this rebinds the module-level idx, spec, pp and temp
idx, spec, pp = get_observation(calib_bias=True, rv=rv, M_H=M_H, T_eff=T_eff, log_g=log_g, a_M=a_M)
_, temp = get_template(M_H=M_H, T_eff=T_eff, log_g=log_g, a_M=a_M)

In [0]:
# Display the `snr` value of the simulated spectrum as the cell output
spec.snr

In [0]:
# RV fitter with a trace attached; the trace's guess_rv, guess_log_L and guess_fit
# are plotted after the fit. It uses the same grid, PSF and resampler type as
# the simulation.
rvfit = RVFit(trace=RVFitTrace())
rvfit.grid = grid
rvfit.psf = psf
rvfit.resampler = Interp1dResampler()

In [0]:
# Run full fitting; rv_bounds=(100, 300) brackets the ground truth rv = 180
best_rv = rvfit.fit_rv(spec, temp, rv_bounds=(100, 300))

In [0]:
# Figure: traced guess_log_L and guess_fit against the guessed RV values,
# with the ground truth (black) and the best-fit RV (red) as vertical lines
f, ax = plt.subplots(1, 1, figsize=(3.4, 2.5), dpi=240)

ax.plot(rvfit.trace.guess_rv, rvfit.trace.guess_log_L)
ax.plot(rvfit.trace.guess_rv, rvfit.trace.guess_fit)
ax.axvline(rv, c='k')
ax.axvline(best_rv, c='r')

# NOTE(review): unlike the other titles this value is not formatted with :.2f
ax.set_title(f'SNR = {spec.snr / pp.noise_level}')

# Run many realizations

In [0]:
from pfs.ga.pfsspec.core.util import SmartParallel
from collections.abc import Iterable

In [0]:
# Clear the module-level observation cache; rvfit_mc_worker simulates a new
# observation whenever `spec` is None
idx, spec, pp = None, None, None

def rvfit_mc_plot(spec, rvfit, rv_gt, rv_fit):
    """
    Draw a 2x2 diagnostic figure for a single Monte Carlo realization.

    Panels: `flux_model` and `flux_err` (top left), noisy flux over
    `flux_model` (top right), the flux calibration curve when the spectrum
    has one (bottom left), and the RV fit trace with the ground-truth and
    fitted velocities marked (bottom right).

    Parameters
    ----------
    spec
        Spectrum providing `wave`, `flux`, `flux_model`, `flux_err`,
        `flux_calibration`, `mag` and `snr`.
    rvfit
        Fitter whose `trace` holds `guess_rv`, `guess_log_L` and `guess_fit`.
    rv_gt : float
        Ground-truth radial velocity, drawn as a black vertical line.
    rv_fit : float
        Fitted radial velocity, drawn as a red vertical line.
    """
    f, axs = plt.subplots(2, 2, figsize=(3.4, 2.5), dpi=240)

    # Noiseless spectrum

    axs[0, 0].plot(spec.wave, spec.flux_model, '-', lw=0.3)
    axs[0, 0].plot(spec.wave, spec.flux_err, '-', lw=0.3)
    axs[0, 0].set_ylim(0, None)

    # Noisy spectrum

    axs[0, 1].plot(spec.wave, spec.flux, '-', lw=0.3)
    axs[0, 1].plot(spec.wave, spec.flux_model, '-', lw=0.3)

    axs[0, 1].set_xlabel(r'$\lambda$')
    # Plain `_` makes lambda a subscript; `\_` would be drawn as a literal underscore
    axs[0, 1].set_ylabel(r'$F_\lambda$')

    axs[0, 1].set_xlim(0.99 * spec.wave.min(), 1.01 * spec.wave.max())
    axs[0, 1].set_ylim(0, np.quantile(spec.flux, 0.99) * 1.2)

    # Calibration bias

    if spec.flux_calibration is not None:
        axs[1, 0].plot(spec.wave, spec.flux_calibration, '-', lw=0.3)

    # RV fit

    axs[1, 1].plot(rvfit.trace.guess_rv, rvfit.trace.guess_log_L)
    axs[1, 1].plot(rvfit.trace.guess_rv, rvfit.trace.guess_fit)
    # Mark the values of THIS realization (the arguments), not whatever
    # module-level `rv` / `best_rv` happen to hold
    axs[1, 1].axvline(rv_gt, c='k')
    axs[1, 1].axvline(rv_fit, c='r')

    f.suptitle(f'mag = {spec.mag:.2f}, SNR = {spec.snr:.2f}')

    f.tight_layout()

def rvfit_mc_worker(i, rv=180, rv_bounds=(100, 300), calib_bias=False, mag=22, noise_level=1.0,
        M_H=-0.5, T_eff=5500, log_g=1.0, a_M=0.0):
    """
    Run one Monte Carlo realization: simulate (or reuse) an observation, add
    noise to a copy of it and fit the radial velocity.

    Parameters
    ----------
    i
        Index of the realization; returned unchanged so the caller can place
        the result.
    rv
        A constant ground-truth value, or an iterable that is unpacked into
        `np.random.uniform(*rv)` to draw a random ground truth for this call.
    rv_bounds
        Forwarded to `rvfit.fit_rv`.
    calib_bias, mag, M_H, T_eff, log_g, a_M
        Forwarded to `get_observation`.
    noise_level
        Forwarded to `get_observation` and to `apply_noise`.

    Returns
    -------
    tuple
        `(i, rv_gt, rv_fit)`.

    Notes
    -----
    Relies on module-level state. `idx, spec, pp` cache the simulated
    observation between calls; it is regenerated only when `spec` is None or
    when `rv` is sampled. `rvfit` and `temp` are neither parameters nor
    assigned here, so they resolve to the module-level names.
    """

    global idx, spec, pp
    
    # Sample RV randomly or use a constant value
    if isinstance(rv, Iterable):
        rv_gt = np.random.uniform(*rv)
        print(rv_gt)
        # A new ground truth invalidates the cached observation
        spec = None
    else:
        rv_gt = rv
    
    if spec is None:
        idx, spec, pp = get_observation(rv=rv_gt, mag=mag, M_H=M_H, T_eff=T_eff, log_g=log_g, a_M=a_M,
            calib_bias=calib_bias, noise_level=noise_level, noise_freeze=False)

        # Reset initial value for optimization so that it will try to guess it from scratch
        rvfit.rv0 = None

    # Copy spectrum before generating the noise to keep original to be reused
    # when rv_gt is constant
    nspec = type(spec)(orig=spec)
    # The current flux of the copy becomes its model before noise is applied
    nspec.flux_model = nspec.flux
    nspec.apply_noise(pp.observation.noise_model, noise_level=noise_level)
    rv_fit = rvfit.fit_rv(nspec, temp, rv_bounds=rv_bounds)

    # NOTE(review): diagnostic plot is permanently disabled; flip by hand when debugging
    if False:
        rvfit_mc_plot(nspec, rvfit, rv_gt, rv_fit)

    return i, rv_gt, rv_fit

def rvfit_mc(mc_count=100, mag=22, **kwargs):
    """
    Fit RV to many realizations of the same observation. Sample in a range of
    magnitudes.

    Parameters
    ----------
    mc_count : int
        Number of realizations per magnitude.
    mag : scalar or iterable
        A single magnitude or any iterable of magnitudes.
    **kwargs
        Forwarded to `rvfit_mc_worker` for every realization.

    Returns
    -------
    tuple of dict
        `(rv_gt, rv_fit, rv_err)`, each keyed by magnitude with an array of
        length `mc_count`. The worker reports no uncertainty, so `rv_err`
        contains NaN only.
    """

    # rvfit_mc_worker looks up `rvfit` and the cached `idx, spec, pp` as
    # module-level names, so the per-magnitude fitter created below must be
    # published there too. As a plain local it would never be used.
    global idx, spec, pp, rvfit

    rv_gt = {}
    rv_fit = {}
    rv_err = {}

    # Accept a scalar or any iterable; materialize it so len() also works for generators
    mags = list(mag) if isinstance(mag, Iterable) else [mag]

    # Context manager makes sure the progress bar is closed, also on error
    with tqdm(total=mc_count * len(mags)) as t:
        for m in mags:
            # Force the worker to simulate a fresh observation for this magnitude
            idx, spec, pp = None, None, None

            rvfit = RVFit(trace=RVFitTrace())
            rvfit.grid = grid
            rvfit.psf = psf
            rvfit.resampler = Interp1dResampler()

            # NaN instead of uninitialized memory: unfilled slots are then recognizable
            rv_gt[m] = np.full((mc_count,), np.nan)
            rv_fit[m] = np.full((mc_count,), np.nan)
            rv_err[m] = np.full((mc_count,), np.nan)
            with SmartParallel(verbose=False, parallel=False, threads=12) as p:
                for i, res_rv_gt, res_rv_fit in p.map(rvfit_mc_worker, list(range(mc_count)), mag=m, **kwargs):
                    rv_gt[m][i] = res_rv_gt
                    rv_fit[m][i] = res_rv_fit
                    t.update(1)

    return rv_gt, rv_fit, rv_err

In [0]:
# Results keyed by the calib_bias flag; each value is the per-magnitude dict
# returned by rvfit_mc
rv_gt = {}
rv_fit = {}
rv_err = {}

# Number of Monte Carlo realizations per magnitude
N = 1

# Run the Monte Carlo fit with and without flux calibration bias
for cb in [True, False]:
    rv_gt[cb], rv_fit[cb], rv_err[cb] = rvfit_mc(mc_count=N, rv=180, mag=[22, 23], calib_bias=cb)

In [0]:
# Magnitude whose Monte Carlo results are shown
m = 23

f, ax = plt.subplots(1, 1, figsize=(3.4, 2.5), dpi=240)

# One normalized step histogram of the RV residuals (fit - ground truth) per run:
# first without, then with flux calibration bias
for biased, label in [(False, 'no flux bias'), (True, 'with flux bias')]:
    residual = rv_fit[biased][m] - rv_gt[biased][m]
    hist, bins = np.histogram(residual, bins=30, density=True)
    # Plot against the bin centers
    centers = 0.5 * (bins[1:] + bins[:-1])
    ax.step(centers, hist, where='mid', label=label)

# ax.axvline(rv, c='r', label="ground truth")

ax.set_xlabel(r'$\Delta\,$RV [km s$^{-1}]$')
ax.set_ylabel('')

ax.set_title(f"mag = {m:.2f}")

ax.legend()

f.tight_layout()

In [0]:
import pickle

# with open("rv_fit.dat", "wb") as f:
#     pickle.dump((rv_fit, rv_gt, rv_err), f)

# NOTE(review): this replaces the in-memory rv_fit / rv_gt / rv_err with the
# contents of rv_fit.dat and fails if the file does not exist (the dump above is
# commented out). It also rebinds `f`, which held the last figure.
# SECURITY: pickle.load can execute arbitrary code; only load files you created.
with open("rv_fit.dat", "rb") as f:
    (rv_fit, rv_gt, rv_err) = pickle.load(f)