From fd66daf424243b083fd68338abdca734bf3db033 Mon Sep 17 00:00:00 2001 From: Julien Date: Mon, 22 May 2023 09:20:19 +0200 Subject: [PATCH] AD filter returns a Pandas DataFrame (#145) * Modify output of AD filter, and expand test suite * Bump to 3.12 * PEP8 --- fink_filters/__init__.py | 2 +- .../filter_anomaly_notification/filter.py | 45 +++++++++++++------ 2 files changed, 33 insertions(+), 14 deletions(-) diff --git a/fink_filters/__init__.py b/fink_filters/__init__.py index ff71fe6..b31cab4 100644 --- a/fink_filters/__init__.py +++ b/fink_filters/__init__.py @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "3.11" +__version__ = "3.12" diff --git a/fink_filters/filter_anomaly_notification/filter.py b/fink_filters/filter_anomaly_notification/filter.py index 5135f00..bb02a85 100644 --- a/fink_filters/filter_anomaly_notification/filter.py +++ b/fink_filters/filter_anomaly_notification/filter.py @@ -19,13 +19,15 @@ from fink_filters.filter_anomaly_notification import filter_utils +from fink_filters.tester import spark_unit_tests + def anomaly_notification_( df_proc, threshold=10, send_to_tg=False, channel_id=None, send_to_slack=False, channel_name=None, trick_par=10): - """ Create event notifications with a high anomaly_score value + """ Create event notifications with a high `anomaly_score` value Notes ---------- @@ -50,38 +52,48 @@ def anomaly_notification_( Telegram channel. send_to_slack: optional, boolean If true, send message to Slack. Default is False - channel_id: str + channel_name: str If `send_to_slack` is True, `channel_name` is the name of the Slack channel. + trick_par: int, optional + Internal buffer to reduce the number of candidates while giving + space to reach the `threshold`. Default is 10. 
Returns ---------- - out: DataFrame - Return the input Spark DataFrame with a new column `flag` - for locating anomalous alerts + out: Pandas DataFrame + Pandas DataFrame with anomaly information for the selected candidates + Examples ---------- >>> from fink_utils.spark.utils import concat_col >>> from fink_science.ad_features.processor import extract_features_ad - >>> from fink_science.anomaly_detection.processor import anomalys_score + >>> from fink_science.anomaly_detection.processor import anomaly_score + >>> df = spark.read.format('parquet').load('datatest') + >>> what = ['magpsf', 'jd', 'sigmapsf', 'fid'] >>> prefix = 'c' >>> what_prefix = [prefix + i for i in what] >>> for colname in what: ... df = concat_col(df, colname, prefix=prefix) + >>> df = df.withColumn('lc_features', extract_features_ad(*what_prefix, 'objectId')) >>> df = df.withColumn("anomaly_score", anomaly_score("lc_features")) + >>> df_proc = df.select( ... 'objectId', 'candidate.ra', ... 'candidate.dec', 'candidate.rb', ... 'anomaly_score', 'timestamp') >>> df_out = anomaly_notification_(df_proc) - >>> #For sending to messengers: - >>> #df_out = anomaly_notification_(df_proc, threshold=10, - send_to_tg=True, channel_id="@ZTF_anomaly_bot", - send_to_slack=True, channel_name='fink_alert') - >>> print(df_out.filter(df_out['flag']).count()) + + # Disable communication + >>> pdf_anomalies = anomaly_notification_(df_proc, threshold=10, + ... send_to_tg=False, channel_id=None, + ... 
send_to_slack=False, channel_name=None) + >>> print(pdf_anomalies['objectId'].values) + ['ZTF21acoshvy' 'ZTF18aapgymv' 'ZTF19aboujyi' 'ZTF18abgjtxx' 'ZTF18aaypnnd' + 'ZTF18abbtxsx' 'ZTF18aaakhsv' 'ZTF18actxdmj' 'ZTF18aapoack' 'ZTF18abzvnya'] """ # Compute the median for the night med = df_proc.select('anomaly_score').approxQuantile('anomaly_score', [0.5], 0.05) @@ -117,5 +129,12 @@ def anomaly_notification_( if send_to_tg: filter_utils.msg_handler_tg(tg_data, channel_id, med) - df_result = df_proc.withColumn('flag', df_proc['anomaly_score'] <= upper_bound) - return df_result + return pdf_anomalies + + +if __name__ == "__main__": + """ Execute the test suite """ + + # Run the test suite + globs = globals() + spark_unit_tests(globs)