/
storage_request.go
147 lines (121 loc) · 4.66 KB
/
storage_request.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package kafka
// StorageRequestType identifies the kind of message / request that is sent to the storage
// module via channel. Depending on the request type, additional information has to be
// provided so that the request can be processed.
type StorageRequestType int

// Request types understood by the storage module. Numbering starts at 1, so the zero value
// of StorageRequestType does not equal any of the declared request types.
const (
	// Placeholder for 0 so that the first declared request type has the value 1.
	_ StorageRequestType = iota

	// StorageAddPartitionHighWaterMark requests adding a partition's high water mark.
	StorageAddPartitionHighWaterMark

	// StorageAddPartitionLowWaterMark requests adding a partition's low water mark.
	StorageAddPartitionLowWaterMark

	// StorageAddConsumerOffset requests adding a consumer's offset commit.
	StorageAddConsumerOffset

	// StorageAddGroupMetadata requests adding a group member's partition assignment.
	StorageAddGroupMetadata

	// StorageAddTopicConfiguration requests adding configuration entries for a topic.
	StorageAddTopicConfiguration

	// StorageAddSizeByTopic requests adding aggregated partition sizes grouped by topic.
	StorageAddSizeByTopic

	// StorageAddSizeByBroker requests adding aggregated partition sizes grouped by broker.
	StorageAddSizeByBroker

	// StorageDeleteConsumerGroup requests removing an offset commit for a
	// topic:group:partition combination.
	StorageDeleteConsumerGroup

	// StorageRegisterOffsetPartitions makes the storage module aware that the offset consumer
	// first has to fully consume a specific number of partitions before it should expose
	// any metrics.
	StorageRegisterOffsetPartitions

	// StorageMarkOffsetPartitionReady marks a partition consumer of the consumer offsets
	// topic as ready (= caught up partition lag).
	StorageMarkOffsetPartitionReady

	// StorageDeleteGroupMetadata requests deleting a group member's partition assignment.
	StorageDeleteGroupMetadata

	// StorageDeleteTopic requests deleting all topic information.
	StorageDeleteTopic
)
// StorageRequest is an entity to send messages / requests to the storage module.
// RequestType selects the kind of request; the constructor functions in this file populate
// only the fields that belong to the respective request type and leave all other fields at
// their zero values.
type StorageRequest struct {
	// RequestType identifies which kind of request this is.
	RequestType StorageRequestType
	// ConsumerOffset is set by newAddConsumerOffsetRequest.
	ConsumerOffset *ConsumerPartitionOffset
	// PartitionWaterMark is set by the low and high water mark request constructors.
	PartitionWaterMark *PartitionWaterMark
	// TopicConfig is set by newAddTopicConfig.
	TopicConfig *TopicConfiguration
	// GroupMetadata is set by newAddGroupMetadata.
	GroupMetadata *ConsumerGroupMetadata
	// ConsumerGroupName is set by newDeleteConsumerGroupRequest and newDeleteGroupMetadataRequest.
	ConsumerGroupName string
	// TopicName is set by newDeleteConsumerGroupRequest and newDeleteTopicRequest.
	TopicName string
	// PartitionID is set by newDeleteConsumerGroupRequest and newMarkOffsetPartitionReadyRequest.
	PartitionID int32
	// PartitionCount is set by newRegisterOffsetPartitionsRequest.
	PartitionCount int
	// SizeByTopic is set by newAddSizeByTopicRequest.
	// NOTE(review): presumably keyed by topic name with the aggregated size as value - confirm.
	SizeByTopic map[string]int64
	// SizeByBroker is set by newAddSizeByBrokerRequest.
	// NOTE(review): presumably keyed by broker ID with the aggregated size as value - confirm.
	SizeByBroker map[int32]int64
}
// newAddPartitionLowWaterMarkRequest builds a storage request of type
// StorageAddPartitionLowWaterMark that carries the given water mark.
func newAddPartitionLowWaterMarkRequest(lowWaterMark *PartitionWaterMark) *StorageRequest {
	req := new(StorageRequest)
	req.RequestType = StorageAddPartitionLowWaterMark
	req.PartitionWaterMark = lowWaterMark
	return req
}
// newAddPartitionHighWaterMarkRequest builds a storage request of type
// StorageAddPartitionHighWaterMark that carries the given water mark.
func newAddPartitionHighWaterMarkRequest(highWaterMark *PartitionWaterMark) *StorageRequest {
	req := new(StorageRequest)
	req.RequestType = StorageAddPartitionHighWaterMark
	req.PartitionWaterMark = highWaterMark
	return req
}
// newAddConsumerOffsetRequest builds a storage request of type StorageAddConsumerOffset
// that carries the given consumer partition offset.
func newAddConsumerOffsetRequest(offset *ConsumerPartitionOffset) *StorageRequest {
	req := new(StorageRequest)
	req.RequestType = StorageAddConsumerOffset
	req.ConsumerOffset = offset
	return req
}
// newAddGroupMetadata builds a storage request of type StorageAddGroupMetadata that
// carries the given consumer group metadata.
func newAddGroupMetadata(metadata *ConsumerGroupMetadata) *StorageRequest {
	req := new(StorageRequest)
	req.RequestType = StorageAddGroupMetadata
	req.GroupMetadata = metadata
	return req
}
// newAddTopicConfig builds a storage request of type StorageAddTopicConfiguration that
// carries the given topic configuration.
func newAddTopicConfig(config *TopicConfiguration) *StorageRequest {
	req := new(StorageRequest)
	req.RequestType = StorageAddTopicConfiguration
	req.TopicConfig = config
	return req
}
// newDeleteConsumerGroupRequest builds a storage request of type StorageDeleteConsumerGroup
// for the given consumer group name, topic name and partition ID.
func newDeleteConsumerGroupRequest(group string, topic string, partitionID int32) *StorageRequest {
	req := new(StorageRequest)
	req.RequestType = StorageDeleteConsumerGroup
	req.ConsumerGroupName = group
	req.TopicName = topic
	req.PartitionID = partitionID
	return req
}
// newRegisterOffsetPartitionsRequest builds a storage request of type
// StorageRegisterOffsetPartitions that carries the given partition count.
func newRegisterOffsetPartitionsRequest(partitionCount int) *StorageRequest {
	req := new(StorageRequest)
	req.RequestType = StorageRegisterOffsetPartitions
	req.PartitionCount = partitionCount
	return req
}
// newMarkOffsetPartitionReadyRequest builds a storage request of type
// StorageMarkOffsetPartitionReady for the given partition ID.
func newMarkOffsetPartitionReadyRequest(partitionID int32) *StorageRequest {
	req := new(StorageRequest)
	req.RequestType = StorageMarkOffsetPartitionReady
	req.PartitionID = partitionID
	return req
}
// newDeleteGroupMetadataRequest builds a storage request of type StorageDeleteGroupMetadata
// for the given consumer group name.
func newDeleteGroupMetadataRequest(group string) *StorageRequest {
	req := new(StorageRequest)
	req.RequestType = StorageDeleteGroupMetadata
	req.ConsumerGroupName = group
	return req
}
// newDeleteTopicRequest builds a storage request of type StorageDeleteTopic for the given
// topic name.
func newDeleteTopicRequest(topic string) *StorageRequest {
	req := new(StorageRequest)
	req.RequestType = StorageDeleteTopic
	req.TopicName = topic
	return req
}
// newAddSizeByTopicRequest builds a storage request of type StorageAddSizeByTopic that
// carries the given map. The map is stored as passed in; no copy is made.
func newAddSizeByTopicRequest(sizeByTopic map[string]int64) *StorageRequest {
	req := new(StorageRequest)
	req.RequestType = StorageAddSizeByTopic
	req.SizeByTopic = sizeByTopic
	return req
}
// newAddSizeByBrokerRequest builds a storage request of type StorageAddSizeByBroker that
// carries the given map. The map is stored as passed in; no copy is made.
func newAddSizeByBrokerRequest(sizeByBroker map[int32]int64) *StorageRequest {
	req := new(StorageRequest)
	req.RequestType = StorageAddSizeByBroker
	req.SizeByBroker = sizeByBroker
	return req
}