-
Notifications
You must be signed in to change notification settings - Fork 373
/
grpc_agent.go
134 lines (111 loc) · 3.56 KB
/
grpc_agent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package dkron
import (
"errors"
"time"
"github.com/armon/circbuf"
metrics "github.com/armon/go-metrics"
"github.com/distribworks/dkron/v3/plugin/types"
"github.com/golang/protobuf/ptypes"
"github.com/sirupsen/logrus"
)
const (
// maxBufSize limits how much data we collect from a handler.
maxBufSize = 256000
)
type statusAgentHelper struct {
execution *types.Execution
stream types.Agent_AgentRunServer
}
func (s *statusAgentHelper) Update(b []byte, c bool) (int64, error) {
s.execution.Output = b
// Send partial execution
if err := s.stream.Send(&types.AgentRunStream{
Execution: s.execution,
}); err != nil {
return 0, err
}
return 0, nil
}
// GRPCAgentServer is the local implementation of the gRPC server interface.
type AgentServer struct {
types.AgentServer
agent *Agent
logger *logrus.Entry
}
// NewServer creates and returns an instance of a DkronGRPCServer implementation
func NewAgentServer(agent *Agent, logger *logrus.Entry) types.AgentServer {
return &AgentServer{
agent: agent,
logger: logger,
}
}
// AgentRun is called when an agent starts running a job and lasts all execution,
// the agent will stream execution progress to the server.
func (as *AgentServer) AgentRun(req *types.AgentRunRequest, stream types.Agent_AgentRunServer) error {
defer metrics.MeasureSince([]string{"grpc_agent", "agent_run"}, time.Now())
job := req.Job
execution := req.Execution
as.logger.WithFields(logrus.Fields{
"job": job.Name,
}).Info("grpc_agent: Starting job")
output, _ := circbuf.NewBuffer(maxBufSize)
var success bool
jex := job.Executor
exc := job.ExecutorConfig
// Send the first update with the initial execution state to be stored in the server
execution.StartedAt = ptypes.TimestampNow()
execution.NodeName = as.agent.config.NodeName
if err := stream.Send(&types.AgentRunStream{
Execution: execution,
}); err != nil {
return err
}
if jex == "" {
return errors.New("grpc_agent: No executor defined, nothing to do")
}
// Check if executor exists
if executor, ok := as.agent.ExecutorPlugins[jex]; ok {
as.logger.WithField("plugin", jex).Debug("grpc_agent: calling executor plugin")
runningExecutions.Store(execution.GetGroup(), execution)
out, err := executor.Execute(&types.ExecuteRequest{
JobName: job.Name,
Config: exc,
}, &statusAgentHelper{
stream: stream,
execution: execution,
})
if err == nil && out.Error != "" {
err = errors.New(out.Error)
}
if err != nil {
as.logger.WithError(err).WithField("job", job.Name).WithField("plugin", executor).Error("grpc_agent: command error output")
success = false
output.Write([]byte(err.Error() + "\n"))
} else {
success = true
}
if out != nil {
output.Write(out.Output)
}
} else {
as.logger.WithField("executor", jex).Error("grpc_agent: Specified executor is not present")
output.Write([]byte("grpc_agent: Specified executor is not present"))
}
execution.FinishedAt = ptypes.TimestampNow()
execution.Success = success
execution.Output = output.Bytes()
runningExecutions.Delete(execution.GetGroup())
// Send the final execution
if err := stream.Send(&types.AgentRunStream{
Execution: execution,
}); err != nil {
// In case of error means that maybe the server is gone so fallback to ExecutionDone
as.logger.WithError(err).WithField("job", job.Name).Error("grpc_agent: error sending the final execution, falling back to ExecutionDone")
rpcServer, err := as.agent.checkAndSelectServer()
if err != nil {
return err
}
return as.agent.GRPCClient.ExecutionDone(rpcServer, NewExecutionFromProto(execution))
}
return nil
}