Skip to content

Commit

Permalink
reslove #149
Browse files Browse the repository at this point in the history
Signed-off-by: SamYuan1990 <yy19902439@126.com>
  • Loading branch information
SamYuan1990 committed Feb 26, 2021
1 parent 3144029 commit e7ddfd0
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 11 deletions.
8 changes: 8 additions & 0 deletions e2e/mock/orderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,11 @@ func (o *Orderer) Addrs() string {
func (o *Orderer) Start() {
o.GrpcServer.Serve(o.Listener)
}

func (o *Orderer) DummyBroadcast() {
for i := 0; i < 1000; i++ {
for _, c := range o.TxCs {
c <- struct{}{}
}
}
}
4 changes: 4 additions & 0 deletions e2e/mock/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,7 @@ func (s *Server) OrderAddr() string {
func (s *Server) Addresses() ([]string, string) {
return s.PeersAddresses(), s.OrderAddr()
}

func (s *Server) DummyBroadcast() {
s.orderer.DummyBroadcast()
}
43 changes: 37 additions & 6 deletions pkg/infra/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package infra

import (
"fmt"
"sync"
"time"

"github.com/hyperledger/fabric-protos-go/peer"
Expand All @@ -11,6 +12,7 @@ import (

type Observers struct {
workers []*Observer
ws *WaitingSingal
}

type Observer struct {
Expand All @@ -19,7 +21,12 @@ type Observer struct {
logger *log.Logger
}

func CreateObservers(channel string, nodes []Node, crypto *Crypto, logger *log.Logger) (*Observers, error) {
type WaitingSingal struct {
sync.Mutex
threshold int
}

func CreateObservers(channel string, nodes []Node, threshold int, crypto *Crypto, logger *log.Logger) (*Observers, error) {
var workers []*Observer
for _, node := range nodes {
worker, err := CreateObserver(channel, node, crypto, logger)
Expand All @@ -28,13 +35,21 @@ func CreateObservers(channel string, nodes []Node, crypto *Crypto, logger *log.L
}
workers = append(workers, worker)
}
return &Observers{workers: workers}, nil
ws := &WaitingSingal{threshold: threshold}
return &Observers{workers: workers, ws: ws}, nil
}

func (o *Observers) Start(N int, errorCh chan error, finishCh chan struct{}, now time.Time, blockCollector *BlockCollector) {
func (o *Observers) Start(N int, errorCh chan error, processWg *sync.WaitGroup, finishCh chan struct{}, now time.Time, blockCollector *BlockCollector) {
defer close(finishCh)
for i := 0; i < len(o.workers); i++ {
go o.workers[i].Start(N, errorCh, finishCh, now, blockCollector)
go o.workers[i].Start(N, errorCh, processWg, o.ws, now, blockCollector)
}
for !o.ws.Completed() {

}
duration := time.Since(now)
fmt.Printf("tx: %d, duration: %+v, tps: %f\n", N, duration, float64(N)/duration.Seconds())
processWg.Wait()
}

func CreateObserver(channel string, node Node, crypto *Crypto, logger *log.Logger) (*Observer, error) {
Expand All @@ -60,8 +75,9 @@ func CreateObserver(channel string, node Node, crypto *Crypto, logger *log.Logge
return &Observer{Address: node.Addr, d: deliverer, logger: logger}, nil
}

func (o *Observer) Start(N int, errorCh chan error, finishCh chan struct{}, now time.Time, blockCollector *BlockCollector) {
defer close(finishCh)
func (o *Observer) Start(N int, errorCh chan error, processWg *sync.WaitGroup, ws *WaitingSingal, now time.Time, blockCollector *BlockCollector) {
defer processWg.Done()

o.logger.Debugf("start observer")
n := 0
for n < N {
Expand All @@ -85,4 +101,19 @@ func (o *Observer) Start(N int, errorCh chan error, finishCh chan struct{}, now

n = n + len(fb.FilteredBlock.FilteredTransactions)
}
ws.Add()
}

func (ws *WaitingSingal) Completed() bool {
ws.Lock()
defer ws.Unlock()
return ws.threshold == 0
}

func (ws *WaitingSingal) Add() {
ws.Lock()
defer ws.Unlock()
if ws.threshold > 0 {
ws.threshold--
}
}
89 changes: 89 additions & 0 deletions pkg/infra/observer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package infra_test

import (
"io/ioutil"
"net"
"os"
"sync"
"tape/e2e"
"tape/e2e/mock"
"tape/pkg/infra"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
)

var _ = Describe("Observer", func() {
var (
tmpDir string
logger *log.Logger
mtlsCertFile, mtlsKeyFile *os.File
)

BeforeEach(func() {
logger = log.New()

tmpDir, err := ioutil.TempDir("", "tape-")
Expect(err).NotTo(HaveOccurred())

mtlsCertFile, err = ioutil.TempFile(tmpDir, "mtls-*.crt")
Expect(err).NotTo(HaveOccurred())

mtlsKeyFile, err = ioutil.TempFile(tmpDir, "mtls-*.key")
Expect(err).NotTo(HaveOccurred())

err = e2e.GenerateCertAndKeys(mtlsKeyFile, mtlsCertFile)
Expect(err).NotTo(HaveOccurred())

mtlsCertFile.Close()
mtlsKeyFile.Close()
})

AfterEach(func() {
os.RemoveAll(tmpDir)
})

It("It should work with mock", func() {
lis, err := net.Listen("tcp", "127.0.0.1:0")
Expect(err).NotTo(HaveOccurred())

grpcServer := grpc.NewServer()

mock := &mock.Server{GrpcServer: grpcServer, Listener: lis}
go mock.Start()
defer mock.Stop()

configFile, err := ioutil.TempFile(tmpDir, "config*.yaml")
Expect(err).NotTo(HaveOccurred())
configValue := e2e.Values{
PrivSk: mtlsKeyFile.Name(),
SignCert: mtlsCertFile.Name(),
Mtls: false,
Addr: lis.Addr().String(),
CommitThreshold: 1,
}
e2e.GenerateConfigFile(configFile.Name(), configValue)
config, err := infra.LoadConfig(configFile.Name())
Expect(err).NotTo(HaveOccurred())
crypto, err := config.LoadCrypto()
Expect(err).NotTo(HaveOccurred())

observers, err := infra.CreateObservers(config.Channel, config.Committers, config.CommitThreshold, crypto, logger)
Expect(err).NotTo(HaveOccurred())
processWg := &sync.WaitGroup{}
processWg.Add(len(config.Committers))

finishCh := make(chan struct{})
errorCh := make(chan error, 10)
start := time.Now()
blockCollector, err := infra.NewBlockCollector(config.CommitThreshold, len(config.Committers))
go observers.Start(100, errorCh, processWg, finishCh, start, blockCollector)
mock.DummyBroadcast()
<-finishCh
completed := time.Now()
Expect(start.Sub(completed)).Should(BeNumerically("<", 0.002), "observer with mock shouldn't take too long.")
})
})
10 changes: 5 additions & 5 deletions pkg/infra/process.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package infra

import (
"fmt"
"sync"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -50,26 +50,26 @@ func Process(configPath string, num int, burst int, rate float64, logger *log.Lo
}
broadcaster.Start(envs, errorCh, done)

observers, err := CreateObservers(config.Channel, config.Committers, crypto, logger)
observers, err := CreateObservers(config.Channel, config.Committers, config.CommitThreshold, crypto, logger)
if err != nil {
return err
}

start := time.Now()
processWg := &sync.WaitGroup{}
processWg.Add(len(config.Committers))

go observers.Start(num, errorCh, finishCh, start, blockCollector)
go observers.Start(num, errorCh, processWg, finishCh, start, blockCollector)
go StartCreateProposal(num, burst, rate, config, crypto, raw, errorCh, logger)

for {
select {
case err = <-errorCh:
return err
case <-finishCh:
duration := time.Since(start)
close(done)

logger.Infof("Completed processing transactions.")
fmt.Printf("tx: %d, duration: %+v, tps: %f\n", num, duration, float64(num)/duration.Seconds())
return nil
}
}
Expand Down

0 comments on commit e7ddfd0

Please sign in to comment.