In [1]:
from bs4 import BeautifulSoup
from astropy.table import QTable, Table
import numpy as np
import astropy.units as u
import pandas as pd
from astropy.coordinates import SkyCoord
from pathlib import Path, PurePath
from glob import glob
import datetime

save = True

In [None]:
wanted_columns = [
    "SIMBAD",
    "HD",
    "HIP",
    "SpType",
    "RotVel", # km/s, v sin(i)
    "V",
    "R",
    "H",
    "K",
    "LDD",
    "e_LDD",
    "e_LDD_rel",
    "UD_V",
    "UD_R",
    "UD_H",
    "UD_K",
    "vis2",
    "vis2Err",
]

hard_cuts = {
    "R": [-2, 5.5],
    "H": [-2, 5.5],
    "LDD": [0, 0.7],
    "Sep": [0 * u.deg, 10 * u.deg],
#    "RotVel": [0, 100],
}

optimal_dra = 0.5 * u.hourangle

max_baseline = 330 * u.m
obs_wavelength = 1.65 * u.um
min_scale = ((obs_wavelength / max_baseline) * u.rad).to(u.mas)
print(min_scale)

searchcal_out_table_dir = Path("./searchcal_output/")
sifted_table_dir = Path("./spica_tables/")
sifted_table_pattern = sifted_table_dir / "stem.ecsv"

searchcal_tables = glob(str(searchcal_out_table_dir/"*.csv"))
#print(searchcal_tables)

wiki_wanted_columns = [ # Column name, unit, precision
    ["SIMBAD", None, None],
    ["HD", None, 0],
    ["HIP", None, 0],
    ["Sep", u.deg, 2],
    ["RASep", u.hourangle, 2],
    ["DECSep", u.deg, 2],
    ["OPTSep", u.deg, 2],
    ["SpType", None, None],
    ["RotVel", None, 1], # km/s, v sin(i)
    ["V", None, 2],
    ["R", None, 2],
    ["H", None, 2],
    ["K", None, 2],
    ["LDD", None, 3],
    ["e_LDD", None, 3],
    ["e_LDD_rel", None, 3],
    ["UD_V", None, 3],
    ["UD_R", None, 3],
    ["UD_H", None, 3],
    ["UD_K", None, 3],
    ["vis2", None, 3],
    ["vis2Err", None, 3],
    ["Scores", None, 1],    
]

1.0313240312354817 mas


In [None]:
for t in searchcal_tables:
    df = pd.read_csv(t, comment = "#")
    df.drop(df.columns[len(df.columns)-1], axis=1, inplace=True)

    searchcal_table = Table(df.values, names = df.columns)
    coords = []
    dists = []
    ra_dists = []
    dec_dists = []
    optimal_point_dists = []
    row1 = searchcal_table[0]
    row1_coord = SkyCoord(row1["RAJ2000"], row1["DEJ2000"], unit = (u.hourangle, u.deg))
    optimal_points = [
        SkyCoord(ra=row1_coord.ra - optimal_dra, dec=row1_coord.dec),
        SkyCoord(ra=row1_coord.ra + optimal_dra, dec=row1_coord.dec),        
    ]

    for row in searchcal_table:
        coord = SkyCoord(row["RAJ2000"], row["DEJ2000"], unit = (u.hourangle, u.deg))
        coords.append(coord)
        dist = row1_coord.separation(coord)
        dists.append(dist)
        ra_dist = (coord.ra - row1_coord.ra).to(u.hourangle)
        ra_dists.append(ra_dist)
        dec_dist = (coord.dec - row1_coord.dec).to(u.deg)
        dec_dists.append(dec_dist)

        if np.isclose(dist, 0):
            optimal_point_dists.append(np.nan * u.deg)
            continue
        
        optimal_seps = []
        for p in optimal_points:
            optimal_seps.append(p.separation(coord).to(u.deg).value)
        optimal_point_dists.append(np.amin(optimal_seps) * u.deg)

    eval_table = searchcal_table[wanted_columns]

    eval_table.add_columns((coords, dists, ra_dists, dec_dists, optimal_point_dists), indexes = (3, 3, 3, 3, 3), names = ("RADEC", "Sep", "RASep", "DECSep", "OPTSep"))

    def CutTable(table, cuts = hard_cuts):
        for col, lims in cuts.items():
            table = table[np.logical_and(table[col] >= lims[0], table[col] <= lims[1])]
        return table

    cut_table = CutTable(eval_table)

    new_HDs = []
    for hd in cut_table['HD']:
        new_HDs.append("{:0.0f}".format(hd).strip("."))
    cut_table['HD'] = new_HDs

    def CalibratorScore(row):
        if np.isclose(row["Sep"], 0):
            return np.inf

        # Separation from optimal calibrator placement
        ra_score = np.abs(optimal_dra.value - np.abs(row['RASep']))
        ra_scale = 0.1
        
        optsep_score = 10 - row['OPTSep']
        optsep_scale = 0.1

        # Rotational Velocity Score
        rotvel_score = float(row['RotVel'] < 200)
        rotvel_scale = 1.0

        # UDD Score
        udd_score = float(row['UD_H'] < min_scale.to(u.mas).value)
        udd_scale = 0.0

        # LDD Score
        ldd_score = 0.7 - row['LDD']
        ldd_scale = 10.0

        # Rmag Score
        rmag_score = 5.5 - row['R']
        rmag_scale = 5.0

        # Rotational velocity score
        rotvel_score = int(row['RotVel'] < 100)
        rotvel_scale = 100.0

        # Generate final score
        score = 0
        score += ra_score * ra_scale
        score += optsep_score * optsep_scale
        score += rotvel_score * rotvel_scale
        score += udd_score * udd_scale
        score += ldd_score * ldd_scale
        score += rmag_score * rmag_scale

        return score

    scores = []
    score_breakdowns = []
    for idx, row in enumerate(cut_table):        
        score = CalibratorScore(row)
        scores.append(score)

    cut_table.add_column(scores, index = -1, name = "Scores")

    cut_table.sort("Scores", reverse = True)

    outfname = Path(t).stem

    outpath = PurePath(sifted_table_pattern).with_stem(outfname)

    if save:
        cut_table.write(str(outpath), overwrite = True)

    mkdown_out = cut_table.to_pandas().to_markdown(index = False)

    mkdown_outpath = outpath.with_suffix(".md")

    if save:
        with open(mkdown_outpath, 'w') as f:
            f.write(mkdown_out)

    # The below was generated using GPT-4o and modified

    def is_float_like(value):
        # Check if the value is a float or a numpy float type
        if isinstance(value, float):
            return True
        elif isinstance(value, np.float32) or isinstance(value, np.float64):
            return True
        elif isinstance(value, np.generic):
            # Check if it's a numpy scalar and not an integer
            return not isinstance(value, (np.int32, np.int64, np.int8, np.int16, np.intc))
        
        return False
    
    # Function to convert DataFrame to MediaWiki table format
    def df_to_mediawiki(df):
        # Start with the table header
        mediawiki_table = "== Interferometric Calibrator Table - CHARA/SPICA ==\n"
        mediawiki_table += "{| class=\"wikitable\"\n"
        
        # Add the header row
        unit_string = "\nUnits -"
        mediawiki_table += "|+\n"
        for c in wiki_wanted_columns:
            mediawiki_table += "!{}\n".format(c[0])
            if c[1] is not None:
                unit_string += " {}: {}".format(c[0], str(c[1]).strip())


        # Add each row of the DataFrame
        for index, row in df.iterrows():
            mediawiki_table += "|-\n"
            wiki_wc_last = len(wiki_wanted_columns) - 1
            for idx, c in enumerate(wiki_wanted_columns):
                name = c[0]
                prec = c[2]

                value = row[name]
                
                if prec is None:
                    valstr = str(value)
                elif prec == 0:
                    valstr = str(round(float(value), prec)).split(".")[0]
                else:
                    valstr = str(round(float(value), prec))
                
                

                if idx != wiki_wc_last:
                    mediawiki_table += "| {} |".format(valstr)
                else:
                    mediawiki_table += "| {}\n".format(valstr)

#            rowlen = len(row)
#            for idx, value in enumerate(row):
#                if idx == rowlen-1:
#                    if is_float_like(value):
#                        mediawiki_table += "| " + str(np.round(value, 3)) + "\n"
#                    else:
#                        mediawiki_table += "| " + str(value) + "\n"
#                else:
#                    if is_float_like(value):
#                        mediawiki_table += "| " + str(np.round(value, 3)) + " |"
#                    else:
#                        mediawiki_table += "| " + str(value) + " |"
        
        # End the table
        mediawiki_table += "|}"

        generator_link = "https://drive.google.com/file/d/16YFkWxVqMv6mvf3FU8qOdNuLHu1IMyWc/view?usp=sharing"
        generator_time = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d")
        mediawiki_table += "\n{}".format(unit_string)
        mediawiki_table += "\n\nTable generated by Nick Schragal on {} using [https://github.com/NicholasSchragal/searchcal_tables methodology described here].".format(generator_time)
        
        return mediawiki_table

    # Convert the DataFrame to MediaWiki format
    mediawiki_output = df_to_mediawiki(cut_table.to_pandas())
    # Write the output to a text file
    if not save:print(mediawiki_output)
    if save:
        with open(outpath.with_suffix(".txt"), 'w') as f:
            f.write(mediawiki_output)