New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Anomaly notifications #141
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @Knispel2 for the code addition! It is a good first trial.
So if I understand correctly, this functions takes all the data from a given night, and send to telegram or Slack the threshold
most promising alerts according to their anomaly_score
.
As it is not a standard Fink filters to be used in real-time, your current implementation suffers several flaws (see review comments). So I propose to redesign the function to make clearer and more efficient. Here is a possible skeleton:
def anomaly_notification_(
df, threshold=10,
send_to_tg=False, channel_id=None,
send_to_slack=False, channel_name=None) -> DataFrame:
""" Create event notifications with a high anomaly_score value
Notes
----------
Notifications can be sent to a Slack or Telegram channels.
Parameters
----------
df : Spark DataFrame
Mandatory columns are:
objectId : unique identifier for this object
lc_features: Dict of dicts of floats.
Keys of first dict - filters (fid),
keys of inner dicts - names of features
rb: RealBogus quality score
anomaly_score: Anomaly score
timestamp : UTC time
threshold: optional, int
Number of notifications. Default is 10
send_to_tg: optional, boolean
If true, send message to Telegram. Default is False
channel_id: str
If `send_to_tg` is True, `channel_id` is the name of the
Telegram channel.
send_to_slack: optional, boolean
If true, send message to Slack. Default is False
channel_id: str
If `send_to_slack` is True, `channel_name` is the name of the
Slack channel.
Returns
----------
out: DataFrame
Return the input Spark DataFrame with a new column `flag`
for locating anomalous alerts.
Examples
----------
>>> from fink_utils.spark.utils import concat_col
>>> from fink_science.ad_features.processor import extract_features_ad
>>> from fink_science.anomaly_detection.processor import anomalys_score
>>> df = spark.read.format('parquet').load('datatest')
>>> what = ['magpsf', 'jd', 'sigmapsf', 'fid']
>>> prefix = 'c'
>>> what_prefix = [prefix + i for i in what]
>>> for colname in what:
... df = concat_col(df, colname, prefix=prefix)
>>> df = df.withColumn('lc_features', extract_features_ad(*what_prefix, 'objectId'))
>>> df = df.withColumn("anomaly_score", anomaly_score("lc_features"))
>>> df_proc = df.select(
... 'objectId', 'candidate.ra',
... 'candidate.dec', 'candidate.rb',
... 'anomaly_score', 'timestamp')
>>> df_out = anomaly_notification_(df_proc)
>>> print(df_out.filter(df_out['flag']).count())
"""
# Compute the median for the night
med = df.select('anomaly_score').approxQuantile('anomaly_score', [0.5], 0.25)
med = round(med[0], 2)
# Extract anomalous objects
pdf_anomalies = df.sort(['anomaly_score'], ascending=True).limit(threshold).toPandas()
upper_bound = np.max(pdf_anomalies['anomaly_score'].values)
tg_data, slack_data = [], []
for _, row in pdf_anomalies.iterrows():
gal = SkyCoord(ra=row.ra*u.degree, dec=row.dec*u.degree, frame='icrs').galactic
t1a = f'ID: [{row.objectId}](https://fink-portal.org/{row.objectId})'
t1b = f'ID: <https://fink-portal.org/{row.objectId}|{row.objectId}>'
t2 = f'GAL coordinates: {round(gal.l.deg, 6)}, {round(gal.b.deg, 6)}'
t3 = f'UTC: {str(row.timestamp)[:-3]}'
t4 = f'Real bogus: {round(row.rb, 2)}'
t5 = f'Anomaly score: {round(row.anomaly_score, 2)}'
tg_data.append(f'''{t1a}
{t2}
{t3}
{t4}
{t5}''')
slack_data.append(f'''{t1b}
{t2}
{t3}
{t4}
{t5}''')
if send_to_slack:
filter_utils.msg_handler_slack(slack_data, channel_name, med)
if send_to_tg:
filter_utils.msg_handler_tg(tg_data, channel_id, med)
df = df.withColumn('flag', df['anomaly_score'] <= upper_bound)
return df
Feel free to discuss this implementation. Note that I haven't check it properly as I haven't access to the telegram token.
For the Slack part, I'm not sure it works correctly as it? If not, I would propose that you focus on the Telegram implementation, and I will add the Slack one (with correct token) after merging.
Note that the CI is failing because |
med = df_proc.select('anomaly_score').approxQuantile('anomaly_score', [0.5], 0.05) | ||
med = round(med[0], 2) | ||
# Extract anomalous objects | ||
trick_par = 10 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about defining trick_par
as an argument of anomaly_notification_
? This way, it can be easily tuned if need be.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, did it in the last commit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @Knispel2 for the change -- they look all good to me! I left a final comment about the tokens (and minor comments about the doc).
By the way, the Slack client syntax you are using is no more valid. It used to be called https://pypi.org/project/slackclient/ Could you update the syntax for using the new version? |
Sorry, I seem to have used outdated documentation. I've fixed it now! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @Knispel2 -- all good to me, I am merging! Thank you for the work.
Issue: #140
What changes were proposed in this pull request?
Added code that creates notifications about objects with a high Anomaly score in Telegram and Slack