## Imported Libraries

The libraries we import here are those required for running the pipeline, these similar to the other notebooks, as well as some specific parallel processing libraries we will take advantage of.

In [1]:
# Dask puts out more advisory logging that we care for.
# It takes some doing to quiet all of it, but this recipe works.
import dask
import logging
from dask_jobqueue.slurm import SLURMCluster
from dask.dataframe.utils import make_meta

dask.config.set({"logging.distributed": "critical"})

# This also has to be done, for the above to be effective
logger = logging.getLogger("distributed")
logger.setLevel(logging.CRITICAL)

import warnings

# Finally, suppress the specific warning about Dask dashboard port usage
warnings.filterwarnings("ignore", message="Port 8787 is already in use.")

from pathlib import Path

import numpy as np
import pandas as pd
from astropy.io import ascii
import matplotlib.pyplot as plt

from hats import read_hats

import lsdb

from catalog_filtering import bandFilterLenient, contains_PM

print("Imported libraries.")

Imported libraries.


In [2]:
# Directory variables
CATALOG_DIR = Path("../../catalogs")
DES_NAME = "des_light"
DES_DIR = CATALOG_DIR / DES_NAME
DES_MARGIN_CACHE_NAME = "des_margin_cache_18_arcsec"
DES_MARGIN_CACHE_DIR = CATALOG_DIR / DES_MARGIN_CACHE_NAME
RESULTS_NAME = "des_hpms"
RESULTS_DIR = CATALOG_DIR / RESULTS_NAME

# Filter variables
bandList = ['G','R','I','Z','Y']
class_star = None
spread_model = 0.05
magnitude_error = 0.05
check_flags = True
check_invalid_mags = True
query_string = bandFilterLenient(
    bandList,
    classStar=class_star,
    spreadModel=spread_model,
    magError=magnitude_error,
    flag=check_flags,
    invalidMags=check_invalid_mags
)
des_cols = (
    [f'CLASS_STAR_{band}' for band in bandList] + 
    [f'FLAGS_{band}' for band in bandList] + 
    ['RA','DEC','COADD_OBJECT_ID'] + 
    [f'SPREAD_MODEL_{band}' for band in bandList] + 
    [f'WAVG_MAG_PSF_{band}' for band in bandList] + 
    [f'WAVG_MAGERR_PSF_{band}' for band in bandList]
)

#Algorithm variables
k = 1
max_obj_deviation = 0.2
pm_speed_min = 2000 #units are milliseconds per year
pm_speed_max = 10**5
cone_search_rad = 50
max_neighbor_dist = 18
xmatch_max_neighbors = 100
min_neighbors = 3

'''
TODO: Verify with Kostya that this is what is expected
'''
# Computing Variables:
queue = "RM-shared" #SBATCH -p RM-shared
account_name = "jpassos"
memory_size = "x GB"
num_cores = int
job_extra = [f'--ntasks-per-node={num_cores}']
walltime_per_worker_job = "DD:HH:MM"
pre_worker_launch_commands = [
    "source ~/.bashrc",
    "conda activate lsdb-main"
]

## Catalog Filtering 

Before running the algorithm, we first filter for stars and later group sizes. We filter for group sizes via the function defined below:

In [73]:
'''map_partitions() compatible function which filters catalog for objects groups of a minimum size. Objects in the same group
are considered "neighbors". It is assumed that this catalog has already been crossmatched.

Args:
    - df: passed by map_partitions(), dataframe which is filtered by function
    - min_neighbors: minimum groupsize to filter for, any groups smaller than this will be filtered out

Returns: Dataframe filtered for group sizes greater than or equal to min_neighbors.

Note: Because we are crossmatching without margin cache, we will be losing some potential matches at the border of healpix pixels.
'''

def n_neighbors_filter(df, min_neighbors):

    neighbors = df.groupby('_healpix_29')['_dist_arcsec'].count()
    neighbors -= 1 #Double counting adjustment
    neighbors.name = 'neighbors'
    df = df.join(neighbors, on='_healpix_29')
    xmatch_large_groups = df.query(f'neighbors >= {min_neighbors}')

    return xmatch_large_groups


## Define Client and Cluster

Here we define the client and cluster (both objects from dask) to structure the computation of our data pipeline for parallel computing.

In [None]:
cluster = SLURMCluster(
    queue=queue
    account=account_name
    memory=memory_size
    cores=numcores
    job_extra_directives=job_extra
    walltime=walltime_per_worker_job
    job_script_prologue=pre_worker_launch_commands
)

client = Client(cluster)

In [None]:
des2_catalog = lsdb.read_hats(DES_DIR, margin_cache=DES_MARGIN_CACHE_DIR)
des2_catalog

In [None]:
'''
This is be valuable when choosing the optimal number
of workers to process the partitions.

TODO: Implement this into dask cluster worker args
'''
num_pixels = len(des2_catalog.get_healpix_pixels())

## Running the Algorithm

We first define the algorithm through kth_star_min_distance and other helper functions. 

In [89]:
'''Evaluates the magnitude of the cross product of two vectors. When one vector is a vector from a line in 2D space and the other
is a vector pointing from a point on that line to some arbitrary point in 2D space, Q, this will return the distance of point Q 
from that line. This is relevant to finding the deviation of objects from projections.

Args:
    - line_vector: line vector of a projection
    - PQ: vector starting at point P and pointing to point Q

Returns: Magnitude of the cross product of two vectors, or distance of point Q from line with line_vector.
'''
def distance_to_line(line_vector, PQ):
    #Add zero for proper cross product
    PQ_3D = np.append(PQ, 0)
    line_vector_3D = np.append(line_vector, 0)
    return np.linalg.norm(np.abs(np.cross(PQ_3D, line_vector_3D)) / np.linalg.norm(line_vector_3D))

# Consider making the RA and DEC columns arguments into this func

'''The kth star algorithm. Given a group of objects, this algorithm will project lines between two objects and calculate the 
kth smallest deviation of other stars within the group from this line. For example, if k = 4 and the kth minimum distance is zero,
then we know that 4 stars are in perfect alignment, with may be one high PM star.

Args:
    - group: groupby object which holds all the data of one group of objects
    - k: number of objects we seek to filter for alignment
    - cols_to_keep: array of column names which are kept after running algorithm. Since we are working with a self-crossmatched
      catalog, this is useful because we can drop the repeated columns (in this case they all end with "_1")

Returns: group passed in as an argument with an additional column containing the kth smallest deviation from a line projection.
'''
def kth_star_min_distance(group, k):
    origin_ra, origin_dec = group['RA_1'], group['DEC_1']
    ra2, dec2 = group[["RA_2", "DEC_2"]].to_numpy().T
    
    x_vals = (ra2 - origin_ra) * np.cos(np.radians(origin_dec)) * 3600
    y_vals = (dec2 - origin_dec) * 3600
    delta_coords = np.vstack((x_vals, y_vals)).T 

    kth_distances = [None] * len(delta_coords)

    for i in range(len(delta_coords)):
        proj_vector = delta_coords[i]
        distances = []

        if np.all(proj_vector == 0): continue #ensures vector points to a point that is not the origin

        for j in range(len(delta_coords)):
            if np.all(delta_coords[j] == 0) or (j == i): continue
            
            distances.append(distance_to_line(proj_vector, delta_coords[j]))

        
        kth_distances[i] = sorted(distances)[k]

    group['kth_min_proj_error'] = kth_distances

    return group['kth_min_proj_error']

'''map_partitions() compatible function which runs the kth star algorithm on a catalog already crossmatched
and filtered for groups.

Args:
    - df: passed by map_partitions(), dataframe which is modified by function
    - k: number of stars which we seek to be in aligment

Returns: Catalog with "kth_min_proj_error" column, which is the kth smallest deviation of a star with alignment
with other stars.
'''

# Make .apply function return only the kth_min_proj_error column, then join that on original df, and drop the _1 cols in apply_kth_star()
def apply_kth_star(df, k):
    cols_to_keep = (
        [col for col in df.columns if col.endswith('_2')]
        + ['kth_min_proj_error']
    )

    kth_col = (
        df.groupby('_healpix_29')
        .apply(kth_star_min_distance, k=k-1)
    )

    df['kth_min_proj_error'] = kth_col

    return df[cols_to_keep]


'''Executes the high PM star data pipeline.

Args:
    - catalog: Catalog in HATS format we are seeking high PM stars in
    - query_string: string for initial filter of dataframe, in our case this is used for filtering for stars
    - xmatch_max_neighbors: when performing crossmatching, this is the maximum size of a group which the crossmatch will return
    - max_neighbor_dist: maximum distance between neighbors
    - k: The number of stars which we seek to be in alignment
    - max_obj_deviation: The maxmimum we want an object to deviate from alignment.

Returns: Catalog filtered for high PM stars using the kth star algorithm.

Note: Postfiltering is not yet implemented.
'''
def execute_pipeline(catalog, query_string,
                     xmatch_max_neighbors, max_neighbor_dist, min_neighbors,
                     k, max_obj_deviation):
    # Filter for stars and objects with reasonable measurement errors
    star_filtered = catalog.query(query_string)

    # Crossmatch catalog with itself and filter for group sizes >= min_ neighbors
    xmatch = star_filtered.crossmatch(
                star_filtered,
                n_neighbors=xmatch_max_neighbors,
                radius_arcsec=max_neighbor_dist,
                suffixes=['_1', '_2']
    )
    neighbors_filtered = xmatch.map_partitions(n_neighbors_filter, min_neighbors=min_neighbors)
    
    # Add column for kth minimum distance
    kth_star = neighbors_filtered.map_partitions(
        apply_kth_star, 
        k=k
    )
    
    return kth_star
    

## Check New Pipeline Functions

The previous version of this datapipeline worked directly on the underlying dataframes of these catalogs. To make the algorithm be more generalizable, we've developed these functions to work directly on catalogs. To verify that they produce the same results as the old version of this algorithm, we reproduce the results below. 

In [7]:
gaia_high_pm_stars = pd.read_csv('../gaia_high_pm_stars.csv')

gaia_high_pm_stars

Unnamed: 0,_healpix_29,CLASS_STAR_G_des,CLASS_STAR_R_des,CLASS_STAR_I_des,CLASS_STAR_Z_des,CLASS_STAR_Y_des,FLAGS_G_des,FLAGS_R_des,FLAGS_I_des,FLAGS_Z_des,...,WAVG_MAGERR_PSF_G_des,WAVG_MAGERR_PSF_R_des,WAVG_MAGERR_PSF_I_des,WAVG_MAGERR_PSF_Z_des,WAVG_MAGERR_PSF_Y_des,ra_gaia,dec_gaia,pmra_gaia,pmdec_gaia,_dist_arcsec
0,1153482605725265461,0.844888,0.845371,0.844807,0.845333,0.844827,3,3,3,3,...,-99.0,-99.0,-99.0,-99.0,-99.0,1.383284,-37.367744,5633.438088,-2334.721273,11.461677
1,1257518643390044839,0.844564,0.845075,0.845251,0.843732,0.801461,2,2,2,2,...,-99.0,-99.0,-99.0,-99.0,-99.0,33.079599,3.567385,-1762.405718,-1852.8711,1.62095
2,2368327529620177120,0.3251,0.177878,0.293946,0.140421,0.001102,0,0,0,0,...,0.236298,0.062562,0.070822,0.11552,-99.0,53.567196,-49.890084,2360.592206,483.127504,13.763072
3,2390050329526096144,0.125803,0.120065,0.028601,0.028627,0.028627,3,3,3,3,...,-99.0,-99.0,-99.0,-99.0,-99.0,62.611,-53.612997,-825.17937,-2415.577565,4.490123
4,2405297220004419732,0.194207,0.84529,0.84489,0.845284,0.029913,3,3,3,3,...,-99.0,-99.0,-99.0,-99.0,-99.0,77.959937,-45.043813,6491.223339,-5708.61415,14.437522
5,2423978658324676571,0.845425,0.845425,0.845406,0.845371,0.84513,22,22,18,18,...,-99.0,-99.0,-99.0,-99.0,-99.0,50.000344,-43.066553,3035.017316,726.964482,0.825611
6,2450054482295873745,0.844839,0.845014,0.845117,0.845267,0.844982,2,2,2,2,...,-99.0,-99.0,-99.0,-99.0,-99.0,5.03561,-64.869617,1706.746855,1164.959443,3.096527
7,2468500457058623057,0.84536,0.029197,0.845341,0.842975,0.047746,2,3,2,2,...,-99.0,-99.0,-99.0,-99.0,-99.0,32.622946,-50.820906,2125.416147,637.975043,0.425698
8,2468500457058623057,0.84536,0.029197,0.845341,0.842975,0.047746,2,3,2,2,...,-99.0,-99.0,-99.0,-99.0,-99.0,32.624069,-50.820823,2168.886011,710.847727,2.199757
9,2503116016090087218,0.141634,0.108526,0.007691,0.009382,0.014642,3,3,3,3,...,-99.0,0.096928,0.085113,0.068819,-99.0,11.341389,-33.497993,1826.373986,-1485.010343,13.386494


In [44]:
def algo_found_pm(row, des_cols, cone_search_rad, query_string, 
                  xmatch_max_neighbors, max_neighbor_dist, min_neighbors,
                  k, max_obj_deviation):    
    # Filter DES around PM of interest:
    HPMS_filtered_catalog = execute_pipeline(
        lsdb.read_hats(DES_DIR, columns=des_cols, margin_cache=DES_MARGIN_CACHE_DIR)
        .cone_search(ra=row['ra_gaia'], dec=row['dec_gaia'], radius_arcsec=cone_search_rad),
        query_string=query_string, xmatch_max_neighbors=xmatch_max_neighbors,
        max_neighbor_dist=max_neighbor_dist, min_neighbors=min_neighbors, k=k,
        max_obj_deviation=max_obj_deviation
    )    
      
    df = HPMS_filtered_catalog.compute().reset_index(drop=True, level = 0)

    return bool(not (df.query(f'kth_min_proj_error < {max_obj_deviation}').empty))

In [90]:
%%time
gaia_high_pm_stars['algo_found'] = gaia_high_pm_stars.apply(
    func=algo_found_pm, axis=1, des_cols=des_cols, 
    cone_search_rad=cone_search_rad, query_string=query_string, 
    max_neighbor_dist=max_neighbor_dist, k=k, 
    xmatch_max_neighbors=xmatch_max_neighbors,
    min_neighbors=min_neighbors,
    max_obj_deviation=max_obj_deviation
)

gaia_high_pm_stars

ValueError: Cannot set a DataFrame with multiple columns to the single column kth_min_proj_error

Here we begin running the algorithm using the map_partition() function in LSDB (see tutorial here https://docs.lsdb.io/en/stable/tutorials/pre_executed/map_partitions.html). As explained in the tutorial, this returns a "lazy" result, which we can later compute or save to storage running the .compute() or .to_hats() methods respectively. Here, we first save then compute to produce our results.

In [None]:
unrealized = execute_pipeline(catalog, query_string, k, xmatch_max_neighbors, max_neighbor_dist, min_neighbors)

In [None]:
with client:
    unrealized.to_hats(catalog_name=RESULTS_NAME,
                       base_catalog_path=RESULTS_DIR)

In [None]:
with client:
    df = lsdb.read_hats(RESULTS_DIR).compute()

df