move RPC transport and analytics purger into package (#1923)
Another, potentially better, approach for TykTechnologies/tyk-pump#80.
This approach lets us move the re-usable parts of RPC into their own package without changing the RPC server side.

If it works, it supersedes #1922 and TykTechnologies/tyk-sink#51.

The idea is to call `rpc.Connect(...)`, supplying the connection info, a couple of callbacks for emergency mode, and a map of RPC function definitions to add to the RPC client dispatcher.

Then we can create an `rpc.Purger` with all the needed fields and run its `PurgeLoop` with some ticker, as sketched below.
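
A rough sketch of the intended usage. The `rpc.Connect` parameter and field names here are illustrative assumptions, not the committed API; the `Purger` wiring matches the main.go change in this commit.

	// Hypothetical call shape for the new rpc package; Config fields and
	// callback parameters are assumptions for illustration only.
	connected := rpc.Connect(
		rpc.Config{
			ConnectionString: conf.SlaveOptions.ConnectionString,
			APIKey:           conf.SlaveOptions.APIKey,
		},
		map[string]interface{}{ // funcs added to the RPC client dispatcher
			"Ping":               func() bool { return false },
			"PurgeAnalyticsData": func(data string) error { return nil },
		},
		func() { /* callback: emergency mode engaged */ },
		func() { /* callback: emergency mode resolved */ },
	)

	// Purger wiring, as done in the main.go diff below:
	purger := rpc.Purger{
		Store: &store,
		AnalyticsRecordFunc: func() interface{} {
			return AnalyticsRecord{}
		},
	}
	purger.Connect()
	go purger.PurgeLoop(time.NewTicker(10 * time.Second).C)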
dencoded authored and buger committed Oct 4, 2018
1 parent c0b351b commit e640f4d
Showing 11 changed files with 713 additions and 577 deletions.
8 changes: 5 additions & 3 deletions api_definition.go
@@ -17,6 +17,8 @@ import (
"text/template"
"time"

"github.com/TykTechnologies/tyk/rpc"

"github.com/Sirupsen/logrus"
"github.com/rubyist/circuitbreaker"

@@ -341,11 +343,11 @@ func (a APIDefinitionLoader) FromDashboardService(endpoint, secret string) ([]*A

// FromRPC will connect and download API definitions from a Mongo DB instance.
func (a APIDefinitionLoader) FromRPC(orgId string) []*APISpec {
-if rpcEmergencyMode {
+if rpc.IsEmergencyMode() {
return LoadDefinitionsFromRPCBackup()
}

-store := RPCStorageHandler{UserKey: config.Global().SlaveOptions.APIKey, Address: config.Global().SlaveOptions.ConnectionString}
+store := RPCStorageHandler{}
if !store.Connect() {
return nil
}
@@ -361,7 +363,7 @@ func (a APIDefinitionLoader) FromRPC(orgId string) []*APISpec {

//store.Disconnect()

-if rpcLoadCount > 0 {
+if rpc.LoadCount() > 0 {
saveRPCDefinitionsBackup(apiCollection)
}

4 changes: 2 additions & 2 deletions api_loader.go
@@ -39,8 +39,8 @@ func prepareStorage() (storage.RedisCluster, storage.RedisCluster, storage.Redis
redisStore := storage.RedisCluster{KeyPrefix: "apikey-", HashKeys: config.Global().HashKeys}
redisOrgStore := storage.RedisCluster{KeyPrefix: "orgkey."}
healthStore := storage.RedisCluster{KeyPrefix: "apihealth."}
-rpcAuthStore := RPCStorageHandler{KeyPrefix: "apikey-", HashKeys: config.Global().HashKeys, UserKey: config.Global().SlaveOptions.APIKey, Address: config.Global().SlaveOptions.ConnectionString}
-rpcOrgStore := RPCStorageHandler{KeyPrefix: "orgkey.", UserKey: config.Global().SlaveOptions.APIKey, Address: config.Global().SlaveOptions.ConnectionString}
+rpcAuthStore := RPCStorageHandler{KeyPrefix: "apikey-", HashKeys: config.Global().HashKeys}
+rpcOrgStore := RPCStorageHandler{KeyPrefix: "orgkey."}

FallbackKeySesionManager.Init(&redisStore)

3 changes: 3 additions & 0 deletions cli/lint/schema.go
@@ -752,6 +752,9 @@ const confSchema = `{
},
"regexp_cache_expire": {
"type": "integer"
+},
+"proxy_ssl_disable_renegotiation": {
+"type": "boolean"
}
}
}`
22 changes: 14 additions & 8 deletions main.go
@@ -44,6 +44,7 @@ import (
"github.com/TykTechnologies/tyk/config"
logger "github.com/TykTechnologies/tyk/log"
"github.com/TykTechnologies/tyk/regexp"
"github.com/TykTechnologies/tyk/rpc"
"github.com/TykTechnologies/tyk/storage"
"github.com/TykTechnologies/tyk/user"
)
@@ -158,7 +159,12 @@ func setupGlobals() {

rpcPurgeOnce.Do(func() {
store := storage.RedisCluster{KeyPrefix: "analytics-"}
-purger := RPCPurger{Store: &store}
+purger := rpc.Purger{
+	Store: &store,
+	AnalyticsRecordFunc: func() interface{} {
+		return AnalyticsRecord{}
+	},
+}
purger.Connect()
go purger.PurgeLoop(rpcPurgeTicker)
})
@@ -810,6 +816,10 @@ func initialiseSystem() error {
mainLog.Fatal("Redis connection details not set, please ensure that the storage type is set to Redis and that the connection parameters are correct.")
}

+// supply RPC client globals to join the main logging and instrumentation subsystems
+rpc.Log = log
+rpc.Instrument = instrument
+
setupGlobals()

if *cli.Port != "" {
@@ -854,8 +864,8 @@ func afterConfSetup(conf *config.Config) {
conf.SlaveOptions.PingTimeout = 60
}

-GlobalRPCPingTimeout = time.Second * time.Duration(conf.SlaveOptions.PingTimeout)
-GlobalRPCCallTimeout = time.Second * time.Duration(conf.SlaveOptions.CallTimeout)
+rpc.GlobalRPCPingTimeout = time.Second * time.Duration(conf.SlaveOptions.PingTimeout)
+rpc.GlobalRPCCallTimeout = time.Second * time.Duration(conf.SlaveOptions.CallTimeout)
initGenericEventHandlers(conf)
regexp.ResetCache(time.Second*time.Duration(conf.RegexpCacheExpire), !conf.DisableRegexpCache)
}
@@ -880,8 +890,6 @@ func getGlobalStorageHandler(keyPrefix string, hashKeys bool) storage.Handler {
return &RPCStorageHandler{
KeyPrefix: keyPrefix,
HashKeys: hashKeys,
-UserKey: config.Global().SlaveOptions.APIKey,
-Address: config.Global().SlaveOptions.ConnectionString,
}
}
return storage.RedisCluster{KeyPrefix: keyPrefix, HashKeys: hashKeys}
@@ -1077,8 +1085,6 @@ func start() {
mainLog.Debug("Starting RPC reload listener")
RPCListener = RPCStorageHandler{
KeyPrefix: "rpc.listener.",
-UserKey: slaveOptions.APIKey,
-Address: slaveOptions.ConnectionString,
SuppressRegister: true,
}

@@ -1337,7 +1343,7 @@ func listen(listener, controlListener net.Listener, err error) {
fmt.Fprintf(w, "Hello Tiki")
})

-if !rpcEmergencyMode {
+if !rpc.IsEmergencyMode() {
doReload()
}
}
7 changes: 4 additions & 3 deletions policy.go
@@ -7,9 +7,10 @@ import (
"os"
"time"

"github.com/TykTechnologies/tyk/rpc"

"github.com/Sirupsen/logrus"

"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/user"
)

@@ -157,11 +158,11 @@ func parsePoliciesFromRPC(list string) (map[string]user.Policy, error) {
}

func LoadPoliciesFromRPC(orgId string) map[string]user.Policy {
-if rpcEmergencyMode {
+if rpc.IsEmergencyMode() {
return LoadPoliciesFromRPCBackup()
}

-store := &RPCStorageHandler{UserKey: config.Global().SlaveOptions.APIKey, Address: config.Global().SlaveOptions.ConnectionString}
+store := &RPCStorageHandler{}
if !store.Connect() {
return nil
}
38 changes: 38 additions & 0 deletions redis_analytics_purger.go
@@ -0,0 +1,38 @@
package main

import (
"time"

"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/storage"
)

// Purger is an interface that defines how the in-memory store will be purged
// of analytics data to prevent it from growing too large
type Purger interface {
PurgeCache()
PurgeLoop(<-chan time.Time)
}

type RedisPurger struct {
Store storage.Handler
}

func (r RedisPurger) PurgeLoop(ticker <-chan time.Time) {
for {
<-ticker
r.PurgeCache()
}
}

func (r *RedisPurger) PurgeCache() {
expireAfter := config.Global().AnalyticsConfig.StorageExpirationTime
if expireAfter == 0 {
expireAfter = 60 // 1 minute
}

exp, _ := r.Store.GetExp(analyticsKeyName)
if exp <= 0 {
r.Store.SetExp(analyticsKeyName, int64(expireAfter))
}
}
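
A minimal usage sketch for the RedisPurger above (the key prefix and ticker interval are assumptions, mirroring how main.go wires the RPC purger):

	// hypothetical wiring; prefix and interval are illustrative
	store := storage.RedisCluster{KeyPrefix: "analytics-"}
	purger := RedisPurger{Store: &store}
	go purger.PurgeLoop(time.NewTicker(10 * time.Second).C)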
91 changes: 91 additions & 0 deletions rpc/rpc_analytics_purger.go
@@ -0,0 +1,91 @@
package rpc

import (
"encoding/json"
"time"

"gopkg.in/vmihailenco/msgpack.v2"

"github.com/TykTechnologies/tyk/storage"
)

const analyticsKeyName = "tyk-system-analytics"

// Purger purges analytics data to a Mongo database over RPC; it requires that the Mongo DB connection string
// is specified in the Config object
type Purger struct {
Store storage.Handler
AnalyticsRecordFunc func() interface{}
}

// Connect connects to RPC
func (r *Purger) Connect() {
if !clientIsConnected {
Log.Error("RPC client is not connected, use Connect method 1st")
}

// set up RPC funcs if needed; the client dispatcher only needs the call
// signatures, so these stub bodies are never executed locally
if !addedFuncs["Ping"] {
dispatcher.AddFunc("Ping", func() bool {
return false
})
addedFuncs["Ping"] = true
}
if !addedFuncs["PurgeAnalyticsData"] {
dispatcher.AddFunc("PurgeAnalyticsData", func(data string) error {
return nil
})
addedFuncs["PurgeAnalyticsData"] = true
}

Log.Info("RPC Analytics client using singleton")
}

// PurgeLoop starts the loop that will pull data out of the in-memory
// store and into RPC.
func (r Purger) PurgeLoop(ticker <-chan time.Time) {
for {
<-ticker
r.PurgeCache()
}
}

// PurgeCache will pull the data from the in-memory store and drop it into the specified MongoDB collection
func (r *Purger) PurgeCache() {
if !clientIsConnected {
Log.Error("RPC client is not connected, use Connect method 1st")
}

if _, err := FuncClientSingleton("Ping", nil); err != nil {
Log.WithError(err).Error("Can't purge cache, failed to ping RPC")
return
}

analyticsValues := r.Store.GetAndDeleteSet(analyticsKeyName)
if len(analyticsValues) == 0 {
return
}
keys := make([]interface{}, len(analyticsValues))

for i, v := range analyticsValues {
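// AnalyticsRecordFunc is supplied by the caller (main.go returns AnalyticsRecord{})
// so msgpack can decode into a concrete gateway type without this package
// depending on the gateway's analytics definitions.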
decoded := r.AnalyticsRecordFunc()
if err := msgpack.Unmarshal(v.([]byte), &decoded); err != nil {
Log.WithError(err).Error("Couldn't unmarshal analytics data")
} else {
Log.WithField("decoded", decoded).Debug("Decoded Record")
keys[i] = decoded
}
}

data, err := json.Marshal(keys)
if err != nil {
Log.WithError(err).Error("Failed to marshal analytics data")
return
}

// Send keys to RPC
if _, err := FuncClientSingleton("PurgeAnalyticsData", string(data)); err != nil {
EmitErrorEvent(FuncClientSingletonCall, "PurgeAnalyticsData", err)
Log.Warn("Failed to call purge, retrying: ", err)
}
}