# DSA-110 Calibrator Visual Search
Use a UVH5 file and date to plot top calibrators near the pointing.


In [1]:
import os
import sys
from pathlib import Path
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from astropy.time import Time
import astropy.units as u
from astropy.coordinates import SkyCoord, AltAz

def ensure_repo_src():
    repo = Path.cwd()
    while repo != repo.parent and not (repo / 'src').exists():
        repo = repo.parent
    src = repo / 'src'
    if src.exists() and str(src) not in sys.path:
        sys.path.insert(0, str(src))
    return src

ensure_repo_src()

from dsa110_contimg.calibration.catalogs import read_vla_calibrator_catalog
from dsa110_contimg.calibration.schedule import OVRO
from dsa110_contimg.notebooks.calibrator_helper import (
    candidates_near_pointing,
    load_pointing,
    score_calibrators,
    plot_altitude_tracks,
    DEFAULT_CATALOG,
)
plt.rcParams['figure.figsize'] = (7, 4)
plt.rcParams['axes.grid'] = True


In [2]:
try:
    import ipywidgets as widgets
    from IPython.display import display, clear_output
except Exception as exc:
    print('ipywidgets unavailable; run helper functions manually. Error:', exc)
else:
    path_text = widgets.Text(description='Data path', layout=widgets.Layout(width='80%'))
    field_input = widgets.IntText(description='Field ID', value=-1, tooltip='For MS: set FIELD_ID (default auto)')
    date_text = widgets.Text(value='2025-10-08', description='Date (UTC)')
    radius_slider = widgets.FloatSlider(value=5.0, min=0.5, max=10.0, step=0.5, description='Radius [deg]')
    flux_slider = widgets.FloatSlider(value=100.0, min=0.0, max=5000.0, step=50.0, description='Min flux [mJy]')
    topn_slider = widgets.IntSlider(value=5, min=1, max=15, step=1, description='Top N')
    run_button = widgets.Button(description='Search', button_style='success')
    output = widgets.Output()

    def run_search(_):
        output.clear_output()
        data_path = path_text.value.strip()
        if not data_path:
            with output:
                print('Provide a UVH5 or MS path.')
            return
        
        # Progress indicator
        progress_widget = widgets.IntProgress(
            value=0, min=0, max=6, 
            description='Progress:', 
            bar_style='info',
            orientation='horizontal'
        )
        
        with output:
            display(progress_widget)
            print('Starting calibrator search...')
        
        try:
            # Step 1: Load pointing information
            progress_widget.value = 1
            progress_widget.description = 'Loading pointing...'
            fid = field_input.value if field_input.value >= 0 else None
            info = load_pointing(data_path, field_id=fid)
            
            # Step 2: Display basic info
            progress_widget.value = 2
            progress_widget.description = 'Processing data...'
            print('Source type:', info['source_type'])
            print('Selected field ID:', info['selected_field_id'])
            if info['fields'] is not None:
                display(pd.DataFrame(info['fields']))
            obs_time = info['mid_time']
            print('Observation mid-time UTC:', obs_time.utc.isot if obs_time else 'n/a')
            
            if info['ra_deg'] is None or info['dec_deg'] is None:
                print('Missing pointing RA/Dec; cannot evaluate calibrators.')
                progress_widget.bar_style = 'danger'
                return
            
            print('Pointing coordinates: RA {:.3f} deg, Dec {:.3f} deg'.format(info['ra_deg'], info['dec_deg']))
            
            # Step 3: Find calibrator candidates
            progress_widget.value = 3
            progress_widget.description = 'Searching catalog...'
            cal_df = candidates_near_pointing(np.radians(info['dec_deg']), radius_deg=radius_slider.value, min_flux_mJy=flux_slider.value, catalog_path=DEFAULT_CATALOG)
            
            if cal_df.empty:
                print('No calibrators within search radius/flux cuts.')
                progress_widget.bar_style = 'warning'
                return
            
            print(f'Found {len(cal_df)} calibrator candidates')
            
            # Step 4: Score calibrators
            progress_widget.value = 4
            progress_widget.description = 'Scoring calibrators...'
            scored = score_calibrators(cal_df, pointing_ra_deg=info['ra_deg'], pointing_dec_deg=info['dec_deg'], obs_time=obs_time)
            scored = scored.sort_values('weighted_flux_jy', ascending=False, na_position='last').reset_index(drop=True)
            scored['rank'] = np.arange(1, len(scored) + 1)
            
            # Step 5: Display results table
            progress_widget.value = 5
            progress_widget.description = 'Generating results...'
            display_cols = [c for c in ['rank', 'flux_20_cm', 'pb_response', 'weighted_flux_jy', 'alt_at_obs_deg'] if c in scored.columns]
            display(pd.concat([scored[['name', 'ra_deg', 'dec_deg']], scored[display_cols]], axis=1).head(topn_slider.value))
            
            # Step 6: Plot altitude tracks
            progress_widget.value = 6
            progress_widget.description = 'Plotting tracks...'
            track_df = plot_altitude_tracks(scored, date_text.value, top_n=topn_slider.value, obs_time=obs_time)
            plt.show()
            
            # Complete
            progress_widget.bar_style = 'success'
            progress_widget.description = 'Complete!'
            
            if not scored.empty:
                best = scored.iloc[0]
                print('\\n' + '='*50)
                print('RECOMMENDED CALIBRATOR')
                print('='*50)
                print('Name:', best['name'])
                if 'alt_at_obs_deg' in best and not np.isnan(best['alt_at_obs_deg']):
                    print('  Altitude at observation: {:.1f} deg'.format(best['alt_at_obs_deg']))
                if 'pb_response' in best and not np.isnan(best['pb_response']):
                    print('  PB response: {:.3f}'.format(best['pb_response']))
                if 'weighted_flux_jy' in best and not np.isnan(best['weighted_flux_jy']):
                    print('  Weighted flux: {:.3f} Jy'.format(best['weighted_flux_jy']))
                print('  Transit UTC:', best.get('transit_utc', 'n/a'))
                
                if info['ms_path']:
                    cli = 'scripts/calibrate_bandpass.sh --ms {} --radius {:.1f} --cal-catalog /data/dsa110-contimg/data-samples/catalogs/vlacalibrators.txt'.format(info['ms_path'], radius_slider.value)
                    print('\\nSuggested CLI command:')
                    print(cli)
                
                summary = {
                    'recommended_calibrator': best['name'],
                    'cal_ra_deg': float(best['ra_deg']),
                    'cal_dec_deg': float(best['dec_deg']),
                    'transit_utc': best.get('transit_utc'),
                    'obs_mid_utc': obs_time.utc.isot if obs_time else None,
                    'pointing_ra_deg': info['ra_deg'],
                    'pointing_dec_deg': info['dec_deg'],
                    'pb_response': float(best['pb_response']) if 'pb_response' in best and not np.isnan(best['pb_response']) else None,
                    'weighted_flux_jy': float(best['weighted_flux_jy']) if 'weighted_flux_jy' in best and not np.isnan(best['weighted_flux_jy']) else None,
                    'search_radius_deg': radius_slider.value,
                    'min_flux_mJy': flux_slider.value,
                }
                print('\\nSummary JSON:')
                print(json.dumps(summary, indent=2))
                if not track_df.empty:
                    print('\\nTransit info (top {}):'.format(topn_slider.value))
                    display(track_df.head(topn_slider.value))
                    
        except Exception as err:
            progress_widget.bar_style = 'danger'
            progress_widget.description = 'Error!'
            with output:
                print(f'\\nError during search: {err}')
                print('\\nPlease check your input parameters and try again.')

    run_button.on_click(run_search)
    display(widgets.VBox([
        path_text,
        field_input,
        widgets.HBox([date_text, radius_slider]),
        widgets.HBox([flux_slider, topn_slider]),
        run_button,
        output
    ]))


VBox(children=(Text(value='', description='Data path', layout=Layout(width='80%')), IntText(value=-1, descript…