Petals of the mountain rose
Fall now and then,
To the sound of the waterfall?
Raiku is to Riak as a waterfall is to Akka; a simple Riak client which lets Riak flow to your Scala applications.
It's targeted as a non-blocking, performance focused, full-scale alternative to the java Riak driver.
Based on Akka IO and Sentinel, it uses the pipelines and actors to create the best throughput possible.
The client should be stable enough for day to day usage, but should still be treated as beta software.
The next milestone releases of 0.8.0 will focus on the addition of CRDT and Map Reduce functionality, while trying to minimise API changes.
Because the actual client hasn't been updated in a quite long time, not all new Riak functionality is covered. This will be re-added soon.
Currently available in the client:
- Writing low-level protobuf style read-write objects through a RaikuClient;
- Doing this non-blocking through multiple connections handled by multiple actors (through Sentinel).
- Writing, fetching and deleting single or multiple objects at once;
- Querying items on 2i, based on binary or integral indexes (ranges also supported);
- Pagination and streaming of 2i queries;
- Auto-Reconnecting client;
- Different actor pools for normal and 2i requests;
The following is currently missing in the client, but will be added in the upcoming 0.7.0 milestones:
- Full CRDT support with Scala-like DSL for counters and collections;
- Redesigned Map Reduce functionality;
- Full Yokozuna support;
- Additional documentation for specific use cases.
The 0.8.0 release will integrate Akka Streams as the architecture for both streaming 2i as MR results.
This version will be the first stable 1.0 release, and will contain a revised test suite and an (Sentinel driven) test kit, able to emulate Riak’s behaviour for integration within application tests.
From version Raiku version 0.6.1 and on, only Riak version 1.4.1+ is tested and supported.
It's possible that the client will work perfectly with older versions, but isn't tested and could result in unexpected behavior.
Please use Raiku 0.5.1 for usage with older versions of Riak.
Upcoming functionality such as CRDTs and search are only supported on the (current in Beta) 2.0 release of Riak.
The client uses Akka IO and pipelines to send and receive protocol buffer encoded data streams over TCP sockets.
Protocol Buffer messages are transformed into case classes using ScalaBuff, Riak PBC Content classes are serialised into
RaikuRawValue, a type containing all information and metadata needed to write objects back into Riak.
You can use the client to fetch, store and delete these "low level" objects (through respectively
deleteRaw), but for less custom use cases it's wiser to use the
RaikuBucket to store objects converted using a
You are free to use any value serialisation method available, but I recommended to use the Spray JSON package or the one available with the Play Framework
Next to the retrieval of
RaikuClient implements functionality to store values of
RaikuValue wraps the raw converted type into a container without losing any of the information present in the
RaikuRawValue. By using the
delete functions on the client, the best approach can be taken to store serialisable types into buckets while maintaining complete control on all parameters and metadata.
For easy usage, a
RaikuBucket[T] is available, exposing the above functionality through a more opinionated and easy to use construct. While some parameters can still be set per bucket or per request through a custom
RaikuBucketConfig. The API of the
RaikuBucket is mostly focused on quick and easy usage of the functionality available in
Raiku and should cover most use cases.
Since Raiku is completely asynchronous, all methods return values wrapped in Futures.
You can use Raiku by source (by publishing it into your local Ivy repository):
Or by adding the repo:
"gideondk-repo" at "https://raw.github.com/gideondk/gideondk-mvn-repo/master"
to your SBT configuration and adding Raiku to your library dependencies:
libraryDependencies ++= Seq( "nl.gideondk" %% "raiku" % "0.8-M1" )
Using the client / bucket is quite simple, check the code of the tests to see all functionality. But it basically comes down to this:
Create a client:
implicit val system = ActorSystem("system") val client = RaikuClient("localhost", 8087)
Create a converter:
implicit val yFormat = jsonFormat4(Y) implicit val yConverter = RaikuConverter.newConverter( reader = (v: RaikuRWValue) ⇒ yFormat.read(new String(v.data).asJson), writer = (o: Y) ⇒ RaikuRWValue(o.id, o.toJson.toString.getBytes, "application/json"), binIndexes = (o: Y) ⇒ Map("group_id" -> Set(o.groupId)), intIndexes = (o: Y) ⇒ Map("age" -> Set(o.age)))
Finally, create the bucket:
val bucket = RaikuBucket.default[Y](“raiku_test_y_bucket", client)
RaikuBucket, you can use the normal functions to store, fetch or delete objects:
Or to fetch keys on 2i:
If you like to take a walk on the wild side, you can try the (currently quite primitive) DSL to do these actions:
persons ? personId persons ?* List(personIdA, personIdB)
persons << Person("Basho", 42, "Japan") persons <<* List(Person("Basho", 42, "Japan"), Person("Shiki", 52, "Japan"))
persons - Person("Basho", 42, "Japan") persons -* List(Person("Basho", 42, "Japan"), Person("Shiki", 52, "Japan"))
Querying objects based on 2i
persons idx ("age", 42) persons idx ("country", "Japan") persons idx ("age", 39 to 50) persons idx ("group_id", "A" to "B")
Raiku supports the newer 2i functionality available in Riak 1.4.0 through pagination and result streaming.
When using secondary indexes, it's possible to set a maximum number of results for both integral as binary index queries (ranges also supported):
bucket idx ("age", 20 to 50, maxResults = 20) bucket idx ("group_id", "A", 40)
Queries with a maximum in results not only return the normal
List[String] containing the keys, but also returns a option on a continuation, combining the result to a
This continuation value can be used to get the succeeding values of a specific query, making it able to paginate through values:
for (continuation, items) <- bucket idx ("age", 20, 10) (newContinuation, newItems) <- (bucket idx ("age", 20, maxResults = 10, continuation = continuation) } yield newItems
None to the index query as the continuation value, treats the query as to paginate from start. When taking results through this pagination functionality, treat a
None as returning continuation key as a end-of-content.
For each non-paging 2i query, a
streamIdx equivalent is available. Instead of returning a
Future[List[String]] for index values, keys are streamed back using a (Akka Streams) Source, wrapping each result into a
This Play powered enumerator can directly be used to directly stream results to (web)clients, but can also be used to be composed into more complex pipelines.
Copyright © 2017 Gideon de Kok
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.