A fast Golang Redis client that does auto pipelining and supports client side caching.
- Auto pipelining for non-blocking redis commands
- Client side caching in RESP3
- Pub/Sub, Sharded Pub/Sub, Streams
- Redis Cluster, Sentinel, RedisJSON, RedisBloom, RediSearch, RedisTimeseries, etc.
- Generic Object Mapping with client side caching and optimistic locking
- Distributed Locks with client side caching
- Helpers for writing tests with rueidis mock
- OpenTelemetry integration
- Hooks and other integrations
- Go-redis like API adapter by @418Coffee
package main
import (
"context"
"github.com/rueian/rueidis"
)
func main() {
client, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{"127.0.0.1:6379"},
})
if err != nil {
panic(err)
}
defer client.Close()
ctx := context.Background()
// SET key val NX
err = client.Do(ctx, client.B().Set().Key("key").Value("val").Nx().Build()).Error()
}
client.B()
is the builder entrypoint to construct a redis command:
Recorded by @FZambia Improving Centrifugo Redis Engine throughput and allocation efficiency with Rueidis Go library
Once the command is completed, use either client.Do()
or client.DoMulti()
to send it to redis.
The constructed command will be recycled to underlying sync.Pool
and you ❗️SHOULD NOT❗️ reuse it across multiple client.Do()
or client.DoMulti()
calls
All concurrent non-blocking redis commands (such as GET
, SET
) are automatically pipelined through connections,
which reduces the overall round trips and system calls, and gets higher throughput. You can easily get the benefit
of pipelining technique by just calling client.Do()
from multiple goroutines.
For example:
func BenchmarkPipelining(b *testing.B, client rueidis.Client) {
// the below client.Do() operations will be issued from
// multiple goroutines and thus will be pipelined automatically.
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
client.Do(context.Background(), client.B().Get().Key("k").Build()).ToString()
}
})
}
Comparing to go-redis, Rueidis has higher throughput across 1, 8, and 64 parallelism settings.
It is even able to achieve ~14x throughput over go-redis in a local benchmark of Macbook Pro 16" M1 Pro 2021. (see parallelism(64)-key(16)-value(64)-10
)
Benchmark source code: https://github.com/rueian/rueidis-benchmark
A benchmark result performed on two GCP n2-highcpu-2 machines also shows that rueidis can achieve higher throughput with lower latencies: redis#93
The opt-in mode of server-assisted client side caching is enabled by default, and can be used by calling DoCache()
or DoMultiCache()
with
pairs of a readonly command and a client side TTL.
client.DoCache(ctx, client.B().Hmget().Key("myhash").Field("1", "2").Cache(), time.Minute).ToArray()
client.DoMultiCache(ctx,
rueidis.CT(client.B().Get().Key("k1").Cache(), 1*time.Minute),
rueidis.CT(client.B().Get().Key("k2").Cache(), 2*time.Minute))
Cached responses will be invalidated when being notified by redis or their client side ttl is reached.
Client Side Caching can boost read throughput just like having a redis replica right inside your application:
Benchmark source code: https://github.com/rueian/rueidis-benchmark
Use CacheTTL()
to check the remaining client side TTL in seconds:
client.DoCache(ctx, client.B().Get().Key("k1").Cache(), time.Minute).CacheTTL() == 60
Use IsCacheHit()
to verify that if the response came from the client side memory:
client.DoCache(ctx, client.B().Get().Key("k1").Cache(), time.Minute).IsCacheHit() == true
If the OpenTelemetry is enabled by the rueidisotel.WithClient(client)
, then there are also two metrics instrumented:
- rueidis_do_cache_miss
- rueidis_do_cache_hits
rueidis.MGetCache
and rueidis.JsonMGetCache
are handy helpers fetching multiple keys across different slots through the client side caching.
They will first group keys by slot to build MGET
or JSON.MGET
commands respectively and then send requests with only cache missed keys to redis nodes.
Although the default is opt-in mode, you can use broadcast mode by specifying your prefixes in ClientOption.ClientTrackingOptions
:
client, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{"127.0.0.1:6379"},
ClientTrackingOptions: []string{"PREFIX", "prefix1:", "PREFIX", "prefix2:", "BCAST"},
})
if err != nil {
panic(err)
}
client.DoCache(ctx, client.B().Get().Key("prefix1:1").Cache(), time.Minute).IsCacheHit() == false
client.DoCache(ctx, client.B().Get().Key("prefix1:1").Cache(), time.Minute).IsCacheHit() == true
Please make sure that commands passed to DoCache()
and DoMultiCache()
are covered by your prefixes.
Otherwise, their client-side cache will not be invalidated by redis.
Some Redis provider doesn't support client-side caching, ex. Google Cloud Memorystore.
You can disable client-side caching by setting ClientOption.DisableCache
to true
.
This will also fall back client.DoCache()
and client.DoMultiCache()
to client.Do()
and client.DoMulti()
.
client.Do()
, client.DoMulti()
, client.DoCache()
and client.DoMultiCache()
can return early if the context is canceled or the deadline is reached.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
client.Do(ctx, client.B().Set().Key("key").Value("val").Nx().Build()).Error() == context.DeadlineExceeded
Please note that though operations can return early, the command is likely sent already.
To receive messages from channels, client.Receive()
should be used. It supports SUBSCRIBE
, PSUBSCRIBE
and Redis 7.0's SSUBSCRIBE
:
err = client.Receive(context.Background(), client.B().Subscribe().Channel("ch1", "ch2").Build(), func(msg rueidis.PubSubMessage) {
// handle the msg
})
The provided handler will be called with received message.
It is important to note that client.Receive()
will keep blocking and return only when the following cases:
- return
nil
when received any unsubscribe/punsubscribe message related to the providedsubscribe
command. - return
rueidis.ErrClosing
when the client is closed manually. - return
ctx.Err()
when thectx
is done. - return non-nil
err
when the providedsubscribe
command failed.
While the client.Receive()
call is blocking, the Client
is still able to accept other concurrent requests,
and they are sharing the same tcp connection. If your message handler may take some time to complete, it is recommended
to use the client.Receive()
inside a client.Dedicated()
for not blocking other concurrent requests.
The client.Receive()
requires users to provide a subscription command in advance.
There is an alternative Dedicatedclient.SetPubSubHooks()
allows users to subscribe/unsubscribe channels later.
c, cancel := client.Dedicate()
defer cancel()
wait := c.SetPubSubHooks(rueidis.PubSubHooks{
OnMessage: func(m rueidis.PubSubMessage) {
// Handle message. This callback will be called sequentially, but in another goroutine.
}
})
c.Do(ctx, c.B().Subscribe().Channel("ch").Build())
err := <-wait // disconnected with err
If the hooks are not nil, the above wait
channel is guaranteed to be close when the hooks will not be called anymore,
and produce at most one error describing the reason. Users can use this channel to detect disconnection.
To do a CAS operation (WATCH
+ MULTI
+ EXEC
), a dedicated connection should be used, because there should be no
unintentional write commands between WATCH
and EXEC
. Otherwise, the EXEC
may not fail as expected.
client.Dedicated(func(c rueidis.DedicatedClient) error {
// watch keys first
c.Do(ctx, c.B().Watch().Key("k1", "k2").Build())
// perform read here
c.Do(ctx, c.B().Mget().Key("k1", "k2").Build())
// perform write with MULTI EXEC
c.DoMulti(
ctx,
c.B().Multi().Build(),
c.B().Set().Key("k1").Value("1").Build(),
c.B().Set().Key("k2").Value("2").Build(),
c.B().Exec().Build(),
)
return nil
})
Or use Dedicate()
and invoke cancel()
when finished to put the connection back to the pool.
c, cancel := client.Dedicate()
defer cancel()
c.Do(ctx, c.B().Watch().Key("k1", "k2").Build())
// do the rest CAS operations with the `client` who occupying a connection
However, occupying a connection is not good in terms of throughput. It is better to use Lua script to perform optimistic locking instead.
Each underlying connection in rueidis allocates a ring buffer for pipelining.
Its size is controlled by the ClientOption.RingScaleEachConn
and the default value is 10 which results into each ring of size 2^10.
If you have many rueidis connections, you may find that they occupy quite amount of memory.
In that case, you may consider reducing ClientOption.RingScaleEachConn
to 8 or 9 at the cost of potential throughput degradation.
Though all concurrent non-blocking commands are automatically pipelined, you can still pipeline commands manually with DoMulti()
:
cmds := make(rueidis.Commands, 0, 10)
for i := 0; i < 10; i++ {
cmds = append(cmds, client.B().Set().Key("key").Value("value").Build())
}
for _, resp := range client.DoMulti(ctx, cmds...) {
if err := resp.Error(); err != nil {
panic(err)
}
}
The NewLuaScript
or NewLuaScriptReadOnly
will create a script which is safe for concurrent usage.
When calling the script.Exec
, it will try sending EVALSHA
first and fallback to EVAL
if the server returns NOSCRIPT
.
script := rueidis.NewLuaScript("return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}")
// the script.Exec is safe for concurrent call
list, err := script.Exec(ctx, client, []string{"k1", "k2"}, []string{"a1", "a2"}).ToArray()
To connect to a redis cluster, the NewClient
should be used:
client, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
ShuffleInit: true,
})
To connect to a single redis node, still use the NewClient
with one InitAddress
client, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{"127.0.0.1:6379"},
})
To connect to sentinels, specify the required master set name:
client, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{"127.0.0.1:26379", "127.0.0.1:26380", "127.0.0.1:26381"},
Sentinel: rueidis.SentinelOption{
MasterSet: "my_master",
},
})
If you want to construct commands that are not yet supported, you can use client.B().Arbitrary()
:
// This will result into [ANY CMD k1 k2 a1 a2]
client.B().Arbitrary("ANY", "CMD").Keys("k1", "k2").Args("a1", "a2").Build()
The command builder treats all the parameters as Redis strings, which are binary safe. This means that users can store []byte
directly into Redis without conversion. And the rueidis.BinaryString
helper can convert []byte
to string
without copy. For example:
client.B().Set().Key("b").Value(rueidis.BinaryString([]byte{...})).Build()
Treating all the parameters as Redis strings also means that the command builder doesn't do any quoting, conversion automatically for users.
When working with RedisJSON, users frequently need to prepare JSON string in Redis string. And rueidis.JSON
can help:
client.B().JsonSet().Key("j").Path("$.myStrField").Value(rueidis.JSON("str")).Build()
// equivalent to
client.B().JsonSet().Key("j").Path("$.myStrField").Value(`"str"`).Build()
It is hard to remember what message type is returned from redis and which parsing method should be used with. So, here is some common examples:
// GET
client.Do(ctx, client.B().Get().Key("k").Build()).ToString()
client.Do(ctx, client.B().Get().Key("k").Build()).AsInt64()
// MGET
client.Do(ctx, client.B().Mget().Key("k1", "k2").Build()).ToArray()
// SET
client.Do(ctx, client.B().Set().Key("k").Value("v").Build()).Error()
// INCR
client.Do(ctx, client.B().Incr().Key("k").Build()).AsInt64()
// HGET
client.Do(ctx, client.B().Hget().Key("k").Field("f").Build()).ToString()
// HMGET
client.Do(ctx, client.B().Hmget().Key("h").Field("a", "b").Build()).ToArray()
// HGETALL
client.Do(ctx, client.B().Hgetall().Key("h").Build()).AsStrMap()
// ZRANGE
client.Do(ctx, client.B().Zrange().Key("k").Min("1").Max("2").Build()).AsStrSlice()
// ZRANK
client.Do(ctx, client.B().Zrank().Key("k").Member("m").Build()).AsInt64()
// ZSCORE
client.Do(ctx, client.B().Zscore().Key("k").Member("m").Build()).AsFloat64()
// ZRANGE
client.Do(ctx, client.B().Zrange().Key("k").Min("0").Max("-1").Build()).AsStrSlice()
client.Do(ctx, client.B().Zrange().Key("k").Min("0").Max("-1").Withscores().Build()).AsZScores()
// ZPOPMIN
client.Do(ctx, client.B().Zpopmin().Key("k").Build()).AsZScore()
client.Do(ctx, client.B().Zpopmin().Key("myzset").Count(2).Build()).AsZScores()
// SCARD
client.Do(ctx, client.B().Scard().Key("k").Build()).AsInt64()
// SMEMBERS
client.Do(ctx, client.B().Smembers().Key("k").Build()).AsStrSlice()
// LINDEX
client.Do(ctx, client.B().Lindex().Key("k").Index(0).Build()).ToString()
// LPOP
client.Do(ctx, client.B().Lpop().Key("k").Build()).ToString()
client.Do(ctx, client.B().Lpop().Key("k").Count(2).Build()).AsStrSlice()