Added a read rate limiter which allows limiting the read QpS per client (#252)

This can be used to protect the broker from misbehaving clients. This PR also renames the previous writeRate option to flushRate, which is a more appropriate name.
kelindar committed Jun 5, 2019
1 parent 7ff71cb commit b8c4d8e
Showing 7 changed files with 37 additions and 21 deletions.
4 changes: 2 additions & 2 deletions .vscode/settings.json
@@ -3,14 +3,14 @@
"**/.git": true,
"**/.DS_Store": true,
".gitignore": true,
// "vendor/": true,
"vendor/": true,
".github/": true,
".vscode": true,
"build*": true,
"LICENSE": true,
"appveyor.yml": true,
"Dockerfile": true
},

"go.formatTool": "gofmt",
"go.formatFlags": [
"-s"
20 changes: 16 additions & 4 deletions internal/broker/conn.go
@@ -31,8 +31,11 @@ import (
"github.com/emitter-io/emitter/internal/provider/logging"
"github.com/emitter-io/emitter/internal/security"
"github.com/emitter-io/stats"
"github.com/kelindar/rate"
)

+ const defaultReadRate = 100000

// Conn represents an incoming connection.
type Conn struct {
sync.Mutex
@@ -45,10 +48,11 @@ type Conn struct {
subs *message.Counters // The subscriptions for this connection.
measurer stats.Measurer // The measurer to use for monitoring.
links map[string]string // The map of all pre-authorized links.
+ limit *rate.Limiter // The read rate limiter.
}

// NewConn creates a new connection.
- func (s *Service) newConn(t net.Conn) *Conn {
+ func (s *Service) newConn(t net.Conn, readRate int) *Conn {
c := &Conn{
tracked: 0,
luid: security.NewID(),
@@ -61,7 +65,11 @@ func (s *Service) newConn(t net.Conn) *Conn {

// Generate a globally unique id as well
c.guid = c.luid.Unique(uint64(address.GetHardware()), "emitter")
- //logging.LogTarget("conn", "created", c.guid)
+ if readRate == 0 {
+     readRate = defaultReadRate
+ }
+
+ c.limit = rate.New(readRate, time.Second)

// Increment the connection counter
atomic.AddInt64(&s.connections, 1)
@@ -103,13 +111,17 @@ func (c *Conn) track(contract contract.Contract) {
func (c *Conn) Process() error {
defer c.Close()
reader := bufio.NewReaderSize(c.socket, 65536)
- limit := c.service.Config.MaxMessageBytes()
+ maxSize := c.service.Config.MaxMessageBytes()
for {
// Set read/write deadlines so we can close dangling connections
c.socket.SetDeadline(time.Now().Add(time.Second * 120))
+ if c.limit.Limit() {
+     time.Sleep(50 * time.Millisecond)
+     continue
+ }

// Decode an incoming MQTT packet
- msg, err := mqtt.DecodePacket(reader, limit)
+ msg, err := mqtt.DecodePacket(reader, maxSize)
if err != nil {
return err
}
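As a side note, here is a minimal sketch of how the new limiter gates the read loop, assuming the kelindar/rate API exactly as it is used in this commit: rate.New(n, window) builds a limiter, and Limit() reports true once the budget for the current window is spent (inferred from the "if c.limit.Limit() { sleep; continue }" pattern in Process above). When readRate is 0, the connection falls back to defaultReadRate, i.e. 100,000 packets per second.

package main

import (
	"fmt"
	"time"

	"github.com/kelindar/rate"
)

func main() {
	// Mirror c.limit = rate.New(readRate, time.Second): at most 5 reads per second.
	limiter := rate.New(5, time.Second)

	allowed, throttled := 0, 0
	for i := 0; i < 20; i++ {
		if limiter.Limit() { // true once the per-second budget is exhausted
			throttled++ // Process() would sleep 50ms and retry instead
			continue
		}
		allowed++
	}
	// All 20 calls land in the same window, so expect roughly 5 allowed and 15 throttled.
	fmt.Printf("allowed=%d throttled=%d\n", allowed, throttled)
}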
2 changes: 1 addition & 1 deletion internal/broker/conn_test.go
@@ -34,7 +34,7 @@ func newTestConn() (pipe *netmock.Conn, conn *Conn) {
}

pipe = netmock.NewConn()
- conn = s.newConn(pipe.Client)
+ conn = s.newConn(pipe.Client, 0)
return
}

16 changes: 8 additions & 8 deletions internal/broker/handlers_test.go
@@ -70,7 +70,7 @@ func TestHandlers_onLink(t *testing.T) {

s.Cipher, _ = s.License.Cipher()
conn := netmock.NewConn()
- nc := s.newConn(conn.Client)
+ nc := s.newConn(conn.Client, 0)

resp, ok := nc.onLink([]byte(tc.packet))
assert.Equal(t, tc.success, ok)
@@ -89,7 +89,7 @@ func TestHandlers_onMe(t *testing.T) {
}

conn := netmock.NewConn()
- nc := s.newConn(conn.Client)
+ nc := s.newConn(conn.Client, 0)
nc.links["0"] = "key/a/b/c/"
resp, success := nc.onMe()
meResp := resp.(*meResponse)
@@ -208,7 +208,7 @@ func TestHandlers_onSubscribeUnsubscribe(t *testing.T) {
}

conn := netmock.NewConn()
- nc := s.newConn(conn.Client)
+ nc := s.newConn(conn.Client, 0)
s.Cipher, _ = s.License.Cipher()

// Subscribe and check for error.
@@ -333,7 +333,7 @@ func TestHandlers_onPublish(t *testing.T) {
}

conn := netmock.NewConn()
- nc := s.newConn(conn.Client)
+ nc := s.newConn(conn.Client, 0)
s.Cipher, _ = s.License.Cipher()

err := nc.onPublish(&mqtt.Publish{
@@ -431,7 +431,7 @@ func TestHandlers_onPresence(t *testing.T) {
}

conn := netmock.NewConn()
- nc := s.newConn(conn.Client)
+ nc := s.newConn(conn.Client, 0)
s.Cipher, _ = s.License.Cipher()

resp, success := nc.onPresence([]byte(tc.payload))
@@ -518,7 +518,7 @@ func TestHandlers_onKeygen(t *testing.T) {
}

conn := netmock.NewConn()
- nc := s.newConn(conn.Client)
+ nc := s.newConn(conn.Client, 0)
s.Cipher, _ = s.License.Cipher()

//resp
@@ -590,7 +590,7 @@ func TestHandlers_onEmitterRequest(t *testing.T) {
measurer: stats.NewNoop(),
}

- nc := s.newConn(netmock.NewNoop())
+ nc := s.newConn(netmock.NewNoop(), 0)
ok := nc.onEmitterRequest(channel, []byte(tc.payload), 0)
assert.Equal(t, tc.success, ok, tc.channel)
})
Expand Down Expand Up @@ -640,7 +640,7 @@ func TestHandlers_lookupPresence(t *testing.T) {
measurer: stats.NewNoop(),
}

- s.subscriptions.Subscribe(message.Ssid{1, 2, 3}, s.newConn(netmock.NewNoop()))
+ s.subscriptions.Subscribe(message.Ssid{1, 2, 3}, s.newConn(netmock.NewNoop(), 0))
presence := s.lookupPresence(message.Ssid{1, 2, 3})
assert.NotEmpty(t, presence)
}
4 changes: 2 additions & 2 deletions internal/broker/service.go
@@ -223,7 +223,7 @@ func (s *Service) listen(addr *net.TCPAddr, conf *tls.Config) {
// Create new listener
logging.LogTarget("service", "starting the listener", addr)
l, err := listener.New(addr.String(), listener.Config{
- WriteRate: s.Config.Limit.WriteRate,
+ FlushRate: s.Config.Limit.FlushRate,
TLS: conf,
})
if err != nil {
@@ -291,7 +291,7 @@ func (s *Service) notifyUnsubscribe(conn *Conn, ssid message.Ssid, channel []byte) {

// Occurs when a new client connection is accepted.
func (s *Service) onAcceptConn(t net.Conn) {
- conn := s.newConn(t)
+ conn := s.newConn(t, s.Config.Limit.ReadRate)
go conn.Process()
}

8 changes: 6 additions & 2 deletions internal/config/config.go
@@ -160,12 +160,16 @@ type ClusterConfig struct {
// LimitConfig represents various limit configurations - such as message size.
type LimitConfig struct {

- // Maximum message size allowed from/to the peer. Default if not specified is 64kB.
+ // Maximum message size allowed from/to the client. Default if not specified is 64kB.
MessageSize int `json:"messageSize,omitempty"`

+ // The maximum messages per second allowed to be processed per client connection. This
+ // effectively restricts the QpS for an individual connection.
+ ReadRate int `json:"readRate,omitempty"`

// The maximum socket write rate per connection. This does not limit QpS but instead
// can be used to scale throughput. Defaults to 60.
- WriteRate int `json:"writeRate,omitempty"`
+ FlushRate int `json:"flushRate,omitempty"`
}

// LoadProvider loads a provider from the configuration or panics if the configuration is
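To make the new knobs concrete, here is a hedged sketch of how a broker configuration's limit section would decode into the LimitConfig above. The field names come from the json tags in this diff; the values are purely illustrative.

package main

import (
	"encoding/json"
	"fmt"
)

// LimitConfig mirrors the struct from internal/config/config.go after this commit.
type LimitConfig struct {
	MessageSize int `json:"messageSize,omitempty"`
	ReadRate    int `json:"readRate,omitempty"`
	FlushRate   int `json:"flushRate,omitempty"`
}

func main() {
	// An illustrative "limit" section; a readRate of 0 falls back to the broker default.
	payload := []byte(`{"messageSize": 65536, "readRate": 5000, "flushRate": 60}`)

	var limit LimitConfig
	if err := json.Unmarshal(payload, &limit); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", limit) // {MessageSize:65536 ReadRate:5000 FlushRate:60}
}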
4 changes: 2 additions & 2 deletions internal/network/listener/listener.go
@@ -69,7 +69,7 @@ var noTimeout time.Duration
// Config represents the configuration of the listener.
type Config struct {
TLS *tls.Config // The TLS/SSL configuration.
- WriteRate int // The maximum write rate (QPS) per connection.
+ FlushRate int // The maximum flush rate (QPS) per connection.
}

// New announces on the local network address laddr. The syntax of laddr is
@@ -175,7 +175,7 @@ func (m *Listener) Serve() error {
func (m *Listener) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()

- muc := newConn(c, m.config.WriteRate)
+ muc := newConn(c, m.config.FlushRate)
if m.readTimeout > noTimeout {
_ = c.SetReadDeadline(time.Now().Add(m.readTimeout))
}
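For contrast with readRate, the renamed flushRate does not cap how many packets are processed; per the config.go comment it caps how often buffered bytes are pushed to the socket, trading a little latency for larger, more efficient writes. Below is a conceptual sketch of such a flush loop over a buffered writer, not the listener's actual implementation.

package main

import (
	"bufio"
	"fmt"
	"net"
	"time"
)

func main() {
	client, server := net.Pipe()
	defer client.Close()
	defer server.Close()

	// Writes accumulate in the buffer; nothing reaches the socket yet.
	w := bufio.NewWriter(server)
	w.WriteString("hello")

	done := make(chan struct{})

	// Reader side: receives whatever the next flush pushes through.
	go func() {
		buf := make([]byte, 64)
		n, _ := client.Read(buf)
		fmt.Printf("received %q\n", buf[:n])
		close(done)
	}()

	// Flush loop: push buffered bytes out at most flushRate times per second.
	flushRate := 10
	ticker := time.NewTicker(time.Second / time.Duration(flushRate))
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			_ = w.Flush()
		case <-done:
			return
		}
	}
}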
