Skip to content

Commit f5f73f8

Browse files
committed
Support string array in pubsub message payload
1 parent befee0e commit f5f73f8

File tree

2 files changed

+52
-7
lines changed

2 files changed

+52
-7
lines changed

pubsub.go

+22-7
Original file line numberDiff line numberDiff line change
@@ -284,9 +284,10 @@ func (m *Subscription) String() string {
284284

285285
// Message received as result of a PUBLISH command issued by another client.
286286
type Message struct {
287-
Channel string
288-
Pattern string
289-
Payload string
287+
Channel string
288+
Pattern string
289+
Payload string
290+
PayloadSlice []string
290291
}
291292

292293
func (m *Message) String() string {
@@ -322,10 +323,24 @@ func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
322323
Count: int(reply[2].(int64)),
323324
}, nil
324325
case "message":
325-
return &Message{
326-
Channel: reply[1].(string),
327-
Payload: reply[2].(string),
328-
}, nil
326+
switch payload := reply[2].(type) {
327+
case string:
328+
return &Message{
329+
Channel: reply[1].(string),
330+
Payload: payload,
331+
}, nil
332+
case []interface{}:
333+
ss := make([]string, len(payload))
334+
for i, s := range payload {
335+
ss[i] = s.(string)
336+
}
337+
return &Message{
338+
Channel: reply[1].(string),
339+
PayloadSlice: ss,
340+
}, nil
341+
default:
342+
return nil, fmt.Errorf("redis: unsupported pubsub message payload: %T", payload)
343+
}
329344
case "pmessage":
330345
return &Message{
331346
Pattern: reply[1].(string),

pubsub_test.go

+30
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package redis_test
22

33
import (
4+
"context"
45
"io"
56
"net"
67
"sync"
@@ -14,11 +15,16 @@ import (
1415

1516
var _ = Describe("PubSub", func() {
1617
var client *redis.Client
18+
var clientID int64
1719

1820
BeforeEach(func() {
1921
opt := redisOptions()
2022
opt.MinIdleConns = 0
2123
opt.MaxConnAge = 0
24+
opt.OnConnect = func(ctx context.Context, cn *redis.Conn) (err error) {
25+
clientID, err = cn.ClientID(ctx).Result()
26+
return err
27+
}
2228
client = redis.NewClient(opt)
2329
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
2430
})
@@ -415,6 +421,30 @@ var _ = Describe("PubSub", func() {
415421
Expect(msg.Payload).To(Equal(string(bigVal)))
416422
})
417423

424+
It("handles message payload slice with server-assisted client-size caching", func() {
425+
pubsub := client.Subscribe(ctx, "__redis__:invalidate")
426+
defer pubsub.Close()
427+
428+
client2 := redis.NewClient(redisOptions())
429+
defer client2.Close()
430+
431+
err := client2.Do(ctx, "CLIENT", "TRACKING", "on", "REDIRECT", clientID).Err()
432+
Expect(err).NotTo(HaveOccurred())
433+
434+
err = client2.Do(ctx, "GET", "mykey").Err()
435+
Expect(err).To(Equal(redis.Nil))
436+
437+
err = client2.Do(ctx, "SET", "mykey", "myvalue").Err()
438+
Expect(err).NotTo(HaveOccurred())
439+
440+
ch := pubsub.Channel()
441+
442+
var msg *redis.Message
443+
Eventually(ch).Should(Receive(&msg))
444+
Expect(msg.Channel).To(Equal("__redis__:invalidate"))
445+
Expect(msg.PayloadSlice).To(Equal([]string{"mykey"}))
446+
})
447+
418448
It("supports concurrent Ping and Receive", func() {
419449
const N = 100
420450

0 commit comments

Comments
 (0)