# Introduction to filtering an alert stream

For the hackathon closing out ZTF Summer School 2024, your challenge is to create your own alert broker which can consume a stream of alerts and filter for interesting objects.

This is a starter notebook for the hackathon. It will show you how to read a simulated ZTF alert stream and give some examples of how you can build your own filtering for those alerts. Previous notebooks from this week, for example "ztf_alert_filtering" from Day 1, provide additional examples that you may find useful to apply to this challenge.

In [8]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

# Part 1 - Reading the alert stream

Note: here we simulate our Kafka consumer by reading the data with an iterator function. Some computers can run the true Kafka consumer over these couple hundred thousand alerts in a matter of minutes, but less powerful machines would need an unreasonable amount of time for this exercise.

We will walk you through reading the alerts in this notebook, but each of these steps is put together in consumer_EDIT.py, which you will edit to add the filters that you devise.
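As a minimal sketch of the iterator idea, the snippet below stands in for a Kafka topic with a directory of files. It writes a few placeholder `.avro` files into a temporary directory (made-up names and contents, not real ZTF alerts), counts them, and then pulls them one at a time until the iterator is exhausted. The helper names `make_stream` and `next_alert` are illustrative only and do not come from consumer_EDIT.py.

```python
import pathlib
import tempfile


def make_stream(directory):
    """Return (number of alert files, iterator over the sorted file paths)."""
    files = sorted(pathlib.Path(directory).glob("*.avro"))
    return len(files), iter(files)


def next_alert(stream):
    """Return the raw bytes of the next alert file, or None when the stream is empty."""
    path = next(stream, None)
    if path is None:
        return None
    return path.read_bytes()


with tempfile.TemporaryDirectory() as tmp:
    # Placeholder files standing in for real alert packets (assumption).
    for i in range(3):
        (pathlib.Path(tmp) / f"alert_{i}.avro").write_bytes(f"payload {i}".encode())

    n_files, stream = make_stream(tmp)
    alerts = []
    # Keep fetching until the stream signals that it is empty.
    while (raw := next_alert(stream)) is not None:
        alerts.append(raw)

print(n_files, alerts)
```

Returning `None` at the end of the stream mirrors how a Kafka `poll()` returns `None` when no message arrives, so the same loop structure can be reused for both.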

In [14]:
# load the config file (config.yaml) that provides the settings used by the consumer

import sys
import os
from ztfalertsim.config import load_config

config = load_config(config_files=["config.yaml"])

  Config files: config.defaults.yaml
                config.yaml


In [None]:
# TO ADD: count how many files are in data/20240707, i.e. how many alerts from 2024/07/07 you will be processing in this challenge



In [18]:
# just run this cell to try retrieving a single alert with an iterator function
import pathlib
import time

def get_iterator(date):
    path = pathlib.Path("data") / date

    # grab the full list of alert files and create an iterator
    alert_files = sorted(list(path.glob("*.avro")))
    print(len(alert_files))

    alert_files_iter = iter(alert_files)
    return alert_files_iter

iterator = get_iterator("20240707")

24948



In [None]:
def fetch_alert(alert_files_iter):
    try:
        alert_file = next(alert_files_iter)
    except StopIteration:
        return None
    with open(alert_file, "rb") as f:
        alert = f.read()
    return alert

fetch_alert(iterator)

In [None]:
# Print the alerts you have read, and check their type

...

In [None]:
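import io
from fastavro import reader

# `cls` is here because this is a method of the consumer class
def decode_message(cls, msg):
    """
    Decode an Avro message using the schema embedded in it.

    :param msg: The raw Avro bytes of one alert (for a Kafka message,
        the payload returned by msg.value())
    :return: A fastavro reader that iterates over the decoded records,
        or the original message if decoding fails
    """
    try:
        bytes_io = io.BytesIO(msg)
        decoded_msg = reader(bytes_io)
    except Exception:
        # fall back to the raw message if it cannot be decoded
        decoded_msg = msg
    return decoded_msg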

In [None]:
def fetch_alert(self):
    try:
        alert_file = next(self.alert_files_iter)
    except StopIteration:
        return None
    with open(alert_file, "rb") as f:
        alert = f.read()
    return alert

In [None]:
def consume(self, max_alerts=None):

    start = time.time()

    # consume messages
    count = 0
    try:
        while True:
            msg = self.fetch_alert()
            if msg is None:
                break
            else:
                msg = self.decode_message(msg)
                count += 1
                if not msg:
                    continue
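                for record in msg:
                    # the rest of this loop is cut off in the notebook;
                    # consumer_EDIT.py is where each record gets filtered
                    pass
    finally:
        # report throughput, mirroring the Kafka version of consume() below
        print(f"Consumed {count} alerts in {time.time() - start:.2f} seconds")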

In [2]:
# load the config file that provides information for the Kafka consumer. Feel free to open this file and see what it looks like

import sys
import os
from ztfalertsim.config import load_config

config = load_config(config_files=["config.yaml"])

  Config files: config.defaults.yaml
                config.yaml


In [3]:
# format the date as YYYYMMDD
date = 20240715
maxalerts = 100

Run the cell below to create a consumer that reads all events (no filters applied).

What happens when you change the "timeout" value passed to consumer.poll() below?

In [4]:
import uuid
import time
from confluent_kafka import Consumer, KafkaError, KafkaException

In [17]:
# test the consumer on a limited number of alerts

def consume(date, config, max_alerts=1000):
    topic = f"ztf_{date}_programid1"

    # create consumer
    consumer = Consumer({
        'bootstrap.servers': config["kafka"]["bootstrap.servers"],
        'group.id': str(uuid.uuid1()), # generate a unique group id
        'auto.offset.reset': 'earliest'
    })

    start = time.time()

    # subscribe to topic
    consumer.subscribe([topic])

    # consume messages
    count = 0
    try:
        while True:
            msg = consumer.poll(timeout=10.0)
            if msg is None:
                break
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    print('%% %s [%d] reached end at offset %d\n' %
                        (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                #msg = decode_message(msg)
                print(f"read alert number {count}")
                count += 1

            if max_alerts and count >= max_alerts:
                print(f"Reached maximum number of alerts: {max_alerts}")
                break

            
    finally:
        print(f'Consumed {count} alerts in {time.time() - start:.2f} seconds')
        consumer.close()

consume(date, config)

read alert number 0
read alert number 1
read alert number 2
read alert number 3
read alert number 4
read alert number 5
read alert number 6
read alert number 7
read alert number 8
read alert number 9
read alert number 10
read alert number 11
read alert number 12
read alert number 13
read alert number 14
read alert number 15
read alert number 16
read alert number 17
read alert number 18
read alert number 19
read alert number 20
read alert number 21
read alert number 22
read alert number 23
read alert number 24
read alert number 25
read alert number 26
read alert number 27
read alert number 28
read alert number 29
read alert number 30
read alert number 31
read alert number 32
read alert number 33
read alert number 34
read alert number 35
read alert number 36
read alert number 37
read alert number 38
read alert number 39
read alert number 40
read alert number 41
read alert number 42
read alert number 43
read alert number 44
read alert number 45
read alert number 46
read alert number 47
...

In [None]:
# each alert is just a stream of bytes, not a proper Python object



In [None]:
# we need to decode it (see the Kafka consumer class)

In [None]:
# decode the message and read its schema (the two exception handlers in the source code can be removed); fastavro is needed because the raw message is not yet a Python object

In [None]:
# get 1000 alerts in a loop, print the total elapsed time at the end, and print the object IDs or similar (if it takes too long, tell us)


In [None]:
# check the ZTF alert schema

In [None]:
# to pull 1000 objects, use a counter: read events one by one and update it each time
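One possible shape for such a counting loop is sketched below. It is not the consumer from consumer_EDIT.py: it reads from a generator of made-up dictionaries standing in for decoded alerts, stops after `max_alerts` records, and reports the elapsed time. `objectId` is the ZTF object identifier field; the function name `read_alerts` and the ID values are hypothetical.

```python
import time


def read_alerts(stream, max_alerts=1000):
    """Read up to max_alerts records; return the object IDs seen and the elapsed time."""
    start = time.perf_counter()
    object_ids = []
    # enumerate() acts as the counter, starting at 1 for the first record.
    for count, record in enumerate(stream, start=1):
        object_ids.append(record["objectId"])
        if count >= max_alerts:
            break
    elapsed = time.perf_counter() - start
    return object_ids, elapsed


# Made-up records standing in for decoded alerts (assumption).
fake_stream = ({"objectId": f"ZTF_fake_{i:04d}"} for i in range(5000))
ids, elapsed = read_alerts(fake_stream, max_alerts=1000)
print(f"Read {len(ids)} alerts in {elapsed:.4f} seconds; first ID: {ids[0]}")
```

With real alerts, most of the elapsed time will come from reading and decoding each packet, so timing a small batch first gives a rough estimate for the full night.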

# Part 2 - Filtering the alert stream

Many of the filters you have already used will be helpful here, for example using candidate.drb to remove likely artifacts and other bogus alerts. As a reminder, drb ranges from 0 to 1, with values near 1 indicating alerts that are likely real.

- Make cuts on alert parameters (i.e. as you did in ztf_summer_school_2024/lectures/01/ztf_alert_filtering.ipynb)
- Use prv_candidates to understand the alert history, and do things like compute the rate at which transients are rising and falling (hint: np.polyfit)
- Train a classifier (or reuse the one you trained in ztf_summer_school_2024/lectures/01-ml/BTS_ml_summer_school.ipynb)
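To illustrate the np.polyfit hint, here is a minimal sketch that fits a straight line to a short light curve and reads off the slope in magnitudes per day. The times and magnitudes are made up; with real alerts they would come from the `jd` and `magpsf` fields of the candidate and its prv_candidates, taken for a single filter and with non-detections dropped first (an assumption about how you prepare the data).

```python
import numpy as np

# Made-up detections for a single object in one filter (assumption):
# times in Julian days and PSF magnitudes.
jd = np.array([2460498.7, 2460499.7, 2460500.7, 2460501.7])
magpsf = np.array([19.5, 19.2, 18.9, 18.6])

# Fit mag = slope * (t - t0) + intercept; subtracting t0 keeps the fit well conditioned.
slope, intercept = np.polyfit(jd - jd[0], magpsf, deg=1)

# Magnitudes decrease as a source brightens, so a negative slope means "rising".
state = "rising" if slope < 0 else "fading"
print(f"{slope:.2f} mag/day ({state})")
```

A cut on the slope (for example, keeping only objects that brighten faster than some threshold) can then be used as one more filter, provided there are enough detections for the fit to be meaningful.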


In [None]:
# Save alerts based on a drb cut of your choice, for example > 0.5 

alerts_real = alerts[alerts['candidate.drb'] > 0.5]

In [None]:
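# Save alerts that do (or don't) originate in our solar system.
# candidate.ssdistnr is the distance in arcsec to the nearest known
# solar-system object, so this selection keeps alerts that DO have a match;
# invert the ssdistnr cuts to keep alerts that do not.

alerts_ss = alerts[(alerts['candidate.ssdistnr'] > 0.0)
                   & (alerts['candidate.ssdistnr'] < 10.0)
                   & (alerts['candidate.magnr'] > -20.0)
                   & (alerts['candidate.magnr'] < 20.0)]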

The goal is to add these filters to work in real time on your Kafka consumer.
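A minimal sketch of what a per-alert filter could look like is shown below. It assumes each decoded alert is a nested dictionary with a `candidate` sub-record (the flattened column name `candidate.drb` above corresponds to `record["candidate"]["drb"]`). The function name `passes_filter` and the example records are hypothetical.

```python
def passes_filter(record, drb_min=0.5):
    """Return True if a decoded alert looks real according to its drb score."""
    candidate = record.get("candidate", {})
    drb = candidate.get("drb")
    # Treat a missing score as a failure rather than guessing.
    return drb is not None and drb > drb_min


# Made-up records standing in for decoded alerts (assumption).
records = [
    {"objectId": "ZTF_fake_a", "candidate": {"drb": 0.98}},
    {"objectId": "ZTF_fake_b", "candidate": {"drb": 0.12}},
    {"objectId": "ZTF_fake_c", "candidate": {"drb": None}},
]

kept = [r["objectId"] for r in records if passes_filter(r)]
print(kept)
```

Because the function takes one record and returns a boolean, it can be called inside the consumer loop on each record as it arrives, and further cuts can be added as extra conditions.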

In [None]:
# adding these filters to the consumer 

Take the ZTF id for one of the objects returned by your consumer and make a plot yourself, and/or use a public alert broker to inspect it.

For example, just add the ZTF id to the end of this URL: https://alerce.online/object/ZTF24aamvzfc
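If your consumer returns many objects, a small helper can build these links for you. This is only a sketch: the function name `alerce_url` is hypothetical, and it simply appends the object ID to the URL pattern shown above.

```python
def alerce_url(object_id):
    """Build the ALeRCE object page URL for a ZTF object ID."""
    return f"https://alerce.online/object/{object_id}"


print(alerce_url("ZTF24aamvzfc"))
```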

# Treasure hunt for objects:





1. Supernovae