Skip to content

Commit

Permalink
Address more review, create client interface, add SIGINT handling,
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Sztandera <oss@kubuxu.com>
  • Loading branch information
Kubuxu committed May 23, 2024
1 parent 53a0381 commit 9ecfbf9
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 49 deletions.
18 changes: 16 additions & 2 deletions cmd/f3/main.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"

"github.com/urfave/cli/v2"
)
Expand All @@ -24,8 +27,19 @@ func main() {
},
}

if err := app.Run(os.Args); err != nil {
fmt.Printf("runtime error: +%v\n", err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT)

go func() {
<-sigChan
cancel()
}()

if err := app.RunContext(ctx, os.Args); err != nil {
fmt.Fprintf(os.Stderr, "runtime error: %+v\n", err)
os.Exit(1)
}
}
54 changes: 28 additions & 26 deletions host.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,34 @@ import (
"golang.org/x/xerrors"
)

type gpbfthost struct {
participant *gpbft.Participant
manifest Manifest
type client interface {
gpbft.Signer
gpbft.Verifier
broadcast func(context.Context, []byte) error
gpbft.Tracer

MessageQueue chan *gpbft.GMessage
SelfMessageQueue chan *gpbft.GMessage //for the future when self messages are async
BroadcastMessage(context.Context, []byte) error
IncommingMessages() <-chan *gpbft.GMessage
Logger() Logger
}

log Logger
type gpbfthost struct {
client
participant *gpbft.Participant
manifest Manifest

selfMessageQueue chan *gpbft.GMessage //for the future when self messages are async

alertTimer *time.Timer
runningCtx context.Context
log Logger
}

func newHost(id gpbft.ActorID, m Manifest, broadcast func(context.Context, []byte) error,
s gpbft.Signer, v gpbft.Verifier, log Logger) (*gpbfthost, error) {
func newHost(id gpbft.ActorID, m Manifest, client client) (*gpbfthost, error) {
h := &gpbfthost{
client: client,
manifest: m,
Signer: s,
Verifier: v,
MessageQueue: make(chan *gpbft.GMessage, 20),
SelfMessageQueue: make(chan *gpbft.GMessage, 20),
broadcast: broadcast,

log: log,
selfMessageQueue: make(chan *gpbft.GMessage, 20),
log: client.Logger(),
}

// create a stopped timer to facilitate alerts requested from gpbft
Expand All @@ -45,8 +46,8 @@ func newHost(id gpbft.ActorID, m Manifest, broadcast func(context.Context, []byt
<-h.alertTimer.C
}

log.Infof("starting host for P%d", id)
p, err := gpbft.NewParticipant(id, h, gpbft.WithTracer(tracer{log}))
h.log.Infof("starting host for P%d", id)
p, err := gpbft.NewParticipant(id, h, gpbft.WithTracer(client))

Check failure on line 50 in host.go

View workflow job for this annotation

GitHub Actions / Test

cannot use h (variable of type *gpbfthost) as gpbft.Host value in argument to gpbft.NewParticipant: *gpbfthost does not implement gpbft.Host (missing method GetChainForInstance)
if err != nil {
return nil, xerrors.Errorf("creating participant: %w", err)
}
Expand All @@ -67,10 +68,11 @@ func (h *gpbfthost) Run(ctx context.Context) error {
return xerrors.Errorf("starting a participant: %w", err)
}

messageQueue := h.client.IncommingMessages()
loop:
for {
select {
case msg := <-h.SelfMessageQueue:
case msg := <-h.selfMessageQueue:
_, err = h.participant.ReceiveMessage(msg, true)
default:
}
Expand All @@ -82,9 +84,13 @@ loop:
case <-h.alertTimer.C:
h.log.Infof("alarm fired")
err = h.participant.ReceiveAlarm()
case msg := <-h.SelfMessageQueue:
case msg := <-h.selfMessageQueue:
_, err = h.participant.ReceiveMessage(msg, true)
case msg := <-h.MessageQueue:
case msg, ok := <-messageQueue:
if !ok {
err = xerrors.Errorf("incoming messsage queue closed")
break loop
}
_, err = h.participant.ReceiveMessage(msg, false)
case <-ctx.Done():
return nil
Expand Down Expand Up @@ -142,7 +148,7 @@ func (h *gpbfthost) Broadcast(msg *gpbft.GMessage) {
if err != nil {
h.log.Errorf("marshalling GMessage: %+v", err)
}
err = h.broadcast(h.runningCtx, bw.Bytes())
err = h.client.BroadcastMessage(h.runningCtx, bw.Bytes())
if err != nil {
h.log.Errorf("broadcasting GMessage: %+v", err)
}
Expand Down Expand Up @@ -187,7 +193,3 @@ func (h *gpbfthost) MarshalPayloadForSigning(p *gpbft.Payload) []byte {
type tracer struct {
Logger
}

func (t tracer) Log(fmt string, args ...any) {
t.Debugf(fmt, args...)
}
82 changes: 61 additions & 21 deletions module.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"

logging "github.com/ipfs/go-log/v2"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
peer "github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -25,13 +26,38 @@ type Module struct {
ds datastore.Datastore
host host.Host
pubsub *pubsub.PubSub
verif gpbft.Verifier
sigs gpbft.Signer
ec ECBackend
log Logger

// topic is populated after Run is called
topic *pubsub.Topic
client moduleClient
}

type moduleClient struct {
gpbft.Verifier
gpbft.Signer
logger Logger
loggerWithSkip Logger

// Populated after Run is called
messageQueue <-chan *gpbft.GMessage
topic *pubsub.Topic
}

func (mc moduleClient) BroadcastMessage(ctx context.Context, msg []byte) error {
return mc.topic.Publish(ctx, msg)
}

func (mc moduleClient) IncommingMessages() <-chan *gpbft.GMessage {
return mc.messageQueue
}

// Log fulfills the gpbft.Tracer interface
func (mc moduleClient) Log(fmt string, args ...any) {
mc.loggerWithSkip.Debugf(fmt, args...)
}

func (mc moduleClient) Logger() Logger {
return mc.logger
}

// NewModule creates and setups new libp2p f3 module
Expand All @@ -43,6 +69,10 @@ func NewModule(ctx context.Context, id gpbft.ActorID, manifest Manifest, ds data
if err != nil {
return nil, xerrors.Errorf("creating CertStore: %w", err)
}
loggerWithSkip := log
if zapLogger, ok := log.(*logging.ZapEventLogger); ok {
loggerWithSkip = logging.WithSkip(zapLogger, 1)
}

m := Module{
Manifest: manifest,
Expand All @@ -52,10 +82,15 @@ func NewModule(ctx context.Context, id gpbft.ActorID, manifest Manifest, ds data
ds: ds,
host: h,
pubsub: ps,
verif: verif,
sigs: sigs,
ec: ec,
log: log,

client: moduleClient{
Verifier: verif,
Signer: sigs,
logger: log,
loggerWithSkip: loggerWithSkip,
},
}

return &m, nil
Expand All @@ -71,14 +106,14 @@ func (m *Module) setupPubsub() error {
if err != nil {
return xerrors.Errorf("could not join on pubsub topic: %s: %w", pubsubTopicName, err)
}
m.topic = topic
m.client.topic = topic
return nil
}

func (m *Module) teardownPubsub() error {
return multierr.Combine(
m.pubsub.UnregisterTopicValidator(m.Manifest.NetworkName.PubSubTopic()),
m.topic.Close(),
m.client.topic.Close(),
)
}

Expand All @@ -88,14 +123,15 @@ func (m *Module) Run(ctx context.Context) error {
return xerrors.Errorf("setting up pubsub: %w", err)
}

sub, err := m.topic.Subscribe()
sub, err := m.client.topic.Subscribe()
if err != nil {
return xerrors.Errorf("subscribing to topic: %w", err)
}

h, err := newHost(m.id, m.Manifest, func(ctx context.Context, b []byte) error {
return m.topic.Publish(ctx, b)
}, m.sigs, m.verif, m.log)
messageQueue := make(chan *gpbft.GMessage, 20)
m.client.messageQueue = messageQueue

h, err := newHost(m.id, m.Manifest, m.client)
if err != nil {
return xerrors.Errorf("creating gpbft host: %w", err)
}
Expand All @@ -117,14 +153,14 @@ loop:
m.log.Errorf("pubsub subscription.Next() returned an error: %+v", err)
break
}
var gmsg gpbft.GMessage
err = gmsg.UnmarshalCBOR(bytes.NewReader(msg.Data))
if err != nil {
m.log.Info("bad pubsub message: %+v", err)
gmsg, ok := msg.ValidatorData.(*gpbft.GMessage)
if !ok {
m.log.Errorf("invalid ValidatorData: %+v", msg.ValidatorData)
continue
}

select {
case h.MessageQueue <- &gmsg:
case messageQueue <- gmsg:
case <-ctx.Done():
break loop
}
Expand All @@ -141,6 +177,14 @@ var _ pubsub.ValidatorEx = (*Module)(nil).pubsubTopicValidator

// validator for the pubsub
func (m *Module) pubsubTopicValidator(ctx context.Context, pID peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
var gmsg gpbft.GMessage
err := gmsg.UnmarshalCBOR(bytes.NewReader(msg.Data))
if err != nil {
return pubsub.ValidationReject
}

// TODO more validation
msg.ValidatorData = &gmsg
return pubsub.ValidationAccept
}

Expand All @@ -151,12 +195,8 @@ type Logger interface {
Debugf(format string, args ...interface{})
Error(args ...interface{})
Errorf(format string, args ...interface{})
Fatal(args ...interface{})
Fatalf(format string, args ...interface{})
Info(args ...interface{})
Infof(format string, args ...interface{})
Panic(args ...interface{})
Panicf(format string, args ...interface{})
Warn(args ...interface{})
Warnf(format string, args ...interface{})
}

0 comments on commit 9ecfbf9

Please sign in to comment.