From 6b1b6033c6497d64db97d6c96f2dc01594fec425 Mon Sep 17 00:00:00 2001 From: Kostas Christidis Date: Thu, 15 Dec 2016 17:47:48 -0500 Subject: [PATCH] [FAB-1351] New chain config client for Kafka https://jira.hyperledger.org/browse/FAB-1351 As things stand right now, the configuration transaction posted by the broadcast_config sample client only works for solo. This is because the genesis block for the Kafka orderer contains extra keys. This changeset: 1. Introduces an extra flag that allows us to specify the consenter type, and then, by using the provisional bootstrapper, it creates the appropriate configuration transaction. 2. Refactors/simplifies the client to bring it more in line with the rest of the sample_clients in the `orderer` package and make future maintenance easier. Change-Id: I61ce5dfb36a11ab58d41a65267768cf0948263dc Signed-off-by: Kostas Christidis --- .../broadcast_config/broadcast.go | 61 ---------- .../sample_clients/broadcast_config/client.go | 108 ++++++++++++++++++ .../sample_clients/broadcast_config/main.go | 98 ---------------- .../broadcast_config/newchain.go | 11 +- 4 files changed, 111 insertions(+), 167 deletions(-) delete mode 100644 orderer/sample_clients/broadcast_config/broadcast.go create mode 100644 orderer/sample_clients/broadcast_config/client.go delete mode 100644 orderer/sample_clients/broadcast_config/main.go diff --git a/orderer/sample_clients/broadcast_config/broadcast.go b/orderer/sample_clients/broadcast_config/broadcast.go deleted file mode 100644 index 9513c34f073..00000000000 --- a/orderer/sample_clients/broadcast_config/broadcast.go +++ /dev/null @@ -1,61 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -import ( - "fmt" - "io" - - cb "github.com/hyperledger/fabric/protos/common" - ab "github.com/hyperledger/fabric/protos/orderer" - context "golang.org/x/net/context" -) - -func (c *clientImpl) broadcast(envelope *cb.Envelope) { - stream, err := c.rpc.Broadcast(context.Background()) - if err != nil { - panic(fmt.Errorf("Failed to invoke broadcast RPC: %s", err)) - } - go c.recvBroadcastReplies(stream) - - if err := stream.Send(envelope); err != nil { - panic(fmt.Errorf("Failed to send broadcast message to ordering service: %s", err)) - } - logger.Debugf("Sent broadcast message \"%v\" to ordering service\n", envelope) - - if err := stream.CloseSend(); err != nil { - panic(fmt.Errorf("Failed to close the send direction of the broadcast stream: %v", err)) - } - - <-c.doneChan // Wait till we've had a chance to get back a reply (or an error) - logger.Info("Client shutting down") -} - -func (c *clientImpl) recvBroadcastReplies(stream ab.AtomicBroadcast_BroadcastClient) { - defer close(c.doneChan) - for { - reply, err := stream.Recv() - if err == io.EOF { - return - } - if err != nil { - panic(fmt.Errorf("Failed to receive a broadcast reply from orderer: %v", err)) - } - logger.Info("Broadcast reply from orderer:", reply.Status.String()) - break - } -} diff --git a/orderer/sample_clients/broadcast_config/client.go b/orderer/sample_clients/broadcast_config/client.go new file mode 100644 index 00000000000..84437a7263d --- /dev/null +++ b/orderer/sample_clients/broadcast_config/client.go @@ -0,0 +1,108 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "flag" + "fmt" + + "google.golang.org/grpc" + + "github.com/hyperledger/fabric/orderer/localconfig" + cb "github.com/hyperledger/fabric/protos/common" + ab "github.com/hyperledger/fabric/protos/orderer" +) + +var conf *config.TopLevel + +type broadcastClient struct { + ab.AtomicBroadcast_BroadcastClient +} + +func (bc *broadcastClient) broadcast(env *cb.Envelope) error { + var err error + var resp *ab.BroadcastResponse + + err = bc.Send(env) + if err != nil { + return err + } + + resp, err = bc.Recv() + if err != nil { + return err + } + + fmt.Println("Status:", resp) + return nil +} + +// cmdImpl holds the command and its arguments. +type cmdImpl struct { + name string + args argsImpl +} + +// argsImpl holds all the possible arguments for all possible commands. +type argsImpl struct { + consensusType string + creationPolicy string + chainID string +} + +func init() { + conf = config.Load() +} + +func main() { + cmd := new(cmdImpl) + var srv string + + flag.StringVar(&srv, "server", fmt.Sprintf("%s:%d", conf.General.ListenAddress, conf.General.ListenPort), "The RPC server to connect to.") + flag.StringVar(&cmd.name, "cmd", "newChain", "The action that this client is requesting via the config transaction.") + flag.StringVar(&cmd.args.consensusType, "consensusType", conf.General.OrdererType, "In case of a newChain command, the type of consensus the ordering service is running on.") + flag.StringVar(&cmd.args.creationPolicy, "creationPolicy", "AcceptAllPolicy", "In case of a newChain command, the chain creation policy this request should be validated against.") + flag.StringVar(&cmd.args.chainID, "chainID", "NewChainID", "In case of a newChain command, the chain ID to create.") + flag.Parse() + + conn, err := grpc.Dial(srv, grpc.WithInsecure()) + defer func() { + _ = conn.Close() + }() + if err != nil { + fmt.Println("Error connecting:", err) + return + } + + client, err := ab.NewAtomicBroadcastClient(conn).Broadcast(context.TODO()) + if err != nil { + fmt.Println("Error connecting:", err) + return + } + + bc := &broadcastClient{client} + + switch cmd.name { + case "newChain": + env := newChainRequest(cmd.args.consensusType, cmd.args.creationPolicy, cmd.args.chainID) + fmt.Println("Requesting the creation of chain", cmd.args.chainID) + fmt.Println(bc.broadcast(env)) + default: + panic("Invalid command given") + } +} diff --git a/orderer/sample_clients/broadcast_config/main.go b/orderer/sample_clients/broadcast_config/main.go deleted file mode 100644 index ea18f2ce152..00000000000 --- a/orderer/sample_clients/broadcast_config/main.go +++ /dev/null @@ -1,98 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -import ( - "flag" - "os" - "strings" - - ab "github.com/hyperledger/fabric/protos/orderer" - logging "github.com/op/go-logging" - "google.golang.org/grpc" -) - -const pkgName = "orderer/broadcast_config" - -var logger *logging.Logger - -// Include here all the possible arguments for a command -type argsImpl struct { - creationPolicy string - chainID string -} - -// This holds the command and its arguments -type cmdImpl struct { - cmd string - args argsImpl -} - -type configImpl struct { - logLevel logging.Level - server string - cmd cmdImpl -} - -type clientImpl struct { - config configImpl - rpc ab.AtomicBroadcastClient - doneChan chan struct{} -} - -func main() { - var loglevel string - - client := &clientImpl{doneChan: make(chan struct{})} - - backend := logging.NewLogBackend(os.Stderr, "", 0) - logging.SetBackend(backend) - formatter := logging.MustStringFormatter("[%{time:15:04:05}] %{shortfile:18s}: %{color}[%{level:-5s}]%{color:reset} %{message}") - logging.SetFormatter(formatter) - logger = logging.MustGetLogger(pkgName) - - flag.StringVar(&loglevel, "loglevel", "info", - "The logging level. (Suggested values: info, debug)") - flag.StringVar(&client.config.server, "server", - "127.0.0.1:7050", "The RPC server to connect to.") - flag.StringVar(&client.config.cmd.cmd, "cmd", "new-chain", - "The action that this client is requesting via the config transaction.") - flag.StringVar(&client.config.cmd.args.creationPolicy, "creationPolicy", "AcceptAllPolicy", - "In case of a new-chain command, the chain createion policy this request should be validated against.") - flag.StringVar(&client.config.cmd.args.chainID, "chainID", "NewChainID", - "In case of a new-chain command, the chain ID to create.") - flag.Parse() - - client.config.logLevel, _ = logging.LogLevel(strings.ToUpper(loglevel)) - logging.SetLevel(client.config.logLevel, logger.Module) - - conn, err := grpc.Dial(client.config.server, grpc.WithInsecure()) - if err != nil { - logger.Fatalf("Client did not connect to %s: %v", client.config.server, err) - } - defer conn.Close() - client.rpc = ab.NewAtomicBroadcastClient(conn) - - switch client.config.cmd.cmd { - case "new-chain": - envelope := newChainRequest(client.config.cmd.args.creationPolicy, client.config.cmd.args.chainID) - logger.Infof("Requesting the creation of chain \"%s\"", client.config.cmd.args.chainID) - client.broadcast(envelope) - default: - panic("Invalid cmd given") - } -} diff --git a/orderer/sample_clients/broadcast_config/newchain.go b/orderer/sample_clients/broadcast_config/newchain.go index bd141b3ed38..1426b9abe8b 100644 --- a/orderer/sample_clients/broadcast_config/newchain.go +++ b/orderer/sample_clients/broadcast_config/newchain.go @@ -18,18 +18,13 @@ package main import ( "github.com/hyperledger/fabric/orderer/common/bootstrap/provisional" - "github.com/hyperledger/fabric/orderer/localconfig" cb "github.com/hyperledger/fabric/protos/common" "github.com/hyperledger/fabric/protos/utils" ) -var genesisBlock *cb.Block - -func init() { - genesisBlock = provisional.New(config.Load()).GenesisBlock() -} - -func newChainRequest(creationPolicy, newChainID string) *cb.Envelope { +func newChainRequest(consensusType, creationPolicy, newChainID string) *cb.Envelope { + conf.General.OrdererType = consensusType + genesisBlock := provisional.New(conf).GenesisBlock() oldGenesisTx := utils.ExtractEnvelopeOrPanic(genesisBlock, 0) oldGenesisTxPayload := utils.ExtractPayloadOrPanic(oldGenesisTx) oldConfigEnv := utils.UnmarshalConfigurationEnvelopeOrPanic(oldGenesisTxPayload.Data)