# Find mismatches

Source files are created from the APPLAUSE database by this query:

```
SELECT * 
FROM applause_dr4.source 
WHERE plate_id = 19019
ORDER BY source_id
```
with another separate call for the source_calib table.

We look for sources that show up in two separate plates. The ones that don't are the candidates we should look in more detail.

The script exploits parallelism to expedite the search.

In [1]:
import os, io
import csv
from math import sqrt

import multiprocessing as mp
from multiprocessing import Pool

import numpy as np

from astropy import units as u
from astropy.io import fits
from astropy.io.fits import Header, Card
from astropy.table import Table, join
from astropy.wcs import WCS
from astropy.coordinates import SkyCoord, Longitude, Latitude

from regions import PolygonSkyRegion
from mocpy import MOC

from settings import get_parameters, fname, current_dataset
from library import Worker, Worker2, is_in_jupyter

In [3]:
in_jupyter = is_in_jupyter()

print(in_jupyter)

True


In [5]:
par = get_parameters(current_dataset)
par

{'nproc': 8,
 'sextractor_flags': 4,
 'model_prediction': 0.8,
 'max_flux_threshold': 0.3,
 'elongation': 1.5,
 'annular_bin': 7,
 'flag_rim': 0,
 'min_acceptable_flux': 25000,
 'min_fwhm': 5.0,
 'max_fwhm': 7.5,
 'qfit_max': 5.0,
 'cfit_max': 5.0,
 'invert_east': [False, False],
 'invert_north': [False, False],
 'table1': 'sources_9319.csv',
 'table2': 'sources_9320.csv',
 'table1_calib': 'sources_calib_9319.csv',
 'table2_calib': 'sources_calib_9320.csv',
 'table_matched': 'table_match_9319_9320.fits',
 'table_non_matched': 'table_nomatch_9319_9320.fits',
 'table_psf_nonmatched': 'table_psf_nomatch_9319_9320.fits',
 'image1': 'GS00768_x.fits',
 'image2': 'GS00769_x.fits'}

## Read source tables generated by APPLAUSE database

From both **applause_dr4.source** and **applause_dr4.source_calib**. We need both, but we cannot get a joined version directly from APPLAUSE. Maybe they are limiting the output size. In any way, we get both and join them locally here.

In [None]:
table_src_1 = Table.read(fname(par['table1']), format='ascii.csv')
table_src_2 = Table.read(fname(par['table2']), format='ascii.csv')

table_calib_1 = Table.read(fname(par['table1_calib']), format='ascii.csv')
table_calib_2 = Table.read(fname(par['table2_calib']), format='ascii.csv')

In [None]:
table_1 = join(table_calib_1, table_src_1, keys='source_id')
table_2 = join(table_calib_2, table_src_2, keys='source_id')

In [None]:
print(len(table_1), len(table_2))

In [None]:
table_1

## Remove undesirable sources

Use a variety of criteria from SEXTRACTOR to get rid of possible contaminants.

We want to clean the first table only, since only well-defined star images from the first image should be present in our sample. The second table is taken as is; we want to see everything that is near the sources selected from the firsr table. 

In [None]:
table_1copy = table_1.copy()
table_2copy = table_2.copy()

# Extraction flags 
mask = table_1copy['sextractor_flags_1'] <= par['sextractor_flags']
table_1copy = table_1copy[mask]

# Likelihood of being valid star image 
mask = table_1copy['model_prediction_1'] > par['model_prediction']
table_1copy = table_1copy[mask]

# Round objects 
mask = table_1copy['elongation'] < par['elongation']
table_1copy = table_1copy[mask]

# Ring
mask = table_1copy['annular_bin_1'] <= par['annular_bin']
table_1copy = table_1copy[mask]

# Is source at plate rim ?
mask = table_1copy['flag_rim'] == par['flag_rim']
table_1copy = table_1copy[mask]

# flux threshold - set it to fraction of max peak in image
flux_threshold = max(table_1copy['flux_max']) * par['max_flux_threshold']
mask = table_1copy['flux_max'] > flux_threshold
table_1copy = table_1copy[mask]

print(len(table_1copy), len(table_2copy))

## Remove scanner artifacts

When plates were scanned twice or more, usually in two directions perpendicular to each other, all scans are included in a single table. Stars and artifacts *on* the plate will share the same celestial coordinates in both scans, but scanner-induced artifacts won't, because they move in x,y when the plate is rotated 90 deg. Thus we remove all rows that have no duplicates.  

To expedite the search, we split the job among all available performance CPUs.

In [None]:
# number of processors
nproc = par['nproc']

In [None]:
# The matching code lives in file library.py - it has to be kept in a separate 
# file because of restrictions in name space imposed by the parallelization library.

# Here, the parallelized code stores its product, the list of indices that have duplicates.
matched = []

# callback function to collect results from parallel workers
def collect_result(results):
    matched.extend(results)
    
results = []
pool = Pool(nproc)

row_range = int(len(table_1copy) / nproc)

for p in range(nproc):
    # workers are defined over ranges of rows in the table_1copy table
    worker = Worker2("w"+str(p), table_1copy, int(p*row_range), int((p+1)*row_range))

    r = pool.apply_async(worker, callback=collect_result)
    results.append(r)

for r in results:
    r.wait()

pool.close()

In [None]:
print(len(matched), " rows have duplicates")

In [None]:
# if a small number of duplicates, or none, was detected, ignore them.
if len(matched) > len(table_1copy) / 10:

    # lots of duplicates: keep only the duplicated rows
    table_1copy = table_1copy[matched]

print(len(table_1copy))

## Look for matches between 1st and 2nd tables

In [None]:
# Same code as above (can't be put in a function to avoid duplication).
#
# It generates the list of indices (in the first table) of all objects 
# for which it could find at least one matching entry in the second table.
matched = []

results = []
pool = Pool(nproc)

row_range = int(len(table_1copy) / nproc)

for p in range(nproc):
    # workers are defined over ranges of rows in the table_1copy table
    worker = Worker("w"+str(p), table_1copy, table_2copy, int(p*row_range), int((p+1)*row_range))

    r = pool.apply_async(worker, callback=collect_result)
    results.append(r)

for r in results:
    r.wait()

pool.close()

In [None]:
print(len(matched), " sources detected in 1st plate with a match in 2nd plate")

## Get the non-matched objects

Remove the matched rows from the first table. Whatever remains, is the table of objects for which a match couldn't be found.

Remove duplicated indices first, and save original table since removal is done in-place.

In [None]:
table_3 = table_1copy.copy()

m = list(dict.fromkeys(matched))
table_3.remove_rows(m)

In [None]:
table_3

## Write result to FITS table

In [None]:
table_3.write(fname(par['table_non_matched']), format='fits', overwrite=True)

In [None]:
# table with the matched objects only (selected by index array m)

table_1copy[m].write(fname(par['table_matched']), format='fits', overwrite=True)

In [None]:
table_1copy[m]