Skip to content

Commit

Permalink
Merge pull request nsqio#31 from jehiah/lookup_channels_31
Browse files Browse the repository at this point in the history
nsqd: lookup channels upon topic creation
  • Loading branch information
mreiferson committed Oct 5, 2012
2 parents 25936c9 + 056da14 commit 76d48d4
Show file tree
Hide file tree
Showing 13 changed files with 233 additions and 99 deletions.
11 changes: 11 additions & 0 deletions nsq/lookup_peer.go
Expand Up @@ -12,6 +12,14 @@ type LookupPeer struct {
conn net.Conn
state int32
connectCallback func(*LookupPeer)
PeerInfo PeerInfo
}

type PeerInfo struct {
TcpPort int `json:"tcp_port"`
HttpPort int `json:"http_port"`
Version string `json:"version"`
Address string `json:"address"`
}

// NewLookupPeer creates a new LookupPeer instance
Expand Down Expand Up @@ -64,6 +72,9 @@ func (lp *LookupPeer) Command(cmd *Command) ([]byte, error) {
lp.connectCallback(lp)
}
}
if cmd == nil {
return nil, nil
}
err := SendCommand(lp, cmd)
if err != nil {
lp.Close()
Expand Down
39 changes: 3 additions & 36 deletions nsqadmin/lookupd_utils.go
Expand Up @@ -34,7 +34,7 @@ func getLookupdTopics(lookupdAddresses []string) ([]string, error) {
// {"data":{"topics":["test"]}}
// TODO: convert this to a StringArray() function in simplejson
topics, _ := data.Get("topics").Array()
allTopics = stringUnion(allTopics, topics)
allTopics = util.StringUnion(allTopics, topics)
}(endpoint)
}
wg.Wait()
Expand Down Expand Up @@ -141,7 +141,7 @@ func getLookupdTopicProducers(topic string, lookupdAddresses []string) ([]string
address := producer["address"].(string)
port := int(producer["http_port"].(float64))
key := fmt.Sprintf("%s:%d", address, port)
allSources = stringAdd(allSources, key)
allSources = util.StringAdd(allSources, key)
}
}(endpoint)
}
Expand Down Expand Up @@ -176,7 +176,7 @@ func getNSQDTopics(nsqdAddresses []string) ([]string, error) {
for _, topicInfo := range topicList {
topicInfo := topicInfo.(map[string]interface{})
topicName := topicInfo["topic_name"].(string)
topics = stringAdd(topics, topicName)
topics = util.StringAdd(topics, topicName)
}
}(endpoint)
}
Expand Down Expand Up @@ -333,36 +333,3 @@ func getNSQDStats(nsqdAddresses []string, selectedTopic string) ([]*TopicHostSta
return topicHostStats, channelStats, nil

}

func stringAdd(s []string, a string) []string {
o := s
found := false
for _, existing := range s {
if a == existing {
found = true
return s
}
}
if found == false {
o = append(o, a)
}
return o

}

func stringUnion(s []string, a []interface{}) []string {
o := s
for _, entry := range a {
found := false
for _, existing := range s {
if entry.(string) == existing {
found = true
break
}
}
if found == false {
o = append(o, entry.(string))
}
}
return o
}
3 changes: 2 additions & 1 deletion nsqd/channel.go
Expand Up @@ -134,6 +134,7 @@ func (c *Channel) Close() error {

close(c.exitChan)

// handle race condition w/ things writing into incomingMsgChan
c.Lock()
close(c.incomingMsgChan)
c.Unlock()
Expand All @@ -154,7 +155,7 @@ func (c *Channel) Close() error {
c.name, len(c.memoryMsgChan), len(c.inFlightMessages), len(c.deferredMessages))
}
FlushQueue(c)
err := c.backend.Close()
err = c.backend.Close()
if err != nil {
return err
}
Expand Down
57 changes: 43 additions & 14 deletions nsqd/lookup.go
Expand Up @@ -3,14 +3,16 @@ package main
import (
"../nsq"
"bitly/notify"
"bytes"
"encoding/json"
"log"
"net"
"os"
"strconv"
"time"
)

var lookupPeers = make([]*nsq.LookupPeer, 0)

func lookupRouter(lookupHosts []string, exitChan chan int) {
func (n *NSQd) lookupLoop() {
notifyChannelChan := make(chan interface{})
notifyTopicChan := make(chan interface{})
syncTopicChan := make(chan *nsq.LookupPeer)
Expand All @@ -20,22 +22,37 @@ func lookupRouter(lookupHosts []string, exitChan chan int) {
log.Fatalf("ERROR: failed to get hostname - %s", err.Error())
}

for _, host := range lookupHosts {
for _, host := range n.lookupAddrs {
log.Printf("LOOKUP: adding peer %s", host)
lookupPeer := nsq.NewLookupPeer(host, func(lp *nsq.LookupPeer) {
cmd := nsq.Identify(VERSION, nsqd.tcpAddr.Port, nsqd.httpAddr.Port, hostname)
_, err := lp.Command(cmd)
if err != nil {
log.Printf("Error writing to %s %s", host, err.Error())
resp, err := lp.Command(cmd)
if err != nil || bytes.Equal(resp, []byte("E_INVALID")) {
log.Printf("LOOKUPD: Error writing to %s %s", host, err.Error())
} else {
if bytes.Equal(resp, []byte("OK")) {
// this is an old host
log.Printf("LOOKUPD(%s) got old response from lokupd. %v", lp, resp)
} else {
// this is a new response; parse it
err = json.Unmarshal(resp, &lp.PeerInfo)
if err != nil {
log.Printf("LOOKUPD(%s) Error parsing response %v", lp, resp)
} else {
log.Printf("LOOKUPD(%s) peer info %+v", lp, lp.PeerInfo)
}
}
}

go func() {
syncTopicChan <- lp
}()
})
lookupPeers = append(lookupPeers, lookupPeer)
lookupPeer.Command(nil) // start the connection
n.lookupPeers = append(n.lookupPeers, lookupPeer)
}

if len(lookupPeers) > 0 {
if len(n.lookupPeers) > 0 {
notify.Start("channel_change", notifyChannelChan)
notify.Start("new_topic", notifyTopicChan)
}
Expand All @@ -46,7 +63,7 @@ func lookupRouter(lookupHosts []string, exitChan chan int) {
select {
case <-ticker:
// send a heartbeat and read a response (read detects closed conns)
for _, lookupPeer := range lookupPeers {
for _, lookupPeer := range n.lookupPeers {
log.Printf("LOOKUP: [%s] sending heartbeat", lookupPeer)
_, err := lookupPeer.Command(nsq.Ping())
if err != nil {
Expand All @@ -62,7 +79,7 @@ func lookupRouter(lookupHosts []string, exitChan chan int) {
} else {
cmd = nsq.Register(channel.topicName, channel.name)
}
for _, lookupPeer := range lookupPeers {
for _, lookupPeer := range n.lookupPeers {
log.Printf("LOOKUP: [%s] channel %s", lookupPeer, cmd)
_, err := lookupPeer.Command(cmd)
if err != nil {
Expand All @@ -73,7 +90,7 @@ func lookupRouter(lookupHosts []string, exitChan chan int) {
// notify all nsqds that a new topic exists
topic := newTopic.(*Topic)
cmd := nsq.Register(topic.name, "")
for _, lookupPeer := range lookupPeers {
for _, lookupPeer := range n.lookupPeers {
log.Printf("LOOKUP: [%s] new topic %s", lookupPeer, cmd)
_, err := lookupPeer.Command(cmd)
if err != nil {
Expand Down Expand Up @@ -105,15 +122,27 @@ func lookupRouter(lookupHosts []string, exitChan chan int) {
break
}
}
case <-exitChan:
case <-n.exitChan:
goto exit
}
}

exit:
log.Printf("LOOKUP: closing")
if len(lookupPeers) > 0 {
if len(n.lookupPeers) > 0 {
notify.Stop("channel_change", notifyChannelChan)
notify.Stop("new_topic", notifyTopicChan)
}
}

func (n *NSQd) lookupHttpAddresses() []string {
var lookupHttpAddresses []string
for _, lp := range n.lookupPeers {
if len(lp.PeerInfo.Address) <= 0 {
continue
}
addr := net.JoinHostPort(lp.PeerInfo.Address, strconv.Itoa(lp.PeerInfo.HttpPort))
lookupHttpAddresses = append(lookupHttpAddresses, addr)
}
return lookupHttpAddresses
}
35 changes: 25 additions & 10 deletions nsqd/nsqd.go
Expand Up @@ -28,6 +28,7 @@ type NSQd struct {
idChan chan []byte
exitChan chan int
waitGroup util.WaitGroupWrapper
lookupPeers []*nsq.LookupPeer
}

type nsqdOptions struct {
Expand Down Expand Up @@ -65,7 +66,7 @@ func NewNSQd(workerId int64, options *nsqdOptions) *NSQd {
}

func (n *NSQd) Main() {
n.waitGroup.Wrap(func() { lookupRouter(n.lookupAddrs, n.exitChan) })
n.waitGroup.Wrap(func() { n.lookupLoop() })

tcpListener, err := net.Listen("tcp", n.tcpAddr.String())
if err != nil {
Expand Down Expand Up @@ -162,16 +163,30 @@ func (n *NSQd) Exit() {
// to return a pointer to a Topic object (potentially new)
func (n *NSQd) GetTopic(topicName string) *Topic {
n.Lock()
defer n.Unlock()

topic, ok := n.topicMap[topicName]
if !ok {
topic = NewTopic(topicName, n.options)
n.topicMap[topicName] = topic
log.Printf("TOPIC(%s): created", topic.name)
t, ok := n.topicMap[topicName]
if ok {
n.Unlock()
return t
} else {
t = NewTopic(topicName, n.options)
n.topicMap[topicName] = t
log.Printf("TOPIC(%s): created", t.name)

// release our global nsqd lock, and switch to a more granular topic lock while we init our
// channels from lookupd. This blocks concurrent PutMessages to this topic.
t.Lock()
defer t.Unlock()
n.Unlock()
// if using lookupd, make a blocking call to get the topics, and immediately create them.
// this makes sure that any message received is buffered to the right channels
if len(n.lookupPeers) > 0 {
channelNames, _ := util.GetChannelsForTopic(t.name, n.lookupHttpAddresses())
for _, channelName := range channelNames {
t.getOrCreateChannel(channelName)
}
}
}

return topic
return t
}

// HasTopic performs a thread safe operation to check for topic existance
Expand Down
30 changes: 8 additions & 22 deletions nsqd/nsqd_test.go
Expand Up @@ -5,7 +5,6 @@ import (
"github.com/bmizerany/assert"
"io/ioutil"
"log"
"net"
"os"
"strconv"
"testing"
Expand All @@ -19,22 +18,15 @@ func TestStartup(t *testing.T) {
iterations := 300
doneExitChan := make(chan int)

tcpAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:4150")
httpAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:4151")

options := NewNsqdOptions()
options.memQueueSize = 100
options.maxBytesPerFile = 10240

nsqd := NewNSQd(1, options)
nsqd.tcpAddr = tcpAddr
nsqd.httpAddr = httpAddr
mustStartNSQd(options)

topicName := "nsqd_test" + strconv.Itoa(int(time.Now().Unix()))

exitChan := make(chan int)
go func() {
nsqd.Main()
<-exitChan
nsqd.Exit()
doneExitChan <- 1
Expand Down Expand Up @@ -69,18 +61,18 @@ func TestStartup(t *testing.T) {

// start up a new nsqd w/ the same folder

nsqdd := NewNSQd(1, options)
nsqdd.tcpAddr = tcpAddr
nsqdd.httpAddr = httpAddr
options = NewNsqdOptions()
options.memQueueSize = 100
options.maxBytesPerFile = 10240
mustStartNSQd(options)

go func() {
nsqdd.Main()
<-exitChan
nsqdd.Exit()
nsqd.Exit()
doneExitChan <- 1
}()

topic = nsqdd.GetTopic(topicName)
topic = nsqd.GetTopic(topicName)
// should be empty; channel should have drained everything
count := topic.Depth()
assert.Equal(t, count, int64(0))
Expand Down Expand Up @@ -111,21 +103,15 @@ func TestEphemeralChannel(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)

tcpAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:4150")
httpAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:4151")

options := NewNsqdOptions()
options.memQueueSize = 100
nsqd := NewNSQd(1, options)
nsqd.tcpAddr = tcpAddr
nsqd.httpAddr = httpAddr
mustStartNSQd(options)

topicName := "ephemeral_test" + strconv.Itoa(int(time.Now().Unix()))
doneExitChan := make(chan int)

exitChan := make(chan int)
go func() {
nsqd.Main()
<-exitChan
nsqd.Exit()
doneExitChan <- 1
Expand Down

0 comments on commit 76d48d4

Please sign in to comment.