From fe90282d8f2173857e9e3246eacc056fe03e07b0 Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Wed, 19 Apr 2017 16:15:06 +0300 Subject: [PATCH 01/28] fix log message --- server/gobbler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/gobbler.go b/server/gobbler.go index cde325b3..36fba884 100644 --- a/server/gobbler.go +++ b/server/gobbler.go @@ -196,7 +196,7 @@ var CreateModules = func(router router.Router) (modules []interface{}) { } smsConn, err := sms.New(router, nexmoSender, Config.SMS) if err != nil { - logger.WithError(err).Error("Error creating Nexmo Sender") + logger.WithError(err).Error("Error creating SMS Gateway") } else { modules = append(modules, smsConn) } From 34657bbc02d5afac340d0edea417e49c62edb167 Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Wed, 19 Apr 2017 16:16:24 +0300 Subject: [PATCH 02/28] idempotent Start and Stop for SMS gateway; fix log level for some log-messages --- server/sms/sms_gateway.go | 23 ++++++++++++++--------- server/sms/sms_gateway_test.go | 19 ++++++++++++++++++- 2 files changed, 32 insertions(+), 10 deletions(-) diff --git a/server/sms/sms_gateway.go b/server/sms/sms_gateway.go index 8fb3b06b..5d526239 100644 --- a/server/sms/sms_gateway.go +++ b/server/sms/sms_gateway.go @@ -54,7 +54,7 @@ func New(router router.Router, sender Sender, config Config) (*gateway, error) { if *config.Workers <= 0 { *config.Workers = connector.DefaultWorkers } - logger.WithField("number", *config.Workers).Debug("sms workers") + logger.WithField("number", *config.Workers).Info("sms workers") config.Schema = SMSSchema config.Name = SMSDefaultTopic return &gateway{ @@ -66,7 +66,11 @@ func New(router router.Router, sender Sender, config Config) (*gateway, error) { } func (g *gateway) Start() error { - g.logger.Debug("Starting gateway") + g.logger.Info("Starting gateway") + if g.cancelFunc != nil { + g.logger.Info("Gateway already started") + return nil + } err := g.ReadLastID() if err != nil { @@ -81,7 +85,7 @@ func (g *gateway) Start() error { g.startMetrics() - g.logger.Debug("Started gateway") + g.logger.Info("Started gateway") return nil } @@ -107,7 +111,7 @@ func (g *gateway) fetchRequest() (fr *store.FetchRequest) { } func (g *gateway) Run() { - g.logger.Debug("Run gateway") + g.logger.Info("Run gateway") var provideErr error go func() { err := g.route.Provide(g.router, true) @@ -206,7 +210,7 @@ func (g *gateway) send(receivedMsg *protocol.Message) error { } func (g *gateway) Restart() error { - g.logger.WithField("LastIDSent", g.LastIDSent).Debug("Restart in progress") + g.logger.WithField("LastIDSent", g.LastIDSent).Info("Restart in progress") g.Cancel() g.cancelFunc = nil @@ -221,17 +225,18 @@ func (g *gateway) Restart() error { go g.Run() - g.logger.WithField("LastIDSent", g.LastIDSent).Debug("Restart finished") + g.logger.WithField("LastIDSent", g.LastIDSent).Info("Restart finished") return nil } func (g *gateway) Stop() error { - g.logger.Debug("Stopping gateway") + g.logger.Info("Stopping gateway") if g.cancelFunc != nil { - g.logger.Debug("Canceling in Stop") + g.logger.Info("Canceling in Stop") g.cancelFunc() + g.cancelFunc = nil } - g.logger.Debug("Stopped gateway") + g.logger.Info("Stopped gateway") return nil } diff --git a/server/sms/sms_gateway_test.go b/server/sms/sms_gateway_test.go index 335eb73d..fbf0b5cc 100644 --- a/server/sms/sms_gateway_test.go +++ b/server/sms/sms_gateway_test.go @@ -38,7 +38,7 @@ func Test_StartStop(t *testing.T) { routerMock.EXPECT().Subscribe(gomock.Any()).Do(func(r *router.Route) (*router.Route, error) { a.Equal("sms", r.Path.Partition()) return r, nil - }) + }).Times(2) gw, err := New(routerMock, mockSmsSender, config) a.NoError(err) @@ -46,6 +46,23 @@ func Test_StartStop(t *testing.T) { err = gw.Start() a.NoError(err) + // try to start for the second time in a row + err = gw.Start() + a.NoError(err) + + err = gw.Stop() + a.NoError(err) + + // try to stop for the second time in a row + err = gw.Stop() + a.NoError(err) + + time.Sleep(100 * time.Millisecond) + + // try to start & stop for a second time + err = gw.Start() + a.NoError(err) + err = gw.Stop() a.NoError(err) From b3081997b77449fdaf0d6f524dec8db49c890dbc Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Wed, 19 Apr 2017 16:23:14 +0300 Subject: [PATCH 03/28] removing useless sleep from test --- server/sms/sms_gateway_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/sms/sms_gateway_test.go b/server/sms/sms_gateway_test.go index fbf0b5cc..8bacc75a 100644 --- a/server/sms/sms_gateway_test.go +++ b/server/sms/sms_gateway_test.go @@ -57,8 +57,6 @@ func Test_StartStop(t *testing.T) { err = gw.Stop() a.NoError(err) - time.Sleep(100 * time.Millisecond) - // try to start & stop for a second time err = gw.Start() a.NoError(err) From 2fdb0b9ca34211261d7322dcf0bcda5282efdafa Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Wed, 19 Apr 2017 16:25:52 +0300 Subject: [PATCH 04/28] adding log message in case sms gateway Stop is invoked more than once --- server/sms/sms_gateway.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/sms/sms_gateway.go b/server/sms/sms_gateway.go index 5d526239..d0c67436 100644 --- a/server/sms/sms_gateway.go +++ b/server/sms/sms_gateway.go @@ -235,8 +235,10 @@ func (g *gateway) Stop() error { g.logger.Info("Canceling in Stop") g.cancelFunc() g.cancelFunc = nil + g.logger.Info("Stopped gateway") + } else { + g.logger.Info("Gateway was already stopped") } - g.logger.Info("Stopped gateway") return nil } From 27c7151a28d070bb275fcc381f88052718d0cef1 Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Wed, 19 Apr 2017 16:28:10 +0300 Subject: [PATCH 05/28] go doc --- server/sms/sms_gateway.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/sms/sms_gateway.go b/server/sms/sms_gateway.go index d0c67436..f9e3021e 100644 --- a/server/sms/sms_gateway.go +++ b/server/sms/sms_gateway.go @@ -65,6 +65,7 @@ func New(router router.Router, sender Sender, config Config) (*gateway, error) { }, nil } +// Start the sms gateway; it is an idempotent operation. func (g *gateway) Start() error { g.logger.Info("Starting gateway") if g.cancelFunc != nil { @@ -190,6 +191,7 @@ func (g *gateway) proxyLoop() error { //TODO Cosmin Bogdan returning this error can mean 2 things: overflow of route's channel, or intentional stopping of router / gubled. return connector.ErrRouteChannelClosed } + func (g *gateway) send(receivedMsg *protocol.Message) error { err := g.sender.Send(receivedMsg) if err != nil { @@ -229,6 +231,7 @@ func (g *gateway) Restart() error { return nil } +// Stop the sms gateway; it is an idempotent operation. func (g *gateway) Stop() error { g.logger.Info("Stopping gateway") if g.cancelFunc != nil { From a188f626c4c600f86342b7ea59f33e003fb4f06f Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Wed, 19 Apr 2017 16:31:37 +0300 Subject: [PATCH 06/28] improve test in sms_gateway --- server/sms/sms_gateway_test.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/server/sms/sms_gateway_test.go b/server/sms/sms_gateway_test.go index 8bacc75a..a4187012 100644 --- a/server/sms/sms_gateway_test.go +++ b/server/sms/sms_gateway_test.go @@ -43,21 +43,27 @@ func Test_StartStop(t *testing.T) { gw, err := New(routerMock, mockSmsSender, config) a.NoError(err) + // try to start & stop + err = gw.Start() + a.NoError(err) + + err = gw.Stop() + a.NoError(err) + + // try to start twice, and then stop twice err = gw.Start() a.NoError(err) - // try to start for the second time in a row err = gw.Start() a.NoError(err) err = gw.Stop() a.NoError(err) - // try to stop for the second time in a row err = gw.Stop() a.NoError(err) - // try to start & stop for a second time + // try to start & stop once again err = gw.Start() a.NoError(err) From d2f33878f6466db8baee2245bf46795e83494ec9 Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Wed, 19 Apr 2017 16:43:50 +0300 Subject: [PATCH 07/28] renaming var; log fix --- server/gobbler.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/gobbler.go b/server/gobbler.go index 36fba884..907b64b2 100644 --- a/server/gobbler.go +++ b/server/gobbler.go @@ -186,7 +186,7 @@ var CreateModules = func(router router.Router) (modules []interface{}) { } if *Config.SMS.Enabled { - logger.Info("Nexmo SMS: enabled") + logger.Info("SMS: enabled") if *Config.SMS.APIKey == "" || *Config.SMS.APISecret == "" { logger.Panic("The API Key has to be provided when NEXMO SMS connector is enabled") } @@ -194,11 +194,11 @@ var CreateModules = func(router router.Router) (modules []interface{}) { if err != nil { logger.WithError(err).Error("Error creating Nexmo Sender") } - smsConn, err := sms.New(router, nexmoSender, Config.SMS) + smsGateway, err := sms.New(router, nexmoSender, Config.SMS) if err != nil { logger.WithError(err).Error("Error creating SMS Gateway") } else { - modules = append(modules, smsConn) + modules = append(modules, smsGateway) } } else { logger.Info("SMS: disabled") From 8ae92efcc27f86631c3247c7b8d91077f7d912ef Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Wed, 19 Apr 2017 18:11:21 +0300 Subject: [PATCH 08/28] go doc --- server/service/service.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/service/service.go b/server/service/service.go index 96c3f34a..edbb3c8e 100644 --- a/server/service/service.go +++ b/server/service/service.go @@ -88,7 +88,8 @@ func (s *Service) PrometheusEndpoint(endpointPrefix string) *Service { return s } -// Start checks the modules for the following interfaces and registers and/or starts: +// Start the health-check, old-format metrics, and Prometheus metrics endpoint, +// and then check the modules for the following interfaces and registers and/or start: // Startable: // health.Checker: // Endpoint: Register the handler function of the Endpoint in the http service at prefix From a60cd170fc6d8e422c5328115f01928bd156cbe8 Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Wed, 19 Apr 2017 18:26:11 +0300 Subject: [PATCH 09/28] fix test --- server/sms/sms_gateway_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/sms/sms_gateway_test.go b/server/sms/sms_gateway_test.go index a4187012..c9b30600 100644 --- a/server/sms/sms_gateway_test.go +++ b/server/sms/sms_gateway_test.go @@ -38,7 +38,7 @@ func Test_StartStop(t *testing.T) { routerMock.EXPECT().Subscribe(gomock.Any()).Do(func(r *router.Route) (*router.Route, error) { a.Equal("sms", r.Path.Partition()) return r, nil - }).Times(2) + }).Times(3) gw, err := New(routerMock, mockSmsSender, config) a.NoError(err) From c481291dca12dff495f622a963e15aea4f047ab6 Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Thu, 20 Apr 2017 18:50:58 +0300 Subject: [PATCH 10/28] configuring and using a Toggles-Endpoint in the service; simple start/stop logic exposed using endpoint for all modules, with their name as URL parameters --- server/config.go | 6 ++++ server/gobbler.go | 3 +- server/service/service.go | 63 ++++++++++++++++++++++++++++++++++++++- server/sms/sms_gateway.go | 13 ++++---- 4 files changed, 76 insertions(+), 9 deletions(-) diff --git a/server/config.go b/server/config.go index c9257500..23ddd45b 100644 --- a/server/config.go +++ b/server/config.go @@ -24,6 +24,7 @@ const ( defaultHealthEndpoint = "/admin/healthcheck" defaultMetricsEndpoint = "/admin/metrics-old" defaultPrometheusEndpoint = "/admin/metrics" + defaultTogglesEndpoint = "/admin/toggles" defaultKVSBackend = "file" defaultMSBackend = "file" defaultStoragePath = "/var/lib/guble" @@ -71,6 +72,7 @@ type ( HealthEndpoint *string MetricsEndpoint *string PrometheusEndpoint *string + TogglesEndpoint *string Profile *string Postgres PostgresConfig FCM fcm.Config @@ -124,6 +126,10 @@ var ( Default(defaultPrometheusEndpoint). Envar("GUBLE_PROMETHEUS_ENDPOINT"). String(), + TogglesEndpoint: kingpin.Flag("toggles-endpoint", `The Feature-Toggles endpoint to be used by the HTTP server (value for disabling it: "")`). + Default(defaultTogglesEndpoint). + Envar("GUBLE_TOGGLES_ENDPOINT"). + String(), Profile: kingpin.Flag("profile", `The profiler to be used (default: none): mem | cpu | block`). Default(""). Envar("GUBLE_PROFILE"). diff --git a/server/gobbler.go b/server/gobbler.go index 907b64b2..4a2f28b8 100644 --- a/server/gobbler.go +++ b/server/gobbler.go @@ -271,7 +271,8 @@ func StartService() *service.Service { srv := service.New(r, websrv). HealthEndpoint(*Config.HealthEndpoint). MetricsEndpoint(*Config.MetricsEndpoint). - PrometheusEndpoint(*Config.PrometheusEndpoint) + PrometheusEndpoint(*Config.PrometheusEndpoint). + TogglesEndpoint(*Config.TogglesEndpoint) srv.RegisterModules(0, 6, kvStore, messageStore) srv.RegisterModules(4, 3, CreateModules(r)...) diff --git a/server/service/service.go b/server/service/service.go index edbb3c8e..63f7dc9e 100644 --- a/server/service/service.go +++ b/server/service/service.go @@ -3,15 +3,17 @@ package service import ( log "github.com/Sirupsen/logrus" "github.com/docker/distribution/health" + "github.com/hashicorp/go-multierror" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/cosminrentea/gobbler/server/metrics" "github.com/cosminrentea/gobbler/server/router" "github.com/cosminrentea/gobbler/server/webserver" - "github.com/hashicorp/go-multierror" + "fmt" "net/http" "reflect" + "strconv" "time" ) @@ -30,6 +32,7 @@ type Service struct { healthThreshold int metricsEndpoint string prometheusEndpoint string + togglesEndpoint string } // New creates a new Service, using the given Router and WebServer. @@ -88,6 +91,12 @@ func (s *Service) PrometheusEndpoint(endpointPrefix string) *Service { return s } +// TogglesEndpoint sets the endpoint used for Feature-Toggles. Parameter for disabling the endpoint is: "". Returns the updated service. +func (s *Service) TogglesEndpoint(endpointPrefix string) *Service { + s.togglesEndpoint = endpointPrefix + return s +} + // Start the health-check, old-format metrics, and Prometheus metrics endpoint, // and then check the modules for the following interfaces and registers and/or start: // Startable: @@ -113,6 +122,12 @@ func (s *Service) Start() error { } else { logger.Info("Prometheus metrics endpoint disabled") } + if s.togglesEndpoint != "" { + logger.WithField("togglesEndpoint", s.togglesEndpoint).Info("Toggles endpoint") + s.webserver.Handle(s.togglesEndpoint, http.HandlerFunc(s.togglesHandlerFunc)) + } else { + logger.Info("Toggles endpoint disabled") + } for order, iface := range s.ModulesSortedByStartOrder() { name := reflect.TypeOf(iface).String() if s, ok := iface.(Startable); ok { @@ -173,3 +188,49 @@ func (s *Service) modulesSortedBy(criteria by) []interface{} { } return sorted } + +func (s *Service) togglesHandlerFunc(w http.ResponseWriter, r *http.Request) { + logger.Info("toggles") + for key, values := range r.URL.Query() { + if len(values) != 1 { + logger.WithFields(log.Fields{ + "key": key, + }).Info("ignoring toggles parameter since it has more than one value") + continue + } + value := values[0] + enable, err := strconv.ParseBool(value) + if err != nil { + logger.WithFields(log.Fields{ + "key": key, + "value": value, + }).Info("ignoring toggles single parameter since it is not boolean") + continue + } + for order, iface := range s.ModulesSortedByStartOrder() { + name := reflect.TypeOf(iface).String() + if name == key { + logger.WithFields(log.Fields{ + "key": key, + "value": value, + }).Info("toggles single boolean valid parameter") + + if s, ok := iface.(Startable); ok && enable { + logger.WithFields(log.Fields{"name": name, "order": order}).Info("Starting module") + if err := s.Start(); err != nil { + logger.WithError(err).WithField("name", name).Error("Error while starting module") + } + w.Write([]byte(fmt.Sprintf("%s was started.\n", key))) + } + + if s, ok := iface.(Stopable); ok && !enable { + logger.WithFields(log.Fields{"name": name, "order": order}).Info("Stopping module") + if err := s.Stop(); err != nil { + logger.WithError(err).WithField("name", name).Error("Error while stopping module") + } + w.Write([]byte(fmt.Sprintf("%s was stopped.\n", key))) + } + } + } + } +} diff --git a/server/sms/sms_gateway.go b/server/sms/sms_gateway.go index f9e3021e..010dc758 100644 --- a/server/sms/sms_gateway.go +++ b/server/sms/sms_gateway.go @@ -131,10 +131,9 @@ func (g *gateway) Run() { // If Route channel closed, try restarting if err == connector.ErrRouteChannelClosed { g.logger.Info("Restarting because ErrRouteChannelClosed") - g.Restart() + g.restart() return } - } if provideErr != nil { @@ -144,7 +143,7 @@ func (g *gateway) Run() { // Router closed the route, try restart if provideErr == router.ErrInvalidRoute { g.logger.Info("Restarting because ErrInvalidRoute") - g.Restart() + g.restart() return } // Router module is stopping, exit the process @@ -211,15 +210,15 @@ func (g *gateway) send(receivedMsg *protocol.Message) error { return nil } -func (g *gateway) Restart() error { - g.logger.WithField("LastIDSent", g.LastIDSent).Info("Restart in progress") +func (g *gateway) restart() error { + g.logger.WithField("LastIDSent", g.LastIDSent).Info("restart in progress") g.Cancel() g.cancelFunc = nil err := g.ReadLastID() if err != nil { - g.logger.WithError(err).Error("Could not ReadLastID in Restart") + g.logger.WithError(err).Error("Could not ReadLastID in restart") return err } @@ -227,7 +226,7 @@ func (g *gateway) Restart() error { go g.Run() - g.logger.WithField("LastIDSent", g.LastIDSent).Info("Restart finished") + g.logger.WithField("LastIDSent", g.LastIDSent).Info("restart finished") return nil } From fcd2145f421cc919ea54637324f52c13d53001de Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Fri, 21 Apr 2017 16:12:03 +0300 Subject: [PATCH 11/28] making FCM and APNS connector Stopable; and also their Start and Stop becomes idempotent --- server/apns/apns.go | 4 ++++ server/connector/connector.go | 11 ++++++++++- server/fcm/fcm.go | 4 ++++ 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/server/apns/apns.go b/server/apns/apns.go index 1722974f..a18c927c 100644 --- a/server/apns/apns.go +++ b/server/apns/apns.go @@ -67,6 +67,10 @@ func (a *apns) Start() error { return err } +func (a *apns) Stop() error { + return a.Connector.Stop() +} + func (a *apns) startMetrics() { mTotalSentMessages.Set(0) mTotalSendErrors.Set(0) diff --git a/server/connector/connector.go b/server/connector/connector.go index 6d4ff2f3..6b94fdaa 100644 --- a/server/connector/connector.go +++ b/server/connector/connector.go @@ -259,14 +259,19 @@ func (c *connector) Substitute(w http.ResponseWriter, req *http.Request) { // Start will run start all current subscriptions and workers to process the messages func (c *connector) Start() error { + c.logger.Info("Starting connector") + if c.cancel != nil { + c.logger.Info("Connector was already started") + return nil + } c.queue.Start() - c.logger.Info("Starting connector") c.ctx, c.cancel = context.WithCancel(context.Background()) c.logger.Info("Loading subscriptions") err := c.manager.Load() if err != nil { + c.logger.Error("error while loading subscriptions") return err } @@ -340,7 +345,11 @@ func (c *connector) restart(s Subscriber) error { // Stop the connector (the context, the queue, the subscription loops) func (c *connector) Stop() error { c.logger.Info("Stopping connector") + if c.cancel == nil{ + return nil + } c.cancel() + c.cancel=nil c.queue.Stop() c.wg.Wait() c.logger.Info("Stopped connector") diff --git a/server/fcm/fcm.go b/server/fcm/fcm.go index f03e1b90..379fdbee 100644 --- a/server/fcm/fcm.go +++ b/server/fcm/fcm.go @@ -63,6 +63,10 @@ func (f *fcm) Start() error { return err } +func (f *fcm) Stop() error { + return f.Connector.Stop() +} + func (f *fcm) startMetrics() { mTotalSentMessages.Set(0) mTotalSendErrors.Set(0) From 1f32415963f335737a296a59a3f91f220740e173 Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Fri, 21 Apr 2017 16:12:43 +0300 Subject: [PATCH 12/28] updating the matching mechanism between URL parameters and the module names / packages --- server/service/service.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/server/service/service.go b/server/service/service.go index 63f7dc9e..469e92f8 100644 --- a/server/service/service.go +++ b/server/service/service.go @@ -14,6 +14,7 @@ import ( "net/http" "reflect" "strconv" + "strings" "time" ) @@ -208,8 +209,13 @@ func (s *Service) togglesHandlerFunc(w http.ResponseWriter, r *http.Request) { continue } for order, iface := range s.ModulesSortedByStartOrder() { - name := reflect.TypeOf(iface).String() - if name == key { + packagePath := reflect.TypeOf(iface).String() + names := strings.Split(packagePath, ".") + var name string + if len(names) > 0 { + name = strings.TrimPrefix(names[0],"*") + } + if key == name { logger.WithFields(log.Fields{ "key": key, "value": value, From c2420f4879e100aa61aef6a67ebaf2ae520caf7a Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Fri, 21 Apr 2017 16:13:48 +0300 Subject: [PATCH 13/28] unsubscribing on SMS GW Stop; fixing tests; not fetching by default on sms route init --- server/sms/sms_gateway.go | 9 ++++++--- server/sms/sms_gateway_test.go | 6 ++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/server/sms/sms_gateway.go b/server/sms/sms_gateway.go index 010dc758..b8d97444 100644 --- a/server/sms/sms_gateway.go +++ b/server/sms/sms_gateway.go @@ -27,6 +27,7 @@ type Config struct { APISecret *string Workers *int SMSTopic *string + Fetch *bool IntervalMetrics *bool KafkaReportingTopic *string @@ -69,7 +70,7 @@ func New(router router.Router, sender Sender, config Config) (*gateway, error) { func (g *gateway) Start() error { g.logger.Info("Starting gateway") if g.cancelFunc != nil { - g.logger.Info("Gateway already started") + g.logger.Info("Gateway was already started") return nil } @@ -101,7 +102,7 @@ func (g *gateway) initRoute() { } func (g *gateway) fetchRequest() (fr *store.FetchRequest) { - if g.LastIDSent > 0 { + if *g.config.Fetch && g.LastIDSent > 0 { fr = store.NewFetchRequest( protocol.Path(*g.config.SMSTopic).Partition(), g.LastIDSent+1, @@ -234,7 +235,9 @@ func (g *gateway) restart() error { func (g *gateway) Stop() error { g.logger.Info("Stopping gateway") if g.cancelFunc != nil { - g.logger.Info("Canceling in Stop") + g.logger.Info("Unsubscribing sms route") + g.router.Unsubscribe(g.route) + g.logger.Info("Calling the cancel function") g.cancelFunc() g.cancelFunc = nil g.logger.Info("Stopped gateway") diff --git a/server/sms/sms_gateway_test.go b/server/sms/sms_gateway_test.go index c9b30600..32a17322 100644 --- a/server/sms/sms_gateway_test.go +++ b/server/sms/sms_gateway_test.go @@ -39,6 +39,9 @@ func Test_StartStop(t *testing.T) { a.Equal("sms", r.Path.Partition()) return r, nil }).Times(3) + routerMock.EXPECT().Unsubscribe(gomock.Any()).Do(func(r *router.Route) () { + a.Equal("sms", r.Path.Partition()) + }).Times(3) gw, err := New(routerMock, mockSmsSender, config) a.NoError(err) @@ -94,6 +97,9 @@ func Test_SendOneSms(t *testing.T) { a.Equal(*config.SMSTopic, string(r.Path)) return r, nil }) + routerMock.EXPECT().Unsubscribe(gomock.Any()).Do(func(r *router.Route) () { + a.Equal(*config.SMSTopic, string(r.Path)) + }) gw, err := New(routerMock, mockSmsSender, config) a.NoError(err) From dcd74d24258c01b5c08810b49a3aae737e70a530 Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Fri, 21 Apr 2017 17:59:55 +0300 Subject: [PATCH 14/28] for sms: initRoute() not fetching messages from the message-store on start, only on restart --- server/sms/sms_gateway.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/server/sms/sms_gateway.go b/server/sms/sms_gateway.go index b8d97444..463d8959 100644 --- a/server/sms/sms_gateway.go +++ b/server/sms/sms_gateway.go @@ -27,7 +27,6 @@ type Config struct { APISecret *string Workers *int SMSTopic *string - Fetch *bool IntervalMetrics *bool KafkaReportingTopic *string @@ -81,7 +80,7 @@ func (g *gateway) Start() error { } g.ctx, g.cancelFunc = context.WithCancel(context.Background()) - g.initRoute() + g.initRoute(false) go g.Run() @@ -91,23 +90,26 @@ func (g *gateway) Start() error { return nil } -func (g *gateway) initRoute() { +func (g *gateway) initRoute(fetch bool) { g.route = router.NewRoute(router.RouteConfig{ Path: protocol.Path(*g.config.SMSTopic), ChannelSize: 5000, QueueSize: -1, Timeout: -1, - FetchRequest: g.fetchRequest(), }) + if fetch { + g.route.FetchRequest = g.fetchRequest() + } } func (g *gateway) fetchRequest() (fr *store.FetchRequest) { - if *g.config.Fetch && g.LastIDSent > 0 { + if g.LastIDSent > 0 { fr = store.NewFetchRequest( protocol.Path(*g.config.SMSTopic).Partition(), g.LastIDSent+1, 0, - store.DirectionForward, -1) + store.DirectionForward, + -1) } return } @@ -223,7 +225,7 @@ func (g *gateway) restart() error { return err } - g.initRoute() + g.initRoute(true) go g.Run() From fb68c4ebbfe44758eae8493786daeb965ba7f10a Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Sat, 22 Apr 2017 22:55:27 +0300 Subject: [PATCH 15/28] using a config for Skip-Fetch on sms gw start; not doing Unsubscribe on Stop; tests are passing --- server/config.go | 3 +++ server/sms/sms_gateway.go | 5 ++--- server/sms/sms_gateway_test.go | 17 ++++++++++------- server/sms/utils_test.go | 3 ++- 4 files changed, 17 insertions(+), 11 deletions(-) diff --git a/server/config.go b/server/config.go index 23ddd45b..65dcdbf1 100644 --- a/server/config.go +++ b/server/config.go @@ -228,6 +228,9 @@ var ( Envar("GUBLE_SMS_TOPIC"). Default(sms.SMSDefaultTopic). String(), + SkipFetch: kingpin.Flag("sms-skip-fetch", "If sms gateway should skip fetching from message-store when starting"). + Envar("GUBLE_SMS_SKIP_FETCH"). + Bool(), Workers: kingpin.Flag("sms-workers", "The number of workers handling traffic with Nexmo sms endpoint(default: number of CPUs)"). Default(strconv.Itoa(runtime.NumCPU())). Envar("GUBLE_SMS_WORKERS"). diff --git a/server/sms/sms_gateway.go b/server/sms/sms_gateway.go index 463d8959..374df469 100644 --- a/server/sms/sms_gateway.go +++ b/server/sms/sms_gateway.go @@ -28,6 +28,7 @@ type Config struct { Workers *int SMSTopic *string IntervalMetrics *bool + SkipFetch *bool KafkaReportingTopic *string @@ -97,7 +98,7 @@ func (g *gateway) initRoute(fetch bool) { QueueSize: -1, Timeout: -1, }) - if fetch { + if fetch || !*g.config.SkipFetch { g.route.FetchRequest = g.fetchRequest() } } @@ -237,8 +238,6 @@ func (g *gateway) restart() error { func (g *gateway) Stop() error { g.logger.Info("Stopping gateway") if g.cancelFunc != nil { - g.logger.Info("Unsubscribing sms route") - g.router.Unsubscribe(g.route) g.logger.Info("Calling the cancel function") g.cancelFunc() g.cancelFunc = nil diff --git a/server/sms/sms_gateway_test.go b/server/sms/sms_gateway_test.go index 32a17322..2aabbaeb 100644 --- a/server/sms/sms_gateway_test.go +++ b/server/sms/sms_gateway_test.go @@ -39,9 +39,6 @@ func Test_StartStop(t *testing.T) { a.Equal("sms", r.Path.Partition()) return r, nil }).Times(3) - routerMock.EXPECT().Unsubscribe(gomock.Any()).Do(func(r *router.Route) () { - a.Equal("sms", r.Path.Partition()) - }).Times(3) gw, err := New(routerMock, mockSmsSender, config) a.NoError(err) @@ -97,9 +94,6 @@ func Test_SendOneSms(t *testing.T) { a.Equal(*config.SMSTopic, string(r.Path)) return r, nil }) - routerMock.EXPECT().Unsubscribe(gomock.Any()).Do(func(r *router.Route) () { - a.Equal(*config.SMSTopic, string(r.Path)) - }) gw, err := New(routerMock, mockSmsSender, config) a.NoError(err) @@ -275,7 +269,16 @@ func Test_RetryLoop(t *testing.T) { worker := 8 topic := SMSDefaultTopic enableMetrics := false - gateway, err := New(routerMock, mockSmsSender, Config{Workers: &worker, Name: SMSDefaultTopic, Schema: SMSSchema, SMSTopic: &topic, IntervalMetrics: &enableMetrics}) + skipFetch := false + gateway, err := New(routerMock, mockSmsSender, + Config{ + Workers: &worker, + Name: SMSDefaultTopic, + Schema: SMSSchema, + SMSTopic: &topic, + SkipFetch: &skipFetch, + IntervalMetrics: &enableMetrics, + }) a.NoError(err) //create a new route on which the gateway will subscribe on /sms diff --git a/server/sms/utils_test.go b/server/sms/utils_test.go index f4f8fb2b..35bb85ff 100644 --- a/server/sms/utils_test.go +++ b/server/sms/utils_test.go @@ -57,12 +57,13 @@ func createConfig() Config { topic := "/sms" worker := 1 intervalMetrics := true + skipFetch := false return Config{ Workers: &worker, SMSTopic: &topic, Name: "test_gateway", Schema: SMSSchema, - + SkipFetch: &skipFetch, IntervalMetrics: &intervalMetrics, } } From bb9f7986cfba564b59d1520f4a1581a5d3b203ca Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Mon, 24 Apr 2017 08:04:13 +0300 Subject: [PATCH 16/28] fix log message on panic (sms config) --- server/gobbler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/gobbler.go b/server/gobbler.go index 4a2f28b8..c1608c6b 100644 --- a/server/gobbler.go +++ b/server/gobbler.go @@ -188,7 +188,7 @@ var CreateModules = func(router router.Router) (modules []interface{}) { if *Config.SMS.Enabled { logger.Info("SMS: enabled") if *Config.SMS.APIKey == "" || *Config.SMS.APISecret == "" { - logger.Panic("The API Key has to be provided when NEXMO SMS connector is enabled") + logger.Panic("The API Key and Secret have to be provided when NEXMO SMS connector is enabled") } nexmoSender, err := sms.NewNexmoSender(*Config.SMS.APIKey, *Config.SMS.APISecret, kafkaProducer, *Config.SMS.KafkaReportingTopic) if err != nil { From a9fec7bedf3d3cec01564644e1a110bc8760c2b4 Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Mon, 24 Apr 2017 10:07:29 +0300 Subject: [PATCH 17/28] go doc --- server/connector/connector.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/connector/connector.go b/server/connector/connector.go index 6b94fdaa..9402da11 100644 --- a/server/connector/connector.go +++ b/server/connector/connector.go @@ -257,7 +257,7 @@ func (c *connector) Substitute(w http.ResponseWriter, req *http.Request) { fmt.Fprintf(w, `{"modified":"%d"}`, totalSubscribersUpdated) } -// Start will run start all current subscriptions and workers to process the messages +// Start all current subscriptions and workers to process the messages func (c *connector) Start() error { c.logger.Info("Starting connector") if c.cancel != nil { From 30af0bb573054ff0d0d1df2600acecb757008210 Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Mon, 24 Apr 2017 10:43:06 +0300 Subject: [PATCH 18/28] unsubscribing on SMS GW Stop; fixing tests --- server/sms/sms_gateway.go | 2 ++ server/sms/sms_gateway_test.go | 6 ++++++ test.sh | 2 +- 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/server/sms/sms_gateway.go b/server/sms/sms_gateway.go index 374df469..30c4e45a 100644 --- a/server/sms/sms_gateway.go +++ b/server/sms/sms_gateway.go @@ -238,6 +238,8 @@ func (g *gateway) restart() error { func (g *gateway) Stop() error { g.logger.Info("Stopping gateway") if g.cancelFunc != nil { + g.logger.Info("Unsubscribing the sms route") + g.router.Unsubscribe(g.route) g.logger.Info("Calling the cancel function") g.cancelFunc() g.cancelFunc = nil diff --git a/server/sms/sms_gateway_test.go b/server/sms/sms_gateway_test.go index 2aabbaeb..ca3a9cb9 100644 --- a/server/sms/sms_gateway_test.go +++ b/server/sms/sms_gateway_test.go @@ -39,6 +39,9 @@ func Test_StartStop(t *testing.T) { a.Equal("sms", r.Path.Partition()) return r, nil }).Times(3) + routerMock.EXPECT().Unsubscribe(gomock.Any()).Do(func(r *router.Route) { + a.Equal("sms", r.Path.Partition()) + }).AnyTimes() gw, err := New(routerMock, mockSmsSender, config) a.NoError(err) @@ -94,6 +97,9 @@ func Test_SendOneSms(t *testing.T) { a.Equal(*config.SMSTopic, string(r.Path)) return r, nil }) + routerMock.EXPECT().Unsubscribe(gomock.Any()).Do(func(r *router.Route) { + a.Equal(*config.SMSTopic, string(r.Path)) + }) gw, err := New(routerMock, mockSmsSender, config) a.NoError(err) diff --git a/test.sh b/test.sh index c4690915..1cd71973 100755 --- a/test.sh +++ b/test.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -GO_TEST_DISABLED=true go test -short ./... +GO_TEST_DISABLED=true go test -v -short ./... TESTRESULT=$? RED='\033[0;31m' From 3d13b1bb8868fd9d44e62ea1b4d1dd0abe11a362 Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Mon, 24 Apr 2017 11:48:31 +0300 Subject: [PATCH 19/28] refactoring to simplify toggles-handler func in service --- server/service/service.go | 54 +++++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/server/service/service.go b/server/service/service.go index 469e92f8..d9f73f99 100644 --- a/server/service/service.go +++ b/server/service/service.go @@ -208,34 +208,38 @@ func (s *Service) togglesHandlerFunc(w http.ResponseWriter, r *http.Request) { }).Info("ignoring toggles single parameter since it is not boolean") continue } - for order, iface := range s.ModulesSortedByStartOrder() { - packagePath := reflect.TypeOf(iface).String() - names := strings.Split(packagePath, ".") - var name string - if len(names) > 0 { - name = strings.TrimPrefix(names[0],"*") - } - if key == name { - logger.WithFields(log.Fields{ - "key": key, - "value": value, - }).Info("toggles single boolean valid parameter") - - if s, ok := iface.(Startable); ok && enable { - logger.WithFields(log.Fields{"name": name, "order": order}).Info("Starting module") - if err := s.Start(); err != nil { - logger.WithError(err).WithField("name", name).Error("Error while starting module") - } - w.Write([]byte(fmt.Sprintf("%s was started.\n", key))) + s.toggleModule(key, enable, w) + } +} + +func (s *Service) toggleModule(searchedModulePackage string, enable bool, w http.ResponseWriter) { + for order, iface := range s.ModulesSortedByStartOrder() { + packagePath := reflect.TypeOf(iface).String() + packagePathTokens := strings.Split(packagePath, ".") + var modulePackage string + if len(packagePathTokens) > 0 { + modulePackage = strings.TrimPrefix(packagePathTokens[0], "*") + } + if searchedModulePackage == modulePackage { + logger.WithFields(log.Fields{ + "module": searchedModulePackage, + "enable": enable, + }).Info("toggles single boolean valid parameter") + + if s, ok := iface.(Startable); ok && enable { + logger.WithFields(log.Fields{"modulePackage": modulePackage, "order": order}).Info("Starting module") + if err := s.Start(); err != nil { + logger.WithError(err).WithField("modulePackage", modulePackage).Error("Error while starting module") } + w.Write([]byte(fmt.Sprintf("%s was started.\n", searchedModulePackage))) + } - if s, ok := iface.(Stopable); ok && !enable { - logger.WithFields(log.Fields{"name": name, "order": order}).Info("Stopping module") - if err := s.Stop(); err != nil { - logger.WithError(err).WithField("name", name).Error("Error while stopping module") - } - w.Write([]byte(fmt.Sprintf("%s was stopped.\n", key))) + if s, ok := iface.(Stopable); ok && !enable { + logger.WithFields(log.Fields{"modulePackage": modulePackage, "order": order}).Info("Stopping module") + if err := s.Stop(); err != nil { + logger.WithError(err).WithField("modulePackage", modulePackage).Error("Error while stopping module") } + w.Write([]byte(fmt.Sprintf("%s was stopped.\n", searchedModulePackage))) } } } From 1672df59ce281af5ae363e7a46245109ee206d9d Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Mon, 24 Apr 2017 12:09:52 +0300 Subject: [PATCH 20/28] refactoring to simplify toggles-handler func in service (2) --- server/service/service.go | 53 ++++++++++++++++++++++++--------------- 1 file changed, 33 insertions(+), 20 deletions(-) diff --git a/server/service/service.go b/server/service/service.go index d9f73f99..e2d8b500 100644 --- a/server/service/service.go +++ b/server/service/service.go @@ -191,8 +191,14 @@ func (s *Service) modulesSortedBy(criteria by) []interface{} { } func (s *Service) togglesHandlerFunc(w http.ResponseWriter, r *http.Request) { - logger.Info("toggles") + logger.Info("togglesHandlerFunc") for key, values := range r.URL.Query() { + if !toggleAllowed(key) { + logger.WithFields(log.Fields{ + "key": key, + }).Info("toggling this module is not explicitly allowed") + continue + } if len(values) != 1 { logger.WithFields(log.Fields{ "key": key, @@ -212,6 +218,13 @@ func (s *Service) togglesHandlerFunc(w http.ResponseWriter, r *http.Request) { } } +func toggleAllowed(modulePackage string) bool { + if modulePackage == "sms" { + return true + } + return false +} + func (s *Service) toggleModule(searchedModulePackage string, enable bool, w http.ResponseWriter) { for order, iface := range s.ModulesSortedByStartOrder() { packagePath := reflect.TypeOf(iface).String() @@ -220,27 +233,27 @@ func (s *Service) toggleModule(searchedModulePackage string, enable bool, w http if len(packagePathTokens) > 0 { modulePackage = strings.TrimPrefix(packagePathTokens[0], "*") } - if searchedModulePackage == modulePackage { - logger.WithFields(log.Fields{ - "module": searchedModulePackage, - "enable": enable, - }).Info("toggles single boolean valid parameter") - - if s, ok := iface.(Startable); ok && enable { - logger.WithFields(log.Fields{"modulePackage": modulePackage, "order": order}).Info("Starting module") - if err := s.Start(); err != nil { - logger.WithError(err).WithField("modulePackage", modulePackage).Error("Error while starting module") - } - w.Write([]byte(fmt.Sprintf("%s was started.\n", searchedModulePackage))) + if searchedModulePackage != modulePackage { + continue + } + logger.WithFields(log.Fields{ + "module": searchedModulePackage, + "enable": enable, + }).Info("toggles single boolean valid parameter") + if s, ok := iface.(Startable); ok && enable { + logger.WithFields(log.Fields{"modulePackage": modulePackage, "order": order}).Info("Starting module") + if err := s.Start(); err != nil { + logger.WithError(err).WithField("modulePackage", modulePackage).Error("Error while starting module") } - - if s, ok := iface.(Stopable); ok && !enable { - logger.WithFields(log.Fields{"modulePackage": modulePackage, "order": order}).Info("Stopping module") - if err := s.Stop(); err != nil { - logger.WithError(err).WithField("modulePackage", modulePackage).Error("Error while stopping module") - } - w.Write([]byte(fmt.Sprintf("%s was stopped.\n", searchedModulePackage))) + w.Write([]byte(fmt.Sprintf("%s was started.\n", searchedModulePackage))) + } + if s, ok := iface.(Stopable); ok && !enable { + logger.WithFields(log.Fields{"modulePackage": modulePackage, "order": order}).Info("Stopping module") + if err := s.Stop(); err != nil { + logger.WithError(err).WithField("modulePackage", modulePackage).Error("Error while stopping module") } + w.Write([]byte(fmt.Sprintf("%s was stopped.\n", searchedModulePackage))) } + } } From 3239f419c619d11ca23f5700f9ec1de4d069f090 Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Mon, 24 Apr 2017 14:42:36 +0300 Subject: [PATCH 21/28] fmt --- server/sms/utils_test.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/server/sms/utils_test.go b/server/sms/utils_test.go index 35bb85ff..76d3b390 100644 --- a/server/sms/utils_test.go +++ b/server/sms/utils_test.go @@ -59,11 +59,12 @@ func createConfig() Config { intervalMetrics := true skipFetch := false return Config{ - Workers: &worker, - SMSTopic: &topic, - Name: "test_gateway", - Schema: SMSSchema, + Workers: &worker, + SMSTopic: &topic, + Name: "test_gateway", + Schema: SMSSchema, SkipFetch: &skipFetch, + IntervalMetrics: &intervalMetrics, } } @@ -120,13 +121,9 @@ func encodeUnmarshallableProtocolMessage(ID int) protocol.Message { func createGateway(t *testing.T, kvStore kvstore.KVStore) *gateway { a := assert.New(t) - sender := createNexmoSender(t) - config := createConfig() msgStore := dummystore.New(kvStore) - unstartedRouter := router.New(msgStore, kvStore, nil) - - gw, err := New(unstartedRouter, sender, config) + gw, err := New(unstartedRouter, createNexmoSender(t), createConfig()) a.NoError(err) err = gw.Start() if err != nil { From bc8409c8bdb7f7af255f8566d6a253470f8e6438 Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Mon, 24 Apr 2017 15:03:41 +0300 Subject: [PATCH 22/28] using a SMS_TOGGLEABLE env.var. to: avoid fetching on Start, and unsubscribing on Stop; adding and fixing tests --- server/config.go | 4 ++-- server/config_test.go | 11 +++++++++++ server/sms/sms_gateway.go | 31 +++++++++++++++++-------------- server/sms/sms_gateway_test.go | 12 ++++++------ server/sms/utils_test.go | 10 +++++----- 5 files changed, 41 insertions(+), 27 deletions(-) diff --git a/server/config.go b/server/config.go index 65dcdbf1..66db0c54 100644 --- a/server/config.go +++ b/server/config.go @@ -228,8 +228,8 @@ var ( Envar("GUBLE_SMS_TOPIC"). Default(sms.SMSDefaultTopic). String(), - SkipFetch: kingpin.Flag("sms-skip-fetch", "If sms gateway should skip fetching from message-store when starting"). - Envar("GUBLE_SMS_SKIP_FETCH"). + Toggleable: kingpin.Flag("sms-toggleable", "If sms gateway should be able to be stopped and restarted at runtime"). + Envar("GUBLE_SMS_TOGGLEABLE"). Bool(), Workers: kingpin.Flag("sms-workers", "The number of workers handling traffic with Nexmo sms endpoint(default: number of CPUs)"). Default(strconv.Itoa(runtime.NumCPU())). diff --git a/server/config_test.go b/server/config_test.go index ed1f631f..20957196 100644 --- a/server/config_test.go +++ b/server/config_test.go @@ -42,6 +42,9 @@ func TestParsingOfEnvironmentVariables(t *testing.T) { os.Setenv("GUBLE_PROMETHEUS_ENDPOINT", "prometheus_endpoint") defer os.Unsetenv("GUBLE_PROMETHEUS_ENDPOINT") + os.Setenv("GUBLE_TOGGLES_ENDPOINT", "toggles_endpoint") + defer os.Unsetenv("GUBLE_TOGGLES_ENDPOINT") + os.Setenv("GUBLE_MS", "ms-backend") defer os.Unsetenv("GUBLE_MS") @@ -105,6 +108,9 @@ func TestParsingOfEnvironmentVariables(t *testing.T) { os.Setenv("GUBLE_SMS_KAFKA_TOPIC", "sms_reporting_topic") defer os.Unsetenv("GUBLE_SMS_KAFKA_TOPIC") + os.Setenv("GUBLE_SMS_TOGGLEABLE", "sms_toggleable") + defer os.Unsetenv("GUBLE_SMS_TOGGLEABLE") + // when we parse the arguments from environment variables parseConfig() @@ -131,6 +137,7 @@ func TestParsingArgs(t *testing.T) { "--health-endpoint", "health_endpoint", "--metrics-endpoint", "metrics_endpoint", "--prometheus-endpoint", "prometheus_endpoint", + "--toggles-endpoint", "toggles_endpoint", "--ws", "--ws-prefix", "/wstream/", "--fcm", @@ -151,6 +158,7 @@ func TestParsingArgs(t *testing.T) { "--remotes", "127.0.0.1:8080 127.0.0.1:20002", "--kafka-brokers", "127.0.0.1:9092 127.0.0.1:9091", "--sms-kafka-topic", "sms_reporting_topic", + "--sms-toggleable", "sms_toggleable", } // when we parse the arguments from command-line flags @@ -169,6 +177,7 @@ func assertArguments(a *assert.Assertions) { a.Equal("metrics_endpoint", *Config.MetricsEndpoint) a.Equal("prometheus_endpoint", *Config.PrometheusEndpoint) + a.Equal("toggles_endpoint", *Config.TogglesEndpoint) a.Equal(true, *Config.WS.Enabled) a.Equal("/wstream/", *Config.WS.Prefix) @@ -199,6 +208,8 @@ func assertArguments(a *assert.Assertions) { a.Equal("[127.0.0.1:9092 127.0.0.1:9091]", (*Config.KafkaProducer.Brokers).String()) a.Equal("sms_reporting_topic", *Config.SMS.KafkaReportingTopic) + a.Equal("sms_toggleable", *Config.SMS.Toggleable) + assertClusterRemotes(a) } diff --git a/server/sms/sms_gateway.go b/server/sms/sms_gateway.go index 30c4e45a..1c6f5cac 100644 --- a/server/sms/sms_gateway.go +++ b/server/sms/sms_gateway.go @@ -28,7 +28,7 @@ type Config struct { Workers *int SMSTopic *string IntervalMetrics *bool - SkipFetch *bool + Toggleable *bool KafkaReportingTopic *string @@ -98,7 +98,7 @@ func (g *gateway) initRoute(fetch bool) { QueueSize: -1, Timeout: -1, }) - if fetch || !*g.config.SkipFetch { + if fetch || !*g.config.Toggleable { g.route.FetchRequest = g.fetchRequest() } } @@ -122,7 +122,7 @@ func (g *gateway) Run() { err := g.route.Provide(g.router, true) if err != nil { // cancel subscription loop if there is an error on the provider - logger.WithField("error", err.Error()).Error("Provide returned error") + logger.WithError(err).Error("Provide returned error") provideErr = err g.Cancel() } else { @@ -134,7 +134,7 @@ func (g *gateway) Run() { if err != nil && provideErr == nil { // If Route channel closed, try restarting if err == connector.ErrRouteChannelClosed { - g.logger.Info("Restarting because ErrRouteChannelClosed") + g.logger.WithError(err).Info("Restarting") g.restart() return } @@ -146,7 +146,7 @@ func (g *gateway) Run() { // Router closed the route, try restart if provideErr == router.ErrInvalidRoute { - g.logger.Info("Restarting because ErrInvalidRoute") + g.logger.WithError(provideErr).Info("Restarting") g.restart() return } @@ -215,7 +215,7 @@ func (g *gateway) send(receivedMsg *protocol.Message) error { } func (g *gateway) restart() error { - g.logger.WithField("LastIDSent", g.LastIDSent).Info("restart in progress") + g.logger.WithField("LastIDSent", g.LastIDSent).Info("SMS Gateway restarting") g.Cancel() g.cancelFunc = nil @@ -230,23 +230,26 @@ func (g *gateway) restart() error { go g.Run() - g.logger.WithField("LastIDSent", g.LastIDSent).Info("restart finished") + g.logger.WithField("LastIDSent", g.LastIDSent).Info("SMS Gateway restarted") return nil } // Stop the sms gateway; it is an idempotent operation. func (g *gateway) Stop() error { g.logger.Info("Stopping gateway") - if g.cancelFunc != nil { + if g.cancelFunc == nil { + g.logger.Info("Gateway was already stopped") + return nil + } + if *g.config.Toggleable { g.logger.Info("Unsubscribing the sms route") g.router.Unsubscribe(g.route) - g.logger.Info("Calling the cancel function") - g.cancelFunc() - g.cancelFunc = nil - g.logger.Info("Stopped gateway") - } else { - g.logger.Info("Gateway was already stopped") } + g.logger.Info("Calling the cancel function") + g.cancelFunc() + g.cancelFunc = nil + g.logger.Info("Stopped gateway") + return nil } diff --git a/server/sms/sms_gateway_test.go b/server/sms/sms_gateway_test.go index ca3a9cb9..c5b85dd3 100644 --- a/server/sms/sms_gateway_test.go +++ b/server/sms/sms_gateway_test.go @@ -99,7 +99,7 @@ func Test_SendOneSms(t *testing.T) { }) routerMock.EXPECT().Unsubscribe(gomock.Any()).Do(func(r *router.Route) { a.Equal(*config.SMSTopic, string(r.Path)) - }) + }).AnyTimes() gw, err := New(routerMock, mockSmsSender, config) a.NoError(err) @@ -278,11 +278,11 @@ func Test_RetryLoop(t *testing.T) { skipFetch := false gateway, err := New(routerMock, mockSmsSender, Config{ - Workers: &worker, - Name: SMSDefaultTopic, - Schema: SMSSchema, - SMSTopic: &topic, - SkipFetch: &skipFetch, + Workers: &worker, + Name: SMSDefaultTopic, + Schema: SMSSchema, + SMSTopic: &topic, + Toggleable: &skipFetch, IntervalMetrics: &enableMetrics, }) a.NoError(err) diff --git a/server/sms/utils_test.go b/server/sms/utils_test.go index 76d3b390..b97fea64 100644 --- a/server/sms/utils_test.go +++ b/server/sms/utils_test.go @@ -59,11 +59,11 @@ func createConfig() Config { intervalMetrics := true skipFetch := false return Config{ - Workers: &worker, - SMSTopic: &topic, - Name: "test_gateway", - Schema: SMSSchema, - SkipFetch: &skipFetch, + Workers: &worker, + SMSTopic: &topic, + Name: "test_gateway", + Schema: SMSSchema, + Toggleable: &skipFetch, IntervalMetrics: &intervalMetrics, } From ea8999d3e6812fa778a0462ab5870d3dea07c2ef Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Mon, 24 Apr 2017 15:17:34 +0300 Subject: [PATCH 23/28] refactorin to decrease toggles-handling code complexity --- server/service/service.go | 43 +++++++++++++++++++++------------------ server/sms/sms_gateway.go | 8 ++++---- 2 files changed, 27 insertions(+), 24 deletions(-) diff --git a/server/service/service.go b/server/service/service.go index e2d8b500..6bc5ba26 100644 --- a/server/service/service.go +++ b/server/service/service.go @@ -214,7 +214,7 @@ func (s *Service) togglesHandlerFunc(w http.ResponseWriter, r *http.Request) { }).Info("ignoring toggles single parameter since it is not boolean") continue } - s.toggleModule(key, enable, w) + s.tryToggleModule(key, enable, w) } } @@ -225,8 +225,8 @@ func toggleAllowed(modulePackage string) bool { return false } -func (s *Service) toggleModule(searchedModulePackage string, enable bool, w http.ResponseWriter) { - for order, iface := range s.ModulesSortedByStartOrder() { +func (s *Service) tryToggleModule(searchedModulePackage string, enable bool, w http.ResponseWriter) { + for _, iface := range s.ModulesSortedByStartOrder() { packagePath := reflect.TypeOf(iface).String() packagePathTokens := strings.Split(packagePath, ".") var modulePackage string @@ -236,24 +236,27 @@ func (s *Service) toggleModule(searchedModulePackage string, enable bool, w http if searchedModulePackage != modulePackage { continue } - logger.WithFields(log.Fields{ - "module": searchedModulePackage, - "enable": enable, - }).Info("toggles single boolean valid parameter") - if s, ok := iface.(Startable); ok && enable { - logger.WithFields(log.Fields{"modulePackage": modulePackage, "order": order}).Info("Starting module") - if err := s.Start(); err != nil { - logger.WithError(err).WithField("modulePackage", modulePackage).Error("Error while starting module") - } - w.Write([]byte(fmt.Sprintf("%s was started.\n", searchedModulePackage))) + s.toggleModule(modulePackage, enable, iface, w) + } +} + +func (s *Service) toggleModule(modulePackage string, enable bool, iface interface{}, w http.ResponseWriter) { + le := logger.WithFields(log.Fields{ + "modulePackage": modulePackage, + "enable": enable, + }) + if s, ok := iface.(Startable); ok && enable { + le.Info("Starting module") + if err := s.Start(); err != nil { + le.WithError(err).Error("Error while starting module") } - if s, ok := iface.(Stopable); ok && !enable { - logger.WithFields(log.Fields{"modulePackage": modulePackage, "order": order}).Info("Stopping module") - if err := s.Stop(); err != nil { - logger.WithError(err).WithField("modulePackage", modulePackage).Error("Error while stopping module") - } - w.Write([]byte(fmt.Sprintf("%s was stopped.\n", searchedModulePackage))) + w.Write([]byte(fmt.Sprintf("%s was started.\n", modulePackage))) + } + if s, ok := iface.(Stopable); ok && !enable { + le.Info("Stopping module") + if err := s.Stop(); err != nil { + le.WithError(err).Error("Error while stopping module") } - + w.Write([]byte(fmt.Sprintf("%s was stopped.\n", modulePackage))) } } diff --git a/server/sms/sms_gateway.go b/server/sms/sms_gateway.go index 1c6f5cac..868a3e51 100644 --- a/server/sms/sms_gateway.go +++ b/server/sms/sms_gateway.go @@ -93,10 +93,10 @@ func (g *gateway) Start() error { func (g *gateway) initRoute(fetch bool) { g.route = router.NewRoute(router.RouteConfig{ - Path: protocol.Path(*g.config.SMSTopic), - ChannelSize: 5000, - QueueSize: -1, - Timeout: -1, + Path: protocol.Path(*g.config.SMSTopic), + ChannelSize: 5000, + QueueSize: -1, + Timeout: -1, }) if fetch || !*g.config.Toggleable { g.route.FetchRequest = g.fetchRequest() From 8a4ad340a0f06e1d3c901617f60f420ebde3cdf8 Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Mon, 24 Apr 2017 15:26:35 +0300 Subject: [PATCH 24/28] fixing tests --- server/config_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/config_test.go b/server/config_test.go index 20957196..a844c5ab 100644 --- a/server/config_test.go +++ b/server/config_test.go @@ -108,7 +108,7 @@ func TestParsingOfEnvironmentVariables(t *testing.T) { os.Setenv("GUBLE_SMS_KAFKA_TOPIC", "sms_reporting_topic") defer os.Unsetenv("GUBLE_SMS_KAFKA_TOPIC") - os.Setenv("GUBLE_SMS_TOGGLEABLE", "sms_toggleable") + os.Setenv("GUBLE_SMS_TOGGLEABLE", "true") defer os.Unsetenv("GUBLE_SMS_TOGGLEABLE") // when we parse the arguments from environment variables @@ -158,7 +158,7 @@ func TestParsingArgs(t *testing.T) { "--remotes", "127.0.0.1:8080 127.0.0.1:20002", "--kafka-brokers", "127.0.0.1:9092 127.0.0.1:9091", "--sms-kafka-topic", "sms_reporting_topic", - "--sms-toggleable", "sms_toggleable", + "--sms-toggleable", } // when we parse the arguments from command-line flags @@ -208,7 +208,7 @@ func assertArguments(a *assert.Assertions) { a.Equal("[127.0.0.1:9092 127.0.0.1:9091]", (*Config.KafkaProducer.Brokers).String()) a.Equal("sms_reporting_topic", *Config.SMS.KafkaReportingTopic) - a.Equal("sms_toggleable", *Config.SMS.Toggleable) + a.Equal(true, *Config.SMS.Toggleable) assertClusterRemotes(a) } From 7ee34b311369ca8477e7d9f47affab6c728d84f7 Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Mon, 24 Apr 2017 15:26:54 +0300 Subject: [PATCH 25/28] minor --- server/sms/sms_gateway_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/sms/sms_gateway_test.go b/server/sms/sms_gateway_test.go index c5b85dd3..8f2517f4 100644 --- a/server/sms/sms_gateway_test.go +++ b/server/sms/sms_gateway_test.go @@ -272,7 +272,7 @@ func Test_RetryLoop(t *testing.T) { routerMock.EXPECT().MessageStore().AnyTimes().Return(mockMessageStore, nil) //setup a new sms gateway - worker := 8 + worker := 1 topic := SMSDefaultTopic enableMetrics := false skipFetch := false From 92a7a94eae2269895aadc35511a331b72ab6a1f7 Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Mon, 24 Apr 2017 15:36:50 +0300 Subject: [PATCH 26/28] simpler logs --- server/service/service.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/server/service/service.go b/server/service/service.go index 6bc5ba26..e4b6208f 100644 --- a/server/service/service.go +++ b/server/service/service.go @@ -194,15 +194,11 @@ func (s *Service) togglesHandlerFunc(w http.ResponseWriter, r *http.Request) { logger.Info("togglesHandlerFunc") for key, values := range r.URL.Query() { if !toggleAllowed(key) { - logger.WithFields(log.Fields{ - "key": key, - }).Info("toggling this module is not explicitly allowed") + logger.WithField("key", key).Info("toggling this module is not explicitly allowed") continue } if len(values) != 1 { - logger.WithFields(log.Fields{ - "key": key, - }).Info("ignoring toggles parameter since it has more than one value") + logger.WithField("key", key).Info("ignoring toggles parameter since it has more than one value") continue } value := values[0] From dad633237e044ce26e651b40b0b11cf649dbecc6 Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Mon, 24 Apr 2017 15:39:16 +0300 Subject: [PATCH 27/28] fmt --- server/connector/connector.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/connector/connector.go b/server/connector/connector.go index 9402da11..2225f302 100644 --- a/server/connector/connector.go +++ b/server/connector/connector.go @@ -345,11 +345,11 @@ func (c *connector) restart(s Subscriber) error { // Stop the connector (the context, the queue, the subscription loops) func (c *connector) Stop() error { c.logger.Info("Stopping connector") - if c.cancel == nil{ + if c.cancel == nil { return nil } c.cancel() - c.cancel=nil + c.cancel = nil c.queue.Stop() c.wg.Wait() c.logger.Info("Stopped connector") From 5e3e676fed12cb904ef6ed29fb052543f3b02bc1 Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Mon, 24 Apr 2017 16:10:25 +0300 Subject: [PATCH 28/28] better handling of error-cases on toggling modules (Start/Stop) --- server/service/service.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/server/service/service.go b/server/service/service.go index e4b6208f..7e9c91c9 100644 --- a/server/service/service.go +++ b/server/service/service.go @@ -245,14 +245,18 @@ func (s *Service) toggleModule(modulePackage string, enable bool, iface interfac le.Info("Starting module") if err := s.Start(); err != nil { le.WithError(err).Error("Error while starting module") + w.Write([]byte(fmt.Sprintf("%s could not be started.\n", modulePackage))) + return } - w.Write([]byte(fmt.Sprintf("%s was started.\n", modulePackage))) + w.Write([]byte(fmt.Sprintf("%s was successfully started.\n", modulePackage))) } if s, ok := iface.(Stopable); ok && !enable { le.Info("Stopping module") if err := s.Stop(); err != nil { le.WithError(err).Error("Error while stopping module") + w.Write([]byte(fmt.Sprintf("%s could not be stopped.\n", modulePackage))) + return } - w.Write([]byte(fmt.Sprintf("%s was stopped.\n", modulePackage))) + w.Write([]byte(fmt.Sprintf("%s was successfully stopped.\n", modulePackage))) } }