/
XShooterFilter.py
executable file
·117 lines (100 loc) · 4.73 KB
/
XShooterFilter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: Ampel-contrib-HU/ampel/contrib/hu/t0/XShooterFilter.py
# License: BSD-3-Clause
# Author: m. giomi <matteo.giomi@desy.de>
# Date: 28.08.2018
# Last Modified Date: 24.11.2021
# Last Modified By: jnordin
from numpy import array
from ampel.protocol.AmpelAlertProtocol import AmpelAlertProtocol
from ampel.ztf.t0.DecentFilter import DecentFilter
class XShooterFilter(DecentFilter):
"""
Filter derived from the DecentFilter, in addition selecting very new
transients which are visible from the South. In particular, the transient
are accepted if they are detected during the last 6h, at least one non-detection
during the last 5 days (and no detection during this time).
"""
max_dec: float # maximum allowed value for the declination
det_within: float # the transient must have been detected within the last 'DET_WITHIN' days
ul_within: float # the transient must AT LEAST one ulim within the last 'UL_WITHIN' days
# Updated parameters based on infant detections spring 2021. Defaults conservative
max_chipsf: float = 4 # Best guess value 2
max_seeratio: float = 2 # Best guess value 1.3
min_sumrat: float = 0.6 # Best guess value 0.8
def post_init(self):
super().post_init()
# self.keys_to_check += ("jd",)
# Is none not working, now doing this manually
# self.select_upper_limits = [{'attribute': 'magpsf', 'operator': 'is', 'value': None}]
self.select_photopoints = [
{"attribute": "magpsf", "operator": "is not", "value": None}
]
# Override
def process(self, alert: AmpelAlertProtocol) -> None | bool | int:
"""
run the decent filter on the alert
"""
# cut on declination
latest = alert.datapoints[0]
if latest["dec"] > self.max_dec:
self.logger.debug(
f"Rejected: declination {latest['dec']:.2f} deg is "
f"above maximum allowed of {self.max_dec:.2f} deg"
)
return None
# CUT ON LATEST SUBTRACTION QUALITY
#################################
if latest["chipsf"] > self.max_chipsf:
self.logger.debug(
f"Rejected: chipsf {latest['chipsf']:.2f} "
f"above maximum allowed of {self.max_chipsf:.2f}"
)
return None
if latest["seeratio"] > self.max_seeratio:
self.logger.debug(
f"Rejected: seeratio {latest['seeratio']:.2f} "
f"above maximum allowed of {self.max_seeratio:.2f}"
)
return None
if latest["sumrat"] < self.min_sumrat:
self.logger.debug(
f"Rejected: sumrat {latest['sumrat']:.2f} "
f"below min allowed of {self.min_sumrat:.2f}"
)
return None
# CUT ON THE HISTORY OF THE ALERT
#################################
now_jd = latest["jd"]
self.logger.debug(f"Setting 'now' to JD {now_jd:.4f} to cut on alert history")
# check on history 1: detected in the last 6h
detection_jds = array(alert.get_values("jd", filters=self.select_photopoints))
recent_detections = detection_jds > (now_jd - self.det_within)
if not any(recent_detections):
self.logger.debug(
f"Rejected: no detection within the last {self.det_within:.3f} days "
f"(latest one {(now_jd - max(detection_jds)):.3f} days ago)"
)
return None
# check on the history 2: at least one upper limit in the last 5 days
# old version to look for upper limits not working...
ulim_jds = [el["jd"] for el in alert.datapoints if el.get("candid") is None]
if ulim_jds is None:
self.logger.debug("Rejected: this alert has no upper limits")
return None
if not any(array(ulim_jds) > (now_jd - self.ul_within)):
self.logger.debug(
f"Rejected: no upper limit in the last {self.ul_within:.3f} days"
)
return None
# check on history 3: no detection within the last 5 days
not_so_recent_detections = detection_jds[~recent_detections]
if any(not_so_recent_detections > (now_jd - self.ul_within)):
self.logger.debug(
f"Rejected: at least one detection within the last {self.ul_within:.3f} days "
f"(but not within {self.det_within:.3f} days)."
)
return None
# now apply the DecentFilter
return super().process(alert)