An alert filter (Ampel Tier 0) works on a single stream of incoming data, deciding based on its content whether a candidate should be rejected or added to the database for further analysis.

Functionally, a filter is a Python class that implements an abstract filter class by providing the `process` method. This method receives an alert as input and makes the boolean choice (`True|False`) of whether to save it.

Notes:
- The `DecentFilter` is a general-purpose ZTF alert filter for extragalactic transients that also includes an online star veto using the Gaia catalog:
https://github.com/AmpelProject/Ampel-ZTF/blob/master/ampel/ztf/t0/DecentFilter.py. This is a flexible starting point for various transient studies.
- What happens after a transient is accepted depends on the T2 and T3 channel definition. Advanced users can make use of filter exit codes to fine-tune this process for individual events.
- An AMPEL filter is exposed to _all_ incoming alerts and needs to be set up efficiently. Quick checks on basic alert properties should be done first, and more complex comparisons last.
- Once an object has been accepted into the AMPEL live DB, new observations of the same object can be set to be saved irrespective of subsequent filter evaluations through the `autocomplete` feature.
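
The advice on ordering checks can be illustrated with a minimal, self-contained sketch. It does not use any AMPEL classes: `passes_cuts` and `fake_catalog_veto` are hypothetical names, and the thresholds are placeholders. The point is only that cheap comparisons run first, so the costly step is reached by few alerts.

```python
from typing import Callable


def passes_cuts(candidate: dict, expensive_check: Callable[[dict], bool]) -> bool:
    """Apply cheap scalar cuts first; call the costly check only on survivors."""
    # Cheap: plain comparisons on fields already present in the alert
    if candidate["rb"] < 0.3:
        return False
    if candidate["ndethist"] < 5:
        return False
    # Expensive: e.g. a catalog match, reached only by the remaining alerts
    return expensive_check(candidate)


calls = []


def fake_catalog_veto(candidate: dict) -> bool:
    """Hypothetical stand-in for a slow lookup; records each invocation."""
    calls.append(candidate["candid"])
    return True


alerts = [
    {"candid": 1, "rb": 0.1, "ndethist": 9},  # fails the cheap RB cut
    {"candid": 2, "rb": 0.8, "ndethist": 2},  # fails the cheap detection-count cut
    {"candid": 3, "rb": 0.8, "ndethist": 9},  # reaches the expensive check
]
accepted = [a["candid"] for a in alerts if passes_cuts(a, fake_catalog_veto)]
print(accepted, calls)
```

Only the third alert triggers the slow lookup; the other two are rejected before it is called.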


In [None]:
import requests, os

In [None]:
# This is the archive token which can be obtained from https://ampel.zeuthen.desy.de/live/dashboard/tokens
# In order to retrieve ZTF partnership alerts your token needs to have the appropriate access
token = os.environ["ARCHIVE_TOKEN"]   # I have mine stored
header = {"Authorization": "bearer "+token}

##### 1. Obtaining a stream of alerts 

This demonstration will be carried out using a pre-selected set of alerts taken from one ZTF field, and with a set of minimal properties.

In [None]:
endpoint = 'https://ampel.zeuthen.desy.de/api/ztf/archive/v3/streams/from_query?programid=1'

In [None]:
# Base alert query:
# - Ten days of ZTF operations at the start of 2020 (JD selection).
# - At least 5 detections, the last has to be positive, and Real-Bogus > 0.3.
# - In a specific ZTF field [1]
query = {
  "jd": {
    "$gt": 2458849.5,
    "$lt": 2458859.5,
  },
  "candidate": {
    "rb": {
      "$gt": 0.3
    },
    "ndethist": {
      "$gt": 4,
    },
    "isdiffpos": {"$in": ["t", "1"]},
    "field": {"$in":[773]},
  }
}
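
The `jd` bounds in the query are Julian Dates. As a sketch of how such bounds can be derived from calendar dates using only the standard library (the helper name `to_jd` is hypothetical, and leap seconds are ignored):

```python
from datetime import datetime, timezone

UNIX_EPOCH_JD = 2440587.5  # Julian Date of 1970-01-01T00:00:00 UTC


def to_jd(dt: datetime) -> float:
    """Convert a timezone-aware datetime to a Julian Date (leap seconds ignored)."""
    return dt.timestamp() / 86400.0 + UNIX_EPOCH_JD


jd_start = to_jd(datetime(2020, 1, 1, tzinfo=timezone.utc))
jd_end = to_jd(datetime(2020, 1, 11, tzinfo=timezone.utc))
print(jd_start, jd_end)  # 2458849.5 2458859.5
```

These are the two values used for `$gt` and `$lt` above, i.e. the query covers 2020-01-01 to 2020-01-11 (UTC).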

In [None]:
response = requests.post(endpoint, headers=header, json=query )

In [None]:
if not response.ok:
    print( 'Query creation failed.')

In [None]:
# The full response contains the resume token as well as the chunk size, i.e.
# how many alerts will be returned in each call to the alert iterator.
response.json()

In [None]:
resume_token = response.json()['resume_token']

At this point the alert archive will start staging alerts for release. This process takes a few minutes (depending on the query size), during which the `resume_token` stays locked.
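
While the token is locked, requests for the stream fail, so it can be convenient to retry after a pause. The following is a generic sketch of such a retry loop, not part of the AMPEL API: `StillLocked` and `retry_while_locked` are hypothetical names, and the 30 s delay is an arbitrary choice.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class StillLocked(Exception):
    """Hypothetical marker exception: the stream is still being staged."""


def retry_while_locked(
    action: Callable[[], T],
    max_tries: int = 10,
    delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `action` until it stops raising StillLocked or tries run out."""
    for attempt in range(1, max_tries + 1):
        try:
            return action()
        except StillLocked:
            if attempt == max_tries:
                raise  # give up: still locked after the last attempt
            sleep(delay)
    raise ValueError("max_tries must be at least 1")


# Demonstration with a fake action that is locked for the first two calls
state = {"calls": 0}


def fake_fetch() -> str:
    state["calls"] += 1
    if state["calls"] < 3:
        raise StillLocked()
    return "first chunk"


waits = []  # record the requested pauses instead of actually sleeping
result = retry_while_locked(fake_fetch, max_tries=5, delay=30.0, sleep=waits.append)
print(result, waits)
```

In this notebook, `action` would wrap the alert-loading step and translate an HTTP 423 response into `StillLocked`.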

##### 2. Create a transient filter

Here we implement a filter class for ZTF alerts.

In [None]:
from ampel.protocol.AmpelAlertProtocol import AmpelAlertProtocol
from ampel.abstract.AbsAlertFilter import AbsAlertFilter
from ampel.log.AmpelLogger import AmpelLogger


class DemoFilter(AbsAlertFilter):
    """
    Sample filter that accepts an alert only if all of the following hold:
    - At least `min_ndet` detections in at least `min_bands` bands.
    - RealBogus: at least one detection with RB >= `min_maxrb` and
      at least `min_goodrb_det` detections with RB >= 0.3.
    - At most `max_old_det` detections >1yr older than the most recent.
    - Less than `max_archive_tspan` days between the start of the
      detection history and the most recent detection.
    """

    # Parameters
    min_ndet: int
    min_bands: int = 2
    min_maxrb: float = 0.65  # At least one det with RB at or above this AND
    min_goodrb_det: int = 3  # at least this many "ok" detections (RB>=0.3)
    max_old_det: int = 2     # Max number of "old" (>1yr) detections in the ALERT package.
    max_archive_tspan: float = 365.  # Max time span (days) since start of detection history
    

    # Override
    def process(self, alert: AmpelAlertProtocol) -> None | bool | int:
        """
        Mandatory implementation.
        To exclude the alert, return *None*.
        To accept it, return *True*
        (the return annotation also allows an integer code).
        """


        # CUT ON THE HISTORY OF THE ALERT
        #################################
        
        # Get real detections (with candid)
        pps = [el for el in alert.datapoints if el.get("candid") is not None]
        
        # Sufficient number of detections
        if len(pps) < self.min_ndet:
            self.logger.debug(None, extra={"nDet": len(pps)})
            return None
        # Sufficient number of filters
        n_bands = len({el['fid'] for el in pps})
        if n_bands < self.min_bands:
            self.logger.debug(None, extra={"nBands": n_bands})
            return None

        # RealBogus: both conditions below must hold
        # At least one detection with a high score
        max_rb = max(el['rb'] for el in pps)
        if max_rb < self.min_maxrb:
            self.logger.debug(None, extra={"maxRB": max_rb})
            return None
        # Enough detections with an acceptable score
        n_good_rb = sum(1 for el in pps if el['rb'] >= 0.3)
        if n_good_rb < self.min_goodrb_det:
            self.logger.debug(None, extra={"nbrGoodRB": n_good_rb})
            return None
        

        # Detection history - in the ALERTS!
        detection_jds = [el['jd'] for el in pps]
        latest_jd = max(detection_jds)
        old_js = [ el['jd'] for el in pps if (latest_jd-el['jd'])>365. ]
        if len(old_js)>self.max_old_det:
            self.logger.debug(None, extra={"nbrOldDet": len(old_js) }) 
            return None
        
        # Cut on archive length
        archive_start_jd = pps[0]['jdstarthist']
        archive_tspan = latest_jd - archive_start_jd
        if not (self.max_archive_tspan > archive_tspan):
            self.logger.debug(None, extra={'archive_tspan': archive_tspan})
            return None

        # An alert which made it this far is accepted!
        return True


In [None]:
# We can update filter parameters, or leave the defaults. 
# Parameters without defaults have to be set
filter_config = {
    'min_ndet': 5,       # Necessary parameter
    'max_old_det': 1,    # Override default
}

In [None]:
t0filter = DemoFilter( **filter_config, logger=AmpelLogger.get_logger() )

##### 3. Run filter on the alerts returned from the alert query

For this we need a `ZTFArchiveAlertLoader` (see query/alert notebook) as well as an `AlertSupplier` for ZTF alerts. The latter reshapes raw ZTF data to a general alert format.
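
To give an idea of what "reshaping" means here, the sketch below flattens a raw ZTF-style alert into a single list of datapoints. This is an illustration under stated assumptions, not the `ZiAlertSupplier` implementation: `flatten_ztf_alert` is a hypothetical helper, the alert content is made up, and placing the latest candidate first is an assumption (one the filter's use of `pps[0]` also relies on). Entries in `prv_candidates` without a `candid` are upper limits, which is why the filter selects on `candid`.

```python
def flatten_ztf_alert(alert: dict) -> list[dict]:
    """Illustrative only: latest candidate first, followed by the alert history."""
    history = alert.get("prv_candidates") or []
    return [alert["candidate"], *history]


raw_alert = {
    "objectId": "ZTF20demo",  # made-up identifier
    "candidate": {"candid": 3, "jd": 2458855.7, "fid": 1, "rb": 0.8},
    "prv_candidates": [
        {"candid": 2, "jd": 2458853.7, "fid": 2, "rb": 0.5},
        {"candid": None, "jd": 2458851.7, "fid": 1, "diffmaglim": 20.1},  # upper limit
    ],
}

datapoints = flatten_ztf_alert(raw_alert)
# Same selection as in DemoFilter.process: keep real detections only
detections = [dp for dp in datapoints if dp.get("candid") is not None]
print(len(datapoints), len(detections))  # 3 2
```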

In [None]:
from ampel.ztf.t0.load.ZTFArchiveAlertLoader import ZTFArchiveAlertLoader
from ampel.ztf.alert.ZiAlertSupplier import ZiAlertSupplier

In [None]:
# The loader config contains the resume_token as stream identifier
config = {'archive':"https://ampel.zeuthen.desy.de/api/ztf/archive/v3", 
          "stream":resume_token}

In [None]:
config

In [None]:
accepted_alerts = []
alertcount = 0

In [None]:
try:
    alertloader = ZTFArchiveAlertLoader(**config)
    for alert in alertloader.get_alerts():
        alertcount += 1
        print(alert['objectId'], alert['candidate']['jd'])
        filter_accept = t0filter.process( ZiAlertSupplier.shape_alert_dict( alert, [] ) )
        if filter_accept:
            accepted_alerts.append(alert)
            print('... accepted')
except requests.exceptions.HTTPError as e:
    status_code = e.response.status_code
    if status_code==423:
        print('HTTP error {}: likely caused by server staging process. Wait and try again.'.format(status_code) )
    else:
        raise e
    

In [None]:
unique_objects = len( set([alert.get("objectId") for alert in accepted_alerts]) )

In [None]:
print('Parsed {} alerts; the filter accepted {} alerts from {} unique objects.'.format(alertcount, len(accepted_alerts), unique_objects))
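
Since one object can produce several accepted alerts, a per-object breakdown can be more informative than the totals. A minimal sketch using `collections.Counter` follows; `sample_accepted` is a hypothetical stand-in for `accepted_alerts`, and the object names are made up.

```python
from collections import Counter

# Hypothetical stand-in for `accepted_alerts`; only `objectId` is needed here
sample_accepted = [
    {"objectId": "ZTF20aaa"},
    {"objectId": "ZTF20bbb"},
    {"objectId": "ZTF20aaa"},
]

per_object = Counter(alert["objectId"] for alert in sample_accepted)
for name, n_alerts in per_object.most_common():
    print(name, n_alerts)
```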

In a live AMPEL instance, accepted alerts are saved into the DB and tickets for feature calculations (T2s) are created. The processing layer of AMPEL then starts processing these tickets in parallel.