Skip to content

Commit

Permalink
Replaced most pubsub handlers with tcf
Browse files Browse the repository at this point in the history
  • Loading branch information
lonelycode committed Apr 24, 2017
1 parent 38a45cf commit c03db86
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 17 deletions.
2 changes: 2 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ type Config struct {
UseDistributedQuotaCounter bool `bson:"use_distributed_counter" json:"use_distributed_counter"`
DistributedQuotaFlushIntervalInMS int `bson:"distributed_quota_flush_interval_in_ms" json:"distributed_quota_flush_interval_in_ms"`
DQSetMaster bool `bson:"distributed_quota_set_master" json:"distributed_quota_set_master"`
PubSubServerPort string `bson:"pubsub_server_port" json:"pubsub_server_port"`
PubSubMasterConnectionString string `bson:"pubsub_master_connectionstring" json:"pubsub_master_connectionstring"`
}

type CertData struct {
Expand Down
15 changes: 12 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ var (
GlobalEventsJSVM = &JSVM{}
memProfFile *os.File
Policies = map[string]Policy{}
MainNotifier = RedisNotifier{}
DefaultOrgStore = DefaultSessionManager{}
DefaultQuotaStore = DefaultSessionManager{}
FallbackKeySesionManager = SessionHandler(&DefaultSessionManager{})
Expand All @@ -54,6 +53,7 @@ var (
argumentsBackup map[string]interface{}
DashService DashboardServiceSender

MainNotifier Notifier
ApiSpecRegister map[string]*APISpec
keyGen = DefaultKeyGenerator{}

Expand Down Expand Up @@ -172,7 +172,11 @@ func setupGlobals() {
}).Debug("Notifier will not work in hybrid mode")
mainNotifierStore := RedisClusterStorageManager{}
mainNotifierStore.Connect()
MainNotifier = RedisNotifier{&mainNotifierStore, RedisPubSubChannel}
if config.PubSubMasterConnectionString == "" {
MainNotifier = &RedisNotifier{&mainNotifierStore, RedisPubSubChannel}
} else {
MainNotifier = &TCFNotifier{channel: RedisPubSubChannel}
}

if config.Monitor.EnableTriggerMonitors {
var err error
Expand Down Expand Up @@ -1121,7 +1125,12 @@ func start(arguments map[string]interface{}) {

// Start listening for reload messages
if !config.SuppressRedisSignalReload {
go startPubSubLoop()
if config.PubSubMasterConnectionString == "" {
go startPubSubLoop()
} else {
// Use Mangos
startSubscription()
}
}

if config.SlaveOptions.UseRPC {
Expand Down
34 changes: 27 additions & 7 deletions message_bus.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,27 @@
package main

import "github.com/TykTechnologies/tyk/pubsub"

var PubSubServer *pubsub.PSServer


package main

import "github.com/TykTechnologies/tyk/pubsub"

var PubSubServer *pubsub.PSServer
var PubSubClient *pubsub.PSClient

func StartPubSubServer() {
if PubSubServer == nil {
var err error
p := config.PubSubServerPort
if config.PubSubServerPort == "" {
p = "1211"
}
PubSubServer, err = pubsub.NewPSServer(p)
if err != nil {
log.Fatal(err)
}
}
}

func StartGlobalClient(cs string) {
PubSubClient = pubsub.NewPSClient()
if err := PubSubClient.Start(cs); err != nil {
log.Fatal(err)
}
}
13 changes: 10 additions & 3 deletions redis_logrus_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,22 @@ import (
)

type redisChannelHook struct {
Notifier RedisNotificationHandler
Notifier Notifier
formatter logrus.Formatter
}

func NewRedisHook() *redisChannelHook {
hook := &redisChannelHook{}
hook.formatter = new(logrus.JSONFormatter)
hook.Notifier = RedisNotificationHandler{}
hook.Notifier.Start()

if config.PubSubMasterConnectionString == "" {
// TODO: Deprecate this
legacyNotifier := &RedisNotificationHandler{}
legacyNotifier.Start()
hook.Notifier = legacyNotifier
} else {
hook.Notifier = &TCFNotifier{channel: UIChanName}
}

return hook
}
Expand Down
12 changes: 11 additions & 1 deletion redis_notifier_outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,24 @@ type Notification struct {
Signature string `json:"signature"`
}

type Notifier interface {
Notify(notification interface{}) bool
}

// RedisNotifier will use redis pub/sub channels to send notifications
type RedisNotifier struct {
store *RedisClusterStorageManager
channel string
}

// Notify will send a notification to a channel
func (r *RedisNotifier) Notify(notification Notification) bool {
func (r *RedisNotifier) Notify(n interface{}) bool {
notification, ok := n.(Notification)
if !ok {
log.Error("Notifier requires Notification type")
return false
}

toSend, err := json.Marshal(notification)
if err != nil {
log.Error("Problem marshalling notification: ", err)
Expand Down
6 changes: 3 additions & 3 deletions redis_signal_outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ func (u *RedisNotificationHandler) Start() {
go u.StartUIPubSubConn()
}

func (u *RedisNotificationHandler) Notify(n InterfaceNotification) error {
func (u *RedisNotificationHandler) Notify(n interface{}) bool {
jsonError, err := json.Marshal(n)
if err != nil {
return err
return false
}

if u.CacheStore != nil {
u.CacheStore.Publish(UIChanName, string(jsonError))
}

return nil
return true
}

func (u *RedisNotificationHandler) StartUIPubSubConn() {
Expand Down
31 changes: 31 additions & 0 deletions tcf_notifier_outbound.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package main

import (
"github.com/TykTechnologies/tyk-cluster-framework/payloads"
)

// RedisNotifier will use redis pub/sub channels to send notifications
type TCFNotifier struct {
channel string
}

// Notify will send a notification to a channel
func (r *TCFNotifier) Notify(notification interface{}) bool {
toSend, err := payloads.NewPayload(notification)
if err != nil {
log.Error("Problem marshalling notification: ", err)
return false
}
log.Debug("Sending notification", notification)

if PubSubClient == nil {
log.Warning("Client is nil, can't send notification")
return false
}

if err := PubSubClient.Publish(r.channel, toSend); err != nil {
log.Error("Could not send notification: ", err)
return false
}
return true
}
88 changes: 88 additions & 0 deletions tcf_signals.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package main

import (
"time"

"github.com/Sirupsen/logrus"
"github.com/TykTechnologies/tyk-cluster-framework/payloads"
)

func startSubscription() {
if PubSubClient == nil {
log.Info("Starting pub/sub client")
// TODO: This must be set dynamically
StartGlobalClient(config.PubSubMasterConnectionString)
}

if err := PubSubClient.Subscribe(RedisPubSubChannel, func(payload payloads.Payload) {
handleNotificationEvent(payload, nil, nil)
}); err != nil {
log.WithFields(logrus.Fields{
"prefix": "pub-sub",
"err": err,
}).Error("Connection to Master pub/sub failed, reconnect in 10s")

time.Sleep(10 * time.Second)
log.WithFields(logrus.Fields{
"prefix": "pub-sub",
}).Warning("Reconnecting")

// TODO: This must be set dynamically
PubSubClient.Start(config.PubSubMasterConnectionString)
}
}

func handleNotificationEvent(v payloads.Payload, handled func(NotificationCommand), reloaded func()) {
notif := Notification{}
if err := v.DecodeMessage(&notif); err != nil {
log.Error("Unmarshalling message body failed, malformed: ", err)
return
}

// Add messages to ignore here
switch notif.Command {
case NoticeGatewayConfigResponse:
return
}

// Check for a signature, if not signature found, handle
if !isPayloadSignatureValid(notif) {
log.WithFields(logrus.Fields{
"prefix": "pub-sub",
}).Error("Payload signature is invalid!")
return
}

switch notif.Command {
case NoticeDashboardZeroConf:
handleDashboardZeroConfMessage(notif.Payload)
case NoticeConfigUpdate:
handleNewConfiguration(notif.Payload)
case NoticeDashboardConfigRequest:
handleSendMiniConfig(notif.Payload)
case NoticeGatewayDRLNotification:
if config.ManagementNode {
// DRL is not initialized, going through would
// be mostly harmless but would flood the log
// with warnings since DRLManager.Ready == false
return
}
onServerStatusReceivedHandler(notif.Payload)
case NoticeGatewayLENotification:
onLESSLStatusReceivedHandler(notif.Payload)
case NoticeApiUpdated, NoticeApiRemoved, NoticeApiAdded, NoticePolicyChanged, NoticeGroupReload:
log.WithFields(logrus.Fields{
"prefix": "pub-sub",
}).Info("Reloading endpoints")
reloadURLStructure(reloaded)
default:
log.WithFields(logrus.Fields{
"prefix": "pub-sub",
}).Warnf("Unknown tcf notification command: %q", notif.Command)
return
}
if handled != nil {
// went through. all others shoul have returned early.
handled(notif.Command)
}
}

0 comments on commit c03db86

Please sign in to comment.