Skip to content

Commit

Permalink
Add a sized capped cache for prepared queries
Browse files Browse the repository at this point in the history
  • Loading branch information
mpenick committed Sep 2, 2021
1 parent 760d33a commit 147ef72
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 14 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ go 1.16
require (
github.com/alecthomas/kong v0.2.17
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210521184019-c5ad59b459ec
github.com/cespare/xxhash v1.1.0 // indirect
github.com/datastax/go-cassandra-native-protocol v0.0.0-20210604174339-4311e5d5654d
github.com/dgraph-io/ristretto v0.1.0 // indirect
github.com/stretchr/testify v1.7.0
go.uber.org/atomic v1.8.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
Expand Down
17 changes: 17 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/alecthomas/kong v0.2.17 h1:URDISCI96MIgcIlQyoCAlhOmrSw6pZScBNkctg8r0W0=
github.com/alecthomas/kong v0.2.17/go.mod h1:ka3VZ8GZNPXv9Ov+j4YNLkI8mTuhXyr/0ktSlqIydQQ=
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210521184019-c5ad59b459ec h1:EEyRvzmpEUZ+I8WmD5cw/vY8EqhambkOqy5iFr0908A=
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210521184019-c5ad59b459ec/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/datastax/go-cassandra-native-protocol v0.0.0-20210604174339-4311e5d5654d h1:sGM7OKF00vOQwn3cKGVpZLixkeP6i6kQRcz9+aV1cqs=
github.com/datastax/go-cassandra-native-protocol v0.0.0-20210604174339-4311e5d5654d/go.mod h1:n6F7IFlMxffEUMfTBvNqwaxwpJMvMnRT5L93km1qeNc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/ristretto v0.1.0 h1:Jv3CGQHp9OjuMBSne1485aDpUkTKEcUqF+jm/LuerPI=
github.com/dgraph-io/ristretto v0.1.0/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/snappy v0.0.2 h1:aeE13tS0IiQgFjYdoL8qN3K1N2bXXtI6Vi51/y7BpMw=
github.com/golang/snappy v0.0.2/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/pierrec/lz4/v4 v4.0.3 h1:vNQKSVZNYUEAvRY9FaUXAF1XPbSOHJtDTiP41kzDz2E=
Expand All @@ -19,9 +31,11 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/zerolog v1.20.0/go.mod h1:IzD0RJ65iWH0w97OQQebJEvTZYvsCUm9WVLWBQrJRjo=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand All @@ -37,11 +51,14 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190828213141-aed303cbaa74/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
55 changes: 50 additions & 5 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net"
"sync"
"sync/atomic"
"unsafe"

"cql-proxy/parser"
"cql-proxy/proxycore"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/datastax/go-cassandra-native-protocol/frame"
"github.com/datastax/go-cassandra-native-protocol/message"
"github.com/datastax/go-cassandra-native-protocol/primitive"
"github.com/dgraph-io/ristretto"
"go.uber.org/zap"
)

Expand All @@ -47,6 +49,9 @@ type Config struct {
ReconnectPolicy proxycore.ReconnectPolicy
NumConns int
Logger *zap.Logger
// PreparedCache a cache that stores prepared queries. If not set it uses the default implementation with a max
// capacity of 100MB.
PreparedCache proxycore.PreparedCache
}

type Proxy struct {
Expand All @@ -56,9 +61,9 @@ type Proxy struct {
listener net.Listener
cluster *proxycore.Cluster
sessions sync.Map
sessMu *sync.Mutex
sessMu sync.Mutex
schemaEventClients sync.Map
preparedCache sync.Map
preparedCache proxycore.PreparedCache
clientIdGen uint64
lb proxycore.LoadBalancer
systemLocalValues map[string]message.Column
Expand Down Expand Up @@ -91,7 +96,6 @@ func NewProxy(ctx context.Context, config Config) *Proxy {
ctx: ctx,
config: config,
logger: proxycore.GetOrCreateNopLogger(config.Logger),
sessMu: &sync.Mutex{},
}
}

Expand All @@ -106,6 +110,11 @@ func (p *Proxy) ListenAndServe(address string) error {
func (p *Proxy) Listen(address string) error {
var err error

p.preparedCache, err = getOrCreateDefaultPreparedCache(p.config.PreparedCache)
if err != nil {
return fmt.Errorf("unable to create prepared cache %w", err)
}

p.cluster, err = proxycore.ConnectCluster(p.ctx, proxycore.ClusterConfig{
Version: p.config.Version,
Auth: p.config.Auth,
Expand Down Expand Up @@ -135,7 +144,7 @@ func (p *Proxy) Listen(address string) error {
NumConns: p.config.NumConns,
Version: p.cluster.NegotiatedVersion,
Auth: p.config.Auth,
PreparedCache: &p.preparedCache,
PreparedCache: p.preparedCache,
})

if err != nil {
Expand Down Expand Up @@ -189,7 +198,7 @@ func (p *Proxy) maybeCreateSession(keyspace string) error {
NumConns: p.config.NumConns,
Version: p.cluster.NegotiatedVersion,
Auth: p.config.Auth,
PreparedCache: &p.preparedCache,
PreparedCache: p.preparedCache,
Keyspace: keyspace,
})
if err != nil {
Expand Down Expand Up @@ -481,3 +490,39 @@ func (c *client) send(hdr *frame.Header, msg message.Message) {
func (c *client) Closing(_ error) {
c.proxy.schemaEventClients.Delete(c.id)
}

func getOrCreateDefaultPreparedCache(cache proxycore.PreparedCache) (proxycore.PreparedCache, error) {
if cache == nil {
return NewDefaultPreparedCache(1e8) // 100MB
}
return cache, nil
}

// NewDefaultPreparedCache creates a new default prepared cache capping the max capacity to `maxBytes`.
func NewDefaultPreparedCache(maxBytes int64) (proxycore.PreparedCache, error) {
cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: maxBytes / 32, // Assuming an average size of ~32 bytes per entry.
MaxCost: maxBytes,
BufferItems: 64,
})
if err != nil {
return nil, err
}
return &defaultPreparedCache{cache}, nil
}

type defaultPreparedCache struct {
cache *ristretto.Cache
}

func (d defaultPreparedCache) Store(id string, entry *proxycore.PreparedEntry) {
cost := int64(len(id)) + int64(unsafe.Sizeof(frame.Header{})) + int64(unsafe.Sizeof(frame.Body{})) + int64(len(entry.PreparedFrame.Body))
d.cache.Set(id, entry, cost)
}

func (d defaultPreparedCache) Load(id string) (entry *proxycore.PreparedEntry, ok bool) {
if val, ok := d.cache.Get(id); ok {
return val.(*proxycore.PreparedEntry), true
}
return nil, false
}
11 changes: 7 additions & 4 deletions proxycore/clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (f EventHandlerFunc) OnEvent(frm *frame.Frame) {
}

type ClientConnConfig struct {
PreparedCache *sync.Map
PreparedCache PreparedCache
Handler EventHandler
Logger *zap.Logger
}
Expand All @@ -58,7 +58,7 @@ type ClientConn struct {
inflight int32
pending *pendingRequests
eventHandler EventHandler
preparedCache *sync.Map
preparedCache PreparedCache
logger *zap.Logger
closing bool
closingMu *sync.RWMutex
Expand Down Expand Up @@ -304,7 +304,7 @@ func (c *ClientConn) maybePrepareAndExecute(request Request, raw *frame.RawFrame
id := hex.EncodeToString(msg.Id)
if prepare, ok := c.preparedCache.Load(id); ok {
err = c.Send(&prepareRequest{
prepare: prepare.(*frame.RawFrame),
prepare: prepare.PreparedFrame,
origRequest: request,
})
if err != nil {
Expand Down Expand Up @@ -340,7 +340,10 @@ func (c *ClientConn) maybeCachePrepared(request Request, raw *frame.RawFrame) {
return
}
msg := frm.Body.Message.(*message.PreparedResult)
c.preparedCache.Store(hex.EncodeToString(msg.PreparedQueryId), request.Frame()) // Store frame so we can re-prepare
c.preparedCache.Store(hex.EncodeToString(msg.PreparedQueryId),
&PreparedEntry{
request.Frame().(*frame.RawFrame), // Store frame so we can re-prepare
})
}
}

Expand Down
22 changes: 19 additions & 3 deletions proxycore/clientconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,6 @@ func TestClientConn_Unprepared(t *testing.T) {
Executed
)

var preparedCache sync.Map
preparedId := []byte("abc")

state := atomic.NewInt32(Unprepared)
Expand Down Expand Up @@ -390,7 +389,8 @@ func TestClientConn_Unprepared(t *testing.T) {
prepareFrame, err := codec.ConvertToRawFrame(frame.NewFrame(supported, 0, &message.Prepare{Query: "SELECT * FROM test.test"}))
require.NoError(t, err)

preparedCache.Store(hex.EncodeToString(preparedId), prepareFrame)
var preparedCache testPrepareCache
preparedCache.Store(hex.EncodeToString(preparedId), &PreparedEntry{ prepareFrame})

cl, err := ConnectClient(ctx, &defaultEndpoint{"127.0.0.1:9042"}, ClientConnConfig{PreparedCache: &preparedCache})
defer func(cl *ClientConn) {
Expand All @@ -417,7 +417,6 @@ func TestClientConn_Unprepared(t *testing.T) {
}

func TestClientConn_UnpreparedNotCached(t *testing.T) {
var preparedCache sync.Map
preparedId := []byte("abc")

server := &MockServer{
Expand Down Expand Up @@ -450,6 +449,8 @@ func TestClientConn_UnpreparedNotCached(t *testing.T) {

logger, _ := zap.NewDevelopment()

var preparedCache testPrepareCache

cl, err := ConnectClient(ctx, &defaultEndpoint{"127.0.0.1:9042"},
ClientConnConfig{
PreparedCache: &preparedCache, // Empty cache
Expand All @@ -473,6 +474,21 @@ func TestClientConn_UnpreparedNotCached(t *testing.T) {
assert.True(t, ok, "expecting an unprepared response")
}

type testPrepareCache struct {
cache sync.Map
}

func (t *testPrepareCache) Store(id string, entry *PreparedEntry) {
t.cache.Store(id, entry)
}

func (t *testPrepareCache) Load(id string) (entry *PreparedEntry, ok bool) {
if val, ok := t.cache.Load(id); ok {
return val.(*PreparedEntry), true
}
return nil, false
}

type testPrepareRequest struct {
t *testing.T
wg *sync.WaitGroup
Expand Down
2 changes: 1 addition & 1 deletion proxycore/connpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type connPool struct {
ctx context.Context
config connPoolConfig
logger *zap.Logger
preparedCache *sync.Map
preparedCache PreparedCache
cancel context.CancelFunc
remaining int32
conns []*ClientConn
Expand Down
17 changes: 16 additions & 1 deletion proxycore/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync"
"time"

"github.com/datastax/go-cassandra-native-protocol/frame"
"github.com/datastax/go-cassandra-native-protocol/primitive"
"go.uber.org/zap"
)
Expand All @@ -28,14 +29,28 @@ var (
NoConnForHost = errors.New("no connection available for host")
)

// PreparedEntry is an entry in the prepared cache.
type PreparedEntry struct {
PreparedFrame *frame.RawFrame
}

// PreparedCache a thread-safe cache for storing prepared queries.
type PreparedCache interface {
// Store add an entry to the cache.
Store(id string, entry *PreparedEntry)
// Load retrieves an entry from the cache. `ok` is true if the entry is present; otherwise it's false.
Load(id string) (entry *PreparedEntry, ok bool)
}

type SessionConfig struct {
ReconnectPolicy ReconnectPolicy
NumConns int
Keyspace string
Version primitive.ProtocolVersion
Auth Authenticator
Logger *zap.Logger
PreparedCache *sync.Map // A global cache share across sessions for storing previously prepared queries
// PreparedCache a global cache share across sessions for storing previously prepared queries
PreparedCache PreparedCache
ConnectTimeout time.Duration
}

Expand Down

0 comments on commit 147ef72

Please sign in to comment.