-
Notifications
You must be signed in to change notification settings - Fork 0
/
close_execution.go
100 lines (87 loc) · 2.8 KB
/
close_execution.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package sfn_activities
import (
"context"
"encoding/json"
"github.com/awlsring/texit/internal/pkg/domain/workflow"
"github.com/awlsring/texit/internal/pkg/logger"
)
// CloseExecutionInput is the JSON payload consumed by closeExecutionActivity.
type CloseExecutionInput struct {
	// WorkflowName is parsed with workflow.WorkflowNameFromString and selects
	// the concrete result type the results are decoded into.
	WorkflowName string `json:"workflowName"`
	// ExecutionId is parsed with workflow.ExecutionIdentifierFromString.
	ExecutionId string `json:"executionId"`
	// Status is parsed with workflow.StatusFromString.
	Status string `json:"status"`
	// Results is an arbitrary JSON value; it is re-encoded to JSON before
	// being decoded into the workflow-specific result type.
	Results any `json:"results"`
	// Error, when non-empty, is attached to the decoded result via SetError.
	Error string `json:"error"`
}
// serializeResults encodes results as JSON and returns the encoded text as a
// workflow.SerializedExecutionResult.
//
// A nil results value yields an empty serialized result and no error. If JSON
// encoding fails, the empty serialized result is returned together with the
// encoding error.
func serializeResults(results any) (workflow.SerializedExecutionResult, error) {
	const none = workflow.SerializedExecutionResult("")

	// Nothing to encode: report an empty payload rather than the JSON "null".
	if results == nil {
		return none, nil
	}

	encoded, err := json.Marshal(results)
	if err != nil {
		return none, err
	}
	return workflow.SerializedExecutionResult(encoded), nil
}
// closeExecutionActivity closes a workflow execution and then signals its
// completion.
//
// It parses the workflow name, execution id and status from input, re-encodes
// input.Results as JSON and decodes it into the result type that matches the
// workflow, attaching input.Error to that result when it is non-empty. The
// result is handed to h.actSvc.CloseExecution and afterwards to
// h.notSvc.NotifyExecutionCompletion.
//
// Any parsing, (de)serialization or CloseExecution error is logged and
// returned. A failure to notify is only logged as a warning; the function
// still returns nil in that case.
func (h *SfnActivityHandler) closeExecutionActivity(ctx context.Context, input *CloseExecutionInput) error {
	lg := logger.FromContext(ctx)
	lg.Debug().Interface("input", input).Msg("Closing execution request")

	name, err := workflow.WorkflowNameFromString(input.WorkflowName)
	if err != nil {
		lg.Error().Err(err).Msg("Failed to parse workflow name")
		return err
	}

	lg.Debug().Msg("Marshalling results")
	serialized, err := serializeResults(input.Results)
	if err != nil {
		lg.Error().Err(err).Msg("Failed to marshal results")
		return err
	}
	lg.Debug().Interface("results", serialized).Msg("Results marshalled")

	lg.Debug().Msg("Parsing execution id")
	id, err := workflow.ExecutionIdentifierFromString(input.ExecutionId)
	if err != nil {
		lg.Error().Err(err).Msg("Failed to parse execution id")
		return err
	}

	state, err := workflow.StatusFromString(input.Status)
	if err != nil {
		lg.Error().Err(err).Msg("Failed to parse status")
		return err
	}

	// Decode the serialized payload into the result type of the workflow.
	// For any workflow name not listed here, result keeps its zero value.
	var result workflow.ExecutionResult
	switch name {
	case workflow.WorkflowNameProvisionNode:
		provision, decodeErr := workflow.DeserializeExecutionResult[workflow.ProvisionNodeExecutionResult](serialized)
		if decodeErr != nil {
			lg.Error().Err(decodeErr).Msg("Failed to deserialize results")
			return decodeErr
		}
		if input.Error != "" {
			provision.SetError(input.Error)
		}
		result = provision
	case workflow.WorkflowNameDeprovisionNode:
		deprovision, decodeErr := workflow.DeserializeExecutionResult[workflow.DeprovisionNodeExecutionResult](serialized)
		if decodeErr != nil {
			lg.Error().Err(decodeErr).Msg("Failed to deserialize results")
			return decodeErr
		}
		if input.Error != "" {
			deprovision.SetError(input.Error)
		}
		result = deprovision
	}
	lg.Debug().Interface("results", result).Msg("Results deserialized")

	lg.Debug().Msg("Closing execution")
	if closeErr := h.actSvc.CloseExecution(ctx, id, state, result); closeErr != nil {
		lg.Error().Err(closeErr).Msg("Failed to close execution")
		return closeErr
	}

	// Notification is best-effort: the execution is already closed, so a
	// failure here is logged but not reported to the caller.
	lg.Debug().Msg("Signaling execution complete")
	if notifyErr := h.notSvc.NotifyExecutionCompletion(ctx, id, name, state, result); notifyErr != nil {
		lg.Warn().Err(notifyErr).Msg("Failed to signal execution complete")
	}
	return nil
}