/
listener.py
264 lines (219 loc) · 8.78 KB
/
listener.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
import copy
import dataclasses
import functools
import logging
import threading
import time
from typing import Any, Dict, List, Optional
import schedule
import rubrix
from rubrix.client import api
from rubrix.client.sdk.commons.errors import NotFoundApiError
from rubrix.listeners.models import (
ListenerAction,
ListenerCondition,
Metrics,
RBListenerContext,
Search,
)
@dataclasses.dataclass
class RBDatasetListener:
    """
    The Rubrix dataset listener class

    Args:
        dataset: The dataset over which listener is created
        action: The action to execute when condition is satisfied
        metrics: A list of metrics ids that will be required in condition
        query: The query string to apply
        query_params: Defined parameters used dynamically in the provided query
        condition: The condition to satisfy to execute the action
        query_records: If ``False``, the records won't be passed as argument to the action.
            Default: ``True``
        interval_in_seconds: How often the listener is executed. Default to 30 seconds
    """

    _LOGGER = logging.getLogger(__name__)

    dataset: str
    action: ListenerAction
    metrics: Optional[List[str]] = None
    query: Optional[str] = None
    query_params: Optional[Dict[str, Any]] = None
    condition: Optional[ListenerCondition] = None
    query_records: bool = True
    interval_in_seconds: int = 30

    @property
    def formatted_query(self) -> Optional[str]:
        """Formatted query using defined query params, if any"""
        if self.query is None:
            return None
        return self.query.format(**(self.query_params or {}))

    # Internal scheduling state; ``init=False`` keeps it out of __init__.
    __listener_job__: Optional[schedule.Job] = dataclasses.field(
        init=False, default=None
    )
    __stop_schedule_event__ = None
    __current_thread__ = None
    # One scheduler per instance: a shared class-level ``schedule.Scheduler()``
    # would make concurrently started listeners run each other's jobs.
    __scheduler__: schedule.Scheduler = dataclasses.field(
        init=False, default_factory=schedule.Scheduler
    )

    def __post_init__(self):
        self.metrics = self.metrics or []
        self._validate()

    def _validate(self):
        """Fail fast when ``query_params`` does not cover the query template."""
        try:
            query = self.formatted_query
            if query:
                self._LOGGER.debug("Initial listener query %s", query)
        except KeyError as kex:
            # Chain the original error so the missing placeholder is visible
            raise KeyError(f"Missing query parameter: {kex}") from kex

    def is_running(self) -> bool:
        """True if listener is running"""
        return self.__listener_job__ is not None

    def __catch_exceptions__(self, cancel_on_failure=False):
        """Decorator keeping scheduler iterations from killing the thread.

        Failures are logged with traceback; when ``cancel_on_failure`` is set
        the listener is stopped after the failure.
        """

        def catch_exceptions_decorator(job_func):
            @functools.wraps(job_func)
            def wrapper(*args, **kwargs):
                try:
                    return job_func(*args, **kwargs)
                except Exception:
                    # Narrowed from a bare ``except`` so KeyboardInterrupt and
                    # SystemExit still propagate.
                    self._LOGGER.exception("Error in listener iteration")
                    if cancel_on_failure:
                        self.stop()  # We stop the scheduler

            return wrapper

        return catch_exceptions_decorator

    def start(self, *action_args, **action_kwargs):
        """
        Start listen to changes in the dataset. Additionally, args and kwargs can be passed to action
        by using the `action_*` arguments

        If the listener is already started, a ``ValueError`` will be raised
        """
        if self.is_running():
            raise ValueError("Listener is already running")

        job_step = self.__catch_exceptions__(cancel_on_failure=True)(
            self.__listener_iteration_job__
        )
        self.__listener_job__ = self.__scheduler__.every(
            self.interval_in_seconds
        ).seconds.do(job_step, *action_args, **action_kwargs)

        class _ScheduleThread(threading.Thread):
            _WAIT_EVENT = threading.Event()
            _THREAD_LOGGER = logging.getLogger(__name__)

            @classmethod
            def run(cls):
                cls._THREAD_LOGGER.debug("Running listener thread...")
                while not cls._WAIT_EVENT.is_set():
                    self.__scheduler__.run_pending()
                    # Event.wait instead of time.sleep so stop() is noticed
                    # promptly; clamp the timeout so it is never <= 0
                    # (interval_in_seconds - 1 would be invalid for sleep()
                    # when the interval is 0 or negative).
                    cls._WAIT_EVENT.wait(
                        timeout=max(self.interval_in_seconds - 1, 1)
                    )
                cls._THREAD_LOGGER.debug("Stopping listener thread...")

            @classmethod
            def stop(cls):
                cls._WAIT_EVENT.set()

        self.__current_thread__ = _ScheduleThread()
        self.__current_thread__.start()

    def stop(self):
        """
        Stops listener if it's still running.

        If listener is already stopped, a ``ValueError`` will be raised
        """
        if not self.is_running():
            raise ValueError("Listener is not running")

        self.__scheduler__.cancel_job(self.__listener_job__)
        self.__listener_job__ = None
        self.__current_thread__.stop()
        self.__current_thread__.join()  # TODO: improve it!
        # Drop the finished thread so a subsequent start() gets a fresh one
        self.__current_thread__ = None

    def __listener_iteration_job__(self, *args, **kwargs):
        """
        Execute a complete listener iteration. The iteration consists on:

        1. Query data and fetch configured metrics
        2. Check search results and metrics with provided condition
        3. Execute the action if condition is satisfied
        """
        current_api = api.active_api()
        try:
            dataset = current_api.datasets.find_by_name(self.dataset)
            self._LOGGER.debug("Found listener dataset %s", dataset.name)
        except NotFoundApiError:
            self._LOGGER.warning("Not found dataset <%s>", self.dataset)
            return

        ctx = RBListenerContext(
            listener=self,
            query_params=self.query_params,
            metrics=self.__compute_metrics__(
                current_api, dataset, query=self.formatted_query
            ),
        )
        if self.condition is None:
            self._LOGGER.debug("No condition found! Running action...")
            return self.__run_action__(ctx, *args, **kwargs)

        # size=0: only the total count is needed to evaluate the condition
        search_results = current_api.searches.search_records(
            name=self.dataset, task=dataset.task, query=self.formatted_query, size=0
        )
        ctx.search = Search(
            total=search_results.total, query_params=copy.deepcopy(ctx.query_params)
        )
        condition_args = [ctx.search]
        if self.metrics:
            condition_args.append(ctx.metrics)

        self._LOGGER.debug("Evaluate condition with arguments: %s", condition_args)
        if self.condition(*condition_args):
            self._LOGGER.debug("Condition passed! Running action...")
            return self.__run_action__(ctx, *args, **kwargs)

    def __compute_metrics__(
        self, current_api, dataset, query: Optional[str]
    ) -> Metrics:
        """Fetch the summary of every configured metric for the given query."""
        metrics = {
            metric: current_api.metrics.metric_summary(
                name=self.dataset,
                task=dataset.task,
                metric=metric,
                query=query,
            )
            for metric in self.metrics
        }
        return Metrics.from_dict(metrics)

    def __run_action__(self, ctx: Optional[RBListenerContext] = None, *args, **kwargs):
        """Invoke the user action, optionally prepending the loaded records.

        Returns ``schedule.CancelJob`` on failure so the scheduler drops the job.
        """
        try:
            action_args = [ctx] if ctx else []
            if self.query_records:
                action_args.insert(
                    0, rubrix.load(name=self.dataset, query=self.formatted_query)
                )
            self._LOGGER.debug("Running action with arguments: %s", action_args)
            return self.action(*args, *action_args, **kwargs)
        except Exception:
            # Narrowed from a bare ``except``; log the traceback and tell the
            # scheduler to cancel this job instead of crashing the thread.
            self._LOGGER.exception("Error running listener action")
            return schedule.CancelJob
def listener(
    dataset: str,
    query: Optional[str] = None,
    metrics: Optional[List[str]] = None,
    condition: Optional[ListenerCondition] = None,
    with_records: bool = True,
    execution_interval_in_seconds: int = 30,
    **query_params,
):
    """
    Configures the decorated function as a Rubrix listener.

    Args:
        dataset: The dataset name.
        query: The query string.
        metrics: Required metrics for listener condition.
        condition: Defines condition over search and metrics that launch action when is satisfied.
        with_records: Include records as part or action arguments. If ``False``,
            only the listener context ``RBListenerContext`` will be passed. Default: ``True``.
        execution_interval_in_seconds: Define the execution interval in seconds when listener
            iteration will be executed.
        **query_params: Dynamic parameters used in the query. These parameters will be available
            via the listener context and can be updated for subsequent queries.
    """

    def inner_decorator(func):
        # Collect the listener configuration once; the decorated function
        # becomes the listener's action.
        listener_config = dict(
            dataset=dataset,
            action=func,
            condition=condition,
            query=query,
            query_params=query_params,
            metrics=metrics,
            query_records=with_records,
            interval_in_seconds=execution_interval_in_seconds,
        )
        return RBDatasetListener(**listener_config)

    return inner_decorator