diff --git a/README.md b/README.md index 0d14e315..f77bc6ac 100644 --- a/README.md +++ b/README.md @@ -21,16 +21,13 @@ See [Docker](#docker) and [Sample Code](#sample-code) sections to get started! * Supports atomic operations, * Supports [distributed queries](#query) on keys, * Provides a plugin interface for service discovery daemons, -* Provides a locking primitive which inspired by [SETNX of Redis](https://redis.io/commands/setnx#design-pattern-locking-with-codesetnxcode). +* Provides a locking primitive which inspired by [SETNX of Redis](https://redis.io/commands/setnx#design-pattern-locking-with-codesetnxcode), +* Support [distributed topic](#distributed-topic) data structure, ## Possible Use Cases -With this feature set, Olric is suitable to use as a distributed cache. But it also provides data replication, failure detection -and simple anti-entropy services. So it can be used as an ordinary key/value data store to scale your cloud application. - -## Project Status - -Olric is in early stages of development. The package API and client protocol may change without notification. +With this feature set, Olric is suitable to use as a distributed cache. But it also provides distributed topics, data replication, +failure detection and simple anti-entropy services. So it can be used as an ordinary key/value data store to scale your cloud application. ## Table of Contents @@ -49,28 +46,34 @@ Olric is in early stages of development. The package API and client protocol may * [olric-stats](#olric-stats) * [olric-load](#olric-load) * [Usage](#usage) - * [Put](#put) - * [PutIf](#putif) - * [PutEx](#putex) - * [PutIfEx](#putifex) - * [Get](#get) - * [Expire](#expire) - * [Delete](#delete) - * [LockWithTimeout](#lockwithtimeout) - * [Lock](#lock) - * [Unlock](#unlock) - * [Destroy](#destroy) - * [Stats](#stats) - * [Ping](#ping) - * [Query](#query) - * [Cursor](#cursor) - * [Range](#range) - * [Close](#close) - * [Atomic Operations](#atomic-operations) - * [Incr](#incr) - * [Decr](#decr) - * [GetPut](#getput) - * [Pipelining](#pipelining) + * [Distributed Map](#distributed-map) + * [Put](#put) + * [PutIf](#putif) + * [PutEx](#putex) + * [PutIfEx](#putifex) + * [Get](#get) + * [Expire](#expire) + * [Delete](#delete) + * [LockWithTimeout](#lockwithtimeout) + * [Lock](#lock) + * [Unlock](#unlock) + * [Destroy](#destroy) + * [Stats](#stats) + * [Ping](#ping) + * [Query](#query) + * [Cursor](#cursor) + * [Range](#range) + * [Close](#close) + * [Atomic Operations](#atomic-operations) + * [Incr](#incr) + * [Decr](#decr) + * [GetPut](#getput) + * [Pipelining](#pipelining) + * [Distributed Topic](#distributed-topic) + * [Publish](#publish) + * [AddListener](#addlistener) + * [RemoveListener](#removelistener) + * [Destroy](#destroy) * [Serialization](#serialization) * [Golang Client](#golang-client) * [Configuration](#configuration) @@ -121,7 +124,8 @@ Olric is in early stages of development. The package API and client protocol may * Provides a plugin interface for service discovery daemons and cloud providers, * Provides a command-line-interface to access the cluster directly from the terminal, * Supports different serialization formats. Gob, JSON and MessagePack are supported out of the box, -* Provides a locking primitive which inspired by [SETNX of Redis](https://redis.io/commands/setnx#design-pattern-locking-with-codesetnxcode). +* Provides a locking primitive which inspired by [SETNX of Redis](https://redis.io/commands/setnx#design-pattern-locking-with-codesetnxcode), +* Support [distributed topic](#distributed-topic) data structure, See [Architecture](#architecture) section to see details. @@ -130,7 +134,6 @@ See [Architecture](#architecture) section to see details. * Distributed queries over keys and values, * Database backend for persistence, * Anti-entropy system to repair inconsistencies in DMaps, -* Publish/Subscribe for messaging, * Eviction listeners by using Publish/Subscribe, * Memcached interface, * Client implementations for different languages: Java, Python and JavaScript, @@ -428,15 +431,6 @@ When you call **Start** method, your process joins the cluster and will be respo indefinitely. So you may need to run it in a goroutine. Of course, this is just a single-node instance, because you didn't give any configuration. -Create a **DMap** object to access the cluster: - -```go -dm, err := db.NewDMap("my-dmap") -``` - -DMap object has *Put*, *PutEx*, *PutIf*, *PutIfEx*, *Get*, *Delete*, *Expire*, *LockWithTimeout* and *Destroy* methods to access -and modify data in Olric. We may add more methods for finer control but first, I'm willing to stabilize this set of features. - When you want to leave the cluster, just need to call **Shutdown** method: ```go @@ -447,6 +441,14 @@ This will stop background tasks and servers. Finally purges in-memory data and q ***Please note that this section aims to document DMap API in embedded member mode.*** If you prefer to use Olric in Client-Server mode, please jump to [Golang Client](#golang-client) section. + +### Distributed Map + +Create a **DMap** instance: + +```go +dm, err := db.NewDMap("my-dmap") +``` ### Put @@ -797,6 +799,25 @@ There is no hard-limit on message count in a pipeline. You should set a convenie The `Flush` method returns errors along with success messages. Furthermore, you need to know the command order for matching responses with requests. +### Distributed Topic + +Distributed topic implementation provides a very scalable way to distribute messages across clients and cluster members. Currently, it only supports +unordered delivery mode. + +Create a **DTopic** instance + +```go +dt, err := db.NewDTopic("my-topic", olric.UnorderedDelivery) +``` + +### Publish + +```go +err := dt.Publish("my-message") +``` + +Publish accepts any serializable type as message. + ## Golang Client This repo contains the official Golang client for Olric. It implements Olric Binary Protocol(OBP). With this client, diff --git a/internal/protocol/dmap.go b/internal/protocol/dmap.go index a1a68e83..872dff3a 100644 --- a/internal/protocol/dmap.go +++ b/internal/protocol/dmap.go @@ -20,10 +20,13 @@ import ( "fmt" ) +// DMapMessageHeaderSize defines total count of bytes in a DMapMessage const DMapMessageHeaderSize uint32 = 7 const ( + // MagicDMapReq is a magic number which denotes DMap message requests on the wire. MagicDMapReq MagicCode = 0xE2 + // MagicDMapRes is a magic number which denotes DMap message response on the wire. MagicDMapRes MagicCode = 0xE3 ) @@ -36,16 +39,18 @@ type DMapMessageHeader struct { StatusCode StatusCode // 1 } +// DMapMessage is a message type in OBP. It can be used to access and modify DMap data structure. type DMapMessage struct { - Header // [0-4] - DMapMessageHeader // [1..10] - extra interface{} // [11..(m-1)] Command specific extras (In) - dmap string // [m..(n-1)] dmap (as needed, length in Header) - key string // [n..(x-1)] key (as needed, length in Header) - value []byte // [x..y] value (as needed, length in Header) - buf *bytes.Buffer + Header + DMapMessageHeader + extra interface{} + dmap string + key string + value []byte + buf *bytes.Buffer } +// NewDMapMessage returns a new DMapMessage with the given operation code. func NewDMapMessage(opcode OpCode) *DMapMessage { return &DMapMessage{ Header: Header{ @@ -58,6 +63,8 @@ func NewDMapMessage(opcode OpCode) *DMapMessage { } } +// NewDMapMessageFromRequest returns a new DMapMessage for the given bytes.Buffer. The caller can use +// Decode method to read message from the raw data. func NewDMapMessageFromRequest(buf *bytes.Buffer) *DMapMessage { return &DMapMessage{ Header: Header{ @@ -70,6 +77,7 @@ func NewDMapMessageFromRequest(buf *bytes.Buffer) *DMapMessage { } } +// Response generates a response message for the request. This is a shortcut function to reduce boilerplate code. func (d *DMapMessage) Response(buf *bytes.Buffer) EncodeDecoder { msg := &DMapMessage{ Header: Header{ @@ -90,59 +98,72 @@ func (d *DMapMessage) Response(buf *bytes.Buffer) EncodeDecoder { return msg } +// SetStatus sets a status code for the message. func (d *DMapMessage) SetStatus(code StatusCode) { d.StatusCode = code } +// Status returns status code. func (d *DMapMessage) Status() StatusCode { return d.StatusCode } +// SetValue writes the given byte slice into the underlying bytes.Buffer func (d *DMapMessage) SetValue(value []byte) { d.value = value } +// Value returns the value func (d *DMapMessage) Value() []byte { return d.value } +// OpCode returns operation code of the message func (d *DMapMessage) OpCode() OpCode { return d.Op } +// SetBuffer sets the underlying bytes.Buffer. It should be recycled by the caller. func (d *DMapMessage) SetBuffer(buf *bytes.Buffer) { d.buf = buf } +// Buffer returns the underlying bytes.Buffer func (d *DMapMessage) Buffer() *bytes.Buffer { return d.buf } +// SetDMap sets the DMap name for this message. func (d *DMapMessage) SetDMap(dmap string) { d.dmap = dmap } +// Returns the DMap name. func (d *DMapMessage) DMap() string { return d.dmap } +// SetKey sets the key for this DMap message. func (d *DMapMessage) SetKey(key string) { d.key = key } +// Key returns the key for this DMap message. func (d *DMapMessage) Key() string { return d.key } +// SetExtra sets the extra section for the message, if there is any. func (d *DMapMessage) SetExtra(extra interface{}) { d.extra = extra } +// Extra returns the extra section of the message, if there is any. func (d *DMapMessage) Extra() interface{} { return d.extra } -// Encode writes a protocol message to given TCP connection by encoding it. +// Encode encodes the message into byte form. func (d *DMapMessage) Encode() error { // Calculate lengths here d.DMapLen = uint16(len(d.dmap)) @@ -183,6 +204,7 @@ func (d *DMapMessage) Encode() error { return err } +// Decode decodes message from byte form into DMapMessage. func (d *DMapMessage) Decode() error { err := binary.Read(d.buf, binary.BigEndian, &d.DMapMessageHeader) if err != nil { diff --git a/internal/protocol/dtopic.go b/internal/protocol/dtopic.go index f2fc0966..11a07361 100644 --- a/internal/protocol/dtopic.go +++ b/internal/protocol/dtopic.go @@ -20,10 +20,13 @@ import ( "fmt" ) +// DTopicMessageHeaderSize defines total count of bytes in a DTopicMessage const DTopicMessageHeaderSize uint32 = 5 const ( + // MagicDTopicReq is a magic number which denotes DTopic message requests on the wire. MagicDTopicReq MagicCode = 0xEA + // MagicDTopicRes is a magic number which denotes DTopic message response on the wire. MagicDTopicRes MagicCode = 0xEB ) @@ -35,6 +38,7 @@ type DTopicMessageHeader struct { StatusCode StatusCode // 1 } +// DTopicMessage is a message type in OBP. It can be used to access and modify DTopic data structure. type DTopicMessage struct { Header DTopicMessageHeader @@ -44,6 +48,7 @@ type DTopicMessage struct { buf *bytes.Buffer } +// NewDTopicMessage returns a new DTopicMessage with the given operation code. func NewDTopicMessage(opcode OpCode) *DTopicMessage { return &DTopicMessage{ Header: Header{ @@ -56,6 +61,8 @@ func NewDTopicMessage(opcode OpCode) *DTopicMessage { } } +// NewDTopicMessageFromRequest returns a new DTopicMessage for the given bytes.Buffer. The caller can use +// Decode method to read message from the raw data. func NewDTopicMessageFromRequest(buf *bytes.Buffer) *DTopicMessage { return &DTopicMessage{ Header: Header{ @@ -68,6 +75,7 @@ func NewDTopicMessageFromRequest(buf *bytes.Buffer) *DTopicMessage { } } +// Response generates a response message for the request. This is a shortcut function to reduce boilerplate code. func (d *DTopicMessage) Response(buf *bytes.Buffer) EncodeDecoder { msg := &DTopicMessage{ Header: Header{ @@ -89,51 +97,62 @@ func (d *DTopicMessage) Response(buf *bytes.Buffer) EncodeDecoder { return msg } +// SetStatus sets a status code for the message. func (d *DTopicMessage) SetStatus(code StatusCode) { d.StatusCode = code } +// Status returns status code. func (d *DTopicMessage) Status() StatusCode { return d.StatusCode } +// SetValue writes the given byte slice into the underlying bytes.Buffer func (d *DTopicMessage) SetValue(value []byte) { d.value = value } +// Value returns the value func (d *DTopicMessage) Value() []byte { return d.value } +// OpCode returns operation code of the message func (d *DTopicMessage) OpCode() OpCode { return d.Op } +// SetBuffer sets the underlying bytes.Buffer. It should be recycled by the caller. func (d *DTopicMessage) SetBuffer(buf *bytes.Buffer) { d.buf = buf } +// Buffer returns the underlying bytes.Buffer func (d *DTopicMessage) Buffer() *bytes.Buffer { return d.buf } +// SetExtra sets the extra section for the message, if there is any. func (d *DTopicMessage) SetExtra(extra interface{}) { d.extra = extra } +// Extra returns the extra section of the message, if there is any. func (d *DTopicMessage) Extra() interface{} { return d.extra } +// SetDTopic sets the DTopic name for this message. func (d *DTopicMessage) SetDTopic(dtopic string) { d.dtopic = dtopic } +// Returns the DTopic name. func (d *DTopicMessage) DTopic() string { return d.dtopic } -// Encode writes a protocol message to given TCP connection by encoding it. +// Encode encodes the message into byte form. func (d *DTopicMessage) Encode() error { // Calculate lengths here d.DTopicLen = uint16(len(d.dtopic)) @@ -168,6 +187,7 @@ func (d *DTopicMessage) Encode() error { return err } +// Decode decodes message from byte form into DTopicMessage. func (d *DTopicMessage) Decode() error { err := binary.Read(d.buf, binary.BigEndian, &d.DTopicMessageHeader) if err != nil { diff --git a/internal/protocol/pipeline.go b/internal/protocol/pipeline.go index a4c5e3ad..817e07d1 100644 --- a/internal/protocol/pipeline.go +++ b/internal/protocol/pipeline.go @@ -20,10 +20,13 @@ import ( "fmt" ) +// PipelineMessageHeaderSize defines total count of bytes in a PipelineMessage const PipelineMessageHeaderSize uint32 = 3 const ( + // MagicPipelineReq is a magic number which denotes Pipeline message requests on the wire. MagicPipelineReq MagicCode = 0xE6 + // MagicPipelineRes is a magic number which denotes Pipeline message response on the wire. MagicPipelineRes MagicCode = 0xE7 ) @@ -34,6 +37,7 @@ type PipelineMessageHeader struct { StatusCode StatusCode // 1 } +// PipelineMessage is a message type in OBP. It can be used to access and modify Pipeline data structure. type PipelineMessage struct { Header PipelineMessageHeader @@ -42,6 +46,7 @@ type PipelineMessage struct { buf *bytes.Buffer } +// NewPipelineMessage returns a new PipelineMessage with the given operation code. func NewPipelineMessage(opcode OpCode) *PipelineMessage { return &PipelineMessage{ Header: Header{ @@ -54,6 +59,8 @@ func NewPipelineMessage(opcode OpCode) *PipelineMessage { } } +// NewPipelineMessageFromRequest returns a new PipelineMessage for the given bytes.Buffer. The caller can use +// Decode method to read message from the raw data. func NewPipelineMessageFromRequest(buf *bytes.Buffer) *PipelineMessage { return &PipelineMessage{ Header: Header{ @@ -66,6 +73,7 @@ func NewPipelineMessageFromRequest(buf *bytes.Buffer) *PipelineMessage { } } +// Response generates a response message for the request. This is a shortcut function to reduce boilerplate code. func (d *PipelineMessage) Response(buf *bytes.Buffer) EncodeDecoder { msg := &PipelineMessage{ Header: Header{ @@ -87,43 +95,52 @@ func (d *PipelineMessage) Response(buf *bytes.Buffer) EncodeDecoder { return msg } +// SetStatus sets a status code for the message. func (d *PipelineMessage) SetStatus(code StatusCode) { d.StatusCode = code } +// Status returns status code. func (d *PipelineMessage) Status() StatusCode { return d.StatusCode } +// SetValue writes the given byte slice into the underlying bytes.Buffer func (d *PipelineMessage) SetValue(value []byte) { d.value = value } +// Value returns the value func (d *PipelineMessage) Value() []byte { return d.value } +// OpCode returns operation code of the message func (d *PipelineMessage) OpCode() OpCode { return d.Op } +// SetBuffer sets the underlying bytes.Buffer. It should be recycled by the caller. func (d *PipelineMessage) SetBuffer(buf *bytes.Buffer) { d.buf = buf } +// Buffer returns the underlying bytes.Buffer func (d *PipelineMessage) Buffer() *bytes.Buffer { return d.buf } +// SetExtra sets the extra section for the message, if there is any. func (d *PipelineMessage) SetExtra(extra interface{}) { d.extra = extra } +// Extra returns the extra section of the message, if there is any. func (d *PipelineMessage) Extra() interface{} { return d.extra } -// Encode writes a protocol message to given TCP connection by encoding it. +// Encode encodes the message into byte form. func (d *PipelineMessage) Encode() error { // Calculate lengths here if d.extra != nil { @@ -152,6 +169,7 @@ func (d *PipelineMessage) Encode() error { return err } +// Decode decodes message from byte form into PipelineMessage. func (d *PipelineMessage) Decode() error { err := binary.Read(d.buf, binary.BigEndian, &d.PipelineMessageHeader) if err != nil { diff --git a/internal/protocol/protocol.go b/internal/protocol/protocol.go index 20f28f2a..fd8a809b 100644 --- a/internal/protocol/protocol.go +++ b/internal/protocol/protocol.go @@ -35,34 +35,48 @@ var pool = bufpool.New() // MagicCode defines an unique code to distinguish a request message from a response message in Olric Binary Protocol. type MagicCode uint8 +// EncodeDecoder is an interface that defines methods for encoding/decoding a messages in OBP. type EncodeDecoder interface { + // Encode encodes the message and writes into a bytes.Buffer. Encode() error + // Decode decodes the message from the given bytes.Buffer. Decode() error + // SetStatus sets a status code for the message. SetStatus(StatusCode) + // Status returns status code. Status() StatusCode + // SetValue writes the given byte slice into the underlying bytes.Buffer SetValue([]byte) + // Value returns the value Value() []byte + // OpCode returns operation code of the message OpCode() OpCode + // SetBuffer sets the underlying bytes.Buffer. It should be recycled by the caller. SetBuffer(*bytes.Buffer) + // Buffer returns the underlying bytes.Buffer Buffer() *bytes.Buffer + // SetExtra sets the extra section for the message, if there is any. SetExtra(interface{}) + // Extra returns the extra section of the message, if there is any. Extra() interface{} + // Response generates a response message for the message. Response(*bytes.Buffer) EncodeDecoder } const headerSize int64 = 6 +// Header is a shared message header for all the message types in Olric Binary Protocol. type Header struct { Magic MagicCode // 1 byte Version uint8 // 1 byte @@ -82,6 +96,7 @@ func readHeader(conn io.ReadWriteCloser) (Header, error) { buf := pool.Get() defer pool.Put(buf) + // Read the header section. The first 6 bytes. var header Header _, err := io.CopyN(buf, conn, headerSize) if err != nil { @@ -98,12 +113,17 @@ func readHeader(conn io.ReadWriteCloser) (Header, error) { return header, nil } +// ReadMessage reads the whole message from src into the given bytes.Buffer. +// Header can be used to determine message. Then you can pick an appropriate message +// type and decode it. func ReadMessage(src io.ReadWriteCloser, dst *bytes.Buffer) (Header, error) { header, err := readHeader(src) if err != nil { return Header{}, err } + // Read the whole message. Now, the caller knows the message type and she can + // Decode method. length := int64(header.MessageLength) nr, err := io.CopyN(dst, src, length) if err != nil { @@ -129,15 +149,19 @@ func filterNetworkErrors(err error) error { return err } +// BytesToConn translates bytes.Buffer into io.ReadWriteCloser interface. It's useful to implement +// pipeline in OBP. type BytesToConn struct { *bytes.Buffer } +// Close resets and recycles underlying bytes.Buffer. func (b *BytesToConn) Close() error { pool.Put(b.Buffer) return nil } +// NewBytesToConn returns a new BytesToConn. The underlying bytes.Buffer retrieves from the pool. func NewBytesToConn(data []byte) *BytesToConn { b := pool.Get() b.Write(data) diff --git a/internal/protocol/stream.go b/internal/protocol/stream.go index 3a94ff34..c03e0b52 100644 --- a/internal/protocol/stream.go +++ b/internal/protocol/stream.go @@ -22,10 +22,13 @@ import ( "io" ) +// StreamMessageHeaderSize defines total count of bytes in a StreamMessage const StreamMessageHeaderSize uint32 = 3 const ( + // MagicStreamReq is a magic number which denotes Stream message requests on the wire. MagicStreamReq MagicCode = 0xE4 + // MagicStreamRes is a magic number which denotes Stream message response on the wire. MagicStreamRes MagicCode = 0xE5 ) @@ -36,6 +39,7 @@ type StreamMessageHeader struct { StatusCode StatusCode // 1 } +// StreamMessage is a message type in OBP. It can be used to access and modify Stream data structure. type StreamMessage struct { Header StreamMessageHeader @@ -46,6 +50,7 @@ type StreamMessage struct { cancel context.CancelFunc } +// NewStreamMessage returns a new StreamMessage with the given operation code. func NewStreamMessage(opcode OpCode) *StreamMessage { return &StreamMessage{ Header: Header{ @@ -67,6 +72,8 @@ func ConvertToStreamMessage(msg EncodeDecoder, listenerID uint64) *StreamMessage return str } +// NewStreamMessageFromRequest returns a new StreamMessage for the given bytes.Buffer. The caller can use +// Decode method to read message from the raw data. func NewStreamMessageFromRequest(buf *bytes.Buffer) *StreamMessage { return &StreamMessage{ Header: Header{ @@ -79,6 +86,7 @@ func NewStreamMessageFromRequest(buf *bytes.Buffer) *StreamMessage { } } +// Response generates a response message for the request. This is a shortcut function to reduce boilerplate code. func (d *StreamMessage) Response(buf *bytes.Buffer) EncodeDecoder { msg := &StreamMessage{ Header: Header{ @@ -100,38 +108,47 @@ func (d *StreamMessage) Response(buf *bytes.Buffer) EncodeDecoder { return msg } +// SetStatus sets a status code for the message. func (d *StreamMessage) SetStatus(code StatusCode) { d.StatusCode = code } +// Status returns status code. func (d *StreamMessage) Status() StatusCode { return d.StatusCode } +// SetValue writes the given byte slice into the underlying bytes.Buffer func (d *StreamMessage) SetValue(value []byte) { d.value = value } +// Value returns the value func (d *StreamMessage) Value() []byte { return d.value } +// OpCode returns operation code of the message func (d *StreamMessage) OpCode() OpCode { return d.Op } +// SetBuffer sets the underlying bytes.Buffer. It should be recycled by the caller. func (d *StreamMessage) SetBuffer(buf *bytes.Buffer) { d.buf = buf } +// Buffer returns the underlying bytes.Buffer func (d *StreamMessage) Buffer() *bytes.Buffer { return d.buf } +// SetExtra sets the extra section for the message, if there is any. func (d *StreamMessage) SetExtra(extra interface{}) { d.extra = extra } +// Extra returns the extra section of the message, if there is any. func (d *StreamMessage) Extra() interface{} { return d.extra } @@ -155,7 +172,7 @@ func (d *StreamMessage) Close() { d.cancel() } -// Encode writes a protocol message to given TCP connection by encoding it. +// Encode encodes the message into byte form. func (d *StreamMessage) Encode() error { // Calculate lengths here if d.extra != nil { @@ -184,6 +201,7 @@ func (d *StreamMessage) Encode() error { return err } +// Decode decodes message from byte form into StreamMessage. func (d *StreamMessage) Decode() error { err := binary.Read(d.buf, binary.BigEndian, &d.StreamMessageHeader) if err != nil {