Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Anomaly notifications #141

Merged
merged 18 commits into from
Apr 17, 2023
Empty file.
72 changes: 72 additions & 0 deletions fink_filters/filter_anomaly_notification/filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from pyspark.sql.functions import when, lit
import numpy as np
from astropy import units as u
from astropy.coordinates import SkyCoord
import pandas as pd
import filter_utils



def anomaly_notification_(df, threshold=10) -> pd.Series:
    """
    Flag the alerts with the lowest anomaly_score and send notifications.

    The `threshold` rows with the smallest `anomaly_score` (ascending sort)
    are selected, one Telegram-formatted and one Slack-formatted message is
    built for each of them, and both lists are handed to
    `filter_utils.msg_handler` together with an approximate median of
    `anomaly_score` over the whole DataFrame.

    Parameters
    ----------
    df : Spark DataFrame with columns:
        objectId : unique identifier for this object
        ra, dec : coordinates, interpreted as ICRS degrees
        rb : RealBogus quality score
        anomaly_score : Anomaly score
        timestamp : UTC time
    threshold : Maximum number of notifications (10 by default)

    Returns
    ----------
    out: pandas.Series of bool
        One flag per row of `df`: True if the row's objectId belongs to
        the selected alerts, False otherwise.

    Examples
    ----------
    >>> from fink_utils.spark.utils import concat_col
    >>> from fink_science.ad_features.processor import extract_features_ad
    >>> from fink_science.anomaly_detection.processor import anomaly_score
    >>> df = spark.read.format('parquet').load('datatest')
    >>> what = ['magpsf', 'jd', 'sigmapsf', 'fid']
    >>> prefix = 'c'
    >>> what_prefix = [prefix + i for i in what]
    >>> for colname in what:
    ...     df = concat_col(df, colname, prefix=prefix)
    >>> df = df.withColumn('lc_features', extract_features_ad(*what_prefix, 'objectId'))
    >>> df = df.withColumn("anomaly_score", anomaly_score("lc_features"))
    >>> df_proc = df.select('objectId', 'candidate.ra',
    ...     'candidate.dec', 'candidate.rb',
    ...     'anomaly_score', 'timestamp')
    >>> mask = anomaly_notification_(df_proc)
    >>> print(sum(mask))
    """
    # Coarse estimate of the median: 0.5 quantile with a relative error of 0.25.
    med = df.select('anomaly_score').approxQuantile('anomaly_score', [0.5], 0.25)

    # limit() already returns fewer rows when the DataFrame is shorter than
    # `threshold`, so no separate count() job is needed.
    # Only a handful of objects (at most `threshold`) reach the pandas side.
    df_min = df.sort(['anomaly_score'], ascending=True).limit(threshold).toPandas()

    filtred_id = set(df_min['objectId'].values)
    df = df.withColumn(
        'flag',
        when(df.objectId.isin(filtred_id), lit(True)).otherwise(lit(False))
    )
    result = pd.Series(
        [row.flag for row in df.select('flag').collect()],
        dtype=bool
    )

    # approxQuantile returns an empty list when there is no value to
    # summarise (e.g. an empty batch); there is nothing to report then,
    # and med[0] below would raise IndexError.
    if not med:
        return result

    send_data, slack_data = [], []
    for _, row in df_min.iterrows():
        gal = SkyCoord(
            ra=row.ra * u.degree,
            dec=row.dec * u.degree,
            frame='icrs'
        ).galactic
        # Part of the message shared by both channels; only the first line
        # (the link syntax) differs between Telegram markdown and Slack.
        details = (
            f'GAL coordinates: {round(gal.l.deg, 6)}, {round(gal.b.deg, 6)}\n'
            f'UTC: {str(row.timestamp)[:-3]}\n'
            f'Real bogus: {round(row.rb, 2)}\n'
            f'Anomaly score: {round(row.anomaly_score, 2)}'
        )
        send_data.append(
            f'ID: [{row.objectId}](https://fink-portal.org/{row.objectId})\n{details}'
        )
        slack_data.append(
            f'ID: <https://fink-portal.org/{row.objectId}|{row.objectId}>\n{details}'
        )

    filter_utils.msg_handler(send_data, slack_data, round(med[0], 2))
    return result
46 changes: 46 additions & 0 deletions fink_filters/filter_anomaly_notification/filter_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import requests
import time
from slackclient import SlackClient
import tokens
JulienPeloton marked this conversation as resolved.
Show resolved Hide resolved


def msg_handler(tg_data, slack_data, med):
    """
    Send the notifications to Telegram and, when possible, to Slack.

    A header line with the median anomaly score is prepended to both
    message lists, then the messages are sent pairwise: each Telegram
    message goes to the `@ZTF_anomaly_bot` chat and the matching Slack
    message goes to the `fink_alert` channel if that channel was found.
    Failures are reported to the `@fink_test` Telegram chat.

    Parameters
    ----------
    tg_data : list of str
        Messages formatted for Telegram (markdown parse mode).
    slack_data : list of str
        Messages formatted for Slack, in the same order as `tg_data`.
    med : median anomaly score, inserted into the header line.

    Returns
    ----------
    None
    """
    method = "https://api.telegram.org/bot" + tokens.tg_token + "/sendMessage"
    channel_id = "@ZTF_anomaly_bot"

    header = f'Median anomaly score overnight: {med}'
    tg_data = [header] + tg_data
    slack_data = [header] + slack_data

    slack_client = SlackClient(tokens.slack_token)
    try:
        channels = slack_client.api_call("conversations.list")['channels']
        # Falls back to None when no channel is named 'fink_alert', so the
        # Slack step is skipped instead of hitting an unassigned variable.
        channel_buf = next(
            (channel['id'] for channel in channels
             if channel['name'] == 'fink_alert'),
            None
        )
    except KeyError:
        # The Slack response did not have the expected keys: report it on
        # the Telegram test chat and carry on with Telegram only.
        requests.post(method, data={
            "chat_id": "@fink_test",
            "text": 'Slack API error'
        })
        channel_buf = None

    for tg_obj, slack_obj in zip(tg_data, slack_data):
        r = requests.post(method, data={
            "chat_id": channel_id,
            "text": tg_obj,
            "parse_mode": "markdown"
        })
        if r.status_code != 200:
            # Forward the HTTP status of the failed send to the test chat.
            r = requests.post(method, data={
                "chat_id": "@fink_test",
                "text": str(r.status_code)
            })
        if channel_buf:
            slack_client.api_call(
                "chat.postMessage",
                channel=channel_buf,
                text=slack_obj,
                username='fink-bot'
            )
        # Pause between consecutive messages (presumably to stay under the
        # messaging APIs' rate limits -- confirm).
        time.sleep(3)