AD filter returns a Pandas DataFrame (#145)
* Modify output of AD filter, and expand test suite

* Bump to 3.12

* PEP8
JulienPeloton committed May 22, 2023
1 parent d263622 commit fd66daf
Showing 2 changed files with 33 additions and 14 deletions.
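
The behavioral change, in short: the filter used to hand back the input Spark DataFrame with an extra boolean `flag` column, and it now returns a Pandas DataFrame containing only the selected anomaly candidates. Below is a minimal sketch of the before/after call sites, assuming a `df_proc` Spark DataFrame prepared as in the docstring example further down (column names are taken from that example, not from this commit page itself):

from fink_filters.filter_anomaly_notification.filter import anomaly_notification_

# Before this commit: a Spark DataFrame with a boolean `flag` column was returned,
# and anomalous alerts were located by filtering on that column:
#   df_out = anomaly_notification_(df_proc)
#   n_anomalies = df_out.filter(df_out['flag']).count()

# After this commit: a Pandas DataFrame restricted to the selected candidates is returned,
# so plain pandas access works directly, with no further Spark actions needed:
pdf_anomalies = anomaly_notification_(df_proc, threshold=10)
print(pdf_anomalies['objectId'].values)
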
2 changes: 1 addition & 1 deletion fink_filters/__init__.py
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__version__ = "3.11"
__version__ = "3.12"
45 changes: 32 additions & 13 deletions fink_filters/filter_anomaly_notification/filter.py
@@ -19,13 +19,15 @@

from fink_filters.filter_anomaly_notification import filter_utils

from fink_filters.tester import spark_unit_tests


def anomaly_notification_(
df_proc, threshold=10,
send_to_tg=False, channel_id=None,
send_to_slack=False, channel_name=None,
trick_par=10):
""" Create event notifications with a high anomaly_score value
""" Create event notifications with a high `anomaly_score` value
Notes
----------
@@ -50,38 +52,48 @@ def anomaly_notification_(
Telegram channel.
send_to_slack: optional, boolean
If true, send message to Slack. Default is False
channel_id: str
channel_name: str
If `send_to_slack` is True, `channel_name` is the name of the
Slack channel.
trick_par: int, optional
Internal buffer to reduce the number of candidates while giving
space to reach the `threshold`. Default is 10.
Returns
----------
out: DataFrame
Return the input Spark DataFrame with a new column `flag`
for locating anomalous alerts
out: Pandas DataFrame
Pandas DataFrame with anomaly information for the selected candidates
Examples
----------
>>> from fink_utils.spark.utils import concat_col
>>> from fink_science.ad_features.processor import extract_features_ad
>>> from fink_science.anomaly_detection.processor import anomalys_score
>>> from fink_science.anomaly_detection.processor import anomaly_score
>>> df = spark.read.format('parquet').load('datatest')
>>> what = ['magpsf', 'jd', 'sigmapsf', 'fid']
>>> prefix = 'c'
>>> what_prefix = [prefix + i for i in what]
>>> for colname in what:
... df = concat_col(df, colname, prefix=prefix)
>>> df = df.withColumn('lc_features', extract_features_ad(*what_prefix, 'objectId'))
>>> df = df.withColumn("anomaly_score", anomaly_score("lc_features"))
>>> df_proc = df.select(
... 'objectId', 'candidate.ra',
... 'candidate.dec', 'candidate.rb',
... 'anomaly_score', 'timestamp')
>>> df_out = anomaly_notification_(df_proc)
>>> #For sending to messengers:
>>> #df_out = anomaly_notification_(df_proc, threshold=10,
send_to_tg=True, channel_id="@ZTF_anomaly_bot",
send_to_slack=True, channel_name='fink_alert')
>>> print(df_out.filter(df_out['flag']).count())
# Disable communication
>>> pdf_anomalies = anomaly_notification_(df_proc, threshold=10,
... send_to_tg=False, channel_id=None,
... send_to_slack=False, channel_name=None)
>>> print(pdf_anomalies['objectId'].values)
['ZTF21acoshvy' 'ZTF18aapgymv' 'ZTF19aboujyi' 'ZTF18abgjtxx' 'ZTF18aaypnnd'
'ZTF18abbtxsx' 'ZTF18aaakhsv' 'ZTF18actxdmj' 'ZTF18aapoack' 'ZTF18abzvnya']
"""
# Compute the median for the night
med = df_proc.select('anomaly_score').approxQuantile('anomaly_score', [0.5], 0.05)
@@ -117,5 +129,12 @@ def anomaly_notification_(
if send_to_tg:
filter_utils.msg_handler_tg(tg_data, channel_id, med)

df_result = df_proc.withColumn('flag', df_proc['anomaly_score'] <= upper_bound)
return df_result
return pdf_anomalies


if __name__ == "__main__":
""" Execute the test suite """

# Run the test suite
globs = globals()
spark_unit_tests(globs)
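
Since the selected candidates now come back as a regular Pandas DataFrame, downstream consumers can post-process them without touching Spark. A small sketch, under the assumption that the returned frame keeps the `objectId` and `anomaly_score` columns from `df_proc` (only `objectId` is confirmed by the doctest above; the output filename is hypothetical):

# `pdf_anomalies` is the Pandas DataFrame returned by anomaly_notification_ above.
# The filter keeps candidates with anomaly_score at or below an upper bound (see the
# old `flag` definition in the diff), so an ascending sort puts the strongest candidates first.
top = pdf_anomalies.sort_values('anomaly_score')[['objectId', 'anomaly_score']]
top.to_csv('anomaly_candidates.csv', index=False)  # hypothetical output path
print(top.head(10))
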
