Skip to content

Commit

Permalink
Add pipeline documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksezer committed Aug 22, 2019
1 parent 6eb1cb1 commit b74e793
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 1 deletion.
48 changes: 48 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ This project is a work in progress. The implementation is incomplete. The docume
* [Incr](#incr)
* [Decr](#decr)
* [GetPut](#getput)
* [Pipelining](#pipelining)
* [Serialization](#serialization)
* [Golang Client](#golang-client)
* [Standalone Server](#standalone-server)
Expand Down Expand Up @@ -234,6 +235,53 @@ methods concurrently on the cluster, Put/PutEx calls may set new values to the D
err := dm.Destroy()
```

### Pipelining
Olric Binary Protocol(OBP) supports pipelining. All protocol commands can be pushed to a remote Olric server through a pipeline in a single write call.
A sample use looks like the following:
```go
// Create an ordinary Olric client, not Olric node!
// ...
// Create a new pipe and call on it whatever you want.
pipe := client.NewPipeline()
for i := 0; i < 10; i++ {
key := "key-" + strconv.Itoa(i)
err := pipe.Put("mydmap", key, i)
if err != nil {
fmt.Println("returned an error: ", err)
}
}

for i := 0; i < 10; i++ {
key := "key-" + strconv.Itoa(i)
err := pipe.Get("mydmap", key)
if err != nil {
fmt.Println("returned an error: ", err)
}
}

// Flush messages to the server.
responses, err := pipe.Flush()
if err != nil {
fmt.Println("returned an error: ", err)
}

// Read responses from the pipeline.
for _, resp := range responses {
if resp.Operation() == "Get" {
val, err := resp.Get()
if err != nil {
fmt.Println("returned an error: ", err)
}
fmt.Println("Get response: ", val)
}
}
```

There is no hard-limit on message count in a pipeline. You should set a convenient `KeepAlive` for large pipelines.
Otherwise you can get a timeout error.

The `Flush` method returns errors along with success messages. Furhermore, you need to know the command order to match responses with requests.

## Configuration

[memberlist configuration](https://godoc.org/github.com/hashicorp/memberlist#Config) can be tricky and and the default configuration set should be tuned for your environment. A detailed deployment and configuration guide will be prepared before stable release.
Expand Down
3 changes: 2 additions & 1 deletion client/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func (p *Pipeline) Flush() ([]PipelineResponse, error) {
return nil, err
}

// Read the pipelined messages from pipeline response.
conn := bytes.NewBuffer(resp.Value)
var responses []PipelineResponse
var resErr error
Expand All @@ -251,5 +252,5 @@ func (p *Pipeline) Flush() ([]PipelineResponse, error) {
}
responses = append(responses, pr)
}
return responses, nil
return responses, resErr
}
26 changes: 26 additions & 0 deletions client/pipeline_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ package client

import "github.com/buraksezer/olric/internal/protocol"

// PipelineResponse implements response readers for pipelined requests.
type PipelineResponse struct {
*Client
response protocol.Message
}

// Operation returns the current operation name.
func (pr *PipelineResponse) Operation() string {
switch {
case pr.response.Op == protocol.OpPut:
Expand Down Expand Up @@ -48,42 +50,66 @@ func (pr *PipelineResponse) Operation() string {
}
}

// Get returns the value for the requested key. It returns ErrKeyNotFound if the DB does not contains the key.
// It's thread-safe. It is safe to modify the contents of the returned value.
// It is safe to modify the contents of the argument after Get returns.
func (pr *PipelineResponse) Get() (interface{}, error) {
return pr.processGetResponse(&pr.response)
}

// Put sets the value for the requested key. It overwrites any previous value for that key and
// it's thread-safe. It is safe to modify the contents of the arguments after Put returns but not before.
func (pr *PipelineResponse) Put() error {
return checkStatusCode(&pr.response)
}

// PutEx sets the value for the given key with TTL. It overwrites any previous value for that key.
// It's thread-safe. It is safe to modify the contents of the arguments after Put returns but not before.
func (pr *PipelineResponse) PutEx() error {
return checkStatusCode(&pr.response)
}

// Delete deletes the value for the given key. Delete will not return error if key doesn't exist.
// It's thread-safe. It is safe to modify the contents of the argument after Delete returns.
func (pr *PipelineResponse) Delete() error {
return checkStatusCode(&pr.response)
}

// Incr atomically increments key by delta. The return value is the new value after being incremented or an error.
func (pr *PipelineResponse) Incr() (int, error) {
return pr.processIncrDecrResponse(&pr.response)
}

// Decr atomically decrements key by delta. The return value is the new value after being decremented or an error.
func (pr *PipelineResponse) Decr() (int, error) {
return pr.processIncrDecrResponse(&pr.response)
}

// GetPut atomically sets key to value and returns the old value stored at key.
func (pr *PipelineResponse) GetPut() (interface{}, error) {
return pr.processGetPutResponse(&pr.response)
}

// Destroy flushes the given DMap on the cluster. You should know that there is no global lock on DMaps.
// So if you call Put/PutEx and Destroy methods concurrently on the cluster, Put/PutEx calls may set
// new values to the DMap.
func (pr *PipelineResponse) Destroy() error {
return checkStatusCode(&pr.response)
}

// LockWithTimeout sets a lock for the given key. If the lock is still unreleased the end of given period of time,
// it automatically releases the lock. Acquired lock is only for the key in this map. Please note that, before
// setting a lock for a key, you should set the key with Put method. Otherwise it returns olric.ErrKeyNotFound error.
//
// It returns immediately if it acquires the lock for the given key. Otherwise, it waits until timeout.
//
// You should know that the locks are approximate, and only to be used for non-critical purposes.
func (pr *PipelineResponse) LockWithTimeout() error {
return checkStatusCode(&pr.response)
}

// Unlock releases an acquired lock for the given key. It returns olric.ErrNoSuchLock if there is no lock
// for the given key.
func (pr *PipelineResponse) Unlock() error {
return checkStatusCode(&pr.response)
}
7 changes: 7 additions & 0 deletions pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@ import (
func (db *Olric) pipelineOperation(req *protocol.Message) *protocol.Message {
conn := bytes.NewBuffer(req.Value)
response := &bytes.Buffer{}
// Read the pipelined messages into an in-memory buffer.
for {
var preq protocol.Message
err := preq.Read(conn)
if err == io.EOF {
// It's done. The last message has been read.
break
}

// Return an error message in pipelined response.
if err != nil {
err = preq.Error(protocol.StatusInternalServerError, err).Write(response)
if err != nil {
Expand All @@ -45,12 +49,15 @@ func (db *Olric) pipelineOperation(req *protocol.Message) *protocol.Message {
continue
}

// Call its function to prepare a response.
pres := f(&preq)
err = pres.Write(response)
if err != nil {
return req.Error(protocol.StatusInternalServerError, err)
}
}

// Create a success response and assign pipelined responses as Value.
resp := req.Success()
resp.Value = response.Bytes()
return resp
Expand Down

0 comments on commit b74e793

Please sign in to comment.