-
Notifications
You must be signed in to change notification settings - Fork 3.1k
/
cron_workflow_server.go
161 lines (143 loc) · 6.98 KB
/
cron_workflow_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package cronworkflow
import (
"context"
"encoding/json"
"fmt"
"google.golang.org/grpc/codes"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
cronworkflowpkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/cronworkflow"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/server/auth"
"github.com/argoproj/argo-workflows/v3/util/instanceid"
"github.com/argoproj/argo-workflows/v3/workflow/creator"
"github.com/argoproj/argo-workflows/v3/workflow/templateresolution"
"github.com/argoproj/argo-workflows/v3/workflow/validate"
sutils "github.com/argoproj/argo-workflows/v3/server/utils"
)
type cronWorkflowServiceServer struct {
instanceIDService instanceid.Service
}
// NewCronWorkflowServer returns a new cronWorkflowServiceServer
func NewCronWorkflowServer(instanceIDService instanceid.Service) cronworkflowpkg.CronWorkflowServiceServer {
return &cronWorkflowServiceServer{instanceIDService}
}
func (c *cronWorkflowServiceServer) LintCronWorkflow(ctx context.Context, req *cronworkflowpkg.LintCronWorkflowRequest) (*v1alpha1.CronWorkflow, error) {
wfClient := auth.GetWfClient(ctx)
wftmplGetter := templateresolution.WrapWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().WorkflowTemplates(req.Namespace))
cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates())
c.instanceIDService.Label(req.CronWorkflow)
creator.Label(ctx, req.CronWorkflow)
err := validate.ValidateCronWorkflow(wftmplGetter, cwftmplGetter, req.CronWorkflow)
if err != nil {
return nil, sutils.ToStatusError(err, codes.InvalidArgument)
}
return req.CronWorkflow, nil
}
func (c *cronWorkflowServiceServer) ListCronWorkflows(ctx context.Context, req *cronworkflowpkg.ListCronWorkflowsRequest) (*v1alpha1.CronWorkflowList, error) {
options := &metav1.ListOptions{}
if req.ListOptions != nil {
options = req.ListOptions
}
c.instanceIDService.With(options)
cronWfList, err := auth.GetWfClient(ctx).ArgoprojV1alpha1().CronWorkflows(req.Namespace).List(ctx, *options)
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
return cronWfList, nil
}
func (c *cronWorkflowServiceServer) CreateCronWorkflow(ctx context.Context, req *cronworkflowpkg.CreateCronWorkflowRequest) (*v1alpha1.CronWorkflow, error) {
wfClient := auth.GetWfClient(ctx)
if req.CronWorkflow == nil {
return nil, sutils.ToStatusError(fmt.Errorf("cron workflow was not found in the request body"), codes.NotFound)
}
c.instanceIDService.Label(req.CronWorkflow)
creator.Label(ctx, req.CronWorkflow)
wftmplGetter := templateresolution.WrapWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().WorkflowTemplates(req.Namespace))
cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates())
err := validate.ValidateCronWorkflow(wftmplGetter, cwftmplGetter, req.CronWorkflow)
if err != nil {
return nil, sutils.ToStatusError(err, codes.InvalidArgument)
}
crWf, err := wfClient.ArgoprojV1alpha1().CronWorkflows(req.Namespace).Create(ctx, req.CronWorkflow, metav1.CreateOptions{})
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
return crWf, nil
}
func (c *cronWorkflowServiceServer) GetCronWorkflow(ctx context.Context, req *cronworkflowpkg.GetCronWorkflowRequest) (*v1alpha1.CronWorkflow, error) {
options := metav1.GetOptions{}
if req.GetOptions != nil {
options = *req.GetOptions
}
return c.getCronWorkflowAndValidate(ctx, req.Namespace, req.Name, options)
}
func (c *cronWorkflowServiceServer) UpdateCronWorkflow(ctx context.Context, req *cronworkflowpkg.UpdateCronWorkflowRequest) (*v1alpha1.CronWorkflow, error) {
wfClient := auth.GetWfClient(ctx)
_, err := c.getCronWorkflowAndValidate(ctx, req.Namespace, req.CronWorkflow.Name, metav1.GetOptions{})
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
wftmplGetter := templateresolution.WrapWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().WorkflowTemplates(req.Namespace))
cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates())
if err := validate.ValidateCronWorkflow(wftmplGetter, cwftmplGetter, req.CronWorkflow); err != nil {
return nil, sutils.ToStatusError(err, codes.InvalidArgument)
}
crWf, err := auth.GetWfClient(ctx).ArgoprojV1alpha1().CronWorkflows(req.Namespace).Update(ctx, req.CronWorkflow, metav1.UpdateOptions{})
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
return crWf, nil
}
func (c *cronWorkflowServiceServer) DeleteCronWorkflow(ctx context.Context, req *cronworkflowpkg.DeleteCronWorkflowRequest) (*cronworkflowpkg.CronWorkflowDeletedResponse, error) {
_, err := c.getCronWorkflowAndValidate(ctx, req.Namespace, req.Name, metav1.GetOptions{})
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
opts := metav1.DeleteOptions{}
if req.DeleteOptions != nil {
opts = *req.DeleteOptions
}
err = auth.GetWfClient(ctx).ArgoprojV1alpha1().CronWorkflows(req.Namespace).Delete(ctx, req.Name, opts)
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
return &cronworkflowpkg.CronWorkflowDeletedResponse{}, nil
}
func (c *cronWorkflowServiceServer) ResumeCronWorkflow(ctx context.Context, req *cronworkflowpkg.CronWorkflowResumeRequest) (*v1alpha1.CronWorkflow, error) {
crWf, err := setCronWorkflowSuspend(ctx, false, req.Namespace, req.Name)
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
return crWf, nil
}
func (c *cronWorkflowServiceServer) SuspendCronWorkflow(ctx context.Context, req *cronworkflowpkg.CronWorkflowSuspendRequest) (*v1alpha1.CronWorkflow, error) {
crWf, err := setCronWorkflowSuspend(ctx, true, req.Namespace, req.Name)
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
return crWf, nil
}
func setCronWorkflowSuspend(ctx context.Context, setTo bool, namespace, name string) (*v1alpha1.CronWorkflow, error) {
data, err := json.Marshal(map[string]interface{}{"spec": map[string]interface{}{"suspend": setTo}})
if err != nil {
return nil, sutils.ToStatusError(fmt.Errorf("failed to marshall cron workflow patch data: %w", err), codes.Internal)
}
cronWfs, err := auth.GetWfClient(ctx).ArgoprojV1alpha1().CronWorkflows(namespace).Patch(ctx, name, types.MergePatchType, data, metav1.PatchOptions{})
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
return cronWfs, nil
}
func (c *cronWorkflowServiceServer) getCronWorkflowAndValidate(ctx context.Context, namespace string, name string, options metav1.GetOptions) (*v1alpha1.CronWorkflow, error) {
wfClient := auth.GetWfClient(ctx)
cronWf, err := wfClient.ArgoprojV1alpha1().CronWorkflows(namespace).Get(ctx, name, options)
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
err = c.instanceIDService.Validate(cronWf)
if err != nil {
return nil, sutils.ToStatusError(err, codes.InvalidArgument)
}
return cronWf, nil
}