Skip to content

Commit

Permalink
Anomaly notifications (#141)
Browse files Browse the repository at this point in the history
* added filter

* first version (without udf)

* Common handler

* Comments added

* delete whitespace

* med correct

* med error

* First adjustments

* Some fixes for PEP8

* Small corrections flake8

* Fixing notifications with the same objectId

* flake8 fix

* Single-pass extraction

* Added dockstrings, put trick_par in arguments

* Slack client syntax updates

* Tokens from os

* flake8 check
  • Loading branch information
Knispel2 committed Apr 17, 2023
1 parent 955242c commit 99ee14a
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 0 deletions.
Empty file.
102 changes: 102 additions & 0 deletions fink_filters/filter_anomaly_notification/filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import numpy as np
from astropy import units as u
from astropy.coordinates import SkyCoord
import filter_utils


def anomaly_notification_(
df_proc, threshold=10,
send_to_tg=False, channel_id=None,
send_to_slack=False, channel_name=None,
trick_par=10):
""" Create event notifications with a high anomaly_score value
Notes
----------
Notifications can be sent to a Slack or Telegram channels.
Parameters
----------
df : Spark DataFrame
Mandatory columns are:
objectId : unique identifier for this object
ra: Right Ascension of candidate; J2000 [deg]
dec: Declination of candidate; J2000 [deg]
rb: RealBogus quality score
anomaly_score: Anomaly score
timestamp : UTC time
threshold: optional, int
Number of notifications. Default is 10
send_to_tg: optional, boolean
If true, send message to Telegram. Default is False
channel_id: str
If `send_to_tg` is True, `channel_id` is the name of the
Telegram channel.
send_to_slack: optional, boolean
If true, send message to Slack. Default is False
channel_id: str
If `send_to_slack` is True, `channel_name` is the name of the
Slack channel.
Returns
----------
out: DataFrame
Return the input Spark DataFrame with a new column `flag`
for locating anomalous alerts
Examples
----------
>>> from fink_utils.spark.utils import concat_col
>>> from fink_science.ad_features.processor import extract_features_ad
>>> from fink_science.anomaly_detection.processor import anomalys_score
>>> df = spark.read.format('parquet').load('datatest')
>>> what = ['magpsf', 'jd', 'sigmapsf', 'fid']
>>> prefix = 'c'
>>> what_prefix = [prefix + i for i in what]
>>> for colname in what:
... df = concat_col(df, colname, prefix=prefix)
>>> df = df.withColumn('lc_features', extract_features_ad(*what_prefix, 'objectId'))
>>> df = df.withColumn("anomaly_score", anomaly_score("lc_features"))
>>> df_proc = df.select(
... 'objectId', 'candidate.ra',
... 'candidate.dec', 'candidate.rb',
... 'anomaly_score', 'timestamp')
>>> df_out = anomaly_notification_(df_proc)
>>> #For sending to messengers:
>>> #df_out = anomaly_notification_(df_proc, threshold=10,
send_to_tg=True, channel_id="@ZTF_anomaly_bot",
send_to_slack=True, channel_name='fink_alert')
>>> print(df_out.filter(df_out['flag']).count())
"""
# Compute the median for the night
med = df_proc.select('anomaly_score').approxQuantile('anomaly_score', [0.5], 0.05)
med = round(med[0], 2)
# Extract anomalous objects
pdf_anomalies_ext = df_proc.sort(['anomaly_score'], ascending=True).limit(trick_par * threshold).toPandas()
pdf_anomalies_ext = pdf_anomalies_ext.drop_duplicates(['objectId'])
upper_bound = np.max(pdf_anomalies_ext['anomaly_score'].values[:threshold])
pdf_anomalies = pdf_anomalies_ext[pdf_anomalies_ext['anomaly_score'] <= upper_bound]
tg_data, slack_data = [], []
for _, row in pdf_anomalies.iterrows():
gal = SkyCoord(ra=row.ra * u.degree, dec=row.dec * u.degree, frame='icrs').galactic
t1a = f'ID: [{row.objectId}](https://fink-portal.org/{row.objectId})'
t1b = f'ID: <https://fink-portal.org/{row.objectId}|{row.objectId}>'
t2_ = f'GAL coordinates: {round(gal.l.deg, 6)}, {round(gal.b.deg, 6)}'
t3_ = f'UTC: {str(row.timestamp)[:-3]}'
t4_ = f'Real bogus: {round(row.rb, 2)}'
t5_ = f'Anomaly score: {round(row.anomaly_score, 2)}'
tg_data.append(f'''{t1a}
{t2_}
{t3_}
{t4_}
{t5_}''')
slack_data.append(f'''{t1b}
{t2_}
{t3_}
{t4_}
{t5_}''')
if send_to_slack:
filter_utils.msg_handler_slack(slack_data, channel_name, med)
if send_to_tg:
filter_utils.msg_handler_tg(tg_data, channel_id, med)
df_result = df_proc.withColumn('flag', df_proc['anomaly_score'] <= upper_bound)
return df_result
75 changes: 75 additions & 0 deletions fink_filters/filter_anomaly_notification/filter_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import os
import time
import requests
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError


def msg_handler_slack(slack_data, channel_name, med):
'''
Notes
----------
The function sends notifications to the "channel_name" channel of Slack.
Parameters
----------
slack_data: list
List of lines. Each item is a separate notification
channel_name: string
Channel name in Slack
med: float
Median anomaly score overnight
Returns
-------
None
'''
slack_client = WebClient(os.environ['ANOMALY_SLACK_TOKEN'])
slack_data = [f'Median anomaly score overnight: {med}'] + slack_data
try:
for slack_obj in slack_data:
slack_client.chat_postMessage(channel=channel_name, text=slack_obj)
time.sleep(3)
except SlackApiError as e:
if e.response["ok"] is False:
requests.post("https://api.telegram.org/bot" + os.environ['ANOMALY_TG_TOKEN'] + "/sendMessage", data={
"chat_id": "@fink_test",
"text": e.response["error"]
}, timeout=8)

def msg_handler_tg(tg_data, channel_id, med):
'''
Notes
----------
The function sends notifications to the "channel_id" channel of Telegram.
Parameters
----------
tg_data: list
List of lines. Each item is a separate notification
channel_id: string
Channel id in Telegram
med: float
Median anomaly score overnight
Returns
-------
None
'''
url = "https://api.telegram.org/bot"
url += os.environ['ANOMALY_TG_TOKEN']
method = url + "/sendMessage"
tg_data = [f'Median anomaly score overnight: {med}'] + tg_data
for tg_obj in tg_data:
res = requests.post(method, data={
"chat_id": channel_id,
"text": tg_obj,
"parse_mode": "markdown"
}, timeout=8
)
if res.status_code != 200:
res = requests.post(method, data={
"chat_id": "@fink_test",
"text": str(res.status_code)
}, timeout=8)
time.sleep(3)

0 comments on commit 99ee14a

Please sign in to comment.