Skip to content

Commit

Permalink
Fix some nits from previous commit (#71)
Browse files Browse the repository at this point in the history
- move process out of main
- correct some grammar in log msg

Signed-off-by: Jay Guo <guojiannan1101@gmail.com>
  • Loading branch information
guoger committed Nov 6, 2020
1 parent 926e9ec commit 454c637
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 124 deletions.
106 changes: 1 addition & 105 deletions cmd/stupid/main.go
Original file line number Diff line number Diff line change
@@ -1,113 +1,9 @@
package main

import (
"fmt"
"os"
"strconv"
"time"

"github.com/guoger/stupid/pkg/infra"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

const loglevel = "STUPID_LOGLEVEL"

func main() {
logger := log.New()
logger.SetLevel(log.WarnLevel)
if customerLevel, customerSet := os.LookupEnv(loglevel); customerSet {
if lvl, err := log.ParseLevel(customerLevel); err == nil {
logger.SetLevel(lvl)
}
}
err := process(logger)
if err != nil {
logger.Error(err)
os.Exit(1)
}
os.Exit(0)
}

func process(logger *log.Logger) error {
if len(os.Args) != 3 {
return errors.Errorf("error input parameters for stupid: stupid config.yaml 500")
}
N, err := strconv.Atoi(os.Args[2])
if err != nil {
return errors.Errorf("error input parameters for stupid: stupid config.yaml 500")
}
config, err := infra.LoadConfig(os.Args[1])
if err != nil {
return err
}
crypto, err := config.LoadCrypto()
if err != nil {
return err
}
raw := make(chan *infra.Elements, 100)
signed := make([]chan *infra.Elements, len(config.Endorsers))
processed := make(chan *infra.Elements, 10)
envs := make(chan *infra.Elements, 10)
done := make(chan struct{})
finishCh := make(chan struct{})
errorCh := make(chan error, 10)
assember := &infra.Assembler{Signer: crypto}

for i := 0; i < len(config.Endorsers); i++ {
signed[i] = make(chan *infra.Elements, 10)
}

for i := 0; i < 5; i++ {
go assember.StartSigner(raw, signed, errorCh, done)
go assember.StartIntegrator(processed, envs, errorCh, done)
}

proposor, err := infra.CreateProposers(config.NumOfConn, config.ClientPerConn, config.Endorsers, logger)
if err != nil {
return err
}
proposor.Start(signed, processed, done, config)

broadcaster, err := infra.CreateBroadcasters(config.NumOfConn, config.Orderer, logger)
if err != nil {
return err
}
broadcaster.Start(envs, errorCh, done)

observer, err := infra.CreateObserver(config.Channel, config.Committer, crypto, logger)
if err != nil {
return err
}

start := time.Now()
go observer.Start(N, errorCh, finishCh, start)

for i := 0; i < N; i++ {
prop, err := infra.CreateProposal(
crypto,
config.Channel,
config.Chaincode,
config.Version,
config.Args...,
)
if err != nil {
errCP := errors.Wrapf(err, "error creating proposal")
return errCP
}
raw <- &infra.Elements{Proposal: prop}
}
for {
select {
case err = <-errorCh:
return err
case <-finishCh:
duration := time.Since(start)
close(done)

logger.Infof("Completed processing transactions.")
fmt.Printf("tx: %d, duration: %+v, tps: %f\n", N, duration, float64(N)/duration.Seconds())
return nil
}
}
infra.Main()
}
2 changes: 1 addition & 1 deletion e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ var _ = Describe("Mock test", func() {
cmd := exec.Command(stupidBin, config.Name(), "500")
stupidSession, err = gexec.Start(cmd, nil, nil)
Expect(err).NotTo(HaveOccurred())
Eventually(stupidSession.Err).Should(Say("error connect to invalid_addr"))
Eventually(stupidSession.Err).Should(Say("error connecting to invalid_addr"))
})
})
})
Expand Down
8 changes: 4 additions & 4 deletions pkg/infra/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func CreateGRPCClient(node Node) (*comm.GRPCClient, error) {
grpcClient, err := comm.NewGRPCClient(config)
//to do: unit test for this error, current fails to make case for this
if err != nil {
return nil, errors.Wrapf(err, "error connect to %s", node.Addr)
return nil, errors.Wrapf(err, "error connecting to %s", node.Addr)
}

return grpcClient, nil
Expand All @@ -53,7 +53,7 @@ func CreateEndorserClient(node Node) (peer.EndorserClient, error) {

conn, err := gRPCClient.NewConnection(node.Addr, func(tlsConfig *tls.Config) { tlsConfig.InsecureSkipVerify = true })
if err != nil {
return nil, errors.Wrapf(err, "error connect to %s", node.Addr)
return nil, errors.Wrapf(err, "error connecting to %s", node.Addr)
}

return peer.NewEndorserClient(conn), nil
Expand All @@ -67,7 +67,7 @@ func CreateBroadcastClient(node Node) (orderer.AtomicBroadcast_BroadcastClient,

conn, err := gRPCClient.NewConnection(node.Addr, func(tlsConfig *tls.Config) { tlsConfig.InsecureSkipVerify = true })
if err != nil {
return nil, errors.Wrapf(err, "error connect to %s", node.Addr)
return nil, errors.Wrapf(err, "error connecting to %s", node.Addr)
}

return orderer.NewAtomicBroadcastClient(conn).Broadcast(context.Background())
Expand All @@ -81,7 +81,7 @@ func CreateDeliverFilteredClient(node Node) (peer.Deliver_DeliverFilteredClient,

conn, err := gRPCClient.NewConnection(node.Addr, func(tlsConfig *tls.Config) { tlsConfig.InsecureSkipVerify = true })
if err != nil {
return nil, errors.Wrapf(err, "error connect to %s", node.Addr)
return nil, errors.Wrapf(err, "error connecting to %s", node.Addr)
}

return peer.NewDeliverClient(conn).DeliverFiltered(context.Background())
Expand Down
14 changes: 7 additions & 7 deletions pkg/infra/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,21 @@ import (

var _ = Describe("Client", func() {

Context("Should Error handle", func() {
Context("Client Error handling", func() {
dummy := infra.Node{
Addr: "invalid_addr",
}
It("for endorser", func() {
It("captures error from endorser", func() {
_, err := infra.CreateEndorserClient(dummy)
Expect(err).Should(MatchError(ContainSubstring("error connect to invalid_addr")))
Expect(err).Should(MatchError(ContainSubstring("error connecting to invalid_addr")))
})
It("for broadcaster", func() {
It("captures error from broadcaster", func() {
_, err := infra.CreateBroadcastClient(dummy)
Expect(err).Should(MatchError(ContainSubstring("error connect to invalid_addr")))
Expect(err).Should(MatchError(ContainSubstring("error connecting to invalid_addr")))
})
It("for DeliverFilter", func() {
It("captures error from DeliverFilter", func() {
_, err := infra.CreateDeliverFilteredClient(dummy)
Expect(err).Should(MatchError(ContainSubstring("error connect to invalid_addr")))
Expect(err).Should(MatchError(ContainSubstring("error connecting to invalid_addr")))
})
})

Expand Down
10 changes: 6 additions & 4 deletions pkg/infra/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ client_per_conn: 40

var _ = Describe("Config", func() {

Context("config", func() {
It("successful load", func() {
Context("good", func() {
It("successful loads", func() {
tlsFile, err := ioutil.TempFile("", "dummy-*.pem")
Expect(err).NotTo(HaveOccurred())
defer os.Remove(tlsFile.Name())
Expand Down Expand Up @@ -94,13 +94,15 @@ var _ = Describe("Config", func() {
_, err = c.LoadCrypto()
Expect(err).Should(MatchError(ContainSubstring("error loading priv key")))
})
})

It("Error Handle for config not there", func() {
Context("bad", func() {
It("fails to load missing config file", func() {
_, err := infra.LoadConfig("invalid_file")
Expect(err).Should(MatchError(ContainSubstring("invalid_file")))
})

It("Error Handle for tls", func() {
It("fails to load invalid config file", func() {

f, _ := ioutil.TempFile("", "config-*.yaml")
defer os.Remove(f.Name())
Expand Down
114 changes: 114 additions & 0 deletions pkg/infra/process.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package infra

import (
"fmt"
"os"
"strconv"
"time"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

const loglevel = "STUPID_LOGLEVEL"

func Main() {
logger := log.New()
logger.SetLevel(log.WarnLevel)
if customerLevel, customerSet := os.LookupEnv(loglevel); customerSet {
if lvl, err := log.ParseLevel(customerLevel); err == nil {
logger.SetLevel(lvl)
}
}
err := process(logger)
if err != nil {
logger.Error(err)
os.Exit(1)
}
os.Exit(0)
}

func process(logger *log.Logger) error {
if len(os.Args) != 3 {
return errors.Errorf("error input parameters for stupid: stupid config.yaml 500")
}
N, err := strconv.Atoi(os.Args[2])
if err != nil {
return errors.Errorf("error input parameters for stupid: stupid config.yaml 500")
}
config, err := LoadConfig(os.Args[1])
if err != nil {
return err
}
crypto, err := config.LoadCrypto()
if err != nil {
return err
}
raw := make(chan *Elements, 100)
signed := make([]chan *Elements, len(config.Endorsers))
processed := make(chan *Elements, 10)
envs := make(chan *Elements, 10)
done := make(chan struct{})
finishCh := make(chan struct{})
errorCh := make(chan error, 10)
assember := &Assembler{Signer: crypto}

for i := 0; i < len(config.Endorsers); i++ {
signed[i] = make(chan *Elements, 10)
}

for i := 0; i < 5; i++ {
go assember.StartSigner(raw, signed, errorCh, done)
go assember.StartIntegrator(processed, envs, errorCh, done)
}

proposor, err := CreateProposers(config.NumOfConn, config.ClientPerConn, config.Endorsers, logger)
if err != nil {
return err
}
proposor.Start(signed, processed, done, config)

broadcaster, err := CreateBroadcasters(config.NumOfConn, config.Orderer, logger)
if err != nil {
return err
}
broadcaster.Start(envs, errorCh, done)

observer, err := CreateObserver(config.Channel, config.Committer, crypto, logger)
if err != nil {
return err
}

start := time.Now()
go observer.Start(N, errorCh, finishCh, start)
go func() {
for i := 0; i < N; i++ {
prop, err := CreateProposal(
crypto,
config.Channel,
config.Chaincode,
config.Version,
config.Args...,
)
if err != nil {
errorCh <- errors.Wrapf(err, "error creating proposal")
return
}
raw <- &Elements{Proposal: prop}
}
}()

for {
select {
case err = <-errorCh:
return err
case <-finishCh:
duration := time.Since(start)
close(done)

logger.Infof("Completed processing transactions.")
fmt.Printf("tx: %d, duration: %+v, tps: %f\n", N, duration, float64(N)/duration.Seconds())
return nil
}
}
}
6 changes: 3 additions & 3 deletions pkg/infra/proposer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var _ = Describe("Proposer", func() {
Addr: "invalid_addr",
}
_, err := infra.CreateProposer(dummy, logger)
Expect(err).Should(MatchError(ContainSubstring("error connect to invalid_addr")))
Expect(err).Should(MatchError(ContainSubstring("error connecting to invalid_addr")))
})
})

Expand All @@ -47,12 +47,12 @@ var _ = Describe("Proposer", func() {
Expect(err).NotTo(HaveOccurred())
})

It("handle error ", func() {
It("captures connection errors", func() {
dummy := infra.Node{
Addr: "invalid_addr",
}
_, err := infra.CreateBroadcaster(dummy, logger)
Expect(err).Should(MatchError(ContainSubstring("error connect to invalid_addr")))
Expect(err).Should(MatchError(ContainSubstring("error connecting to invalid_addr")))
})
})
})

0 comments on commit 454c637

Please sign in to comment.