Skip to content

Commit 21913a8

Browse files
committed
pubsub: drop a message when the channel is full
1 parent 0d39ee8 commit 21913a8

File tree

2 files changed

+38
-7
lines changed

2 files changed

+38
-7
lines changed

pubsub.go

+31-7
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package redis
33
import (
44
"errors"
55
"fmt"
6+
"strings"
67
"sync"
78
"time"
89

@@ -29,8 +30,9 @@ type PubSub struct {
2930
cn *pool.Conn
3031
channels map[string]struct{}
3132
patterns map[string]struct{}
32-
closed bool
33-
exit chan struct{}
33+
34+
closed bool
35+
exit chan struct{}
3436

3537
cmd *Cmd
3638

@@ -39,6 +41,12 @@ type PubSub struct {
3941
ping chan struct{}
4042
}
4143

44+
func (c *PubSub) String() string {
45+
channels := mapKeys(c.channels)
46+
channels = append(channels, mapKeys(c.patterns)...)
47+
return fmt.Sprintf("PubSub(%s)", strings.Join(channels, ", "))
48+
}
49+
4250
func (c *PubSub) init() {
4351
c.exit = make(chan struct{})
4452
}
@@ -389,16 +397,23 @@ func (c *PubSub) ReceiveMessage() (*Message, error) {
389397
// It periodically sends Ping messages to test connection health.
390398
// The channel is closed with PubSub. Receive* APIs can not be used
391399
// after channel is created.
400+
//
401+
// If the Go channel is full for 30 seconds the message is dropped.
392402
func (c *PubSub) Channel() <-chan *Message {
393403
c.chOnce.Do(c.initChannel)
394404
return c.ch
395405
}
396406

397407
func (c *PubSub) initChannel() {
408+
const timeout = 30 * time.Second
409+
398410
c.ch = make(chan *Message, 100)
399-
c.ping = make(chan struct{}, 10)
411+
c.ping = make(chan struct{}, 1)
400412

401413
go func() {
414+
timer := time.NewTimer(timeout)
415+
timer.Stop()
416+
402417
var errCount int
403418
for {
404419
msg, err := c.Receive()
@@ -413,6 +428,7 @@ func (c *PubSub) initChannel() {
413428
errCount++
414429
continue
415430
}
431+
416432
errCount = 0
417433

418434
// Any message is as good as a ping.
@@ -427,16 +443,24 @@ func (c *PubSub) initChannel() {
427443
case *Pong:
428444
// Ignore.
429445
case *Message:
430-
c.ch <- msg
446+
timer.Reset(timeout)
447+
select {
448+
case c.ch <- msg:
449+
if !timer.Stop() {
450+
<-timer.C
451+
}
452+
case <-timer.C:
453+
internal.Logf(
454+
"redis: %s channel is full for %s (message is dropped)",
455+
c, timeout)
456+
}
431457
default:
432-
internal.Logf("redis: unknown message: %T", msg)
458+
internal.Logf("redis: unknown message type: %T", msg)
433459
}
434460
}
435461
}()
436462

437463
go func() {
438-
const timeout = 5 * time.Second
439-
440464
timer := time.NewTimer(timeout)
441465
timer.Stop()
442466

pubsub_test.go

+7
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,13 @@ var _ = Describe("PubSub", func() {
2727
Expect(client.Close()).NotTo(HaveOccurred())
2828
})
2929

30+
It("implements Stringer", func() {
31+
pubsub := client.PSubscribe("mychannel*")
32+
defer pubsub.Close()
33+
34+
Expect(pubsub.String()).To(Equal("PubSub(mychannel*)"))
35+
})
36+
3037
It("should support pattern matching", func() {
3138
pubsub := client.PSubscribe("mychannel*")
3239
defer pubsub.Close()

0 commit comments

Comments
 (0)