/
policy.go
176 lines (155 loc) · 5.22 KB
/
policy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
// Copyright 2017 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kafka
import (
"github.com/cilium/cilium/pkg/flowdebug"
api "github.com/cilium/cilium/pkg/policy/api/kafka"
log "github.com/sirupsen/logrus"
)
// Rule is a Kafka-specific policy matching rule. A request is allowed
// only if it satisfies every non-wildcard field of the rule (see the
// individual Check* methods and Matches).
type Rule struct {
	// APIVersion is the allowed version, or < 0 if all versions
	// are to be allowed.
	APIVersion int16
	// APIKeys is the set of all numerical apiKeys that are allowed.
	// If empty, all API keys are allowed.
	APIKeys map[int16]struct{}
	// ClientID is the client identifier as provided in the request.
	//
	// From Kafka protocol documentation:
	// This is a user supplied identifier for the client application. The
	// user can use any identifier they like and it will be used when
	// logging errors, monitoring aggregates, etc. For example, one might
	// want to monitor not just the requests per second overall, but the
	// number coming from each client application (each of which could
	// reside on multiple servers). This id acts as a logical grouping
	// across all requests from a particular client.
	//
	// If empty, all client identifiers are allowed.
	ClientID string
	// Topic is the topic name contained in the message. If a Kafka request
	// contains multiple topics, then all topics must be allowed or the
	// message will be rejected.
	//
	// This constraint is ignored if the matched request message type
	// doesn't contain any topic. Maximum size of Topic can be 249
	// characters as per recent Kafka spec and allowed characters are
	// a-z, A-Z, 0-9, -, . and _
	// Older Kafka versions had longer topic lengths of 255, but in Kafka 0.10
	// version the length was changed from 255 to 249. For compatibility
	// reasons we are allowing 255.
	//
	// If empty, all topics are allowed.
	Topic string
}
// NewRule creates a new rule from already sanitized inputs. The apiKeys
// slice is converted into a set for constant-time membership checks.
func NewRule(apiVersion int32, apiKeys []int32, clientID, topic string) Rule {
	keySet := make(map[int16]struct{}, len(apiKeys))
	for _, k := range apiKeys {
		keySet[int16(k)] = struct{}{}
	}
	return Rule{
		APIVersion: int16(apiVersion),
		APIKeys:    keySet,
		ClientID:   clientID,
		Topic:      topic,
	}
}
// CheckAPIKeyRole checks the apiKey value in the request, and returns true if
// it is allowed else false. An empty key set acts as a wildcard.
func (r *Rule) CheckAPIKeyRole(kind int16) bool {
	if len(r.APIKeys) == 0 {
		// No keys configured: wildcard, all API keys are allowed.
		return true
	}
	_, allowed := r.APIKeys[kind]
	return allowed
}
// CheckAPIVersion returns true if 'apiVersion' is allowed.
// A negative rule version acts as a wildcard.
func (r *Rule) CheckAPIVersion(apiVersion int16) bool {
	if r.APIVersion < 0 {
		return true
	}
	return apiVersion == r.APIVersion
}
// CheckClientID returns true if 'clientID' is allowed.
// An empty rule client ID acts as a wildcard.
func (r *Rule) CheckClientID(clientID string) bool {
	if r.ClientID == "" {
		return true
	}
	return clientID == r.ClientID
}
// isTopicAPIKey returns true if 'kind' identifies a Kafka request
// message type whose payload carries one or more topic names, i.e. a
// request for which a rule's Topic constraint is applicable.
func isTopicAPIKey(kind int16) bool {
	switch kind {
	case api.ProduceKey,
		api.FetchKey,
		api.OffsetsKey,
		api.MetadataKey,
		api.LeaderAndIsr,
		api.StopReplica,
		api.UpdateMetadata,
		api.OffsetCommitKey,
		api.OffsetFetchKey,
		api.CreateTopicsKey,
		api.DeleteTopicsKey,
		api.DeleteRecordsKey,
		api.OffsetForLeaderEpochKey,
		api.AddPartitionsToTxnKey,
		api.WriteTxnMarkersKey,
		api.TxnOffsetCommitKey,
		api.AlterReplicaLogDirsKey,
		api.DescribeLogDirsKey,
		api.CreatePartitionsKey:
		return true
	default:
		// All other request types carry no topic; the rule's Topic
		// constraint does not apply to them.
		return false
	}
}
// Matches returns true if the Rule matches the request and all required
// topics have matched.
//
// NOTE(review): when the rule carries a Topic constraint and the request
// type carries topics, this method mutates req.topics (deleting matched
// entries), so matching is stateful across successive rules applied to
// the same request — presumably the caller iterates over all rules and
// accepts once req.topics is drained; verify against the caller.
func (r Rule) Matches(data interface{}) bool {
	// Only Kafka request messages can be matched; anything else is
	// rejected (and logged, since it indicates a caller bug).
	req, ok := data.(*RequestMessage)
	if !ok {
		log.Warningf("Matches() called with type other than Kafka RequestMessage: %v", data)
		return false
	}
	if flowdebug.Enabled() {
		log.Debugf("Matching Kafka request %s against rule %v", req.String(), r)
	}
	// Each Check* treats its empty/negative rule field as a wildcard.
	if !r.CheckAPIKeyRole(req.kind) {
		return false
	}
	if !r.CheckAPIVersion(req.version) {
		return false
	}
	if !r.CheckClientID(req.clientID) {
		return false
	}
	// Last step, check topic if applicable.
	// Rule without a topic allows all topics, and request types without topics
	// are allowed regardless of the rule's topic.
	if r.Topic != "" && isTopicAPIKey(req.kind) {
		// Rule has a topic constraint and the request type carries topics.
		//
		// Check if this rule's topic is in the request, but keep matching
		// other rules (by returning false) even if this rule is satisfied
		// if there are other topics in the request not matched yet.
		//
		// (req.topics is initialized with all the topics in the request
		// before any rules are matched.)
		if _, exists := req.topics[r.Topic]; exists {
			delete(req.topics, r.Topic)
			if len(req.topics) == 0 {
				return true // all topics have matched
			}
		}
		return false // more topic matches needed
	}
	// All rule's constraints are satisfied
	return true
}