<img src="figures/dirac.png" width="700" align="left">

# Streaming Alerts (with Genesis)

By [@mjuric](http://github.com/mjuric), available at https://github.com/mjuric/petascale-streaming-demo

## What is Alert Streaming and When to Use It



Classic survey science is not performed in real time: the analysis usually lags the data collection and processing. Typically, a researcher waits (days to weeks) for a reasonably sized batch of new data to accumulate, and then performs the analysis, frequently by hand.

This does not work well when the object of interest may change on short timescales and needs to be followed up rapidly. Examples include a short-timescale transient, or a potentially hazardous asteroid undergoing an Earth flyby. For this use case, the researcher would prefer to analyze the data as they come in, with minimal latency (on the order of seconds to minutes) between data collection and discovery/characterization.

This is the problem alert streaming looks to solve: to **enable near real-time transmission of alerts about (and measurements of) objects whose properties have changed**. The key differences between offline and streaming-driven research:
1. Response time on the order of seconds
1. Fully automated, machine-driven analysis

## Streaming Challenges (Desirables)

* Minimal latency (< seconds)
* Robustness to client, server, and transport failures (no data loss)
* Ease of use (simple end-user interfaces)

## Technology: Apache Kafka

> Apache Kafka is a community distributed event streaming platform capable of handling trillions of events a day.
> -- https://www.confluent.io/what-is-apache-kafka/
<br>
<br>

> What is Kafka?
> Kafka is a **distributed messaging system** providing **fast**, **highly scalable** and **redundant** messaging through a **pub-sub model**. Kafka’s distributed design gives it several advantages. First, Kafka allows a large number of permanent or ad-hoc consumers. Second, Kafka is highly available and resilient to node failures and supports automatic recovery. In real world data systems, these characteristics make Kafka an ideal fit for communication and integration between components of large scale data systems.
> -- https://sookocheff.com/post/kafka/kafka-in-a-nutshell/
<br>
<br>

> ![Kafka Cluster](https://upload.wikimedia.org/wikipedia/commons/thumb/6/64/Overview_of_Apache_Kafka.svg/2560px-Overview_of_Apache_Kafka.svg.png)

## Demo setup: (Kafka Cluster + JupyterHub on Kubernetes) on Digital Ocean

The demo runs on VMs on [Digital Ocean](http://digitalocean.com):
* **Kafka broker cluster**: 8 x 6-core machine w. 16GB of RAM ("standard" [Droplet](https://www.digitalocean.com/pricing/) type)
* **JupyterHub**: A 4-core machine w. 8GB of RAM **per user** ("standard" [Droplet](https://www.digitalocean.com/pricing/) type)

Within the cluster, we've set up:
1. A small static topic with only 100 alerts (named `small`)
2. A medium-sized topic with 20,000 alerts (named `medium`)
3. A topic with continuously injected alerts at LSST scale (named `lsst`)

## Genesis Broker Access Library

Kafka comes with [performant Python libraries](https://docs.confluent.io/current/clients/confluent-kafka-python/) that largely follow the API and structure of their native (Java) client libraries. Unfortunately, that means they're not as "Pythonic" as they could be.

Included with this demo is an early version of `genesis.streaming`, a client library for robust and scalable access to alert streams. Genesis aims to make it easy for an astronomer with some knowledge of Python to consume and filter alert streams.

It also largely abstracts away both the underlying transport protocol and alert serialization: to the user, alerts are simple Python `dict`s, delivered through familiar `generator`s.
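To make the "dicts through generators" idea concrete, here is a minimal sketch that uses neither Genesis nor Kafka. `fake_stream` and the alert values are hypothetical stand-ins; only the `(idx, alert)` iteration shape mirrors the cells in this notebook.

```python
def fake_stream(alerts, limit=None):
    """Yield (idx, alert) pairs, where each alert is a plain dict.

    Hypothetical stand-in for a real stream: it only mimics the iteration shape.
    """
    for idx, alert in enumerate(alerts):
        if limit is not None and idx >= limit:
            return
        yield idx, alert


# Made-up alerts; real packets carry many more fields.
sample_alerts = [{"candid": 1000 + i, "candidate": {"ssnamenr": "null"}} for i in range(5)]

received = [alert["candid"] for _, alert in fake_stream(sample_alerts, limit=3)]
print(received)
```

Because the stream is a generator, alerts are produced one at a time as the loop asks for them, so nothing forces the whole stream into memory.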

In [None]:
import genesis.streaming as gs

## Simple Streaming

If you execute the cell below, it will hang forever... (tip: click the stop ◾️ button in Jupyter to interrupt it).

In [None]:
with gs.open("kafka://genesis.alerts.wtf/small") as stream:
    for idx, alert in stream:
        print("Candidate ID:", alert['candid'])

What happened? Genesis (actually, Kafka) remembers the last alert you received from each topic (the ***offset*** of the last received alert), and will only send you new alerts. This is desired behavior -- if you weren't connected immediately when the night started (or got temporarily disconnected), you may want to catch up.

But what if this is the first time you've connected to the stream? If so, Genesis will default to waiting for *new* packets, and not sending you anything it may already have. This is a safe default: e.g., if the first time you connect to the LSST stream is one year into operations, you don't want to be sent a year's worth of alerts! And in our case, since I'm not injecting any new alerts, it will wait indefinitely...

Below is a visualization of [Kafka topics and offsets](https://kafka.apache.org/documentation/#intro_topics):
![Kafka Offsets](https://kafka.apache.org/23/images/log_consumer.png)
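The offset bookkeeping described above can be mimicked with a toy in-memory log. This is an illustration only, not how Kafka or Genesis is implemented: `ToyTopic` and `poll` are hypothetical names, the log has a single partition, and for simplicity the offset advances on every poll (explicit commits are covered in the section on remembering offsets).

```python
class ToyTopic:
    """A toy, single-partition append-only log with one offset per consumer."""

    def __init__(self, messages):
        self.log = list(messages)
        self.offsets = {}  # consumer ID -> index of the next message to deliver

    def poll(self, consumer_id, start_at="latest"):
        """Return the messages this consumer has not been sent yet."""
        if consumer_id not in self.offsets:
            # First connection: replay everything, or wait for new messages only.
            self.offsets[consumer_id] = 0 if start_at == "earliest" else len(self.log)
        start = self.offsets[consumer_id]
        self.offsets[consumer_id] = len(self.log)
        return self.log[start:]


topic = ToyTopic(["a", "b", "c"])

alice_first = topic.poll("alice")                     # default: nothing old is sent
bob_first = topic.poll("bob", start_at="earliest")    # replays the existing log

topic.log.append("d")                                 # a new alert arrives
alice_second = topic.poll("alice")
bob_second = topic.poll("bob")

print(alice_first, bob_first, alice_second, bob_second)
```

Both consumers receive the new message `"d"`, but only the one that asked for `"earliest"` ever saw the three messages that were already in the log.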

Let's change this default, and have it send us everything it has. We'll also turn on a nice progress bar, and we will tell it to stop if it doesn't receive an alert in a 10 second interval.

In [None]:
with gs.open("kafka://genesis.alerts.wtf/small", start_at="earliest") as stream:
    for idx, alert in stream(timeout=10, progress=True):
        print("[%d] Candidate ID: %s" % (idx, alert['candid']))

You may notice a slight pause before the streaming starts: this is Kafka establishing connections to the broker cluster. Once the connections are established, the alerts start streaming quickly.

Rather than printing the alerts, let's just store their IDs.

In [None]:
with gs.open("kafka://genesis.alerts.wtf/small", start_at="earliest") as stream:
    alerts = [ alert['candid'] for _, alert in stream(timeout=10, progress=True) ]

alerts.sort()

print(f"Read {len(alerts)} alerts.")
print("First few candidate IDs:", alerts[:3])

### Remembering "offsets"

Kafka remembers the `offset` of the last alert transmitted to you; next time you connect, it will continue streaming from that offset.

For this to work, we need to connect to a stream with a "consumer ID" -- a name uniquely identifying you. Kafka will associate the offset with the consumer ID; the next time the same consumer ID connects, it will continue streaming from the offset associated with it.

Let's generate a random consumer ID for your demo session, and add it to the stream URL:

In [None]:
import random, string

# named consumer_id rather than id, to avoid shadowing Python's built-in id()
consumer_id = ''.join(random.choices(string.ascii_uppercase + string.digits, k=8))
broker_url = "kafka://{}@genesis.alerts.wtf/small".format(consumer_id)

print("Consumer ID:", consumer_id)
print("Broker URL: ", broker_url)
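The broker URL packs three pieces of information into one string: the consumer ID, the broker host, and the topic. As a rough illustration, Python's standard `urllib.parse` can pull them apart. Whether Genesis parses the URL this way internally is an assumption, and the consumer ID below is a made-up example.

```python
from urllib.parse import urlsplit

# Example URL in the same shape as broker_url; the ID is made up.
parts = urlsplit("kafka://ABCD1234@genesis.alerts.wtf/small")

consumer = parts.username          # the part before the "@"
host = parts.hostname              # the broker address
topic = parts.path.lstrip("/")     # the topic name, without the leading slash

print("Consumer ID:", consumer)
print("Host:       ", host)
print("Topic:      ", topic)
```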

Let's read in a few alerts:

In [None]:
with gs.open(broker_url, start_at="earliest") as stream:
    alerts1 = [ alert['candid'] for _, alert in stream(limit=25, progress=True) ]
    
    # ... now do some work with the alert ...
    
    # stream.commit()

alerts1.sort()

print(f"Read {len(alerts1)} alerts.")
print("First few candidate IDs:", alerts1[:3])

Let's read in the rest:

In [None]:
with gs.open(broker_url, start_at="earliest") as stream:
    alerts2 = [ alert['candid'] for _, alert in stream(timeout=10, progress=True) ]

    # ... now do some work with the alert ...
    
    # stream.commit()

alerts2.sort()

print(f"Read {len(alerts2)} alerts.")
print("First few candidate IDs:", alerts2[:3])

Why didn't it continue from where we left off? Because the broker needs to be _explicitly_ told to commit the offset. To prevent data loss, it doesn't do so automatically.

To illustrate: if the offset were committed as soon as an alert was returned to you, and the code in the _"... now do some work with the alert ..."_ section above crashed before acting on the alerts, then the next time you connected to the broker these alerts would be skipped.

This is why you must explicitly call `stream.commit()` once you're certain the received alerts were successfully processed.
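The failure scenario can be played out with a toy consumer. This is a sketch under simplifying assumptions, not Genesis or Kafka code: the "log" is a Python list, a crash is a `RuntimeError`, and `run_once`, `make_handler` and `two_sessions` are hypothetical helpers.

```python
def run_once(log, committed, handle, commit_first):
    """Consume log from the committed offset; return the offset committed when we stop."""
    position = committed
    try:
        while position < len(log):
            message = log[position]
            position += 1
            if commit_first:
                committed = position   # offset saved before the work is done
            handle(message)            # may raise, simulating a crash
            committed = position       # offset saved after the work is done
    except RuntimeError:
        pass                           # the client "crashed"; keep the last commit
    return committed


def make_handler(done, crash_on):
    """Build a handler that crashes the first time it sees crash_on."""
    state = {"crashed": False}

    def handle(message):
        if message == crash_on and not state["crashed"]:
            state["crashed"] = True
            raise RuntimeError("simulated crash")
        done.append(message)

    return handle


def two_sessions(commit_first):
    """Run, crash on 'a2', reconnect, and report which alerts were processed."""
    log, done = ["a1", "a2", "a3"], []
    handle = make_handler(done, crash_on="a2")
    committed = run_once(log, 0, handle, commit_first)
    run_once(log, committed, handle, commit_first)
    return done


lost = two_sessions(commit_first=True)
safe = two_sessions(commit_first=False)
print("commit before processing:", lost)
print("commit after processing: ", safe)
```

When the offset is committed before processing, alert `a2` is never handled: the second session starts after it. Committing only after the work succeeds means `a2` is delivered again on reconnect.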

Now go back up, uncomment the `stream.commit()` lines, and re-execute these two cells.

Finally, let's verify nothing was lost:

In [None]:
set(alerts) == set(alerts1 + alerts2)

### About Kafka delivery guarantees

A couple of warnings about Kafka's delivery guarantees:
* **Kafka does not guarantee the order in which you'll receive the alerts**: it guarantees delivery, and preserves order within a single partition, but alerts from different partitions of a topic may arrive out of order.
* **Kafka (typically) guarantees "at least once" delivery**: that is, you may receive some alerts more than once (if there's a crash, a network interruption, or any similar exceptional situation). Your code should guard against this. Exactly-once semantics are difficult, but possible (and coming in the future).

![Distributed computing problems](https://cdn.confluent.io/wp-content/uploads/image2.png)

-- [Mathias Verraes](https://www.linkedin.com/in/mathiasverraes/)
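One simple guard against repeated delivery is to remember which alerts you have already handled. The sketch below assumes that `candid` uniquely identifies an alert (an assumption, though it is the ID used throughout this notebook). `deduplicate` is a hypothetical helper, not part of Genesis.

```python
def deduplicate(alerts, key=lambda alert: alert["candid"]):
    """Yield each alert once, skipping repeats of an already-seen key.

    The set of seen keys grows without bound; a long-running consumer
    would need to expire old entries or persist them somewhere.
    """
    seen = set()
    for alert in alerts:
        k = key(alert)
        if k in seen:
            continue
        seen.add(k)
        yield alert


# Made-up alerts in which two IDs were delivered twice.
redelivered = [{"candid": 1}, {"candid": 2}, {"candid": 1}, {"candid": 3}, {"candid": 2}]

unique_ids = [alert["candid"] for alert in deduplicate(redelivered)]
print(unique_ids)
```

Note that the seen-set lives in memory, so it does not survive a client restart, which is exactly when redelivery is most likely. A sturdier approach is to make the processing itself idempotent, so that handling an alert twice is harmless.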

## Filtering the stream

Now let's filter the stream for objects of interest. Say we're only interested in asteroids, and wish to ignore the rest.

We'll write a filter function which checks whether the alert candidate has the "Nearby Solar System object Name" field set to something other than "null":

In [None]:
def filter_asteroids(alert):
    if alert['candidate']['ssnamenr'] != 'null':
        return alert
    else:
        return None

In [None]:
with gs.open("kafka://genesis.alerts.wtf/medium", start_at="earliest") as stream:
    for idx, alert in stream(limit=10, timeout=10, progress=True, filter=filter_asteroids):
        print(f"[{idx}] Candidate ID: {alert['candid']} {alert['candidate']['ssnamenr']}")

By looking at the [fields available in the alert packet](https://zwickytransientfacility.github.io/ztf-avro-alert/schema.html), you can construct arbitrarily complex filters.
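Complex filters can also be assembled from small ones. The sketch below follows the convention used in this notebook (a filter returns the alert to keep it, or `None` to drop it); `all_of`, `has_ss_name` and `brighter_than` are hypothetical helpers, and the alerts are made up. `magpsf` is the PSF-fit magnitude field from the alert schema.

```python
def all_of(*filters):
    """Combine filters: an alert passes only if every filter returns it."""
    def combined(alert):
        for f in filters:
            alert = f(alert)
            if alert is None:
                return None
        return alert
    return combined


def has_ss_name(alert):
    """Keep alerts matched to a known Solar System object."""
    return alert if alert["candidate"]["ssnamenr"] != "null" else None


def brighter_than(limit_mag):
    """Build a filter keeping alerts brighter (numerically smaller) than limit_mag."""
    def check(alert):
        return alert if alert["candidate"]["magpsf"] < limit_mag else None
    return check


bright_asteroids = all_of(has_ss_name, brighter_than(18.0))

# Made-up alerts with only the fields the filters need.
sample = [
    {"candid": 1, "candidate": {"ssnamenr": "433", "magpsf": 17.2}},
    {"candid": 2, "candidate": {"ssnamenr": "null", "magpsf": 16.0}},
    {"candid": 3, "candidate": {"ssnamenr": "1036", "magpsf": 19.5}},
]

kept = [a["candid"] for a in sample if bright_asteroids(a) is not None]
print(kept)
```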

Here's one that checks whether an object may be a transient:

In [None]:
import pandas as pd
import numpy as np
import astropy.units as u

def is_transient(alert):
    # Filter by E. C. Bellm (@ebellm on GitHub)

    # if only a single discovery, bail out -- we wait for at least two
    # before triggering
    if alert['prv_candidates'] is None:
        return
    
    dflc = pd.DataFrame( [ alert['candidate'] ] + alert['prv_candidates'])
    candidate = dflc.loc[0]

    # positive subtraction?
    is_positive_sub = candidate['isdiffpos'] == 't'
    
    # no nearby source
    if (candidate['distpsnr1'] is None) or (candidate['distpsnr1'] > 1.5):
        no_pointsource_counterpart = True
    else:
        # nearby source, but it's a galaxy?
        if candidate['sgscore1'] < 0.5:
            no_pointsource_counterpart = True
        else:
            no_pointsource_counterpart = False
            
    where_detected = (dflc['isdiffpos'] == 't') # nondetections will be None
    if np.sum(where_detected) >= 2:
        detection_times = dflc.loc[where_detected,'jd'].values
        dt = np.diff(detection_times)
        not_moving = np.max(dt) >= (30*u.minute).to(u.day).value
    else:
        not_moving = False
    
    no_ssobject = (candidate['ssdistnr'] is None) or (candidate['ssdistnr'] < 0) or (candidate['ssdistnr'] > 5)
    
    if is_positive_sub and no_pointsource_counterpart and not_moving and no_ssobject:
        return alert

    return None

In [None]:
with gs.open("kafka://genesis.alerts.wtf/medium", start_at="earliest") as stream:
    for idx, alert in stream(limit=10, timeout=10, progress=True, filter=is_transient):
        print(f"[{idx}] Candidate ID: {alert['candid']}")

The filter in the above example was (intentionally) written to be slow. At this processing rate, it may not be able to keep up with the full LSST alert stream.

Fortunately, Genesis knows how to parallelize execution over multiple cores, using Python's `multiprocessing.Pool`:

In [None]:
from multiprocessing import Pool

with Pool(4) as workers:
    with gs.open("kafka://genesis.alerts.wtf/medium", start_at="earliest") as stream:
        for idx, alert in stream(pool=workers, limit=10, timeout=10, progress=True, filter=is_transient):
            print(f"[{idx}] Candidate ID: {alert['candid']}")
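How a pool might be used to run a filter in parallel can be sketched with the standard library alone. This is a guess at the general pattern, not Genesis's actual implementation. To keep the sketch runnable anywhere, it swaps in `multiprocessing.pool.ThreadPool`, which exposes the same `imap` interface as `multiprocessing.Pool`; a CPU-bound filter would need a process pool to see a real speedup. `filter_in_pool` and `keep_even` are hypothetical names.

```python
from multiprocessing.pool import ThreadPool


def keep_even(alert):
    """Toy filter: keep alerts with an even candidate ID."""
    return alert if alert["candid"] % 2 == 0 else None


def filter_in_pool(alerts, filter_fn, pool, chunksize=4):
    """Apply filter_fn to alerts using the pool's workers, yielding the survivors.

    imap hands out chunks of alerts to the workers and returns the
    results in the original order.
    """
    for result in pool.imap(filter_fn, alerts, chunksize):
        if result is not None:
            yield result


alerts = [{"candid": i} for i in range(10)]

with ThreadPool(4) as workers:
    # Consume the generator while the pool is still open.
    kept = [a["candid"] for a in filter_in_pool(alerts, keep_even, workers)]

print(kept)
```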

## Robustness

Kafka is a distributed system robust to component failures.

The cluster of Kafka brokers in our demo setup has a "replication factor" of 2 -- that is, each alert is mirrored on at least two brokers. Therefore, one broker may fail without data loss; the clients will transparently switch to receive data from the other replica. Let's demonstrate this!

When I give a signal, please start the cell below. It will start downloading alerts from a topic with 20,000 alerts. As it's running, I will shut down one of the brokers in the cluster; your client should still download the full 20,000 alerts.

In [None]:
with gs.open("kafka://genesis.alerts.wtf/medium", start_at="earliest") as stream:
    for idx, alert in stream(timeout=10, progress=True):
        # we'll do nothing -- just show the progress bar.
        pass
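Why a replication factor of 2 survives the loss of one broker, but not necessarily two, can be checked with a toy placement model. This is an illustration only: the round-robin placement and the 16 partitions are assumptions, and Kafka's real replica assignment is more involved.

```python
def place_replicas(n_partitions, n_brokers, replication_factor):
    """Round-robin placement: partition p lives on brokers p, p+1, ... (mod n_brokers)."""
    return {
        p: {(p + r) % n_brokers for r in range(replication_factor)}
        for p in range(n_partitions)
    }


def survives(placement, failed_brokers):
    """True if every partition still has at least one live replica."""
    return all(replicas - failed_brokers for replicas in placement.values())


# 8 brokers as in the demo cluster; 16 partitions is a made-up number.
placement = place_replicas(n_partitions=16, n_brokers=8, replication_factor=2)

single_ok = all(survives(placement, {b}) for b in range(8))
double_ok = survives(placement, {0, 1})

print("Survives any single broker failure:", single_ok)
print("Survives losing brokers 0 and 1:   ", double_ok)
```

In this model any single failure is harmless, while losing brokers 0 and 1 together takes out every partition whose two replicas sit on exactly that pair.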

## Scalability

And now for the main event: let's see if we can stream and filter alerts at the full LSST rate!

I have set up a script that injects 10,000 LSST-sized ZTF alerts every 40 seconds (average LSST rate).
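A quick back-of-the-envelope calculation shows what this injection rate means for a filter. It assumes the work divides perfectly across workers and ignores transport and deserialization overhead, so treat the result as an upper bound on the time available per alert.

```python
alerts_per_batch = 10_000
batch_interval_s = 40
n_workers = 4  # same pool size as used in this notebook

rate = alerts_per_batch / batch_interval_s   # alerts per second
budget_ms = 1000 * n_workers / rate          # time each worker may spend per alert

print(f"Average rate: {rate:.0f} alerts/s")
print(f"Per-alert budget with {n_workers} workers: {budget_ms:.0f} ms")
```

That works out to 250 alerts per second, or about 16 ms per alert on each of four workers. A filter that takes longer than that on average will fall behind the stream.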

When I give the signal, please execute the cell below to start consuming from this stream. We will observe how many simultaneous users we can have before the system fails to keep up!

In [None]:
with gs.Pool(4) as workers:
    with gs.open("kafka://genesis.alerts.wtf/lsst") as stream:
        for idx, alert in stream(pool=workers, progress=True):
            # we'll do nothing -- just show the progress bar.
            pass

## Is this how you do it?

The idea at this stage was to demonstrate that sending the full stream of LSST alerts to end-users via a cloud resource is possible and cost-effective.

The cost of machinery for this demo:
* Operations cost: **\$640/month** for the broker cluster
* End-user cost: **\$40/month** (for the analysis machine)

The final infrastructure costs may be ~2-3x larger, but still small.

This is a starting point to build on. For example:
* Integration with services like [AWS Lambda](https://aws.amazon.com/lambda/)
* Stream transformation with [KSQL](https://www.confluent.io/product/ksql/)
* Integration with [Apache Spark](http://spark.apache.org/) for scalable unified analytics.
* ...

## Acknowledgements

This work has been made possible and supported by

![Supported By](figures/foundation-logos.png)
![Also By](https://www.lsst.org/sites/default/files/Funding-logos-bk.png)