Skip to content

Commit

Permalink
Tests for streams
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksezer committed Jun 13, 2020
1 parent 1add026 commit 33976be
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 14 deletions.
2 changes: 1 addition & 1 deletion internal/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ const (
OpDTopicRemoveListener
OpCreateStream
OpStreamCreated
OpCloseStream
OpStreamClosed
)

type StatusCode uint8
Expand Down
1 change: 0 additions & 1 deletion internal/transport/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"time"

"github.com/buraksezer/olric/internal/flog"

"github.com/buraksezer/olric/internal/protocol"
multierror "github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
Expand Down
1 change: 1 addition & 0 deletions internal/transport/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func readFromStream(conn io.Reader, bufCh chan<- *protocol.Message, errCh chan<-
}
}

// CreateStream creates a new Stream connection which provides a bidirectional communication channel between Olric nodes and clients.
func (c *Client) CreateStream(ctx context.Context, addr string, read chan<- *protocol.Message, write <-chan *protocol.Message) error {
cpool, err := c.getPool(addr)
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions olric.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ func New(c *config.Config) (*Olric, error) {
server: transport.NewServer(c.BindAddr, c.BindPort, c.KeepAlivePeriod, flogger),
members: members{m: make(map[uint64]discovery.Member)},
dtopic: newDTopic(ctx),
streams: &streams{m: make(map[uint64]*stream)},
started: c.Started,
}

Expand Down Expand Up @@ -640,6 +641,16 @@ func (db *Olric) Shutdown(ctx context.Context) error {
db.cancel()

var result error

db.streams.mu.RLock()
db.log.V(2).Printf("[INFO] Closing active streams")
for streamID, str := range db.streams.m {
if err := str.close(); err != nil {
result = multierror.Append(result, fmt.Errorf("streamID: %d: %w", streamID, err))
}
}
db.streams.mu.RUnlock()

if err := db.server.Shutdown(ctx); err != nil {
result = multierror.Append(result, err)
}
Expand Down
39 changes: 34 additions & 5 deletions stream.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,56 @@
// Copyright 2018-2020 Burak Sezer
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package olric

import (
"context"
"github.com/buraksezer/olric/internal/protocol"
"fmt"
"io"
"math/rand"
"sync"

"github.com/buraksezer/olric/internal/protocol"
)

// streams maps StreamIDs to streams
type streams struct {
mu sync.RWMutex

m map[uint64]*stream
}

// streams provides a bidirectional communication channel between Olric nodes and clients. It can also be used
// for node-to-node communication.
type stream struct {
read chan *protocol.Message
write chan *protocol.Message
ctx context.Context
cancel context.CancelFunc
}

func (s *stream ) close() {
s.cancel()
// close sends OpStreamClosed command to other side of the channel and cancels underlying context.
func (s *stream) close() error {
defer s.cancel()

req := protocol.NewRequest(protocol.OpStreamClosed)
select {
case s.write <- req:
return nil
default:
}
return fmt.Errorf("impossible to send StreamClosed message: channel busy")
}

func (db *Olric) readFromStream(conn io.Reader, bufCh chan<- *protocol.Message, errCh chan<- error) {
Expand All @@ -45,10 +73,10 @@ func (db *Olric) createStreamOperation(req *protocol.Message) *protocol.Message
return req.Error(protocol.StatusInternalServerError, err)
}

// Now, we have a TCP socket here.
streamID := rand.Uint64()
ctx, cancel := context.WithCancel(context.Background())
db.streams.mu.Lock()
// TODO: Direction of channels
s := &stream{
read: make(chan *protocol.Message, 1),
write: make(chan *protocol.Message, 1),
Expand Down Expand Up @@ -78,8 +106,10 @@ loop:
for {
select {
case <-ctx.Done():
// close method is called
break loop
case <-db.ctx.Done():
// server is gone
break loop
case msg := <-s.write:
err = msg.Write(conn)
Expand All @@ -90,6 +120,5 @@ loop:
s.read <- buf
}
}

return req.Success()
}
93 changes: 86 additions & 7 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
package olric

import (
"bytes"
"context"
"fmt"
"testing"
"time"

"github.com/buraksezer/olric/internal/protocol"
)

func TestStream_OpenStream(t *testing.T) {
func TestStream_CreateStream(t *testing.T) {
db, err := newDB(testSingleReplicaConfig())
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
Expand All @@ -45,14 +46,92 @@ func TestStream_OpenStream(t *testing.T) {
}
}()

loop:
for {
select {
case msg := <-readCh:
fmt.Println(msg.DMap)
fmt.Println(msg.Key)
fmt.Println(string(msg.Value))
case msg := <-writeCh:
fmt.Println(msg)
if msg.Op != protocol.OpStreamCreated {
t.Fatalf("Expected OpCode %d: Got: %d", protocol.OpStreamCreated, msg.Op)
}

streamID := msg.Extra.(protocol.StreamCreatedExtra).StreamID
db.streams.mu.RLock()
_, ok := db.streams.m[streamID]
db.streams.mu.RUnlock()
if !ok {
t.Fatalf("StreamID is invalid: %d", streamID)
}
// Everything is OK
break loop
case <-time.After(5 * time.Second):
t.Fatalf("No message received in 5 seconds")
}
}
}

func TestStream_EchoMessage(t *testing.T) {
db, err := newDB(testSingleReplicaConfig())
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
defer func() {
err = db.Shutdown(context.Background())
if err != nil {
db.log.V(2).Printf("[ERROR] Failed to shutdown Olric: %v", err)
}
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

readCh := make(chan *protocol.Message, 1)
writeCh := make(chan *protocol.Message, 1)
go func() {
err = db.client.CreateStream(ctx, db.this.String(), readCh, writeCh)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
}()

f := func(m *protocol.Message) {
streamID := m.Extra.(protocol.StreamCreatedExtra).StreamID
db.streams.mu.RLock()
s, _ := db.streams.m[streamID]
db.streams.mu.RUnlock()

echoed := <-s.read
s.write <- echoed
}

loop:
for {
select {
case msg := <-readCh:
if msg.Op == protocol.OpStreamCreated {
go f(msg)
// Stream is created. Now, we are able to do write or read on this bidirectional channel.
//
// Send a test message
req := protocol.NewRequest(protocol.OpPut)
req.DMap = "echo-test-dmap"
req.Key = "echo-test-key"
req.Value = []byte("echo-test-value")
writeCh <- req
} else if msg.Op == protocol.OpPut {
if msg.DMap != "echo-test-dmap" {
t.Fatalf("Expected msg.DMap: echo-test-dmap. Got: %s", msg.DMap)
}
if msg.Key != "echo-test-key" {
t.Fatalf("Expected msg.Key: echo-test-key. Got: %s", msg.Key)
}
if bytes.Equal(msg.Value, []byte("echo-test-dmap")) {
t.Fatalf("Expected msg.Value: echo-test-value. Got: %s", string(msg.Value))
}
break loop
} else {
t.Fatalf("Invalid message received: %d", msg.Op)
}
case <-time.After(5 * time.Second):
t.Fatalf("No message received in 5 seconds")
}
}
}

0 comments on commit 33976be

Please sign in to comment.