forked from raystack/optimus
-
Notifications
You must be signed in to change notification settings - Fork 1
/
resource.go
123 lines (104 loc) · 3.36 KB
/
resource.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package event
import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/goto/optimus/core/resource"
"github.com/goto/optimus/internal/errors"
pbCore "github.com/goto/optimus/protos/gotocompany/optimus/core/v1beta1"
pbInt "github.com/goto/optimus/protos/gotocompany/optimus/integration/v1beta1"
)
// ResourceCreated is the event describing the creation of a resource.
// It embeds the base Event and carries the resource that was created.
type ResourceCreated struct {
	Event
	// Resource is the resource this event refers to; it is serialised by Bytes.
	Resource *resource.Resource
}
// NewResourceCreatedEvent builds a ResourceCreated event for rsc on top of a
// freshly created base event. Any error returned by NewBaseEvent is passed
// back to the caller unchanged, together with a nil event.
func NewResourceCreatedEvent(rsc *resource.Resource) (*ResourceCreated, error) {
	base, err := NewBaseEvent()
	if err != nil {
		return nil, err
	}

	created := &ResourceCreated{Event: base, Resource: rsc}
	return created, nil
}
// Bytes serialises the event through resourceEventToBytes, tagging it as a
// resource-create event with an unspecified change impact.
func (r ResourceCreated) Bytes() ([]byte, error) {
	const (
		eventType = pbInt.OptimusChangeEvent_EVENT_TYPE_RESOURCE_CREATE
		impact    = pbInt.ChangeImpact_CHANGE_IMPACT_TYPE_UNSPECIFIED
	)
	return resourceEventToBytes(r.Event, r.Resource, eventType, impact)
}
// ResourceUpdated is the event describing an update to a resource.
// It embeds the base Event and carries the updated resource along with the
// impact of the change.
type ResourceUpdated struct {
	Event
	// Resource is the resource this event refers to; it is serialised by Bytes.
	Resource *resource.Resource
	// UpdateImpact is the proto change-impact value placed on the serialised event.
	UpdateImpact pbInt.ChangeImpact
}
// NewResourceUpdatedEvent builds a ResourceUpdated event for rsc on top of a
// freshly created base event.
//
// The domain-level impact is translated to its proto counterpart:
// resource.ResourceDataPipeLineImpact becomes CHANGE_IMPACT_TYPE_BEHAVIOUR,
// and every other value becomes CHANGE_IMPACT_TYPE_UNSPECIFIED.
//
// Any error returned by NewBaseEvent is passed back to the caller unchanged,
// together with a nil event.
func NewResourceUpdatedEvent(rsc *resource.Resource, impact resource.UpdateImpact) (*ResourceUpdated, error) {
	base, err := NewBaseEvent()
	if err != nil {
		return nil, err
	}

	// Start from the fallback value and override only for the known impact.
	protoImpact := pbInt.ChangeImpact_CHANGE_IMPACT_TYPE_UNSPECIFIED
	if impact == resource.ResourceDataPipeLineImpact {
		protoImpact = pbInt.ChangeImpact_CHANGE_IMPACT_TYPE_BEHAVIOUR
	}

	updated := &ResourceUpdated{
		Event:        base,
		Resource:     rsc,
		UpdateImpact: protoImpact,
	}
	return updated, nil
}
// Bytes serialises the event through resourceEventToBytes, tagging it as a
// resource-update event and forwarding the stored UpdateImpact.
func (r ResourceUpdated) Bytes() ([]byte, error) {
	const eventType = pbInt.OptimusChangeEvent_EVENT_TYPE_RESOURCE_UPDATE
	return resourceEventToBytes(r.Event, r.Resource, eventType, r.UpdateImpact)
}
// ResourceDeleted is the event describing the deletion of a resource.
// It embeds the base Event and carries the resource that was deleted.
type ResourceDeleted struct {
	Event
	// Resource is the resource this event refers to; it is serialised by Bytes.
	Resource *resource.Resource
}
// NewResourceDeleteEvent builds a ResourceDeleted event for rsc on top of a
// freshly created base event. Any error returned by NewBaseEvent is passed
// back to the caller unchanged, together with a nil event.
func NewResourceDeleteEvent(rsc *resource.Resource) (*ResourceDeleted, error) {
	base, err := NewBaseEvent()
	if err != nil {
		return nil, err
	}

	deleted := &ResourceDeleted{Event: base, Resource: rsc}
	return deleted, nil
}
// Bytes serialises the event through resourceEventToBytes, tagging it as a
// resource-delete event with an unspecified change impact.
func (r ResourceDeleted) Bytes() ([]byte, error) {
	const (
		eventType = pbInt.OptimusChangeEvent_EVENT_TYPE_RESOURCE_DELETE
		impact    = pbInt.ChangeImpact_CHANGE_IMPACT_TYPE_UNSPECIFIED
	)
	return resourceEventToBytes(r.Event, r.Resource, eventType, impact)
}
// resourceEventToBytes converts a resource change into an OptimusChangeEvent
// proto message and returns its marshalled bytes.
//
// Parameters:
//   - event: base event supplying the event ID and occurrence time.
//   - rsc: the resource the event is about; must be non-nil and have metadata.
//   - eventType: the proto event type to stamp on the message.
//   - updateImpact: the proto change impact to stamp on the message.
//
// It returns an invalid-argument error when rsc is nil, when rsc has no
// metadata, or when the resource spec cannot be converted to a proto struct.
// Otherwise it returns whatever proto.Marshal returns.
func resourceEventToBytes(event Event, rsc *resource.Resource, eventType pbInt.OptimusChangeEvent_EventType, updateImpact pbInt.ChangeImpact) ([]byte, error) {
	// The event constructors accept a nil resource, so guard here before any
	// method is invoked on it.
	if rsc == nil {
		return nil, errors.InvalidArgument(resource.EntityResource, "missing resource")
	}

	meta := rsc.Metadata()
	if meta == nil {
		return nil, errors.InvalidArgument(resource.EntityResource, "missing resource metadata")
	}

	pbStruct, err := structpb.NewStruct(rsc.Spec())
	if err != nil {
		// Keep the underlying cause in the message so the failing value can be diagnosed.
		return nil, errors.InvalidArgument(resource.EntityResource, "unable to convert spec to proto struct: "+err.Error())
	}

	resourcePb := &pbCore.ResourceSpecification{
		Version: meta.Version,
		Name:    rsc.FullName(),
		Type:    rsc.Kind(),
		Spec:    pbStruct,
		Assets:  nil,
		Labels:  meta.Labels,
	}

	tnnt := rsc.Tenant()
	optEvent := &pbInt.OptimusChangeEvent{
		EventId:       event.ID.String(),
		OccurredAt:    timestamppb.New(event.OccurredAt),
		ProjectName:   tnnt.ProjectName().String(),
		NamespaceName: tnnt.NamespaceName().String(),
		EventType:     eventType,
		ChangeImpact:  updateImpact,
		Payload: &pbInt.OptimusChangeEvent_ResourceChange{
			ResourceChange: &pbInt.ResourceChangePayload{
				DatastoreName: rsc.Store().String(),
				Resource:      resourcePb,
			},
		},
	}
	return proto.Marshal(optEvent)
}