-
Notifications
You must be signed in to change notification settings - Fork 2
/
worker_info.go
160 lines (133 loc) · 3.9 KB
/
worker_info.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package libWorker
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"github.com/APITeamLimited/globe-test/lib"
"github.com/gorilla/websocket"
)
type (
Collection struct {
Variables map[string]string
Name string
}
Environment struct {
Variables map[string]string
Name string
}
KeyValueItem struct {
Key string `json:"key"`
Value string `json:"value"`
}
WorkerInfo struct {
Conn *websocket.Conn
JobId string
ChildJobId string
ScopeId string
OrchestratorId string
WorkerId string
Ctx context.Context
Environment *Environment
Collection *Collection
WorkerOptions Options
FinalRequest map[string]interface{}
UnderlyingRequest map[string]interface{}
Gs *BaseGlobalState
VerifiedDomains []string
SubFraction float64
CreditsManager *lib.CreditsManager
Standalone bool
}
MarkMessage struct {
Mark string `json:"mark"`
Message map[string]interface{} `json:"message"`
}
)
type Message struct {
JobId string `json:"jobId"`
ChildJobId string `json:"childJobId"`
Time time.Time `json:"time"`
WorkerId string `json:"workerId"`
Message string `json:"message"`
MessageType string `json:"messageType"`
}
type MessageQueue struct {
Mutex sync.Mutex
// The count of currently actively being sent messages
QueueCount int
NewQueueCount chan int
}
func DispatchMessage(gs BaseGlobalState, message string, messageType string) {
// go func() {
// messageQueue := gs.MessageQueue()
// isTerminal := messageType == "STATUS" && (message == "FAILURE" || message == "SUCCESS")
// if !isTerminal {
// messageQueue.Mutex.Lock()
// messageQueue.QueueCount++
// messageQueue.Mutex.Unlock()
// gs.ConnWriteMutex().Lock()
// gs.Conn().WriteJSON(formatMessage(gs, message, messageType))
// gs.ConnWriteMutex().Unlock()
// messageQueue.Mutex.Lock()
// messageQueue.QueueCount--
// // Must unlock the mutex before sending the new count to the channel
// messageQueue.Mutex.Unlock()
// messageQueue.NewQueueCount <- messageQueue.QueueCount
// return
// }
// messageQueue.Mutex.Lock()
// queueCount := messageQueue.QueueCount
// messageQueue.Mutex.Unlock()
// // If the message is terminal, we want to make sure that all messages are sent before we return
// if queueCount > 0 {
// for newCount := range messageQueue.NewQueueCount {
// if newCount == 0 {
// break
// }
// }
// }
// gs.ConnWriteMutex().Lock()
// gs.Conn().WriteJSON(formatMessage(gs, message, messageType))
// gs.ConnWriteMutex().Unlock()
// }()
// isTerminal := messageType == "STATUS" && (message == "FAILURE" || message == "SUCCESS")
// if isTerminal {
// time.Sleep(300 * time.Millisecond)
// }
serializedMessage, err := json.Marshal(formatMessage(gs, message, messageType))
if err != nil {
fmt.Println("Error serializing message: ", err.Error())
return
}
gs.ConnWriteMutex().Lock()
gs.Conn().WriteMessage(websocket.TextMessage, serializedMessage)
gs.ConnWriteMutex().Unlock()
}
func formatMessage(gs BaseGlobalState, message string, messageType string) Message {
return Message{
JobId: gs.JobId(),
ChildJobId: gs.ChildJobId(),
Time: time.Now(),
WorkerId: gs.WorkerId(),
Message: message,
MessageType: messageType,
}
}
func UpdateStatus(gs BaseGlobalState, status string) {
if gs.GetWorkerStatus() != status {
DispatchMessage(gs, status, "STATUS")
gs.SetWorkerStatus(status)
}
}
func HandleStringError(gs BaseGlobalState, errString string) {
fmt.Println("HandleStringError: ", errString)
DispatchMessage(gs, errString, "ERROR")
UpdateStatus(gs, "FAILURE")
}
func HandleError(gs BaseGlobalState, err error) {
fmt.Println("HandleError: ", err.Error())
DispatchMessage(gs, err.Error(), "ERROR")
UpdateStatus(gs, "FAILURE")
}