Skip to content

Commit

Permalink
Add some document
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksezer committed Aug 19, 2019
1 parent 096843b commit 6eb1cb1
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 17 deletions.
7 changes: 4 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ package client

import (
"fmt"
"github.com/pkg/errors"
"time"

"github.com/pkg/errors"

"github.com/buraksezer/olric"
"github.com/buraksezer/olric/internal/protocol"
"github.com/buraksezer/olric/internal/transport"
Expand All @@ -34,7 +35,7 @@ type Client struct {
// Config includes configuration parameters for the Client.
type Config struct {
Addrs []string
Serializer olric.Serializer
Serializer olric.Serializer
DialTimeout time.Duration
KeepAlive time.Duration
MaxConn int
Expand Down Expand Up @@ -90,7 +91,7 @@ func checkStatusCode(resp *protocol.Message) error {
case resp.Status == protocol.StatusOK:
return nil
case resp.Status == protocol.StatusInternalServerError:
return errors.Wrap(ErrInternalServerError, string(resp.Value))
return errors.Wrap(olric.ErrInternalServerError, string(resp.Value))
case resp.Status == protocol.StatusNoSuchLock:
return olric.ErrNoSuchLock
case resp.Status == protocol.StatusKeyNotFound:
Expand Down
33 changes: 21 additions & 12 deletions client/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,40 +12,39 @@
// See the License for the specific language governing permissions and
// limitations under the License.

/*Package pipeline implements pipelining for Olric Binary Protocol. It enables to send multiple
commands to the server without waiting for the replies at all, and finally read the replies
in a single step.*/
package client

import (
"bytes"
"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
"io"
"sync"
"time"

"github.com/buraksezer/olric"
"github.com/hashicorp/go-multierror"

"github.com/buraksezer/olric/internal/protocol"
)

var ErrInternalServerError = errors.New("internal server error")

// Pipeline implements pipelining feature for Olric Binary Protocol.
// It enables to send multiple commands to the server without
// waiting for the replies at all, and finally read the replies
// in a single step. All methods are thread-safe. So you can call them in
// different goroutines safely.
type Pipeline struct {
c *Client
m sync.Mutex
buf *bytes.Buffer
serializer olric.Serializer
c *Client
m sync.Mutex
buf *bytes.Buffer
}

// NewPipeline returns a new Pipeline.
func (c *Client) NewPipeline() *Pipeline {
return &Pipeline{
c: c,
buf: new(bytes.Buffer),
}
}

// Put appends a Put command to the underlying buffer with the given parameters.
func (p *Pipeline) Put(dmap, key string, value interface{}) error {
p.m.Lock()
defer p.m.Unlock()
Expand All @@ -66,6 +65,7 @@ func (p *Pipeline) Put(dmap, key string, value interface{}) error {
return m.Write(p.buf)
}

// PutEx appends a PutEx command to the underlying buffer with the given parameters.
func (p *Pipeline) PutEx(dmap, key string, value interface{}, timeout time.Duration) error {
p.m.Lock()
defer p.m.Unlock()
Expand All @@ -87,6 +87,7 @@ func (p *Pipeline) PutEx(dmap, key string, value interface{}, timeout time.Durat
return m.Write(p.buf)
}

// Get appends a Get command to the underlying buffer with the given parameters.
func (p *Pipeline) Get(dmap, key string) error {
p.m.Lock()
defer p.m.Unlock()
Expand All @@ -102,6 +103,7 @@ func (p *Pipeline) Get(dmap, key string) error {
return m.Write(p.buf)
}

// Delete appends a Delete command to the underlying buffer with the given parameters.
func (p *Pipeline) Delete(dmap, key string) error {
p.m.Lock()
defer p.m.Unlock()
Expand Down Expand Up @@ -137,14 +139,17 @@ func (p *Pipeline) incrOrDecr(opcode protocol.OpCode, dmap, key string, delta in
return m.Write(p.buf)
}

// Incr appends an Incr command to the underlying buffer with the given parameters.
func (p *Pipeline) Incr(dmap, key string, delta int) error {
return p.incrOrDecr(protocol.OpIncr, dmap, key, delta)
}

// Decr appends a Decr command to the underlying buffer with the given parameters.
func (p *Pipeline) Decr(dmap, key string, delta int) error {
return p.incrOrDecr(protocol.OpDecr, dmap, key, delta)
}

// GetPut appends a GetPut command to the underlying buffer with the given parameters.
func (p *Pipeline) GetPut(dmap, key string, value interface{}) error {
p.m.Lock()
defer p.m.Unlock()
Expand All @@ -165,6 +170,7 @@ func (p *Pipeline) GetPut(dmap, key string, value interface{}) error {
return m.Write(p.buf)
}

// Destroy appends a Destroy command to the underlying buffer with the given parameters.
func (p *Pipeline) Destroy(dmap string) error {
p.m.Lock()
defer p.m.Unlock()
Expand All @@ -179,6 +185,7 @@ func (p *Pipeline) Destroy(dmap string) error {
return m.Write(p.buf)
}

// LockWithTimeout appends a LockWithTimeout command to the underlying buffer with the given parameters.
func (p *Pipeline) LockWithTimeout(dmap, key string, timeout time.Duration) error {
p.m.Lock()
defer p.m.Unlock()
Expand All @@ -195,6 +202,7 @@ func (p *Pipeline) LockWithTimeout(dmap, key string, timeout time.Duration) erro
return m.Write(p.buf)
}

// Unlock appends an Unlock command to the underlying buffer with the given parameters.
func (p *Pipeline) Unlock(dmap, key string) error {
p.m.Lock()
defer p.m.Unlock()
Expand All @@ -210,6 +218,7 @@ func (p *Pipeline) Unlock(dmap, key string) error {
return m.Write(p.buf)
}

// Flush flushes all the commands to the server using a single write call.
func (p *Pipeline) Flush() ([]PipelineResponse, error) {
p.m.Lock()
defer p.m.Unlock()
Expand Down
5 changes: 3 additions & 2 deletions client/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ package client

import (
"context"
"github.com/buraksezer/olric"
"github.com/buraksezer/olric/internal/protocol"
"strconv"
"testing"
"time"

"github.com/buraksezer/olric"
"github.com/buraksezer/olric/internal/protocol"
)

func TestPipeline_Put(t *testing.T) {
Expand Down

0 comments on commit 6eb1cb1

Please sign in to comment.