Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sync): Implement a subjectiveHead atomic pointer on the node to use #108

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
43 changes: 17 additions & 26 deletions headertest/dummy_header.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,27 @@
package headertest

import (
"bytes"
"crypto/rand"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"math"
"testing"
"time"

"golang.org/x/crypto/sha3"

"github.com/celestiaorg/go-header"
)

var ErrDummyVerify = errors.New("dummy verify error")
var ErrVerificationFailed = errors.New("forced failure")

type DummyHeader struct {
Chainid string
PreviousHash header.Hash
HeightI uint64
Timestamp time.Time

hash header.Hash
HashI header.Hash

// VerifyFailure allows for testing scenarios where a header would fail
// verification. When set to true, it forces a failure.
Expand All @@ -40,10 +39,6 @@ func RandDummyHeader(t *testing.T) *DummyHeader {
HeightI: randUint63(),
Timestamp: time.Now().UTC(),
}
err := dh.rehash()
if err != nil {
t.Fatal(err)
}
return dh
}

Expand All @@ -60,22 +55,7 @@ func (d *DummyHeader) ChainID() string {
}

func (d *DummyHeader) Hash() header.Hash {
if len(d.hash) == 0 {
if err := d.rehash(); err != nil {
panic(err)
}
}
return d.hash
}

func (d *DummyHeader) rehash() error {
b, err := d.MarshalBinary()
if err != nil {
return err
}
hash := sha3.Sum512(b)
d.hash = hash[:]
return nil
return d.HashI
}

func (d *DummyHeader) Height() uint64 {
Expand All @@ -101,7 +81,18 @@ func (d *DummyHeader) IsExpired(period time.Duration) bool {

func (d *DummyHeader) Verify(hdr *DummyHeader) error {
if hdr.VerifyFailure {
return &header.VerifyError{Reason: ErrDummyVerify, SoftFailure: hdr.SoftFailure}
return &header.VerifyError{Reason: ErrVerificationFailed, SoftFailure: hdr.SoftFailure}
}

// if adjacent, check PreviousHash -- this check is necessary
// to mock fork-following scenarios with the dummy header.
if hdr.Height() == d.Height()+1 {
if !bytes.Equal(hdr.PreviousHash, d.Hash()) {
return &header.VerifyError{
Reason: fmt.Errorf("adjacent verify failure on header at height %d, err: %x != %x",
hdr.Height(), hdr.PreviousHash, d.Hash()),
}
}
}
return nil
}
Expand Down
10 changes: 9 additions & 1 deletion headertest/dummy_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ func NewTestSuite(t *testing.T) *DummySuite {
}
}

func NewTestSuiteWithHead(t *testing.T, head *DummyHeader) *DummySuite {
return &DummySuite{
t: t,
head: head,
}
}

func (s *DummySuite) Head() *DummyHeader {
if s.head == nil {
s.head = s.genesis()
Expand All @@ -46,7 +53,7 @@ func (s *DummySuite) NextHeader() *DummyHeader {
dh.HeightI = s.head.Height() + 1
dh.PreviousHash = s.head.Hash()
dh.Chainid = s.head.ChainID()
_ = dh.rehash()
dh.HashI = RandBytes(64)
s.head = dh
return s.head
}
Expand All @@ -56,5 +63,6 @@ func (s *DummySuite) genesis() *DummyHeader {
HeightI: 1,
Timestamp: time.Now().Add(-10 * time.Second).UTC(),
Chainid: "test",
HashI: RandBytes(64),
}
}
4 changes: 2 additions & 2 deletions headertest/verify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ func TestVerify(t *testing.T) {
untrusted.VerifyFailure = true
return untrusted
},
err: ErrDummyVerify,
err: ErrVerificationFailed,
},
{
prepare: func() *DummyHeader {
untrusted := next()
untrusted.VerifyFailure = true
return untrusted
},
err: ErrDummyVerify,
err: ErrVerificationFailed,
soft: true, // soft because non-adjacent
},
{
Expand Down
165 changes: 165 additions & 0 deletions sync/fork_test/fork.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package main

import (
"context"
"testing"
"time"

logging "github.com/ipfs/go-log/v2"

"github.com/celestiaorg/go-header"
"github.com/celestiaorg/go-header/headertest"
"github.com/celestiaorg/go-header/local"
"github.com/celestiaorg/go-header/store"
"github.com/celestiaorg/go-header/sync"
)

// This program is for test purposes only. See TestForkFollowingPrevention
// for further context.
//
// This program runs an instance of a syncer against a modified p2p Exchange
// that is designed to serve it a fork instead of the canonical chain.
func main() {
t := &testing.T{}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

suite := headertest.NewTestSuite(t)
head := suite.Head()

// set up syncer with a malicious peer as its remote peer
ee := newEclipsedExchange(ctx, t, head)

localStore := store.NewTestStore(ctx, t, head)
syncer, err := sync.NewSyncer[*headertest.DummyHeader](
local.NewExchange[*headertest.DummyHeader](ee),
localStore,
headertest.NewDummySubscriber(),
// TrustingPeriod can be set to a nanosecond so even if the head
// given by the trusted peer expires by the time `subjectiveHead` is
// called again, it will still call Head on the `eclipsedPeer`
// which will return the same head as before.
sync.WithTrustingPeriod(time.Nanosecond),
)
if err != nil {
panic(err)
}

// generate a good canonical chain
canonical := suite.GenDummyHeaders(99)

// give good headers to the trusted peer in order to return a good subjective head
// to the syncer upon its start
err = ee.appendToTrusted(ctx, canonical...)
if err != nil {
panic(err)
}

// generate a fork starting at block height 50 of the canonical chain
fork := canonical[:50]
maliciousSuite := headertest.NewTestSuiteWithHead(t, fork[len(fork)-1])
// generate 50 blocks on the fork
fork = append(fork, maliciousSuite.GenDummyHeaders(50)...)
// give bad headers to the malicious (eclipsing) peer in order
// to attempt to get syncer to follow a fork
err = ee.appendToEclipsedExchange(ctx, fork...)
if err != nil {
panic(err)
}

_, err = ee.trustedPeer.GetByHeight(ctx, 100)
if err != nil {
panic(err)
}
_, err = ee.eclipsedPeer.GetByHeight(ctx, 100)
if err != nil {
panic(err)
}

logging.Logger("sync")

err = syncer.Start(ctx)
if err != nil {
panic(err)
}

// this sleep is necessary to allow the syncer to trigger a job
// as calling SyncWait prematurely may falsely return without error
// as the syncer has not yet registered a sync job.
//time.Sleep(time.Millisecond * 100)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be removed?

time.Sleep(time.Millisecond * 500)
syncer.SyncWait(ctx) //nolint:errcheck
}

// eclipsedExchange is an exchange that can serve a good Head to the syncer
// but attempts to "eclipse" the syncer by serving it a fork as it requests
// headers between its storeHead --> subjectiveHead.
type eclipsedExchange struct {
// trusted peer that serves a good Head to the syncer
trustedPeer header.Store[*headertest.DummyHeader]
// bad peers who attempt to eclipse the syncer and get it to follow a fork
eclipsedPeer header.Store[*headertest.DummyHeader]
}

func newEclipsedExchange(
ctx context.Context,
t *testing.T,
head *headertest.DummyHeader,
) *eclipsedExchange {
return &eclipsedExchange{
trustedPeer: store.NewTestStore(ctx, t, head),
eclipsedPeer: store.NewTestStore(ctx, t, head),
}
}

// Head returns a good header from the trusted peer.
func (e *eclipsedExchange) Head(ctx context.Context, h ...header.HeadOption[*headertest.DummyHeader]) (*headertest.DummyHeader, error) {
return e.trustedPeer.Head(ctx, h...)
}

// GetVerifiedRange returns a fork from the eclipsed exchange in an attempt to
// eclipse the syncer.
func (e *eclipsedExchange) GetVerifiedRange(ctx context.Context, from *headertest.DummyHeader, amount uint64) ([]*headertest.DummyHeader, error) {
return e.eclipsedPeer.GetVerifiedRange(ctx, from, amount)
}

func (e *eclipsedExchange) appendToTrusted(ctx context.Context, h ...*headertest.DummyHeader) error {
return e.trustedPeer.Append(ctx, h...)
}

func (e *eclipsedExchange) appendToEclipsedExchange(ctx context.Context, h ...*headertest.DummyHeader) error {
return e.eclipsedPeer.Append(ctx, h...)
}

func (e *eclipsedExchange) Get(ctx context.Context, hash header.Hash) (*headertest.DummyHeader, error) {
panic("implement me")
}

func (e *eclipsedExchange) GetByHeight(ctx context.Context, u uint64) (*headertest.DummyHeader, error) {
panic("implement me")
}

func (e *eclipsedExchange) GetRangeByHeight(ctx context.Context, from, amount uint64) ([]*headertest.DummyHeader, error) {
panic("implement me")
}

func (e *eclipsedExchange) Init(ctx context.Context, h *headertest.DummyHeader) error {
panic("implement me")
}

func (e *eclipsedExchange) Height() uint64 {
panic("implement me")
}

func (e *eclipsedExchange) Has(ctx context.Context, hash header.Hash) (bool, error) {
panic("implement me")
}

func (e *eclipsedExchange) HasAt(ctx context.Context, u uint64) bool {
panic("implement me")
}

func (e *eclipsedExchange) Append(ctx context.Context, h ...*headertest.DummyHeader) error {
panic("implement me")
}
Loading