Skip to content

Commit

Permalink
Merge bb9f798 into 5c976e2
Browse files Browse the repository at this point in the history
  • Loading branch information
Cosmin Rentea committed Apr 24, 2017
2 parents 5c976e2 + bb9f798 commit 361e9b2
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 31 deletions.
4 changes: 4 additions & 0 deletions server/apns/apns.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (a *apns) Start() error {
return err
}

func (a *apns) Stop() error {
return a.Connector.Stop()
}

func (a *apns) startMetrics() {
mTotalSentMessages.Set(0)
mTotalSendErrors.Set(0)
Expand Down
9 changes: 9 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
defaultHealthEndpoint = "/admin/healthcheck"
defaultMetricsEndpoint = "/admin/metrics-old"
defaultPrometheusEndpoint = "/admin/metrics"
defaultTogglesEndpoint = "/admin/toggles"
defaultKVSBackend = "file"
defaultMSBackend = "file"
defaultStoragePath = "/var/lib/guble"
Expand Down Expand Up @@ -71,6 +72,7 @@ type (
HealthEndpoint *string
MetricsEndpoint *string
PrometheusEndpoint *string
TogglesEndpoint *string
Profile *string
Postgres PostgresConfig
FCM fcm.Config
Expand Down Expand Up @@ -124,6 +126,10 @@ var (
Default(defaultPrometheusEndpoint).
Envar("GUBLE_PROMETHEUS_ENDPOINT").
String(),
TogglesEndpoint: kingpin.Flag("toggles-endpoint", `The Feature-Toggles endpoint to be used by the HTTP server (value for disabling it: "")`).
Default(defaultTogglesEndpoint).
Envar("GUBLE_TOGGLES_ENDPOINT").
String(),
Profile: kingpin.Flag("profile", `The profiler to be used (default: none): mem | cpu | block`).
Default("").
Envar("GUBLE_PROFILE").
Expand Down Expand Up @@ -222,6 +228,9 @@ var (
Envar("GUBLE_SMS_TOPIC").
Default(sms.SMSDefaultTopic).
String(),
SkipFetch: kingpin.Flag("sms-skip-fetch", "If sms gateway should skip fetching from message-store when starting").
Envar("GUBLE_SMS_SKIP_FETCH").
Bool(),
Workers: kingpin.Flag("sms-workers", "The number of workers handling traffic with Nexmo sms endpoint(default: number of CPUs)").
Default(strconv.Itoa(runtime.NumCPU())).
Envar("GUBLE_SMS_WORKERS").
Expand Down
11 changes: 10 additions & 1 deletion server/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,14 +259,19 @@ func (c *connector) Substitute(w http.ResponseWriter, req *http.Request) {

// Start will run start all current subscriptions and workers to process the messages
func (c *connector) Start() error {
c.logger.Info("Starting connector")
if c.cancel != nil {
c.logger.Info("Connector was already started")
return nil
}
c.queue.Start()

c.logger.Info("Starting connector")
c.ctx, c.cancel = context.WithCancel(context.Background())

c.logger.Info("Loading subscriptions")
err := c.manager.Load()
if err != nil {
c.logger.Error("error while loading subscriptions")
return err
}

Expand Down Expand Up @@ -340,7 +345,11 @@ func (c *connector) restart(s Subscriber) error {
// Stop the connector (the context, the queue, the subscription loops)
func (c *connector) Stop() error {
c.logger.Info("Stopping connector")
if c.cancel == nil{
return nil
}
c.cancel()
c.cancel=nil
c.queue.Stop()
c.wg.Wait()
c.logger.Info("Stopped connector")
Expand Down
4 changes: 4 additions & 0 deletions server/fcm/fcm.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func (f *fcm) Start() error {
return err
}

func (f *fcm) Stop() error {
return f.Connector.Stop()
}

func (f *fcm) startMetrics() {
mTotalSentMessages.Set(0)
mTotalSendErrors.Set(0)
Expand Down
13 changes: 7 additions & 6 deletions server/gobbler.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,19 +186,19 @@ var CreateModules = func(router router.Router) (modules []interface{}) {
}

if *Config.SMS.Enabled {
logger.Info("Nexmo SMS: enabled")
logger.Info("SMS: enabled")
if *Config.SMS.APIKey == "" || *Config.SMS.APISecret == "" {
logger.Panic("The API Key has to be provided when NEXMO SMS connector is enabled")
logger.Panic("The API Key and Secret have to be provided when NEXMO SMS connector is enabled")
}
nexmoSender, err := sms.NewNexmoSender(*Config.SMS.APIKey, *Config.SMS.APISecret, kafkaProducer, *Config.SMS.KafkaReportingTopic)
if err != nil {
logger.WithError(err).Error("Error creating Nexmo Sender")
}
smsConn, err := sms.New(router, nexmoSender, Config.SMS)
smsGateway, err := sms.New(router, nexmoSender, Config.SMS)
if err != nil {
logger.WithError(err).Error("Error creating Nexmo Sender")
logger.WithError(err).Error("Error creating SMS Gateway")
} else {
modules = append(modules, smsConn)
modules = append(modules, smsGateway)
}
} else {
logger.Info("SMS: disabled")
Expand Down Expand Up @@ -271,7 +271,8 @@ func StartService() *service.Service {
srv := service.New(r, websrv).
HealthEndpoint(*Config.HealthEndpoint).
MetricsEndpoint(*Config.MetricsEndpoint).
PrometheusEndpoint(*Config.PrometheusEndpoint)
PrometheusEndpoint(*Config.PrometheusEndpoint).
TogglesEndpoint(*Config.TogglesEndpoint)

srv.RegisterModules(0, 6, kvStore, messageStore)
srv.RegisterModules(4, 3, CreateModules(r)...)
Expand Down
72 changes: 70 additions & 2 deletions server/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ package service
import (
log "github.com/Sirupsen/logrus"
"github.com/docker/distribution/health"
"github.com/hashicorp/go-multierror"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/cosminrentea/gobbler/server/metrics"
"github.com/cosminrentea/gobbler/server/router"
"github.com/cosminrentea/gobbler/server/webserver"

"github.com/hashicorp/go-multierror"
"fmt"
"net/http"
"reflect"
"strconv"
"strings"
"time"
)

Expand All @@ -30,6 +33,7 @@ type Service struct {
healthThreshold int
metricsEndpoint string
prometheusEndpoint string
togglesEndpoint string
}

// New creates a new Service, using the given Router and WebServer.
Expand Down Expand Up @@ -88,7 +92,14 @@ func (s *Service) PrometheusEndpoint(endpointPrefix string) *Service {
return s
}

// Start checks the modules for the following interfaces and registers and/or starts:
// TogglesEndpoint sets the endpoint used for Feature-Toggles. Parameter for disabling the endpoint is: "". Returns the updated service.
func (s *Service) TogglesEndpoint(endpointPrefix string) *Service {
s.togglesEndpoint = endpointPrefix
return s
}

// Start the health-check, old-format metrics, and Prometheus metrics endpoint,
// and then check the modules for the following interfaces and registers and/or start:
// Startable:
// health.Checker:
// Endpoint: Register the handler function of the Endpoint in the http service at prefix
Expand All @@ -112,6 +123,12 @@ func (s *Service) Start() error {
} else {
logger.Info("Prometheus metrics endpoint disabled")
}
if s.togglesEndpoint != "" {
logger.WithField("togglesEndpoint", s.togglesEndpoint).Info("Toggles endpoint")
s.webserver.Handle(s.togglesEndpoint, http.HandlerFunc(s.togglesHandlerFunc))
} else {
logger.Info("Toggles endpoint disabled")
}
for order, iface := range s.ModulesSortedByStartOrder() {
name := reflect.TypeOf(iface).String()
if s, ok := iface.(Startable); ok {
Expand Down Expand Up @@ -172,3 +189,54 @@ func (s *Service) modulesSortedBy(criteria by) []interface{} {
}
return sorted
}

func (s *Service) togglesHandlerFunc(w http.ResponseWriter, r *http.Request) {
logger.Info("toggles")
for key, values := range r.URL.Query() {
if len(values) != 1 {
logger.WithFields(log.Fields{
"key": key,
}).Info("ignoring toggles parameter since it has more than one value")
continue
}
value := values[0]
enable, err := strconv.ParseBool(value)
if err != nil {
logger.WithFields(log.Fields{
"key": key,
"value": value,
}).Info("ignoring toggles single parameter since it is not boolean")
continue
}
for order, iface := range s.ModulesSortedByStartOrder() {
packagePath := reflect.TypeOf(iface).String()
names := strings.Split(packagePath, ".")
var name string
if len(names) > 0 {
name = strings.TrimPrefix(names[0],"*")
}
if key == name {
logger.WithFields(log.Fields{
"key": key,
"value": value,
}).Info("toggles single boolean valid parameter")

if s, ok := iface.(Startable); ok && enable {
logger.WithFields(log.Fields{"name": name, "order": order}).Info("Starting module")
if err := s.Start(); err != nil {
logger.WithError(err).WithField("name", name).Error("Error while starting module")
}
w.Write([]byte(fmt.Sprintf("%s was started.\n", key)))
}

if s, ok := iface.(Stopable); ok && !enable {
logger.WithFields(log.Fields{"name": name, "order": order}).Info("Stopping module")
if err := s.Stop(); err != nil {
logger.WithError(err).WithField("name", name).Error("Error while stopping module")
}
w.Write([]byte(fmt.Sprintf("%s was stopped.\n", key)))
}
}
}
}
}
51 changes: 32 additions & 19 deletions server/sms/sms_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Config struct {
Workers *int
SMSTopic *string
IntervalMetrics *bool
SkipFetch *bool

KafkaReportingTopic *string

Expand All @@ -54,7 +55,7 @@ func New(router router.Router, sender Sender, config Config) (*gateway, error) {
if *config.Workers <= 0 {
*config.Workers = connector.DefaultWorkers
}
logger.WithField("number", *config.Workers).Debug("sms workers")
logger.WithField("number", *config.Workers).Info("sms workers")
config.Schema = SMSSchema
config.Name = SMSDefaultTopic
return &gateway{
Expand All @@ -65,8 +66,13 @@ func New(router router.Router, sender Sender, config Config) (*gateway, error) {
}, nil
}

// Start the sms gateway; it is an idempotent operation.
func (g *gateway) Start() error {
g.logger.Debug("Starting gateway")
g.logger.Info("Starting gateway")
if g.cancelFunc != nil {
g.logger.Info("Gateway was already started")
return nil
}

err := g.ReadLastID()
if err != nil {
Expand All @@ -75,24 +81,26 @@ func (g *gateway) Start() error {
}

g.ctx, g.cancelFunc = context.WithCancel(context.Background())
g.initRoute()
g.initRoute(false)

go g.Run()

g.startMetrics()

g.logger.Debug("Started gateway")
g.logger.Info("Started gateway")
return nil
}

func (g *gateway) initRoute() {
func (g *gateway) initRoute(fetch bool) {
g.route = router.NewRoute(router.RouteConfig{
Path: protocol.Path(*g.config.SMSTopic),
ChannelSize: 5000,
QueueSize: -1,
Timeout: -1,
FetchRequest: g.fetchRequest(),
})
if fetch || !*g.config.SkipFetch {
g.route.FetchRequest = g.fetchRequest()
}
}

func (g *gateway) fetchRequest() (fr *store.FetchRequest) {
Expand All @@ -101,13 +109,14 @@ func (g *gateway) fetchRequest() (fr *store.FetchRequest) {
protocol.Path(*g.config.SMSTopic).Partition(),
g.LastIDSent+1,
0,
store.DirectionForward, -1)
store.DirectionForward,
-1)
}
return
}

func (g *gateway) Run() {
g.logger.Debug("Run gateway")
g.logger.Info("Run gateway")
var provideErr error
go func() {
err := g.route.Provide(g.router, true)
Expand All @@ -126,10 +135,9 @@ func (g *gateway) Run() {
// If Route channel closed, try restarting
if err == connector.ErrRouteChannelClosed {
g.logger.Info("Restarting because ErrRouteChannelClosed")
g.Restart()
g.restart()
return
}

}

if provideErr != nil {
Expand All @@ -139,7 +147,7 @@ func (g *gateway) Run() {
// Router closed the route, try restart
if provideErr == router.ErrInvalidRoute {
g.logger.Info("Restarting because ErrInvalidRoute")
g.Restart()
g.restart()
return
}
// Router module is stopping, exit the process
Expand Down Expand Up @@ -186,6 +194,7 @@ func (g *gateway) proxyLoop() error {
//TODO Cosmin Bogdan returning this error can mean 2 things: overflow of route's channel, or intentional stopping of router / gubled.
return connector.ErrRouteChannelClosed
}

func (g *gateway) send(receivedMsg *protocol.Message) error {
err := g.sender.Send(receivedMsg)
if err != nil {
Expand All @@ -205,33 +214,37 @@ func (g *gateway) send(receivedMsg *protocol.Message) error {
return nil
}

func (g *gateway) Restart() error {
g.logger.WithField("LastIDSent", g.LastIDSent).Debug("Restart in progress")
func (g *gateway) restart() error {
g.logger.WithField("LastIDSent", g.LastIDSent).Info("restart in progress")

g.Cancel()
g.cancelFunc = nil

err := g.ReadLastID()
if err != nil {
g.logger.WithError(err).Error("Could not ReadLastID in Restart")
g.logger.WithError(err).Error("Could not ReadLastID in restart")
return err
}

g.initRoute()
g.initRoute(true)

go g.Run()

g.logger.WithField("LastIDSent", g.LastIDSent).Debug("Restart finished")
g.logger.WithField("LastIDSent", g.LastIDSent).Info("restart finished")
return nil
}

// Stop the sms gateway; it is an idempotent operation.
func (g *gateway) Stop() error {
g.logger.Debug("Stopping gateway")
g.logger.Info("Stopping gateway")
if g.cancelFunc != nil {
g.logger.Debug("Canceling in Stop")
g.logger.Info("Calling the cancel function")
g.cancelFunc()
g.cancelFunc = nil
g.logger.Info("Stopped gateway")
} else {
g.logger.Info("Gateway was already stopped")
}
g.logger.Debug("Stopped gateway")
return nil
}

Expand Down
Loading

0 comments on commit 361e9b2

Please sign in to comment.