feat(header)!: introduce header.Header interface (#1304)
Most important changes in this PR:
* `tmbytes.HexBytes` replaced by `header.Hash`
* `header.Header` used in interfaces in `header` package
* `IsBefore` inlined, so this method doesn't need to be part of the interface (it's a one-liner used only once)
* `VerifyNonAdjacent` added to `Header` interface (at least for now)
* Generics introduced to avoid casting (a rough sketch of the resulting interface follows below)

Resolves rollkit/rollkit#575.
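
To make the file-by-file diff easier to follow, here is a rough sketch of the shape the new interface takes. It is an illustration under assumptions: the exact method set and constraints live in `libs/header` and may differ from what is shown.

```go
// Sketch only: an assumed shape for the new libs/header interface,
// not the exact definition committed in this PR.
package header

// Hash is assumed to be a plain byte slice replacing tmbytes.HexBytes.
type Hash []byte

// Header is the generic header abstraction. The self-referential type
// parameter lets implementations such as *ExtendedHeader appear directly
// in method signatures, so callers keep the concrete type and never cast
// from an interface value.
type Header[H any] interface {
	// Hash returns the hash of the header.
	Hash() Hash
	// Height reports the header's height; note it is a method now,
	// not an exported struct field.
	Height() int64
	// LastHeader returns the hash of the parent header.
	LastHeader() Hash
	// VerifyNonAdjacent verifies a header that is not a direct child
	// of this one (kept on the interface, at least for now).
	VerifyNonAdjacent(H) error
}
```

The diffs below thread this type parameter through `Getter`, `Subscriber`, `Broadcaster`, `Subscription`, and the header store, instantiated with `*header.ExtendedHeader`.
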
tzdybal committed Jan 19, 2023
1 parent fb274bc commit a3137ba
Showing 81 changed files with 1,679 additions and 1,211 deletions.
2 changes: 1 addition & 1 deletion api/gateway/share.go
@@ -108,7 +108,7 @@ func (h *Handler) getShares(ctx context.Context, height uint64, nID namespace.ID
}
// perform request
shares, err := h.share.GetSharesByNamespace(ctx, header.DAH, nID)
- return shares.Flatten(), header.Height, err
+ return shares.Flatten(), header.Height(), err
}

func dataFromShares(shares []share.Share) ([][]byte, error) {
5 changes: 3 additions & 2 deletions cmd/cel-shed/header.go
@@ -7,7 +7,8 @@ import (

"github.com/spf13/cobra"

"github.com/celestiaorg/celestia-node/header/store"
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/libs/header/store"
"github.com/celestiaorg/celestia-node/nodebuilder"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
)
@@ -54,7 +55,7 @@ Custom store path is not supported yet.`,
return err
}

- hstore, err := store.NewStore(ds)
+ hstore, err := store.NewStore[*header.ExtendedHeader](ds)
if err != nil {
return err
}
5 changes: 3 additions & 2 deletions core/fetcher.go
@@ -5,8 +5,9 @@ import (
"fmt"

logging "github.com/ipfs/go-log/v2"
tmbytes "github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/types"

libhead "github.com/celestiaorg/celestia-node/libs/header"
)

const newBlockSubscriber = "NewBlock/Events"
@@ -64,7 +65,7 @@ func (f *BlockFetcher) GetBlock(ctx context.Context, height *int64) (*types.Bloc
return res.Block, nil
}

- func (f *BlockFetcher) GetBlockByHash(ctx context.Context, hash tmbytes.HexBytes) (*types.Block, error) {
+ func (f *BlockFetcher) GetBlockByHash(ctx context.Context, hash libhead.Hash) (*types.Block, error) {
res, err := f.client.BlockByHash(ctx, hash)
if err != nil {
return nil, err
8 changes: 2 additions & 6 deletions core/fetcher_test.go
@@ -8,8 +8,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/types"

"github.com/tendermint/tendermint/libs/bytes"
)

func TestBlockFetcher_GetBlock_and_SubscribeNewBlockEvent(t *testing.T) {
@@ -76,9 +74,7 @@ func TestBlockFetcherHeaderValues(t *testing.T) {
assert.Equal(t, nextBlock.LastCommit.Height, commit.Height)
assert.Equal(t, nextBlock.LastCommit.Signatures, commit.Signatures)
// compare ValidatorSet hash to the ValidatorsHash from first block height
- hexBytes := bytes.HexBytes{}
- err = hexBytes.Unmarshal(valSet.Hash())
- require.NoError(t, err)
- assert.Equal(t, nextBlock.ValidatorsHash, hexBytes)
+ hexBytes := valSet.Hash()
+ assert.Equal(t, nextBlock.ValidatorsHash.Bytes(), hexBytes)
require.NoError(t, fetcher.UnsubscribeNewBlockEvent(ctx))
}
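
The test above no longer round-trips the validator-set hash through `bytes.HexBytes`: with hashes handled as plain byte slices, the raw bytes can be compared directly. The snippet below sketches that idea with a stand-in `Hash` type; the type and its `String` method are assumptions for illustration, not definitions taken from this repository.

```go
// Sketch with a stand-in Hash type: once hashes are plain byte slices,
// comparisons and conversions need no marshal/unmarshal round trip.
package main

import (
	"bytes"
	"encoding/hex"
	"fmt"
	"strings"
)

// Hash stands in for the new hash type; assumed to be a named []byte.
type Hash []byte

// String renders upper-case hex, the same look HexBytes gave log output.
func (h Hash) String() string { return strings.ToUpper(hex.EncodeToString(h)) }

func main() {
	validatorsHash := []byte{0xde, 0xad, 0xbe, 0xef} // e.g. header.ValidatorsHash.Bytes()
	computed := Hash{0xde, 0xad, 0xbe, 0xef}         // e.g. valSet.Hash()

	// A named []byte converts to a raw slice for free, so the two values
	// are compared directly instead of unmarshalling into a HexBytes first.
	fmt.Println(bytes.Equal(validatorsHash, []byte(computed))) // true
	fmt.Println(computed)                                      // DEADBEEF
}
```
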
5 changes: 3 additions & 2 deletions das/coordinator.go
@@ -5,13 +5,14 @@ import (
"sync"

"github.com/celestiaorg/celestia-node/header"
libhead "github.com/celestiaorg/celestia-node/libs/header"
)

// samplingCoordinator runs and coordinates sampling workers and updates current sampling state
type samplingCoordinator struct {
concurrencyLimit int

- getter header.Getter
+ getter libhead.Getter[*header.ExtendedHeader]
sampleFn sampleFn

state coordinatorState
@@ -37,7 +38,7 @@ type result struct {

func newSamplingCoordinator(
params Parameters,
- getter header.Getter,
+ getter libhead.Getter[*header.ExtendedHeader],
sample sampleFn,
) *samplingCoordinator {
return &samplingCoordinator{
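
The coordinator's dependency is now a getter parameterized by the concrete header type, which is what removes the casting mentioned in the commit message. Below is a compilable sketch of that pattern; `Head` appears in the diff, but the other method names and the toy implementation are assumptions for illustration only.

```go
// Sketch with partly assumed names: a generic Getter returns the concrete
// header type directly, so callers such as the sampling coordinator never
// type-assert the result.
package main

import (
	"context"
	"fmt"
)

// ExtendedHeader is a stand-in for header.ExtendedHeader.
type ExtendedHeader struct{ height int64 }

func (eh *ExtendedHeader) Height() int64 { return eh.height }

// Getter is parameterized by the header type it serves.
type Getter[H any] interface {
	Head(context.Context) (H, error)
	GetByHeight(context.Context, uint64) (H, error)
}

// memGetter is a toy implementation instantiated with *ExtendedHeader.
type memGetter struct{ head *ExtendedHeader }

func (m *memGetter) Head(context.Context) (*ExtendedHeader, error) { return m.head, nil }

func (m *memGetter) GetByHeight(_ context.Context, h uint64) (*ExtendedHeader, error) {
	return &ExtendedHeader{height: int64(h)}, nil
}

func main() {
	var getter Getter[*ExtendedHeader] = &memGetter{head: &ExtendedHeader{height: 42}}

	// Head returns *ExtendedHeader directly; no cast from an interface value.
	h, err := getter.Head(context.Background())
	if err != nil {
		panic(err)
	}
	fmt.Println("network head height:", h.Height())
}
```
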
12 changes: 6 additions & 6 deletions das/coordinator_test.go
@@ -302,7 +302,7 @@ func (m *mockSampler) sample(ctx context.Context, h *header.ExtendedHeader) erro
m.lock.Lock()
defer m.lock.Unlock()

- height := uint64(h.Height)
+ height := uint64(h.Height())
m.done[height]++

if len(m.done) > int(m.NetworkHead-m.SampleFrom) && !m.isFinished {
@@ -435,7 +435,7 @@ func (o *checkOrder) middleWare(out sampleFn) sampleFn {

if len(o.queue) > 0 {
// check last item in queue to be same as input
- if o.queue[0] != uint64(h.Height) {
+ if o.queue[0] != uint64(h.Height()) {
o.lock.Unlock()
return fmt.Errorf("expected height: %v,got: %v", o.queue[0], h)
}
@@ -505,7 +505,7 @@ func (l *lock) releaseAll(except ...uint64) {
func (l *lock) middleWare(out sampleFn) sampleFn {
return func(ctx context.Context, h *header.ExtendedHeader) error {
l.m.Lock()
- ch, blocked := l.blockList[uint64(h.Height)]
+ ch, blocked := l.blockList[uint64(h.Height())]
l.m.Unlock()
if !blocked {
return out(ctx, h)
@@ -525,10 +525,10 @@ func onceMiddleWare(out sampleFn) sampleFn {
m := sync.Mutex{}
return func(ctx context.Context, h *header.ExtendedHeader) error {
m.Lock()
- db[h.Height]++
- if db[h.Height] > 1 {
+ db[h.Height()]++
+ if db[h.Height()] > 1 {
m.Unlock()
return fmt.Errorf("header sampled more than once: %v", h.Height)
return fmt.Errorf("header sampled more than once: %v", h.Height())
}
m.Unlock()
return out(ctx, h)
15 changes: 8 additions & 7 deletions das/daser.go
@@ -11,6 +11,7 @@ import (

"github.com/celestiaorg/celestia-node/fraud"
"github.com/celestiaorg/celestia-node/header"
libhead "github.com/celestiaorg/celestia-node/libs/header"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
)
@@ -23,8 +24,8 @@ type DASer struct {

da share.Availability
bcast fraud.Broadcaster
- hsub header.Subscriber // listens for new headers in the network
- getter header.Getter // retrieves past headers
+ hsub libhead.Subscriber[*header.ExtendedHeader] // listens for new headers in the network
+ getter libhead.Getter[*header.ExtendedHeader] // retrieves past headers

sampler *samplingCoordinator
store checkpointStore
@@ -41,8 +42,8 @@ type sampleFn func(context.Context, *header.ExtendedHeader) error
// NewDASer creates a new DASer.
func NewDASer(
da share.Availability,
- hsub header.Subscriber,
- getter header.Getter,
+ hsub libhead.Subscriber[*header.ExtendedHeader],
+ getter libhead.Getter[*header.ExtendedHeader],
dstore datastore.Datastore,
bcast fraud.Broadcaster,
options ...Option,
@@ -95,7 +96,7 @@ func (d *DASer) Start(ctx context.Context) error {
// attempt to get head info. No need to handle error, later DASer
// will be able to find new head from subscriber after it is started
if h, err := d.getter.Head(ctx); err == nil {
- cp.NetworkHead = uint64(h.Height)
+ cp.NetworkHead = uint64(h.Height())
}
}
log.Info("starting DASer from checkpoint: ", cp.String())
@@ -151,13 +152,13 @@ func (d *DASer) sample(ctx context.Context, h *header.ExtendedHeader) error {
var byzantineErr *byzantine.ErrByzantine
if errors.As(err, &byzantineErr) {
log.Warn("Propagating proof...")
- sendErr := d.bcast.Broadcast(ctx, byzantine.CreateBadEncodingProof(h.Hash(), uint64(h.Height), byzantineErr))
+ sendErr := d.bcast.Broadcast(ctx, byzantine.CreateBadEncodingProof(h.Hash(), uint64(h.Height()), byzantineErr))
if sendErr != nil {
log.Errorw("fraud proof propagating failed", "err", sendErr)
}
}

log.Errorw("sampling failed", "height", h.Height, "hash", h.Hash(),
log.Errorw("sampling failed", "height", h.Height(), "hash", h.Hash(),
"square width", len(h.DAH.RowsRoots), "data root", h.DAH.Hash(), "err", err)
return err
}
18 changes: 8 additions & 10 deletions das/daser_test.go
@@ -5,12 +5,6 @@ import (
"testing"
"time"

"github.com/celestiaorg/celestia-node/share/availability/full"
"github.com/celestiaorg/celestia-node/share/availability/light"
availability_test "github.com/celestiaorg/celestia-node/share/availability/test"

"github.com/tendermint/tendermint/types"

"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
@@ -19,10 +13,14 @@ import (
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
tmbytes "github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/types"

"github.com/celestiaorg/celestia-node/fraud"
"github.com/celestiaorg/celestia-node/header"
libhead "github.com/celestiaorg/celestia-node/libs/header"
"github.com/celestiaorg/celestia-node/share/availability/full"
"github.com/celestiaorg/celestia-node/share/availability/light"
availability_test "github.com/celestiaorg/celestia-node/share/availability/test"
"github.com/celestiaorg/celestia-node/share/getters"
)

@@ -232,7 +230,7 @@ func (m *mockGetter) fillSubWithHeaders(
randHeader := header.RandExtendedHeader(t)
randHeader.DataHash = dah.Hash()
randHeader.DAH = dah
- randHeader.Height = int64(i + 1)
+ randHeader.RawHeader.Height = int64(i + 1)

sub.Headers[index] = randHeader
// also checkpointStore to mock getter for duplicate sampling
@@ -260,7 +258,7 @@ func (m *mockGetter) generateHeaders(t *testing.T, bServ blockservice.BlockServi
randHeader := header.RandExtendedHeader(t)
randHeader.DataHash = dah.Hash()
randHeader.DAH = dah
- randHeader.Height = int64(i + 1)
+ randHeader.RawHeader.Height = int64(i + 1)

m.headers[int64(i+1)] = randHeader
}
@@ -332,6 +330,6 @@ func (m getterStub) GetVerifiedRange(
return nil, nil
}

- func (m getterStub) Get(context.Context, tmbytes.HexBytes) (*header.ExtendedHeader, error) {
+ func (m getterStub) Get(context.Context, libhead.Hash) (*header.ExtendedHeader, error) {
return nil, nil
}
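
The mock changes above (`randHeader.RawHeader.Height = ...`) follow from `Height` becoming a method on `ExtendedHeader`: a method on the outer type shadows the field promoted from the embedded raw header, so writes in tests must name the embedded struct explicitly. The illustration below uses stand-in type names; the Go shadowing behaviour it demonstrates is the point.

```go
// Why the tests now write through RawHeader: the outer Height() method
// shadows the promoted Height field of the embedded struct. Type names
// here stand in for header.ExtendedHeader / header.RawHeader.
package main

import "fmt"

type RawHeader struct {
	Height int64
}

type ExtendedHeader struct {
	RawHeader
}

// Height is the accessor the rest of the code now calls as h.Height().
func (eh *ExtendedHeader) Height() int64 { return eh.RawHeader.Height }

func main() {
	eh := &ExtendedHeader{}

	// eh.Height = 7        // does not compile: Height resolves to the method
	eh.RawHeader.Height = 7 // tests address the embedded field directly

	fmt.Println(eh.Height()) // 7
}
```
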
7 changes: 4 additions & 3 deletions das/subscriber.go
@@ -4,6 +4,7 @@ import (
"context"

"github.com/celestiaorg/celestia-node/header"
libhead "github.com/celestiaorg/celestia-node/libs/header"
)

// subscriber subscribes to notifications about new headers in the network to keep
@@ -16,7 +17,7 @@ func newSubscriber() subscriber {
return subscriber{newDone("subscriber")}
}

- func (s *subscriber) run(ctx context.Context, sub header.Subscription, emit listenFn) {
+ func (s *subscriber) run(ctx context.Context, sub libhead.Subscription[*header.ExtendedHeader], emit listenFn) {
defer s.indicateDone()
defer sub.Cancel()

@@ -30,8 +31,8 @@ func (s *subscriber) run(ctx context.Context, sub header.Subscription, emit list
log.Errorw("failed to get next header", "err", err)
continue
}
log.Infow("new header received via subscription", "height", h.Height)
log.Infow("new header received via subscription", "height", h.Height())

- emit(ctx, uint64(h.Height))
+ emit(ctx, uint64(h.Height()))
}
}
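
The subscription handed to `run` is likewise generic, so every received header is already a `*header.ExtendedHeader` and its height is read through the accessor. A minimal sketch of that shape follows; `Cancel` appears in the diff, while `NextHeader` and the toy implementation are assumed names for illustration.

```go
// Minimal sketch, assuming the Subscription interface exposes something
// like NextHeader and Cancel; the real libs/header definitions may differ.
package main

import (
	"context"
	"fmt"
)

type ExtendedHeader struct{ height int64 }

func (eh *ExtendedHeader) Height() int64 { return eh.height }

// Subscription yields headers of the concrete type H.
type Subscription[H any] interface {
	NextHeader(ctx context.Context) (H, error)
	Cancel()
}

// sliceSub is a toy subscription backed by a fixed slice of headers.
type sliceSub struct{ headers []*ExtendedHeader }

func (s *sliceSub) NextHeader(context.Context) (*ExtendedHeader, error) {
	if len(s.headers) == 0 {
		return nil, context.Canceled // signal "no more headers" in this toy
	}
	h := s.headers[0]
	s.headers = s.headers[1:]
	return h, nil
}

func (s *sliceSub) Cancel() {}

// listen mirrors subscriber.run: it pulls typed headers and emits heights.
func listen(ctx context.Context, sub Subscription[*ExtendedHeader], emit func(uint64)) {
	defer sub.Cancel()
	for {
		h, err := sub.NextHeader(ctx)
		if err != nil {
			return
		}
		emit(uint64(h.Height()))
	}
}

func main() {
	sub := &sliceSub{headers: []*ExtendedHeader{{height: 1}, {height: 2}}}
	listen(context.Background(), sub, func(h uint64) { fmt.Println("received height:", h) })
}
```
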
9 changes: 5 additions & 4 deletions das/worker.go
@@ -10,6 +10,7 @@ import (
"go.uber.org/multierr"

"github.com/celestiaorg/celestia-node/header"
libhead "github.com/celestiaorg/celestia-node/libs/header"
)

type worker struct {
@@ -36,7 +37,7 @@ type job struct {

func (w *worker) run(
ctx context.Context,
- getter header.Getter,
+ getter libhead.Getter[*header.ExtendedHeader],
sample sampleFn,
metrics *metrics,
resultCh chan<- result) {
@@ -59,7 +60,7 @@ func (w *worker) run(
}

metrics.observeGetHeader(ctx, time.Since(startGet))
log.Debugw("got header from header store", "height", h.Height, "hash", h.Hash(),
log.Debugw("got header from header store", "height", h.Height(), "hash", h.Hash(),
"square width", len(h.DAH.RowsRoots), "data root", h.DAH.Hash(), "finished (s)", time.Since(startGet))

startSample := time.Now()
@@ -71,10 +72,10 @@ func (w *worker) run(
w.setResult(curr, err)
metrics.observeSample(ctx, h, time.Since(startSample), err)
if err != nil {
log.Debugw("failed to sampled header", "height", h.Height, "hash", h.Hash(),
log.Debugw("failed to sampled header", "height", h.Height(), "hash", h.Hash(),
"square width", len(h.DAH.RowsRoots), "data root", h.DAH.Hash(), "err", err)
} else {
log.Debugw("sampled header", "height", h.Height, "hash", h.Hash(),
log.Debugw("sampled header", "height", h.Height(), "hash", h.Hash(),
"square width", len(h.DAH.RowsRoots), "data root", h.DAH.Hash(), "finished (s)", time.Since(startSample))
}
}
6 changes: 3 additions & 3 deletions fraud/testing.go
@@ -49,10 +49,10 @@ func createStore(t *testing.T, numHeaders int) *mockStore {

for i := 0; i < numHeaders; i++ {
header := suite.GenExtendedHeader()
- store.headers[header.Height] = header
+ store.headers[header.Height()] = header

- if header.Height > store.headHeight {
- store.headHeight = header.Height
+ if header.Height() > store.headHeight {
+ store.headHeight = header.Height()
}
}
return store
7 changes: 3 additions & 4 deletions header/core/exchange.go
@@ -8,10 +8,9 @@ import (
"github.com/ipfs/go-blockservice"
logging "github.com/ipfs/go-log/v2"

tmbytes "github.com/tendermint/tendermint/libs/bytes"

"github.com/celestiaorg/celestia-node/core"
"github.com/celestiaorg/celestia-node/header"
libhead "github.com/celestiaorg/celestia-node/libs/header"
)

var log = logging.Logger("header/core")
@@ -57,7 +56,7 @@ func (ce *Exchange) GetRangeByHeight(ctx context.Context, from, amount uint64) (

func (ce *Exchange) GetVerifiedRange(ctx context.Context, from *header.ExtendedHeader, amount uint64,
) ([]*header.ExtendedHeader, error) {
- headers, err := ce.GetRangeByHeight(ctx, uint64(from.Height)+1, amount)
+ headers, err := ce.GetRangeByHeight(ctx, uint64(from.Height())+1, amount)
if err != nil {
return nil, err
}
@@ -72,7 +71,7 @@ func (ce *Exchange) GetVerifiedRange(ctx context.Context, from *header.ExtendedH
return headers, nil
}

- func (ce *Exchange) Get(ctx context.Context, hash tmbytes.HexBytes) (*header.ExtendedHeader, error) {
+ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.ExtendedHeader, error) {
log.Debugw("requesting header", "hash", hash.String())
block, err := ce.fetcher.GetBlockByHash(ctx, hash)
if err != nil {
7 changes: 4 additions & 3 deletions header/core/listener.go
@@ -10,6 +10,7 @@ import (

"github.com/celestiaorg/celestia-node/core"
"github.com/celestiaorg/celestia-node/header"
libhead "github.com/celestiaorg/celestia-node/libs/header"
)

// Listener is responsible for listening to Core for
@@ -20,15 +21,15 @@ import (
// broadcasts the new `ExtendedHeader` to the header-sub gossipsub
// network.
type Listener struct {
- bcast header.Broadcaster
+ bcast libhead.Broadcaster[*header.ExtendedHeader]
fetcher *core.BlockFetcher
bServ blockservice.BlockService
construct header.ConstructFn
cancel context.CancelFunc
}

func NewListener(
- bcast header.Broadcaster,
+ bcast libhead.Broadcaster[*header.ExtendedHeader],
fetcher *core.BlockFetcher,
bServ blockservice.BlockService,
construct header.ConstructFn,
@@ -98,7 +99,7 @@ func (cl *Listener) listen(ctx context.Context, sub <-chan *types.Block) {
// broadcast new ExtendedHeader, but if core is still syncing, notify only local subscribers
err = cl.bcast.Broadcast(ctx, eh, pubsub.WithLocalPublication(syncing))
if err != nil {
log.Errorw("listener: broadcasting next header", "height", eh.Height,
log.Errorw("listener: broadcasting next header", "height", eh.Height(),
"err", err)
}
case <-ctx.Done():
(remaining changed files not shown)
