Permalink
Browse files

WIP

  • Loading branch information...
1 parent ac20601 commit 77a061739b01f37f4eb85448664018c1ee0cec19 @axw committed Jun 8, 2017
View
@@ -167,6 +167,9 @@ const (
AgentServiceName = "AGENT_SERVICE_NAME"
MongoOplogSize = "MONGO_OPLOG_SIZE"
NUMACtlPreference = "NUMA_CTL_PREFERENCE"
+
+ AttrLogSinkRateLimitBurst = "LOGSINK_RATELIMIT_BURST"
+ AttrLogSinkRateLimitRefill = "LOGSINK_RATELIMIT_REFILL"
)
// The Config interface is the sole way that the agent gets access to the
View
@@ -536,6 +536,10 @@ func (s *apiclientSuite) TestPublicDNSName(c *gc.C) {
AutocertDNSName: "somewhere.example.com",
NewObserver: func() observer.Observer { return &fakeobserver.Instance{} },
AutocertURL: "https://0.1.2.3/no-autocert-here",
+ LogSink: apiserver.LogSinkConfig{
+ RateLimitBurst: apiserver.DefaultLogSinkRateLimitBurst,
+ RateLimitRefill: apiserver.DefaultLogSinkRateLimitRefill,
+ },
})
c.Assert(err, jc.ErrorIsNil)
defer worker.Stop(srv)
@@ -227,6 +227,10 @@ func (s *legacySuite) TestAPIServerCanShutdownWithOutstandingNext(c *gc.C) {
NewObserver: func() observer.Observer { return &fakeobserver.Instance{} },
AutocertURL: "https://0.1.2.3/no-autocert-here",
StatePool: state.NewStatePool(s.State),
+ LogSink: apiserver.LogSinkConfig{
+ RateLimitBurst: apiserver.DefaultLogSinkRateLimitBurst,
+ RateLimitRefill: apiserver.DefaultLogSinkRateLimitRefill,
+ },
})
c.Assert(err, gc.IsNil)
@@ -235,6 +235,10 @@ func newServerWithHub(c *gc.C, st *state.State, hub *pubsub.StructuredHub) (*api
Hub: hub,
NewObserver: func() observer.Observer { return &fakeobserver.Instance{} },
StatePool: state.NewStatePool(st),
+ LogSink: apiserver.LogSinkConfig{
+ RateLimitBurst: apiserver.DefaultLogSinkRateLimitBurst,
+ RateLimitRefill: apiserver.DefaultLogSinkRateLimitRefill,
+ },
})
c.Assert(err, jc.ErrorIsNil)
port := listener.Addr().(*net.TCPAddr).Port
View
@@ -16,12 +16,14 @@ import (
"strings"
"sync"
"sync/atomic"
+ "time"
"github.com/bmizerany/pat"
"github.com/gorilla/websocket"
"github.com/juju/errors"
"github.com/juju/loggo"
"github.com/juju/pubsub"
+ "github.com/juju/ratelimit"
"github.com/juju/utils"
"github.com/juju/utils/clock"
"golang.org/x/crypto/acme"
@@ -47,9 +49,20 @@ var logger = loggo.GetLogger("juju.apiserver")
var defaultHTTPMethods = []string{"GET", "POST", "HEAD", "PUT", "DELETE", "OPTIONS"}
-// loginRateLimit defines how many concurrent Login requests we will
-// accept
-const loginRateLimit = 10
+const (
+ // loginRateLimit defines how many concurrent Login requests we will
+ // accept.
+ loginRateLimit = 10
+
+ // DefaultLogSinkRateLimitBurst defines the default number of log
+ // messages that will be let through before we start rate limiting.
+ DefaultLogSinkRateLimitBurst = 1000
+
+ // DefaultLogSinkRateLimitRefill defines the default rate at which
+ // log messages will be let through once the initial burst amount
+ // has been depleted.
+ DefaultLogSinkRateLimitRefill = time.Millisecond
+)
// Server holds the server side of the API.
type Server struct {
@@ -76,6 +89,7 @@ type Server struct {
tlsConfig *tls.Config
allowModelAccess bool
logSinkWriter io.WriteCloser
+ logSinkConfig LogSinkConfig
// mu guards the fields below it.
mu sync.Mutex
@@ -146,9 +160,14 @@ type ServerConfig struct {
// is to support registering the handlers underneath the
// "/introspection" prefix.
RegisterIntrospectionHandlers func(func(string, http.Handler))
+
+ // LogSink holds configuration for the API server's log
+ // sink endpoint.
+ LogSink LogSinkConfig
}
-func (c *ServerConfig) Validate() error {
+// Validate validates the API server configuration.
+func (c ServerConfig) Validate() error {
if c.Hub == nil {
return errors.NotValidf("missing Hub")
}
@@ -161,17 +180,33 @@ func (c *ServerConfig) Validate() error {
if c.StatePool == nil {
return errors.NotValidf("missing StatePool")
}
-
- return nil
+ return errors.Annotate(c.LogSink.Validate(), "validating logsink configuration")
}
-func (c *ServerConfig) pingClock() clock.Clock {
+func (c ServerConfig) pingClock() clock.Clock {
if c.PingClock == nil {
return c.Clock
}
return c.PingClock
}
+// LogSinkConfig holds configuration for the API server's log sink endpoint.
+type LogSinkConfig struct {
+ RateLimitBurst int64
+ RateLimitRefill time.Duration
+}
+
+// Validate validates the log sink endpoint configuration.
+func (c LogSinkConfig) Validate() error {
+ if c.RateLimitBurst <= 0 {
+ return errors.NotValidf("rate-limit burst %d <= 0", c.RateLimitBurst)
+ }
+ if c.RateLimitRefill <= 0 {
+ return errors.NotValidf("rate-limit refill %d <= 0", c.RateLimitRefill)
+ }
+ return nil
+}
+
// NewServer serves the given state by accepting requests on the given
// listener, using the given certificate and key (in PEM format) for
// authentication.
@@ -220,6 +255,7 @@ func newServer(s *state.State, lis net.Listener, cfg ServerConfig) (_ *Server, e
allowModelAccess: cfg.AllowModelAccess,
publicDNSName_: cfg.AutocertDNSName,
registerIntrospectionHandlers: cfg.RegisterIntrospectionHandlers,
+ logSinkConfig: cfg.LogSink,
}
srv.tlsConfig = srv.newTLSConfig(cfg)
@@ -432,11 +468,27 @@ func (srv *Server) endpoints() []apihttp.Endpoint {
add("/model/:modeluuid/logstream", logStreamHandler)
add("/model/:modeluuid/log", debugLogHandler)
- logSinkHandler := newLogSinkHandler(httpCtxt, srv.logSinkWriter, newAgentLoggingStrategy)
+ logSinkHandler := newLogSinkHandler(
+ httpCtxt,
+ srv.logSinkWriter,
+ newAgentLoggingStrategy,
+ srv.clock,
+ ratelimit.NewBucketWithClock(
+ srv.logSinkConfig.RateLimitRefill,
+ srv.logSinkConfig.RateLimitBurst,
+ ratelimitClock{srv.clock},
+ ),
+ )
add("/model/:modeluuid/logsink", srv.trackRequests(logSinkHandler))
// We don't need to save the migrated logs to a logfile as well as to the DB.
- logTransferHandler := newLogSinkHandler(httpCtxt, ioutil.Discard, newMigrationLoggingStrategy)
+ logTransferHandler := newLogSinkHandler(
+ httpCtxt,
+ ioutil.Discard,
+ newMigrationLoggingStrategy,
+ srv.clock,
+ nil, // do not rate-limit log migration
+ )
add("/migrate/logtransfer", srv.trackRequests(logTransferHandler))
modelRestHandler := &modelRestHandler{
@@ -855,3 +907,13 @@ func (srv *Server) processModelRemovals() error {
}
}
}
+
+// ratelimitClock adapts clock.Clock to ratelimit.Clock.
+type ratelimitClock struct {
+ clock.Clock
+}
+
+// Sleep is defined by the ratelimit.Clock interface.
+func (c ratelimitClock) Sleep(d time.Duration) {
+ <-c.Clock.After(d)
+}
@@ -54,6 +54,10 @@ func (s *apiserverBaseSuite) sampleConfig(c *gc.C) apiserver.ServerConfig {
NewObserver: func() observer.Observer { return &fakeobserver.Instance{} },
AutocertURL: "https://0.1.2.3/no-autocert-here",
StatePool: state.NewStatePool(s.State),
+ LogSink: apiserver.LogSinkConfig{
+ RateLimitBurst: apiserver.DefaultLogSinkRateLimitBurst,
+ RateLimitRefill: apiserver.DefaultLogSinkRateLimitRefill,
+ },
}
}
View
@@ -13,7 +13,9 @@ import (
"github.com/gorilla/websocket"
"github.com/juju/errors"
"github.com/juju/loggo"
+ "github.com/juju/ratelimit"
"github.com/juju/utils"
+ "github.com/juju/utils/clock"
"github.com/juju/utils/featureflag"
"github.com/juju/version"
"gopkg.in/juju/names.v2"
@@ -119,8 +121,20 @@ func (s *agentLoggingStrategy) Stop() {
// Should we clear out s.st, s.releaser, s.entity here?
}
-func newLogSinkHandler(h httpContext, w io.Writer, newStrategy func(httpContext, io.Writer) LoggingStrategy) http.Handler {
- return &logSinkHandler{ctxt: h, fileLogger: w, newStrategy: newStrategy}
+func newLogSinkHandler(
+ h httpContext,
+ w io.Writer,
+ newStrategy func(httpContext, io.Writer) LoggingStrategy,
+ clock clock.Clock,
+ tokenBucket *ratelimit.Bucket,
+) http.Handler {
+ return &logSinkHandler{
+ ctxt: h,
+ fileLogger: w,
+ newStrategy: newStrategy,
+ clock: clock,
+ tokenBucket: tokenBucket,
+ }
}
func newLogSinkWriter(logPath string) (io.WriteCloser, error) {
@@ -153,6 +167,8 @@ type logSinkHandler struct {
ctxt httpContext
newStrategy func(httpContext, io.Writer) LoggingStrategy
fileLogger io.Writer
+ tokenBucket *ratelimit.Bucket
+ clock clock.Clock
}
// Since the logsink only receives messages, it is possible for the other end
@@ -297,6 +313,15 @@ func (h *logSinkHandler) receiveLogs(socket *websocket.Conn, endpointVersion int
defer close(logCh)
var m params.LogRecord
for {
+ // Rate-limit receipt of log messages across all connections
+ // if a token bucket is configured.
+ if h.tokenBucket != nil {
+ select {
+ case <-h.clock.After(h.tokenBucket.Take(1)):
+ case <-h.ctxt.stop():
+ return
+ }
+ }
// Receive() blocks until data arrives but will also be
// unblocked when the API handler calls socket.Close as it
// finishes.
View
@@ -624,6 +624,10 @@ func defaultServerConfig(c *gc.C, st *state.State) apiserver.ServerConfig {
NewObserver: func() observer.Observer { return &fakeobserver.Instance{} },
AutocertURL: "https://0.1.2.3/no-autocert-here",
StatePool: state.NewStatePool(st),
+ LogSink: apiserver.LogSinkConfig{
+ RateLimitBurst: apiserver.DefaultLogSinkRateLimitBurst,
+ RateLimitRefill: apiserver.DefaultLogSinkRateLimitRefill,
+ },
}
}
View
@@ -1230,6 +1230,11 @@ func (a *MachineAgent) newAPIserverWorker(
}, f)
}
+ logSinkConfig, err := getLogSinkConfig(agentConfig)
+ if err != nil {
+ return nil, errors.Annotate(err, "getting logsink config")
+ }
+
server, err := apiserver.NewServer(st, listener, apiserver.ServerConfig{
Clock: clock.WallClock,
Cert: cert,
@@ -1246,6 +1251,7 @@ func (a *MachineAgent) newAPIserverWorker(
NewObserver: newObserver,
StatePool: statePool,
RegisterIntrospectionHandlers: registerIntrospectionHandlers,
+ LogSink: logSinkConfig,
})
if err != nil {
return nil, errors.Annotate(err, "cannot start api server worker")
@@ -1709,3 +1715,29 @@ func newStateMetricsWorker(st *state.State, registry *prometheus.Registry) worke
return nil
})
}
+
+func getLogSinkConfig(cfg agent.Config) (apiserver.LogSinkConfig, error) {
+ result := apiserver.LogSinkConfig{
+ RateLimitBurst: apiserver.DefaultLogSinkRateLimitBurst,
+ RateLimitRefill: apiserver.DefaultLogSinkRateLimitRefill,
+ }
+ if v := cfg.Value(agent.AttrLogSinkRateLimitBurst); v != "" {
+ burst, err := strconv.ParseInt(v, 10, 64)
+ if err != nil {
+ return apiserver.LogSinkConfig{}, errors.Annotatef(
+ err, "parsing %s", agent.AttrLogSinkRateLimitBurst,
+ )
+ }
+ result.RateLimitBurst = burst
+ }
+ if v := cfg.Value(agent.AttrLogSinkRateLimitRefill); v != "" {
+ refill, err := time.ParseDuration(v)
+ if err != nil {
+ return apiserver.LogSinkConfig{}, errors.Annotatef(
+ err, "parsing %s", agent.AttrLogSinkRateLimitRefill,
+ )
+ }
+ result.RateLimitRefill = refill
+ }
+ return result, nil
+}
@@ -834,6 +834,10 @@ func (e *environ) Bootstrap(ctx environs.BootstrapContext, args environs.Bootstr
io.WriteString(w, "gazing")
}))
},
+ LogSink: apiserver.LogSinkConfig{
+ RateLimitBurst: apiserver.DefaultLogSinkRateLimitBurst,
+ RateLimitRefill: apiserver.DefaultLogSinkRateLimitRefill,
+ },
})
if err != nil {
panic(err)

0 comments on commit 77a0617

Please sign in to comment.