Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/f3/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ var runCmd = cli.Command{
return xerrors.Errorf("creating module: %w", err)
}

initialInstance := c.Uint64("id")
initialInstance := c.Uint64("instance")
return module.Run(initialInstance, ctx)
},
}
Expand Down
31 changes: 20 additions & 11 deletions f3.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ type F3 struct {
ec ECBackend
log Logger

client *clientImpl
client *client
}

type clientImpl struct {
id gpbft.ActorID
nn gpbft.NetworkName
type client struct {
certstore *certstore.Store
id gpbft.ActorID
nn gpbft.NetworkName

gpbft.Verifier
gpbft.SignerWithMarshaler
logger Logger
Expand All @@ -45,7 +47,7 @@ type clientImpl struct {
topic *pubsub.Topic
}

func (mc *clientImpl) BroadcastMessage(ctx context.Context, mb *gpbft.MessageBuilder) error {
func (mc *client) BroadcastMessage(ctx context.Context, mb *gpbft.MessageBuilder) error {
msg, err := mb.Build(mc.nn, mc.SignerWithMarshaler, mc.id)
if err != nil {
if errors.Is(err, gpbft.ErrNoPower) {
Expand All @@ -59,21 +61,26 @@ func (mc *clientImpl) BroadcastMessage(ctx context.Context, mb *gpbft.MessageBui
if err != nil {
mc.Log("marshalling GMessage: %+v", err)
}
return mc.topic.Publish(ctx, bw.Bytes())
err = mc.topic.Publish(ctx, bw.Bytes())
if err != nil {
return xerrors.Errorf("publishing on topic: %w", err)
}
return nil

}

func (mc *clientImpl) IncomingMessages() <-chan gpbft.ValidatedMessage {
func (mc *client) IncomingMessages() <-chan gpbft.ValidatedMessage {
return mc.messageQueue
}

var _ gpbft.Tracer = (*clientImpl)(nil)
var _ gpbft.Tracer = (*client)(nil)

// Log fulfills the gpbft.Tracer interface
func (mc *clientImpl) Log(fmt string, args ...any) {
func (mc *client) Log(fmt string, args ...any) {
mc.loggerWithSkip.Debugf(fmt, args...)
}

func (mc *clientImpl) Logger() Logger {
func (mc *client) Logger() Logger {
return mc.logger
}

Expand Down Expand Up @@ -101,7 +108,8 @@ func New(ctx context.Context, id gpbft.ActorID, manifest Manifest, ds datastore.
ec: ec,
log: log,

client: &clientImpl{
client: &client{
certstore: cs,
nn: manifest.NetworkName,
id: id,
Verifier: verif,
Expand All @@ -128,6 +136,7 @@ func (m *F3) setupPubsub(runner *gpbftRunner) error {
}
validatedMessage, err := runner.ValidateMessage(&gmsg)
if errors.Is(err, gpbft.ErrValidationInvalid) {
m.log.Debugf("validation error during validation: %+v", err)
return pubsub.ValidationReject
}
if err != nil {
Expand Down
21 changes: 15 additions & 6 deletions gpbft/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gpbft
import (
"errors"
"fmt"
"runtime/debug"
"sort"
"sync"

Expand Down Expand Up @@ -51,12 +52,20 @@ func (v *validatedMessage) Message() *GMessage {

var _ Receiver = (*Participant)(nil)

func newPanicError(panicCause any) *PanicError {
return &PanicError{
Err: panicCause,
stackTrace: string(debug.Stack()),
}
}

type PanicError struct {
Err any
Err any
stackTrace string
}

func (e *PanicError) Error() string {
return fmt.Sprintf("participant panicked: %v", e.Err)
return fmt.Sprintf("participant panicked: %v\n%v", e.Err, e.stackTrace)
}

func NewParticipant(host Host, o ...Option) (*Participant, error) {
Expand All @@ -79,7 +88,7 @@ func (p *Participant) StartInstance(instance uint64) (err error) {
defer p.apiMutex.Unlock()
defer func() {
if r := recover(); r != nil {
err = &PanicError{Err: r}
err = newPanicError(r)
}
}()

Expand Down Expand Up @@ -122,7 +131,7 @@ func (p *Participant) ValidateMessage(msg *GMessage) (valid ValidatedMessage, er
// The instance mutex is taken when appropriate by inner methods.
defer func() {
if r := recover(); r != nil {
err = &PanicError{Err: r}
err = newPanicError(r)
}
}()

Expand All @@ -146,7 +155,7 @@ func (p *Participant) ReceiveMessage(vmsg ValidatedMessage) (err error) {
defer p.apiMutex.Unlock()
defer func() {
if r := recover(); r != nil {
err = &PanicError{Err: r}
err = newPanicError(r)
}
}()
msg := vmsg.Message()
Expand Down Expand Up @@ -177,7 +186,7 @@ func (p *Participant) ReceiveAlarm() (err error) {
defer p.apiMutex.Unlock()
defer func() {
if r := recover(); r != nil {
err = &PanicError{Err: r}
err = newPanicError(r)
}
}()

Expand Down
72 changes: 56 additions & 16 deletions host.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,16 @@ import (
"context"
"time"

"github.com/filecoin-project/go-f3/certs"
"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/sim"
"golang.org/x/xerrors"
)

type Client interface {
gpbft.SignerWithMarshaler
gpbft.Verifier
gpbft.Tracer

BroadcastMessage(context.Context, *gpbft.MessageBuilder) error
IncomingMessages() <-chan gpbft.ValidatedMessage
Logger() Logger
}

// gpbftRunner is responsible for running gpbft.Participant, taking in all concurrent events and
// passing them to gpbft in a single thread.
type gpbftRunner struct {
client Client
client *client
participant *gpbft.Participant
manifest Manifest

Expand All @@ -35,7 +26,7 @@ type gpbftRunner struct {
// gpbftHost is a newtype of gpbftRunner exposing APIs required by the gpbft.Participant
type gpbftHost gpbftRunner

func newRunner(id gpbft.ActorID, m Manifest, client Client) (*gpbftRunner, error) {
func newRunner(id gpbft.ActorID, m Manifest, client *client) (*gpbftRunner, error) {
runner := &gpbftRunner{
client: client,
manifest: m,
Expand Down Expand Up @@ -118,14 +109,30 @@ func (h *gpbftRunner) ValidateMessage(msg *gpbft.GMessage) (gpbft.ValidatedMessa
// ReceiveDecision (or known to be final via some other channel).
func (h *gpbftHost) GetProposalForInstance(instance uint64) (*gpbft.SupplementalData, gpbft.ECChain, error) {
// TODO: this is just a complete fake

pt, _, err := h.GetCommitteeForInstance(0)
if err != nil {
return nil, nil, xerrors.Errorf("getting power table: %w", err)
}
ptCid, err := certs.MakePowerTableCID(pt.Entries)
if err != nil {
return nil, nil, xerrors.Errorf("computing power table CID: %w", err)
}

ts := sim.NewTipSetGenerator(1)
chain, err := gpbft.NewChain(gpbft.TipSet{Epoch: 0, Key: ts.Sample()}, gpbft.TipSet{Epoch: 1, Key: ts.Sample()})
chain, err := gpbft.NewChain(
gpbft.TipSet{Epoch: 0, Key: ts.Sample(), PowerTable: ptCid},
gpbft.TipSet{Epoch: 1, Key: ts.Sample(), PowerTable: ptCid},
)
if err != nil {
return nil, nil, err
return nil, nil, xerrors.Errorf("geenrating chain: %w", err)
}
sd := &gpbft.SupplementalData{
PowerTable: ptCid,
}

// TODO: use lookback to return the correct next power table commitment and commitments hash.
return new(gpbft.SupplementalData), chain, nil
return sd, chain, nil
}

func (h *gpbftHost) GetCommitteeForInstance(instance uint64) (*gpbft.PowerTable, []byte, error) {
Expand Down Expand Up @@ -178,10 +185,43 @@ func (h *gpbftHost) SetAlarm(at time.Time) {
// E.g. this might be: finalised tipset timestamp + epoch duration + stabilisation delay.
func (h *gpbftHost) ReceiveDecision(decision *gpbft.Justification) time.Time {
h.log.Infof("got decision: %+v", decision)
//TODO propagate and save this for use in GetCanonicalChain
err := h.saveDecision(decision)
if err != nil {
h.log.Errorf("error while saving decision: %+v", err)
}

return time.Now().Add(2 * time.Second)
}

func (h *gpbftHost) saveDecision(decision *gpbft.Justification) error {
instance := decision.Vote.Instance
current, _, err := h.GetCommitteeForInstance(instance)
if err != nil {
return xerrors.Errorf("getting commitee for current instance %d: %w", instance, err)
}

next, _, err := h.GetCommitteeForInstance(instance + 1)
if err != nil {
return xerrors.Errorf("getting commitee for next instance %d: %w", instance+1, err)
}
powerDiff := certs.MakePowerTableDiff(current.Entries, next.Entries)

cert, err := certs.NewFinalityCertificate(powerDiff, decision)
if err != nil {
return xerrors.Errorf("forming certificate out of decision: %w", err)
}
_, _, _, err = certs.ValidateFinalityCertificates(h, h.NetworkName(), current.Entries, decision.Vote.Instance, cert.ECChain.Base())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: you don't need to pass the base. If you pass nil, it'll just ignore it.

if err != nil {
return xerrors.Errorf("certificate is invalid: %w", err)
}

err = h.client.certstore.Put(h.runningCtx, cert)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, we should spend the extra time and validate this certificate. We can either do that inside the certstore, or here.

if err != nil {
return xerrors.Errorf("saving ceritifcate in a store: %w", err)
}
return nil
}

// MarshalPayloadForSigning marshals the given payload into the bytes that should be signed.
// This should usually call `Payload.MarshalForSigning(NetworkName)` except when testing as
// that method is slow (computes a merkle tree that's necessary for testing).
Expand Down