diff --git a/config/config.go b/config/config.go index 91824218e..3d563d1d8 100644 --- a/config/config.go +++ b/config/config.go @@ -99,13 +99,14 @@ type Config struct { CipherString string `yaml:"cipher_suites"` CipherSuites []uint16 - PublishStartMessageIntervalInSeconds int `yaml:"publish_start_message_interval"` - PruneStaleDropletsIntervalInSeconds int `yaml:"prune_stale_droplets_interval"` - DropletStaleThresholdInSeconds int `yaml:"droplet_stale_threshold"` - PublishActiveAppsIntervalInSeconds int `yaml:"publish_active_apps_interval"` - StartResponseDelayIntervalInSeconds int `yaml:"start_response_delay_interval"` - EndpointTimeoutInSeconds int `yaml:"endpoint_timeout"` - RouteServiceTimeoutInSeconds int `yaml:"route_services_timeout"` + PublishStartMessageIntervalInSeconds int `yaml:"publish_start_message_interval"` + SuspendPruningIfNatsUnavailable bool `yaml:"suspend_pruning_if_nats_unavailable"` + PruneStaleDropletsIntervalInSeconds int `yaml:"prune_stale_droplets_interval"` + DropletStaleThresholdInSeconds int `yaml:"droplet_stale_threshold"` + PublishActiveAppsIntervalInSeconds int `yaml:"publish_active_apps_interval"` + StartResponseDelayIntervalInSeconds int `yaml:"start_response_delay_interval"` + EndpointTimeoutInSeconds int `yaml:"endpoint_timeout"` + RouteServiceTimeoutInSeconds int `yaml:"route_services_timeout"` DrainWaitInSeconds int `yaml:"drain_wait,omitempty"` DrainTimeoutInSeconds int `yaml:"drain_timeout,omitempty"` diff --git a/config/config_test.go b/config/config_test.go index 779115a1b..73e73d023 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -73,6 +73,20 @@ nats: Expect(config.Nats[0].Pass).To(Equal("pass")) }) + Context("Suspend Pruning option", func() { + It("sets default suspend_pruning_if_nats_unavailable", func() { + Expect(config.SuspendPruningIfNatsUnavailable).To(BeFalse()) + }) + + It("sets default suspend_pruning_if_nats_unavailable", func() { + var b = []byte(` +suspend_pruning_if_nats_unavailable: true +`) + config.Initialize(b) + Expect(config.SuspendPruningIfNatsUnavailable).To(BeTrue()) + }) + }) + It("sets default logging configs", func() { Expect(config.Logging.File).To(Equal("")) Expect(config.Logging.Syslog).To(Equal("")) diff --git a/main.go b/main.go index 6fd7617f7..87dd3011c 100644 --- a/main.go +++ b/main.go @@ -87,6 +87,9 @@ func main() { metricsReporter := metrics.NewMetricsReporter() registry := rregistry.NewRouteRegistry(logger.Session("registry"), c, metricsReporter) + if c.SuspendPruningIfNatsUnavailable { + registry.SuspendPruning(func() bool { return !(natsClient.Status() == nats.CONNECTED) }) + } varz := rvarz.NewVarz(registry) compositeReporter := metrics.NewCompositeReporter(varz, metricsReporter) @@ -234,6 +237,11 @@ func connectToNatsServer(logger lager.Logger, c *config.Config) *nats.Conn { options.ClosedCB = func(conn *nats.Conn) { logger.Fatal("nats-connection-closed", errors.New("unexpected close"), lager.Data{"last_error": conn.LastError()}) } + + // in the case of suspending pruning, we need to ensure we retry reconnects indefinitely + if c.SuspendPruningIfNatsUnavailable { + options.MaxReconnect = -1 + } natsClient, err = options.Connect() if err == nil { break diff --git a/main_test.go b/main_test.go index bed94f193..ec2a30288 100644 --- a/main_test.go +++ b/main_test.go @@ -64,11 +64,13 @@ var _ = Describe("Router Integration", func() { cfg.DrainWaitInSeconds = drainWait } - createConfig := func(cfgFile string, statusPort, proxyPort uint16, pruneInterval, pruneThreshold, drainWait int, natsPorts ...uint16) *config.Config { + createConfig := func(cfgFile string, statusPort, proxyPort uint16, pruneInterval, pruneThreshold, drainWait int, suspendPruning bool, natsPorts ...uint16) *config.Config { cfg := test_util.SpecConfig(statusPort, proxyPort, natsPorts...) configDrainSetup(cfg, pruneInterval, pruneThreshold, drainWait) + cfg.SuspendPruningIfNatsUnavailable = suspendPruning + caCertsPath := filepath.Join(lastGoPath(), "src", "github.com", "cloudfoundry", "gorouter", "test", "assets", "certs", "uaa-ca.pem") cfg.OAuth = config.OAuthConfig{ TokenEndpoint: "127.0.0.1", @@ -146,7 +148,7 @@ var _ = Describe("Router Integration", func() { proxyPort = test_util.NextAvailPort() cfgFile = filepath.Join(tmpdir, "config.yml") - config = createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 1, natsPort) + config = createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 1, false, natsPort) }) JustBeforeEach(func() { @@ -329,7 +331,7 @@ var _ = Describe("Router Integration", func() { proxyPort := test_util.NextAvailPort() cfgFile := filepath.Join(tmpdir, "config.yml") - config := createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, natsPort) + config := createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, false, natsPort) config.Logging.MetronAddress = "" writeConfig(config, cfgFile) @@ -343,7 +345,7 @@ var _ = Describe("Router Integration", func() { statusPort := test_util.NextAvailPort() proxyPort := test_util.NextAvailPort() cfgFile := filepath.Join(tmpdir, "config.yml") - createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, natsPort) + createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, false, natsPort) gorouterSession = startGorouterSession(cfgFile) @@ -358,7 +360,7 @@ var _ = Describe("Router Integration", func() { proxyPort := test_util.NextAvailPort() cfgFile := filepath.Join(tmpdir, "config.yml") - config := createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, natsPort) + config := createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, false, natsPort) gorouterSession = startGorouterSession(cfgFile) @@ -439,7 +441,7 @@ var _ = Describe("Router Integration", func() { cfgFile = filepath.Join(tmpdir, "config.yml") pruneInterval = 2 pruneThreshold = 10 - config = createConfig(cfgFile, statusPort, proxyPort, pruneInterval, pruneThreshold, 0, natsPort, natsPort2) + config = createConfig(cfgFile, statusPort, proxyPort, pruneInterval, pruneThreshold, 0, false, natsPort, natsPort2) }) AfterEach(func() { @@ -495,6 +497,66 @@ var _ = Describe("Router Integration", func() { natsRunner.Start() }) + + Context("when suspend_pruning_if_nats_unavailable enabled", func() { + + BeforeEach(func() { + natsPort2 = test_util.NextAvailPort() + natsRunner2 = test_util.NewNATSRunner(int(natsPort2)) + + statusPort = test_util.NextAvailPort() + proxyPort = test_util.NextAvailPort() + + cfgFile = filepath.Join(tmpdir, "config.yml") + pruneInterval = 2 + pruneThreshold = 10 + suspendPruningIfNatsUnavailable := true + config = createConfig(cfgFile, statusPort, proxyPort, pruneInterval, pruneThreshold, 0, suspendPruningIfNatsUnavailable, natsPort, natsPort2) + }) + + It("does not prune routes when nats is unavailable", func() { + localIP, err := localip.LocalIP() + Expect(err).ToNot(HaveOccurred()) + + mbusClient, err := newMessageBus(config) + + runningApp := test.NewGreetApp([]route.Uri{"demo.vcap.me"}, proxyPort, mbusClient, nil) + runningApp.Listen() + + routesUri := fmt.Sprintf("http://%s:%s@%s:%d/routes", config.Status.User, config.Status.Pass, localIP, statusPort) + + Eventually(func() bool { return appRegistered(routesUri, runningApp) }).Should(BeTrue()) + + heartbeatInterval := 200 * time.Millisecond + runningTicker := time.NewTicker(heartbeatInterval) + + go func() { + for { + select { + case <-runningTicker.C: + runningApp.Register() + } + } + }() + + runningApp.VerifyAppStatus(200) + + // Give enough time to register multiple times + time.Sleep(heartbeatInterval * 3) + + natsRunner.Stop() + + staleCheckInterval := config.PruneStaleDropletsIntervalInSeconds + staleThreshold := config.DropletStaleThresholdInSeconds + + // Give router time to make a bad decision (i.e. prune routes) + sleepTime := time.Duration((2*staleCheckInterval)+(2*staleThreshold)) * time.Second + time.Sleep(sleepTime) + + // Expect not to have pruned the routes after nats goes away + runningApp.VerifyAppStatus(200) + }) + }) }) Context("when the route_services_secret and the route_services_secret_decrypt_only are valid", func() { @@ -503,7 +565,7 @@ var _ = Describe("Router Integration", func() { proxyPort := test_util.NextAvailPort() cfgFile := filepath.Join(tmpdir, "config.yml") - config := createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, natsPort) + config := createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, false, natsPort) config.RouteServiceSecret = "route-service-secret" config.RouteServiceSecretPrev = "my-previous-route-service-secret" writeConfig(config, cfgFile) @@ -521,7 +583,7 @@ var _ = Describe("Router Integration", func() { proxyPort := test_util.NextAvailPort() cfgFile := filepath.Join(tmpdir, "config.yml") - cfg := createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, natsPort) + cfg := createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, false, natsPort) cfg.OAuth = config.OAuthConfig{} writeConfig(cfg, cfgFile) @@ -544,7 +606,7 @@ var _ = Describe("Router Integration", func() { cfgFile = filepath.Join(tmpdir, "config.yml") - cfg = createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, natsPort) + cfg = createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, false, natsPort) writeConfig(cfg, cfgFile) }) @@ -569,7 +631,7 @@ var _ = Describe("Router Integration", func() { proxyPort := test_util.NextAvailPort() cfgFile = filepath.Join(tmpdir, "config.yml") - config = createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, natsPort) + config = createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, false, natsPort) routingApi = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { jsonBytes := []byte(`[{"route":"foo.com","port":65340,"ip":"1.2.3.4","ttl":60,"log_guid":"foo-guid"}]`) @@ -660,7 +722,7 @@ var _ = Describe("Router Integration", func() { proxyPort := test_util.NextAvailPort() cfgFile = filepath.Join(tmpdir, "config.yml") - config := createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, natsPort) + config := createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, false, natsPort) config.Logging.File = "nonExistentDir/file" writeConfig(config, cfgFile) }) diff --git a/registry/registry.go b/registry/registry.go index d83b5d26a..a9c2866fb 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -24,6 +24,13 @@ type RegistryInterface interface { MarshalJSON() ([]byte, error) } +type PruneStatus int + +const ( + CONNECTED = PruneStatus(iota) + DISCONNECTED +) + type RouteRegistry struct { sync.RWMutex @@ -32,6 +39,10 @@ type RouteRegistry struct { // Access to the Trie datastructure should be governed by the RWMutex of RouteRegistry byUri *container.Trie + // used for ability to suspend pruning + suspendPruning func() bool + pruningStatus PruneStatus + pruneStaleDropletsInterval time.Duration dropletStaleThreshold time.Duration @@ -48,6 +59,7 @@ func NewRouteRegistry(logger lager.Logger, c *config.Config, reporter reporter.R r.pruneStaleDropletsInterval = c.PruneStaleDropletsInterval r.dropletStaleThreshold = c.DropletStaleThreshold + r.suspendPruning = func() bool { return false } r.reporter = reporter return r @@ -185,6 +197,24 @@ func (r *RouteRegistry) MarshalJSON() ([]byte, error) { func (r *RouteRegistry) pruneStaleDroplets() { r.Lock() + defer r.Unlock() + + // suspend pruning if option enabled and if NATS is unavailable + if r.suspendPruning() { + r.logger.Debug("prune-suspended") + r.pruningStatus = DISCONNECTED + return + } else { + if r.pruningStatus == DISCONNECTED { + // if we are coming back from being disconnected from source, + // bulk update routes / mark updated to avoid pruning right away + r.logger.Debug("prune-unsuspended-refresh-routes-start") + r.freshenRoutes() + r.logger.Debug("prune-unsuspended-refresh-routes-complete") + } + r.pruningStatus = CONNECTED + } + r.byUri.EachNodeWithPool(func(t *container.Trie) { endpoints := t.Pool.PruneEndpoints(r.dropletStaleThreshold) t.Snip() @@ -196,9 +226,22 @@ func (r *RouteRegistry) pruneStaleDroplets() { r.logger.Debug("prune", lager.Data{"uri": t.ToPath(), "endpoints": addresses}) } }) +} + +func (r *RouteRegistry) SuspendPruning(f func() bool) { + r.Lock() + r.suspendPruning = f r.Unlock() } +// bulk update to mark pool / endpoints as updated +func (r *RouteRegistry) freshenRoutes() { + now := time.Now() + r.byUri.EachNodeWithPool(func(t *container.Trie) { + t.Pool.MarkUpdated(now) + }) +} + func parseContextPath(uri route.Uri) string { contextPath := "/" split := strings.SplitN(strings.TrimPrefix(uri.String(), "/"), "/", 2) diff --git a/registry/registry_test.go b/registry/registry_test.go index 72b5959f2..9e1b69e36 100644 --- a/registry/registry_test.go +++ b/registry/registry_test.go @@ -1,6 +1,9 @@ package registry_test import ( + "fmt" + "net" + "github.com/cloudfoundry-incubator/routing-api/models" . "github.com/cloudfoundry/gorouter/registry" . "github.com/onsi/ginkgo" @@ -622,6 +625,7 @@ var _ = Describe("RouteRegistry", func() { configObj.PruneStaleDropletsInterval = 50 * time.Millisecond configObj.DropletStaleThreshold = 100 * time.Millisecond reporter = new(fakes.FakeRouteRegistryReporter) + r = NewRouteRegistry(logger, configObj, reporter) }) @@ -638,6 +642,51 @@ var _ = Describe("RouteRegistry", func() { Expect(logger).ToNot(gbytes.Say(`prune.*"log_level":0.*foo.com/bar`)) }) }) + + Context("when suspend pruning is triggered (i.e. nats offline)", func() { + var totalRoutes int + + BeforeEach(func() { + totalRoutes = 1000 + Expect(r.NumUris()).To(Equal(0)) + Expect(r.NumEndpoints()).To(Equal(0)) + + // add endpoints + for i := 0; i < totalRoutes; i++ { + e := route.NewEndpoint("12345", "192.168.1.1", uint16(1024+i), "id1", nil, -1, "", modTag) + r.Register(route.Uri(fmt.Sprintf("foo-%d", i)), e) + } + + r.StartPruningCycle() + r.SuspendPruning(func() bool { return true }) + time.Sleep(configObj.PruneStaleDropletsInterval + 10*time.Millisecond) + }) + + It("does not remove any routes", func() { + Expect(r.NumUris()).To(Equal(totalRoutes)) + Expect(r.NumEndpoints()).To(Equal(totalRoutes)) + + time.Sleep(configObj.PruneStaleDropletsInterval + 50*time.Millisecond) + + Expect(r.NumUris()).To(Equal(totalRoutes)) + Expect(r.NumEndpoints()).To(Equal(totalRoutes)) + }) + + Context("when suspend pruning is turned off (i.e. nats back online)", func() { + It("marks all routes as updated and does not remove routes", func() { + Expect(r.NumUris()).To(Equal(totalRoutes)) + Expect(r.NumEndpoints()).To(Equal(totalRoutes)) + + r.SuspendPruning(func() bool { return false }) + + time.Sleep(configObj.PruneStaleDropletsInterval + 10*time.Millisecond) + + Expect(r.NumUris()).To(Equal(totalRoutes)) + Expect(r.NumEndpoints()).To(Equal(totalRoutes)) + }) + }) + }) + }) Context("Varz data", func() { @@ -687,3 +736,13 @@ var _ = Describe("RouteRegistry", func() { Expect(string(marshalled)).To(Equal(`{}`)) }) }) + +func portListening(addr string) bool { + var ok bool + conn, err := net.Dial("tcp", addr) + if err == nil { + ok = true + defer conn.Close() + } + return ok +}