Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions block/internal/da/fiber/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,13 @@ type DA interface {
// Returns the original data that was passed to Upload.
Download(ctx context.Context, blobID BlobID) ([]byte, error)

// Listen streams confirmed blob events for the given namespace.
// The returned channel is closed when the context is cancelled.
Listen(ctx context.Context, namespace []byte) (<-chan BlobEvent, error)
// Listen streams confirmed blob events for the given namespace,
// starting at fromHeight.
//
// fromHeight == 0 starts the stream from the current chain head; any
// positive value replays events from that block forward so a
// subscriber can resume after a restart without missing blobs (the
// DA backend is expected to block, not error, on future heights).
// The returned channel is closed when ctx is cancelled.
Listen(ctx context.Context, namespace []byte, fromHeight uint64) (<-chan BlobEvent, error)
}
6 changes: 5 additions & 1 deletion block/internal/da/fiber_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,11 @@ func (c *fiberDAClient) Subscribe(ctx context.Context, namespace []byte, _ bool)
go func() {
defer close(out)

blobCh, err := c.fiber.Listen(ctx, namespace)
// The outer DA Subscribe entry point does not expose a starting
// height, so start from the live tip (fromHeight=0). A future
// refactor that plumbs resume-from-height through datypes.DA can
// thread the value here.
blobCh, err := c.fiber.Listen(ctx, namespace, 0)
if err != nil {
c.logger.Error().Err(err).Msg("fiber listen failed")
return
Expand Down
48 changes: 46 additions & 2 deletions block/internal/da/fibremock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,18 +157,62 @@ func (m *MockDA) Download(ctx context.Context, blobID fiber.BlobID) ([]byte, err
}

// Listen returns a channel that receives events when blobs matching the
// namespace are uploaded. The channel is closed when ctx is cancelled.
func (m *MockDA) Listen(ctx context.Context, namespace []byte) (<-chan fiber.BlobEvent, error) {
// namespace are uploaded, starting at fromHeight.
//
// fromHeight == 0 subscribes to future uploads only. fromHeight > 0 first
// replays every matching blob still in the store with height >= fromHeight,
// then attaches a live subscriber for subsequent uploads. The replay may
// interleave with live events emitted between the Listen call and the
// replay goroutine's drain; consumers should dedupe by BlobID.
//
// The channel is closed when ctx is cancelled.
func (m *MockDA) Listen(ctx context.Context, namespace []byte, fromHeight uint64) (<-chan fiber.BlobEvent, error) {
ch := make(chan fiber.BlobEvent, 64)

m.mu.Lock()
// Snapshot matching historicals under the lock to avoid racing with
// concurrent Upload calls; the replay goroutine emits them after.
var replay []fiber.BlobEvent
if fromHeight > 0 {
for _, key := range m.order {
b, ok := m.blobs[key]
if !ok {
continue
}
if !namespaceMatch(namespace, b.namespace) {
continue
}
if b.height < fromHeight {
continue
}
replay = append(replay, fiber.BlobEvent{
BlobID: mockBlobID(b.data),
Height: b.height,
DataSize: uint64(len(b.data)),
})
}
}
idx := len(m.subscribers)
m.subscribers = append(m.subscribers, subscriber{
namespace: namespace,
ch: ch,
})
m.mu.Unlock()

// Replay historical events in a goroutine so the caller isn't
// blocked if the buffer fills. Live events may interleave.
if len(replay) > 0 {
go func() {
for _, ev := range replay {
select {
case ch <- ev:
case <-ctx.Done():
return
}
}
}()
}

// Clean up when context is done.
go func() {
<-ctx.Done()
Expand Down
17 changes: 10 additions & 7 deletions tools/celestia-node-fiber/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (f *fakeFibre) PendingWithdrawals(context.Context, string) ([]celfibre.Pend
// fakeBlob is a minimal stand-in for blobapi.Module. Only Subscribe is
// exercised by the adapter, so the rest return errors if called.
type fakeBlob struct {
subscribeFn func(context.Context, libshare.Namespace) (<-chan *nodeblob.SubscriptionResponse, error)
subscribeFn func(context.Context, libshare.Namespace, uint64) (<-chan *nodeblob.SubscriptionResponse, error)
}

var _ blobapi.Module = (*fakeBlob)(nil)
Expand Down Expand Up @@ -103,8 +103,8 @@ func (b *fakeBlob) GetCommitmentProof(context.Context, uint64, libshare.Namespac
return nil, errors.New("fakeBlob.GetCommitmentProof not implemented")
}

func (b *fakeBlob) Subscribe(ctx context.Context, ns libshare.Namespace) (<-chan *nodeblob.SubscriptionResponse, error) {
return b.subscribeFn(ctx, ns)
func (b *fakeBlob) Subscribe(ctx context.Context, ns libshare.Namespace, fromHeight uint64) (<-chan *nodeblob.SubscriptionResponse, error) {
return b.subscribeFn(ctx, ns, fromHeight)
}

// TestAdapterSatisfiesDA is a compile-time assertion that the adapter
Expand Down Expand Up @@ -205,9 +205,11 @@ func TestListen_FiltersFibreOnlyAndEmitsEvent(t *testing.T) {
close(ch)

var seenNs libshare.Namespace
var seenFromHeight uint64
blob := &fakeBlob{
subscribeFn: func(_ context.Context, sub libshare.Namespace) (<-chan *nodeblob.SubscriptionResponse, error) {
subscribeFn: func(_ context.Context, sub libshare.Namespace, fromHeight uint64) (<-chan *nodeblob.SubscriptionResponse, error) {
seenNs = sub
seenFromHeight = fromHeight
return ch, nil
},
}
Expand All @@ -223,9 +225,10 @@ func TestListen_FiltersFibreOnlyAndEmitsEvent(t *testing.T) {
},
}
a := cnfiber.FromModules(fibre, blob, 0)
events, err := a.Listen(context.Background(), namespaceBytes())
events, err := a.Listen(context.Background(), namespaceBytes(), 0)
require.NoError(t, err)
require.Equal(t, ns, seenNs)
require.Equal(t, uint64(0), seenFromHeight, "fromHeight=0 must be forwarded to blob.Subscribe")

select {
case ev, ok := <-events:
Expand Down Expand Up @@ -253,15 +256,15 @@ func TestListen_CancelledContextClosesOutput(t *testing.T) {
ns := namespace(t)
upstream := make(chan *nodeblob.SubscriptionResponse)
blob := &fakeBlob{
subscribeFn: func(_ context.Context, arg libshare.Namespace) (<-chan *nodeblob.SubscriptionResponse, error) {
subscribeFn: func(_ context.Context, arg libshare.Namespace, _ uint64) (<-chan *nodeblob.SubscriptionResponse, error) {
require.Equal(t, ns, arg)
return upstream, nil
},
}

ctx, cancel := context.WithCancel(context.Background())
a := cnfiber.FromModules(&fakeFibre{}, blob, 0)
events, err := a.Listen(ctx, namespaceBytes())
events, err := a.Listen(ctx, namespaceBytes(), 0)
require.NoError(t, err)

cancel()
Expand Down
2 changes: 1 addition & 1 deletion tools/celestia-node-fiber/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ replace github.com/evstack/ev-node => ../../.
require (
cosmossdk.io/math v1.5.3
github.com/celestiaorg/celestia-app/v8 v8.0.0-20260408095837-ab08b6d5e54e
github.com/celestiaorg/celestia-node v0.0.0-20260423103931-b07242fbfec8
github.com/celestiaorg/celestia-node v0.0.0-20260423143400-194cc74ce99c
github.com/celestiaorg/go-square/v4 v4.0.0-rc4.0.20260318002530-1ca8ff7b42ea
github.com/cometbft/cometbft v1.0.1
github.com/cosmos/cosmos-sdk v0.50.13
Expand Down
4 changes: 2 additions & 2 deletions tools/celestia-node-fiber/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,8 @@ github.com/celestiaorg/celestia-app/v8 v8.0.0-20260408095837-ab08b6d5e54e h1:Vl4
github.com/celestiaorg/celestia-app/v8 v8.0.0-20260408095837-ab08b6d5e54e/go.mod h1:WbwLD+5c+dCUhoJtH4rBdOCkMGjZGO6rN3iAD6jRRl4=
github.com/celestiaorg/celestia-core v0.40.1 h1:JF9gyLKLU5oCFIeTAQtiHBZeOhppozEToN+o8bgvxT8=
github.com/celestiaorg/celestia-core v0.40.1/go.mod h1:3Jhugz4ibMVEP2+7+FjLEDsV4TcU2tHgfGSm4zkSNv4=
github.com/celestiaorg/celestia-node v0.0.0-20260423103931-b07242fbfec8 h1:B2ZRfGG1qUFrYZlxamx5ZIAaQBQq2vqmMJoJD9P3Ryc=
github.com/celestiaorg/celestia-node v0.0.0-20260423103931-b07242fbfec8/go.mod h1:ShZyEpZcdpbHOAK3gS6/6WRs+CPJq060o2SkvtV6ZnY=
github.com/celestiaorg/celestia-node v0.0.0-20260423143400-194cc74ce99c h1:uOct/9Y8SvkKdccw5SvNZbPydB8ktgoXb395N09YTgo=
github.com/celestiaorg/celestia-node v0.0.0-20260423143400-194cc74ce99c/go.mod h1:ShZyEpZcdpbHOAK3gS6/6WRs+CPJq060o2SkvtV6ZnY=
github.com/celestiaorg/cosmos-sdk v0.52.3 h1:YPMFCycTw77P7tn+HQHTmmdBwXWNMDOrZ6/xVPK9nvM=
github.com/celestiaorg/cosmos-sdk v0.52.3/go.mod h1:2N4NRio08+WQsB7hsKo/ELXCQSWl78GiYdd9M1H6MpQ=
github.com/celestiaorg/cosmos-sdk/api v0.7.6 h1:81in9Zk+noz0ko+hZFSSK8L1aawFN8/CmdcQAUhbiUU=
Expand Down
11 changes: 8 additions & 3 deletions tools/celestia-node-fiber/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,28 @@ import (
)

// Listen implements fiber.DA.Listen. It subscribes to blob.Subscribe on the
// bridge node for the given namespace and forwards only share-version-2
// bridge node starting at fromHeight and forwards only share-version-2
// (Fibre) blobs as BlobEvents. PFB blobs (v0/v1) sharing the namespace are
// dropped so consumers see a pure Fibre event stream.
//
// fromHeight == 0 starts the stream at the chain head (live follow).
// fromHeight > 0 replays from that block forward via the node's
// WaitForHeight loop so a subscriber can resume after a restart without
// missing blobs.
//
// DataSize on emitted events is the original payload byte length — matching
// the fibermock contract ev-node consumers code against. The v2 share only
// carries (fibre_blob_version + commitment), so the real size isn't derivable
// from the subscription alone; Listen therefore performs a Download per event
// to recover the size before forwarding. This adds one FSP round-trip per
// blob. If that cost becomes material we can expose an opt-out mode, but for
// now correctness over latency.
func (a *Adapter) Listen(ctx context.Context, namespace []byte) (<-chan block.FiberBlobEvent, error) {
func (a *Adapter) Listen(ctx context.Context, namespace []byte, fromHeight uint64) (<-chan block.FiberBlobEvent, error) {
ns, err := toV0Namespace(namespace)
if err != nil {
return nil, fmt.Errorf("namespace: %w", err)
}
sub, err := a.blob.Subscribe(ctx, ns)
sub, err := a.blob.Subscribe(ctx, ns, fromHeight)
if err != nil {
return nil, fmt.Errorf("subscribing to blob stream: %w", err)
}
Expand Down
129 changes: 128 additions & 1 deletion tools/celestia-node-fiber/testing/showcase_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ func TestShowcase(t *testing.T) {
namespace := bytes.Repeat([]byte{0xfe}, 10)

// Subscribe BEFORE uploading so we don't race against settlements.
events, err := adapter.Listen(ctx, namespace)
// fromHeight=0 → follow from the live tip. TestShowcaseResume below
// exercises the non-zero path.
events, err := adapter.Listen(ctx, namespace, 0)
require.NoError(t, err, "starting Listen subscription")

// Build N distinctive payloads so byte-swapping or off-by-one BlobID
Expand Down Expand Up @@ -155,3 +157,128 @@ func TestShowcase(t *testing.T) {
t.Logf("download[%02d] ok: blob_id=%s bytes=%d", i, key, len(got))
}
}

// TestShowcaseResume verifies that a subscriber can rejoin the stream
// from a historical block height and receive every matching blob that
// was settled since. This is the fromHeight > 0 path added by
// celestia-node#4962: Listen opens a WaitForHeight loop starting at the
// requested height, so callers can resume after a restart without
// missing blobs.
func TestShowcaseResume(t *testing.T) {
const resumeBlobs = 3

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
t.Cleanup(cancel)

network := cnfibertest.StartNetwork(t, ctx)
bridge := cnfibertest.StartBridge(t, ctx, network)

adapter, err := cnfiber.New(ctx, cnfiber.Config{
Client: client.Config{
ReadConfig: client.ReadConfig{
BridgeDAAddr: bridge.RPCAddr(),
DAAuthToken: bridge.AdminToken,
EnableDATLS: false,
},
SubmitConfig: client.SubmitConfig{
DefaultKeyName: network.ClientAccount,
Network: "private",
CoreGRPCConfig: client.CoreGRPCConfig{
Addr: network.ConsensusGRPCAddr(),
},
},
},
}, network.Consensus.Keyring)
require.NoError(t, err, "constructing adapter")
t.Cleanup(func() { _ = adapter.Close() })

namespace := bytes.Repeat([]byte{0xfd}, 10)

// Phase 1: open a live subscription, upload N blobs, and harvest
// each blob's settlement height as reported by Listen. These
// heights are the ground truth for the resume test below.
liveCtx, liveCancel := context.WithCancel(ctx)
liveEvents, err := adapter.Listen(liveCtx, namespace, 0)
require.NoError(t, err, "starting live Listen for height discovery")

payloads := make([][]byte, resumeBlobs)
ids := make([]block.FiberBlobID, resumeBlobs)
expected := make(map[string][]byte, resumeBlobs)
for i := range payloads {
payloads[i] = []byte(fmt.Sprintf("resume blob %d", i))
res, err := adapter.Upload(ctx, namespace, payloads[i])
require.NoError(t, err, "upload #%d", i)
ids[i] = res.BlobID
expected[hex.EncodeToString(res.BlobID)] = payloads[i]
t.Logf("phase1 upload[%d] blob_id=%s", i, hex.EncodeToString(res.BlobID))
}

heights := make(map[string]uint64, resumeBlobs)
for len(heights) < resumeBlobs {
select {
case ev, ok := <-liveEvents:
require.True(t, ok, "live Listen channel closed early")
key := hex.EncodeToString(ev.BlobID)
if _, want := expected[key]; !want {
continue
}
heights[key] = ev.Height
t.Logf("phase1 listen blob_id=%s height=%d", key, ev.Height)
case <-time.After(listenEventsTimeout):
t.Fatalf("timed out collecting heights: got %d/%d", len(heights), resumeBlobs)
}
}
liveCancel()

// Pick the smallest height across all uploads. Resuming from this
// height must replay every blob we uploaded, regardless of whether
// multiple settlements landed in the same block.
var fromHeight uint64
for _, h := range heights {
if fromHeight == 0 || h < fromHeight {
fromHeight = h
}
}
t.Logf("resume fromHeight=%d", fromHeight)

// Phase 2: fresh Listen starting at fromHeight. Expect every blob
// we uploaded in phase 1 to be replayed.
resumeEvents, err := adapter.Listen(ctx, namespace, fromHeight)
require.NoError(t, err, "starting resume Listen")

seen := make(map[string]block.FiberBlobEvent, resumeBlobs)
for len(seen) < resumeBlobs {
select {
case ev, ok := <-resumeEvents:
require.True(t, ok, "resume Listen channel closed early")
key := hex.EncodeToString(ev.BlobID)
if _, want := expected[key]; !want {
continue
}
if _, dup := seen[key]; dup {
t.Fatalf("resume Listen emitted duplicate for BlobID %s", key)
}
seen[key] = ev
t.Logf("phase2 listen[%d/%d] blob_id=%s height=%d data_size=%d",
len(seen), resumeBlobs, key, ev.Height, ev.DataSize)
case <-time.After(listenEventsTimeout):
missing := make([]string, 0, resumeBlobs-len(seen))
for k := range expected {
if _, got := seen[k]; !got {
missing = append(missing, k)
}
}
t.Fatalf("timed out on resume Listen: got %d/%d; missing=%v",
len(seen), resumeBlobs, missing)
}
}

// Every resume event must carry the correct DataSize (Download-
// resolved, same as the live Listen path) and the right height.
for key, ev := range seen {
require.Equal(t, uint64(len(expected[key])), ev.DataSize,
"resume BlobEvent %s DataSize must match original payload length", key)
require.Equal(t, heights[key], ev.Height,
"resume BlobEvent %s Height must match the block it settled in", key)
}
}
Loading