Skip to content
This repository was archived by the owner on Apr 30, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,14 @@ type Config struct {
CipherString string `yaml:"cipher_suites"`
CipherSuites []uint16

PublishStartMessageIntervalInSeconds int `yaml:"publish_start_message_interval"`
PruneStaleDropletsIntervalInSeconds int `yaml:"prune_stale_droplets_interval"`
DropletStaleThresholdInSeconds int `yaml:"droplet_stale_threshold"`
PublishActiveAppsIntervalInSeconds int `yaml:"publish_active_apps_interval"`
StartResponseDelayIntervalInSeconds int `yaml:"start_response_delay_interval"`
EndpointTimeoutInSeconds int `yaml:"endpoint_timeout"`
RouteServiceTimeoutInSeconds int `yaml:"route_services_timeout"`
PublishStartMessageIntervalInSeconds int `yaml:"publish_start_message_interval"`
SuspendPruningIfNatsUnavailable bool `yaml:"suspend_pruning_if_nats_unavailable"`
PruneStaleDropletsIntervalInSeconds int `yaml:"prune_stale_droplets_interval"`
DropletStaleThresholdInSeconds int `yaml:"droplet_stale_threshold"`
PublishActiveAppsIntervalInSeconds int `yaml:"publish_active_apps_interval"`
StartResponseDelayIntervalInSeconds int `yaml:"start_response_delay_interval"`
EndpointTimeoutInSeconds int `yaml:"endpoint_timeout"`
RouteServiceTimeoutInSeconds int `yaml:"route_services_timeout"`

DrainWaitInSeconds int `yaml:"drain_wait,omitempty"`
DrainTimeoutInSeconds int `yaml:"drain_timeout,omitempty"`
Expand Down
14 changes: 14 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,20 @@ nats:
Expect(config.Nats[0].Pass).To(Equal("pass"))
})

Context("Suspend Pruning option", func() {
It("sets default suspend_pruning_if_nats_unavailable", func() {
Expect(config.SuspendPruningIfNatsUnavailable).To(BeFalse())
})

It("sets default suspend_pruning_if_nats_unavailable", func() {
var b = []byte(`
suspend_pruning_if_nats_unavailable: true
`)
config.Initialize(b)
Expect(config.SuspendPruningIfNatsUnavailable).To(BeTrue())
})
})

It("sets default logging configs", func() {
Expect(config.Logging.File).To(Equal(""))
Expect(config.Logging.Syslog).To(Equal(""))
Expand Down
8 changes: 8 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ func main() {

metricsReporter := metrics.NewMetricsReporter()
registry := rregistry.NewRouteRegistry(logger.Session("registry"), c, metricsReporter)
if c.SuspendPruningIfNatsUnavailable {
registry.SuspendPruning(func() bool { return !(natsClient.Status() == nats.CONNECTED) })
}

varz := rvarz.NewVarz(registry)
compositeReporter := metrics.NewCompositeReporter(varz, metricsReporter)
Expand Down Expand Up @@ -234,6 +237,11 @@ func connectToNatsServer(logger lager.Logger, c *config.Config) *nats.Conn {
options.ClosedCB = func(conn *nats.Conn) {
logger.Fatal("nats-connection-closed", errors.New("unexpected close"), lager.Data{"last_error": conn.LastError()})
}

// in the case of suspending pruning, we need to ensure we retry reconnects indefinitely
if c.SuspendPruningIfNatsUnavailable {
options.MaxReconnect = -1
}
natsClient, err = options.Connect()
if err == nil {
break
Expand Down
84 changes: 73 additions & 11 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ var _ = Describe("Router Integration", func() {
cfg.DrainWaitInSeconds = drainWait
}

createConfig := func(cfgFile string, statusPort, proxyPort uint16, pruneInterval, pruneThreshold, drainWait int, natsPorts ...uint16) *config.Config {
createConfig := func(cfgFile string, statusPort, proxyPort uint16, pruneInterval, pruneThreshold, drainWait int, suspendPruning bool, natsPorts ...uint16) *config.Config {
cfg := test_util.SpecConfig(statusPort, proxyPort, natsPorts...)

configDrainSetup(cfg, pruneInterval, pruneThreshold, drainWait)

cfg.SuspendPruningIfNatsUnavailable = suspendPruning

caCertsPath := filepath.Join(lastGoPath(), "src", "github.com", "cloudfoundry", "gorouter", "test", "assets", "certs", "uaa-ca.pem")
cfg.OAuth = config.OAuthConfig{
TokenEndpoint: "127.0.0.1",
Expand Down Expand Up @@ -146,7 +148,7 @@ var _ = Describe("Router Integration", func() {
proxyPort = test_util.NextAvailPort()

cfgFile = filepath.Join(tmpdir, "config.yml")
config = createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 1, natsPort)
config = createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 1, false, natsPort)
})

JustBeforeEach(func() {
Expand Down Expand Up @@ -329,7 +331,7 @@ var _ = Describe("Router Integration", func() {
proxyPort := test_util.NextAvailPort()

cfgFile := filepath.Join(tmpdir, "config.yml")
config := createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, natsPort)
config := createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, false, natsPort)
config.Logging.MetronAddress = ""
writeConfig(config, cfgFile)

Expand All @@ -343,7 +345,7 @@ var _ = Describe("Router Integration", func() {
statusPort := test_util.NextAvailPort()
proxyPort := test_util.NextAvailPort()
cfgFile := filepath.Join(tmpdir, "config.yml")
createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, natsPort)
createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, false, natsPort)

gorouterSession = startGorouterSession(cfgFile)

Expand All @@ -358,7 +360,7 @@ var _ = Describe("Router Integration", func() {
proxyPort := test_util.NextAvailPort()

cfgFile := filepath.Join(tmpdir, "config.yml")
config := createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, natsPort)
config := createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, false, natsPort)

gorouterSession = startGorouterSession(cfgFile)

Expand Down Expand Up @@ -439,7 +441,7 @@ var _ = Describe("Router Integration", func() {
cfgFile = filepath.Join(tmpdir, "config.yml")
pruneInterval = 2
pruneThreshold = 10
config = createConfig(cfgFile, statusPort, proxyPort, pruneInterval, pruneThreshold, 0, natsPort, natsPort2)
config = createConfig(cfgFile, statusPort, proxyPort, pruneInterval, pruneThreshold, 0, false, natsPort, natsPort2)
})

AfterEach(func() {
Expand Down Expand Up @@ -495,6 +497,66 @@ var _ = Describe("Router Integration", func() {
natsRunner.Start()

})

Context("when suspend_pruning_if_nats_unavailable enabled", func() {

BeforeEach(func() {
natsPort2 = test_util.NextAvailPort()
natsRunner2 = test_util.NewNATSRunner(int(natsPort2))

statusPort = test_util.NextAvailPort()
proxyPort = test_util.NextAvailPort()

cfgFile = filepath.Join(tmpdir, "config.yml")
pruneInterval = 2
pruneThreshold = 10
suspendPruningIfNatsUnavailable := true
config = createConfig(cfgFile, statusPort, proxyPort, pruneInterval, pruneThreshold, 0, suspendPruningIfNatsUnavailable, natsPort, natsPort2)
})

// Registers an app, keeps it heart-beating, stops NATS, waits long enough for
// a prune cycle to have removed the route, and then checks that the app still
// answers with 200.
It("does not prune routes when nats is unavailable", func() {
	localIP, err := localip.LocalIP()
	Expect(err).ToNot(HaveOccurred())

	// Fail here with a clear message rather than later on a bad client.
	mbusClient, err := newMessageBus(config)
	Expect(err).ToNot(HaveOccurred())

	runningApp := test.NewGreetApp([]route.Uri{"demo.vcap.me"}, proxyPort, mbusClient, nil)
	runningApp.Listen()

	routesUri := fmt.Sprintf("http://%s:%s@%s:%d/routes", config.Status.User, config.Status.Pass, localIP, statusPort)

	Eventually(func() bool { return appRegistered(routesUri, runningApp) }).Should(BeTrue())

	heartbeatInterval := 200 * time.Millisecond
	runningTicker := time.NewTicker(heartbeatInterval)
	defer runningTicker.Stop()

	// Ticker.Stop does not close the channel, so the heartbeat goroutine
	// needs its own signal to exit when the spec finishes (or fails).
	done := make(chan struct{})
	defer close(done)

	go func() {
		for {
			select {
			case <-runningTicker.C:
				runningApp.Register()
			case <-done:
				return
			}
		}
	}()

	runningApp.VerifyAppStatus(200)

	// Give enough time to register multiple times
	time.Sleep(heartbeatInterval * 3)

	natsRunner.Stop()

	staleCheckInterval := config.PruneStaleDropletsIntervalInSeconds
	staleThreshold := config.DropletStaleThresholdInSeconds

	// Give router time to make a bad decision (i.e. prune routes)
	sleepTime := time.Duration((2*staleCheckInterval)+(2*staleThreshold)) * time.Second
	time.Sleep(sleepTime)

	// Expect not to have pruned the routes after nats goes away
	runningApp.VerifyAppStatus(200)
})
})
})

Context("when the route_services_secret and the route_services_secret_decrypt_only are valid", func() {
Expand All @@ -503,7 +565,7 @@ var _ = Describe("Router Integration", func() {
proxyPort := test_util.NextAvailPort()

cfgFile := filepath.Join(tmpdir, "config.yml")
config := createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, natsPort)
config := createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, false, natsPort)
config.RouteServiceSecret = "route-service-secret"
config.RouteServiceSecretPrev = "my-previous-route-service-secret"
writeConfig(config, cfgFile)
Expand All @@ -521,7 +583,7 @@ var _ = Describe("Router Integration", func() {
proxyPort := test_util.NextAvailPort()

cfgFile := filepath.Join(tmpdir, "config.yml")
cfg := createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, natsPort)
cfg := createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, false, natsPort)
cfg.OAuth = config.OAuthConfig{}
writeConfig(cfg, cfgFile)

Expand All @@ -544,7 +606,7 @@ var _ = Describe("Router Integration", func() {

cfgFile = filepath.Join(tmpdir, "config.yml")

cfg = createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, natsPort)
cfg = createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, false, natsPort)
writeConfig(cfg, cfgFile)
})

Expand All @@ -569,7 +631,7 @@ var _ = Describe("Router Integration", func() {
proxyPort := test_util.NextAvailPort()

cfgFile = filepath.Join(tmpdir, "config.yml")
config = createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, natsPort)
config = createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, false, natsPort)

routingApi = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
jsonBytes := []byte(`[{"route":"foo.com","port":65340,"ip":"1.2.3.4","ttl":60,"log_guid":"foo-guid"}]`)
Expand Down Expand Up @@ -660,7 +722,7 @@ var _ = Describe("Router Integration", func() {
proxyPort := test_util.NextAvailPort()

cfgFile = filepath.Join(tmpdir, "config.yml")
config := createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, natsPort)
config := createConfig(cfgFile, statusPort, proxyPort, defaultPruneInterval, defaultPruneThreshold, 0, false, natsPort)
config.Logging.File = "nonExistentDir/file"
writeConfig(config, cfgFile)
})
Expand Down
43 changes: 43 additions & 0 deletions registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ type RegistryInterface interface {
MarshalJSON() ([]byte, error)
}

// PruneStatus is the state the registry records at each prune cycle: whether
// pruning ran or was suspended.
type PruneStatus int

const (
	// CONNECTED is the zero value; pruneStaleDroplets records it when pruning
	// is not suspended.
	CONNECTED PruneStatus = iota
	// DISCONNECTED is recorded when a prune cycle was skipped because the
	// suspend predicate returned true.
	DISCONNECTED
)

type RouteRegistry struct {
sync.RWMutex

Expand All @@ -32,6 +39,10 @@ type RouteRegistry struct {
// Access to the Trie datastructure should be governed by the RWMutex of RouteRegistry
byUri *container.Trie

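// suspendPruning reports whether the current prune cycle must be skipped;
// pruningStatus remembers whether the previous cycle was skipped.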
suspendPruning func() bool
pruningStatus PruneStatus

pruneStaleDropletsInterval time.Duration
dropletStaleThreshold time.Duration

Expand All @@ -48,6 +59,7 @@ func NewRouteRegistry(logger lager.Logger, c *config.Config, reporter reporter.R

r.pruneStaleDropletsInterval = c.PruneStaleDropletsInterval
r.dropletStaleThreshold = c.DropletStaleThreshold
r.suspendPruning = func() bool { return false }

r.reporter = reporter
return r
Expand Down Expand Up @@ -185,6 +197,24 @@ func (r *RouteRegistry) MarshalJSON() ([]byte, error) {

func (r *RouteRegistry) pruneStaleDroplets() {
r.Lock()
defer r.Unlock()

// suspend pruning if option enabled and if NATS is unavailable
if r.suspendPruning() {
r.logger.Debug("prune-suspended")
r.pruningStatus = DISCONNECTED
return
} else {
if r.pruningStatus == DISCONNECTED {
// if we are coming back from being disconnected from source,
// bulk update routes / mark updated to avoid pruning right away
r.logger.Debug("prune-unsuspended-refresh-routes-start")
r.freshenRoutes()
r.logger.Debug("prune-unsuspended-refresh-routes-complete")
}
r.pruningStatus = CONNECTED
}

r.byUri.EachNodeWithPool(func(t *container.Trie) {
endpoints := t.Pool.PruneEndpoints(r.dropletStaleThreshold)
t.Snip()
Expand All @@ -196,9 +226,22 @@ func (r *RouteRegistry) pruneStaleDroplets() {
r.logger.Debug("prune", lager.Data{"uri": t.ToPath(), "endpoints": addresses})
}
})
}

// SuspendPruning installs the predicate that pruneStaleDroplets consults at
// the start of every prune cycle: while f returns true, the cycle is skipped.
//
// Passing nil restores the never-suspend default instead of storing a nil
// func, which would panic the next time a prune cycle invoked it.
//
// f is invoked while the registry lock is held, so it must not call registry
// methods that acquire that lock.
func (r *RouteRegistry) SuspendPruning(f func() bool) {
	if f == nil {
		f = func() bool { return false }
	}

	r.Lock()
	defer r.Unlock()
	r.suspendPruning = f
}

// freshenRoutes marks every pool visited by EachNodeWithPool as updated,
// using one shared timestamp taken at the start of the call.
// pruneStaleDroplets calls it when pruning resumes after a suspension so that
// routes are not pruned right away.
//
// It takes no lock itself; its caller, pruneStaleDroplets, already holds the
// registry lock.
func (r *RouteRegistry) freshenRoutes() {
	refreshedAt := time.Now()
	markFresh := func(node *container.Trie) {
		node.Pool.MarkUpdated(refreshedAt)
	}
	r.byUri.EachNodeWithPool(markFresh)
}

func parseContextPath(uri route.Uri) string {
contextPath := "/"
split := strings.SplitN(strings.TrimPrefix(uri.String(), "/"), "/", 2)
Expand Down
Loading