Skip to content

Commit

Permalink
FAB-1195 CLI broadcast client should receive ack
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-1195

Not receiving Ack in the client side could cause
the orderer to exit the broadcast stream without
draining it. Made sure the CLI orderer client receives
Ack.

https://jira.hyperledger.org/browse/FAB-1196

Also fixed the build issue  consensus_test.go.

Change-Id: I26f81a8f49e0381b89c532cfd68dbec20d2a5985
Signed-off-by: Srinivasan Muralidharan <muralisr@us.ibm.com>
  • Loading branch information
Srinivasan Muralidharan committed Nov 25, 2016
1 parent f4e12d9 commit f302533
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 4 deletions.
2 changes: 1 addition & 1 deletion orderer/solo/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestBatchTimer(t *testing.T) {
func TestBatchTimerHaltOnFilledBatch(t *testing.T) {
filters, cm := getFiltersAndConfig()
batchSize := 2
rl := ramledger.New(10, genesisBlock)
_, rl := ramledger.New(10, genesisBlock)
bs := NewConsenter(batchSize, time.Hour, rl, filters, cm)
defer bs.halt()
it, _ := rl.Iterator(ab.SeekInfo_SPECIFIED, 1)
Expand Down
27 changes: 24 additions & 3 deletions peer/chaincode/noopsordererclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package chaincode
//-------------------------------------------------------------
import (
"fmt"
"time"

cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
Expand All @@ -40,9 +41,25 @@ func newBroadcastClient(client ab.AtomicBroadcast_BroadcastClient) *broadcastCli
return &broadcastClient{client: client}
}

func (s *broadcastClient) getAck() error {
msg, err := s.client.Recv()
if err != nil {
return err
}
if msg.Status != cb.Status_SUCCESS {
return fmt.Errorf("Got unexpected status: %v", msg.Status)
}
return nil
}

//Send data to solo orderer
func Send(serverAddr string, env *cb.Envelope) error {
conn, err := grpc.Dial(serverAddr, grpc.WithInsecure())
var opts []grpc.DialOption
opts = append(opts, grpc.WithInsecure())
opts = append(opts, grpc.WithTimeout(3*time.Second))
opts = append(opts, grpc.WithBlock())

conn, err := grpc.Dial(serverAddr, opts...)
defer conn.Close()
if err != nil {
return fmt.Errorf("Error connecting: %s", err)
Expand All @@ -53,7 +70,11 @@ func Send(serverAddr string, env *cb.Envelope) error {
}

s := newBroadcastClient(client)
s.client.Send(env)
if err = s.client.Send(env); err != nil {
return fmt.Errorf("Could not send :%s)", err)
}

return nil
err = s.getAck()

return err
}

0 comments on commit f302533

Please sign in to comment.