# Introduction
This is a test notebook based on the online ANTARES tutorials.

### Useful links
https://nsf-noirlab.gitlab.io/csdc/antares/antares/


https://nsf-noirlab.gitlab.io/csdc/antares/client/troubleshooting.html

https://antares.noirlab.edu/tags

https://nsf-noirlab.gitlab.io/csdc/antares/antares/catalogs.html#catalogs-with-broad-distribution-of-object-radii



https://antares.noirlab.edu/properties

https://antares.noirlab.edu/faq

https://nsf-noirlab.gitlab.io/csdc/antares/client/

https://nsf-noirlab.gitlab.io/csdc/antares/antares/devkit/getting_started.html

https://nsf-noirlab.gitlab.io/csdc/antares/antares/basic_concepts.html#writing-filters

https://alerce.online

https://irsa.ipac.caltech.edu/Missions/ztf.html

https://irsa.ipac.caltech.edu/data/ZTF/docs/releases/dr07/ztf_release_notes_dr07.pdf

# Tests, Functions, Tools

## The DevKit

**Important:** this section is about writing and testing ANTARES filters.

### Getting Started with Filter Development

https://nsf-noirlab.gitlab.io/csdc/antares/antares/devkit

In [None]:
import antares.devkit as dk
# Initialise the DevKit.
# NOTE(review): presumably this has to run before any other `dk.*` call — confirm in the DevKit guide.
dk.init()

In [None]:
class HelloWorld(dk.Filter):
    """Minimal demonstration filter: greets a locus and tags it ``hello_world``."""

    # The only tag this filter can attach to a locus.
    OUTPUT_TAGS = [
        {"name": "hello_world", "description": "hello!"},
    ]

    def run(self, locus):
        """Print the ID of ``locus`` and attach the ``hello_world`` tag to it."""
        current_id = locus.locus_id
        print("Hello Locus ", current_id)
        locus.tag("hello_world")

In [None]:
# Ask the DevKit for a single locus ID and take it out of the returned list.
sampled_ids = dk.get_locus_ids(1)
locus_id = sampled_ids[0]

# Run the HelloWorld filter against that locus and keep the resulting report.
report = dk.run_filter(HelloWorld, locus=locus_id)

# The report summarises what the filter did.
print(report)

### Example of a Real Filter

Take a look at this example filter, written by the ANTARES team, which finds objects whose alerts have a high signal-to-noise ratio (SNR).

See more in https://antares.noirlab.edu/pipeline

In [None]:
class HighSNR(dk.Filter):
    """
    Tag loci whose most recent alert has a high signal-to-noise ratio (SNR).

    The SNR is estimated as ``1 / ztf_sigmapsf`` and compared with a
    per-passband threshold (50 in the g-band, 55 in the R-band).
    """
    NAME = "High SNR"
    ERROR_SLACK_CHANNEL = ""  # Put your Slack user ID here
    REQUIRED_LOCUS_PROPERTIES = ["ztf_object_id"]
    REQUIRED_ALERT_PROPERTIES = ["ant_passband", "ztf_sigmapsf"]
    OUTPUT_LOCUS_PROPERTIES = []
    OUTPUT_TAGS = [
        {
            "name": "high_snr",
            "description": "Locus has one or more Alerts with high SNR.",
        },
    ]

    def run(self, locus):
        """
        Add the "high_snr" tag when the most recent alert on ``locus`` has a
        high SNR.

        Nothing is tagged when the alert's passband has no threshold defined
        here, or when ``ztf_sigmapsf`` is missing, zero or negative (the SNR
        cannot be computed in that case).
        """
        # The threshold is dependent on the band that is being imaged.
        # These thresholds should flag ~2-3% of alerts.
        snr_threshold = {
            "g": 50.0,
            "R": 55.0,
        }

        # Determine the passband of the most recent alert at this locus.
        alert_passband = locus.alert.properties["ant_passband"]
        if alert_passband not in snr_threshold:
            print(f"alert_passband {alert_passband} is not supported by this filter.")
            return  # Do nothing.

        # Guard the division below: an absent or non-positive uncertainty
        # would otherwise raise or give a meaningless SNR.
        sigma_psf = locus.alert.properties["ztf_sigmapsf"]
        if sigma_psf is None or sigma_psf <= 0:
            print(f"ztf_sigmapsf {sigma_psf} cannot be converted to an SNR.")
            return  # Do nothing.

        # Calculate the SNR of the latest alert
        alert_snr = 1.0 / sigma_psf

        # Tag this locus if the latest alert meets our SNR criteria
        if alert_snr > snr_threshold[alert_passband]:
            print("High SNR detected")
            locus.tag("high_snr")

### Structure of a Filter

In [None]:
import antares.devkit as dk

# Filters must inherit from `dk.Filter`.
# There are no requirements for the name of the filter class other than
# that it be valid Python code.
# The formal name of the filter is whatever you enter into the form
# on the ANTARES website when you submit the filter. The name of the
# Filter class here is therefore unimportant, but ought to be descriptive.

class MyFilter(dk.Filter):
    """Annotated skeleton listing the class attributes and hooks a filter can define."""

    # Required.
    #
    # This allows you to receive error logs through Slack.
    # See footnotes below for more details.
    ERROR_SLACK_CHANNEL = '<my_slack_member_id>'

    # Optional.
    #
    # List of Locus properties which the filter depends on.
    # If an incoming Alert's Locus does not have all properties listed here,
    # then the filter will not run on it.
    REQUIRED_LOCUS_PROPERTIES = [
        # eg:
        'ztf_object_id',
        # etc.
    ]

    # Optional.
    #
    # List of Alert properties which the filter depends on.
    # If an incoming Alert does not have all properties listed here, then
    # the filter will not run on it.
    REQUIRED_ALERT_PROPERTIES = [
        # eg:
        'passband',
        'mag',
        'ztf_magpsf',
        # etc.
    ]

    # Optional.
    #
    # List of Tag names which the filter depends on.
    # If an incoming Alert's Locus does not have all Tags listed here, then
    # the filter will not run on it.
    REQUIRED_TAGS = [
        # eg:
        'high_snr',
        # etc.
    ]

    # Required.
    #
    # A list of properties which your filter may set.
    # If your filter doesn't set properties, then the value should be an
    # empty list.
    # 'name' must be formatted like '<author>_<property_name>'.
    # 'type' must be one of the strings: 'int', 'float', or 'str'.
    # 'description' should briefly describe what the property means.
    OUTPUT_LOCUS_PROPERTIES = [
        # eg:
        {
            'name': 'stubens_interest_score',
            'type': 'float',
            'description': 'interestingness of the alert by algorithm XYZ',
        },
        {
            'name': 'stubens_object_class',
            'type': 'str',
            'description': 'probable class of object by algorithm ABC',
        },
        # etc.
    ]

    # Required.
    #
    # A list of tag names which this filter may produce.
    # If your filter doesn't tag Loci, then this list should be empty.
    # 'name' must be formatted like '<author>_<property_name>'.
    # 'description' should briefly describe what the tag means.
    OUTPUT_TAGS = [
        # eg:
        {
            'name': 'stubens_transients',
            'description': 'Probable transient according to method PQE'
        },
        # etc.
    ]

    # Optional.
    #
    # If your filter requires access to data files, they must be declared here.
    # See footnotes below for more details on how to work with data files.
    REQUIRES_FILES = [
        # eg:
        'soraisam_myFile.txt',
        'soraisam_myOtherFile.bin',
        # etc.
    ]

    # Optional.
    #
    # This function is called once per night when ANTARES reboots and
    # filters are instantiated. If your filter needs to do any work to prepare
    # itself to run, that logic should go here.
    # Examples:
    #  - Loading data from files
    #  - Constructing data structures
    #  - Instantiating machine-learning model objects
    def setup(self):
        ...

    # Required.
    #
    # This is the function which is called to process an Alert.
    # All setup work should have been done in `setup()` in order to make this
    # function run as efficiently as possible.
    # See footnotes below for description of the `locus` object.
    def run(self, locus):
        ...

### The Locus Object

Properties: http://antares.noirlab.edu/properties

A locus also carries the full history of alerts received on that same object.

In [None]:
import antares.devkit as dk

# Fetch one specific locus by its ANTARES ID.
LOCUS_ID = "ANT2020blbrs"
locus = dk.get_locus(LOCUS_ID)

# `print(locus)` produces a lot of output, so only the type is reported here.
locus_type = type(locus)
print(locus_type)

### Testing Filters

In [None]:
# NOTE: this repeats the HelloWorld class from "Getting Started with Filter
# Development" and shadows it once this cell has run.
class HelloWorld(dk.Filter):
    """Minimal demonstration filter used by the testing examples in this section."""

    # The only tag this filter can attach to a locus.
    OUTPUT_TAGS = [
        {"name": "hello_world", "description": "hello!"},
    ]

    def run(self, locus):
        """Print the ID of ``locus`` and attach the ``hello_world`` tag to it."""
        current_id = locus.locus_id
        print("Hello Locus ", current_id)
        locus.tag("hello_world")



In [None]:
# 1) No locus given: run against a random locus.
report = dk.run_filter(HelloWorld)
print(report)

# 2) Explicit locus: run against ANT2020blbrs.
report = dk.run_filter(HelloWorld, locus="ANT2020blbrs")

# The report describes what the filter did on that locus.
print(report)

In [None]:
# Run the filter against several loci in one call and show the combined report.
N_LOCI = 10
report = dk.run_many(HelloWorld, n=N_LOCI)
print(report)


In [None]:
import antares.devkit as dk
import antares.pipeline
from antares.pipeline.locus import Locus
from antares.pipeline.ingestion import is_ztf_candidate

# Start from a real locus and turn it into a plain, editable dictionary.
locus = dk.get_locus("ANT2020blbrs")
locus_dict = locus.to_dict()

# Keep only the alerts that are ZTF candidates.
candidates = list(
    filter(lambda alert: is_ztf_candidate(alert["alert_id"]), locus_dict["alerts"])
)

# Set the magnitude of the current alert (the one that will trigger the filter)
# to the magnitude of the previous candidate minus 5.
# NOTE(review): this assumes the last entry of `alerts` is the newest candidate
# and that at least two candidates exist — confirm for other loci.
previous_alert_mag = candidates[-2]["properties"]["ant_mag"]
locus_dict["alerts"][-1]["properties"]["ant_mag"] = previous_alert_mag - 5

# Turn the edited dictionary back into a Locus and run the filter on it.
locus = Locus.from_dict(locus_dict)
dk.run_filter(HelloWorld, locus=locus)

In [None]:
import antares.devkit as dk

dk.init()

# Sky position shared by the locus and by both of its alerts.
ra, dec = 88.2744186, -5.0010774
LOCUS_NAME = 'locus1'


def _synthetic_alert(alert_id, mjd, mag):
    """Build one hand-made g-band alert dict located at (ra, dec)."""
    return {
        'alert_id': alert_id,
        'locus_id': LOCUS_NAME,
        'mjd': mjd,
        'properties': {
            'ant_mjd': mjd,
            'ant_mag': mag,
            'ant_magerr': 0.1,
            'ant_maglim': 0.1,
            'ant_survey': 1,
            'ant_passband': 'g',
            'ant_ra': ra,
            'ant_dec': dec,
        },
    }


# A complete locus description built by hand, with two alerts.
locus_dict = {
    'locus_id': LOCUS_NAME,
    'ra': ra,
    'dec': dec,
    'properties': {'num_alerts': 2, 'num_mag_values': 2},
    'tags': [],
    'watch_list_ids': [],
    'watch_object_ids': [],
    'catalog_objects': dk.search_catalogs(ra, dec),
    'alerts': [
        _synthetic_alert('alert1', 58794.272488399874, 15.1),
        _synthetic_alert('alert2', 58799.50587960007, 15.2),
    ],
}

# Convert the dictionary into a locus object and run the filter on it.
locus = dk.locus_from_dict(locus_dict)
dk.run_filter(HelloWorld, locus=locus)


## The ANTARES Client

https://nsf-noirlab.gitlab.io/csdc/antares/client/tutorial

### Streaming Alerts

**Open question (TODO):** how do I obtain the `api_key` and `api_secret` credentials that `StreamingClient` requires?

In [None]:
import os

from antares_client import StreamingClient

# Read the credentials from the environment so that no secret is stored in the
# notebook. Set ANTARES_API_KEY and ANTARES_API_SECRET before starting the
# kernel; a missing variable raises KeyError here instead of failing later.
client = StreamingClient(
    topics=["extragalactic_staging", "nuclear_transient_staging"],
    api_key=os.environ["ANTARES_API_KEY"],
    api_secret=os.environ["ANTARES_API_SECRET"],
)


In [None]:
# Wait up to 10 seconds for one alert, then report whether anything arrived.
topic, locus = client.poll(timeout=10)
message = "received an alert" if locus else "waited 10 seconds but didn't get an alert"
print(message)

In [None]:
# This...
# NOTE(review): `client.iter()` presumably keeps yielding alerts indefinitely, so when
# this cell is executed the second loop below is likely never reached — confirm, and
# interrupt the kernel to stop the cell.
for topic, locus in client.iter():
    print("received {} on {}".format(locus, topic))

# Is equivalent to...
# This loop has no exit condition of its own: it polls and prints forever.
while True:
    topic, locus = client.poll()
    print("received {} on {}".format(locus, topic))


In [None]:
import os

# `topics` and `config` are not defined earlier in this notebook, so they are
# declared here to keep the cell runnable on a fresh kernel.
topics = ["extragalactic_staging", "nuclear_transient_staging"]
config = {
    # Credentials come from the environment rather than being hard-coded.
    "api_key": os.environ["ANTARES_API_KEY"],
    "api_secret": os.environ["ANTARES_API_SECRET"],
}

# This...
# NOTE(review): `client.iter()` presumably never terminates, so the second half of
# this cell is only reached if the first loop ends — confirm.
with StreamingClient(topics, **config) as client:
    for topic, locus in client.iter():
        print("received {} on {}".format(locus, topic))

# Is equivalent to...
client = StreamingClient(topics, **config)
try:
    for topic, locus in client.iter():
        print("received {} on {}".format(locus, topic))
finally:
    # Always release the client, even when the loop is interrupted by an error.
    client.close()


In [None]:
"""
Use this script as a starting point for streaming alerts from ANTARES.

Author: YOUR_NAME

"""

from antares_client import StreamingClient

# Topics handed to StreamingClient as its first argument in main().
TOPICS = ["extragalactic_staging", "nuclear_transient_staging"]
# Keyword arguments for StreamingClient. The values are placeholders: substitute
# your own credentials and avoid committing real secrets with the notebook.
CONFIG = {
    "api_key": "YOUR_API_KEY",
    "api_secret": "YOUR_API_SECRET",
}


def process_alert(topic, locus):
    """Handle one alert from the stream (placeholder: put your code here).

    Args:
        topic: The topic the alert was received on.
        locus: The locus received with the alert.

    Returns:
        None. The placeholder implementation does nothing.
    """
    return None


def main():
    """Open a streaming client and pass every received alert to `process_alert`."""
    with StreamingClient(TOPICS, **CONFIG) as stream:
        for topic, locus in stream.iter():
            process_alert(topic, locus)


# Start streaming only when this code is executed as the main module.
if __name__ == "__main__":
    main()


### Searching for Data

In [None]:
from antares_client.search import get_by_id, get_by_ztf_object_id

# Each entry pairs a lookup function with the identifier to pass to it:
# first an ANTARES ID, then a ZTF Object ID.
lookups = (
    (get_by_id, "ANT2020j7wo4"),
    (get_by_ztf_object_id, "ZTF20aafqubg"),
)

for lookup, identifier in lookups:
    locus = lookup(identifier)
    print(locus)

In [None]:
from antares_client.search import cone_search
from astropy.coordinates import Angle, SkyCoord

# Centre and radius of the cone to search.
# NOTE(review): astropy presumably parses "1s" as one second of time (hour angle),
# not one arcsecond — confirm that this is the intended radius.
center = SkyCoord("20h48m25.1805s 29d45m4.8361s")
radius = Angle("1s")

# Print every locus found inside the cone.
matches = cone_search(center, radius)
for locus in matches:
    print(locus)


In [None]:
# Clause 1: between 50 and 100 magnitude values (inclusive).
mag_count_range = {
    "range": {"properties.num_mag_values": {"gte": 50, "lte": 100}},
}

# Clause 2: tagged as a nuclear transient.
nuclear_transient_tag = {
    "term": {"tags": "nuclear_transient"},
}

# Both clauses go into the "filter" list of a bool query.
query = {
    "query": {
        "bool": {
            "filter": [mag_count_range, nuclear_transient_tag],
        }
    }
}



In [None]:
from antares_client.search import search

# Take only the first locus of the result set. A default is passed to `next`
# so that an empty result set gives None instead of raising StopIteration.
first_result = next(search(query), None)
if first_result is None:
    print("The query matched no loci.")

The return value of the search function is an iterator over loci in the result set. This means that the result set is not immediately available in memory unless you do something like `result_set = list(search(query))`. Because result sets can be very large, we recommend against doing so. Prefer, instead, operations on the iterable like:

In [None]:
# Print the first four loci only; pairing the results with range(4) stops the
# iteration without fetching the rest of the result set.
for idn, locus in zip(range(4), search(query)):
    print(locus)

In [None]:
from antares_client.search import search
from elasticsearch_dsl import Search

# Build the same query as the hand-written dictionary above, this time with the
# elasticsearch_dsl query builder.
query = (
    Search()
    .filter("range", **{"properties.num_mag_values": {"gte": 50, "lte": 100}})
    .filter("term", tags="nuclear_transient")
    .to_dict()
)

# A default is passed to `next` so that an empty result set gives None instead
# of raising StopIteration.
first_result = next(search(query), None)

print(first_result)

In [None]:
from antares_client.search import get_available_tags

# Fetch the available tags and print them one per line.
tags = get_available_tags()

for tag in tags:
    print(tag)

In [None]:
from antares_client.search import get_latest_grav_wave_notices

# Look up the latest notice for one GraceDB ID.
GRACEDB_ID = "S231103aa"
gravitational_wave_notice = get_latest_grav_wave_notices(GRACEDB_ID)
print(gravitational_wave_notice)

In [None]:
from antares_client.search import get_grav_wave_notices
import datetime

# Look up a notice by GraceDB ID and datetime.
notice_datetime = datetime.datetime(2023, 11, 3, 18, 58, 2)
gravitational_wave_notice = get_grav_wave_notices("S231103aa", notice_datetime)
print(gravitational_wave_notice)

In [None]:
from antares_client.search import get_multiple_grav_wave_notices

# Look up the notices for several GraceDB IDs in one call.
gravitational_wave_notices = get_multiple_grav_wave_notices(["S231004f","S231004q"])
# Print the result of this call (plural name), not the single notice left over
# from an earlier cell.
print(gravitational_wave_notices)

In [None]:
from antares_client.search import get_catalog_samples

# Retrieve 5 rows from each catalog available.
ROWS_PER_CATALOG = 5
catalog_data = get_catalog_samples(ROWS_PER_CATALOG)

# Printing `catalog_data` gives a lot of output (info from different surveys),
# so it is not displayed here.

In [None]:
from antares_client.search import catalog_search

# Retrieve all catalog crossmatches for one sky position.
target_ra, target_dec = 316.7859, 13.1324
catalog_data = catalog_search(ra=target_ra, dec=target_dec)

# `catalog_data` is not printed here.

In [None]:
from antares_client.search import get_thumbnails

# Fetch the thumbnails for one alert ID.
ALERT_ID = "ztf_candidate:2552120390115015005"
thumbnails = get_thumbnails(ALERT_ID)

# Printing `thumbnails` (or its type) gives a lot of output, so it is not
# displayed here.