Skip to content

Commit

Permalink
generate Proposals to peer and observe endorsement ProposalResponse
Browse files Browse the repository at this point in the history
Skip process if orderer is null
Skip observer if committer is null

Fixs Hyperledger-TWGC#56

Signed-off-by: SamYuan1990 <yy19902439@126.com>
  • Loading branch information
SamYuan1990 committed Aug 16, 2020
1 parent f1353c4 commit 8d9a006
Show file tree
Hide file tree
Showing 12 changed files with 369 additions and 30 deletions.
4 changes: 4 additions & 0 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ jobs:
INTERGATION_CASE: 'ANDLogic'
MOCK_FABRIC:
FABRIC_VERSION: 'mock'
MOCK_FABRIC_ENDORSEMTNONLY:
FABRIC_VERSION: 'mockEndorsementOnly'
MOCK_FABRIC_COMMITONLY:
FABRIC_VERSION: 'mockCommiterOnly'
LATEST:
FABRIC_VERSION: ''
steps:
Expand Down
70 changes: 42 additions & 28 deletions cmd/stupid/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package main

import (
"fmt"
"math"
"os"
"strconv"
"time"

"github.com/guoger/stupid/pkg/infra"
"github.com/guoger/stupid/pkg/process"
log "github.com/sirupsen/logrus"
)

Expand All @@ -29,48 +31,60 @@ func main() {
if err != nil {
panic(err)
}
rowLength := int(math.Ceil(float64(N) * 0.75))
crypto := config.LoadCrypto()
processBit := config.GetBitFlag()

raw := make(chan *infra.Elements, 100)
signed := make([]chan *infra.Elements, len(config.Endorsers))
processed := make(chan *infra.Elements, 10)
envs := make(chan *infra.Elements, 10)
var processed chan *infra.Elements
done := make(chan struct{})
SignProcess := &process.SignProcess{}
EndorsementProcess := &process.EndorsementProcess{}
BroadcasterProcess := &process.BroadcasterProcess{}
BlockProcess := &process.BlockProcess{}

assember := &infra.Assembler{Signer: crypto}
if processBit == infra.GenerateProposal || processBit == infra.ProcessALL {
// for endorsement only, warm up , as the comsuer
SignProcess = process.CreateSignProcess(len(config.Endorsers), rowLength, crypto)
SignProcess.Consume(done)
EndorsementProcess = process.CreateEndorsementProcess(rowLength, config, crypto, logger)

for i := 0; i < len(config.Endorsers); i++ {
signed[i] = make(chan *infra.Elements, 10)
EndorsementProcess.SetCommunication(SignProcess.Output)
processed = EndorsementProcess.Output
EndorsementProcess.Consume(done)
}

for i := 0; i < 5; i++ {
go assember.StartSigner(raw, signed, done)
go assember.StartIntegrator(processed, envs, done)
if processBit == infra.GenerateEnvelope || processBit == infra.ProcessALL {
// for order test, warm up
BroadcasterProcess = process.CreateBroadcasterProcessProcess(rowLength, config, crypto, logger)
BroadcasterProcess.Consume(done)
BroadcasterProcess.Produce(processed, done)
}

proposor := infra.CreateProposers(config.NumOfConn, config.ClientPerConn, config.Endorsers, logger)
proposor.Start(signed, processed, done, config)
start := time.Now()

broadcaster := infra.CreateBroadcasters(config.NumOfConn, config.Orderer, logger)
broadcaster.Start(envs, done)
if processBit == infra.GenerateBlock || processBit == infra.ProcessALL {
// Observer Only, warm up
BlockProcess = process.CreateBlockProcessProcess(config, crypto, logger)
BlockProcess.Consume(N, start)
}

observer := infra.CreateObserver(config.Channel, config.Committer, crypto, logger)
if processBit == infra.GenerateProposal || processBit == infra.ProcessALL {
SignProcess.Produce(N, crypto, config)
}

start := time.Now()
go observer.Start(N, start)

for i := 0; i < N; i++ {
prop := infra.CreateProposal(
crypto,
config.Channel,
config.Chaincode,
config.Version,
config.Args...,
)
raw <- &infra.Elements{Proposal: prop}
if processBit == infra.GenerateEnvelope {
BroadcasterProcess.MockProduce(N, crypto, config)
}

// to do
/*if processBit == infra.GenerateBlock {
}*/

if processBit == infra.GenerateBlock || processBit == infra.ProcessALL {
BlockProcess.Consumer.Wait()
}

observer.Wait()
duration := time.Since(start)
close(done)
logger.Infof("Completed processing transactions.")
Expand Down
21 changes: 21 additions & 0 deletions docs/design.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Here is high level desgin for stupid

## "Bit Map"
As description in #56, often we need to **spot network bottleneck** by testing 3 phases `Endorsement - Order - Commitment` **separately**:
- generate `Proposals` to peer and observe endorsement `ProposalResponse`
- generate `Envelope` to orderers and observe ordered `Block`
- generate `Block` to peer and observed committed `Block` (this is the most tricky one)

This could help us benchmarking fabric components in finer-grained manners.

So here is a bit map designed in `/cmd/stupid/main.go`

| generate `Proposals` | generate `Envelope` | generate `Block` | Bit number | Comments|
| ---- | ---- | ---- | ---- | ---- |
| Lowest | -> | Highet | | from left to right is the lowest to highest bit
| 1 | 0 | 0 | 1 | test generate `Proposals` to peer and observe endorsement `ProposalResponse`
| 0 | 1 | 0 | 2 | test generate `Envelope` to orderers and observe ordered `Block`
| 0 | 0 | 1 | 4 | test generate `Block` to peer and observed committed `Block`
| 1 | 1 | 1 | 7 | test all process to target fabric network

## Producer, Consumer mode
3 changes: 1 addition & 2 deletions mock/fabric/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func (o *Orderer) Broadcast(srv orderer.AtomicBroadcast_BroadcastServer) error {
}

o.TxC <- struct{}{}

err = srv.Send(&orderer.BroadcastResponse{Status: common.Status_SUCCESS})
if err != nil {
return err
Expand All @@ -88,7 +87,7 @@ func main() {

fmt.Println("Start listening on localhost...")

blockC := make(chan struct{}, 1000)
blockC := make(chan struct{}, 10000)

p := &Peer{
BlkSize: 10,
Expand Down
21 changes: 21 additions & 0 deletions pkg/infra/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,27 @@ func LoadConfig(f string) Config {
return config
}

const GenerateProposal = 1
const GenerateEnvelope = 2
const GenerateBlock = 4
const ProcessALL = 7

func (c Config) GetBitFlag() int {
processBit := 0
if len(c.Endorsers) > 0 {
processBit += GenerateProposal
}

if len(c.Orderer.Addr) > 0 {
processBit += GenerateEnvelope
}

if len(c.Committer.Addr) > 0 {
processBit += GenerateBlock
}
return processBit
}

func (c Config) LoadCrypto() *Crypto {
var allcerts []string
for _, p := range c.Endorsers {
Expand Down
34 changes: 34 additions & 0 deletions pkg/process/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package process

import (
"time"

"github.com/guoger/stupid/pkg/infra"
log "github.com/sirupsen/logrus"
)

type BlockProcess struct {
//Producer *infra.Assembler
Consumer *infra.Observer
//Communication chan *infra.Elements
}

func CreateBlockProcessProcess(config infra.Config, crypto *infra.Crypto, logger *log.Logger) *BlockProcess {
observer := infra.CreateObserver(config.Channel, config.Committer, crypto, logger)
return &BlockProcess{
//Producer: assember,
Consumer: observer,
//Communication: envs,
}
}

func (process *BlockProcess) Produce() {
// start producert to produce data into channel
// the productor of broadcaster
}

func (process *BlockProcess) Consume(N int, start time.Time) {
// the consume don't do nothing but start and listening the Communication channel
go process.Consumer.Start(N, start)
//go process.Consumer.Wait()
}
86 changes: 86 additions & 0 deletions pkg/process/broadcaster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package process

import (
"github.com/guoger/stupid/pkg/infra"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/protoutil"
log "github.com/sirupsen/logrus"
)

type BroadcasterProcess struct {
Producer *infra.Assembler
Consumer infra.Broadcasters
Communication chan *infra.Elements
}

func CreateBroadcasterProcessProcess(row_length int, config infra.Config, crypto *infra.Crypto, logger *log.Logger) *BroadcasterProcess {
envs := make(chan *infra.Elements, row_length)
assember := &infra.Assembler{Signer: crypto}
broadcaster := infra.CreateBroadcasters(config.NumOfConn, config.Orderer, logger)
return &BroadcasterProcess{
Producer: assember,
Consumer: broadcaster,
Communication: envs,
}
}

func createTestHeader(channelId string) *common.Header {
nonce := []byte("nonce-abc-12345")
creator := []byte("nonce-abc-12345")
txid := protoutil.ComputeTxID(nonce, creator)

txType := common.HeaderType_ENDORSER_TRANSACTION
chdr := &common.ChannelHeader{
Type: int32(txType),
ChannelId: channelId,
TxId: txid,
Epoch: uint64(0),
}

shdr := &common.SignatureHeader{
Creator: creator,
Nonce: nonce,
}

return &common.Header{
ChannelHeader: protoutil.MarshalOrPanic(chdr),
SignatureHeader: protoutil.MarshalOrPanic(shdr),
}
}

func createTestEnvelope(channelId string, signer *infra.Crypto) *common.Envelope {
payload := &common.Payload{
Header: createTestHeader(channelId),
Data: []byte("data"),
}
payloadBytes, _ := protoutil.GetBytesPayload(payload)

signature, _ := signer.Sign(payloadBytes)

return &common.Envelope{
Payload: payloadBytes,
Signature: signature,
}
}

func (process *BroadcasterProcess) MockProduce(N int, crypto *infra.Crypto, config infra.Config) {
for i := 0; i < N; i++ {
env := createTestEnvelope(config.Channel, crypto)
process.Communication <- &infra.Elements{
Envelope: env,
}
}
}

func (process *BroadcasterProcess) Produce(processed chan *infra.Elements, done <-chan struct{}) {
// start producert to produce data into channel
// the productor of broadcaster
for i := 0; i < 5; i++ {
go process.Producer.StartIntegrator(processed, process.Communication, done)
}
}

func (process *BroadcasterProcess) Consume(done <-chan struct{}) {
// the consume don't do nothing but start and listening the Communication channel
go process.Consumer.Start(process.Communication, done)
}
49 changes: 49 additions & 0 deletions pkg/process/endorsement.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package process

import (
log "github.com/sirupsen/logrus"

"github.com/guoger/stupid/pkg/infra"
)

type EndorsementProcess struct {
Producer *infra.Assembler
Consumer *infra.Proposers
Config infra.Config
Communication []chan *infra.Elements
Output chan *infra.Elements
}

func CreateEndorsementProcess(row_length int, config infra.Config, crypto *infra.Crypto, logger *log.Logger) *EndorsementProcess {
assember := &infra.Assembler{Signer: crypto}
proposor := infra.CreateProposers(config.NumOfConn, config.ClientPerConn, config.Endorsers, logger)
processed := make(chan *infra.Elements, row_length)
signed := make([]chan *infra.Elements, len(config.Endorsers))
for i := 0; i < len(config.Endorsers); i++ {
signed[i] = make(chan *infra.Elements, row_length)
}
return &EndorsementProcess{
Config: config,
Producer: assember,
Consumer: proposor,
Communication: signed,
Output: processed,
}
}

func (process *EndorsementProcess) SetCommunication(communication []chan *infra.Elements) {
process.Communication = communication
}

func (process *EndorsementProcess) SetOutput(output chan *infra.Elements) {
process.Output = output
}

func (process *EndorsementProcess) Produce() {
// start producert to produce data into channel
}

func (process *EndorsementProcess) Consume(done <-chan struct{}) {
process.Consumer.Start(process.Communication, process.Output, done, process.Config)
// the consume don't do nothing but start and listening the Communication channel
}
Loading

0 comments on commit 8d9a006

Please sign in to comment.