Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions schemaregistry/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type Cache interface {
// Parameters:
// * `key` - the key to delete
Delete(key interface{})
// Clear clears the cache
Clear()
// ToMap returns the current cache entries copied into a map
ToMap() map[interface{}]interface{}
}
13 changes: 13 additions & 0 deletions schemaregistry/cache/lrucache.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,19 @@ func (c *LRUCache) Delete(key interface{}) {
}
}

// Clear clears the cache
func (c *LRUCache) Clear() {
c.cacheLock.Lock()
for key, value := range c.lruElements {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a more efficient way to clear than iterating through all the elements?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is idomatic go before go 1.21, where they added a clear() method. In any case, these maps are not expected to get that large as they store one entry per subject

delete(c.lruElements, key)
c.lruKeys.Remove(value)
}
for key := range c.entries {
delete(c.entries, key)
}
c.cacheLock.Unlock()
}

// ToMap returns the current cache entries copied into a map
func (c *LRUCache) ToMap() map[interface{}]interface{} {
ret := make(map[interface{}]interface{})
Expand Down
7 changes: 7 additions & 0 deletions schemaregistry/cache/mapcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ func (c *MapCache) Delete(key interface{}) {
delete(c.entries, key)
}

// Clear clears the cache
func (c *MapCache) Clear() {
for key := range c.entries {
delete(c.entries, key)
}
}

// ToMap returns the current cache entries copied into a map
func (c *MapCache) ToMap() map[interface{}]interface{} {
ret := make(map[interface{}]interface{})
Expand Down
2 changes: 2 additions & 0 deletions schemaregistry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ type Config struct {
RequestTimeoutMs int
// CacheCapacity positive integer or zero for unbounded capacity
CacheCapacity int
// CacheLatestTTLSecs ttl in secs for caching the latest schema
CacheLatestTTLSecs int

// HTTP client
HTTPClient *http.Client
Expand Down
68 changes: 67 additions & 1 deletion schemaregistry/schemaregistry_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"encoding/json"
"fmt"
"net/url"
"runtime"
"strings"
"sync"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/cache"
)
Expand Down Expand Up @@ -189,6 +191,9 @@ type client struct {
schemaToVersionCacheLock sync.RWMutex
versionToSchemaCache cache.Cache
versionToSchemaCacheLock sync.RWMutex
latestToSchemaCache cache.Cache
latestToSchemaCacheLock sync.RWMutex
evictor *evictor
}

var _ Client = new(client)
Expand Down Expand Up @@ -243,6 +248,7 @@ func NewClient(conf *Config) (Client, error) {
var idToSchemaCache cache.Cache
var schemaToVersionCache cache.Cache
var versionToSchemaCache cache.Cache
var latestToSchemaCache cache.Cache
if conf.CacheCapacity != 0 {
schemaToIDCache, err = cache.NewLRUCache(conf.CacheCapacity)
if err != nil {
Expand All @@ -260,18 +266,28 @@ func NewClient(conf *Config) (Client, error) {
if err != nil {
return nil, err
}
latestToSchemaCache, err = cache.NewLRUCache(conf.CacheCapacity)
if err != nil {
return nil, err
}
} else {
schemaToIDCache = cache.NewMapCache()
idToSchemaCache = cache.NewMapCache()
schemaToVersionCache = cache.NewMapCache()
versionToSchemaCache = cache.NewMapCache()
latestToSchemaCache = cache.NewMapCache()
}
handle := &client{
restService: restService,
schemaToIDCache: schemaToIDCache,
idToSchemaCache: idToSchemaCache,
schemaToVersionCache: schemaToVersionCache,
versionToSchemaCache: versionToSchemaCache,
latestToSchemaCache: latestToSchemaCache,
}
if conf.CacheLatestTTLSecs > 0 {
runEvictor(handle, time.Duration(conf.CacheLatestTTLSecs)*time.Second)
runtime.SetFinalizer(handle, stopEvictor)
}
return handle, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coulse use runtime.SetFinalizer with handle that stops the ticker and a done channel to exit the for loop in the spawned goroutine

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, let me make that change

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@emasab , I added the finalizer

}
Expand Down Expand Up @@ -393,7 +409,26 @@ func (c *client) GetID(subject string, schema SchemaInfo, normalize bool) (id in
// GetLatestSchemaMetadata fetches latest version registered with the provided subject
// Returns SchemaMetadata object
func (c *client) GetLatestSchemaMetadata(subject string) (result SchemaMetadata, err error) {
return c.GetSchemaMetadata(subject, -1)
c.latestToSchemaCacheLock.RLock()
metadataValue, ok := c.latestToSchemaCache.Get(subject)
c.latestToSchemaCacheLock.RUnlock()
if ok {
return *metadataValue.(*SchemaMetadata), nil
}

c.latestToSchemaCacheLock.Lock()
// another goroutine could have already put it in cache
metadataValue, ok = c.latestToSchemaCache.Get(subject)
if !ok {
err = c.restService.handleRequest(newRequest("GET", versions, nil, url.PathEscape(subject), "latest"), &result)
if err == nil {
c.latestToSchemaCache.Put(subject, &result)
}
} else {
result = *metadataValue.(*SchemaMetadata)
}
c.latestToSchemaCacheLock.Unlock()
return result, err
}

// GetSchemaMetadata fetches the requested subject schema identified by version
Expand Down Expand Up @@ -687,3 +722,34 @@ func (c *client) UpdateDefaultCompatibility(update Compatibility) (compatibility

return result.CompatibilityUpdate, err
}

type evictor struct {
Interval time.Duration
stop chan bool
}

func (e *evictor) Run(c cache.Cache) {
ticker := time.NewTicker(e.Interval)
for {
select {
case <-ticker.C:
c.Clear()
case <-e.stop:
ticker.Stop()
return
}
}
}

func stopEvictor(c *client) {
c.evictor.stop <- true
}

func runEvictor(c *client, ci time.Duration) {
e := &evictor{
Interval: ci,
stop: make(chan bool),
}
c.evictor = e
go e.Run(c.latestToSchemaCache)
}