Skip to content

Commit

Permalink
feat(plugins): sync middleware and storage code to upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
darkweak committed Apr 19, 2024
1 parent 25215e8 commit 7c3b8ba
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 64 deletions.
8 changes: 4 additions & 4 deletions pkg/storage/redisProvider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,13 @@ func TestRedis_MapKeys(t *testing.T) {
func TestRedis_DeleteMany(t *testing.T) {
client, _ := RedisConnectionFactory(tests.MockConfiguration(tests.RedisConfiguration))

if len(client.MapKeys("")) != 11 {
t.Error("The map should contain 11 elements")
if len(client.MapKeys("")) != 12 {
t.Error("The map should contain 12 elements")
}

client.DeleteMany("MAP_KEYS_PREFIX_*")
if len(client.MapKeys("")) != 1 {
t.Error("The map should contain 1 element")
if len(client.MapKeys("")) != 2 {
t.Error("The map should contain 2 element")
}

client.DeleteMany("*")
Expand Down
8 changes: 4 additions & 4 deletions plugins/kratos/souin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func Test_HttpcacheKratosPlugin_NewHTTPCacheFilter_API(t *testing.T) {
}
b, _ := io.ReadAll(rs.Body)
res.Result().Body.Close()
if string(b) != "[\"IDX_GET-http-example.com-/handled\"]" {
if string(b) != "[\"GET-http-example.com-/handled\"]" {
t.Error("The response body must be an empty array because no request has been stored")
}
req2 := httptest.NewRequest(http.MethodGet, "/handled", nil)
Expand All @@ -158,10 +158,10 @@ func Test_HttpcacheKratosPlugin_NewHTTPCacheFilter_API(t *testing.T) {
rs.Body.Close()
var payload []string
_ = json.Unmarshal(b, &payload)
if len(payload) != 2 {
t.Error("The system must store 2 items, the fresh and the stale one")
if len(payload) != 1 {
t.Error("The system must store 1 item, except the mapping")
}
if payload[0] != "GET-http-example.com-/handled" || payload[1] != "IDX_GET-http-example.com-/handled" {
if payload[0] != "GET-http-example.com-/handled" {
t.Error("The payload items mismatch from the expectations.")
}
}
4 changes: 2 additions & 2 deletions plugins/tyk/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ toolchain go1.21.0

require (
github.com/TykTechnologies/tyk v1.9.2-0.20230330071232-370295d796b5
github.com/cespare/xxhash v1.1.0
github.com/darkweak/souin v1.6.47
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pierrec/lz4/v4 v4.1.21
github.com/pquerna/cachecontrol v0.2.0
go.uber.org/zap v1.26.0
)
Expand All @@ -30,7 +32,6 @@ require (
github.com/buraksezer/consistent v0.10.0 // indirect
github.com/buraksezer/olric v0.5.4 // indirect
github.com/bwmarrin/snowflake v0.3.0 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/clbanning/mxj v1.8.4 // indirect
github.com/coreos/go-semver v0.3.1 // indirect
Expand Down Expand Up @@ -99,7 +100,6 @@ require (
github.com/nats-io/nuid v1.0.1 // indirect
github.com/nutsdb/nutsdb v0.14.3 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmylund/go-cache v2.1.0+incompatible // indirect
github.com/prometheus/client_golang v1.16.0 // indirect
Expand Down
15 changes: 11 additions & 4 deletions plugins/tyk/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/TykTechnologies/tyk/ctx"
"github.com/cespare/xxhash"
"github.com/darkweak/souin/context"
"github.com/darkweak/souin/pkg/middleware"
"github.com/darkweak/souin/pkg/rfc"
Expand Down Expand Up @@ -189,10 +190,16 @@ func SouinRequestHandler(rw http.ResponseWriter, baseRq *http.Request) {
defer s.bufPool.Put(bufPool)
if !requestCc.NoCache {
validator := rfc.ParseRequest(rq)
var response *http.Response
for _, currentStorer := range s.SouinBaseHandler.Storers {
response = currentStorer.Prefix(cachedKey, rq, validator)
if response != nil {
var fresh, stale *http.Response
finalKey := cachedKey
if rq.Context().Value(context.Hashed).(bool) {
finalKey = fmt.Sprint(xxhash.Sum64String(finalKey))
}
for _, currentStorer := range s.Storers {
fresh, stale = currentStorer.GetMultiLevel(finalKey, rq, validator)

if fresh != nil || stale != nil {
s.Configuration.GetLogger().Sugar().Debugf("Found at least one valid response in the %s storage", currentStorer.Name())
break
}
}
Expand Down
114 changes: 114 additions & 0 deletions plugins/tyk/override/storage/abstractProvider.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
package storage

import (
"bufio"
"bytes"
"encoding/gob"
"net/http"
"net/url"
"strings"
"time"

"github.com/darkweak/souin/configurationtypes"
"github.com/darkweak/souin/pkg/rfc"
"github.com/darkweak/souin/pkg/storage/types"
"github.com/pierrec/lz4/v4"
"go.uber.org/zap"
)

const (
encodedHeaderSemiColonSeparator = "%3B"
encodedHeaderColonSeparator = "%3A"
StalePrefix = "STALE_"
surrogatePrefix = "SURROGATE_"
MappingKeyPrefix = "IDX_"
)

type StorerInstanciator func(configurationtypes.AbstractConfigurationInterface) (types.Storer, error)
Expand Down Expand Up @@ -58,3 +66,109 @@ func varyVoter(baseKey string, req *http.Request, currentKey string) bool {

return false
}
func decodeMapping(item []byte) (mapping types.StorageMapper, e error) {
e = gob.NewDecoder(bytes.NewBuffer(item)).Decode(&mapping)

return
}

func mappingElection(provider types.Storer, item []byte, req *http.Request, validator *rfc.Revalidator, logger *zap.Logger) (resultFresh *http.Response, resultStale *http.Response, e error) {
var mapping types.StorageMapper
if len(item) != 0 {
mapping, e = decodeMapping(item)
if e != nil {
return resultFresh, resultStale, e
}
}

for keyName, keyItem := range mapping.Mapping {
valid := true
for hname, hval := range keyItem.VariedHeaders {
if req.Header.Get(hname) != strings.Join(hval, ", ") {
valid = false
break
}
}

if !valid {
continue
}

rfc.ValidateETagFromHeader(keyItem.Etag, validator)
if validator.Matched {
// If the key is fresh enough.
if time.Since(keyItem.FreshTime) < 0 {
response := provider.Get(keyName)
if response != nil {
bufW := new(bytes.Buffer)
reader := lz4.NewReader(bytes.NewBuffer(response))
_, _ = reader.WriteTo(bufW)
if resultFresh, e = http.ReadResponse(bufio.NewReader(bufW), req); e != nil {
logger.Sugar().Errorf("An error occured while reading response for the key %s: %v", string(keyName), e)
return
}

logger.Sugar().Debugf("The stored key %s matched the current iteration key ETag %+v", string(keyName), validator)
return
}
}

// If the key is still stale.
if time.Since(keyItem.StaleTime) < 0 {
response := provider.Get(keyName)
if response != nil {
bufW := new(bytes.Buffer)
reader := lz4.NewReader(bytes.NewBuffer(response))
_, _ = reader.WriteTo(bufW)
if resultStale, e = http.ReadResponse(bufio.NewReader(bufW), req); e != nil {
logger.Sugar().Errorf("An error occured while reading response for the key %s: %v", string(keyName), e)
return
}

logger.Sugar().Debugf("The stored key %s matched the current iteration key ETag %+v as stale", string(keyName), validator)
}
}
} else {
logger.Sugar().Debugf("The stored key %s didn't match the current iteration key ETag %+v", string(keyName), validator)
}
}

return
}

func mappingUpdater(key string, item []byte, logger *zap.Logger, now, freshTime, staleTime time.Time, variedHeaders http.Header, etag, realKey string) (val []byte, e error) {
var mapping types.StorageMapper
if len(item) == 0 {
mapping = types.StorageMapper{}
} else {
e = gob.NewDecoder(bytes.NewBuffer(item)).Decode(&mapping)
if e != nil {
logger.Sugar().Errorf("Impossible to decode the key %s, %v", key, e)
return nil, e
}
}

if mapping.Mapping == nil {
mapping.Mapping = make(map[string]types.KeyIndex)
}

mapping.Mapping[key] = types.KeyIndex{
StoredAt: now,
FreshTime: freshTime,
StaleTime: staleTime,
VariedHeaders: variedHeaders,
Etag: etag,
RealKey: realKey,
}

buf := new(bytes.Buffer)
e = gob.NewEncoder(buf).Encode(mapping)
if e != nil {
logger.Sugar().Errorf("Impossible to encode the mapping value for the key %s, %v", key, e)
return nil, e
}

val = buf.Bytes()

return val, e
}
17 changes: 0 additions & 17 deletions plugins/tyk/override/storage/abstractProvider_test.go

This file was deleted.

97 changes: 65 additions & 32 deletions plugins/tyk/override/storage/cacheProvider.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package storage

import (
"bufio"
"bytes"
"errors"
"net/http"
"regexp"
"strings"
Expand All @@ -12,18 +12,25 @@ import (
"github.com/darkweak/souin/pkg/rfc"
"github.com/darkweak/souin/pkg/storage/types"
"github.com/patrickmn/go-cache"
"github.com/pierrec/lz4/v4"
"go.uber.org/zap"
)

// Cache provider type
type Cache struct {
*cache.Cache
stale time.Duration
logger *zap.Logger
stale time.Duration
}

// CacheConnectionFactory function create new Cache instance
func CacheConnectionFactory(c t.AbstractConfigurationInterface) (types.Storer, error) {
provider := cache.New(1*time.Second, 1*time.Second)
return &Cache{Cache: provider, stale: c.GetDefaultCache().GetStale()}, nil
return &Cache{
Cache: provider,
logger: c.GetLogger(),
stale: c.GetDefaultCache().GetStale(),
}, nil
}

// Name returns the storer name
Expand Down Expand Up @@ -65,34 +72,11 @@ func (provider *Cache) Get(key string) []byte {
}

// Prefix method returns the populated response if exists, empty response then
func (provider *Cache) Prefix(key string, req *http.Request, validator *rfc.Revalidator) *http.Response {
var result *http.Response

for k, v := range provider.Items() {
if k == key {
if res, err := http.ReadResponse(bufio.NewReader(bytes.NewBuffer(v.Object.([]byte))), req); err == nil {
rfc.ValidateETag(res, validator)
if validator.Matched {
result = res
}
}

return result
}

if !strings.HasPrefix(k, key) {
continue
}

if varyVoter(key, req, k) {
if res, err := http.ReadResponse(bufio.NewReader(bytes.NewBuffer(v.Object.([]byte))), req); err == nil {
rfc.ValidateETag(res, validator)
if validator.Matched {
result = res
}
}

return result
func (provider *Cache) Prefix(key string) []string {
result := []string{}
for k := range provider.Items() {
if strings.HasPrefix(k, key) {
result = append(result, k)
}
}

Expand All @@ -106,7 +90,6 @@ func (provider *Cache) Set(key string, value []byte, url t.URL, duration time.Du
}

provider.Cache.Set(key, value, duration)
provider.Cache.Set(StalePrefix+key, value, provider.stale+duration)

return nil
}
Expand Down Expand Up @@ -142,3 +125,53 @@ func (provider *Cache) Reset() error {

return nil
}

func (provider *Cache) GetMultiLevel(key string, req *http.Request, validator *rfc.Revalidator) (fresh *http.Response, stale *http.Response) {
r, found := provider.Cache.Get(MappingKeyPrefix + key)
if !found {
return
}

v, ok := r.([]byte)
if !ok {
return
}

if len(v) > 0 {
fresh, stale, _ = mappingElection(provider, v, req, validator, provider.logger)
}

return fresh, stale
}
func (provider *Cache) SetMultiLevel(baseKey, variedKey string, value []byte, variedHeaders http.Header, etag string, duration time.Duration, realKey string) error {
now := time.Now()

var e error
compressed := new(bytes.Buffer)
if _, e := lz4.NewWriter(compressed).ReadFrom(bytes.NewReader(value)); e != nil {
provider.logger.Sugar().Errorf("Impossible to compress the key %s into storage, %v", variedKey, e)
return e
}

provider.Cache.Set(variedKey, compressed.Bytes(), duration+provider.stale)
mappingKey := MappingKeyPrefix + baseKey
r, found := provider.Cache.Get(mappingKey)
if !found {
return errors.New("key not found")
}

val, ok := r.([]byte)
if !ok {
return errors.New("value is not a byte slice")
}

val, e = mappingUpdater(variedKey, val, provider.logger, now, now.Add(duration), now.Add(duration+provider.stale), variedHeaders, etag, realKey)
if e != nil {
return e
}

provider.logger.Sugar().Debugf("Store the new mapping for the key %s in storage", variedKey)
provider.Cache.Set(mappingKey, val, -1)

return nil
}
Loading

0 comments on commit 7c3b8ba

Please sign in to comment.