Skip to content

Commit

Permalink
Fix RPC memory leaks (#485)
Browse files Browse the repository at this point in the history
It is not completely clear why RCP purger creates so many connections.
But seems like using global RPCFuncClientSingleton and removing reconnect logic fix the issue.
  • Loading branch information
buger committed Feb 10, 2017
1 parent a8a4643 commit 73e4cd2
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 40 deletions.
23 changes: 18 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ func setupGlobals() {

if config.AnalyticsConfig.Type == "rpc" {
log.Debug("Using RPC cache purge")
purger := RPCPurger{Store: &AnalyticsStore, Address: config.SlaveOptions.ConnectionString}

purger := RPCPurger{Store: &AnalyticsStore}
purger.Connect()
analytics.Clean = &purger
go analytics.Clean.PurgeLoop(10 * time.Second)
Expand Down Expand Up @@ -709,6 +710,13 @@ func doReload() {
// load the specs
specs := getAPISpecs()

if specs == nil {
log.WithFields(logrus.Fields{
"prefix": "main",
}).Warning("No API Definitions found (nil), not reloading")
return
}

if len(specs) == 0 {
log.WithFields(logrus.Fields{
"prefix": "main",
Expand Down Expand Up @@ -1324,8 +1332,10 @@ func listen(l net.Listener, err error) {

if !RPC_EmergencyMode {
specs := getAPISpecs()
loadApps(specs, defaultRouter)
getPolicies()
if specs != nil {
loadApps(specs, defaultRouter)
getPolicies()
}
}

// Use a custom server so we can control keepalives
Expand Down Expand Up @@ -1385,8 +1395,11 @@ func listen(l net.Listener, err error) {
// Resume accepting connections in a new goroutine.
if !RPC_EmergencyMode {
specs := getAPISpecs()
loadApps(specs, defaultRouter)
getPolicies()
if specs != nil {
loadApps(specs, defaultRouter)
getPolicies()
}

startHeartBeat()
}

Expand Down
30 changes: 3 additions & 27 deletions rpc_analytics_purger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"encoding/json"
"time"

"github.com/TykTechnologies/logrus"
"github.com/lonelycode/gorpc"
"gopkg.in/vmihailenco/msgpack.v2"
)

Expand All @@ -19,15 +17,7 @@ type Purger interface {
// RPCPurger will purge analytics data into a Mongo database, requires that the Mongo DB string is specified
// in the Config object
type RPCPurger struct {
Store *RedisClusterStorageManager
RPCClient *gorpc.Client
Client *gorpc.DispatcherClient
Address string
}

func (r *RPCPurger) ReConnect() {
r.RPCClient.Stop()
r.Connect()
Store *RedisClusterStorageManager
}

// Connect Connects to RPC
Expand All @@ -36,24 +26,11 @@ func (r *RPCPurger) Connect() {
if RPCCLientSingleton != nil {
if RPCFuncClientSingleton != nil {
log.Info("RPC Analytics client using singleton")
r.RPCClient = RPCCLientSingleton
r.Client = RPCFuncClientSingleton
return
}
}
}

log.Info("Connecting to RPC Analytics service")
r.RPCClient = gorpc.NewTCPClient(r.Address)

if log.Level != logrus.DebugLevel {
gorpc.SetErrorLogger(gorpc.NilErrorLogger)
}

r.RPCClient.Start()
d := GetDispatcher()
r.Client = d.NewFuncClient(r.RPCClient)

return
}

Expand Down Expand Up @@ -92,9 +69,8 @@ func (r *RPCPurger) PurgeCache() {
}

// Send keys to RPC
if _, err := r.Client.Call("PurgeAnalyticsData", string(data)); err != nil {
log.Error("Failed to call purge (reconnecting): ", err)
r.ReConnect()
if _, err := RPCFuncClientSingleton.Call("PurgeAnalyticsData", string(data)); err != nil {
log.Error("Failed to call purge: ", err)
}
}

Expand Down
8 changes: 3 additions & 5 deletions rpc_storage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,9 @@ func (r *RPCStorageHandler) checkDisconnect() {
}

func (r *RPCStorageHandler) ReConnect() {
// Should only be used by reload checker
// r.Disconnect()
RPCClientIsConnected = false
r.Connect()
log.Info("Reconnected.")

// no-op, let the gorpc client handle it.

}

var RPCCLientSingleton *gorpc.Client
Expand Down
6 changes: 3 additions & 3 deletions utils/buddy_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ AMDDEBNAME="tyk-gateway_"$VERSION"_amd64.deb"
AMDRPMNAME="tyk-gateway-"$VERSION"-1.x86_64.rpm"

echo "Signing AMD RPM"
~/build_tools/rpm-sign.exp $amd64TGZDIR/$AMDRPMNAME
/src/github.com/TykTechnologies/tyk/build_tools/rpm-sign.exp $amd64TGZDIR/$AMDRPMNAME

echo Creating Deb Package for i386
cd $i386TGZDIR/
Expand All @@ -140,7 +140,7 @@ i386DEBNAME="tyk-gateway_"$VERSION"_i386.deb"
i386RPMNAME="tyk-gateway-"$VERSION"-1.i386.rpm"

echo "Signing i386 RPM"
~/build_tools/rpm-sign.exp $i386TGZDIR/$i386RPMNAME
/src/github.com/TykTechnologies/tyk/build_tools/rpm-sign.exp $i386TGZDIR/$i386RPMNAME

echo Creating Deb Package for ARM
cd $armTGZDIR/
Expand All @@ -151,4 +151,4 @@ ARMDEBNAME="tyk-gateway_"$VERSION"_arm64.deb"
ARMRPMNAME="tyk-gateway-"$VERSION"-1.arm64.rpm"

echo "Signing Arm RPM"
~/build_tools/rpm-sign.exp $armTGZDIR/$ARMRPMNAME
/src/github.com/TykTechnologies/tyk/build_tools/rpm-sign.exp $armTGZDIR/$ARMRPMNAME

0 comments on commit 73e4cd2

Please sign in to comment.