In [None]:
__author__ = 'Carl Stubens <cstubens@noao.edu>'
__version__ = '20190731' # yyyymmdd
__datasets__ = ['']
__keywords__ = ['ANTARES', 'transient']

# ANTARES Filter Development Kit

_Carl Stubens, ANTARES Team._

_Many thanks to Mike Fitzpatrick, Adam Scott, Knut Olsen, Jennifer Andrews, Robert Nikutta._

## Summary

This Notebook demonstrates how to write filters for [ANTARES](http://antares.noao.edu) and test them against a sample of real data from [ZTF](http://ztf.caltech.edu/).

This Notebook is intended to be used in NOAO Data Lab's Jupyter environment. There, you will have access to ANTARES test data. If you're not running in Data Lab, [sign up for Data Lab](https://datalab.noao.edu), then [log in to the notebook server](https://datalab.noao.edu/devbooks).

For new Data Lab accounts, this notebook will be automatically included in your `notebooks/` directory. Otherwise, you can save this `.ipynb` notebook file locally, and then upload it to your Data Lab Jupyter notebook server (use the 'Upload' button in the upper right corner).

## Goals

To demonstrate:

1. How to write filters using the ANTARES filter API.
1. How to test filters against a small test dataset.

Note: As of this writing, the test dataset is limited. It is intended to show how ZTF data is represented in ANTARES' format and API. It is not intended to represent the variety of data that is available, or to be suitable for training machine learning systems.

## Table of Contents

* [0. Background information on ANTARES](#background)
* [1. Connect to test database](#connect)
* [2. Write a Filter](#write)
* [3. Test a Filter](#test)
  * [3.1 Test against an Alert from test dataset](#test-one)
  * [3.2 Test against multiple Alerts](#test-many)
* [4. Submit Filter to ANTARES](#submit)

<a class="anchor" id="background"></a>
## 0. Background information on ANTARES

ANTARES receives alerts from surveys in real-time and sends them through a processing pipeline. The pipeline contains the following stages:

1. Associate the Alert with the nearest point of known past measurements within a 1" radius. We call this a Locus.
2. Discard Alerts with a high probability of being false detections.
3. Discard Alerts with poor image quality.
4. Look up associated objects in our catalogs.
5. If the Alert's Locus has two or more measurements on it, execute the Filters.

The filters are Python functions that take a LocusData object as their single parameter. Methods on the LocusData provide access to the Alert's properties, the data from past Alerts on the Locus, and the associated catalog objects. The LocusData also provides methods to set new properties on the Alert, and to send it to output streams.
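
The contract described above can be sketched without the ANTARES package. `StubLocusData` below is a hypothetical stand-in written for this illustration, not the real `LocusData` class; it mimics only three of the methods used in this notebook (`get_properties`, `set_property`, `send_to_stream`). The filter `flag_bright`, its magnitude cut, and the stream name are likewise made up.

```python
class StubLocusData:
    """Hypothetical stand-in for LocusData; NOT the real ANTARES class."""

    def __init__(self, properties):
        self._properties = dict(properties)
        self.new_properties = {}
        self.streams = set()

    def get_properties(self):
        # Return a copy so a filter cannot mutate the alert by accident.
        return dict(self._properties)

    def set_property(self, name, value):
        self.new_properties[name] = value

    def send_to_stream(self, stream_name):
        self.streams.add(stream_name)


def flag_bright(locus_data):
    """Illustrative filter: flag alerts brighter than magnitude 17 (arbitrary cut)."""
    p = locus_data.get_properties()
    magpsf = p.get('ztf_magpsf')
    if magpsf is None:
        return  # nothing to decide without a magnitude
    is_bright = magpsf < 17.0  # a smaller magnitude means a brighter source
    locus_data.set_property('is_bright', int(is_bright))
    if is_bright:
        locus_data.send_to_stream('bright_example')


ld = StubLocusData({'ztf_magpsf': 16.2, 'ztf_fid': 1})
flag_bright(ld)
print(ld.new_properties, ld.streams)
```

A stub like this only exercises the control flow of a filter. Testing against real Alert data still requires `run_stage` and the test database.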

<a class="anchor" id="connect"></a>
## 1. Connect to test database

First, we configure the `antares` package to connect to the test database, and we test the database connection.

In [None]:
# Imports
import antares
from antares.config import config
from antares.database import engine
from antares.dev_kit.run_stage import run_stage, get_locus_data
from antares.dev_kit.fetch_data import get_alert_ids
print('Using ANTARES version', antares.__version__)

In [None]:
# Configure connections to the test database.
config.ALERT_DATABASE_URL = "mysql://antares_datalab:pro_Moonrise_epi@antdb01.dm.noao.edu:3306/antares_devkit"
config.CATALOG_DATABASE_URL = "mysql://antares_datalab:pro_Moonrise_epi@antdb01.dm.noao.edu:3308/astro_catalog"
engine.init()

In [None]:
# Test database connections
print('Connecting to Alert DB...')
print(engine.get_alert_db_engine().execute('SELECT "OK"').scalar())
print('Connecting to Catalog DB...')
print(engine.get_catalog_db_engine().execute('SELECT "OK"').scalar())

<a class="anchor" id="write"></a>
## 2. Write a Filter

The filter `example_filter` below does nothing of scientific interest, but it demonstrates the use of the filter API.

Further down, the filters `high_snr`, `extragalactic`, etc. are examples of our current science filters.

The Filter API consists of the LocusData object, which is passed to the Filter as the single parameter. The `example_filter` shows examples of how to use the LocusData.
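
One part of that API, `get_time_series`, is described in the comments of `example_filter` as returning an array with one row per property (`alert_id` and `mjd` first) and one column per Alert. The sketch below reproduces that layout by hand so the indexing is easier to follow. It uses a plain nested list as a stand-in for the NumPy array, and every value in it is made up for illustration; it is not output from ANTARES.

```python
# Made-up values: one row per property, one column per alert.
time_series = [
    [101, 102, 103, 104],                  # alert_id
    [58300.1, 58301.2, 58303.4, 58305.0],  # mjd
    [10.68, 10.68, 10.68, 10.68],          # ra
    [41.27, 41.27, 41.27, 41.27],          # dec
    [1, 2, 2, 1],                          # ztf_fid
    [18.9, 18.4, 18.1, 18.7],              # ztf_magpsf
]

# Unpack the rows in the same order as the requested properties.
alert_id, mjd, ra, dec, fid, magpsf = time_series

# Keep only the columns (alerts) observed with filter ID 2.
keep = [i for i, f in enumerate(fid) if f == 2]
fid2_series = [[row[i] for i in keep] for row in time_series]

# Magnitude change between the first and last fid == 2 measurements
# (a negative value means the source brightened).
delta_mag = fid2_series[5][-1] - fid2_series[5][0]
print(fid2_series[0], round(delta_mag, 2))
```

With a real NumPy array `arr` in this layout, the same column selection could be written as a boolean mask, `arr[:, arr[4] == 2]`.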

In [None]:
def example_filter(locus_data):
    """
    A test Filter for demonstration.
    """
    print('`example_filter` is running...')

    # Get a dict of all properties on the new alert.
    print('locus_data.get_properties()')
    print('-->')
    print(locus_data.get_properties())
    print()

    # Any properties from the ZTF Alert are prefixed with 'ztf_'.
    # See here for ZTF's documentation of their properties:
    # https://github.com/ZwickyTransientFacility/ztf-avro-alert/blob/master/schema/candidate.avsc

    # Get a numpy array of values for particular properties.
    # Rows for 'alert_id' and 'mjd' are always included at the top of the array.
    # For example, in the following examples, the rows of the array will be:
    # - alert_id
    # - mjd
    # - ra
    # - dec
    # - ztf_fid
    # - ztf_magpsf
    print("locus_data.get_time_series('ra', 'dec', 'ztf_fid', 'ztf_magpsf')")
    print('-->')
    print(locus_data.get_time_series('ra', 'dec', 'ztf_fid', 'ztf_magpsf'))
    print()
    # In the following example, we include only the columns where ztf_fid == 2.
    print("locus_data.get_time_series('ra', 'dec', 'ztf_fid', 'ztf_magpsf', filters={'ztf_fid': 2})")
    print('-->')
    print(locus_data.get_time_series('ra', 'dec', 'ztf_fid', 'ztf_magpsf', filters={'ztf_fid': 2}))

    # get_astro_object_matches() returns a data structure like so:
    # {catalog_name1: [match1, match2, ...],
    #  catalog_name2: [...],
    #  ...}
    print()
    print('locus_data.get_astro_object_matches()')
    print('-->')
    astro_objects = locus_data.get_astro_object_matches()
    print(astro_objects)
    print()
    print('found catalog matches from catalogs: {}'.format(list(astro_objects.keys())))
    for catalog_name, objects in astro_objects.items():
        print()
        print(catalog_name)
        for obj in objects:
            print(obj)

    # Set some new properties on this Alert.
    # Any properties that you create in this way will be stored and visible on the ANTARES website.
    # The properties will also be included in the Kafka output messages.
    # Properties may be of type `int`, `float`, or `str`.
    locus_data.set_property('x', 500)
    locus_data.set_property('y', 3.14)
    locus_data.set_property('z', 'hello')

    # Send the Alert to an output stream.
    # The name of your stream must be unique. We will check this before accepting your filter.
    # All streams are directed to Kafka output topics with the same name as the stream.
    # We can also configure your stream to send notifications to a channel in Slack.
    locus_data.send_to_stream('my_stream')

    print('`example_filter` is finished.')

The following are examples of real ANTARES filters:

In [None]:
def high_snr(ld):
    """
    Send high-SNR alerts to stream 'high_snr'.

    Should flag ~2-3% of alerts.
    """
    snr_thresholds = {
        1: 50.0,  # For filter ID 1 (g), the threshold is 50
        2: 55.0,  # For filter ID 2 (r), the threshold is 55
    }

    p = ld.get_properties()  # get all Alert properties as a dict
    fid = p['ztf_fid']  # filter ID
    sigmapsf = p['ztf_sigmapsf']  # 1-sigma uncertainty in the PSF-fit magnitude
    snr = 1.0 / sigmapsf  # approximate SNR from the magnitude uncertainty
    snr_threshold = snr_thresholds.get(fid, None)  # SNR threshold for this filter ID

    if snr_threshold is not None and snr > snr_threshold:
        ld.send_to_stream('high_snr')

In [None]:
def extragalactic(ld):
    """
    Send alert to stream 'extragalactic' if it matches any extended source catalogs.
    """
    matching_catalog_names = ld.get_astro_object_matches().keys()

    # These are the catalogs (Antares-based names) with extended sources
    xsc_cats = ['2mass_xsc', 'ned', 'nyu_valueadded_gals', 'sdss_gals', 'veron_agn_qso']

    if set(matching_catalog_names) & set(xsc_cats):
        ld.send_to_stream('extragalactic')

In [None]:
def nuclear_transient(ld):
    """
    Send alert to stream 'nuclear_transient' if it is within 0.6 arcseconds of a
    source in the ZTF reference frame. We also require a match within 1" of a
    known Pan-STARRS galaxy (ztf_distpsnr1 < 1. and ztf_sgscore1 < 0.3).
    To further remove small flux fluctuations, we also require magpsf (alert PSF
    photometry) - magnr (PSF photometry of the nearby source in the reference
    image) < 1.5. The selection criteria are from Sjoert van Velzen et al.
    (2018, arXiv:1809.02608), section 2.1.
    """
    p = ld.get_properties()
    sgscore = p['ztf_sgscore1']
    distpsnr = p['ztf_distpsnr1']
    magpsf = p['ztf_magpsf']
    magnr = p['ztf_magnr']
    distnr = p['ztf_distnr']

    if None in (distnr, distpsnr, sgscore, magpsf, magnr):
        return
    
    if distnr < 0.6 and distpsnr < 1. and sgscore < 0.3 and magpsf - magnr < 1.5:
        ld.send_to_stream("nuclear_transient")

In [None]:
def in_m31(ld):
    """
    Send alerts to stream 'in_m31' if Alert is within a box centered on M31
    that spans 1.5 degrees in RA and 2 degrees in Dec.
    """
    ra_max = 11.434793
    ra_min = 9.934793
    dec_max = 42.269065
    dec_min = 40.269065

    p = ld.get_properties()
    ra = p['ra']
    dec = p['dec']

    if ra_min < ra < ra_max and dec_min < dec < dec_max:
        ld.send_to_stream("in_m31")

<a class="anchor" id="test"></a>
## 3. Test a Filter

<a class="anchor" id="test-one"></a>
### 3.1 Test against an Alert from test dataset

We have placed a sample of the ANTARES database (sourced from ZTF) in a read-only database for testing.

Here, we run the `example_filter` against a particular Alert and its measurement history. The `run_stage` function takes an Alert ID and a filter, and runs the filter by constructing a LocusData object identical to what would be generated in the ANTARES production system.

In [None]:
alert_id = 153505

# Run the `example_filter`.
# `verbose=True` prints detailed logs.
result = run_stage(alert_id, example_filter, verbose=True)

# `run_stage` returns a dict with a report of what happened:
print()
print(list(result.keys()))
print(result['new_properties'])
print(result['new_streams'])

# You can get the LocusData object too:
ld = result['locus_data']

<a class="anchor" id="test-many"></a>
### 3.2 Test against multiple Alerts

We can also run the filter against multiple Alerts from the database:

In [None]:
# Run a filter against `n` Alert IDs fetched from the test database

def run_many(f, n=10):
    return [run_stage(alert_id, f)
            for alert_id in get_alert_ids(n)]

results = run_many(example_filter, n=10)
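
Once a filter has been run over several Alerts, it can be useful to check how often it fires (for example, against an expectation such as the "~2-3% of alerts" noted in `high_snr`). The sketch below tallies stream names across a list of reports. It assumes that each report is a dict whose `new_streams` value is an iterable of stream names; that is an assumption about the report format, and the `fake_results` list is hand-written sample input, not real output from `run_stage`.

```python
from collections import Counter


def summarize_streams(results):
    """Count how many reports sent their Alert to each stream.

    Assumes each report is a dict whose 'new_streams' value is an
    iterable of stream names (an assumption about the report format).
    """
    counts = Counter()
    for report in results:
        # Use a set so a stream is counted at most once per Alert.
        counts.update(set(report.get('new_streams') or ()))
    return counts


# Hand-written reports standing in for the list returned by `run_many`.
fake_results = [
    {'new_streams': ['high_snr']},
    {'new_streams': []},
    {'new_streams': ['high_snr', 'extragalactic']},
    {'new_streams': []},
]

counts = summarize_streams(fake_results)
total = len(fake_results)
for stream, n in sorted(counts.items()):
    print('{}: {}/{} alerts ({:.0%})'.format(stream, n, total, n / total))
```

Keep in mind that the test dataset is small, so fractions measured this way are only a rough guide to behavior in production.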

In [None]:
# You can also get the LocusData object directly for tinkering with.

alert_id = 153505
ld = get_locus_data(alert_id)

print(ld)
print(sorted(ld.get_properties().keys()))

<a class="anchor" id="submit"></a>
## 4. Submit Filter to ANTARES

When you're ready to submit your filter to ANTARES, copy your filter function definition into the form on the ANTARES website at:

* http://antares.noao.edu/filters

You will need to provide:

* Your filter function, helper functions, and `import` statements as a single block of code.

* A unique name for your filter.

* A brief text description of your filter.

* The "handler", which is the name of the filter function in your code. This determines which function ANTARES will call. The handler name does not need to be unique outside of your code. The handler function must accept a single parameter, which is the `LocusData` object. You may name the parameter anything you like. We recommend `locus_data` or `ld`.
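
As a rough illustration of the items above, a submitted code block might be laid out as follows: imports first, then helper functions, then the handler. Everything here is hypothetical, including the helper `snr_from_sigma_mag`, the handler name `my_filter`, the threshold, and the stream name. The helper uses the standard small-error relation between magnitude uncertainty and SNR, which differs slightly from the simpler `1.0 / sigmapsf` used in `high_snr`.

```python
import math


def snr_from_sigma_mag(sigma_mag):
    """Helper: approximate SNR from a 1-sigma magnitude uncertainty.

    Uses the small-error relation sigma_mag ~= (2.5 / ln 10) / SNR.
    """
    return (2.5 / math.log(10)) / sigma_mag


def my_filter(ld):
    """Handler: the function ANTARES would call with the LocusData object."""
    sigma = ld.get_properties().get('ztf_sigmapsf')
    if not sigma:  # skip missing or zero uncertainties
        return
    if snr_from_sigma_mag(sigma) > 50.0:  # arbitrary illustrative threshold
        ld.send_to_stream('my_unique_stream_name')
```

In this sketch the handler entered in the form would be `my_filter`.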