Permalink
Browse files

nsqd: topic message pump cleanup

* when all the channels have been deleted cleanly exit the topic
  message pump
* version bump
  • Loading branch information...
1 parent f9668d7 commit 02a510c97654f9be8c0b99d9546581447d4f7e88 @mreiferson mreiferson committed Oct 8, 2012
Showing with 56 additions and 9 deletions.
  1. +24 −8 nsqd/topic.go
  2. +31 −0 nsqd/topic_test.go
  3. +1 −1 nsqd/version.go
View
@@ -19,7 +19,7 @@ type Topic struct {
backend BackendQueue
incomingMsgChan chan *nsq.Message
memoryMsgChan chan *nsq.Message
- messagePumpStarter sync.Once
+ messagePumpStarter *sync.Once
exitChan chan int
waitGroup util.WaitGroupWrapper
exitFlag int32
@@ -30,13 +30,14 @@ type Topic struct {
// Topic constructor
func NewTopic(topicName string, options *nsqdOptions) *Topic {
topic := &Topic{
- name: topicName,
- channelMap: make(map[string]*Channel),
- backend: NewDiskQueue(topicName, options.dataPath, options.maxBytesPerFile, options.syncEvery),
- incomingMsgChan: make(chan *nsq.Message, 1),
- memoryMsgChan: make(chan *nsq.Message, options.memQueueSize),
- options: options,
- exitChan: make(chan int),
+ name: topicName,
+ channelMap: make(map[string]*Channel),
+ backend: NewDiskQueue(topicName, options.dataPath, options.maxBytesPerFile, options.syncEvery),
+ incomingMsgChan: make(chan *nsq.Message, 1),
+ memoryMsgChan: make(chan *nsq.Message, options.memQueueSize),
+ options: options,
+ exitChan: make(chan int),
+ messagePumpStarter: new(sync.Once),
}
topic.waitGroup.Wrap(func() { topic.router() })
@@ -156,6 +157,21 @@ func (t *Topic) messagePump() {
}
t.RLock()
+ // check if all the channels have been deleted
+ if len(t.channelMap) == 0 {
+ // put this message back on the queue
+ // we need to background because we currently hold the lock
+ go func() {
+ t.PutMessage(msg)
+ }()
+
+ // reset the sync.Once
+ t.messagePumpStarter = new(sync.Once)
+
+ t.RUnlock()
+ goto exit
+ }
+
for _, channel := range t.channelMap {
// copy the message because each channel
// needs a unique instance
View
@@ -9,13 +9,15 @@ import (
"runtime"
"strconv"
"testing"
+ "time"
)
func TestGetTopic(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)
nsqd := NewNSQd(1, NewNsqdOptions())
+ defer nsqd.Exit()
topic1 := nsqd.GetTopic("test")
assert.NotEqual(t, nil, topic1)
@@ -34,6 +36,8 @@ func TestGetChannel(t *testing.T) {
defer log.SetOutput(os.Stdout)
nsqd := NewNSQd(1, NewNsqdOptions())
+ defer nsqd.Exit()
+
topic := nsqd.GetTopic("test")
channel1 := topic.GetChannel("ch1")
@@ -51,6 +55,8 @@ func TestDeletes(t *testing.T) {
defer log.SetOutput(os.Stdout)
nsqd := NewNSQd(1, NewNsqdOptions())
+ defer nsqd.Exit()
+
topic := nsqd.GetTopic("test")
channel1 := topic.GetChannel("ch1")
@@ -69,6 +75,29 @@ func TestDeletes(t *testing.T) {
assert.Equal(t, 0, len(nsqd.topicMap))
}
+func TestDeleteLast(t *testing.T) {
+ log.SetOutput(ioutil.Discard)
+ defer log.SetOutput(os.Stdout)
+
+ nsqd := NewNSQd(1, NewNsqdOptions())
+ defer nsqd.Exit()
+
+ topic := nsqd.GetTopic("test")
+
+ channel1 := topic.GetChannel("ch1")
+ assert.NotEqual(t, nil, channel1)
+
+ err := topic.DeleteExistingChannel("ch1")
+ assert.Equal(t, nil, err)
+ assert.Equal(t, 0, len(topic.channelMap))
+
+ msg := nsq.NewMessage(<-nsqd.idChan, []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa"))
+ err = topic.PutMessage(msg)
+ time.Sleep(100 * time.Millisecond)
+ assert.Equal(t, nil, err)
+ assert.Equal(t, topic.Depth(), int64(1))
+}
+
func BenchmarkTopicPut(b *testing.B) {
b.StopTimer()
log.SetOutput(ioutil.Discard)
@@ -77,6 +106,7 @@ func BenchmarkTopicPut(b *testing.B) {
options := NewNsqdOptions()
options.memQueueSize = int64(b.N)
nsqd := NewNSQd(1, options)
+ defer nsqd.Exit()
b.StartTimer()
for i := 0; i <= b.N; i++ {
@@ -95,6 +125,7 @@ func BenchmarkTopicToChannelPut(b *testing.B) {
options := NewNsqdOptions()
options.memQueueSize = int64(b.N)
nsqd := NewNSQd(1, options)
+ defer nsqd.Exit()
channel := nsqd.GetTopic(topicName).GetChannel(channelName)
b.StartTimer()
View
@@ -1,3 +1,3 @@
package main
-const VERSION = "0.2.10"
+const VERSION = "0.2.11"

0 comments on commit 02a510c

Please sign in to comment.