-
Notifications
You must be signed in to change notification settings - Fork 16
/
event_create_handler.go
110 lines (99 loc) · 3.64 KB
/
event_create_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package submission
import (
"context"
"reflect"
"github.com/Bio-OS/bioos/internal/context/workspace/infrastructure/eventbus"
workspaceproto "github.com/Bio-OS/bioos/internal/context/workspace/interface/grpc/proto"
apperrors "github.com/Bio-OS/bioos/pkg/errors"
"github.com/Bio-OS/bioos/pkg/utils"
"github.com/Bio-OS/bioos/pkg/utils/grpc"
)
const WESTag = "wes"
type CreateHandler struct {
workflowClient grpc.WorkflowClient
repository Repository
eventbus eventbus.EventBus
}
func NewCreateHandler(repository Repository, eventbus eventbus.EventBus, workflowClient grpc.WorkflowClient) *CreateHandler {
return &CreateHandler{
repository: repository,
workflowClient: workflowClient,
eventbus: eventbus,
}
}
func (h *CreateHandler) Handle(ctx context.Context, event *CreateEvent) (err error) {
if event == nil {
return nil
}
sub, err := h.repository.Get(ctx, event.SubmissionID)
if err != nil {
return err
}
// todo we should store the data model & workflow used in submission
createRunEvent, err := h.genCreateRunEvent(ctx, event, sub)
if err != nil {
return err
}
return h.eventbus.Publish(ctx, createRunEvent)
}
func (h *CreateHandler) genCreateRunEvent(ctx context.Context, event *CreateEvent, sub *Submission) (*EventCreateRuns, error) {
getWorkflowResp, err := h.workflowClient.GetWorkflow(ctx, &workspaceproto.GetWorkflowRequest{
Id: event.SourceWorkflowID,
WorkspaceID: event.WorkspaceID,
})
if err != nil {
return nil, apperrors.NewInternalError(err)
}
getWorkflowVersionResp, err := h.workflowClient.GetWorkflowVersion(ctx, &workspaceproto.GetWorkflowVersionRequest{
Id: getWorkflowResp.Workflow.LatestVersion.Id,
WorkflowID: event.SourceWorkflowID,
WorkspaceID: event.WorkspaceID,
})
if err != nil {
return nil, apperrors.NewInternalError(err)
}
workflowEngineParameters := exposedOptions2Map(&sub.ExposedOptions)
files, err := h.genWorkflowFiles(ctx, getWorkflowVersionResp.Version, event)
if err != nil {
return nil, err
}
return NewEventCreateRuns(sub.WorkspaceID, sub.ID, sub.Type, sub.Inputs, sub.Outputs, sub.DataModelID, sub.DataModelRowIDs, &RunConfig{
Language: getWorkflowVersionResp.Version.Language,
MainWorkflowFilePath: getWorkflowVersionResp.Version.MainWorkflowPath,
WorkflowContents: files,
WorkflowEngineParameters: workflowEngineParameters,
Version: getWorkflowVersionResp.Version.LanguageVersion,
}), nil
}
func (h *CreateHandler) genWorkflowFiles(ctx context.Context, workflowVersion *workspaceproto.WorkflowVersion, event *CreateEvent) (workflowFiles map[string]string, err error) {
ids := make([]string, 0)
for _, fileInfo := range workflowVersion.Files {
ids = append(ids, fileInfo.Id)
}
ListWorkflowFilesResponse, err := h.workflowClient.ListWorkflowFiles(ctx, &workspaceproto.ListWorkflowFilesRequest{
Page: 1,
Size: int32(len(ids)),
Ids: ids,
WorkspaceID: event.WorkspaceID,
WorkflowID: event.SourceWorkflowID,
WorkflowVersionID: utils.PointString(workflowVersion.Id),
})
if err != nil {
return nil, apperrors.NewInternalError(err)
}
workflowFiles = make(map[string]string)
for _, file := range ListWorkflowFilesResponse.Files {
workflowFiles[file.Path] = file.Content
}
return workflowFiles, nil
}
// exposedOptions2Map ...
func exposedOptions2Map(exposedOptions *ExposedOptions) map[string]interface{} {
res := make(map[string]interface{})
t := reflect.TypeOf(exposedOptions).Elem()
v := reflect.ValueOf(exposedOptions).Elem()
for i := 0; i < v.NumField(); i++ {
res[t.Field(i).Tag.Get(WESTag)] = v.Field(i).Interface()
}
return res
}