In [0]:
# S/N = 15
CONFIG_FILE = '/datascope/subaru/user/dobos/gapipe/work/run21_June2025/pfsGAObject/10092/0000000600000d9d-031-0x7270e4b76e8cc231/pfsGAObject-10092-0000000600000d9d-031-0x7270e4b76e8cc231.yaml'

# S/N = 6.5
# CONFIG_FILE = '/datascope/subaru/user/dobos/gapipe/work/run21_June2025/pfsGAObject/10092/0000000600000dad-035-0x26ed645d068efdb0/pfsGAObject-10092-0000000600000dad-035-0x26ed645d068efdb0.yaml'

# S/N = 8
# CONFIG_FILE = '/datascope/subaru/user/dobos/gapipe/work/run21_June2025/pfsGAObject/10092/0000000600000e13-035-0x26ed645d068efdb0/pfsGAObject-10092-0000000600000e13-035-0x26ed645d068efdb0.yaml'

# S/N = 22
# CONFIG_FILE = '/datascope/subaru/user/dobos/gapipe/work/run21_June2025/pfsGAObject/10092/0000000600000e57-035-0x26ed645d068efdb0/pfsGAObject-10092-0000000600000e57-035-0x26ed645d068efdb0.yaml'

# S/N = 3.8
# CONFIG_FILE = '/datascope/subaru/user/dobos/gapipe/work/run21_June2025/pfsGAObject/10092/0000000600001084-035-0x26ed645d068efdb0/pfsGAObject-10092-0000000600001084-035-0x26ed645d068efdb0.yaml'

# Spurious emission lines
# CONFIG_FILE = '/datascope/subaru/user/dobos/gapipe/work/run21_June2025/pfsGAObject/10092/000000060000445f-034-0x0087f12f4eac34f1/pfsGAObject-10092-000000060000445f-034-0x0087f12f4eac34f1.yaml'

# Spiky spectra in blue arm
# CONFIG_FILE = '/datascope/subaru/user/dobos/gapipe/work/run21_June2025/pfsGAObject/10092/0000000200003dd0-018-0x30183216f317a44f/pfsGAObject-10092-0000000200003dd0-018-0x30183216f317a44f.yaml'

DEBUG = False

In [0]:
import os
import numpy as np
import matplotlib.pyplot as plt

In [0]:
if DEBUG:
    if 'debug' not in globals():
        import debugpy
        debugpy.listen(('localhost', int(os.environ.get('GAPIPE_DEBUGPORT', 5678))))
        debug = True
        print('Started debugpy, waiting for client to attach...')
    else:
        print('Debugpy is already started, waiting for client to attach...')

In [0]:
%load_ext autoreload
%autoreload 2

# Manually run the GA pipeline step by step

Here we mimic the steps taken by the script `ga-pipeline` when called from the command-line but execute the pipeline itself step by step in a Jupyter notebook. This is useful for debugging and development.

In [0]:
from pfs.ga.pipeline.scripts.run.runscript import RunScript
from pfs.ga.pipeline.gapipe import GAPipeline, GAPipelineTrace
from pfs.ga.pipeline.gapipe.config import GAPipelineConfig

In [0]:
args = {
    'config': CONFIG_FILE,
    'trace_plot_level': 2,
}
script = RunScript()
# script._init_from_args_pre_logging(args)
script.prepare()
script.start_logging()
script._init_from_args(args)

In [0]:
# Load the configuration file
config = GAPipelineConfig()
config.load(CONFIG_FILE, ignore_collisions=True)
script._update_directories(config)

In [0]:
# Verify data locations
def print_repo_variables(repo):
    for key, value in repo.variables.items():
        print(f"  {key}: {repo.get_resolved_variable(key)}")
    print()

print("Input Repository Variables:")
print_repo_variables(script.input_repo)

print("Work Repository Variables:")
print_repo_variables(script.work_repo)

In [0]:
id = str(config.target.identity)
id

In [0]:
# Initialize the pipeline

trace = GAPipelineTrace()
pipeline = GAPipeline(
    script = script,
    input_repo = script.input_repo,
    work_repo = script.work_repo,
    trace = trace,
)

pipeline.reset()
pipeline.update(config=config, id=id)

logfile = pipeline.get_product_logfile()
logdir = os.path.dirname(logfile)

trace.reset()
trace.init_from_args(script, config.trace_args)
trace.update(figdir=logdir, logdir=logdir, id=id)

logdir, trace.plot_level

In [0]:
# Prepare the pipeline for execution
steps = pipeline.create_steps()
state = pipeline.create_state()
context = pipeline.create_context(state=state, trace=trace)

## Validate step

Validate the configuration and library versions

In [0]:
from pfs.ga.pipeline.gapipe.steps import ValidateStep

step = ValidateStep()
step.run(context)

## Initialize step

Create directories, save a copy of the configuration file to the output, initialize objects.

In [0]:
from pfs.ga.pipeline.gapipe.steps import InitStep

step = InitStep()
step.run(context)

## Load step

Load the input data and other resources such as stellar grid, PFS objects, etc.

In [0]:
# List of products to be loaded
context.pipeline.required_product_types

In [0]:
from pfs.ga.pipeline.gapipe.steps import LoadStep

step = LoadStep()
step.run(context)

In [0]:
step.validate(context)

In [0]:
# Products in the cache after the load step
pipeline.product_cache

## TempFit step

In [0]:
from pfs.ga.pipeline.gapipe.steps import TempFitStep

step = TempFitStep()

In [0]:
# Initialize 
step.init(context)

In [0]:
# Read the spectra from the PFS data products and load any additional data necessary for template fitting
step.load(context)

In [0]:
# This is a no-op, reserved for future use
step.preprocess(context)

In [0]:
# Plot the distribution of S/N
f, ax = plt.subplots(1, 1, figsize=(6, 4))

for arm, specs in context.state.tempfit_spectra.items():
    snr_values = [spec.snr for spec in specs if spec is not None and spec.snr is not None]
    hist, bins = np.histogram(snr_values, bins=10)
    ax.step(bins[:-1], hist, where='post', label=f'{arm}-band')


plt.title(f'S/N Distribution')
plt.xlabel('Signal-to-Noise Ratio')
plt.ylabel('Number of Spectra')
plt.show()

In [0]:
# Plot the input spectra
cmap = plt.get_cmap('tab10')
colors = {'b': cmap(0), 'm': cmap(1), 'r': cmap(3), 'n': cmap(5)}

all_flux = np.concatenate([s.flux 
                           for ss in context.state.tempfit_spectra.values()
                           for s in ss if s is not None])
q = np.quantile(all_flux, [0.95])

for spectra in zip(*context.state.tempfit_spectra.values()):
    f, ax = plt.subplots(figsize=(10, 4))
    
    snr_wl = []
    snr_value = []
    for i, arm in enumerate(context.state.tempfit_spectra.keys()):
        s = spectra[i]

        snr_wl.append(s.wave.mean())
        snr_value.append(f'SNR in {arm} : {s.snr:.1f}')

        ax.plot(s.wave, s.flux, lw=0.5, color=colors[arm])
        ax.plot(s.wave, s.flux_err, lw=0.5, color='k', alpha=0.3)

    ax.set_ylim(-0.1 * q, 1.1 * q)
    ax.set_ylabel('Flux [ergs/s/cm^2/Angstrom]')
    ax.set_xlabel('Wavelength [Angstrom]')

    # Y axis to display different tick labes but same wavelength values
    ax2 = ax.twiny()
    ax2.set_xlim(ax.get_xlim())
    ax2.set_xticks(snr_wl)
    ax2.set_xticklabels(snr_value)
    
    ax.grid()

    ax.set_title(f'catId: {s.catid:05d} objID: {s.id:016x} visit: {s.observations.visit[0]:06d}\n'
                 f'spectrograph: {s.observations.spectrograph[0]:d}, fiberId: {s.observations.fiberId[0]:d}')

    plt.show()
    plt.close(f)

    break

In [0]:
# Run template fitting
step.run(context)

In [0]:
step.calculate_error(context)

In [0]:
step.calculate_covariance(context)

In [0]:
step.finish(context)

In [0]:
f'RV: {context.state.tempfit_results.rv_fit:.3f} +/- {context.state.tempfit_results.rv_err:.3f}'

In [0]:
for p in context.state.tempfit_results.params_fit:
    print(f"{p}: {context.state.tempfit_results.params_fit[p]:.3f} +/- {context.state.tempfit_results.params_err[p]:.3f}")

In [0]:
def plot_input_spectra(match, normalized=False, xlim=(None, None)):
    # Plot the input spectra
    cmap = plt.get_cmap('tab10')
    colors = {'b': cmap(0), 'm': cmap(1), 'r': cmap(3), 'n': cmap(5)}

    spectra, _ = context.state.tempfit.append_corrections_and_templates(
        context.state.tempfit_spectra, None,
        context.state.tempfit_results.rv_fit,
        context.state.tempfit_results.params_fit,
        context.state.tempfit_results.a_fit,
        match=match)

    all_flux = []
    for ss in spectra.values():
        for s in ss:
            if s is not None and s.flux is not None:
                m = s.mask_as_bool()
                if xlim[0] is not None:
                    m &= s.wave >= xlim[0]
                if xlim[1] is not None:
                    m &= s.wave <= xlim[1]
                if normalized:
                    all_flux.append(s.flux[m] / s.cont[m])
                else:
                    all_flux.append(s.flux[m])

    all_flux = np.concatenate(all_flux)
    q = np.quantile(all_flux, [0.95])

    for j, specs in enumerate(zip(*spectra.values())):
        f, ax = plt.subplots(figsize=(10, 4))
        
        snr_wl = []
        snr_value = []
        for i, arm in enumerate(spectra.keys()):
            s = specs[i]
            if s is not None:
                m = s.mask_as_bool()

                snr_wl.append(s.wave.mean())
                snr_value.append(f'SNR in {arm} : {s.snr:.1f}')

                if normalized:
                    ax.plot(s.wave, s.flux / s.cont, lw=0.5, color='lightgray', alpha=0.5)
                    ax.plot(s.wave, np.where(m, s.flux / s.cont, np.nan), lw=0.5, color=colors[arm])
                    ax.plot(s.wave, s.flux_model / s.cont, '-r')
                else:
                    ax.plot(s.wave, s.flux, lw=0.5, color='lightgray', alpha=0.5)
                    ax.plot(s.wave, np.where(m, s.flux, np.nan), lw=0.5, color=colors[arm])
                    ax.plot(s.wave, s.flux_model, '-r')
                    # ax.plot(s.wave, s.flux_err, lw=0.5, color='k', alpha=0.3)

        if match == 'template':
            corrected = ' (corrected)'
        else:
            corrected = ' (observed)'

        ax.set_xlim(*xlim)
        ax.set_xlabel('Wavelength [Angstrom]')

        if normalized:
            ax.set_ylim(0.0, 1.1 * q)
            ax.set_ylabel(R'Normalized flux')
        else:
            ax.set_ylim(-0.1 * q, 1.1 * q)
            ax.set_ylabel(R'Flux [ergs s$^{-1}$ cm$^{-2}$ $\AA^{-1}$]' + corrected)
        

        # Y axis to display different tick labes but same wavelength values
        ax2 = ax.twiny()
        ax2.set_xlim(ax.get_xlim())
        ax2.set_xticks(snr_wl)
        ax2.set_xticklabels(snr_value)
        
        ax.grid()

        if s is not None:
            ax.set_title(f'catId: {s.catid:05d} objID: {s.id:016x} visit: {s.observations.visit[0]:06d}\n'
                        f'spectrograph: {s.observations.spectrograph[0]:d}, fiberId: {s.observations.fiberId[0]:d}')

        plt.show()
        plt.close(f)

        break



In [0]:
plot_input_spectra(match='spectrum')

In [0]:
plot_input_spectra(match='template')

In [0]:
plot_input_spectra(match='template', normalized=True)

In [0]:
plot_input_spectra(match='spectrum', normalized=True, xlim=(4950, 5150))

In [0]:
plot_input_spectra(match='template', normalized=True, xlim=(5200, 5600))

In [0]:
# Plot the flux correction

# Plot the input spectra
cmap = plt.get_cmap('tab10')
colors = {'b': cmap(0), 'm': cmap(1), 'r': cmap(3), 'n': cmap(5)}

spectra, _ = context.state.tempfit.append_corrections_and_templates(
        context.state.tempfit_spectra, None,
        context.state.tempfit_results.rv_fit,
        context.state.tempfit_results.params_fit,
        context.state.tempfit_results.a_fit,
        match='template')

f, ax = plt.subplots(figsize=(10, 4))

for arm in spectra:
    s = spectra[arm][0]
    ax.plot(s.wave, s.flux_corr, '-', c=colors[arm])

ax.axhline(1.0, ls='--', c='k', lw=0.5)

ax.set_ylim(0.3, 2.0)
ax.set_xlabel('wavelength [A]')
ax.set_ylabel('flux correction')

ax.set_title(f'catId: {s.catid:05d} objID: {s.id:016x} visit: {s.observations.visit[0]:06d}\n'
             f'spectrograph: {s.observations.spectrograph[0]:d}, fiberId: {s.observations.fiberId[0]:d}')

In [0]:
# Clean up any temporary stuff
step.cleanup(context)

## Coadd step

In [0]:
from pfs.ga.pipeline.gapipe.steps import CoaddStep

step = CoaddStep()

In [0]:
step.init(context)

In [0]:
step.run(context)

In [0]:
step.cleanup(context)

In [0]:
def plot_coadd(match='spectrum', normalized=False, xlim=(None, None)):
    # Plot the coadded spectra
    cmap = plt.get_cmap('tab10')
    colors = {'b': cmap(0), 'm': cmap(1), 'r': cmap(3), 'n': cmap(5)}

    spectra = context.state.coadd_results.coadd_spectra

    all_flux = []
    for ss in spectra.values():
        for s in ss:
            if s is not None:
                m = s.mask_as_bool()
                if xlim[0] is not None:
                    m &= s.wave >= xlim[0]
                if xlim[1] is not None:
                    m &= s.wave <= xlim[1]

                if normalized:
                    all_flux.append(s.flux[m] / s.cont[m])
                else:
                    all_flux.append(s.flux[m])

    all_flux = np.concatenate(all_flux)
    q = np.quantile(all_flux, [0.95])

    for j, specs in enumerate(zip(*spectra.values())):
        f, ax = plt.subplots(figsize=(10, 6))
        
        snr_wl = []
        snr_value = []
        for i, arm in enumerate(spectra.keys()):
            s = specs[i]

            snr_wl.append(s.wave.mean())
            snr_value.append(f'SNR in {arm} : {s.snr:.1f}')

            m = s.mask_as_bool()

            if normalized:
                ax.plot(s.wave, s.flux / s.cont, lw=0.5, color='lightgray')
                ax.plot(s.wave, np.where(m, s.flux / s.cont, np.nan), lw=0.5, color=colors[arm])
                ax.plot(s.wave, s.flux_model / s.cont, '-r')
                ax.plot(s.wave, s.flux_err / s.cont, lw=0.5, color='k', alpha=0.3)
            else:
                ax.plot(s.wave, s.flux, lw=0.5, color='lightgray')
                ax.plot(s.wave, np.where(m, s.flux, np.nan), lw=0.5, color=colors[arm])
                ax.plot(s.wave, s.flux_model, '-r')
                ax.plot(s.wave, s.flux_err, lw=0.5, color='k', alpha=0.3)

        if match == 'template':
            corrected = ' (corrected)'
        else:
            corrected = ' (observed)'

        ax.set_xlim(xlim)
        ax.set_xlabel('Wavelength [Angstrom]')

        if normalized:
            ax.set_ylim(0.0, 1.1 * q)
            ax.set_ylabel(R'Normalized flux')
        else:
            ax.set_ylim(-0.1 * q, 1.1 * q)
            ax.set_ylabel(R'Flux [ergs s$^{-1}$ cm$^{-2}$ $\AA^{-1}$]' + corrected)
        

        # Y axis to display different tick labes but same wavelength values
        ax2 = ax.twiny()
        ax2.set_xlim(ax.get_xlim())
        ax2.set_xticks(snr_wl)
        ax2.set_xticklabels(snr_value)
        
        ax.grid()

        ax.set_title(f'catId: {s.catid:05d} objID: {s.id:016x} nvisit: {len(s.observations.visit)}\n'
                    f'spectrograph: {s.observations.spectrograph[0]:d}, fiberId: {s.observations.fiberId[0]:d}\n'
                    f'RV = {context.state.tempfit_results.rv_fit:.1f} +/- {context.state.tempfit_results.rv_err:.1f} km/s '
                    f'[M/H] = {context.state.tempfit_results.params_fit["M_H"]:.1f} $\\pm$ {context.state.tempfit_results.params_err["M_H"]:.1f} dex     '
                    f'$T_\\mathrm{{eff}}$ = {context.state.tempfit_results.params_fit["T_eff"]:.0f} $\\pm$ {context.state.tempfit_results.params_err["T_eff"]:.0f} K     '
                    f'$\\log g$ = {context.state.tempfit_results.params_fit["log_g"]:.1f} $\\pm$ {context.state.tempfit_results.params_err["log_g"]:.1f}')

        f.tight_layout()

        plt.show()
        plt.close(f)

In [0]:
plot_coadd(match='spectrum')

In [0]:
plot_coadd(match='spectrum', normalized=True, xlim=(3850, 4000)) # Ca HK

In [0]:
plot_coadd(match='spectrum', normalized=True, xlim=(8450, 8700)) # Ca II triplet region

In [0]:
plot_coadd(match='template', normalized=True, xlim=(8450, 8700)) # Ca II triplet region

## ChemFit step

## Save step

This saves the results to a FITS file.

In [0]:
from pfs.ga.pipeline.gapipe.steps import SaveStep
step = SaveStep()

In [0]:
# step.run(context)

## Cleanup step

In [0]:
from pfs.ga.pipeline.gapipe.steps import CleanupStep
step = CleanupStep()

In [0]:
step.run(context)