Permalink
Browse files

A throttling implementation on a per-client basis

Summary: to be done.

Reviewed By: pallotron

Differential Revision: D3796898

fbshipit-source-id: ee0ba3e376c6530265848108f75ada20105d961f
  • Loading branch information...
Emre Cantimur Facebook Github Bot 9
Emre Cantimur authored and Facebook Github Bot 9 committed Sep 7, 2016
1 parent 27d8066 commit fca33deba5d7d251ef3af49e984a1bdc7fe9c909
Showing with 370 additions and 6 deletions.
  1. +1 −0 AUTHORS
  2. +6 −1 README.md
  3. +8 −2 config.json
  4. +20 −1 docs/getting-started.md
  5. +9 −0 lib/config.go
  6. +10 −1 lib/handler.go
  7. +8 −0 lib/interface.go
  8. +10 −1 lib/server.go
  9. +128 −0 lib/throttle.go
  10. +170 −0 lib/throttle_test.go
View
@@ -3,3 +3,4 @@
* Roman Gushchin (Production Engineer)
* Mateusz Kaczanowski (Production Engineer)
* Jake Bunce (Network Engineer)
* Emre Cantimur (Production Engineer)
View
@@ -96,6 +96,7 @@ $ go get github.com/fsnotify/fsnotify
$ go get github.com/golang/glog
$ go get github.com/krolaw/dhcp4
$ go get github.com/facebookgo/ensure
$ go get github.com/hashicorp/golang-lru
```
# Installation
@@ -158,7 +159,7 @@ production ready system.
`dhcplb` has been deployed globally and currently balances all production DHCP
traffic efficiently across our KEA DHCP servers.
Hackathon project members:
Original Hackathon project members:
* Angelo Failla ([@pallotron](https://github.com/pallotron)), Production Engineer
* Roman Gushchin ([@rgushchin](https://github.com/rgushchin)), Production Engineer
@@ -170,3 +171,7 @@ Internship project members:
* Vinnie Magro ([@vmagro](https://github.com/vmagro)), Production Engineer intern
* Angelo Failla (@pallotron), Intern mentor, Production Engineer
* Mateusz Kaczanowski (@mkaczanowski), Production Engineer
Other contributors:
* Emre Cantimur, Production Engineer, Facebook, Throttling support
View
@@ -8,7 +8,10 @@
"free_conn_timeout": 30,
"algorithm": "xid",
"host_sourcer": "file:hosts-v4.txt",
"rc_ratio": 0
"rc_ratio": 0,
"throttle_cache_size": 1024,
"throttle_cache_rate": 128,
"throttle_rate_per_conn": 256
},
"v6": {
"version": 6,
@@ -19,6 +22,9 @@
"free_conn_timeout": 30,
"algorithm": "xid",
"host_sourcer": "file:hosts-v6.txt",
"rc_ratio": 0
"rc_ratio": 0,
"throttle_cache_size": 1024,
"throttle_cache_rate": 128,
"throttle_rate_per_conn": 256
}
}
View
@@ -16,7 +16,10 @@ Configuration is provided to the program via a JSON file
"free_conn_timeout": 30, // how long to wait after removal before closing a connection to a server (in seconds)
"algorithm": "xid", // balancing algorithm, supported are xid and rr (client hash and roundrobin)
"host_sourcer": "file:hosts-v4.txt", // load DHCP server list from hosts-v4.txt
"rc_ratio": 0 // what percentage of requests should go to RC servers
"rc_ratio": 0, // what percentage of requests should go to RC servers
"throttle_cache_size": 1024, // cache size for number of throttling objects for unique clients
"throttle_cache_rate": 128, // rate value for throttling cache invalidation (per second)
"throttle_rate_per_conn": 256 // rate value for request per client (per second)
},
... (same options for "v6") ...
```
@@ -53,6 +56,22 @@ servers returned by the `GetServersFromTier(tier string)` function of the
`DHCPServerSourcer` being used.)
Throttling
----------
`dhcplb` keeps track of the request rate per second for each client.
The limit can be set through the `throttle_rate_per_conn` configuration
parameter. Requests exceeding this limit will be logged and dropped. For 0 or
negative values no throttling will be done, and no cache will be created.
An LRU cache is used to keep track of rate information for each client. The
cache size can be set through the `throttle_cache_size` parameter. To prevent
fast cache invalidation by malicious clients, `dhcplb` also limits the rate
at which new clients may be added to the cache (per second). This limit can
be set through the `throttle_cache_rate` configuration parameter. For 0 or
negative values no cache rate limiting will be done.
A/B testing
-----------
View
@@ -41,6 +41,9 @@ type Config struct {
RCRatio uint32
Overrides map[string]Override
Extras interface{}
TCacheSize int
TCacheRate int
TRatePerConn int
}
// Override represents the dhcp server or the group of dhcp servers (tier) we
@@ -176,6 +179,9 @@ type configSpec struct {
FreeConnTimeout int `json:"free_conn_timeout"`
RCRatio uint32 `json:"rc_ratio"`
Extras json.RawMessage `json:"extras"`
TCacheSize int `json:"throttle_cache_size"`
TCacheRate int `json:"throttle_cache_rate"`
TRatePerConn int `json:"throttle_rate_per_conn"`
}
type combinedconfigSpec struct {
@@ -270,6 +276,9 @@ func newConfig(spec *configSpec, overrides map[string]Override, provider ConfigP
RCRatio: spec.RCRatio,
Overrides: overrides,
Extras: extras,
TCacheSize: spec.TCacheSize,
TCacheRate: spec.TCacheRate,
TRatePerConn: spec.TRatePerConn,
}, nil
}
View
@@ -31,9 +31,10 @@ const (
ErrGi0 = "E_GI_0"
ErrParse = "E_PARSE"
ErrNoServer = "E_NO_SERVER"
ErrConnRate = "E_CONN_RATE"
)
func handleConnection(conn *net.UDPConn, config *Config, logger loggerHelper, bufPool *sync.Pool) {
func handleConnection(conn *net.UDPConn, config *Config, logger loggerHelper, bufPool *sync.Pool, throttle Throttle) {
buffer := bufPool.Get().([]byte)
bytesRead, peer, err := conn.ReadFromUDP(buffer)
if err != nil || bytesRead == 0 {
@@ -43,6 +44,14 @@ func handleConnection(conn *net.UDPConn, config *Config, logger loggerHelper, bu
return
}
// Check for connection rate per IP address
ok, err := throttle.OK(peer.IP.String())
if !ok {
bufPool.Put(buffer)
logger.LogErr(time.Now(), nil, nil, peer, ErrConnRate, err)
return
}
go func() {
defer func() {
// always release this routine's buffer back to the pool
View
@@ -50,3 +50,11 @@ type DHCPServerSourcer interface {
GetRCServers() ([]*DHCPServer, error)
GetServersFromTier(tier string) ([]*DHCPServer, error)
}
// Throttle is an interface that implements rate limiting per key.
type Throttle interface {
	// OK reports whether the current rate for the given key is below the
	// allowed maximum; a non-nil error explains a false result.
	OK(interface{}) (bool, error)
	// len returns the number of tracked items.
	len() int
}
View
@@ -26,6 +26,7 @@ type serverImpl struct {
stableServers []*DHCPServer
rcServers []*DHCPServer
bufPool sync.Pool
throttle Throttle
}
// returns a pointer to the current config struct, so that if it does get changed while being used,
@@ -40,7 +41,7 @@ func (s *serverImpl) ListenAndServe() error {
glog.Infof("Started thrift server, processing DHCP requests...")
for {
handleConnection(s.conn, s.getConfig(), s.logger, &s.bufPool)
handleConnection(s.conn, s.getConfig(), s.logger, &s.bufPool, s.throttle)
}
}
@@ -84,5 +85,13 @@ func NewServer(config *Config, version int, personalizedLogger PersonalizedLogge
},
}
glog.Infof("Setting up throttle: Cache Size: %d - Cache Rate: %d - Request Rate: %d",
config.TCacheSize, config.TCacheRate, config.TRatePerConn)
throttle, err := NewThrottle(config.TCacheSize, config.TCacheRate, config.TRatePerConn)
if err != nil {
return nil, err
}
server.throttle = throttle
return server, nil
}
View
@@ -0,0 +1,128 @@
/**
* Copyright (c) 2016-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/
package dhcplb
import (
"fmt"
"github.com/golang/glog"
"github.com/hashicorp/golang-lru"
"golang.org/x/time/rate"
"sync"
)
// An LRU cache implementation of Throttle.
//
// We keep track of request rates per client in an LRU cache to
// keep memory usage under control against malicious requests. Each
// value in the cache is a rate.Limiter struct which is an implementation
// of the Token Bucket algorithm.
//
// Adding new items to the cache is also rate limited to control cache
// invalidation.
//
type throttleImpl struct {
	mu sync.Mutex // guards access to the LRU cache
	lru *lru.Cache // key -> *rate.Limiter, bounded by the configured capacity
	maxRatePerItem int // maximum allowed requests per second, per key
	bucketSize int // token-bucket burst size for each per-key limiter
	cacheLimiter *rate.Limiter // limits the rate of new cache insertions
	cacheRate int // configured insertion rate, kept for error messages
}
// OK reports whether the request rate for the given key is below the
// configured maximum. When the request is throttled it returns false and a
// non-nil error describing which limit was exceeded (per-key request rate,
// or cache-insertion rate for keys not yet tracked).
func (c *throttleImpl) OK(key interface{}) (bool, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	value, ok := c.lru.Get(key)
	if ok {
		// The limiter is already tracked for this key: try to take a token.
		limiter := value.(*rate.Limiter)
		if !limiter.Allow() {
			// Error strings are lowercase per Go convention (staticcheck ST1005).
			return false, fmt.Errorf("request rate is too high for %v (max: %d req/sec) - throttling", key, c.maxRatePerItem)
		}
		return true, nil
	}

	// First time we see this key: admitting it into the cache is itself rate
	// limited, so that a flood of new clients cannot churn the LRU and evict
	// rate state for legitimate clients.
	if !c.cacheLimiter.Allow() {
		return false, fmt.Errorf("cache invalidation is too fast (max: %d item/sec) - throttling", c.cacheRate)
	}
	limiter := rate.NewLimiter(rate.Limit(c.maxRatePerItem), c.bucketSize)
	c.lru.Add(key, limiter)
	return limiter.Allow(), nil
}
// len returns the number of limiter entries currently held in the LRU cache.
func (c *throttleImpl) len() int {
	return c.lru.Len()
}
// A dummy throttle implementation; it simply allows all events.
// NOTE(review): the type name misspells "Throttle" (dummyThtolleImp); a
// rename would also touch NewThrottle, so it is left as-is in this review.
type dummyThtolleImp struct{}

// OK always admits the request; throttling is disabled.
func (*dummyThtolleImp) OK(key interface{}) (bool, error) {
	return true, nil
}

// len returns -1 to signal that no cache is being maintained.
func (*dummyThtolleImp) len() int {
	return -1
}
// NewThrottle returns a Throttle implementation.
//
// capacity:
// Maximum capacity of the LRU cache.
//
// cacheRate (per second):
// Maximum allowed rate for adding new items to the cache. This prevents
// cache invalidation from happening too soon for the existing rate items
// in the cache. The cache rate is infinite for 0 or negative values.
//
// maxRatePerItem (per second):
// Maximum allowed request rate for each key in the cache. Throttling is
// disabled for 0 or negative values, and no cache is created in that case.
//
func NewThrottle(capacity int, cacheRate int, maxRatePerItem int) (Throttle, error) {
	if maxRatePerItem <= 0 {
		glog.Info("No throttling will be done")
		// No per-client limiting requested: return the no-op implementation
		// so callers pay no cache or lock overhead.
		return &dummyThtolleImp{}, nil
	}

	// capacity is already an int; the previous int(Capacity) conversion
	// was redundant.
	cache, err := lru.New(capacity)
	if err != nil {
		return nil, err
	}

	// Keep track of the item creation rate, so malicious clients cannot
	// invalidate the cache faster than the configured rate.
	var cacheLimiter *rate.Limiter
	if cacheRate <= 0 {
		glog.Info("No cache rate limiting will be done")
		cacheLimiter = rate.NewLimiter(rate.Inf, 1) // bucket size is ignored for rate.Inf
	} else {
		cacheLimiter = rate.NewLimiter(rate.Limit(cacheRate), cacheRate)
	}

	return &throttleImpl{
		lru:            cache,
		maxRatePerItem: maxRatePerItem,
		bucketSize:     maxRatePerItem,
		cacheLimiter:   cacheLimiter,
		cacheRate:      cacheRate,
	}, nil
}
Oops, something went wrong.

0 comments on commit fca33de

Please sign in to comment.