-
Notifications
You must be signed in to change notification settings - Fork 1
/
dummyhandler.py
111 lines (87 loc) · 3.34 KB
/
dummyhandler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
from dane.handlers.base_handler import BaseHandler
from dane.state import ProcState
import uuid
class DummyHandler(BaseHandler):
def __init__(self):
super().__init__({}) # pass empty config
self.doc_register = {}
self.task_register = {}
def registerDocument(self, document):
idx = str(uuid.uuid4())
document._id = idx
self.doc_register[idx] = document
return idx
def deleteDocument(self, document):
del self.doc_register[document.document_id]
# Note: added to make test_dane.py work, but does nothing yet
def assignTaskToMany(self, task, document_ids):
return
# Note: added to make test_dane.py work, but does nothing yet
def registerDocuments(self, documents):
return
def assignTask(self, task, document_id):
if document_id in self.doc_register.keys():
idx = str(len(self.task_register))
task._id = idx
# store document state as HTTP response codes
self.task_register[idx] = (task, document_id)
task.state = ProcState.CREATED.value
task.msg = "Created"
else:
raise APIRegistrationError(
"Unregistered document or unknown document id!"
"Register document first"
)
return task.run()
def deleteTask(self, task):
del self.task_register[task._id]
def taskFromTaskId(self, task_id):
return self.task_register[task_id]
def getTaskState(self, task_id):
return self.taskFromTaskId(task_id)[1]
def getTaskKey(self, task_id):
return self.taskFromTaskId(task_id)[0]
def documentFromDocumentId(self, document_id):
return self.doc_register[document_id]
def documentFromTaskId(self, task_id):
return self.doc_register[self.task_register[task_id][2]]
def run(self, task_id):
task, document_id = self.task_register[task_id]
self.updateTaskState(task._id, ProcState.SUCCESS.value, "Success!")
print(
"DummyEndpoint: Executed task {} for "
"document: {}".format(task.key, document_id)
)
def retry(self, task_id, force=False):
task, document_id = self.task_register[task_id]
self.updateTaskState(task._id, ProcState.SUCCESS.value, "Retried successfully!")
print(
"DummyEndpoint: Retried task {} for "
"document: {}".format(task.key, document_id)
)
def callback(self, task_id, response):
print(
"DummyEndpoint: Callback response {} for "
"task_id: {}".format(response, task_id)
)
def updateTaskState(self, task_id, state, message):
task, _ = self.task_register[task_id]
task.state = state
task.msg = message
# self.task_register[task_id] = (state, message, document_id)
def search(self, target_id, creator_id):
return
def getUnfinished(self):
return [
t._id for t, _ in self.task_register if t.state != ProcState.SUCCESS.value
]
def registerResult(self, result, task_id):
return
def deleteResult(self, result):
return
def resultFromResultId(self, result_id):
return
def searchResult(document_id, task_key):
return
def getAssignedTasks(self, document_id, task_key=None):
return