Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

testreplica: Use first available port for TransactionReceiver listener #511

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions cmd/bench/cmd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"context"
"encoding/csv"
"fmt"
"net"
gonet "net"
"os"
"strconv"
"time"
Expand Down Expand Up @@ -172,10 +172,13 @@ func runNode() error {
return es.Errorf("could not create node: %w", err)
}

txReceiver := transactionreceiver.NewTransactionReceiver(node, "mempool", logger)
if err := txReceiver.Start(TxReceiverBasePort + ownNumericID); err != nil {
return es.Errorf("could not start transaction receiver: %w", err)
txReceiverListener, err := gonet.Listen("tcp", fmt.Sprintf(":%v", TxReceiverBasePort+ownNumericID))
if err != nil {
return fmt.Errorf("could not create transaction receiver listener: %w", err)
}

txReceiver := transactionreceiver.NewTransactionReceiver(node, "mempool", logger)
txReceiver.Start(txReceiverListener)
defer txReceiver.Stop()

if err := benchApp.Start(); err != nil {
Expand Down Expand Up @@ -230,7 +233,7 @@ func getPortStr(addressStr string) (string, error) {
return "", err
}

_, portStr, err := net.SplitHostPort(addrStr)
_, portStr, err := gonet.SplitHostPort(addrStr)
if err != nil {
return "", err
}
Expand Down
27 changes: 11 additions & 16 deletions pkg/deploytest/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"context"
"crypto"
"fmt"
"net"
"path/filepath"
"runtime"
"sync"
Expand Down Expand Up @@ -183,17 +184,25 @@ func (d *Deployment) Run(ctx context.Context) (nodeErrors []error, heapObjects i

// Start the Mir nodes.
nodeWg.Add(len(d.TestReplicas))
trAddrs := make(map[t.NodeID]string, len(d.TestReplicas))
for i, testReplica := range d.TestReplicas {
i, testReplica := i, testReplica

trListener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
nodeErrors[i] = err
return nodeErrors, 0, 0
}
trAddrs[testReplica.ID] = fmt.Sprintf("127.0.0.1:%v", trListener.Addr().(*net.TCPAddr).Port)

// Start the replica in a separate goroutine.
start := make(chan struct{})
go func() {
defer nodeWg.Done()

<-start
testReplica.Config.Logger.Log(logging.LevelDebug, "running")
nodeErrors[i] = testReplica.Run(ctx2)
nodeErrors[i] = testReplica.Run(ctx2, trListener)
if err := nodeErrors[i]; err != nil {
testReplica.Config.Logger.Log(logging.LevelError, "exit with error", "err", errstack.ToString(err))
} else {
Expand All @@ -220,7 +229,7 @@ func (d *Deployment) Run(ctx context.Context) (nodeErrors []error, heapObjects i
go func(c *dummyclient.DummyClient) {
defer clientWg.Done()

c.Connect(ctx2, d.localTransactionreceiverAddrs())
c.Connect(ctx2, trAddrs)
submitDummyTransactions(ctx2, c, d.TestConfig.NumNetTXs)
c.Disconnect()
}(client)
Expand Down Expand Up @@ -258,20 +267,6 @@ func (d *Deployment) EventLogFiles() map[t.NodeID]string {
return logFiles
}

// localTransactionreceiverAddrs computes network addresses and ports for the Transactionreceivers at all replicas and returns
// an address map.
// It is assumed that node ID strings must be parseable to decimal numbers.
// Each test replica is on the local machine - 127.0.0.1
func (d *Deployment) localTransactionreceiverAddrs() map[t.NodeID]string {

addrs := make(map[t.NodeID]string, len(d.TestReplicas))
for i, tr := range d.TestReplicas {
addrs[tr.ID] = fmt.Sprintf("127.0.0.1:%d", TXListenPort+i)
}

return addrs
}

// submitDummyTransactions submits n dummy transactions using client.
// It returns when all transactions have been submitted or when ctx is done.
func submitDummyTransactions(ctx context.Context, client *dummyclient.DummyClient, n int) {
Expand Down
21 changes: 6 additions & 15 deletions pkg/deploytest/testreplica.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package deploytest
import (
"context"
"fmt"
gonet "net"
"path/filepath"
"strconv"
"sync"

es "github.com/go-errors/errors"
Expand Down Expand Up @@ -67,7 +67,7 @@ func (tr *TestReplica) EventLogFile() string {
// The function blocks until the replica stops.
// The replica stops when stopC is closed.
// Run returns the error returned by the run of the underlying Mir node.
func (tr *TestReplica) Run(ctx context.Context) error {
func (tr *TestReplica) Run(ctx context.Context, txReceiverListener gonet.Listener) error {

// Initialize recording of events.
interceptor, err := eventlog.NewRecorder(
Expand Down Expand Up @@ -104,17 +104,8 @@ func (tr *TestReplica) Run(ctx context.Context) error {
}

// Create a Transactionreceiver for transactions coming over the network.
txreceiver := transactionreceiver.NewTransactionReceiver(node, tr.FakeTXDestModule, logging.Decorate(tr.Config.Logger, "TxRec: "))

// TODO: do not assume that node IDs are integers.
p, err := strconv.Atoi(tr.ID.Pb())
if err != nil {
return es.Errorf("error converting node ID %s: %w", tr.ID, err)
}
err = txreceiver.Start(TXListenPort + p)
if err != nil {
return es.Errorf("error starting transaction receiver: %w", err)
}
txReceiver := transactionreceiver.NewTransactionReceiver(node, tr.FakeTXDestModule, logging.Decorate(tr.Config.Logger, "TxRec: "))
txReceiver.Start(txReceiverListener)

// Initialize WaitGroup for the replica's transaction submission thread.
var wg sync.WaitGroup
Expand Down Expand Up @@ -149,8 +140,8 @@ func (tr *TestReplica) Run(ctx context.Context) error {
tr.Config.Logger.Log(logging.LevelDebug, "Node run returned!")

// Stop the transaction receiver.
txreceiver.Stop()
if err := txreceiver.ServerError(); err != nil {
txReceiver.Stop()
if err := txReceiver.ServerError(); err != nil {
return es.Errorf("transaction receiver returned server error: %w", err)
}

Expand Down
18 changes: 4 additions & 14 deletions pkg/transactionreceiver/transactionreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package transactionreceiver
import (
"fmt"
"net"
"strconv"
"sync"

es "github.com/go-errors/errors"
Expand Down Expand Up @@ -113,32 +112,23 @@ func (rr *TransactionReceiver) Listen(srv TransactionReceiver_ListenServer) erro
}

// Start starts the TransactionReceiver by initializing and starting the internal gRPC server,
// listening on the passed port.
// listening on the passed net.Listener.
// Before ths method is called, no client connections are accepted.
func (rr *TransactionReceiver) Start(port int) error {
func (rr *TransactionReceiver) Start(listener net.Listener) {

rr.logger.Log(logging.LevelInfo, fmt.Sprintf("Listening for transaction connections on port %d", port))
rr.logger.Log(logging.LevelInfo, fmt.Sprintf("Listening for transaction connections on %v", listener.Addr().String()))

// Create a gRPC server and assign it the logic of this TransactionReceiver.
rr.grpcServer = grpc.NewServer()
RegisterTransactionReceiverServer(rr.grpcServer, rr)

// Start listening on the network
conn, err := net.Listen("tcp", ":"+strconv.Itoa(port))
if err != nil {
return es.Errorf("failed to listen for connections on port %d: %w", port, err)
}

// Start the gRPC server in a separate goroutine.
// When the server stops, it will write its exit error into gt.grpcServerError.
rr.grpcServerWg.Add(1)
go func() {
rr.grpcServerError = rr.grpcServer.Serve(conn)
rr.grpcServerError = rr.grpcServer.Serve(listener)
rr.grpcServerWg.Done()
}()

// If we got all the way here, no error occurred.
return nil
}

// Stop stops the own gRPC server (preventing further incoming connections).
Expand Down