Skip to content

Commit

Permalink
fix(chore): surrogate use first storage (#463)
Browse files Browse the repository at this point in the history
* fix(chore): surrogate use first storage

* fix(redis): close connection when reconnecting

* fix(redis): Reconnection is now handled by rueidis itself

* fix(chore): use storer name to lowercase

* feat(plugins): vendorize Traefik & Tyk

* Update pkg/storage/redisProvider.go

Co-authored-by: Hussam Almarzooq <me@hussam.io>

---------

Co-authored-by: Hussam Almarzooq <me@hussam.io>
  • Loading branch information
darkweak and hussam-almarzoq committed Mar 27, 2024
1 parent ddc269d commit 37e0eac
Show file tree
Hide file tree
Showing 21 changed files with 45 additions and 129 deletions.
2 changes: 1 addition & 1 deletion pkg/middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewHTTPCacheHandler(c configurationtypes.AbstractConfigurationInterface) *S
}
c.GetLogger().Debug("Storer initialized.")
regexpUrls := helpers.InitializeRegexp(c)
surrogateStorage := surrogate.InitializeSurrogate(c)
surrogateStorage := surrogate.InitializeSurrogate(c, storers[0].Name())
c.GetLogger().Debug("Surrogate storage initialized.")
var excludedRegexp *regexp.Regexp = nil
if c.GetDefaultCache().GetRegex().Exclude != "" {
Expand Down
87 changes: 2 additions & 85 deletions pkg/storage/redisProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"strings"
Expand All @@ -25,7 +24,6 @@ type Redis struct {
stale time.Duration
ctx context.Context
logger *zap.Logger
reconnecting bool
configuration redis.ClientOption
close func()
}
Expand Down Expand Up @@ -75,23 +73,13 @@ func (provider *Redis) Name() string {

// ListKeys method returns the list of existing keys
func (provider *Redis) ListKeys() []string {
if provider.reconnecting {
provider.logger.Sugar().Error("Impossible to list the redis keys while reconnecting.")
return []string{}
}

keys, _ := provider.inClient.Do(provider.ctx, provider.inClient.B().Keys().Pattern("*").Build()).AsStrSlice()

return keys
}

// MapKeys method returns the list of existing keys
func (provider *Redis) MapKeys(prefix string) map[string]string {
if provider.reconnecting {
provider.logger.Sugar().Error("Impossible to list the redis keys while reconnecting.")
return map[string]string{}
}

m := map[string]string{}
keys, _ := provider.inClient.Do(provider.ctx, provider.inClient.B().Keys().Pattern("*").Build()).AsStrSlice()
for _, key := range keys {
Expand All @@ -106,16 +94,9 @@ func (provider *Redis) MapKeys(prefix string) map[string]string {

// GetMultiLevel tries to load the key and check if one of linked keys is a fresh/stale candidate.
func (provider *Redis) GetMultiLevel(key string, req *http.Request, validator *rfc.Revalidator) (fresh *http.Response, stale *http.Response) {
if provider.reconnecting {
provider.logger.Sugar().Error("Impossible to get the redis key while reconnecting.")
return
}

b, e := provider.inClient.Do(provider.ctx, provider.inClient.B().Get().Key(MappingKeyPrefix+key).Build()).AsBytes()
if e != nil {
if !errors.Is(e, redis.Nil) && !provider.reconnecting {
go provider.Reconnect()
}
return fresh, stale
}

Expand All @@ -126,26 +107,15 @@ func (provider *Redis) GetMultiLevel(key string, req *http.Request, validator *r

// SetMultiLevel tries to store the key with the given value and update the mapping key to store metadata.
func (provider *Redis) SetMultiLevel(baseKey, variedKey string, value []byte, variedHeaders http.Header, etag string, duration time.Duration) error {
if provider.reconnecting {
provider.logger.Sugar().Error("Impossible to set the redis value while reconnecting.")
return fmt.Errorf("reconnecting error")
}

now := time.Now()
if err := provider.inClient.Do(provider.ctx, provider.inClient.B().Set().Key(variedKey).Value(string(value)).Ex(duration+provider.stale).Build()).Error(); err != nil {
if !provider.reconnecting {
go provider.Reconnect()
}
provider.logger.Sugar().Errorf("Impossible to set value into Redis, %v", err)
return err
}

mappingKey := MappingKeyPrefix + baseKey
v, e := provider.inClient.Do(provider.ctx, provider.inClient.B().Get().Key(mappingKey).Build()).AsBytes()
if e != nil && !errors.Is(e, redis.Nil) {
if !provider.reconnecting {
go provider.Reconnect()
}
return e
}

Expand All @@ -155,9 +125,6 @@ func (provider *Redis) SetMultiLevel(baseKey, variedKey string, value []byte, va
}

if e = provider.inClient.Do(provider.ctx, provider.inClient.B().Set().Key(mappingKey).Value(string(val)).Build()).Error(); e != nil {
if !provider.reconnecting {
go provider.Reconnect()
}
provider.logger.Sugar().Errorf("Impossible to set value into Redis, %v", e)
}

Expand All @@ -166,16 +133,8 @@ func (provider *Redis) SetMultiLevel(baseKey, variedKey string, value []byte, va

// Get method returns the populated response if exists, empty response then
func (provider *Redis) Get(key string) []byte {
if provider.reconnecting {
provider.logger.Sugar().Error("Impossible to get the redis key while reconnecting.")
return nil
}

r, e := provider.inClient.Do(provider.ctx, provider.inClient.B().Get().Key(key).Build()).AsBytes()
if e != nil && e != redis.Nil {
if !provider.reconnecting {
go provider.Reconnect()
}
return nil
}

Expand All @@ -184,10 +143,6 @@ func (provider *Redis) Get(key string) []byte {

// Prefix method returns the populated response if exists, empty response then
func (provider *Redis) Prefix(key string, req *http.Request, validator *rfc.Revalidator) *http.Response {
if provider.reconnecting {
provider.logger.Sugar().Error("Impossible to get the redis keys by prefix while reconnecting.")
return nil
}
in := make(chan *http.Response)
out := make(chan bool)

Expand Down Expand Up @@ -225,16 +180,8 @@ func (provider *Redis) Prefix(key string, req *http.Request, validator *rfc.Reva

// Set method will store the response in Etcd provider
func (provider *Redis) Set(key string, value []byte, url t.URL, duration time.Duration) error {
if provider.reconnecting {
provider.logger.Sugar().Error("Impossible to set the redis value while reconnecting.")
return fmt.Errorf("reconnecting error")
}

err := provider.inClient.Do(provider.ctx, provider.inClient.B().Set().Key(key).Value(string(value)).Ex(duration+provider.stale).Build()).Error()
if err != nil {
if !provider.reconnecting {
go provider.Reconnect()
}
provider.logger.Sugar().Errorf("Impossible to set value into Redis, %v", err)
}

Expand All @@ -243,20 +190,11 @@ func (provider *Redis) Set(key string, value []byte, url t.URL, duration time.Du

// Delete method will delete the response in Etcd provider if exists corresponding to key param
func (provider *Redis) Delete(key string) {
if provider.reconnecting {
provider.logger.Sugar().Error("Impossible to delete the redis key while reconnecting.")
return
}
_ = provider.inClient.Do(provider.ctx, provider.inClient.B().Del().Key(key).Build())
}

// DeleteMany method will delete the responses in Redis provider if exists corresponding to the regex key param
func (provider *Redis) DeleteMany(key string) {
if provider.reconnecting {
provider.logger.Sugar().Error("Impossible to delete the redis keys while reconnecting.")
return
}

keys, _ := provider.inClient.Do(provider.ctx, provider.inClient.B().Keys().Pattern(key).Build()).AsStrSlice()
_ = provider.inClient.Do(provider.ctx, provider.inClient.B().Del().Key(keys...).Build())
}
Expand All @@ -268,32 +206,11 @@ func (provider *Redis) Init() error {

// Reset method will reset or close provider
func (provider *Redis) Reset() error {
if provider.reconnecting {
provider.logger.Sugar().Error("Impossible to reset the redis instance while reconnecting.")
return nil
}
provider.close()
_ = provider.inClient.Do(provider.ctx, provider.inClient.B().Flushdb().Build())

return nil
}

func (provider *Redis) Reconnect() {
provider.reconnecting = true

cli, err := redis.NewClient(provider.configuration)
if err != nil {
time.Sleep(10 * time.Second)
provider.Reconnect()

return
}

provider.inClient = cli
provider.close = cli.Close
if provider.inClient != nil {
provider.reconnecting = false
} else {
time.Sleep(10 * time.Second)
provider.Reconnect()
}
provider.logger.Debug("Doing nothing on reconnect because rueidis handles it!")
}
4 changes: 2 additions & 2 deletions pkg/surrogate/providers/akamai.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type AkamaiSurrogateStorage struct {
url string
}

func generateAkamaiInstance(config configurationtypes.AbstractConfigurationInterface) *AkamaiSurrogateStorage {
func generateAkamaiInstance(config configurationtypes.AbstractConfigurationInterface, defaultStorerName string) *AkamaiSurrogateStorage {
cdn := config.GetDefaultCache().GetCDN()
a := &AkamaiSurrogateStorage{baseStorage: &baseStorage{}}

Expand All @@ -28,7 +28,7 @@ func generateAkamaiInstance(config configurationtypes.AbstractConfigurationInter
a.url += "/" + cdn.Network
}

a.init(config)
a.init(config, defaultStorerName)
a.parent = a

return a
Expand Down
4 changes: 2 additions & 2 deletions pkg/surrogate/providers/cloudflare.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type CloudflareSurrogateStorage struct {
zoneID string
}

func generateCloudflareInstance(config configurationtypes.AbstractConfigurationInterface) *CloudflareSurrogateStorage {
func generateCloudflareInstance(config configurationtypes.AbstractConfigurationInterface, defaultStorerName string) *CloudflareSurrogateStorage {
cdn := config.GetDefaultCache().GetCDN()
f := &CloudflareSurrogateStorage{
baseStorage: &baseStorage{},
Expand All @@ -27,7 +27,7 @@ func generateCloudflareInstance(config configurationtypes.AbstractConfigurationI
email: cdn.Email,
}

f.init(config)
f.init(config, defaultStorerName)
f.parent = f

return f
Expand Down
9 changes: 4 additions & 5 deletions pkg/surrogate/providers/common.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package providers

import (
"fmt"
"net/http"
"net/url"
"regexp"
Expand Down Expand Up @@ -99,7 +98,7 @@ type baseStorage struct {
duration time.Duration
}

func (s *baseStorage) init(config configurationtypes.AbstractConfigurationInterface) {
func (s *baseStorage) init(config configurationtypes.AbstractConfigurationInterface, defaultStorerName string) {
if configuration, ok := config.GetSurrogateKeys()["_configuration"]; ok {
instanciator, err := storage.NewStorageFromName(configuration.SurrogateConfiguration.Storer)
if err != nil {
Expand All @@ -108,15 +107,15 @@ func (s *baseStorage) init(config configurationtypes.AbstractConfigurationInterf

storer, err := instanciator(config)
if err != nil {
panic(fmt.Sprintf("Impossible to instanciate the storer for the surrogate-keys: %v", err))
s.logger.Sugar().Errorf("Impossible to instanciate the storer for the surrogate-keys: %v", err)
}

s.Storage = storer
} else {
instanciator, _ := storage.NewStorageFromName("nuts")
instanciator, _ := storage.NewStorageFromName(strings.ToLower(defaultStorerName))
storer, err := instanciator(config)
if err != nil {
panic(fmt.Sprintf("Impossible to instanciate the storer for the surrogate-keys: %v", err))
s.logger.Sugar().Errorf("Impossible to instanciate the storer %s for the surrogate-keys: %v", defaultStorerName, err)
}

s.Storage = storer
Expand Down
10 changes: 5 additions & 5 deletions pkg/surrogate/providers/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ import (
)

// SurrogateFactory generate a SurrogateInterface instance
func SurrogateFactory(config configurationtypes.AbstractConfigurationInterface) SurrogateInterface {
func SurrogateFactory(config configurationtypes.AbstractConfigurationInterface, defaultStorerName string) SurrogateInterface {
cdn := config.GetDefaultCache().GetCDN()

switch cdn.Provider {
case "akamai":
return generateAkamaiInstance(config)
return generateAkamaiInstance(config, defaultStorerName)
case "cloudflare":
return generateCloudflareInstance(config)
return generateCloudflareInstance(config, defaultStorerName)
case "fastly":
return generateFastlyInstance(config)
return generateFastlyInstance(config, defaultStorerName)
default:
return generateSouinInstance(config)
return generateSouinInstance(config, defaultStorerName)
}
}
6 changes: 3 additions & 3 deletions pkg/surrogate/providers/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ func TestSurrogateFactory(t *testing.T) {
fastlyConfiguration := mockConfiguration(cdnConfigurationFastly)
souinConfiguration := mockConfiguration(cdnConfigurationSouin)

akamaiProvider := SurrogateFactory(akamaiConfiguration)
fastlyProvider := SurrogateFactory(fastlyConfiguration)
souinProvider := SurrogateFactory(souinConfiguration)
akamaiProvider := SurrogateFactory(akamaiConfiguration, "nuts")
fastlyProvider := SurrogateFactory(fastlyConfiguration, "nuts")
souinProvider := SurrogateFactory(souinConfiguration, "nuts")

if akamaiProvider == nil {
errors.GenerateError(t, "Impossible to create the Akamai surrogate provider instance")
Expand Down
4 changes: 2 additions & 2 deletions pkg/surrogate/providers/fastly.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type FastlySurrogateStorage struct {
strategy string
}

func generateFastlyInstance(config configurationtypes.AbstractConfigurationInterface) *FastlySurrogateStorage {
func generateFastlyInstance(config configurationtypes.AbstractConfigurationInterface, defaultStorerName string) *FastlySurrogateStorage {
cdn := config.GetDefaultCache().GetCDN()
f := &FastlySurrogateStorage{
baseStorage: &baseStorage{},
Expand All @@ -28,7 +28,7 @@ func generateFastlyInstance(config configurationtypes.AbstractConfigurationInter
f.strategy = "1"
}

f.init(config)
f.init(config, defaultStorerName)
f.parent = f

return f
Expand Down
4 changes: 2 additions & 2 deletions pkg/surrogate/providers/souin.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ type SouinSurrogateStorage struct {
*baseStorage
}

func generateSouinInstance(config configurationtypes.AbstractConfigurationInterface) *SouinSurrogateStorage {
func generateSouinInstance(config configurationtypes.AbstractConfigurationInterface, defaultStorerName string) *SouinSurrogateStorage {
s := &SouinSurrogateStorage{baseStorage: &baseStorage{}}

s.init(config)
s.init(config, defaultStorerName)
s.parent = s

return s
Expand Down
4 changes: 2 additions & 2 deletions pkg/surrogate/surrogate.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,6 @@ import (
)

// InitializeSurrogate will initialize the Surrogate-Key storage system
func InitializeSurrogate(configurationInterface configurationtypes.AbstractConfigurationInterface) providers.SurrogateInterface {
return providers.SurrogateFactory(configurationInterface)
func InitializeSurrogate(configurationInterface configurationtypes.AbstractConfigurationInterface, storageName string) providers.SurrogateInterface {
return providers.SurrogateFactory(configurationInterface, storageName)
}
2 changes: 1 addition & 1 deletion plugins/traefik/override/middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func NewHTTPCacheHandler(c configurationtypes.AbstractConfigurationInterface) *S
}
fmt.Println("Storers initialized.")
regexpUrls := helpers.InitializeRegexp(c)
surrogateStorage := surrogate.InitializeSurrogate(c)
surrogateStorage := surrogate.InitializeSurrogate(c, storers[0].Name())
fmt.Println("Surrogate storage initialized.")
var excludedRegexp *regexp.Regexp = nil
if c.GetDefaultCache().GetRegex().Exclude != "" {
Expand Down
2 changes: 1 addition & 1 deletion plugins/traefik/override/surrogate/providers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type baseStorage struct {
duration time.Duration
}

func (s *baseStorage) init(config configurationtypes.AbstractConfigurationInterface) {
func (s *baseStorage) init(config configurationtypes.AbstractConfigurationInterface, _ string) {
storers, err := storage.NewStorages(config)
if err != nil {
panic(fmt.Sprintf("Impossible to instanciate the storer for the surrogate-keys: %v", err))
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 37e0eac

Please sign in to comment.