# compute.py
import os
import requests
import json
from requests.exceptions import RequestException, Timeout
import requests_mock
# Prefix of the method names requests_mock.Mocker patches when used as a
# class decorator (presumably the library default as well; kept explicit).
requests_mock.Mocker.TEST_PREFIX = "test"

# Host part of every worker URL ("http://{WORKER_HN}/..."), taken from the
# WORKERS environment variable; None when that variable is unset.
WORKER_HN = os.getenv("WORKERS")

# Timeout, in seconds, passed to requests when posting jobs to the workers.
TIMEOUT_IN_SECONDS = 1.7

# Retry budget consulted by the job-submission loops.
MAX_ATTEMPTS_SUBMIT_JOB = 4
class JobFailError(Exception):
    """Raised when a job running on the remote workers has failed."""
class WorkersUnreachableError(Exception):
    """Raised when the backend workers cannot be reached.

    This is only expected when the webapp is running without its workers.
    """
class Compute(object):
    """Client for submitting jobs to, and reading results from, the workers.

    Every URL is built as ``http://{WORKER_HN}/...``. The ``remote_*`` methods
    are thin wrappers around ``requests``; the higher-level methods call the
    network only through them.
    """

    def remote_submit_job(self, url, data, timeout=TIMEOUT_IN_SECONDS, headers=None):
        """POST ``data`` to ``url`` and return the raw ``requests.Response``.

        Args:
            url: Endpoint to post to.
            data: Request body (already serialized).
            timeout: Seconds to wait for the server, forwarded to ``requests``.
            headers: Optional HTTP headers, forwarded to ``requests.post``.
        """
        return requests.post(url, data=data, timeout=timeout, headers=headers)

    def remote_query_job(self, theurl, timeout=None):
        """GET ``theurl`` (a job-status URL) and return the raw response.

        ``timeout`` is forwarded to ``requests.get``. The default ``None``
        means no timeout, which matches this method's historical behavior.
        """
        return requests.get(theurl, timeout=timeout)

    def remote_get_job(self, theurl, timeout=None):
        """GET ``theurl`` (a job-result URL) and return the raw response.

        ``timeout`` is forwarded to ``requests.get``. The default ``None``
        means no timeout, which matches this method's historical behavior.
        """
        return requests.get(theurl, timeout=timeout)

    def submit_job(self, tasks, endpoint):
        """Submit ``tasks`` to ``http://{WORKER_HN}/{endpoint}``.

        Returns whatever :meth:`submit` returns.
        """
        print("submitting", tasks, endpoint)
        url = f"http://{WORKER_HN}/{endpoint}"
        return self.submit(tasks, url)

    def submit(self, tasks, url, increment_counter=True, use_wnc_offset=True):
        """POST ``tasks`` as JSON to ``url``, retrying on failure.

        Args:
            tasks: JSON-serializable job payload.
            url: Endpoint to submit to.
            increment_counter: Unused; kept for interface compatibility.
            use_wnc_offset: Unused; kept for interface compatibility.

        Returns:
            ``(job_id, queue_length)``, taken from the ``"job_id"`` and
            ``"qlength"`` fields of the worker's JSON reply.

        Raises:
            WorkersUnreachableError: if none of the ``MAX_ATTEMPTS_SUBMIT_JOB``
                attempts gets an HTTP 200 response.
        """
        # The payload does not change between attempts, so serialize it once.
        packed = json.dumps(tasks)
        # Make at most MAX_ATTEMPTS_SUBMIT_JOB attempts. The old
        # ``attempts > MAX`` check allowed one extra attempt.
        for _attempt in range(MAX_ATTEMPTS_SUBMIT_JOB):
            try:
                response = self.remote_submit_job(
                    url, data=packed, timeout=TIMEOUT_IN_SECONDS
                )
            except Timeout:  # must precede RequestException (its base class)
                print("Couldn't submit to: ", WORKER_HN)
                continue
            except RequestException as exc:
                print("Something unexpected happened: ", exc)
                continue
            if response.status_code == 200:
                print("submitted: ", url)
                # Parse outside the retry handlers. A 200 presumably means the
                # worker accepted the job, so a malformed body should surface
                # as an error rather than trigger a duplicate submission.
                data = response.json()
                return data["job_id"], data["qlength"]
            print("FAILED: ", WORKER_HN)
        print("Exceeded max attempts. Bailing out.")
        raise WorkersUnreachableError()

    def results_ready(self, sim):
        """Ask the worker whether ``sim``'s job has results.

        Reads ``sim.project.owner.user.username``, ``sim.project.title`` and
        ``sim.job_id`` to build the query URL.

        Returns:
            The response body text when the worker answers with HTTP 200.

        Raises:
            JobFailError: for any other status code.
        """
        result_url = (
            f"http://{WORKER_HN}/{sim.project.owner.user.username}/{sim.project.title}"
            f"/query/{sim.job_id}/"
        )
        job_response = self.remote_query_job(result_url)
        if job_response.status_code == 200:  # Valid response
            return job_response.text
        print("did not expect response with status_code", job_response.status_code)
        raise JobFailError("{0} failed on host: {1}".format(sim.job_id, WORKER_HN))

    def get_results(self, sim):
        """Fetch and decode the results of ``sim``'s job.

        Reads ``sim.project.owner.user.username``, ``sim.project.title`` and
        ``sim.job_id`` to build the result URL.

        Returns:
            The decoded JSON body of an HTTP 200 response.

        Raises:
            WorkersUnreachableError: if the worker answers with a non-200 status.
            ValueError: if a 200 response body is not valid JSON. The message
                includes the text that was received.
        """
        result_url = (
            f"http://{WORKER_HN}/{sim.project.owner.user.username}/{sim.project.title}"
            f"/get_job/{sim.job_id}/"
        )
        job_response = self.remote_get_job(result_url)
        if job_response.status_code != 200:
            raise WorkersUnreachableError()
        try:
            return job_response.json()
        except ValueError as err:
            # Got back a bad response: put the received text in the message
            # and keep the decode error as the cause.
            msg = "PROBLEM WITH RESPONSE. TEXT RECEIVED: {}"
            raise ValueError(msg.format(job_response.text)) from err
class SyncCompute(Compute):
    """Compute variant whose ``submit`` returns the worker's reply directly.

    Instead of ``(job_id, queue_length)``, ``submit`` returns
    ``(success, data)``. ``data`` is the worker's decoded JSON reply, and
    ``success`` is True when its ``"status"`` field equals ``"SUCCESS"``.
    """

    def submit(self, tasks, url, increment_counter=True, use_wnc_offset=True):
        """POST ``tasks`` as JSON to ``url``, retrying on failure.

        Args:
            tasks: JSON-serializable job payload.
            url: Endpoint to submit to.
            increment_counter: Unused; kept for interface compatibility.
            use_wnc_offset: Unused; kept for interface compatibility.

        Returns:
            ``(success, data)``: ``data`` is the decoded JSON reply and
            ``success`` is ``data["status"] == "SUCCESS"``.

        Raises:
            WorkersUnreachableError: if none of the ``MAX_ATTEMPTS_SUBMIT_JOB``
                attempts gets an HTTP 200 response.
        """
        # The payload does not change between attempts, so serialize it once.
        packed = json.dumps(tasks)
        # Make at most MAX_ATTEMPTS_SUBMIT_JOB attempts. The old
        # ``attempts > MAX`` check allowed one extra attempt.
        for _attempt in range(MAX_ATTEMPTS_SUBMIT_JOB):
            try:
                response = self.remote_submit_job(
                    url, data=packed, timeout=TIMEOUT_IN_SECONDS
                )
            except Timeout:  # must precede RequestException (its base class)
                print("Couldn't submit to: ", WORKER_HN)
                continue
            except RequestException as exc:
                print("Something unexpected happened: ", exc)
                continue
            if response.status_code == 200:
                print("submitted: ", url)
                # Decode outside the retry handlers, so a bad body raises a
                # clear error instead of leaving ``data`` unbound.
                data = response.json()
                return data["status"] == "SUCCESS", data
            print("FAILED: ", WORKER_HN)
        print("Exceeded max attempts. Bailing out.")
        raise WorkersUnreachableError()