This repository has been archived by the owner on May 12, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 235
/
job_monitor.py
63 lines (52 loc) · 2.07 KB
/
job_monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import time
from twitter.common.quantity import Amount, Time
from gen.apache.aurora.constants import (
LIVE_STATES,
TERMINAL_STATES
)
from gen.apache.aurora.ttypes import (
Identity,
TaskQuery
)
from thrift.transport import TTransport
class JobMonitor(object):
    """Poll the scheduler for the status of the tasks of a single job.

    Tasks that already exist when the monitor is constructed are remembered
    and filtered out of every later query, so the monitor only reports on
    tasks that appeared after it was created.
    """

    # Sleep before the second poll in wait_until(); doubled after every poll.
    MIN_POLL_INTERVAL = Amount(10, Time.SECONDS)
    # Upper bound for the (doubling) sleep between polls in wait_until().
    MAX_POLL_INTERVAL = Amount(2, Time.MINUTES)

    @classmethod
    def running_or_finished(cls, status):
        """Return True if status is in LIVE_STATES or TERMINAL_STATES."""
        return status in (LIVE_STATES | TERMINAL_STATES)

    @classmethod
    def terminal(cls, status):
        """Return True if status is in TERMINAL_STATES."""
        return status in TERMINAL_STATES

    # TODO(ksweeney): Make this use the AuroraJobKey
    def __init__(self, client, role, env, jobname):
        """Build the task query for (role, env, jobname) and snapshot existing tasks.

        client is expected to expose scheduler.getTasksStatus(query).
        """
        self._client = client
        self._query = TaskQuery(owner=Identity(role=role), environment=env, jobName=jobname)
        # iter_query() filters against _initial_tasks, so the attribute must
        # exist (empty) before the first query that computes the snapshot.
        self._initial_tasks = set()
        self._initial_tasks = set(task.assignedTask.taskId for task in self.iter_query())

    def iter_query(self):
        """Yield the job's tasks that were not present at construction time.

        Yields nothing when the scheduler cannot be reached (the transport
        error is printed, not raised) or when the response carries no task
        list.
        """
        try:
            res = self._client.scheduler.getTasksStatus(self._query)
        except TTransport.TTransportException as e:
            # NOTE(review): the message mentions "slaves" although this call
            # queries task status; text kept as-is for output compatibility.
            print('Failed to query slaves from scheduler: %s' % e)
            return
        if res is None or res.result is None:
            return
        # Unset Thrift fields are None in generated Python code; treat a
        # missing status result / task list like an empty response instead of
        # failing with AttributeError / TypeError.
        status_result = res.result.scheduleStatusResult
        if status_result is None or status_result.tasks is None:
            return
        for task in status_result.tasks:
            if task.assignedTask.taskId not in self._initial_tasks:
                yield task

    def states(self):
        """Return a dict mapping instance id -> status of its newest task.

        When several tasks share an instance id, the one whose first task
        event has the greatest timestamp wins.
        """
        latest = {}
        for task in self.iter_query():
            instance_id = task.assignedTask.instanceId
            events = task.taskEvents
            # A task without any recorded event sorts before every task that
            # has one, rather than aborting the whole poll with an
            # IndexError / TypeError.
            first_timestamp = events[0].timestamp if events else float('-inf')
            if instance_id not in latest or first_timestamp > latest[instance_id][0]:
                latest[instance_id] = (first_timestamp, task.status)
        return dict((instance_id, entry[1]) for (instance_id, entry) in latest.items())

    def wait_until(self, predicate):
        """Given a predicate (from ScheduleStatus => Boolean), return once all tasks
        return true for that predicate.

        Polls states() with a sleep that starts at MIN_POLL_INTERVAL and
        doubles after each poll, capped at MAX_POLL_INTERVAL. Because all()
        of an empty sequence is True, this returns immediately when states()
        is empty (no new tasks, or the scheduler query failed).
        """
        poll_interval = self.MIN_POLL_INTERVAL
        while not all(predicate(state) for state in self.states().values()):
            time.sleep(poll_interval.as_(Time.SECONDS))
            poll_interval = min(self.MAX_POLL_INTERVAL, 2 * poll_interval)