/
topic_types.go
120 lines (98 loc) · 3.19 KB
/
topic_types.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package v1alpha1
import (
"encoding/json"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// TopicSpec defines the desired state of Topic.
type TopicSpec struct {
// Name of the topic config to be created. If empty, CR name will be used.
// +optional
Name string `json:"name,omitempty"`
// globalOrderingEnabled allows all nodes listening to the same topic to get their messages in
// the same order.
// +kubebuilder:default:=false
// +optional
GlobalOrderingEnabled bool `json:"globalOrderingEnabled"`
// multiThreadingEnabled enables multi-threaded processing of incoming messages.
// Otherwise, a single thread will handle all topic messages.
// +kubebuilder:default:=false
// +optional
MultiThreadingEnabled bool `json:"multiThreadingEnabled"`
// HazelcastResourceName defines the name of the Hazelcast resource for which
// topic config will be created.
// +kubebuilder:validation:MinLength:=1
// +required
HazelcastResourceName string `json:"hazelcastResourceName"`
}
// TopicStatus defines the observed state of Topic.
// It consists solely of the embedded DataStructureStatus.
type TopicStatus struct {
DataStructureStatus `json:",inline"`
}
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.state",description="Current state of the Topic Config"
// +kubebuilder:printcolumn:name="Hazelcast-Resource",type="string",priority=1,JSONPath=".spec.hazelcastResourceName",description="Name of the Hazelcast resource that this resource is created for"
// +kubebuilder:printcolumn:name="Message",type="string",priority=1,JSONPath=".status.message",description="Message for the current Topic Config"
// Topic is the Schema for the topics API.
// It pairs a TopicSpec (desired state) with a TopicStatus (observed state).
type Topic struct {
metav1.TypeMeta `json:",inline"`
// +optional
metav1.ObjectMeta `json:"metadata,omitempty"`
// Spec is the desired state of the Topic.
// +required
Spec TopicSpec `json:"spec"`
// Status is the observed state of the Topic.
// +optional
Status TopicStatus `json:"status,omitempty"`
}
// GetDSName returns the name of the topic data structure: the explicit
// Spec.Name when it is set, otherwise the name of the Topic resource itself.
func (t *Topic) GetDSName() string {
	if specName := t.Spec.Name; specName != "" {
		return specName
	}
	return t.ObjectMeta.Name
}
// GetKind returns the Kind stored in the Topic's embedded TypeMeta.
// The value is read as-is; it is not derived from the Go type.
func (t *Topic) GetKind() string {
	return t.TypeMeta.Kind
}
// GetHZResourceName returns Spec.HazelcastResourceName, the name of the
// Hazelcast resource this topic config is created for.
func (t *Topic) GetHZResourceName() string {
return t.Spec.HazelcastResourceName
}
// GetStatus returns a pointer to the DataStructureStatus embedded in the
// Topic's Status. The pointer refers to the Topic's own status, not a copy,
// so writes through it modify the Topic.
func (t *Topic) GetStatus() *DataStructureStatus {
return &t.Status.DataStructureStatus
}
// GetSpec returns the Topic's Spec serialized as a JSON string.
// If marshaling fails it returns an empty string and an error that wraps the
// underlying cause and mentions the Topic's Kind.
func (t *Topic) GetSpec() (string, error) {
	encoded, marshalErr := json.Marshal(t.Spec)
	if marshalErr != nil {
		return "", fmt.Errorf("error marshaling %v as JSON: %w", t.Kind, marshalErr)
	}
	return string(encoded), nil
}
// SetSpec replaces the Topic's Spec with the one decoded from the JSON string
// spec.
//
// The JSON is decoded into a fresh TopicSpec rather than into t.Spec directly:
// json.Unmarshal leaves struct fields that are absent from the input untouched,
// so decoding in place would keep stale values (for example a previous Name,
// which is omitted from the JSON when empty). Decoding into a separate value
// also guarantees that t.Spec is left unmodified when an error is returned.
//
// It returns the error reported by json.Unmarshal, or nil on success.
func (t *Topic) SetSpec(spec string) error {
	var decoded TopicSpec
	if err := json.Unmarshal([]byte(spec), &decoded); err != nil {
		return err
	}
	t.Spec = decoded
	return nil
}
// ValidateSpecCurrent validates the Topic by delegating to
// validateTopicSpecCurrent and returns its result. The Hazelcast argument is
// ignored.
func (t *Topic) ValidateSpecCurrent(_ *Hazelcast) error {
return validateTopicSpecCurrent(t)
}
// ValidateSpecUpdate validates the Topic by delegating to
// validateTopicSpecUpdate and returns its result.
func (t *Topic) ValidateSpecUpdate() error {
return validateTopicSpecUpdate(t)
}
//+kubebuilder:object:root=true
// TopicList contains a list of Topic.
type TopicList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
// Items holds the Topic resources in this list.
Items []Topic `json:"items"`
}
// GetItems returns every Topic in the list as a client.Object.
// Each returned object is a pointer to the corresponding element of tl.Items,
// not a copy. The result is a non-nil slice even when the list is empty.
func (tl *TopicList) GetItems() []client.Object {
	objs := make([]client.Object, len(tl.Items))
	for idx := range tl.Items {
		objs[idx] = &tl.Items[idx]
	}
	return objs
}
// init registers the Topic and TopicList types with the package's
// SchemeBuilder.
func init() {
	SchemeBuilder.Register(new(Topic), new(TopicList))
}