Skip to content

Commit

Permalink
Merge 5e3e676 into 5c976e2
Browse files Browse the repository at this point in the history
  • Loading branch information
Cosmin Rentea committed Apr 24, 2017
2 parents 5c976e2 + 5e3e676 commit 76aabea
Show file tree
Hide file tree
Showing 11 changed files with 229 additions and 51 deletions.
4 changes: 4 additions & 0 deletions server/apns/apns.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (a *apns) Start() error {
return err
}

func (a *apns) Stop() error {
return a.Connector.Stop()
}

func (a *apns) startMetrics() {
mTotalSentMessages.Set(0)
mTotalSendErrors.Set(0)
Expand Down
9 changes: 9 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
defaultHealthEndpoint = "/admin/healthcheck"
defaultMetricsEndpoint = "/admin/metrics-old"
defaultPrometheusEndpoint = "/admin/metrics"
defaultTogglesEndpoint = "/admin/toggles"
defaultKVSBackend = "file"
defaultMSBackend = "file"
defaultStoragePath = "/var/lib/guble"
Expand Down Expand Up @@ -71,6 +72,7 @@ type (
HealthEndpoint *string
MetricsEndpoint *string
PrometheusEndpoint *string
TogglesEndpoint *string
Profile *string
Postgres PostgresConfig
FCM fcm.Config
Expand Down Expand Up @@ -124,6 +126,10 @@ var (
Default(defaultPrometheusEndpoint).
Envar("GUBLE_PROMETHEUS_ENDPOINT").
String(),
TogglesEndpoint: kingpin.Flag("toggles-endpoint", `The Feature-Toggles endpoint to be used by the HTTP server (value for disabling it: "")`).
Default(defaultTogglesEndpoint).
Envar("GUBLE_TOGGLES_ENDPOINT").
String(),
Profile: kingpin.Flag("profile", `The profiler to be used (default: none): mem | cpu | block`).
Default("").
Envar("GUBLE_PROFILE").
Expand Down Expand Up @@ -222,6 +228,9 @@ var (
Envar("GUBLE_SMS_TOPIC").
Default(sms.SMSDefaultTopic).
String(),
Toggleable: kingpin.Flag("sms-toggleable", "If sms gateway should be able to be stopped and restarted at runtime").
Envar("GUBLE_SMS_TOGGLEABLE").
Bool(),
Workers: kingpin.Flag("sms-workers", "The number of workers handling traffic with Nexmo sms endpoint(default: number of CPUs)").
Default(strconv.Itoa(runtime.NumCPU())).
Envar("GUBLE_SMS_WORKERS").
Expand Down
11 changes: 11 additions & 0 deletions server/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ func TestParsingOfEnvironmentVariables(t *testing.T) {
os.Setenv("GUBLE_PROMETHEUS_ENDPOINT", "prometheus_endpoint")
defer os.Unsetenv("GUBLE_PROMETHEUS_ENDPOINT")

os.Setenv("GUBLE_TOGGLES_ENDPOINT", "toggles_endpoint")
defer os.Unsetenv("GUBLE_TOGGLES_ENDPOINT")

os.Setenv("GUBLE_MS", "ms-backend")
defer os.Unsetenv("GUBLE_MS")

Expand Down Expand Up @@ -105,6 +108,9 @@ func TestParsingOfEnvironmentVariables(t *testing.T) {
os.Setenv("GUBLE_SMS_KAFKA_TOPIC", "sms_reporting_topic")
defer os.Unsetenv("GUBLE_SMS_KAFKA_TOPIC")

os.Setenv("GUBLE_SMS_TOGGLEABLE", "true")
defer os.Unsetenv("GUBLE_SMS_TOGGLEABLE")

// when we parse the arguments from environment variables
parseConfig()

Expand All @@ -131,6 +137,7 @@ func TestParsingArgs(t *testing.T) {
"--health-endpoint", "health_endpoint",
"--metrics-endpoint", "metrics_endpoint",
"--prometheus-endpoint", "prometheus_endpoint",
"--toggles-endpoint", "toggles_endpoint",
"--ws",
"--ws-prefix", "/wstream/",
"--fcm",
Expand All @@ -151,6 +158,7 @@ func TestParsingArgs(t *testing.T) {
"--remotes", "127.0.0.1:8080 127.0.0.1:20002",
"--kafka-brokers", "127.0.0.1:9092 127.0.0.1:9091",
"--sms-kafka-topic", "sms_reporting_topic",
"--sms-toggleable",
}

// when we parse the arguments from command-line flags
Expand All @@ -169,6 +177,7 @@ func assertArguments(a *assert.Assertions) {

a.Equal("metrics_endpoint", *Config.MetricsEndpoint)
a.Equal("prometheus_endpoint", *Config.PrometheusEndpoint)
a.Equal("toggles_endpoint", *Config.TogglesEndpoint)

a.Equal(true, *Config.WS.Enabled)
a.Equal("/wstream/", *Config.WS.Prefix)
Expand Down Expand Up @@ -199,6 +208,8 @@ func assertArguments(a *assert.Assertions) {
a.Equal("[127.0.0.1:9092 127.0.0.1:9091]", (*Config.KafkaProducer.Brokers).String())
a.Equal("sms_reporting_topic", *Config.SMS.KafkaReportingTopic)

a.Equal(true, *Config.SMS.Toggleable)

assertClusterRemotes(a)
}

Expand Down
13 changes: 11 additions & 2 deletions server/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,16 +257,21 @@ func (c *connector) Substitute(w http.ResponseWriter, req *http.Request) {
fmt.Fprintf(w, `{"modified":"%d"}`, totalSubscribersUpdated)
}

// Start will run start all current subscriptions and workers to process the messages
// Start all current subscriptions and workers to process the messages
func (c *connector) Start() error {
c.logger.Info("Starting connector")
if c.cancel != nil {
c.logger.Info("Connector was already started")
return nil
}
c.queue.Start()

c.logger.Info("Starting connector")
c.ctx, c.cancel = context.WithCancel(context.Background())

c.logger.Info("Loading subscriptions")
err := c.manager.Load()
if err != nil {
c.logger.Error("error while loading subscriptions")
return err
}

Expand Down Expand Up @@ -340,7 +345,11 @@ func (c *connector) restart(s Subscriber) error {
// Stop the connector (the context, the queue, the subscription loops)
func (c *connector) Stop() error {
c.logger.Info("Stopping connector")
if c.cancel == nil {
return nil
}
c.cancel()
c.cancel = nil
c.queue.Stop()
c.wg.Wait()
c.logger.Info("Stopped connector")
Expand Down
4 changes: 4 additions & 0 deletions server/fcm/fcm.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func (f *fcm) Start() error {
return err
}

func (f *fcm) Stop() error {
return f.Connector.Stop()
}

func (f *fcm) startMetrics() {
mTotalSentMessages.Set(0)
mTotalSendErrors.Set(0)
Expand Down
13 changes: 7 additions & 6 deletions server/gobbler.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,19 +186,19 @@ var CreateModules = func(router router.Router) (modules []interface{}) {
}

if *Config.SMS.Enabled {
logger.Info("Nexmo SMS: enabled")
logger.Info("SMS: enabled")
if *Config.SMS.APIKey == "" || *Config.SMS.APISecret == "" {
logger.Panic("The API Key has to be provided when NEXMO SMS connector is enabled")
logger.Panic("The API Key and Secret have to be provided when NEXMO SMS connector is enabled")
}
nexmoSender, err := sms.NewNexmoSender(*Config.SMS.APIKey, *Config.SMS.APISecret, kafkaProducer, *Config.SMS.KafkaReportingTopic)
if err != nil {
logger.WithError(err).Error("Error creating Nexmo Sender")
}
smsConn, err := sms.New(router, nexmoSender, Config.SMS)
smsGateway, err := sms.New(router, nexmoSender, Config.SMS)
if err != nil {
logger.WithError(err).Error("Error creating Nexmo Sender")
logger.WithError(err).Error("Error creating SMS Gateway")
} else {
modules = append(modules, smsConn)
modules = append(modules, smsGateway)
}
} else {
logger.Info("SMS: disabled")
Expand Down Expand Up @@ -271,7 +271,8 @@ func StartService() *service.Service {
srv := service.New(r, websrv).
HealthEndpoint(*Config.HealthEndpoint).
MetricsEndpoint(*Config.MetricsEndpoint).
PrometheusEndpoint(*Config.PrometheusEndpoint)
PrometheusEndpoint(*Config.PrometheusEndpoint).
TogglesEndpoint(*Config.TogglesEndpoint)

srv.RegisterModules(0, 6, kvStore, messageStore)
srv.RegisterModules(4, 3, CreateModules(r)...)
Expand Down
92 changes: 90 additions & 2 deletions server/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ package service
import (
log "github.com/Sirupsen/logrus"
"github.com/docker/distribution/health"
"github.com/hashicorp/go-multierror"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/cosminrentea/gobbler/server/metrics"
"github.com/cosminrentea/gobbler/server/router"
"github.com/cosminrentea/gobbler/server/webserver"

"github.com/hashicorp/go-multierror"
"fmt"
"net/http"
"reflect"
"strconv"
"strings"
"time"
)

Expand All @@ -30,6 +33,7 @@ type Service struct {
healthThreshold int
metricsEndpoint string
prometheusEndpoint string
togglesEndpoint string
}

// New creates a new Service, using the given Router and WebServer.
Expand Down Expand Up @@ -88,7 +92,14 @@ func (s *Service) PrometheusEndpoint(endpointPrefix string) *Service {
return s
}

// Start checks the modules for the following interfaces and registers and/or starts:
// TogglesEndpoint sets the endpoint used for Feature-Toggles. Parameter for disabling the endpoint is: "". Returns the updated service.
func (s *Service) TogglesEndpoint(endpointPrefix string) *Service {
s.togglesEndpoint = endpointPrefix
return s
}

// Start the health-check, old-format metrics, and Prometheus metrics endpoint,
// and then check the modules for the following interfaces and registers and/or start:
// Startable:
// health.Checker:
// Endpoint: Register the handler function of the Endpoint in the http service at prefix
Expand All @@ -112,6 +123,12 @@ func (s *Service) Start() error {
} else {
logger.Info("Prometheus metrics endpoint disabled")
}
if s.togglesEndpoint != "" {
logger.WithField("togglesEndpoint", s.togglesEndpoint).Info("Toggles endpoint")
s.webserver.Handle(s.togglesEndpoint, http.HandlerFunc(s.togglesHandlerFunc))
} else {
logger.Info("Toggles endpoint disabled")
}
for order, iface := range s.ModulesSortedByStartOrder() {
name := reflect.TypeOf(iface).String()
if s, ok := iface.(Startable); ok {
Expand Down Expand Up @@ -172,3 +189,74 @@ func (s *Service) modulesSortedBy(criteria by) []interface{} {
}
return sorted
}

func (s *Service) togglesHandlerFunc(w http.ResponseWriter, r *http.Request) {
logger.Info("togglesHandlerFunc")
for key, values := range r.URL.Query() {
if !toggleAllowed(key) {
logger.WithField("key", key).Info("toggling this module is not explicitly allowed")
continue
}
if len(values) != 1 {
logger.WithField("key", key).Info("ignoring toggles parameter since it has more than one value")
continue
}
value := values[0]
enable, err := strconv.ParseBool(value)
if err != nil {
logger.WithFields(log.Fields{
"key": key,
"value": value,
}).Info("ignoring toggles single parameter since it is not boolean")
continue
}
s.tryToggleModule(key, enable, w)
}
}

func toggleAllowed(modulePackage string) bool {
if modulePackage == "sms" {
return true
}
return false
}

func (s *Service) tryToggleModule(searchedModulePackage string, enable bool, w http.ResponseWriter) {
for _, iface := range s.ModulesSortedByStartOrder() {
packagePath := reflect.TypeOf(iface).String()
packagePathTokens := strings.Split(packagePath, ".")
var modulePackage string
if len(packagePathTokens) > 0 {
modulePackage = strings.TrimPrefix(packagePathTokens[0], "*")
}
if searchedModulePackage != modulePackage {
continue
}
s.toggleModule(modulePackage, enable, iface, w)
}
}

func (s *Service) toggleModule(modulePackage string, enable bool, iface interface{}, w http.ResponseWriter) {
le := logger.WithFields(log.Fields{
"modulePackage": modulePackage,
"enable": enable,
})
if s, ok := iface.(Startable); ok && enable {
le.Info("Starting module")
if err := s.Start(); err != nil {
le.WithError(err).Error("Error while starting module")
w.Write([]byte(fmt.Sprintf("%s could not be started.\n", modulePackage)))
return
}
w.Write([]byte(fmt.Sprintf("%s was successfully started.\n", modulePackage)))
}
if s, ok := iface.(Stopable); ok && !enable {
le.Info("Stopping module")
if err := s.Stop(); err != nil {
le.WithError(err).Error("Error while stopping module")
w.Write([]byte(fmt.Sprintf("%s could not be stopped.\n", modulePackage)))
return
}
w.Write([]byte(fmt.Sprintf("%s was successfully stopped.\n", modulePackage)))
}
}
Loading

0 comments on commit 76aabea

Please sign in to comment.