Skip to content

Commit

Permalink
Feature: add unit test channel_test.go
Browse files Browse the repository at this point in the history
cmd `go test github.com/chainhelen/dtnsq/nsqd`
  • Loading branch information
chainhelen committed Jul 29, 2019
1 parent 89a9d44 commit 94081c8
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 83 deletions.
20 changes: 20 additions & 0 deletions nsqd/assist_test.go
@@ -1,6 +1,7 @@
package nsqd

import (
"errors"
"github.com/chainhelen/dtnsq/internal/test"
"github.com/chainhelen/go-dtnsq"
"io"
Expand Down Expand Up @@ -95,3 +96,22 @@ func readValidate(t *testing.T, conn io.Reader, f int32, d string) []byte {
test.Equal(t, d, string(data))
return data
}

type errorBackendQueue struct{}

func (d *errorBackendQueue) Put([]byte) (BackendQueueEnd, error) {
return nil, errors.New("never gonna happen")
}
func (d *errorBackendQueue) ReadChan() chan []byte { return nil }
func (d *errorBackendQueue) Close() error { return nil }
func (d *errorBackendQueue) Delete() error { return nil }
func (d *errorBackendQueue) Depth() int64 { return 0 }
func (d *errorBackendQueue) Empty() error { return nil }
func (d *errorBackendQueue) ReaderFlush() error { return nil }
func (d *errorBackendQueue) GetQueueReadEnd() BackendQueueEnd { return nil }
func (d *errorBackendQueue) GetQueueCurMemRead() BackendQueueEnd { return nil }
func (d *errorBackendQueue) UpdateBackendQueueEnd(BackendQueueEnd) {}
func (d *errorBackendQueue) TryReadOne() (*ReadResult, bool) { return nil, false }
func (d *errorBackendQueue) Confirm(start int64, end int64, endCnt int64) bool { return true }

type errorRecoveredBackendQueue struct{ errorBackendQueue }
71 changes: 67 additions & 4 deletions nsqd/channel.go
Expand Up @@ -80,6 +80,8 @@ type Channel struct {
dtPreMessages map[MessageID]*Message
dtPrePQ dtPrePqueue
dtPreMutex sync.Mutex
// There is no need to lock pgSize in the current scenario
pqSize int

waitGroup util.WaitGroupWrapper

Expand Down Expand Up @@ -218,7 +220,9 @@ func (c *Channel) handleMsg(data ReadResult) error {
}

func (c *Channel) initPQ() {
pqSize := int(math.Max(1, float64(c.ctx.nsqd.getOpts().MemQueueSize)/10))
pqSize := int(math.Max(1, float64(c.ctx.nsqd.getOpts().MemQueueSize)))

c.pqSize = pqSize

c.inFlightMutex.Lock()
c.inFlightMessages = make(map[MessageID]*Message)
Expand Down Expand Up @@ -378,6 +382,7 @@ func (c *Channel) IsPaused() bool {
}

// PutMessage writes a Message to the queue
/*
func (c *Channel) PutMessage(m *Message) error {
c.RLock()
defer c.RUnlock()
Expand All @@ -390,12 +395,39 @@ func (c *Channel) PutMessage(m *Message) error {
}
atomic.AddUint64(&c.messageCount, 1)
return nil
}*/

/*
type putActionError struct {
channel *Channel
}
func NewPutAction(channel *Channel) *putActionError {
return &putActionError{channel: channel}
}
func (p *putActionError) Error() string {
c := p.channel
c.RLock()
clientLen := len(c.clients)
c.RUnlock()
return fmt.Sprintf("(%s:%s) put client failed, len(clients) == %d", c.topicName, c.name, clientLen)
}*/

// TODO SetHealth(nil)
func (c *Channel) put(m *Message) error {
select {
case c.clientMsgChan <- m:
c.ctx.nsqd.SetHealth(nil)
atomic.AddUint64(&c.messageCount, 1)
case <-c.exitChan:
default:
c.RLock()
clientLen := len(c.clients)
c.RUnlock()
err := fmt.Errorf("(%s:%s) put client failed, len(clients) == %d", c.topicName, c.name, clientLen)
c.ctx.nsqd.SetHealth(err)
return err
}
return nil
}
Expand Down Expand Up @@ -745,14 +777,39 @@ func (c *Channel) processDeferredQueue(t int64) bool {
if err != nil {
goto exit
}
c.put(msg)
if err := c.put(msg); err != nil {
c.StartDeferredTimeout(msg, c.ctx.nsqd.getOpts().MsgTimeout)
}
}

exit:
return dirty
}

func (c *Channel) tryReadOne() (*ReadResult, bool) {
c.RLock()
clientLen := len(c.clients)
c.RUnlock()
if clientLen == 0 {
return nil, false
}

c.inFlightMutex.Lock()
inFlightLen := len(c.inFlightMessages)
c.inFlightMutex.Unlock()

c.deferredMutex.Lock()
deferredLen := len(c.deferredMessages)
c.deferredMutex.Unlock()

c.dtPreMutex.Lock()
dtPreLen := len(c.dtPreMessages)
c.dtPreMutex.Unlock()

if inFlightLen >= c.pqSize || deferredLen >= c.pqSize || dtPreLen >= c.pqSize {
return nil, false
}

return c.backend.TryReadOne()
}

Expand All @@ -766,9 +823,13 @@ func (c *Channel) processInFlightQueue(t int64) bool {

dirty := false
for {
if len(c.clients) == 0 {
c.RLock()
clientLen := len(c.clients)
c.RUnlock()
if clientLen == 0 {
goto exit
}

c.inFlightMutex.Lock()
msg, _ := c.inFlightPQ.PeekAndShift(t)
c.inFlightMutex.Unlock()
Expand All @@ -789,7 +850,9 @@ func (c *Channel) processInFlightQueue(t int64) bool {
if ok {
client.TimedOutMessage()
}
c.put(msg)
if err := c.put(msg); err != nil {
c.StartInFlightTimeout(msg, msg.clientID, c.ctx.nsqd.getOpts().MsgTimeout)
}
}

exit:
Expand Down

0 comments on commit 94081c8

Please sign in to comment.