Skip to content

Commit

Permalink
Move pipeline to client package
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksezer committed Aug 16, 2019
1 parent 754728d commit 3c3ce37
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 104 deletions.
9 changes: 5 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Client struct {
// Config includes configuration parameters for the Client.
type Config struct {
Addrs []string
Serializer olric.Serializer
DialTimeout time.Duration
KeepAlive time.Duration
MaxConn int
Expand All @@ -45,15 +46,15 @@ type DMap struct {
}

// New returns a new Client object. The second parameter is serializer, it can be nil.
func New(c *Config, s olric.Serializer) (*Client, error) {
func New(c *Config) (*Client, error) {
if c == nil {
return nil, fmt.Errorf("config cannot be nil")
}
if len(c.Addrs) == 0 {
return nil, fmt.Errorf("addrs list cannot be empty")
}
if s == nil {
s = olric.NewGobSerializer()
if c.Serializer == nil {
c.Serializer = olric.NewGobSerializer()
}
if c.MaxConn == 0 {
c.MaxConn = 1
Expand All @@ -66,7 +67,7 @@ func New(c *Config, s olric.Serializer) (*Client, error) {
}
return &Client{
client: transport.NewClient(cc),
serializer: s,
serializer: c.Serializer,
}, nil
}

Expand Down
20 changes: 10 additions & 10 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestClient_Get(t *testing.T) {
<-done
}()

c, err := New(testConfig, nil)
c, err := New(testConfig)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestClient_Put(t *testing.T) {
<-done
}()

c, err := New(testConfig, nil)
c, err := New(testConfig)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestClient_PutEx(t *testing.T) {
<-done
}()

c, err := New(testConfig, nil)
c, err := New(testConfig)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestClient_Delete(t *testing.T) {
<-done
}()

c, err := New(testConfig, nil)
c, err := New(testConfig)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down Expand Up @@ -227,7 +227,7 @@ func TestClient_LockWithTimeout(t *testing.T) {
<-done
}()

c, err := New(testConfig, nil)
c, err := New(testConfig)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down Expand Up @@ -264,7 +264,7 @@ func TestClient_Unlock(t *testing.T) {
<-done
}()

c, err := New(testConfig, nil)
c, err := New(testConfig)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down Expand Up @@ -300,7 +300,7 @@ func TestClient_Destroy(t *testing.T) {
<-done
}()

c, err := New(testConfig, nil)
c, err := New(testConfig)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down Expand Up @@ -338,7 +338,7 @@ func TestClient_Incr(t *testing.T) {
<-done
}()

c, err := New(testConfig, nil)
c, err := New(testConfig)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down Expand Up @@ -393,7 +393,7 @@ func TestClient_Decr(t *testing.T) {
<-done
}()

c, err := New(testConfig, nil)
c, err := New(testConfig)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down Expand Up @@ -448,7 +448,7 @@ func TestClient_GetPut(t *testing.T) {
<-done
}()

c, err := New(testConfig, nil)
c, err := New(testConfig)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down
75 changes: 36 additions & 39 deletions pipeline/pipeline.go → client/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
/*Package pipeline implements pipelining for Olric Binary Protocol. It enables to send multiple
commands to the server without waiting for the replies at all, and finally read the replies
in a single step.*/
package pipeline
package client

import (
"bytes"
"fmt"
"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
"io"
Expand All @@ -29,53 +28,29 @@ import (
"github.com/buraksezer/olric"

"github.com/buraksezer/olric/internal/protocol"
"github.com/buraksezer/olric/internal/transport"
)

var ErrInternalServerError = errors.New("internal server error")

type Pipeline struct {
c *Client
m sync.Mutex
buf *bytes.Buffer
client *transport.Client
serializer olric.Serializer
}

type Config struct {
Addr string
DialTimeout time.Duration
KeepAlive time.Duration
Serializer olric.Serializer
}

func New(c *Config) (*Pipeline, error) {
if c == nil {
return nil, fmt.Errorf("config cannot be nil")
}
if len(c.Addr) == 0 {
return nil, fmt.Errorf("Addr cannot be empty")
}
if c.Serializer == nil {
c.Serializer = olric.NewGobSerializer()
}

cc := &transport.ClientConfig{
Addrs: []string{c.Addr},
DialTimeout: c.DialTimeout,
KeepAlive: c.KeepAlive,
}
func (c *Client) NewPipeline() *Pipeline {
return &Pipeline{
buf: new(bytes.Buffer),
client: transport.NewClient(cc),
serializer: c.Serializer,
}, nil
c: c,
buf: new(bytes.Buffer),
}
}

func (p *Pipeline) Put(dmap, key string, value interface{}) error {
p.m.Lock()
defer p.m.Unlock()

data, err := p.serializer.Marshal(value)
data, err := p.c.serializer.Marshal(value)
if err != nil {
return err
}
Expand All @@ -95,7 +70,7 @@ func (p *Pipeline) PutEx(dmap, key string, value interface{}, timeout time.Durat
p.m.Lock()
defer p.m.Unlock()

data, err := p.serializer.Marshal(value)
data, err := p.c.serializer.Marshal(value)
if err != nil {
return err
}
Expand All @@ -121,15 +96,15 @@ func (p *Pipeline) Get(dmap, key string) error {
Magic: protocol.MagicReq,
Op: protocol.OpGet,
},
DMap: dmap,
Key: key,
DMap: dmap,
Key: key,
}
return m.Write(p.buf)
}

type PipelineResponse struct {
serializer olric.Serializer
response protocol.Message
response protocol.Message
}

func (pr *PipelineResponse) Operation() string {
Expand All @@ -144,9 +119,31 @@ func (pr *PipelineResponse) Operation() string {
return "Delete"
case pr.response.Op == protocol.OpIncr:
return "Incr"
case pr.response.Op == protocol.OpDecr:
return "Decr"
case pr.response.Op == protocol.OpGetPut:
return "GetPut"
case pr.response.Op == protocol.OpLockWithTimeout:
return "LockWithTimeout"
case pr.response.Op == protocol.OpUnlock:
return "Unlock"
case pr.response.Op == protocol.OpDestroy:
return "Destroy"
default:
return "unknown"

}
/*
OpPutEx
OpGet
OpDelete
OpDestroy
OpLockWithTimeout
OpUnlock
OpIncr
OpDecr
OpGetPut
*/
}

func (pr *PipelineResponse) Get() (interface{}, error) {
Expand Down Expand Up @@ -175,7 +172,7 @@ func (p *Pipeline) Flush() ([]PipelineResponse, error) {
req := &protocol.Message{
Value: p.buf.Bytes(),
}
resp, err := p.client.Request(protocol.OpPipeline, req)
resp, err := p.c.client.Request(protocol.OpPipeline, req)
if err != nil {
return nil, err
}
Expand All @@ -194,8 +191,8 @@ func (p *Pipeline) Flush() ([]PipelineResponse, error) {
continue
}
pr := PipelineResponse{
serializer: p.serializer,
response: pres,
serializer: p.c.serializer,
response: pres,
}
responses = append(responses, pr)
}
Expand Down
62 changes: 11 additions & 51 deletions pipeline/pipeline_test.go → client/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,64 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package pipeline
package client

import (
"context"
"github.com/buraksezer/olric/internal/protocol"
"log"
"net"
"strconv"
"testing"
"time"

"github.com/buraksezer/olric"
)

var testConfig = &Config{
DialTimeout: time.Second,
KeepAlive: time.Second,
}

func getFreePort() (int, error) {
addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
if err != nil {
return 0, err
}

l, err := net.ListenTCP("tcp", addr)
if err != nil {
return 0, err
}
defer l.Close()
return l.Addr().(*net.TCPAddr).Port, nil
}

func newOlric() (*olric.Olric, chan struct{}, error) {
port, err := getFreePort()
if err != nil {
return nil, nil, err
}
addr := "127.0.0.1:" + strconv.Itoa(port)
cfg := &olric.Config{Name: addr}
db, err := olric.New(cfg)
if err != nil {
return nil, nil, err
}

done := make(chan struct{})
go func() {
rerr := db.Start()
if rerr != nil {
log.Printf("[ERROR] Expected nil. Got %v", rerr)
}
close(done)
}()
time.Sleep(100 * time.Millisecond)
testConfig.Addr = addr
return db, done, nil
}

func TestPipeline_Put(t *testing.T) {
db, done, err := newOlric()
if err != nil {
Expand All @@ -83,10 +35,13 @@ func TestPipeline_Put(t *testing.T) {
<-done
}()

p, err := New(testConfig)
c, err := New(testConfig)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
p := c.NewPipeline()

// Put some keys
dmap := "mydmap"
for i := 0; i < 10; i++ {
key := "key-" + strconv.Itoa(i)
Expand All @@ -95,10 +50,13 @@ func TestPipeline_Put(t *testing.T) {
t.Fatalf("Expected nil. Got: %v", err)
}
}

// Flush them
responses, err := p.Flush()
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
// Read responses
for _, res := range responses {
if res.response.Op != protocol.OpPut {
t.Fatalf("Expected Op: %v. Got: %v", protocol.OpPut, res.response.Op)
Expand All @@ -122,10 +80,12 @@ func TestPipeline_Get(t *testing.T) {
<-done
}()

p, err := New(testConfig)
c, err := New(testConfig)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
p := c.NewPipeline()

dmap := "mydmap"
key := "key-" + strconv.Itoa(1)

Expand Down

0 comments on commit 3c3ce37

Please sign in to comment.