/
worker_api.go
107 lines (74 loc) · 2.71 KB
/
worker_api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package rest
import (
"net/http"
"io/ioutil"
"encoding/json"
flow_pkg "hyperflow.in/server/pkg/flow"
"hyperflow.in/server/pkg/base"
"hyperflow.in/server/pkg/base/structs"
)
//These are worker specific API functions
func (h *Handler) handleUpdateTaskStatus() error {
if (h.rq.Method != "PATCH") {
return base.HTTPErrorf(http.StatusMethodNotAllowed, "Invalid method %s", h.rq.Method)
}
// declare in/out variables
var response map[string]interface{}
var raw_input []byte
var err error
worker_id, _ := h.getMandatoryUrlParam("workerId")
if worker_id == "" {
return base.HTTPErrorf(http.StatusBadRequest, "No worker Id")
}
if h.rq.Body == nil {
return base.HTTPErrorf(http.StatusBadRequest, "Empty status change request")
}
// read parameters
raw_input, err = ioutil.ReadAll(h.rq.Body)
// create a status change request from raw json body
change_req := flow_pkg.TaskStatusChangeRequest{}
if err := json.Unmarshal(raw_input, &change_req); err != nil {
base.Log("[rest.flow.UpdateTaskStatus] Invalid JSON for TaskStatusChangeRequest: ", err)
return err
}
//base.Log("[Handler.handleUpdateTaskStatus] Worker Id, New Status: ", worker_id, change_req.TaskStatus)
// call internal API
worker := flow_pkg.Worker {Id: worker_id}
change_resp, err := h.server.flowAPI.UpdateWorkerTaskStatus(worker, &change_req)
if err != nil {
base.Log("[Handler.handleUpdateTaskStatus] Failed task update: ", change_req)
base.Log("[Handler.handleUpdateTaskStatus] ", err)
return err
}
response = structs.Map(change_resp)
h.writeJSON(response)
return nil
}
func (h *Handler) handleRegisterWorker() error {
if (h.rq.Method != "POST") {
return base.HTTPErrorf(http.StatusMethodNotAllowed, "Invalid method %s", h.rq.Method)
}
flow_id := h.getQuery("flowId")
task_id := h.getQuery("taskId")
ip := h.getQuery("ip")
var response map[string]interface{}
worker_attr, err := h.server.flowAPI.RegisterWorker(flow_id, task_id, ip)
if err != nil {
return base.HTTPErrorf(http.StatusInternalServerError, err.Error() )
}
response = structs.Map(worker_attr)
h.writeJSON(response)
return nil
}
func (h *Handler) handleDetachTaskWorker() error {
if (h.rq.Method != "POST") {
return base.HTTPErrorf(http.StatusMethodNotAllowed, "Invalid method %s", h.rq.Method)
}
flow_id := h.getQuery("flowId")
task_id := h.getQuery("taskId")
worker_id := h.getQuery("workerId")
if flow_id == "" || task_id == "" || worker_id == "" {
return base.HTTPErrorf(http.StatusInternalServerError, "One of the params is missing: flowId, taskId, workerId")
}
return h.server.flowAPI.DetachTaskWorker(worker_id, flow_id, task_id)
}