-
Notifications
You must be signed in to change notification settings - Fork 0
/
manager.go
202 lines (174 loc) · 5.79 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
package a4c
import (
"context"
"errors"
"fmt"
"log"
"strings"
"sync"
"github.com/alien4cloud/alien4cloud-go-client/v3/alien4cloud"
"github.com/eflows4hpc/hpcwaas-api/api"
"github.com/eflows4hpc/hpcwaas-api/pkg/ctxauth"
)
type Manager interface {
GetWorkflows(ctx context.Context) ([]api.Workflow, error)
TriggerWorkflow(ctx context.Context, id string, inputs map[string]interface{}) (string, error)
GetExecution(ctx context.Context, id string) (alien4cloud.Execution, error)
GetExecutionLogs(ctx context.Context, id string, fromIndex, size int, levels ...string) ([]alien4cloud.Log, int, error)
CancelExecution(ctx context.Context, id string) error
}
var unauthorizedError = errors.New("not authorized")
func IsUnauthorizedError(e error) bool {
return errors.Is(e, unauthorizedError)
}
const DefaultAddress = "http://127.0.0.1:8088"
const DefaultUser = "admin"
const DefaultPassword = "admin"
type Config struct {
Address string `mapstructure:"address"`
User string `mapstructure:"user"`
Password string `mapstructure:"password"`
CaFile string `mapstructure:"ca_file"`
SkipSecure bool `mapstructure:"skip_secure"`
}
type manager struct {
client alien4cloud.Client
baseURL string
}
var l sync.Mutex
var managerCache map[Config]Manager = make(map[Config]Manager)
func GetManager(c Config) (
Manager, error) {
l.Lock()
defer l.Unlock()
if m, ok := managerCache[c]; ok {
return m, nil
}
m := &manager{}
m.baseURL = c.Address
var err error
m.client, err = alien4cloud.NewClient(c.Address, c.User, c.Password, c.CaFile, c.SkipSecure)
if err != nil {
return nil, err
}
managerCache[c] = m
return m, nil
}
func checkUserInAuthorizedTag(currentUsername string, tagValue string) bool {
log.Printf("Checking if %q is in authorized users %q", currentUsername, tagValue)
for _, user := range strings.Split(tagValue, ",") {
if user == currentUsername {
return true
}
}
return false
}
func (m *manager) isUserInAuthorizedTag(ctx context.Context, currentUsername, applicationID string) (bool, error) {
application, err := m.client.ApplicationService().GetApplicationByID(ctx, applicationID)
if err != nil {
return false, err
}
for _, tag := range application.Tags {
if tag.Key == "hpcwaas-authorized-users" {
return checkUserInAuthorizedTag(currentUsername, tag.Value), nil
}
}
// If hpcwaas-authorized-users not in tags every user is authorized
return true, nil
}
func (m *manager) GetWorkflows(ctx context.Context) ([]api.Workflow, error) {
appsSearchReq := alien4cloud.SearchRequest{
Size: 1000000,
Filters: map[string][]string{
"tags.name": {"hpcwaas-workflows"},
}}
// TODO(loicalbertin): add pagination
apps, _, err := m.client.ApplicationService().SearchApplications(ctx, appsSearchReq)
if err != nil {
return nil, err
}
// If no auth and no tags on app this is ok
// if no auth and tag then request is not authorized
currentUsername, _ := ctxauth.GetCurrentUser(ctx)
var ids []api.Workflow
for _, app := range apps {
var declaredWF []string
// by default if not specified all users are authorized
userAuthorized := true
for _, tag := range app.Tags {
if tag.Key == "hpcwaas-workflows" {
declaredWF = strings.Split(tag.Value, ",")
}
if tag.Key == "hpcwaas-authorized-users" {
userAuthorized = checkUserInAuthorizedTag(currentUsername, tag.Value)
}
}
if !userAuthorized {
continue
}
envs, _, err := m.client.ApplicationService().SearchEnvironments(ctx, app.ID, alien4cloud.SearchRequest{
Size: 100000,
// Filters seems to not apply here
// Filters: map[string][]string{"status": {"DEPLOYED"}},
})
if err != nil {
return nil, err
}
for _, env := range envs {
if env.Status != "DEPLOYED" {
continue
}
for _, wf := range declaredWF {
ids = append(ids, api.Workflow{
ID: strings.Join([]string{app.ID, env.ID, wf}, "@"),
ApplicationID: app.ID,
EnvironmentID: env.ID,
EnvironmentName: env.Name,
Name: wf,
})
}
}
}
return ids, nil
}
func (m *manager) TriggerWorkflow(ctx context.Context, id string, inputs map[string]interface{}) (string, error) {
parts := strings.Split(id, "@")
if len(parts) < 3 {
return "", fmt.Errorf("invalid workflow id %q", id)
}
appID := parts[0]
envID := parts[1]
wfID := parts[2]
// If no auth and no tags on app this is ok
// if no auth and tag then request is not authorized
currentUsername, _ := ctxauth.GetCurrentUser(ctx)
authorized, err := m.isUserInAuthorizedTag(ctx, currentUsername, appID)
if err != nil {
return "", err
}
if !authorized {
return "", fmt.Errorf("user %q is not authorized to trigger workflow %q: %w", currentUsername, id, unauthorizedError)
}
return m.client.DeploymentService().RunWorkflowAsyncWithParameters(ctx, appID, envID, wfID, inputs, func(exec *alien4cloud.Execution, err error) {})
}
func (m *manager) GetExecution(ctx context.Context, id string) (alien4cloud.Execution, error) {
return m.client.DeploymentService().GetExecutionByID(ctx, id)
}
func (m *manager) GetExecutionLogs(ctx context.Context, id string, fromIndex, size int, levels ...string) ([]alien4cloud.Log, int, error) {
filters := alien4cloud.LogFilter{
ExecutionID: []string{id},
Level: levels,
}
return m.getLogsOfExecution(ctx, id, filters, fromIndex, size)
}
func (m *manager) CancelExecution(ctx context.Context, id string) error {
execution, err := m.client.DeploymentService().GetExecutionByID(ctx, id)
if err != nil {
return fmt.Errorf("failed to get execution %q: %w", id, err)
}
deployment, err := m.client.DeploymentService().GetDeployment(ctx, execution.DeploymentID)
if err != nil {
return fmt.Errorf("failed to get deployment %q linked to execution %q: %w", execution.DeploymentID, id, err)
}
return m.client.DeploymentService().CancelExecution(ctx, deployment.EnvironmentID, id)
}