-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #28 from crowdresearch/api
Refactoring + Graceful Closure + Rerun Performance updates
- Loading branch information
Showing
14 changed files
with
1,041 additions
and
710 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
import json | ||
import logging | ||
|
||
from daemo.rest import RestClient | ||
from daemo.router import Route | ||
from daemo.utils import raise_if_error | ||
|
||
log = logging.getLogger("daemo.client") | ||
|
||
|
||
class ApiClient: | ||
def __init__(self, credentials_path, host, http_proto): | ||
self.route = Route() | ||
self.client = RestClient(credentials_path, host, http_proto) | ||
|
||
def get_auth_token(self): | ||
return self.client.auth.get_auth_token() | ||
|
||
def fetch_config(self, rerun_key): | ||
response = self.client.get(self.route.rerun_config % rerun_key, data=json.dumps({})) | ||
raise_if_error("publish project", response) | ||
|
||
return response.json() | ||
|
||
def publish_project(self, project_id): | ||
response = self.client.post(self.route.publish_project % project_id, data=json.dumps({})) | ||
raise_if_error("publish project", response) | ||
|
||
return response.json() | ||
|
||
def add_data(self, project_key, tasks, rerun_key): | ||
response = self.client.post(self.route.add_tasks % project_key, data=json.dumps({ | ||
"tasks": tasks, | ||
"rerun_key": rerun_key | ||
})) | ||
|
||
raise_if_error("publish project", response) | ||
|
||
return response.json() | ||
|
||
def get_task_results_by_taskworker_id(self, taskworker_id): | ||
try: | ||
response = self.client.get(self.route.task_worker_results % taskworker_id, data={}) | ||
raise_if_error("process result", response) | ||
results = response.json() | ||
return results | ||
except Exception as e: | ||
return None | ||
|
||
def update_approval_status(self, task): | ||
log.debug(msg="updating status for task %d" % task["id"]) | ||
|
||
STATUS_ACCEPTED = 3 | ||
STATUS_REJECTED = 4 | ||
|
||
data = { | ||
"status": STATUS_ACCEPTED if task["accept"] else STATUS_REJECTED, | ||
"workers": [task["id"]] | ||
} | ||
|
||
response = self.client.post(self.route.update_task_status, data=json.dumps(data)) | ||
raise_if_error("task approval", response) | ||
|
||
return response.json() | ||
|
||
def fetch_task(self, task_id): | ||
response = self.client.get(self.route.task % task_id, data=json.dumps({})) | ||
raise_if_error("task", response) | ||
|
||
task_data = response.json() | ||
return task_data | ||
|
||
def fetch_task_status(self, task_id): | ||
response = self.client.get(self.route.task_status % task_id, data={}) | ||
raise_if_error("task status", response) | ||
|
||
task_data = response.json() | ||
return task_data | ||
|
||
def submit_results(self, task_id, results): | ||
data = { | ||
"task_id": task_id, | ||
"results": results | ||
} | ||
|
||
response = self.client.post(self.route.mock_results, data=json.dumps(data)) | ||
raise_if_error("result submission", response) | ||
return response.json() | ||
|
||
def launch_peer_review(self, task_workers, inter_task_review, rerun_key): | ||
data = { | ||
"task_workers": task_workers, | ||
"inter_task_review": inter_task_review, | ||
"rerun_key": rerun_key | ||
} | ||
|
||
response = self.client.post(self.route.peer_review, data=json.dumps(data)) | ||
raise_if_error("peer review", response) | ||
return response.json() | ||
|
||
def get_trueskill_scores(self, match_group_id): | ||
response = self.client.get(self.route.true_skill_score.format(match_group_id)) | ||
raise_if_error("rating", response) | ||
|
||
return response.json() | ||
|
||
def boomerang_feedback(self, data): | ||
response = self.client.post(self.route.boomerang_rating, data=json.dumps(data)) | ||
raise_if_error("rating", response) | ||
|
||
return response.json() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
import fcntl | ||
import json | ||
import logging.config | ||
import os | ||
|
||
from daemo.errors import Error | ||
from daemo.exceptions import ServerException | ||
from daemo.utils import check_dependency | ||
|
||
log = logging.getLogger("daemo.client") | ||
|
||
AUTHORIZATION = "Authorization" | ||
TOKEN = "Bearer %s" | ||
GRANT_TYPE = "grant_type" | ||
REFRESH_TOKEN = "refresh_token" | ||
ACCESS_TOKEN = "access_token" | ||
CLIENT_ID = "client_id" | ||
CREDENTIALS_NOT_PROVIDED = "Authentication credentials were not provided." | ||
REFRESH_TOKEN_FAILED = "Error refreshing access token. Please retry again." | ||
AUTH_TOKEN_URL = "/api/oauth2-ng/token/" | ||
|
||
|
||
class Auth: | ||
client = None | ||
client_id = None | ||
access_token = None | ||
refresh_token = None | ||
credentials_path = None | ||
host = None | ||
http_proto = None | ||
|
||
def __init__(self, credentials_path, host, http_proto, client): | ||
self.client = client | ||
self.credentials_path = credentials_path | ||
self.host = host | ||
self.http_proto = http_proto | ||
|
||
def initialize(self): | ||
if self.credentials_exist(): | ||
self.load_tokens() | ||
else: | ||
self.persist_tokens() | ||
|
||
self.refresh_tokens() | ||
|
||
def credentials_exist(self): | ||
return os.path.isfile(self.credentials_path) | ||
|
||
def load_tokens(self): | ||
with open(self.credentials_path, "r") as infile: | ||
data = json.load(infile) | ||
|
||
check_dependency(data[CLIENT_ID] is not None and len(data[CLIENT_ID]) > 0, Error.required(CLIENT_ID)) | ||
check_dependency(data[ACCESS_TOKEN] is not None and len(data[ACCESS_TOKEN]) > 0, | ||
Error.required(ACCESS_TOKEN)) | ||
check_dependency(data[REFRESH_TOKEN] is not None and len(data[REFRESH_TOKEN]) > 0, | ||
Error.required(REFRESH_TOKEN)) | ||
|
||
self.client_id = data[CLIENT_ID] | ||
self.access_token = data[ACCESS_TOKEN] | ||
self.refresh_token = data[REFRESH_TOKEN] | ||
|
||
def persist_tokens(self): | ||
with open(self.credentials_path, "w") as outfile: | ||
fcntl.flock(outfile.fileno(), fcntl.LOCK_EX) | ||
|
||
data = { | ||
CLIENT_ID: self.client_id, | ||
ACCESS_TOKEN: self.access_token, | ||
REFRESH_TOKEN: self.refresh_token | ||
} | ||
|
||
json.dump(data, outfile) | ||
|
||
def refresh_tokens(self): | ||
self.load_tokens() | ||
|
||
data = { | ||
CLIENT_ID: self.client_id, | ||
GRANT_TYPE: REFRESH_TOKEN, | ||
REFRESH_TOKEN: self.refresh_token | ||
} | ||
|
||
auth_response = self.client.post(AUTH_TOKEN_URL, data=data, is_json=False, authorization=False) | ||
response = auth_response.json() | ||
|
||
if "error" in response: | ||
raise ServerException("auth", REFRESH_TOKEN_FAILED, 400) | ||
|
||
check_dependency(response[ACCESS_TOKEN] is not None and len(response[ACCESS_TOKEN]) > 0, | ||
Error.required(ACCESS_TOKEN)) | ||
check_dependency(response[REFRESH_TOKEN] is not None and len(response[REFRESH_TOKEN]) > 0, | ||
Error.required( | ||
REFRESH_TOKEN)) | ||
|
||
self.access_token = response.get(ACCESS_TOKEN) | ||
self.refresh_token = response.get(REFRESH_TOKEN) | ||
|
||
self.persist_tokens() | ||
|
||
def get_auth_token(self): | ||
return self.access_token | ||
|
||
def is_auth_error(self, response): | ||
try: | ||
response = response.json() | ||
except Exception as e: | ||
pass | ||
|
||
return response is not None \ | ||
and isinstance(response, dict) \ | ||
and response.get("detail", "") == CREDENTIALS_NOT_PROVIDED |
Oops, something went wrong.