Skip to content

Commit 179fc55

Browse files
authored
feat(pubsublite): Message type and message routers (#3077)
pubsublite.Message is similar to pubsub.Message, with the following differences: - Attributes can have multiple values for the same key. - Pub/Sub Lite uses []byte for data, attribute values and ordering keys. Message routers select a partition to route a published message to. SHA256 hash is used for routing messages with ordering keys. Round robin is used for routing messages without ordering keys.
1 parent 9eb9fcb commit 179fc55

File tree

2 files changed

+368
-0
lines changed

2 files changed

+368
-0
lines changed

pubsublite/message.go

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
// Copyright 2020 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
// limitations under the License.
13+
14+
package pubsublite
15+
16+
import (
17+
"crypto/sha256"
18+
"fmt"
19+
"math/big"
20+
"math/rand"
21+
"time"
22+
23+
"github.com/golang/protobuf/ptypes"
24+
25+
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
26+
)
27+
28+
// AttributeValues is a slice of byte slices holding the values of one attribute key.
29+
type AttributeValues [][]byte
30+
31+
// Message represents a Pub/Sub message.
32+
type Message struct {
33+
// Data is the actual data in the message.
34+
Data []byte
35+
36+
// Attributes can be used to label the message. A key may have multiple
37+
// values.
38+
Attributes map[string]AttributeValues
39+
40+
// EventTime is an optional, user-specified event time for this message.
41+
EventTime time.Time
42+
43+
// OrderingKey identifies related messages for which publish order should
44+
// be respected. Messages with the same ordering key are published to the
45+
// same topic partition and subscribers will receive the messages in order.
46+
// If the ordering key is empty, the message will be sent to an arbitrary
47+
// partition.
48+
OrderingKey []byte
49+
}
50+
51+
func (m *Message) toProto() (*pb.PubSubMessage, error) {
52+
msgpb := &pb.PubSubMessage{
53+
Data: m.Data,
54+
Key: m.OrderingKey,
55+
}
56+
57+
if len(m.Attributes) > 0 {
58+
msgpb.Attributes = make(map[string]*pb.AttributeValues)
59+
for key, values := range m.Attributes {
60+
msgpb.Attributes[key] = &pb.AttributeValues{Values: values}
61+
}
62+
}
63+
64+
if !m.EventTime.IsZero() {
65+
ts, err := ptypes.TimestampProto(m.EventTime)
66+
if err != nil {
67+
return nil, fmt.Errorf("pubsublite: error converting message timestamp: %v", err)
68+
}
69+
msgpb.EventTime = ts
70+
}
71+
return msgpb, nil
72+
}
73+
74+
// messageRouter outputs a partition number, given an ordering key. Results are
75+
// undefined when:
76+
// - SetPartitionCount() is called with count <= 0.
77+
// - Route() is called before SetPartitionCount() to initialize the router.
78+
//
79+
// Message routers need to accommodate topic partition resizing.
80+
type messageRouter interface {
81+
SetPartitionCount(count int)
82+
Route(orderingKey []byte) int
83+
}
84+
85+
// roundRobinMsgRouter sequentially cycles through partition numbers, starting
// from a random partition. The ordering key passed to Route is ignored.
//
// The router is uninitialized until SetPartitionCount has been called with a
// positive count; while uninitialized, Route returns -1 rather than panicking.
type roundRobinMsgRouter struct {
	// rng picks the starting partition whenever the partition count is set.
	rng *rand.Rand
	// partitionCount is the number of partitions being cycled through. A value
	// <= 0 marks the router as uninitialized.
	partitionCount int
	// nextPartition is the partition the next Route call will return.
	nextPartition int
}

// SetPartitionCount sets the number of partitions and restarts the cycle at a
// partition drawn from rng in the range [0, count).
//
// A count <= 0 puts the router back into its uninitialized state without
// consulting rng, because Int63n panics on a non-positive bound.
func (r *roundRobinMsgRouter) SetPartitionCount(count int) {
	if count <= 0 {
		r.partitionCount = 0
		r.nextPartition = 0
		return
	}
	r.partitionCount = count
	r.nextPartition = int(r.rng.Int63n(int64(count)))
}

// Route returns the next partition in the cycle and advances the cycle by one,
// wrapping back to 0 after the last partition. It returns -1 when the router
// has not been initialized with a positive partition count.
func (r *roundRobinMsgRouter) Route(orderingKey []byte) int {
	if r.partitionCount <= 0 {
		// Avoids a modulo-by-zero panic below.
		return -1
	}
	partition := r.nextPartition
	r.nextPartition = (partition + 1) % r.partitionCount
	return partition
}
103+
104+
// hashingMsgRouter hashes an ordering key using SHA256 to obtain a partition
// number. It should only be used for messages with an ordering key.
//
// The router is uninitialized until SetPartitionCount has been called with a
// positive count; while uninitialized, Route returns -1 rather than panicking.
//
// Matches implementation at:
// https://github.com/googleapis/java-pubsublite/blob/master/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/DefaultRoutingPolicy.java
type hashingMsgRouter struct {
	// partitionCount is the modulus applied to the key hash. It is nil while
	// the router is uninitialized.
	partitionCount *big.Int
}

// SetPartitionCount sets the number of partitions that keys are hashed onto.
// A count <= 0 resets the router to its uninitialized state, since a zero
// modulus would make Route panic with a division by zero.
func (r *hashingMsgRouter) SetPartitionCount(count int) {
	if count <= 0 {
		r.partitionCount = nil
		return
	}
	r.partitionCount = big.NewInt(int64(count))
}

// Route returns the partition for orderingKey: the SHA256 digest of the key,
// read as an unsigned big-endian integer, modulo the partition count. The same
// key always maps to the same partition for a given partition count.
//
// It returns -1 when orderingKey is empty or when the router has not been
// initialized with a positive partition count.
func (r *hashingMsgRouter) Route(orderingKey []byte) int {
	if len(orderingKey) == 0 || r.partitionCount == nil {
		return -1
	}
	digest := sha256.Sum256(orderingKey)
	num := new(big.Int).SetBytes(digest[:])
	// math/big permits the receiver to alias an operand, so the remainder is
	// stored back into num instead of allocating a second big.Int.
	return int(num.Mod(num, r.partitionCount).Int64())
}
126+
127+
// compositeMsgRouter delegates to different message routers for messages
128+
// with/without ordering keys.
129+
type compositeMsgRouter struct {
130+
keyedRouter messageRouter
131+
keylessRouter messageRouter
132+
}
133+
134+
func (r *compositeMsgRouter) SetPartitionCount(count int) {
135+
r.keyedRouter.SetPartitionCount(count)
136+
r.keylessRouter.SetPartitionCount(count)
137+
}
138+
139+
func (r *compositeMsgRouter) Route(orderingKey []byte) int {
140+
if len(orderingKey) > 0 {
141+
return r.keyedRouter.Route(orderingKey)
142+
}
143+
return r.keylessRouter.Route(orderingKey)
144+
}
145+
146+
// defaultMessageRouter returns a compositeMsgRouter that uses hashingMsgRouter
147+
// for messages with ordering key and roundRobinMsgRouter for messages without.
148+
func newDefaultMessageRouter(rng *rand.Rand) messageRouter {
149+
return &compositeMsgRouter{
150+
keyedRouter: &hashingMsgRouter{},
151+
keylessRouter: &roundRobinMsgRouter{rng: rng},
152+
}
153+
}

pubsublite/message_test.go

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
// Copyright 2020 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
// limitations under the License.
13+
14+
package pubsublite
15+
16+
import (
17+
"fmt"
18+
"math/rand"
19+
"testing"
20+
"time"
21+
22+
"github.com/golang/protobuf/proto"
23+
24+
tspb "github.com/golang/protobuf/ptypes/timestamp"
25+
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
26+
)
27+
28+
// fakeSource is a stub random source for tests: Int63 always yields the value
// stored in ret, so anything driven by it is deterministic.
type fakeSource struct {
	ret int64 // value returned by every Int63 call
}

// Int63 returns the preset value.
func (src *fakeSource) Int63() int64 {
	return src.ret
}

// Seed accepts a seed and ignores it.
func (src *fakeSource) Seed(_ int64) {
}
34+
35+
// fakeMsgRouter is a stub router for tests. Route returns the last partition
// count multiplied by multiplier, which lets a test tell two instances apart
// and confirm that the count was propagated.
type fakeMsgRouter struct {
	multiplier     int
	partitionCount int
}

// SetPartitionCount records count for later Route calls.
func (router *fakeMsgRouter) SetPartitionCount(count int) {
	router.partitionCount = count
}

// Route ignores orderingKey and returns multiplier times the recorded count.
func (router *fakeMsgRouter) Route(orderingKey []byte) int {
	product := router.multiplier * router.partitionCount
	return product
}
47+
48+
func TestMessageToProto(t *testing.T) {
49+
for _, tc := range []struct {
50+
desc string
51+
msg *Message
52+
want *pb.PubSubMessage
53+
}{
54+
{
55+
desc: "valid: minimal",
56+
msg: &Message{
57+
Data: []byte("Hello world"),
58+
},
59+
want: &pb.PubSubMessage{
60+
Data: []byte("Hello world"),
61+
},
62+
},
63+
{
64+
desc: "valid: filled",
65+
msg: &Message{
66+
Data: []byte("foo"),
67+
Attributes: map[string]AttributeValues{
68+
"attr1": [][]byte{
69+
[]byte("val1"),
70+
[]byte("val2"),
71+
},
72+
},
73+
EventTime: time.Unix(1555593697, 154358*1000),
74+
OrderingKey: []byte("order"),
75+
},
76+
want: &pb.PubSubMessage{
77+
Data: []byte("foo"),
78+
Attributes: map[string]*pb.AttributeValues{
79+
"attr1": {
80+
Values: [][]byte{
81+
[]byte("val1"),
82+
[]byte("val2"),
83+
},
84+
},
85+
},
86+
EventTime: &tspb.Timestamp{
87+
Seconds: 1555593697,
88+
Nanos: 154358 * 1000,
89+
},
90+
Key: []byte("order"),
91+
},
92+
},
93+
} {
94+
t.Run(tc.desc, func(t *testing.T) {
95+
got, err := tc.msg.toProto()
96+
if err != nil {
97+
t.Errorf("toProto() err = %v", err)
98+
} else if !proto.Equal(got, tc.want) {
99+
t.Errorf("toProto() got = %v\nwant = %v", got, tc.want)
100+
}
101+
})
102+
}
103+
}
104+
105+
func TestRoundRobinMsgRouter(t *testing.T) {
106+
// Using the same msgRouter for each test run ensures that it reinitializes
107+
// when the partition count changes.
108+
source := &fakeSource{}
109+
msgRouter := &roundRobinMsgRouter{rng: rand.New(source)}
110+
111+
for _, tc := range []struct {
112+
partitionCount int
113+
source int64
114+
want []int
115+
}{
116+
{
117+
partitionCount: 8,
118+
source: 9,
119+
want: []int{1, 2, 3, 4, 5, 6, 7, 0, 1},
120+
},
121+
{
122+
partitionCount: 5,
123+
source: 2,
124+
want: []int{2, 3, 4, 0, 1, 2},
125+
},
126+
} {
127+
t.Run(fmt.Sprintf("partitionCount=%d", tc.partitionCount), func(t *testing.T) {
128+
source.ret = tc.source
129+
msgRouter.SetPartitionCount(tc.partitionCount)
130+
for i, want := range tc.want {
131+
got := msgRouter.Route([]byte("IGNORED"))
132+
if got != want {
133+
t.Errorf("i=%d: Route() = %d, want = %d", i, got, want)
134+
}
135+
}
136+
})
137+
}
138+
}
139+
140+
func TestHashingMsgRouter(t *testing.T) {
141+
// Using the same msgRouter for each test run ensures that it reinitializes
142+
// when the partition count changes.
143+
msgRouter := &hashingMsgRouter{}
144+
145+
keys := [][]byte{
146+
[]byte("foo1"),
147+
[]byte("foo2"),
148+
[]byte("foo3"),
149+
[]byte("foo4"),
150+
[]byte("foo5"),
151+
}
152+
153+
for _, tc := range []struct {
154+
partitionCount int
155+
}{
156+
{partitionCount: 10},
157+
{partitionCount: 5},
158+
} {
159+
t.Run(fmt.Sprintf("partitionCount=%d", tc.partitionCount), func(t *testing.T) {
160+
msgRouter.SetPartitionCount(tc.partitionCount)
161+
for _, key := range keys {
162+
p1 := msgRouter.Route(key)
163+
p2 := msgRouter.Route(key)
164+
if p1 != p2 {
165+
t.Errorf("Route() returned different partitions for same key %v", key)
166+
}
167+
if p1 < 0 || p1 >= tc.partitionCount {
168+
t.Errorf("Route() returned partition out of range: %v", p1)
169+
}
170+
}
171+
})
172+
}
173+
}
174+
175+
func TestCompositeMsgRouter(t *testing.T) {
176+
keyedRouter := &fakeMsgRouter{multiplier: 10}
177+
keylessRouter := &fakeMsgRouter{multiplier: 100}
178+
msgRouter := &compositeMsgRouter{
179+
keyedRouter: keyedRouter,
180+
keylessRouter: keylessRouter,
181+
}
182+
183+
for _, tc := range []struct {
184+
desc string
185+
partitionCount int
186+
key []byte
187+
want int
188+
}{
189+
{
190+
desc: "key",
191+
partitionCount: 2,
192+
key: []byte("foo"),
193+
want: 20,
194+
},
195+
{
196+
desc: "nil key",
197+
partitionCount: 8,
198+
key: nil,
199+
want: 800,
200+
},
201+
{
202+
desc: "empty key",
203+
partitionCount: 5,
204+
key: []byte{},
205+
want: 500,
206+
},
207+
} {
208+
t.Run(tc.desc, func(t *testing.T) {
209+
msgRouter.SetPartitionCount(tc.partitionCount)
210+
if got := msgRouter.Route(tc.key); got != tc.want {
211+
t.Errorf("Route() = %d, want = %d", got, tc.want)
212+
}
213+
})
214+
}
215+
}

0 commit comments

Comments
 (0)