/
actions.go
104 lines (96 loc) · 3.32 KB
/
actions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
/*
* === This file is part of ALICE O² ===
*
* Copyright 2018-2022 CERN and copyright holders of ALICE O².
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* In applying this license CERN does not waive the privileges and
* immunities granted to it by virtue of its status as an
* Intergovernmental Organization or submit itself to any jurisdiction.
*/
package executor
import (
"context"
"encoding/json"
"github.com/AliceO2Group/Control/common/event"
"github.com/AliceO2Group/Control/common/utils"
"github.com/AliceO2Group/Control/common/utils/uid"
"github.com/AliceO2Group/Control/executor/executable"
mesos "github.com/mesos/mesos-go/api/v1/lib"
"github.com/mesos/mesos-go/api/v1/lib/executor/calls"
"github.com/sirupsen/logrus"
)
// makeSendStatusUpdateFunc builds the status callback handed to a task.
//
// The returned function creates a fresh status for task.TaskID via newStatus,
// fills in the given Mesos task state and human-readable message, and pushes
// the result onto state.statusCh. The channel send blocks until the status is
// accepted by the channel.
func makeSendStatusUpdateFunc(state *internalState, task mesos.TaskInfo) executable.SendStatusFunc {
	sendStatus := func(envId uid.ID, mesosState mesos.TaskState, message string) {
		taskStatus := newStatus(envId, state, task.TaskID)
		taskStatus.Message = utils.ProtoString(message)
		// mesosState is this call's own parameter, so taking its address
		// yields a pointer that is private to the status being sent.
		taskStatus.State = &mesosState
		state.statusCh <- taskStatus
	}
	return sendStatus
}
// makeSendDeviceEventFunc builds the device-event callback handed to a task.
//
// The returned function JSON-encodes the device event and pushes the encoded
// bytes onto state.messageCh. If encoding fails, a warning is logged and the
// event is dropped. The environment ID argument is not used by this
// implementation.
func makeSendDeviceEventFunc(state *internalState) executable.SendDeviceEventFunc {
	return func(_ uid.ID, deviceEvent event.DeviceEvent) {
		payload, marshalErr := json.Marshal(deviceEvent)
		if marshalErr != nil {
			log.WithError(marshalErr).Warning("error marshaling event from task")
			return
		}
		state.messageCh <- payload
	}
}
// makeSendMessageFunc builds the raw-message callback handed to a task.
//
// The returned function forwards the given bytes, unmodified, onto
// state.messageCh; the send blocks until the channel accepts the message.
func makeSendMessageFunc(state *internalState) executable.SendMessageFunc {
	forward := func(payload []byte) {
		state.messageCh <- payload
	}
	return forward
}
// sendOutgoingMessage wraps message in an executor Message call and sends it
// through state.cli as a non-streaming request.
//
// Delivery is best-effort: a failed send is logged as a warning and otherwise
// ignored, and nothing is returned to the caller.
func sendOutgoingMessage(state *internalState, message []byte) {
	resp, err := state.cli.Send(context.TODO(), calls.NonStreaming(calls.Message(message)))
	if resp != nil {
		// The response body is not needed, but it has to be closed so the
		// resources behind it are released instead of leaking per message.
		_ = resp.Close()
	}
	if err != nil {
		log.WithError(err).
			Warning("executor failed to send outgoing message")
	}
}
// performStatusUpdate processes one task status produced by the executor.
//
//   - A status with a nil State is logged and discarded.
//   - A TASK_FAILED status is not sent from here: it is stored in
//     state.failedTasks and the task is removed from state.activeTasks.
//     Per the original note, failed task updates are sent separately with
//     less priority (that code is not part of this function).
//   - For the other terminal states handled below (dropped, finished, gone,
//     killed, lost) the task is removed from state.activeTasks, and then the
//     status is sent via update.
//   - Any other state is sent via update without touching the task maps.
//
// A failure to send the update is only logged; it is not returned.
func performStatusUpdate(state *internalState, status mesos.TaskStatus) {
	if status.State == nil {
		log.Warn("status with nil state received")
		return
	}

	switch *status.State {
	case mesos.TASK_FAILED:
		// Park the failed status for later and stop tracking the task; both
		// maps are modified under activeTasksMu.
		state.activeTasksMu.Lock()
		state.failedTasks[status.TaskID] = status
		delete(state.activeTasks, status.TaskID)
		state.activeTasksMu.Unlock()
		return
	case mesos.TASK_DROPPED,
		mesos.TASK_FINISHED,
		mesos.TASK_GONE,
		mesos.TASK_KILLED,
		mesos.TASK_LOST:
		// The task has ended, so it no longer counts as active.
		state.activeTasksMu.Lock()
		delete(state.activeTasks, status.TaskID)
		state.activeTasksMu.Unlock()
	}

	if err := update(state, status); err != nil {
		// In case of a failed update we just log it, including the cause so
		// the failure can be diagnosed.
		log.WithFields(logrus.Fields{
			"task":  status.TaskID,
			"state": status.State.String(),
		}).
			WithError(err).
			Warn("executor failed to send task status update")
	}
}