Throttle greedy clients in orderer Broadcast API (#4640)
This commit implements a shared rate limiter for connections authenticated with mutual TLS.
Throttling is applied at both the Authority Key Identifier (per organization) and the
Subject Key Identifier (per client) level, which ensures that no single organization or
client can bombard the orderer with endless transactions at the expense of others.

The effective rate is divided equally across all active clients and organizations;
clients that remain inactive for longer than InactivityTimeout are pruned and no longer
count toward the division.

This feature is disabled by default.

Signed-off-by: Yacov Manevich <yacov.manevich@ibm.com>
yacovm committed Jan 30, 2024
1 parent c98a1ee commit 5cb00be
Showing 12 changed files with 1,050 additions and 3 deletions.
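
The commit message above describes dividing a combined rate across active clients and pruning clients once InactivityTimeout elapses. Below is a minimal sketch of that idea in Go; it assumes golang.org/x/time/rate, and the names (activityTracker, limiterFor) are illustrative only, not part of this commit.

package throttle

import (
	"sync"
	"time"

	"golang.org/x/time/rate"
)

// activityTracker divides a combined transaction rate equally among the
// clients seen within the inactivity timeout (illustrative sketch only).
type activityTracker struct {
	mu                sync.Mutex
	totalRate         rate.Limit
	inactivityTimeout time.Duration
	lastSeen          map[string]time.Time // keyed by Subject Key Identifier
	limiters          map[string]*rate.Limiter
}

func newActivityTracker(totalRate int, inactivityTimeout time.Duration) *activityTracker {
	return &activityTracker{
		totalRate:         rate.Limit(totalRate),
		inactivityTimeout: inactivityTimeout,
		lastSeen:          make(map[string]time.Time),
		limiters:          make(map[string]*rate.Limiter),
	}
}

// limiterFor records activity for the given client, prunes clients that have
// been silent longer than the inactivity timeout, and rebalances the combined
// rate equally across the clients that remain active.
func (a *activityTracker) limiterFor(ski string, now time.Time) *rate.Limiter {
	a.mu.Lock()
	defer a.mu.Unlock()

	a.lastSeen[ski] = now
	if _, exists := a.limiters[ski]; !exists {
		a.limiters[ski] = rate.NewLimiter(a.totalRate, 1)
	}
	for id, seen := range a.lastSeen {
		if now.Sub(seen) > a.inactivityTimeout {
			delete(a.lastSeen, id) // inactive clients stop diluting the budget
			delete(a.limiters, id)
		}
	}
	share := a.totalRate / rate.Limit(len(a.lastSeen))
	for _, lim := range a.limiters {
		lim.SetLimit(share)
	}
	return a.limiters[ski]
}

Under these assumptions, a Broadcast handler would call limiterFor(ski, time.Now()).Wait(ctx) before forwarding each transaction, so each client is held to its current share of the combined rate.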
7 changes: 7 additions & 0 deletions integration/nwo/template/orderer_template.go
@@ -11,6 +11,13 @@ const DefaultOrderer = `---
General:
ListenAddress: 127.0.0.1
ListenPort: {{ .OrdererPort Orderer "Listen" }}
Throttling:
# Rate is the maximum rate for all clients combined.
Rate: 0
# InactivityTimeout defines the time frame after which
# inactive clients are pruned from memory and are not considered
# when allocating the budget for throttling per client.
InactivityTimeout: 5s
TLS:
Enabled: {{ .TLSEnabled }}
PrivateKey: {{ $w.OrdererLocalTLSDir Orderer }}/server.key
120 changes: 118 additions & 2 deletions integration/raft/cft_test.go
@@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package raft

import (
"context"
"crypto/ecdsa"
"crypto/rand"
"crypto/x509"
@@ -16,19 +17,20 @@ import (
"path"
"path/filepath"
"strings"
"sync"
"syscall"
"time"

"github.com/hyperledger/fabric/integration/channelparticipation"

docker "github.com/fsouza/go-dockerclient"
"github.com/golang/protobuf/proto"
conftx "github.com/hyperledger/fabric-config/configtx"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/msp"
orderer2 "github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric-protos-go/orderer/etcdraft"
"github.com/hyperledger/fabric/common/configtx"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/integration/channelparticipation"
"github.com/hyperledger/fabric/integration/nwo"
"github.com/hyperledger/fabric/integration/nwo/commands"
"github.com/hyperledger/fabric/integration/ordererclient"
@@ -141,6 +143,60 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() {
})
})

When("orderer is configured with throttling", func() {
It("a hyperactive client cannot overwhelm the orderer", func() {
network = nwo.New(nwo.BasicEtcdRaft(), testDir, client, StartPort(), components)

network.GenerateConfigTree()
network.Bootstrap()

orderer := network.Orderer("orderer")

oRunner := network.OrdererRunner(orderer)

ordererProc = ifrit.Invoke(oRunner)
Eventually(ordererProc.Ready(), network.EventuallyTimeout).Should(BeClosed())

channelparticipation.JoinOrderersAppChannelCluster(network, "testchannel", orderer)

envs := make(chan *common.Envelope, 5000)

// Create 5000 envelopes to send to the orderer at the same time
for i := 0; i < 5000; i++ {
envs <- ordererclient.CreateBroadcastEnvelope(network, orderer, "testchannel", []byte(fmt.Sprintf("%d", i)))
}

close(envs)

// Broadcast all envelopes in parallel from 50 clients
Eventually(oRunner.Err, time.Minute).Should(gbytes.Say("Start accepting requests as Raft leader"))
TPS := measureTPS(5000, network, orderer, envs)
Expect(TPS).To(BeNumerically(">", 500))

// Next, restart the orderer with throttling enabled
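// ORDERER_GENERAL_THROTTLING_RATE maps to General.Throttling.Rate through the orderer's environment-variable configuration overrides.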
ordererProc.Signal(syscall.SIGTERM)
Eventually(ordererProc.Wait(), network.EventuallyTimeout).Should(Receive())

oRunner = network.OrdererRunner(orderer, "ORDERER_GENERAL_THROTTLING_RATE=500")
ordererProc = ifrit.Invoke(oRunner)

// Re-create the envelopes
envs = make(chan *common.Envelope, 5000)

// Create 5000 envelopes to send to the orderer at the same time
for i := 0; i < 5000; i++ {
envs <- ordererclient.CreateBroadcastEnvelope(network, orderer, "testchannel", []byte(fmt.Sprintf("%d", i)))
}

close(envs)

// Broadcast all envelopes in parallel from 50 clients and ensure it's not as fast as earlier
Eventually(oRunner.Err, time.Minute).Should(gbytes.Say("Start accepting requests as Raft leader"))
TPS = measureTPS(5000, network, orderer, envs)
Expect(TPS).To(Equal(500))
})
})

When("an orderer is behind the latest snapshot on leader", func() {
It("catches up using the block stored in snapshot", func() {
// Steps:
@@ -1018,6 +1074,66 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() {
})
})

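// measureTPS broadcasts txNum envelopes to the orderer from 50 concurrent clients
// and returns the observed throughput in transactions per second.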
func measureTPS(txNum int, network *nwo.Network, orderer *nwo.Orderer, envs chan *common.Envelope) int {
var bcastWG sync.WaitGroup
bcastWG.Add(50)

var lock sync.Mutex
cond := &sync.Cond{L: &lock}

var wg sync.WaitGroup
wg.Add(50)

for i := 0; i < 50; i++ {
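// Each client dials the orderer, opens a Broadcast stream, signals readiness via wg,
// and then blocks on cond so that all 50 streams start sending at the same moment.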
go func() {
defer bcastWG.Done()
conn := network.NewClientConn(
network.OrdererAddress(orderer, nwo.ListenPort),
filepath.Join(network.OrdererLocalTLSDir(orderer), "ca.crt"),
filepath.Join(network.OrdererLocalTLSDir(orderer), "server.crt"),
filepath.Join(network.OrdererLocalTLSDir(orderer), "server.key"),
)

broadcaster, err := orderer2.NewAtomicBroadcastClient(conn).Broadcast(context.Background())
if err != nil {
panic(err)
}

wg.Done()

lock.Lock()
cond.Wait()
lock.Unlock()

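// Drain the shared envelope channel; each envelope's broadcast response is received before the next send.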
for len(envs) > 0 {
env, ok := <-envs
if ok {
err := broadcaster.Send(env)
if err != nil {
panic(err)
}
_, err = broadcaster.Recv()
if err != nil {
panic(err)
}
}
}
}()
}

start := time.Now()
wg.Wait()
cond.Broadcast()
bcastWG.Wait()

elapsed := time.Since(start)
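// Clamp to at least one second so the division below never divides by zero on a very fast run.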
if elapsed < time.Second {
elapsed = time.Second
}

return txNum / int(elapsed.Seconds())
}

func renewOrdererCertificates(network *nwo.Network, orderers ...*nwo.Orderer) {
if len(orderers) == 0 {
return
20 changes: 20 additions & 0 deletions orderer/common/localconfig/config.go
@@ -51,6 +51,7 @@ type General struct {
Authentication Authentication
MaxRecvMsgSize int32
MaxSendMsgSize int32
Throttling Throttling
}

type Cluster struct {
@@ -148,6 +149,19 @@ type Admin struct {
TLS TLS
}

// Throttling defines the maximum rate of transactions per client.
// The effective rate per client is the configured rate divided equally
// among all active clients; a client stops counting toward the division
// once it ceases sending transactions and its inactivity timeout expires.
type Throttling struct {
// Rate is the maximum rate for all clients combined.
Rate int
// InactivityTimeout defines the time frame after which
// inactive clients are pruned from memory and are not considered
// when allocating the budget for throttling per client.
InactivityTimeout time.Duration
}

// ChannelParticipation provides the channel participation API configuration for the orderer.
// Channel participation uses the same ListenAddress and TLS settings as the Operations service.
type ChannelParticipation struct {
@@ -183,6 +197,9 @@ var Defaults = TopLevel{
},
MaxRecvMsgSize: comm.DefaultMaxRecvMsgSize,
MaxSendMsgSize: comm.DefaultMaxSendMsgSize,
Throttling: Throttling{
InactivityTimeout: time.Second * 5,
},
},
FileLedger: FileLedger{
Location: "/var/hyperledger/production/orderer",
@@ -344,6 +361,9 @@ func (c *TopLevel) completeInitialization(configDir string) {
case c.General.MaxSendMsgSize == 0:
logger.Infof("General.MaxSendMsgSize is unset, setting to %v", Defaults.General.MaxSendMsgSize)
c.General.MaxSendMsgSize = Defaults.General.MaxSendMsgSize
case c.General.Throttling.InactivityTimeout == 0:
logger.Infof("General.Throttling.InactivityTimeout is unset, setting to %v", Defaults.General.Throttling.InactivityTimeout)
c.General.Throttling.InactivityTimeout = Defaults.General.Throttling.InactivityTimeout
default:
return
}
12 changes: 11 additions & 1 deletion orderer/common/server/main.go
@@ -193,6 +193,7 @@ func Main() {
defer adminServer.Stop()

mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert

server := NewServer(
manager,
metricsProvider,
@@ -220,7 +221,16 @@ func Main() {
if conf.General.Profile.Enabled {
go initializeProfilingService(conf)
}
ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server)

clientRateLimiter, orgRateLimiter := CreateThrottlers(conf.General.Throttling)
throttlingWrapper := &ThrottlingAtomicBroadcast{
ThrottlingEnabled: conf.General.Throttling.Rate > 0,
PerOrgRateLimiter: orgRateLimiter,
PerClientRateLimiter: clientRateLimiter,
AtomicBroadcastServer: server,
}

ab.RegisterAtomicBroadcastServer(grpcServer.Server(), throttlingWrapper)
logger.Info("Beginning to serve requests")
if err := grpcServer.Start(); err != nil {
logger.Fatalf("Atomic Broadcast gRPC server has terminated while serving requests due to: %v", err)
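
CreateThrottlers and ThrottlingAtomicBroadcast are defined in new files that are not shown in this listing. As a rough sketch only, the wrapper could throttle the Broadcast stream along the following lines; the Limiter interface and the extractClientSKI/extractOrgAKI helpers are assumptions for illustration, not the commit's actual code.

package server

import (
	"context"

	cb "github.com/hyperledger/fabric-protos-go/common"
	ab "github.com/hyperledger/fabric-protos-go/orderer"
)

// Limiter is the minimal behaviour the wrapper needs from a rate limiter:
// block until the caller identified by key may proceed (illustrative only).
type Limiter interface {
	Wait(ctx context.Context, key string) error
}

// ThrottlingAtomicBroadcast throttles Broadcast traffic per client and per
// organization before delegating to the wrapped AtomicBroadcast server.
type ThrottlingAtomicBroadcast struct {
	ThrottlingEnabled    bool
	PerOrgRateLimiter    Limiter
	PerClientRateLimiter Limiter
	ab.AtomicBroadcastServer
}

func (t *ThrottlingAtomicBroadcast) Broadcast(stream ab.AtomicBroadcast_BroadcastServer) error {
	if !t.ThrottlingEnabled {
		return t.AtomicBroadcastServer.Broadcast(stream)
	}
	// Identify the caller from its mutual-TLS certificate; these helpers are
	// hypothetical stand-ins for extracting the SKI/AKI from the stream context.
	client, org := extractClientSKI(stream.Context()), extractOrgAKI(stream.Context())
	return t.AtomicBroadcastServer.Broadcast(&throttledStream{
		AtomicBroadcast_BroadcastServer: stream,
		wait: func(ctx context.Context) error {
			if err := t.PerOrgRateLimiter.Wait(ctx, org); err != nil {
				return err
			}
			return t.PerClientRateLimiter.Wait(ctx, client)
		},
	})
}

// throttledStream delays every Recv until the rate limiters admit another transaction.
type throttledStream struct {
	ab.AtomicBroadcast_BroadcastServer
	wait func(context.Context) error
}

func (s *throttledStream) Recv() (*cb.Envelope, error) {
	if err := s.wait(s.Context()); err != nil {
		return nil, err
	}
	return s.AtomicBroadcast_BroadcastServer.Recv()
}

func extractClientSKI(ctx context.Context) string { return "" } // hypothetical
func extractOrgAKI(ctx context.Context) string    { return "" } // hypothetical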
