Skip to content

Commit ac47960

Browse files
author
Jason Yellick
committed
[FAB-5836] Cleanup orderer sampleclients
The sample clients for the orderer have fallen into a mild state of disrepair. The single_tx_client has not worked in a long time, and some unnecessary sleeps have crept into broadcast_msg. Additionally, deliver_stdout does not sign messages properly, and tries to print binary to the screen. This CR fixes all of those issues. Change-Id: Iec182163fcbc06197d6bd944e3b4f2e58a0de769 Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
1 parent 47d0e3a commit ac47960

File tree

4 files changed

+59
-217
lines changed

4 files changed

+59
-217
lines changed

orderer/sample_clients/broadcast_msg/client.go

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"fmt"
1212
"os"
1313
"sync"
14-
"time"
1514

1615
"github.com/hyperledger/fabric/common/crypto"
1716
"github.com/hyperledger/fabric/common/localmsp"
@@ -27,22 +26,21 @@ import (
2726
)
2827

2928
type broadcastClient struct {
30-
client ab.AtomicBroadcast_BroadcastClient
31-
signer crypto.LocalSigner
32-
chainID string
29+
client ab.AtomicBroadcast_BroadcastClient
30+
signer crypto.LocalSigner
31+
channelID string
3332
}
3433

3534
// newBroadcastClient creates a simple instance of the broadcastClient interface
36-
func newBroadcastClient(client ab.AtomicBroadcast_BroadcastClient, chainID string, signer crypto.LocalSigner) *broadcastClient {
37-
return &broadcastClient{client: client, chainID: chainID, signer: signer}
35+
func newBroadcastClient(client ab.AtomicBroadcast_BroadcastClient, channelID string, signer crypto.LocalSigner) *broadcastClient {
36+
return &broadcastClient{client: client, channelID: channelID, signer: signer}
3837
}
3938

4039
func (s *broadcastClient) broadcast(transaction []byte) error {
41-
env, err := utils.CreateSignedEnvelope(cb.HeaderType_MESSAGE, s.chainID, s.signer, &cb.Envelope{Signature: transaction}, 0, 0)
40+
env, err := utils.CreateSignedEnvelope(cb.HeaderType_MESSAGE, s.channelID, s.signer, &cb.ConfigValue{Value: transaction}, 0, 0)
4241
if err != nil {
4342
panic(err)
4443
}
45-
time.Sleep(time.Second)
4644
return s.client.Send(env)
4745
}
4846

@@ -69,14 +67,14 @@ func main() {
6967

7068
signer := localmsp.NewSigner()
7169

72-
var chainID string
70+
var channelID string
7371
var serverAddr string
7472
var messages uint64
7573
var goroutines uint64
7674
var msgSize uint64
7775

7876
flag.StringVar(&serverAddr, "server", fmt.Sprintf("%s:%d", config.General.ListenAddress, config.General.ListenPort), "The RPC server to connect to.")
79-
flag.StringVar(&chainID, "chainID", provisional.TestChainID, "The chain ID to broadcast to.")
77+
flag.StringVar(&channelID, "channelID", provisional.TestChainID, "The channel ID to broadcast to.")
8078
flag.Uint64Var(&messages, "messages", 1, "The number of messages to broadcast.")
8179
flag.Uint64Var(&goroutines, "goroutines", 1, "The number of concurrent go routines to broadcast the messages on")
8280
flag.Uint64Var(&msgSize, "size", 1024, "The size in bytes of the data section for the payload")
@@ -104,13 +102,12 @@ func main() {
104102
for i := uint64(0); i < goroutines; i++ {
105103
go func(i uint64) {
106104
client, err := ab.NewAtomicBroadcastClient(conn).Broadcast(context.TODO())
107-
time.Sleep(10 * time.Second)
108105
if err != nil {
109106
fmt.Println("Error connecting:", err)
110107
return
111108
}
112109

113-
s := newBroadcastClient(client, chainID, signer)
110+
s := newBroadcastClient(client, channelID, signer)
114111
done := make(chan (struct{}))
115112
go func() {
116113
for i := uint64(0); i < msgsPerGo; i++ {

orderer/sample_clients/deliver_stdout/client.go

Lines changed: 49 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,7 @@
11
/*
2-
Copyright IBM Corp. 2016 All Rights Reserved.
2+
Copyright IBM Corp. All Rights Reserved.
33
4-
Licensed under the Apache License, Version 2.0 (the "License");
5-
you may not use this file except in compliance with the License.
6-
You may obtain a copy of the License at
7-
8-
http://www.apache.org/licenses/LICENSE-2.0
9-
10-
Unless required by applicable law or agreed to in writing, software
11-
distributed under the License is distributed on an "AS IS" BASIS,
12-
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
See the License for the specific language governing permissions and
14-
limitations under the License.
4+
SPDX-License-Identifier: Apache-2.0
155
*/
166

177
package main
@@ -20,12 +10,18 @@ import (
2010
"flag"
2111
"fmt"
2212
"math"
13+
"os"
2314

15+
"github.com/hyperledger/fabric/common/crypto"
16+
"github.com/hyperledger/fabric/common/localmsp"
2417
"github.com/hyperledger/fabric/common/tools/configtxgen/provisional"
18+
"github.com/hyperledger/fabric/common/tools/protolator"
19+
mspmgmt "github.com/hyperledger/fabric/msp/mgmt"
2520
"github.com/hyperledger/fabric/orderer/common/localconfig"
2621
cb "github.com/hyperledger/fabric/protos/common"
2722
ab "github.com/hyperledger/fabric/protos/orderer"
2823
"github.com/hyperledger/fabric/protos/utils"
24+
2925
"golang.org/x/net/context"
3026
"google.golang.org/grpc"
3127
)
@@ -37,44 +33,39 @@ var (
3733
)
3834

3935
type deliverClient struct {
40-
client ab.AtomicBroadcast_DeliverClient
41-
chainID string
36+
client ab.AtomicBroadcast_DeliverClient
37+
channelID string
38+
signer crypto.LocalSigner
39+
quiet bool
4240
}
4341

44-
func newDeliverClient(client ab.AtomicBroadcast_DeliverClient, chainID string) *deliverClient {
45-
return &deliverClient{client: client, chainID: chainID}
42+
func newDeliverClient(client ab.AtomicBroadcast_DeliverClient, channelID string, signer crypto.LocalSigner, quiet bool) *deliverClient {
43+
return &deliverClient{client: client, channelID: channelID, signer: signer, quiet: quiet}
4644
}
4745

48-
func seekHelper(chainID string, start *ab.SeekPosition, stop *ab.SeekPosition) *cb.Envelope {
49-
return &cb.Envelope{
50-
Payload: utils.MarshalOrPanic(&cb.Payload{
51-
Header: &cb.Header{
52-
ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{
53-
ChannelId: chainID,
54-
}),
55-
SignatureHeader: utils.MarshalOrPanic(&cb.SignatureHeader{}),
56-
},
57-
58-
Data: utils.MarshalOrPanic(&ab.SeekInfo{
59-
Start: start,
60-
Stop: stop,
61-
Behavior: ab.SeekInfo_BLOCK_UNTIL_READY,
62-
}),
63-
}),
46+
func (r *deliverClient) seekHelper(start *ab.SeekPosition, stop *ab.SeekPosition) *cb.Envelope {
47+
env, err := utils.CreateSignedEnvelope(cb.HeaderType_DELIVER_SEEK_INFO, r.channelID, r.signer, &ab.SeekInfo{
48+
Start: start,
49+
Stop: stop,
50+
Behavior: ab.SeekInfo_BLOCK_UNTIL_READY,
51+
}, 0, 0)
52+
if err != nil {
53+
panic(err)
6454
}
55+
return env
6556
}
6657

6758
func (r *deliverClient) seekOldest() error {
68-
return r.client.Send(seekHelper(r.chainID, oldest, maxStop))
59+
return r.client.Send(r.seekHelper(oldest, maxStop))
6960
}
7061

7162
func (r *deliverClient) seekNewest() error {
72-
return r.client.Send(seekHelper(r.chainID, newest, maxStop))
63+
return r.client.Send(r.seekHelper(newest, maxStop))
7364
}
7465

7566
func (r *deliverClient) seekSingle(blockNumber uint64) error {
7667
specific := &ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: blockNumber}}}
77-
return r.client.Send(seekHelper(r.chainID, specific, specific))
68+
return r.client.Send(r.seekHelper(specific, specific))
7869
}
7970

8071
func (r *deliverClient) readUntilClose() {
@@ -90,20 +81,39 @@ func (r *deliverClient) readUntilClose() {
9081
fmt.Println("Got status ", t)
9182
return
9283
case *ab.DeliverResponse_Block:
93-
fmt.Println("Received block: ", t.Block)
84+
if !r.quiet {
85+
fmt.Println("Received block: ")
86+
err := protolator.DeepMarshalJSON(os.Stdout, t.Block)
87+
if err != nil {
88+
fmt.Println(" Error pretty printing block: %s", err)
89+
}
90+
} else {
91+
fmt.Println("Received block: ", t.Block.Header.Number)
92+
}
9493
}
9594
}
9695
}
9796

9897
func main() {
9998
config := config.Load()
10099

101-
var chainID string
100+
// Load local MSP
101+
err := mspmgmt.LoadLocalMsp(config.General.LocalMSPDir, config.General.BCCSP, config.General.LocalMSPID)
102+
if err != nil { // Handle errors reading the config file
103+
fmt.Println("Failed to initialize local MSP:", err)
104+
os.Exit(0)
105+
}
106+
107+
signer := localmsp.NewSigner()
108+
109+
var channelID string
102110
var serverAddr string
103111
var seek int
112+
var quiet bool
104113

105114
flag.StringVar(&serverAddr, "server", fmt.Sprintf("%s:%d", config.General.ListenAddress, config.General.ListenPort), "The RPC server to connect to.")
106-
flag.StringVar(&chainID, "chainID", provisional.TestChainID, "The chain ID to deliver from.")
115+
flag.StringVar(&channelID, "channelID", provisional.TestChainID, "The channel ID to deliver from.")
116+
flag.BoolVar(&quiet, "quiet", false, "Only print the block number, will not attempt to print its block contents.")
107117
flag.IntVar(&seek, "seek", -2, "Specify the range of requested blocks."+
108118
"Acceptable values:"+
109119
"-2 (or -1) to start from oldest (or newest) and keep at it indefinitely."+
@@ -126,7 +136,7 @@ func main() {
126136
return
127137
}
128138

129-
s := newDeliverClient(client, chainID)
139+
s := newDeliverClient(client, channelID, signer, quiet)
130140
switch seek {
131141
case -2:
132142
err = s.seekOldest()

orderer/sample_clients/single_tx_client/single_tx_client.go

Lines changed: 0 additions & 166 deletions
This file was deleted.

protos/common/common.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ func (p *Payload) VariablyOpaqueFields() []string {
3030
var PayloadDataMap = map[int32]proto.Message{
3131
int32(HeaderType_CONFIG): &ConfigEnvelope{},
3232
int32(HeaderType_CONFIG_UPDATE): &ConfigUpdateEnvelope{},
33+
int32(HeaderType_MESSAGE): &ConfigValue{}, // Only used by broadcast_msg sample client
3334
}
3435

3536
func (p *Payload) VariablyOpaqueFieldProto(name string) (proto.Message, error) {

0 commit comments

Comments
 (0)