forked from cilium/cilium
/
request.go
229 lines (199 loc) · 6.1 KB
/
request.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
// Copyright 2017 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kafka
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"github.com/cilium/cilium/pkg/flowdebug"
"github.com/optiopay/kafka/proto"
)
// RequestMessage represents a Kafka request message: the raw bytes read from
// the wire together with the values extracted from them.
type RequestMessage struct {
// kind is the request API key as returned by proto.ReadReq.
kind int16
// version is the API version decoded from bytes 6-7 of rawMsg.
version int16
// rawMsg holds the raw request bytes as returned by proto.ReadReq.
rawMsg []byte
// request is the parsed request object. It stays nil when ReadRequest does
// not handle the API key of the message.
request interface{}
}
// CorrelationID represents the correlation ID as defined in the Kafka protocol
// specification. It is read from and written to bytes 8-11 of the raw request
// as a big-endian 32-bit value.
type CorrelationID uint32
// GetAPIKey returns the API key of the Kafka request, i.e. the numeric
// identifier of the request kind.
func (req *RequestMessage) GetAPIKey() int16 {
	return req.kind
}
// GetRaw returns the raw bytes of the Kafka request. The slice is returned
// as-is and is not copied, so it shares storage with the request message.
func (req *RequestMessage) GetRaw() []byte {
	return req.rawMsg
}
// GetVersion returns the API version of the Kafka request.
func (req *RequestMessage) GetVersion() int16 {
	return req.version
}
// GetCorrelationID returns the correlation ID of the Kafka request, decoded
// as a big-endian 32-bit value from bytes 8-11 of the raw message. It returns
// 0 when the raw message is shorter than 12 bytes.
func (req *RequestMessage) GetCorrelationID() CorrelationID {
	if len(req.rawMsg) < 12 {
		return 0
	}
	id := binary.BigEndian.Uint32(req.rawMsg[8:12])
	return CorrelationID(id)
}
// SetCorrelationID overwrites the correlation ID of the Kafka request in
// place, encoding id as a big-endian 32-bit value into bytes 8-11 of the raw
// message. It does nothing when the raw message is shorter than 12 bytes.
func (req *RequestMessage) SetCorrelationID(id CorrelationID) {
	if len(req.rawMsg) < 12 {
		return
	}
	binary.BigEndian.PutUint32(req.rawMsg[8:12], uint32(id))
}
// extractVersion decodes the API version from the raw request, where it is
// stored as a big-endian 16-bit integer in bytes 6-7. It returns 0 when the
// raw message is too short to contain a version, mirroring how
// GetCorrelationID treats truncated messages, instead of panicking with an
// index-out-of-range error.
func (req *RequestMessage) extractVersion() int16 {
	if len(req.rawMsg) < 8 {
		return 0
	}
	return int16(binary.BigEndian.Uint16(req.rawMsg[6:8]))
}
// String returns a human readable representation of the request message: the
// API key, API version and raw length, followed by the JSON encoding of the
// parsed request. If the parsed request cannot be JSON-encoded, the encoding
// error text is returned instead.
func (req *RequestMessage) String() string {
	encoded, err := json.Marshal(req.request)
	if err != nil {
		return err.Error()
	}

	rawLen := len(req.rawMsg)
	return fmt.Sprintf("apiKey=%d,apiVersion=%d,len=%d: %s",
		req.kind, req.version, rawLen, encoded)
}
// GetTopics returns the list of topic names referenced by the Kafka request.
// It returns nil when the request has not been parsed or when its type is not
// one of the topic-carrying request types handled below.
func (req *RequestMessage) GetTopics() []string {
	switch parsed := req.request.(type) {
	case *proto.ProduceReq:
		return produceTopics(parsed)
	case *proto.FetchReq:
		return fetchTopics(parsed)
	case *proto.OffsetReq:
		return offsetTopics(parsed)
	case *proto.MetadataReq:
		return metadataTopics(parsed)
	case *proto.OffsetCommitReq:
		return offsetCommitTopics(parsed)
	case *proto.OffsetFetchReq:
		return offsetFetchTopics(parsed)
	default:
		// Covers both a nil request and request types without topics.
		return nil
	}
}
// produceTopics collects the names of all topics in a produce request, in
// request order.
func produceTopics(req *proto.ProduceReq) []string {
	names := make([]string, 0, len(req.Topics))
	for i := range req.Topics {
		names = append(names, req.Topics[i].Name)
	}
	return names
}
// fetchTopics collects the names of all topics in a fetch request, in request
// order.
func fetchTopics(req *proto.FetchReq) []string {
	names := make([]string, 0, len(req.Topics))
	for i := range req.Topics {
		names = append(names, req.Topics[i].Name)
	}
	return names
}
// offsetTopics collects the names of all topics in an offset request, in
// request order.
func offsetTopics(req *proto.OffsetReq) []string {
	names := make([]string, 0, len(req.Topics))
	for i := range req.Topics {
		names = append(names, req.Topics[i].Name)
	}
	return names
}
// metadataTopics returns the topic list of a metadata request. The request's
// own slice is returned directly, without copying.
func metadataTopics(req *proto.MetadataReq) []string {
	return req.Topics
}
// offsetCommitTopics collects the names of all topics in an offset commit
// request, in request order.
func offsetCommitTopics(req *proto.OffsetCommitReq) []string {
	names := make([]string, 0, len(req.Topics))
	for i := range req.Topics {
		names = append(names, req.Topics[i].Name)
	}
	return names
}
// offsetFetchTopics collects the names of all topics in an offset fetch
// request, in request order.
func offsetFetchTopics(req *proto.OffsetFetchReq) []string {
	names := make([]string, 0, len(req.Topics))
	for i := range req.Topics {
		names = append(names, req.Topics[i].Name)
	}
	return names
}
// CreateResponse creates a response message based on the provided request
// message. err is forwarded to the request-type specific create*Response
// helper, which is expected to set the corresponding error code in all topics
// and embedded partitions of the response.
//
// An error is returned when the request was never parsed, i.e. its API key is
// not supported. A parsed request of a type not handled here indicates a
// programming error and triggers a panic through the package logger.
func (req *RequestMessage) CreateResponse(err error) (*ResponseMessage, error) {
	if req.request == nil {
		return nil, fmt.Errorf("unsupported request API key %d", req.kind)
	}

	switch parsed := req.request.(type) {
	case *proto.ProduceReq:
		return createProduceResponse(parsed, err)
	case *proto.FetchReq:
		return createFetchResponse(parsed, err)
	case *proto.OffsetReq:
		return createOffsetResponse(parsed, err)
	case *proto.MetadataReq:
		return createMetadataResponse(parsed, err)
	case *proto.ConsumerMetadataReq:
		return createConsumerMetadataResponse(parsed, err)
	case *proto.OffsetCommitReq:
		return createOffsetCommitResponse(parsed, err)
	case *proto.OffsetFetchReq:
		return createOffsetFetchResponse(parsed, err)
	}

	// The switch cases above must correspond exactly to the switch cases in
	// ReadRequest; reaching this point means the two have diverged.
	log.Panic(fmt.Sprintf("Kafka API key not handled: %d", req.kind))
	return nil, nil
}
// ReadRequest reads a single Kafka request from reader and returns the
// resulting message or an error.
//
// The raw message is obtained through proto.ReadReq and must be at least 12
// bytes long, otherwise an error is returned. For the API keys handled in the
// switch below, the message is additionally parsed into the matching proto
// request type; a parse failure is logged and returned as an error. For any
// other API key a debug message is logged and the request is returned with
// only its API key, version and raw bytes populated.
func ReadRequest(reader io.Reader) (*RequestMessage, error) {
	kind, raw, err := proto.ReadReq(reader)
	if err != nil {
		return nil, err
	}

	if len(raw) < 12 {
		return nil,
			fmt.Errorf("unexpected end of request (length < 12 bytes)")
	}

	req := &RequestMessage{kind: kind, rawMsg: raw}
	req.version = req.extractVersion()

	// Parse from a private copy of the raw bytes rather than from rawMsg
	// itself.
	payload := make([]byte, len(raw))
	copy(payload, raw)
	buf := bytes.NewBuffer(payload)

	switch kind {
	case proto.ProduceReqKind:
		req.request, err = proto.ReadProduceReq(buf)
	case proto.FetchReqKind:
		req.request, err = proto.ReadFetchReq(buf)
	case proto.OffsetReqKind:
		req.request, err = proto.ReadOffsetReq(buf)
	case proto.MetadataReqKind:
		req.request, err = proto.ReadMetadataReq(buf)
	case proto.ConsumerMetadataReqKind:
		req.request, err = proto.ReadConsumerMetadataReq(buf)
	case proto.OffsetCommitReqKind:
		req.request, err = proto.ReadOffsetCommitReq(buf)
	case proto.OffsetFetchReqKind:
		req.request, err = proto.ReadOffsetFetchReq(buf)
	default:
		log.WithField(fieldRequest, req.String()).Debugf("Unknown Kafka request API key: %d", req.kind)
	}

	if err != nil {
		entry := log.WithField(fieldRequest, req.String()).WithError(err)
		flowdebug.Log(entry, "Ignoring Kafka message due to parse error")
		return nil, err
	}

	return req, nil
}