-
Notifications
You must be signed in to change notification settings - Fork 242
/
plugin.go
180 lines (149 loc) · 3.85 KB
/
plugin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package plugin
import (
"bufio"
"errors"
"io"
"os"
"os/exec"
"github.com/gaia-pipeline/gaia"
"github.com/gaia-pipeline/gaia/scheduler"
"github.com/gaia-pipeline/protobuf"
plugin "github.com/hashicorp/go-plugin"
)
// pluginMapKey is the name under which the gRPC plugin implementation is
// registered in pluginMap and later requested via Dispense in Connect.
const pluginMapKey = "Plugin"
// handshake is the go-plugin handshake configuration that Connect passes to
// every plugin client it creates.
var handshake = plugin.HandshakeConfig{
	// The cookie value is frozen: it should never be changed again.
	MagicCookieValue: "FdXjW27mN6XuG2zDBP4LixXUwDAGCEkidxwqBGYpUhxiWHzctATYZvpz4ZJdALmh",
	MagicCookieKey:   "GAIA_PLUGIN",
	ProtocolVersion:  1,
}
// pluginMap lists the plugins handed to the go-plugin client in Connect.
// It holds a single entry: the gRPC plugin implementation, registered under
// pluginMapKey.
var pluginMap = map[string]plugin.Plugin{pluginMapKey: &PluginGRPCImpl{}}
// Plugin is one plugin instance. It talks over gRPC to exactly one plugin
// and owns the log file that the plugin's output is written to.
type Plugin struct {
	// client is the go-plugin client through which the gRPC connection
	// is opened; it is set by Connect and killed by Close.
	client *plugin.Client

	// pluginConn is the interface to the connected plugin, as dispensed
	// by the gRPC client in Connect.
	pluginConn PluginGRPC

	// logFile is the file in which all output is stored. It stays nil
	// when Connect is called without a log path.
	logFile *os.File

	// writer buffers the execution logs on their way to logFile; it is
	// flushed by Execute and Close.
	writer *bufio.Writer
}
// NewPlugin returns a new, not yet connected Plugin as a scheduler.Plugin.
// The receiver is not read; each returned instance stands for one
// connection to one plugin.
func (p *Plugin) NewPlugin() scheduler.Plugin {
	var instance Plugin
	return &instance
}
// Connect opens the log file, starts the plugin, initiates the gRPC
// connection and looks up the plugin registered under pluginMapKey.
//
// command is the start command for the plugin. logPath points to the path
// of the log file that receives the plugin's stderr output; it may be nil,
// in which case no log file is opened.
//
// On success it is up to the caller to call Close to shut down the plugin
// and close the gRPC connection. On failure Connect releases whatever it
// had already acquired (plugin process, log file) before returning the
// error, so nothing leaks if the caller does not call Close.
func (p *Plugin) Connect(command *exec.Cmd, logPath *string) error {
	// Create the log file and open it. On success it is closed in Close.
	if logPath != nil {
		var err error
		// O_APPEND keeps earlier content intact when the file already
		// exists; without it writes start at offset 0 and overwrite the
		// old log in place, leaving a stale tail behind.
		p.logFile, err = os.OpenFile(
			*logPath,
			os.O_CREATE|os.O_WRONLY|os.O_APPEND,
			0666,
		)
		if err != nil {
			return err
		}
	}

	// Create new writer.
	// NOTE(review): with a nil logPath p.logFile stays nil, so the writer
	// wraps a nil *os.File and writes to it presumably fail, dropping the
	// output. Confirm whether an explicit discard writer is wanted here.
	p.writer = bufio.NewWriter(p.logFile)

	// Get new client. Everything the plugin writes to stderr goes through
	// the buffered log writer.
	p.client = plugin.NewClient(&plugin.ClientConfig{
		HandshakeConfig:  handshake,
		Plugins:          pluginMap,
		Cmd:              command,
		AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
		Stderr:           p.writer,
	})

	// abort undoes the partial setup and hands back the original cause.
	// The order mirrors Close: kill the plugin first so that it stops
	// producing output, then flush what was captured, then close the file.
	// Cleanup errors are dropped on purpose; the cause is what matters.
	abort := func(cause error) error {
		p.client.Kill()
		_ = p.writer.Flush()
		if p.logFile != nil {
			_ = p.logFile.Close()
		}
		return cause
	}

	// Connect via gRPC
	gRPCClient, err := p.client.Client()
	if err != nil {
		return abort(err)
	}

	// Request the plugin
	raw, err := gRPCClient.Dispense(pluginMapKey)
	if err != nil {
		return abort(err)
	}

	// Convert plugin to interface
	pC, ok := raw.(PluginGRPC)
	if !ok {
		return abort(errors.New("plugin is not compatible with plugin interface"))
	}
	p.pluginConn = pC
	return nil
}
// Execute runs exactly one job on the connected plugin and returns the
// error reported by the plugin call, if any.
//
// Only the job's ID is transmitted (as UniqueId); the remaining fields of
// j are currently not sent to the plugin.
func (p *Plugin) Execute(j *gaia.Job) error {
	// Build the proto job inline and run it; the response is not used.
	_, execErr := p.pluginConn.ExecuteJob(&proto.Job{UniqueId: j.ID})

	// Push buffered log output out once the job has finished. The result
	// of the flush is not inspected.
	p.writer.Flush()

	return execErr
}
// GetJobs asks the connected plugin for all jobs it implements and returns
// them as gaia.Job values, each with status gaia.JobWaitingExec.
//
// It returns a nil slice together with the error when the stream cannot be
// opened or fails while being read. When the plugin reports no jobs the
// result is an empty, non-nil slice.
func (p *Plugin) GetJobs() ([]gaia.Job, error) {
	// Open the job stream.
	stream, err := p.pluginConn.GetJobs()
	if err != nil {
		return nil, err
	}

	jobs := []gaia.Job{}

	// Drain the stream; io.EOF marks its regular end.
	for {
		received, recvErr := stream.Recv()
		switch {
		case recvErr == io.EOF:
			// Got all jobs.
			return jobs, nil
		case recvErr != nil:
			// Error during stream.
			return nil, recvErr
		}

		// Convert the proto object to a gaia.Job.
		jobs = append(jobs, gaia.Job{
			ID:          received.UniqueId,
			Title:       received.Title,
			Description: received.Description,
			Priority:    received.Priority,
			Status:      gaia.JobWaitingExec,
		})
	}
}
// Close shuts down the plugin, which also ends the gRPC connection, then
// flushes the buffered log output and closes the log file.
// Remember to call this once you have called Connect.
//
// Close returns immediately; the shutdown runs in the background. It is
// safe to call on a Plugin whose Connect was never called or failed early:
// parts that were never set up are skipped.
func (p *Plugin) Close() {
	// We start the kill command in a goroutine because kill
	// is blocking until the subprocess successfully exits.
	// The user should not wait for this.
	go func() {
		// Each field may still be nil if Connect did not get that far.
		// A nil dereference here would panic in a goroutine nobody can
		// recover from and take the whole process down.
		if p.client != nil {
			p.client.Kill()
		}

		// Flush the writer. Best effort: no caller is left to report to.
		if p.writer != nil {
			_ = p.writer.Flush()
		}

		// Close log file
		if p.logFile != nil {
			_ = p.logFile.Close()
		}
	}()
}