Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fanout queues suppport #25

Merged
merged 1 commit into from
Nov 8, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.5.1

- Fanout queues support.<br>
`set <queue>+<another_queue>+<third_queue> ...` adds an item to multiple queues.

## 0.5

- Add durable cursors. An ability to consume queue multiple times
Expand Down
33 changes: 17 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,24 @@ We used to use Darner before, but got 2 large production queues corrupted at som

## Features

Multiple consumer groups per queue using `get <queue>:<cursor>` syntax.
1. Multiple consumer groups per queue using `get <queue>:<cursor>` syntax.

- When you read an item in a usual way: `get <queue>`, item gets expired and deleted.
- When you read an item using cursor syntax `get <queue>:<cursor>`, a durable
cursor gets initialized. It shifts forward with every read without deleting
any messages in the source queue. Number of cursors per queue is not limited.
- If you continue reads from the source queue directly, siberite will continue
deleting messages from the head of that queue. Any existing cursor that is
internally points to an already deleted message will catch up during next read
and will start serving messages from the current source queue head.
- Durable cursors are also support two-phase reliable reads. All failed reliable
reads for each cursor are stored in cursor's own small persistent queue.

2. Fanout queues

- Siberite allows you to insert new message into multiple queues at once
by using the following syntax `set <queue>+<another_queue>+<third_queue> ...`

1. When you read an item in a usual way: `get <queue>`, an item gets expired and deleted.
2. When you read an item using cursor `get <queue>:<cursor>`, a durable cursor gets initialized.
With every read it shifts forward without deleting any messages in the source queue.
So the source queue remains untouched. Number of cursors per queue is not limited.
3. If you continue reads from the source queue directly, siberite will continue
deleting messages from the head of that queue. Any existing cursor that is
internally points to already deleted message will catch up during next read
and will start serving messages from the current source queue head.
4. Durable cursors also support two-phase reliable reads. All failed reliable
reads for each cursor are stored in their own small persistent queues and get
served first.


##Benchmarks
Expand Down Expand Up @@ -124,10 +129,6 @@ END
# flush_all
```

## TODO

- Add fanout queues support


## Not supported

Expand Down
1 change: 1 addition & 0 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Command struct {
QueueName string
SubCommand string
ConsumerGroup string
FanoutQueues []string
DataSize int
}

Expand Down
60 changes: 44 additions & 16 deletions controller/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"log"
"strconv"
"strings"
"sync/atomic"
)

Expand All @@ -14,32 +15,32 @@ import (
// <data block>
// Response: STORED
func (c *Controller) Set(input []string) error {
if len(input) < 5 || len(input) > 6 {
return &Error{"CLIENT_ERROR", "Invalid input"}
}

totalBytes, err := strconv.Atoi(input[4])
cmd, err := parseSetCommand(input)
if err != nil {
return &Error{"CLIENT_ERROR", "Invalid <bytes> number"}
return NewError("CLIENT_ERROR", err)
}

cmd := &Command{Name: input[0], QueueName: input[1], DataSize: totalBytes}

dataBlock, err := c.readDataBlock(cmd.DataSize)
if err != nil {
return NewError("CLIENT_ERROR", err)
}

q, err := c.repo.GetQueue(cmd.QueueName)
if err != nil {
log.Println(cmd, err)
return NewError("ERROR", err)
if cmd.FanoutQueues == nil {
err = c.storeDataBlock(cmd.QueueName, dataBlock)
if err != nil {
log.Println(cmd, err)
return NewError("ERROR", err)
}
} else {
for _, queueName := range cmd.FanoutQueues {
err = c.storeDataBlock(queueName, dataBlock)
if err != nil {
log.Println(cmd, err)
return NewError("ERROR", err)
}
}
}

err = q.Enqueue([]byte(dataBlock))
if err != nil {
return NewError("ERROR", err)
}
fmt.Fprint(c.rw.Writer, "STORED\r\n")
c.rw.Writer.Flush()
atomic.AddUint64(&c.repo.Stats.CmdSet, 1)
Expand All @@ -60,3 +61,30 @@ func (c *Controller) readDataBlock(totalBytes int) ([]byte, error) {

return dataBlock[:totalBytes], nil
}

func (c *Controller) storeDataBlock(queueName string, dataBlock []byte) error {
q, err := c.repo.GetQueue(queueName)
if err != nil {
return err
}
return q.Enqueue([]byte(dataBlock))
}

func parseSetCommand(input []string) (*Command, error) {
if len(input) < 5 || len(input) > 6 {
return nil, errors.New("Invalid input")
}

totalBytes, err := strconv.Atoi(input[4])
if err != nil {
return nil, errors.New("Invalid <bytes> number")
}

cmd := &Command{Name: input[0], QueueName: input[1], DataSize: totalBytes}

if strings.Contains(cmd.QueueName, "+") {
cmd.FanoutQueues = strings.Split(cmd.QueueName, "+")
cmd.QueueName = cmd.FanoutQueues[0]
}
return cmd, nil
}
29 changes: 28 additions & 1 deletion controller/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"fmt"
"strings"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -15,7 +16,7 @@ func Test_Controller_Set(t *testing.T) {
fmt.Fprintf(&mockTCPConn.ReadBuffer, "0123567890\r\n")

err = controller.Set(command)
assert.Nil(t, err)
assert.NoError(t, err)
assert.Equal(t, "STORED\r\n", mockTCPConn.WriteBuffer.String())

mockTCPConn.WriteBuffer.Reset()
Expand All @@ -42,3 +43,29 @@ func Test_Controller_Set(t *testing.T) {
err = controller.Set(command)
assert.Equal(t, "CLIENT_ERROR bad data chunk", err.Error())
}

func Test_Controller_SetFanout(t *testing.T) {
repo, controller, mockTCPConn := setupControllerTest(t, 0)
defer cleanupControllerTest(repo)

queueNames := []string{"test", "fanout_test", "1", "2"}

command := []string{"set", strings.Join(queueNames, "+"), "0", "0", "10"}
fmt.Fprintf(&mockTCPConn.ReadBuffer, "0123456789\r\n")

err = controller.Set(command)
assert.NoError(t, err)
assert.Equal(t, "STORED\r\n", mockTCPConn.WriteBuffer.String())

for _, queueName := range queueNames {
q, err := repo.GetQueue(queueName)
assert.NoError(t, err)

assert.EqualValues(t, 1, q.Length())

value, err := q.GetNext()
assert.NoError(t, err)
assert.Equal(t, "0123456789", string(value))
assert.True(t, q.IsEmpty())
}
}
2 changes: 1 addition & 1 deletion repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

// Version represents siberite version
const Version = "siberite-0.5"
const Version = "siberite-0.5.1"

// QueueRepository represents a repository of queues
type QueueRepository struct {
Expand Down