diff --git a/block/internal/da/fiber/types.go b/block/internal/da/fiber/types.go index 98eae0fcd..7c5aacbea 100644 --- a/block/internal/da/fiber/types.go +++ b/block/internal/da/fiber/types.go @@ -57,7 +57,13 @@ type DA interface { // Returns the original data that was passed to Upload. Download(ctx context.Context, blobID BlobID) ([]byte, error) - // Listen streams confirmed blob events for the given namespace. - // The returned channel is closed when the context is cancelled. - Listen(ctx context.Context, namespace []byte) (<-chan BlobEvent, error) + // Listen streams confirmed blob events for the given namespace, + // starting at fromHeight. + // + // fromHeight == 0 starts the stream from the current chain head; any + // positive value replays events from that block forward so a + // subscriber can resume after a restart without missing blobs (the + // DA backend is expected to block, not error, on future heights). + // The returned channel is closed when ctx is cancelled. + Listen(ctx context.Context, namespace []byte, fromHeight uint64) (<-chan BlobEvent, error) } diff --git a/block/internal/da/fiber_client.go b/block/internal/da/fiber_client.go index 56d455b6d..417cd86ae 100644 --- a/block/internal/da/fiber_client.go +++ b/block/internal/da/fiber_client.go @@ -234,7 +234,11 @@ func (c *fiberDAClient) Subscribe(ctx context.Context, namespace []byte, _ bool) go func() { defer close(out) - blobCh, err := c.fiber.Listen(ctx, namespace) + // The outer DA Subscribe entry point does not expose a starting + // height, so start from the live tip (fromHeight=0). A future + // refactor that plumbs resume-from-height through datypes.DA can + // thread the value here. + blobCh, err := c.fiber.Listen(ctx, namespace, 0) if err != nil { c.logger.Error().Err(err).Msg("fiber listen failed") return diff --git a/block/internal/da/fibremock/mock.go b/block/internal/da/fibremock/mock.go index d48642a25..26475232b 100644 --- a/block/internal/da/fibremock/mock.go +++ b/block/internal/da/fibremock/mock.go @@ -157,11 +157,41 @@ func (m *MockDA) Download(ctx context.Context, blobID fiber.BlobID) ([]byte, err } // Listen returns a channel that receives events when blobs matching the -// namespace are uploaded. The channel is closed when ctx is cancelled. -func (m *MockDA) Listen(ctx context.Context, namespace []byte) (<-chan fiber.BlobEvent, error) { +// namespace are uploaded, starting at fromHeight. +// +// fromHeight == 0 subscribes to future uploads only. fromHeight > 0 first +// replays every matching blob still in the store with height >= fromHeight, +// then attaches a live subscriber for subsequent uploads. The replay may +// interleave with live events emitted between the Listen call and the +// replay goroutine's drain; consumers should dedupe by BlobID. +// +// The channel is closed when ctx is cancelled. +func (m *MockDA) Listen(ctx context.Context, namespace []byte, fromHeight uint64) (<-chan fiber.BlobEvent, error) { ch := make(chan fiber.BlobEvent, 64) m.mu.Lock() + // Snapshot matching historicals under the lock to avoid racing with + // concurrent Upload calls; the replay goroutine emits them after. + var replay []fiber.BlobEvent + if fromHeight > 0 { + for _, key := range m.order { + b, ok := m.blobs[key] + if !ok { + continue + } + if !namespaceMatch(namespace, b.namespace) { + continue + } + if b.height < fromHeight { + continue + } + replay = append(replay, fiber.BlobEvent{ + BlobID: mockBlobID(b.data), + Height: b.height, + DataSize: uint64(len(b.data)), + }) + } + } idx := len(m.subscribers) m.subscribers = append(m.subscribers, subscriber{ namespace: namespace, @@ -169,6 +199,20 @@ func (m *MockDA) Listen(ctx context.Context, namespace []byte) (<-chan fiber.Blo }) m.mu.Unlock() + // Replay historical events in a goroutine so the caller isn't + // blocked if the buffer fills. Live events may interleave. + if len(replay) > 0 { + go func() { + for _, ev := range replay { + select { + case ch <- ev: + case <-ctx.Done(): + return + } + } + }() + } + // Clean up when context is done. go func() { <-ctx.Done() diff --git a/tools/celestia-node-fiber/adapter_test.go b/tools/celestia-node-fiber/adapter_test.go index 7b5de1915..b815775a9 100644 --- a/tools/celestia-node-fiber/adapter_test.go +++ b/tools/celestia-node-fiber/adapter_test.go @@ -74,7 +74,7 @@ func (f *fakeFibre) PendingWithdrawals(context.Context, string) ([]celfibre.Pend // fakeBlob is a minimal stand-in for blobapi.Module. Only Subscribe is // exercised by the adapter, so the rest return errors if called. type fakeBlob struct { - subscribeFn func(context.Context, libshare.Namespace) (<-chan *nodeblob.SubscriptionResponse, error) + subscribeFn func(context.Context, libshare.Namespace, uint64) (<-chan *nodeblob.SubscriptionResponse, error) } var _ blobapi.Module = (*fakeBlob)(nil) @@ -103,8 +103,8 @@ func (b *fakeBlob) GetCommitmentProof(context.Context, uint64, libshare.Namespac return nil, errors.New("fakeBlob.GetCommitmentProof not implemented") } -func (b *fakeBlob) Subscribe(ctx context.Context, ns libshare.Namespace) (<-chan *nodeblob.SubscriptionResponse, error) { - return b.subscribeFn(ctx, ns) +func (b *fakeBlob) Subscribe(ctx context.Context, ns libshare.Namespace, fromHeight uint64) (<-chan *nodeblob.SubscriptionResponse, error) { + return b.subscribeFn(ctx, ns, fromHeight) } // TestAdapterSatisfiesDA is a compile-time assertion that the adapter @@ -205,9 +205,11 @@ func TestListen_FiltersFibreOnlyAndEmitsEvent(t *testing.T) { close(ch) var seenNs libshare.Namespace + var seenFromHeight uint64 blob := &fakeBlob{ - subscribeFn: func(_ context.Context, sub libshare.Namespace) (<-chan *nodeblob.SubscriptionResponse, error) { + subscribeFn: func(_ context.Context, sub libshare.Namespace, fromHeight uint64) (<-chan *nodeblob.SubscriptionResponse, error) { seenNs = sub + seenFromHeight = fromHeight return ch, nil }, } @@ -223,9 +225,10 @@ func TestListen_FiltersFibreOnlyAndEmitsEvent(t *testing.T) { }, } a := cnfiber.FromModules(fibre, blob, 0) - events, err := a.Listen(context.Background(), namespaceBytes()) + events, err := a.Listen(context.Background(), namespaceBytes(), 0) require.NoError(t, err) require.Equal(t, ns, seenNs) + require.Equal(t, uint64(0), seenFromHeight, "fromHeight=0 must be forwarded to blob.Subscribe") select { case ev, ok := <-events: @@ -253,7 +256,7 @@ func TestListen_CancelledContextClosesOutput(t *testing.T) { ns := namespace(t) upstream := make(chan *nodeblob.SubscriptionResponse) blob := &fakeBlob{ - subscribeFn: func(_ context.Context, arg libshare.Namespace) (<-chan *nodeblob.SubscriptionResponse, error) { + subscribeFn: func(_ context.Context, arg libshare.Namespace, _ uint64) (<-chan *nodeblob.SubscriptionResponse, error) { require.Equal(t, ns, arg) return upstream, nil }, @@ -261,7 +264,7 @@ func TestListen_CancelledContextClosesOutput(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) a := cnfiber.FromModules(&fakeFibre{}, blob, 0) - events, err := a.Listen(ctx, namespaceBytes()) + events, err := a.Listen(ctx, namespaceBytes(), 0) require.NoError(t, err) cancel() diff --git a/tools/celestia-node-fiber/go.mod b/tools/celestia-node-fiber/go.mod index 06446602d..804056aa0 100644 --- a/tools/celestia-node-fiber/go.mod +++ b/tools/celestia-node-fiber/go.mod @@ -32,7 +32,7 @@ replace github.com/evstack/ev-node => ../../. require ( cosmossdk.io/math v1.5.3 github.com/celestiaorg/celestia-app/v8 v8.0.0-20260408095837-ab08b6d5e54e - github.com/celestiaorg/celestia-node v0.0.0-20260423103931-b07242fbfec8 + github.com/celestiaorg/celestia-node v0.0.0-20260423143400-194cc74ce99c github.com/celestiaorg/go-square/v4 v4.0.0-rc4.0.20260318002530-1ca8ff7b42ea github.com/cometbft/cometbft v1.0.1 github.com/cosmos/cosmos-sdk v0.50.13 diff --git a/tools/celestia-node-fiber/go.sum b/tools/celestia-node-fiber/go.sum index 0340750b3..19afc250d 100644 --- a/tools/celestia-node-fiber/go.sum +++ b/tools/celestia-node-fiber/go.sum @@ -231,8 +231,8 @@ github.com/celestiaorg/celestia-app/v8 v8.0.0-20260408095837-ab08b6d5e54e h1:Vl4 github.com/celestiaorg/celestia-app/v8 v8.0.0-20260408095837-ab08b6d5e54e/go.mod h1:WbwLD+5c+dCUhoJtH4rBdOCkMGjZGO6rN3iAD6jRRl4= github.com/celestiaorg/celestia-core v0.40.1 h1:JF9gyLKLU5oCFIeTAQtiHBZeOhppozEToN+o8bgvxT8= github.com/celestiaorg/celestia-core v0.40.1/go.mod h1:3Jhugz4ibMVEP2+7+FjLEDsV4TcU2tHgfGSm4zkSNv4= -github.com/celestiaorg/celestia-node v0.0.0-20260423103931-b07242fbfec8 h1:B2ZRfGG1qUFrYZlxamx5ZIAaQBQq2vqmMJoJD9P3Ryc= -github.com/celestiaorg/celestia-node v0.0.0-20260423103931-b07242fbfec8/go.mod h1:ShZyEpZcdpbHOAK3gS6/6WRs+CPJq060o2SkvtV6ZnY= +github.com/celestiaorg/celestia-node v0.0.0-20260423143400-194cc74ce99c h1:uOct/9Y8SvkKdccw5SvNZbPydB8ktgoXb395N09YTgo= +github.com/celestiaorg/celestia-node v0.0.0-20260423143400-194cc74ce99c/go.mod h1:ShZyEpZcdpbHOAK3gS6/6WRs+CPJq060o2SkvtV6ZnY= github.com/celestiaorg/cosmos-sdk v0.52.3 h1:YPMFCycTw77P7tn+HQHTmmdBwXWNMDOrZ6/xVPK9nvM= github.com/celestiaorg/cosmos-sdk v0.52.3/go.mod h1:2N4NRio08+WQsB7hsKo/ELXCQSWl78GiYdd9M1H6MpQ= github.com/celestiaorg/cosmos-sdk/api v0.7.6 h1:81in9Zk+noz0ko+hZFSSK8L1aawFN8/CmdcQAUhbiUU= diff --git a/tools/celestia-node-fiber/listen.go b/tools/celestia-node-fiber/listen.go index 7c666622a..b38645f36 100644 --- a/tools/celestia-node-fiber/listen.go +++ b/tools/celestia-node-fiber/listen.go @@ -14,10 +14,15 @@ import ( ) // Listen implements fiber.DA.Listen. It subscribes to blob.Subscribe on the -// bridge node for the given namespace and forwards only share-version-2 +// bridge node starting at fromHeight and forwards only share-version-2 // (Fibre) blobs as BlobEvents. PFB blobs (v0/v1) sharing the namespace are // dropped so consumers see a pure Fibre event stream. // +// fromHeight == 0 starts the stream at the chain head (live follow). +// fromHeight > 0 replays from that block forward via the node's +// WaitForHeight loop so a subscriber can resume after a restart without +// missing blobs. +// // DataSize on emitted events is the original payload byte length — matching // the fibermock contract ev-node consumers code against. The v2 share only // carries (fibre_blob_version + commitment), so the real size isn't derivable @@ -25,12 +30,12 @@ import ( // to recover the size before forwarding. This adds one FSP round-trip per // blob. If that cost becomes material we can expose an opt-out mode, but for // now correctness over latency. -func (a *Adapter) Listen(ctx context.Context, namespace []byte) (<-chan block.FiberBlobEvent, error) { +func (a *Adapter) Listen(ctx context.Context, namespace []byte, fromHeight uint64) (<-chan block.FiberBlobEvent, error) { ns, err := toV0Namespace(namespace) if err != nil { return nil, fmt.Errorf("namespace: %w", err) } - sub, err := a.blob.Subscribe(ctx, ns) + sub, err := a.blob.Subscribe(ctx, ns, fromHeight) if err != nil { return nil, fmt.Errorf("subscribing to blob stream: %w", err) } diff --git a/tools/celestia-node-fiber/testing/showcase_test.go b/tools/celestia-node-fiber/testing/showcase_test.go index 929e540fb..f55b0a0c2 100644 --- a/tools/celestia-node-fiber/testing/showcase_test.go +++ b/tools/celestia-node-fiber/testing/showcase_test.go @@ -69,7 +69,9 @@ func TestShowcase(t *testing.T) { namespace := bytes.Repeat([]byte{0xfe}, 10) // Subscribe BEFORE uploading so we don't race against settlements. - events, err := adapter.Listen(ctx, namespace) + // fromHeight=0 → follow from the live tip. TestShowcaseResume below + // exercises the non-zero path. + events, err := adapter.Listen(ctx, namespace, 0) require.NoError(t, err, "starting Listen subscription") // Build N distinctive payloads so byte-swapping or off-by-one BlobID @@ -155,3 +157,128 @@ func TestShowcase(t *testing.T) { t.Logf("download[%02d] ok: blob_id=%s bytes=%d", i, key, len(got)) } } + +// TestShowcaseResume verifies that a subscriber can rejoin the stream +// from a historical block height and receive every matching blob that +// was settled since. This is the fromHeight > 0 path added by +// celestia-node#4962: Listen opens a WaitForHeight loop starting at the +// requested height, so callers can resume after a restart without +// missing blobs. +func TestShowcaseResume(t *testing.T) { + const resumeBlobs = 3 + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + t.Cleanup(cancel) + + network := cnfibertest.StartNetwork(t, ctx) + bridge := cnfibertest.StartBridge(t, ctx, network) + + adapter, err := cnfiber.New(ctx, cnfiber.Config{ + Client: client.Config{ + ReadConfig: client.ReadConfig{ + BridgeDAAddr: bridge.RPCAddr(), + DAAuthToken: bridge.AdminToken, + EnableDATLS: false, + }, + SubmitConfig: client.SubmitConfig{ + DefaultKeyName: network.ClientAccount, + Network: "private", + CoreGRPCConfig: client.CoreGRPCConfig{ + Addr: network.ConsensusGRPCAddr(), + }, + }, + }, + }, network.Consensus.Keyring) + require.NoError(t, err, "constructing adapter") + t.Cleanup(func() { _ = adapter.Close() }) + + namespace := bytes.Repeat([]byte{0xfd}, 10) + + // Phase 1: open a live subscription, upload N blobs, and harvest + // each blob's settlement height as reported by Listen. These + // heights are the ground truth for the resume test below. + liveCtx, liveCancel := context.WithCancel(ctx) + liveEvents, err := adapter.Listen(liveCtx, namespace, 0) + require.NoError(t, err, "starting live Listen for height discovery") + + payloads := make([][]byte, resumeBlobs) + ids := make([]block.FiberBlobID, resumeBlobs) + expected := make(map[string][]byte, resumeBlobs) + for i := range payloads { + payloads[i] = []byte(fmt.Sprintf("resume blob %d", i)) + res, err := adapter.Upload(ctx, namespace, payloads[i]) + require.NoError(t, err, "upload #%d", i) + ids[i] = res.BlobID + expected[hex.EncodeToString(res.BlobID)] = payloads[i] + t.Logf("phase1 upload[%d] blob_id=%s", i, hex.EncodeToString(res.BlobID)) + } + + heights := make(map[string]uint64, resumeBlobs) + for len(heights) < resumeBlobs { + select { + case ev, ok := <-liveEvents: + require.True(t, ok, "live Listen channel closed early") + key := hex.EncodeToString(ev.BlobID) + if _, want := expected[key]; !want { + continue + } + heights[key] = ev.Height + t.Logf("phase1 listen blob_id=%s height=%d", key, ev.Height) + case <-time.After(listenEventsTimeout): + t.Fatalf("timed out collecting heights: got %d/%d", len(heights), resumeBlobs) + } + } + liveCancel() + + // Pick the smallest height across all uploads. Resuming from this + // height must replay every blob we uploaded, regardless of whether + // multiple settlements landed in the same block. + var fromHeight uint64 + for _, h := range heights { + if fromHeight == 0 || h < fromHeight { + fromHeight = h + } + } + t.Logf("resume fromHeight=%d", fromHeight) + + // Phase 2: fresh Listen starting at fromHeight. Expect every blob + // we uploaded in phase 1 to be replayed. + resumeEvents, err := adapter.Listen(ctx, namespace, fromHeight) + require.NoError(t, err, "starting resume Listen") + + seen := make(map[string]block.FiberBlobEvent, resumeBlobs) + for len(seen) < resumeBlobs { + select { + case ev, ok := <-resumeEvents: + require.True(t, ok, "resume Listen channel closed early") + key := hex.EncodeToString(ev.BlobID) + if _, want := expected[key]; !want { + continue + } + if _, dup := seen[key]; dup { + t.Fatalf("resume Listen emitted duplicate for BlobID %s", key) + } + seen[key] = ev + t.Logf("phase2 listen[%d/%d] blob_id=%s height=%d data_size=%d", + len(seen), resumeBlobs, key, ev.Height, ev.DataSize) + case <-time.After(listenEventsTimeout): + missing := make([]string, 0, resumeBlobs-len(seen)) + for k := range expected { + if _, got := seen[k]; !got { + missing = append(missing, k) + } + } + t.Fatalf("timed out on resume Listen: got %d/%d; missing=%v", + len(seen), resumeBlobs, missing) + } + } + + // Every resume event must carry the correct DataSize (Download- + // resolved, same as the live Listen path) and the right height. + for key, ev := range seen { + require.Equal(t, uint64(len(expected[key])), ev.DataSize, + "resume BlobEvent %s DataSize must match original payload length", key) + require.Equal(t, heights[key], ev.Height, + "resume BlobEvent %s Height must match the block it settled in", key) + } +}