# CandidateFinder to locate a moving object in a small region

**Author:** Aram Lee
**Date:** 2025-03-14
**File Name:** CandidateFinder.ipynb

### [Description]
Based on the classification results from the predictor, this notebook measures, for each time series of 64×64 pixel sub-images, the likelihood that a moving object is present. Time series with high likelihoods are referred to as "candidates" and are the final product of this workflow. Once a moving object has been located within a small 64×64 pixel region, a traditional method can be applied in addition to this workflow to determine its precise sky coordinates.

### [Package requirements]
- numpy: 1.26.4
- astropy: 6.1.0
- pandas: 1.4.3
- matplotlib: 3.8.3

### [Workflow]  

Steps 1-3 are for training the model, and steps 4-6 are for using the model to detect trans-Neptunian objects (TNOs).

|Step|File|Input|Output|Purpose|
|-|-|-|-|-|
|1|ImageCutter.ipynb|.fits (with artificial moving objects), .plantlist (artificial objects info)| .npy|Extract sub-images for training|
|2|Concatenator.ipynb|.npy (sub-images from ImageCutter)|.npy|Prepare dataset for training|
|3|Trainer.ipynb|.npy (dataset from Concatenator), .npy (target information)|.h5 (trained CNN models)|Train the model|
|-|-|-|-|-|
|4|ImageCutter.ipynb|.fits (without artificial moving objects)|.npy|Extract sub-images for detection|
|5|Predictor.ipynb|.npy (sub-images from ImageCutter), .npy (target info), .h5 (model)|.npy|Apply trained model to detect objects|
|6a|Link_sources_to_objects.py|.npy (classification and regression output from Predictor)|.npy|Detect moving objects (linear fitting method)|
|6b|CandidateFinder.ipynb **(Here)**|.npy (classification output from Predictor), .npy (sub-images, target info)|.csv|Detect moving objects (scoring method)|

In [7]:
import numpy as np
import pandas as pd
import scipy
import copy

import os
from matplotlib import pyplot as plt
import matplotlib.image as mpimg

# Astropy!
from astropy.visualization import simple_norm
from astropy.visualization import MinMaxInterval
from astropy.visualization import ZScaleInterval,ImageNormalize,SqrtStretch
from astropy.coordinates import SkyCoord
from astropy.table import Table
import logging 
# Pandas!
import pandas as pd
from pandas import DataFrame

# Configure the root logger. Only WARNING and above are emitted, so the DEBUG
# message below stays silent unless the level is lowered.
logging.basicConfig(level=logging.WARNING)
logging.debug('Packages are loaded.')

from glob import glob
from astropy.io import fits
from astropy.wcs import WCS

def string_to_expnum(string, start=-5, end=-3, offset=13):
    """Parse the integer in ``string[start:end]`` and subtract ``offset``.

    Parameters
    ----------
    string : str
        Text containing a run of digits at the slice ``[start:end]``.
    start, end : int
        Slice bounds of the digits (defaults select the 5th- and 4th-from-last
        characters).
    offset : int
        Value subtracted from the parsed integer.

    Returns
    -------
    int
        ``int(string[start:end]) - offset``.

    Raises
    ------
    ValueError
        If the selected slice is not a valid integer literal.

    NOTE(review): judging by the name this presumably maps a file name to an
    exposure index -- confirm against the callers.
    """
    digits = string[start:end]
    number = int(digits)
    return number - offset

In [3]:
# The time series has 44 images. Every unordered image pair (first < second)
# is enumerated once, giving 946 index combinations (44 choose 2), ordered by
# the first index and then by the second.

pairs = np.array([
    [first, second]
    for first in range(43)
    for second in range(first + 1, 44)
])

In [3]:
# Define functions to load images, labels on the images, and CNN-predicted classification output. Change filepaths before use.

def load_img(number_ccd):
    """Load the cut sub-images of one CCD.

    Reads ``cut_images/cut<NN>.npy`` (``NN`` is ``number_ccd`` zero-padded to
    two digits), relative to the current working directory.

    Parameters
    ----------
    number_ccd : int
        CCD number used to build the file name.

    Returns
    -------
    numpy.ndarray
        The array stored in the file.
    """
    # allow_pickle=True unpickles arbitrary objects: only load trusted files.
    return np.load(f'cut_images/cut{number_ccd:02d}.npy', allow_pickle=True)

def load_tar(number_ccd):
    """Load the labels (target information) of one CCD.

    Reads ``measured/measured_allchip/tar<NN>.npy`` (``NN`` is ``number_ccd``
    zero-padded to two digits), relative to the current working directory.

    Parameters
    ----------
    number_ccd : int
        CCD number used to build the file name.

    Returns
    -------
    numpy.ndarray
        The array stored in the file.
    """
    # allow_pickle=True unpickles arbitrary objects: only load trusted files.
    return np.load(f'measured/measured_allchip/tar{number_ccd:02d}.npy', allow_pickle=True)

def load_prob(number_ccd):
    """Load the CNN-predicted classification output of one CCD.

    Reads ``measured/measured_allchip/pred_bin<NN>.npy`` (``NN`` is
    ``number_ccd`` zero-padded to two digits), relative to the current working
    directory.

    Parameters
    ----------
    number_ccd : int
        CCD number used to build the file name.

    Returns
    -------
    numpy.ndarray
        The array stored in the file.
    """
    # allow_pickle=True unpickles arbitrary objects: only load trusted files.
    return np.load(f'measured/measured_allchip/pred_bin{number_ccd:02d}.npy', allow_pickle=True)

In [3]:
def extract(lst, start=0, stop=1):
    """Slice every element of ``lst`` with ``[start:stop]`` and stack the results.

    Parameters
    ----------
    lst : iterable of sliceable objects
        Each element must support ``element[start:stop]``.
    start, stop : int
        Slice bounds applied to every element (default keeps the first entry).

    Returns
    -------
    numpy.ndarray
        Array built from the list of slices.
    """
    window = slice(start, stop)
    sliced = []
    for entry in lst:
        sliced.append(entry[window])
    return np.array(sliced)

In [4]:
# Score every time series of one CCD from the predictor's pair-wise output.
#
# `prob` is consumed in consecutive chunks of 946 rows (one row per image
# pair, see `pairs`); each chunk is one time series. Flattening a chunk is
# expected to give 946*2 values aligned with `pairs.flatten()`, i.e. one
# probability per image of every pair -- TODO confirm against Predictor output.
ccd_n = 13
img = load_img(ccd_n)
tar = load_tar(ccd_n)
prob = load_prob(ccd_n).astype('d')

# Only probabilities above 1 - 1/k contribute to the score: that top band is
# rescaled linearly so that 1 - 1/k maps to 0 and 1 maps to 1.
k = 100
def my_function(x):
    return k*(x-(1-1/k))
# Kept for any later cell that uses it; the arithmetic broadcasts natively, so
# the score below applies my_function to the whole array directly (faster than
# np.vectorize, and it also works on empty input).
vec_func = np.vectorize(my_function)

# Clip from below at the threshold so that low probabilities contribute zero.
prob_m = np.maximum(prob, 1-1/k)
result = my_function(prob_m)

# First two entries of every target row; a non-zero sum over a chunk flags the
# time series as "artificial" (presumably a planted object -- confirm).
tar_bin = extract(tar, start=0, stop=2)

# Boolean masks selecting, for each of the 44 images, the 43 slots of the
# flattened pair list in which that image takes part. Loop-invariant, so they
# are built once.
flat_pairs = pairs.flatten()
exposure_masks = [flat_pairs == j for j in range(44)]

score = []
artificialness = []
prob_each = []
p_avgs = []
for i in range(len(prob)//946):
    chunk = slice(i*946, (i+1)*946)
    artificial = np.sum(tar_bin[chunk])
    artificialness.append(artificial)
    if artificial == 0:
        score_i = np.sum(result[chunk])
        prob_i = prob[chunk].flatten()
    else:
        # Flagged series are pushed to the bottom of the ranking and carry no
        # per-image probabilities. (-np.inf / np.nan instead of the aliases
        # np.NINF / np.NaN, which were removed in NumPy 2.0.)
        score_i = -np.inf
        prob_i = np.full(946*2, np.nan)
    # Mean probability per image: every image appears in 43 pairs.
    p_avg = [np.sum(prob_i*mask)/43 for mask in exposure_masks]
    p_avgs.append(p_avg)
    score.append(score_i)

# Rank the time series from highest to lowest score.
score = np.array(score)
score_argsort = np.argsort(score)[::-1]
score_sorted = score[score_argsort]

In [6]:
# The tno_movie function displays candidates in a time series for candidate vetting.

%matplotlib inline
from matplotlib import animation
from IPython.display import HTML

def tno_movie(search_n=None, save=None, where=None):
    """Animate the 44-frame time series of one candidate for visual vetting.

    Parameters
    ----------
    search_n : int, optional
        Rank of the candidate in ``score_argsort`` (0 is the highest score).
        Selects the time series when ``where`` is not given and is shown in
        the label / file name.
    save : optional
        ``None`` or ``0`` returns the animation as inline HTML5 video;
        ``1`` writes a GIF under ``Figures/animations/`` and returns ``None``.
        Any other value builds the animation and returns ``None``.
    where : sequence of two ints, optional
        Pair ``(a, b)`` converted to the flat index ``a*72 + b``; takes
        precedence over ``search_n`` for selecting the time series.
        (Presumably row/column in a grid with 72 sub-images per row -- confirm.)

    Returns
    -------
    IPython.display.HTML or None

    Raises
    ------
    ValueError
        If neither ``search_n`` nor ``where`` is given.
    """
    if where is not None:
        where = where[0]*72+where[1]
    elif search_n is not None:
        where = score_argsort[search_n]
    else:
        raise ValueError("tno_movie needs either search_n or where.")

    # search_n may be None when the series is selected by position; formatting
    # None with ':02d' would raise, so use a placeholder label instead.
    cand_label = "NA" if search_n is None else f"{search_n:02d}"

    imgs, imgps = [], []
    for exp in range(44):
        imgs.append(img[exp, where][1])
        imgps.append(img[exp, where][3][-2:])

    def label(i):
        # One label format for the initial frame and every animated frame.
        return (
            f"Frame {i:02d}, CCD {ccd_n:02d}, Cand {cand_label}, "
            f"({where//72:02d}, {where%72:02d}), "
            f"({imgps[i][0]:06.1f}, {imgps[i][1]:06.1f}), "
            f"Prob {p_avgs[where][i]:.2f}"
        )

    fig = plt.figure(figsize=(9,6.25))
    plt.rcParams['axes.facecolor']='white'
    plt.rcParams['savefig.facecolor']='white'
    # The normalisation is computed from the first frame and reused for all frames.
    im = plt.imshow(imgs[0], norm = ImageNormalize(imgs[0], interval=ZScaleInterval(), stretch=SqrtStretch()))
    info_text = plt.text(0.5, -1.5, label(0), fontstyle = 'italic')
    plt.close(fig) # this is required to not display the generated image

    def init():
        im.set_data(imgs[0])
        return im, info_text

    def animate(i):
        im.set_data(imgs[i])
        info_text.set_text(label(i))
        return im, info_text

    anim = animation.FuncAnimation(fig, animate, init_func=init, frames=44, interval=125)

    if save is None or save == 0:
        return HTML(anim.to_html5_video())

    elif save == 1:
        anim.save(f'Figures/animations/ch_{ccd_n:02d}_ca_{cand_label}_po_{(where//72):02d}_{(where%72):02d}.gif', dpi=150)

In [7]:
# Show the highest-scoring candidate (rank 0) as an inline movie.
tno_movie(search_n=0)

In [None]:
# a function to make a figure of the sub-image time-series.

def show_all_images(search_n, save=0):
    """Plot all 44 sub-images of one candidate's time series on a 7x7 grid.

    Parameters
    ----------
    search_n : int
        Rank of the candidate in ``score_argsort`` (0 is the highest score).
    save : int, optional
        When equal to ``1`` the figure is also written as a PNG under
        ``Figures/Linked/another_sky/`` before being shown.

    Each panel is labelled with the frame number, the CCD number, the flat
    index of the time series, and the last two entries of the sub-image's
    fourth field (presumably its pixel position -- confirm).
    """
    where = score_argsort[search_n]
    cutouts = [img[exp, where][1] for exp in range(44)]
    positions = [img[exp, where][3][-2:] for exp in range(44)]

    # 7x7 = 49 panels; only the first 44 receive an image.
    fig, axs = plt.subplots(nrows=7,ncols=7, figsize=(22,20))
    panels = zip(axs.flatten(), cutouts, positions)
    for frame, (ax, cutout, pos) in enumerate(panels):
        # Each frame gets its own ZScale/sqrt normalisation.
        scaling = ImageNormalize(cutout, interval=ZScaleInterval(), stretch=SqrtStretch())
        ax.imshow(cutout, norm=scaling)
        ax.text(0.5, -1.5, f"{frame}, {ccd_n}, {where}, ({pos[0]:.1f}, {pos[1]:.1f})", fontstyle = 'italic')
    plt.tight_layout()

    if save == 1:
        plt.savefig(f'Figures/Linked/another_sky/ch_{ccd_n}_se_{search_n}_wh_{where}.png', facecolor='w', bbox_inches='tight', dpi=150)

    plt.show()