-
Notifications
You must be signed in to change notification settings - Fork 126
/
admin.go
132 lines (103 loc) · 3.07 KB
/
admin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package client
import (
"context"
"encoding/json"
"fmt"
admincontracts "github.com/hatchet-dev/hatchet/internal/services/admin/contracts"
"github.com/hatchet-dev/hatchet/internal/validator"
"github.com/hatchet-dev/hatchet/pkg/client/types"
"github.com/rs/zerolog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type AdminClient interface {
PutWorkflow(workflow *types.Workflow) error
}
type adminClientImpl struct {
client admincontracts.WorkflowServiceClient
tenantId string
l *zerolog.Logger
v validator.Validator
}
func newAdmin(conn *grpc.ClientConn, opts *sharedClientOpts) AdminClient {
return &adminClientImpl{
client: admincontracts.NewWorkflowServiceClient(conn),
tenantId: opts.tenantId,
l: opts.l,
v: opts.v,
}
}
func (a *adminClientImpl) PutWorkflow(workflow *types.Workflow) error {
opts, err := a.getPutOpts(workflow)
if err != nil {
return fmt.Errorf("could not get put opts: %w", err)
}
apiWorkflow, err := a.client.GetWorkflowByName(context.Background(), &admincontracts.GetWorkflowByNameRequest{
TenantId: a.tenantId,
Name: opts.Opts.Name,
})
shouldPut := false
if err != nil {
// if not found, create
if statusErr, ok := status.FromError(err); ok && statusErr.Code() == codes.NotFound {
shouldPut = true
} else {
return fmt.Errorf("could not get workflow: %w", err)
}
} else {
// if there are no versions, exit
if len(apiWorkflow.Versions) == 0 {
return fmt.Errorf("found workflow, but it has no versions")
}
// get the workflow version to determine whether to update
if apiWorkflow.Versions[0].Version != workflow.Version {
shouldPut = true
}
}
if shouldPut {
_, err = a.client.PutWorkflow(context.Background(), opts)
if err != nil {
return fmt.Errorf("could not create workflow: %w", err)
}
}
return nil
}
func (a *adminClientImpl) getPutOpts(workflow *types.Workflow) (*admincontracts.PutWorkflowRequest, error) {
opts := &admincontracts.CreateWorkflowVersionOpts{
Name: workflow.Name,
Version: workflow.Version,
Description: workflow.Description,
EventTriggers: workflow.Triggers.Events,
CronTriggers: workflow.Triggers.Cron,
}
jobOpts := make([]*admincontracts.CreateWorkflowJobOpts, 0)
for jobName, job := range workflow.Jobs {
jobOpt := &admincontracts.CreateWorkflowJobOpts{
Name: jobName,
Description: job.Description,
Timeout: job.Timeout,
}
stepOpts := make([]*admincontracts.CreateWorkflowStepOpts, len(job.Steps))
for i, step := range job.Steps {
inputBytes, err := json.Marshal(step.With)
if err != nil {
return nil, fmt.Errorf("could not marshal step inputs: %w", err)
}
stepOpt := &admincontracts.CreateWorkflowStepOpts{
ReadableId: step.ID,
Action: step.ActionID,
Timeout: step.Timeout,
Inputs: string(inputBytes),
}
stepOpts[i] = stepOpt
}
jobOpt.Steps = stepOpts
jobOpts = append(jobOpts, jobOpt)
}
opts.Jobs = jobOpts
return &admincontracts.PutWorkflowRequest{
TenantId: a.tenantId,
Opts: opts,
}, nil
}