Skip to content

Commit

Permalink
Supply cancelable context to ALL workers. (#167)
Browse files Browse the repository at this point in the history
  • Loading branch information
yiwenlong committed Apr 7, 2021
1 parent 756c0bd commit 0d2922a
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 77 deletions.
9 changes: 5 additions & 4 deletions pkg/infra/assembler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package infra

import (
"context"
"sync"

"github.com/hyperledger/fabric-protos-go/common"
Expand Down Expand Up @@ -38,7 +39,7 @@ func (a *Assembler) sign(e *Elements) (*Elements, error) {
return e, nil
}

func (a *Assembler) StartSigner(raw chan *Elements, signed []chan *Elements, errorCh chan error, done <-chan struct{}) {
func (a *Assembler) StartSigner(ctx context.Context, raw chan *Elements, signed []chan *Elements, errorCh chan error) {
for {
select {
case r := <-raw:
Expand All @@ -50,13 +51,13 @@ func (a *Assembler) StartSigner(raw chan *Elements, signed []chan *Elements, err
for _, v := range signed {
v <- t
}
case <-done:
case <-ctx.Done():
return
}
}
}

func (a *Assembler) StartIntegrator(processed, envs chan *Elements, errorCh chan error, done <-chan struct{}) {
func (a *Assembler) StartIntegrator(ctx context.Context, processed, envs chan *Elements, errorCh chan error) {
for {
select {
case p := <-processed:
Expand All @@ -66,7 +67,7 @@ func (a *Assembler) StartIntegrator(processed, envs chan *Elements, errorCh chan
return
}
envs <- e
case <-done:
case <-ctx.Done():
return
}
}
Expand Down
13 changes: 7 additions & 6 deletions pkg/infra/benchmark_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package infra

import (
"context"
"testing"
"time"

Expand All @@ -10,19 +11,19 @@ import (
log "github.com/sirupsen/logrus"
)

func StartProposer(signed, processed chan *Elements, done chan struct{}, logger *log.Logger, threshold int, addr string) {
func StartProposer(ctx context.Context, signed, processed chan *Elements, logger *log.Logger, threshold int, addr string) {
peer := Node{
Addr: addr,
}
Proposer, _ := CreateProposer(peer, logger)
go Proposer.Start(signed, processed, done, threshold)
go Proposer.Start(ctx, signed, processed, threshold)
}

func benchmarkNPeer(concurrency int, b *testing.B) {
processed := make(chan *Elements, 10)
done := make(chan struct{})
defer close(done)
signeds := make([]chan *Elements, concurrency)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for i := 0; i < concurrency; i++ {
signeds[i] = make(chan *Elements, 10)
mockpeer, err := mock.NewServer(1, nil)
Expand All @@ -31,7 +32,7 @@ func benchmarkNPeer(concurrency int, b *testing.B) {
}
mockpeer.Start()
defer mockpeer.Stop()
StartProposer(signeds[i], processed, done, nil, concurrency, mockpeer.PeersAddresses()[0])
StartProposer(ctx, signeds[i], processed, nil, concurrency, mockpeer.PeersAddresses()[0])
}
b.ReportAllocs()
b.ResetTimer()
Expand Down Expand Up @@ -89,7 +90,7 @@ func benchmarkAsyncCollector(concurrent int, b *testing.B) {
instance, _ := NewBlockCollector(concurrent, concurrent)
block := make(chan *peer.FilteredBlock, 100)
done := make(chan struct{})
go instance.Start(block, done, b.N, time.Now(), false)
go instance.Start(context.Background(), block, done, b.N, time.Now(), false)

b.ReportAllocs()
b.ResetTimer()
Expand Down
43 changes: 25 additions & 18 deletions pkg/infra/block_collector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package infra

import (
"context"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -33,35 +34,41 @@ func NewBlockCollector(threshold int, total int) (*BlockCollector, error) {
}

func (bc *BlockCollector) Start(
ctx context.Context,
blockCh <-chan *peer.FilteredBlock,
finishCh chan struct{},
totalTx int,
now time.Time,
printResult bool, // controls whether to print block commit message. Tests set this to false to avoid polluting stdout.
) {
// TODO block collector should be able to detect repeated block, and exclude it from total tx counting.
for block := range blockCh {
cnt := bc.registry[block.Number] // cnt is default to 0 when key does not exist
cnt++
for {
select {
case block := <-blockCh:
cnt := bc.registry[block.Number] // cnt is default to 0 when key does not exist
cnt++

// newly committed block just hits threshold
if cnt == bc.thresholdP {
if printResult {
fmt.Printf("Time %8.2fs\tBlock %6d\tTx %6d\t \n", time.Since(now).Seconds(), block.Number, len(block.FilteredTransactions))
}
// newly committed block just hits threshold
if cnt == bc.thresholdP {
if printResult {
fmt.Printf("Time %8.2fs\tBlock %6d\tTx %6d\t \n", time.Since(now).Seconds(), block.Number, len(block.FilteredTransactions))
}

bc.totalTx += len(block.FilteredTransactions)
if bc.totalTx >= totalTx {
close(finishCh)
bc.totalTx += len(block.FilteredTransactions)
if bc.totalTx >= totalTx {
close(finishCh)
}
}
}

if cnt == bc.totalP {
// committed on all peers, remove from registry
delete(bc.registry, block.Number)
} else {
// upsert back to registry
bc.registry[block.Number] = cnt
if cnt == bc.totalP {
// committed on all peers, remove from registry
delete(bc.registry, block.Number)
} else {
// upsert back to registry
bc.registry[block.Number] = cnt
}
case <-ctx.Done():
return
}
}
}
Expand Down
13 changes: 7 additions & 6 deletions pkg/infra/block_collector_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package infra_test

import (
"context"
"sync"
"tape/pkg/infra"
"time"
Expand All @@ -20,7 +21,7 @@ var _ = Describe("BlockCollector", func() {

block := make(chan *peer.FilteredBlock)
done := make(chan struct{})
go instance.Start(block, done, 2, time.Now(), false)
go instance.Start(context.Background(), block, done, 2, time.Now(), false)

block <- &peer.FilteredBlock{Number: 0, FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
Consistently(done).ShouldNot(BeClosed())
Expand All @@ -34,7 +35,7 @@ var _ = Describe("BlockCollector", func() {

block := make(chan *peer.FilteredBlock)
done := make(chan struct{})
go instance.Start(block, done, 2, time.Now(), false)
go instance.Start(context.Background(), block, done, 2, time.Now(), false)

block <- &peer.FilteredBlock{Number: 0, FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
Consistently(done).ShouldNot(BeClosed())
Expand All @@ -56,7 +57,7 @@ var _ = Describe("BlockCollector", func() {

block := make(chan *peer.FilteredBlock)
done := make(chan struct{})
go instance.Start(block, done, 2, time.Now(), false)
go instance.Start(context.Background(), block, done, 2, time.Now(), false)

block <- &peer.FilteredBlock{Number: 1, FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
Consistently(done).ShouldNot(BeClosed())
Expand All @@ -83,7 +84,7 @@ var _ = Describe("BlockCollector", func() {

block := make(chan *peer.FilteredBlock)
done := make(chan struct{})
go instance.Start(block, done, 1, time.Now(), false)
go instance.Start(context.Background(), block, done, 1, time.Now(), false)

block <- &peer.FilteredBlock{FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
Consistently(done).ShouldNot(BeClosed())
Expand All @@ -97,7 +98,7 @@ var _ = Describe("BlockCollector", func() {

block := make(chan *peer.FilteredBlock)
done := make(chan struct{})
go instance.Start(block, done, 2, time.Now(), false)
go instance.Start(context.Background(), block, done, 2, time.Now(), false)

block <- &peer.FilteredBlock{Number: 0, FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
Consistently(done).ShouldNot(BeClosed())
Expand All @@ -120,7 +121,7 @@ var _ = Describe("BlockCollector", func() {

block := make(chan *peer.FilteredBlock)
done := make(chan struct{})
go instance.Start(block, done, 1, time.Now(), false)
go instance.Start(context.Background(), block, done, 1, time.Now(), false)

var wg sync.WaitGroup
wg.Add(100)
Expand Down
8 changes: 4 additions & 4 deletions pkg/infra/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,20 @@ func CreateEndorserClient(node Node, logger *log.Logger) (peer.EndorserClient, e
return peer.NewEndorserClient(conn), nil
}

func CreateBroadcastClient(node Node, logger *log.Logger) (orderer.AtomicBroadcast_BroadcastClient, error) {
func CreateBroadcastClient(ctx context.Context, node Node, logger *log.Logger) (orderer.AtomicBroadcast_BroadcastClient, error) {
conn, err := DailConnection(node, logger)
if err != nil {
return nil, err
}
return orderer.NewAtomicBroadcastClient(conn).Broadcast(context.Background())
return orderer.NewAtomicBroadcastClient(conn).Broadcast(ctx)
}

func CreateDeliverFilteredClient(node Node, logger *log.Logger) (peer.Deliver_DeliverFilteredClient, error) {
func CreateDeliverFilteredClient(ctx context.Context, node Node, logger *log.Logger) (peer.Deliver_DeliverFilteredClient, error) {
conn, err := DailConnection(node, logger)
if err != nil {
return nil, err
}
return peer.NewDeliverClient(conn).DeliverFiltered(context.Background())
return peer.NewDeliverClient(conn).DeliverFiltered(ctx)
}

// TODO: use a global get logger function instead inject a logger
Expand Down
5 changes: 3 additions & 2 deletions pkg/infra/client_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package infra_test

import (
"context"
"tape/pkg/infra"

. "github.com/onsi/ginkgo"
Expand All @@ -21,11 +22,11 @@ var _ = Describe("Client", func() {
Expect(err).Should(MatchError(ContainSubstring("error connecting to invalid_addr")))
})
It("captures error from broadcaster", func() {
_, err := infra.CreateBroadcastClient(dummy, logger)
_, err := infra.CreateBroadcastClient(context.Background(), dummy, logger)
Expect(err).Should(MatchError(ContainSubstring("error connecting to invalid_addr")))
})
It("captures error from DeliverFilter", func() {
_, err := infra.CreateDeliverFilteredClient(dummy, logger)
_, err := infra.CreateDeliverFilteredClient(context.Background(), dummy, logger)
Expect(err).Should(MatchError(ContainSubstring("error connecting to invalid_addr")))
})
})
Expand Down
12 changes: 6 additions & 6 deletions pkg/infra/observer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package infra

import (
"context"
"time"

"github.com/hyperledger/fabric-protos-go/peer"
Expand All @@ -18,10 +19,10 @@ type Observer struct {
logger *log.Logger
}

func CreateObservers(channel string, nodes []Node, crypto *Crypto, logger *log.Logger) (*Observers, error) {
func CreateObservers(ctx context.Context, channel string, nodes []Node, crypto *Crypto, logger *log.Logger) (*Observers, error) {
var workers []*Observer
for _, node := range nodes {
worker, err := CreateObserver(channel, node, crypto, logger)
worker, err := CreateObserver(ctx, channel, node, crypto, logger)
if err != nil {
return nil, err
}
Expand All @@ -36,13 +37,13 @@ func (o *Observers) Start(errorCh chan error, blockCh chan<- *peer.FilteredBlock
}
}

func CreateObserver(channel string, node Node, crypto *Crypto, logger *log.Logger) (*Observer, error) {
deliverer, err := CreateDeliverFilteredClient(node, logger)
func CreateObserver(ctx context.Context, channel string, node Node, crypto *Crypto, logger *log.Logger) (*Observer, error) {
seek, err := CreateSignedDeliverNewestEnv(channel, crypto)
if err != nil {
return nil, err
}

seek, err := CreateSignedDeliverNewestEnv(channel, crypto)
deliverer, err := CreateDeliverFilteredClient(ctx, node, logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -76,7 +77,6 @@ func (o *Observer) Start(errorCh chan error, blockCh chan<- *peer.FilteredBlock,
fb := r.Type.(*peer.DeliverResponse_FilteredBlock)
o.logger.Debugf("receivedTime %8.2fs\tBlock %6d\tTx %6d\t Address %s\n", time.Since(now).Seconds(), fb.FilteredBlock.Number, len(fb.FilteredBlock.FilteredTransactions), o.Address)

// TODO use proper context to create deliver client so it could be cancelled (now it's context.Background).
blockCh <- fb.FilteredBlock

n = n + len(fb.FilteredBlock.FilteredTransactions)
Expand Down
21 changes: 11 additions & 10 deletions pkg/infra/process.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package infra

import (
"context"
"fmt"
"time"

Expand All @@ -22,7 +23,6 @@ func Process(configPath string, num int, burst int, rate float64, logger *log.Lo
signed := make([]chan *Elements, len(config.Endorsers))
processed := make(chan *Elements, burst)
envs := make(chan *Elements, burst)
done := make(chan struct{})
blockCh := make(chan *peer.FilteredBlock)
finishCh := make(chan struct{})
errorCh := make(chan error, burst)
Expand All @@ -35,31 +35,34 @@ func Process(configPath string, num int, burst int, rate float64, logger *log.Lo
signed[i] = make(chan *Elements, burst)
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for i := 0; i < 5; i++ {
go assembler.StartSigner(raw, signed, errorCh, done)
go assembler.StartIntegrator(processed, envs, errorCh, done)
go assembler.StartSigner(ctx, raw, signed, errorCh)
go assembler.StartIntegrator(ctx, processed, envs, errorCh)
}

proposers, err := CreateProposers(config.NumOfConn, config.Endorsers, logger)
if err != nil {
return err
}
proposers.Start(signed, processed, done, config)
proposers.Start(ctx, signed, processed, config)

broadcaster, err := CreateBroadcasters(config.NumOfConn, config.Orderer, logger)
broadcaster, err := CreateBroadcasters(ctx, config.NumOfConn, config.Orderer, logger)
if err != nil {
return err
}
broadcaster.Start(envs, errorCh, done)
broadcaster.Start(ctx, envs, errorCh)

observers, err := CreateObservers(config.Channel, config.Committers, crypto, logger)
observers, err := CreateObservers(ctx, config.Channel, config.Committers, crypto, logger)
if err != nil {
return err
}

start := time.Now()

go blockCollector.Start(blockCh, finishCh, num, time.Now(), true)
go blockCollector.Start(ctx, blockCh, finishCh, num, time.Now(), true)
go observers.Start(errorCh, blockCh, start)
go StartCreateProposal(num, burst, rate, config, crypto, raw, errorCh)

Expand All @@ -69,8 +72,6 @@ func Process(configPath string, num int, burst int, rate float64, logger *log.Lo
return err
case <-finishCh:
duration := time.Since(start)
close(done)

logger.Infof("Completed processing transactions.")
fmt.Printf("tx: %d, duration: %+v, tps: %f\n", num, duration, float64(num)/duration.Seconds())
return nil
Expand Down
Loading

0 comments on commit 0d2922a

Please sign in to comment.