Permalink
Browse files

WIP

  • Loading branch information...
1 parent ac20601 commit e77cbf1b49d0a9e158f54c629a44dca253c32426 @axw committed Jun 8, 2017
Showing with 68 additions and 7 deletions.
  1. +41 −5 apiserver/apiserver.go
  2. +27 −2 apiserver/logsink.go
View
@@ -16,12 +16,14 @@ import (
"strings"
"sync"
"sync/atomic"
+ "time"
"github.com/bmizerany/pat"
"github.com/gorilla/websocket"
"github.com/juju/errors"
"github.com/juju/loggo"
"github.com/juju/pubsub"
+ "github.com/juju/ratelimit"
"github.com/juju/utils"
"github.com/juju/utils/clock"
"golang.org/x/crypto/acme"
@@ -47,9 +49,19 @@ var logger = loggo.GetLogger("juju.apiserver")
var defaultHTTPMethods = []string{"GET", "POST", "HEAD", "PUT", "DELETE", "OPTIONS"}
-// loginRateLimit defines how many concurrent Login requests we will
-// accept
-const loginRateLimit = 10
+const (
+ // loginRateLimit defines how many concurrent Login requests we will
+ // accept.
+ loginRateLimit = 10
+
+ // logSinkBurst defines the number of log messages that will be
+ // let through before we start rate limiting.
+ logSinkBurst = 1000
+
+ // logSinkRefillRate defines the rate at which log messages will be
+ // let through once the initial burst amount has been depleted.
+ logSinkRefillRate = time.Millisecond
+)
// Server holds the server side of the API.
type Server struct {
@@ -432,11 +444,25 @@ func (srv *Server) endpoints() []apihttp.Endpoint {
add("/model/:modeluuid/logstream", logStreamHandler)
add("/model/:modeluuid/log", debugLogHandler)
- logSinkHandler := newLogSinkHandler(httpCtxt, srv.logSinkWriter, newAgentLoggingStrategy)
+ logSinkHandler := newLogSinkHandler(
+ httpCtxt,
+ srv.logSinkWriter,
+ newAgentLoggingStrategy,
+ srv.clock,
+ ratelimit.NewBucketWithClock(
+ logSinkRefillRate, logSinkBurst, ratelimitClock{srv.clock},
+ ),
+ )
add("/model/:modeluuid/logsink", srv.trackRequests(logSinkHandler))
// We don't need to save the migrated logs to a logfile as well as to the DB.
- logTransferHandler := newLogSinkHandler(httpCtxt, ioutil.Discard, newMigrationLoggingStrategy)
+ logTransferHandler := newLogSinkHandler(
+ httpCtxt,
+ ioutil.Discard,
+ newMigrationLoggingStrategy,
+ srv.clock,
+ nil, // do not rate-limit log migration
+ )
add("/migrate/logtransfer", srv.trackRequests(logTransferHandler))
modelRestHandler := &modelRestHandler{
@@ -855,3 +881,13 @@ func (srv *Server) processModelRemovals() error {
}
}
}
+
+// ratelimitClock adapts clock.Clock to ratelimit.Clock.
+type ratelimitClock struct {
+ clock.Clock
+}
+
+// Sleep is defined by the ratelimit.Clock interface.
+func (c ratelimitClock) Sleep(d time.Duration) {
+ <-c.Clock.After(d)
+}
View
@@ -13,7 +13,9 @@ import (
"github.com/gorilla/websocket"
"github.com/juju/errors"
"github.com/juju/loggo"
+ "github.com/juju/ratelimit"
"github.com/juju/utils"
+ "github.com/juju/utils/clock"
"github.com/juju/utils/featureflag"
"github.com/juju/version"
"gopkg.in/juju/names.v2"
@@ -119,8 +121,20 @@ func (s *agentLoggingStrategy) Stop() {
// Should we clear out s.st, s.releaser, s.entity here?
}
-func newLogSinkHandler(h httpContext, w io.Writer, newStrategy func(httpContext, io.Writer) LoggingStrategy) http.Handler {
- return &logSinkHandler{ctxt: h, fileLogger: w, newStrategy: newStrategy}
+func newLogSinkHandler(
+ h httpContext,
+ w io.Writer,
+ newStrategy func(httpContext, io.Writer) LoggingStrategy,
+ clock clock.Clock,
+ tokenBucket *ratelimit.Bucket,
+) http.Handler {
+ return &logSinkHandler{
+ ctxt: h,
+ fileLogger: w,
+ newStrategy: newStrategy,
+ clock: clock,
+ tokenBucket: tokenBucket,
+ }
}
func newLogSinkWriter(logPath string) (io.WriteCloser, error) {
@@ -153,6 +167,8 @@ type logSinkHandler struct {
ctxt httpContext
newStrategy func(httpContext, io.Writer) LoggingStrategy
fileLogger io.Writer
+ tokenBucket *ratelimit.Bucket
+ clock clock.Clock
}
// Since the logsink only receives messages, it is possible for the other end
@@ -297,6 +313,15 @@ func (h *logSinkHandler) receiveLogs(socket *websocket.Conn, endpointVersion int
defer close(logCh)
var m params.LogRecord
for {
+ // Rate-limit receipt of log messages across all connections
+ // if a token bucket is configured.
+ if h.tokenBucket != nil {
+ select {
+ case <-h.clock.After(h.tokenBucket.Take(1)):
+ case <-h.ctxt.stop():
+ return
+ }
+ }
// Receive() blocks until data arrives but will also be
// unblocked when the API handler calls socket.Close as it
// finishes.

0 comments on commit e77cbf1

Please sign in to comment.