diff --git a/tools/celestia-node-fiber/go.mod b/tools/celestia-node-fiber/go.mod index aaf20bc3c..06446602d 100644 --- a/tools/celestia-node-fiber/go.mod +++ b/tools/celestia-node-fiber/go.mod @@ -30,12 +30,16 @@ replace ( replace github.com/evstack/ev-node => ../../. require ( + cosmossdk.io/math v1.5.3 github.com/celestiaorg/celestia-app/v8 v8.0.0-20260408095837-ab08b6d5e54e github.com/celestiaorg/celestia-node v0.0.0-20260423103931-b07242fbfec8 github.com/celestiaorg/go-square/v4 v4.0.0-rc4.0.20260318002530-1ca8ff7b42ea + github.com/cometbft/cometbft v1.0.1 github.com/cosmos/cosmos-sdk v0.50.13 + github.com/cristalhq/jwt/v5 v5.4.0 github.com/evstack/ev-node v1.1.0 github.com/stretchr/testify v1.11.1 + go.uber.org/fx v1.24.0 ) require ( @@ -58,7 +62,6 @@ require ( cosmossdk.io/depinject v1.2.1 // indirect cosmossdk.io/errors v1.0.2 // indirect cosmossdk.io/log v1.6.1 // indirect - cosmossdk.io/math v1.5.3 // indirect cosmossdk.io/store v1.1.2 // indirect cosmossdk.io/x/circuit v0.1.1 // indirect cosmossdk.io/x/evidence v0.1.1 // indirect @@ -70,6 +73,7 @@ require ( filippo.io/keygen v0.0.0-20260114151900-8e2790ea4c5b // indirect github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect github.com/99designs/keyring v1.2.2 // indirect + github.com/BurntSushi/toml v1.6.0 // indirect github.com/DataDog/datadog-go v4.8.3+incompatible // indirect github.com/DataDog/zstd v1.5.7 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.31.0 // indirect @@ -131,7 +135,6 @@ require ( github.com/cockroachdb/redact v1.1.6 // indirect github.com/cockroachdb/swiss v0.0.0-20251224182025-b0f6560f979b // indirect github.com/cockroachdb/tokenbucket v0.0.0-20250429170803-42689b6311bb // indirect - github.com/cometbft/cometbft v1.0.1 // indirect github.com/cometbft/cometbft-db v1.0.4 // indirect github.com/consensys/gnark v0.14.0 // indirect github.com/consensys/gnark-crypto v0.19.2 // indirect @@ -147,7 +150,6 @@ require ( github.com/cosmos/ibc-go/v8 v8.7.0 // indirect github.com/cosmos/ics23/go v0.11.0 // indirect github.com/cosmos/ledger-cosmos-go v0.15.0 // indirect - github.com/cristalhq/jwt/v5 v5.4.0 // indirect github.com/cskr/pubsub v1.0.2 // indirect github.com/danieljoos/wincred v1.2.1 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect @@ -184,6 +186,7 @@ require ( github.com/go-viper/mapstructure/v2 v2.5.0 // indirect github.com/goccy/go-yaml v1.19.2 // indirect github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect + github.com/gofrs/flock v0.13.0 // indirect github.com/gogo/googleapis v1.4.1 // indirect github.com/gogo/protobuf v1.3.3 // indirect github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect @@ -232,6 +235,7 @@ require ( github.com/huin/goupnp v1.3.0 // indirect github.com/iancoleman/orderedmap v0.3.0 // indirect github.com/iancoleman/strcase v0.3.0 // indirect + github.com/imdario/mergo v0.3.16 // indirect github.com/improbable-eng/grpc-web v0.15.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/ingonyama-zk/icicle-gnark/v3 v3.2.2 // indirect @@ -335,6 +339,7 @@ require ( github.com/quic-go/webtransport-go v0.10.0 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/rogpeppe/go-internal v1.14.1 // indirect + github.com/rollkit/go-da v0.9.0 // indirect github.com/ronanh/intcomp v1.1.1 // indirect github.com/rs/cors v1.11.1 // indirect github.com/rs/zerolog v1.35.0 // indirect @@ -363,9 +368,11 @@ require ( go.etcd.io/bbolt v1.4.0 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/contrib/bridges/prometheus v0.67.0 // indirect go.opentelemetry.io/contrib/detectors/gcp v1.39.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0 // indirect + go.opentelemetry.io/contrib/instrumentation/runtime v0.67.0 // indirect go.opentelemetry.io/otel v1.43.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.43.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0 // indirect @@ -377,7 +384,6 @@ require ( go.opentelemetry.io/otel/trace v1.43.0 // indirect go.opentelemetry.io/proto/otlp v1.10.0 // indirect go.uber.org/dig v1.19.0 // indirect - go.uber.org/fx v1.24.0 // indirect go.uber.org/mock v0.5.2 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.1 // indirect diff --git a/tools/celestia-node-fiber/listen.go b/tools/celestia-node-fiber/listen.go index 40dabe289..6ba5c87f1 100644 --- a/tools/celestia-node-fiber/listen.go +++ b/tools/celestia-node-fiber/listen.go @@ -84,6 +84,14 @@ func resolveHeight(resp *blob.SubscriptionResponse) uint64 { // fibreBlobToEvent reconstructs the Fibre BlobID (version byte + 32-byte // commitment) from a share-version-2 libshare.Blob and wraps it as a // BlobEvent. +// +// DataSize caveat: a v2 share carries only (fibre_blob_version + commitment), +// not the original blob payload, so b.DataLen() is the on-chain share size +// (a fixed constant), not the user-facing "how big is this blob" number +// that ev-node's fibermock and its consumers typically expect. Reporting +// the true payload size requires an on-chain query against x/fibre's +// PaymentPromise keyed by commitment. Tracked as a follow-up; for now we +// report the share size so the field is non-zero. func fibreBlobToEvent(b *libshare.Blob, height uint64) (block.FiberBlobEvent, error) { version, err := b.FibreBlobVersion() if err != nil { diff --git a/tools/celestia-node-fiber/testing/bridge.go b/tools/celestia-node-fiber/testing/bridge.go new file mode 100644 index 000000000..c0c784276 --- /dev/null +++ b/tools/celestia-node-fiber/testing/bridge.go @@ -0,0 +1,135 @@ +//go:build fibre + +package cnfibertest + +import ( + "context" + "crypto/rand" + "net" + "testing" + "time" + + "github.com/cosmos/cosmos-sdk/crypto/hd" + "github.com/cosmos/cosmos-sdk/crypto/keyring" + "github.com/cristalhq/jwt/v5" + "github.com/stretchr/testify/require" + "go.uber.org/fx" + + appfibre "github.com/celestiaorg/celestia-app/v8/fibre" + + "github.com/celestiaorg/celestia-node/api/client" + "github.com/celestiaorg/celestia-node/api/rpc/perms" + "github.com/celestiaorg/celestia-node/nodebuilder" + "github.com/celestiaorg/celestia-node/nodebuilder/node" + "github.com/celestiaorg/celestia-node/nodebuilder/p2p" + stateapi "github.com/celestiaorg/celestia-node/nodebuilder/state" +) + +// Bridge bundles an in-process celestia-node bridge node and the admin +// JWT that grants it authenticated RPC access. The adapter's ReadConfig +// needs both the address and the token for Blob.Subscribe to work. +type Bridge struct { + Node *nodebuilder.Node + AdminToken string +} + +// RPCAddr returns a WebSocket URL the adapter uses in +// Config.ReadConfig.BridgeDAAddr. WebSocket (not HTTP) is required +// because Blob.Subscribe returns a channel; go-jsonrpc only supports +// channel-returning methods over a streaming transport. +func (b *Bridge) RPCAddr() string { + return "ws://" + b.Node.RPCServer.ListenAddr() +} + +// StartBridge brings up an in-process celestia-node bridge connected to +// the Network's consensus gRPC endpoint. Mirrors celestia-node's +// api/client test helpers so TestShowcase has a real JSON-RPC server for +// Blob.Subscribe. +func StartBridge(t *testing.T, ctx context.Context, network *Network) *Bridge { + t.Helper() + + cfg := nodebuilder.DefaultConfig(node.Bridge) + + ip, port, err := net.SplitHostPort(network.ConsensusGRPCAddr()) + require.NoError(t, err, "splitting consensus gRPC addr") + cfg.Core.IP = ip + cfg.Core.Port = port + // Pin the bridge RPC to an ephemeral port; the test discovers it via + // Node.RPCServer.ListenAddr() after Start. + cfg.RPC.Port = "0" + + tempDir := t.TempDir() + store := nodebuilder.MockStore(t, cfg) + + auth, adminToken := bridgeAuth(t) + kr := bridgeKeyring(t, tempDir) + + bn, err := nodebuilder.New(node.Bridge, p2p.Private, store, + auth, + stateapi.WithKeyring(kr), + stateapi.WithKeyName(stateapi.AccountName(bridgeSigningKey)), + fx.Replace(node.StorePath(tempDir)), + ) + require.NoError(t, err, "constructing bridge node") + + require.NoError(t, bn.Start(ctx), "starting bridge node") + t.Cleanup(func() { + stopCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + _ = bn.Stop(stopCtx) + }) + + return &Bridge{ + Node: bn, + AdminToken: adminToken, + } +} + +// bridgeSigningKey is the keyring account the bridge uses for its own +// tx submissions. Distinct from the client's account so the two keyrings +// don't collide. +const bridgeSigningKey = "bridge-signer" + +func bridgeKeyring(t *testing.T, tempDir string) keyring.Keyring { + t.Helper() + + kr, err := client.KeyringWithNewKey(client.KeyringConfig{ + KeyName: bridgeSigningKey, + BackendName: keyring.BackendTest, + }, tempDir) + require.NoError(t, err, "creating bridge keyring") + + // The Fibre module on the bridge expects a key under + // appfibre.DefaultKeyName to exist, even though our client never uses + // the bridge's Fibre module for Upload/Download. Without it, + // fx.Start fails during fibre module wiring. + _, _, err = kr.NewMnemonic( + appfibre.DefaultKeyName, + keyring.English, "", "", hd.Secp256k1, + ) + require.NoError(t, err, "provisioning bridge fibre key") + return kr +} + +// bridgeAuth creates an HS256 JWT signer pair and an admin token. The +// returned fx option injects the signer/verifier into the node; the +// token is what the adapter puts in ReadConfig.DAAuthToken. +func bridgeAuth(t *testing.T) (fx.Option, string) { + t.Helper() + + key := make([]byte, 32) + _, err := rand.Read(key) + require.NoError(t, err, "rand.Read jwt key") + + signer, err := jwt.NewSignerHS(jwt.HS256, key) + require.NoError(t, err) + verifier, err := jwt.NewVerifierHS(jwt.HS256, key) + require.NoError(t, err) + + token, err := perms.NewTokenWithPerms(signer, perms.AllPerms) + require.NoError(t, err) + + return fx.Decorate(func() (jwt.Signer, jwt.Verifier, error) { + return signer, verifier, nil + }), string(token) +} diff --git a/tools/celestia-node-fiber/testing/doc.go b/tools/celestia-node-fiber/testing/doc.go new file mode 100644 index 000000000..639adf723 --- /dev/null +++ b/tools/celestia-node-fiber/testing/doc.go @@ -0,0 +1,16 @@ +//go:build fibre + +// Package cnfibertest wires a single-validator Celestia chain, an in-process +// Fibre server, a celestia-node bridge and the celestia-node-fiber adapter +// together so Upload → Listen → Download can be exercised end-to-end in a +// Go test. +// +// The chain is a celestia-app testnode built with -tags fibre. The Fibre +// server runs in the same process and its FSP endpoint is registered with +// the valaddr module so the client's host registry can find it. The +// underlying adapter talks directly to consensus gRPC and the Fibre server; +// only Listen goes through the bridge node's blob subscription. +// +// This is the "fast sanity" variant. A multi-validator showcase is planned +// as a Docker Compose follow-up that exercises real quorum collection. +package cnfibertest diff --git a/tools/celestia-node-fiber/testing/network.go b/tools/celestia-node-fiber/testing/network.go new file mode 100644 index 000000000..fa0943fba --- /dev/null +++ b/tools/celestia-node-fiber/testing/network.go @@ -0,0 +1,221 @@ +//go:build fibre + +package cnfibertest + +import ( + "context" + "path/filepath" + "testing" + "time" + + sdkmath "cosmossdk.io/math" + "github.com/cometbft/cometbft/privval" + core "github.com/cometbft/cometbft/types" + "github.com/cosmos/cosmos-sdk/client/grpc/cmtservice" + sdk "github.com/cosmos/cosmos-sdk/types" + stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types" + "github.com/stretchr/testify/require" + + "github.com/celestiaorg/celestia-app/v8/app" + "github.com/celestiaorg/celestia-app/v8/app/encoding" + appfibre "github.com/celestiaorg/celestia-app/v8/fibre" + "github.com/celestiaorg/celestia-app/v8/pkg/appconsts" + "github.com/celestiaorg/celestia-app/v8/pkg/user" + "github.com/celestiaorg/celestia-app/v8/test/util/testnode" + fibretypes "github.com/celestiaorg/celestia-app/v8/x/fibre/types" + valtypes "github.com/celestiaorg/celestia-app/v8/x/valaddr/types" +) + +const ( + // defaultChainID matches the celestia-node bridge test helper so the + // bridge and consensus node agree on network identity. + defaultChainID = "private" + + // escrowDeposit is a generous initial escrow. Uploads consume from + // escrow for gas + blob fees, so leave headroom for multiple runs. + escrowDeposit = 50_000_000 // 50 TIA in utia + + // clientAccount is the keyring account the adapter uses to sign + // payment promises and MsgPayForFibre. Pre-funded in genesis; also + // has a funded escrow after StartNetwork returns. + clientAccount = appfibre.DefaultKeyName +) + +// Network bundles a single-validator celestia-app chain, an in-process +// Fibre server registered via valaddr, and a funded escrow account. The +// caller's *testing.T cleanup stops everything. +type Network struct { + // Consensus is the celestia-app testnode context (keyring, gRPC + // client, home dir). Use Consensus.GRPCClient for consensus gRPC + // access and Consensus.Keyring for signing. + Consensus testnode.Context + + // FibreServer is the in-process Fibre gRPC server registered with + // the chain's valaddr module. The adapter (via appfibre.Client's + // default host registry) discovers it through valaddr. + FibreServer *appfibre.Server + + // ChainID matches what the bridge node needs to be configured with. + ChainID string + + // ClientAccount is the keyring name for the pre-funded + pre-escrowed + // account the showcase uses. Expose it so the test wires SubmitConfig. + ClientAccount string +} + +// StartNetwork boots a single-validator Fibre chain + server and returns +// a ready-to-use Network. Registration + escrow funding is complete when +// this returns. +func StartNetwork(t *testing.T, ctx context.Context) *Network { + t.Helper() + + cfg := testnode.DefaultConfig(). + WithChainID(defaultChainID). + WithFundedAccounts(clientAccount). + WithDelayedPrecommitTimeout(50 * time.Millisecond) + + cctx, _, grpcAddr := testnode.NewNetwork(t, cfg) + _, err := cctx.WaitForHeight(1) + require.NoError(t, err, "waiting for first block") + + server := startFibreServer(t, ctx, cctx, grpcAddr) + // The Fibre client's gRPC dialer expects URI-style targets, so prefix + // the host with the dns:/// scheme before registering — matches how + // talis provisions real Fibre networks (tools/talis/fibre_setup.go). + registerValidator(t, ctx, cctx, "dns:///"+server.ListenAddress()) + fundEscrow(t, ctx, cctx) + + return &Network{ + Consensus: cctx, + FibreServer: server, + ChainID: defaultChainID, + ClientAccount: clientAccount, + } +} + +// ConsensusGRPCAddr returns the host:port of the chain's gRPC endpoint +// that the adapter's SubmitConfig.CoreGRPCConfig should point at. +func (n *Network) ConsensusGRPCAddr() string { + return n.Consensus.GRPCClient.Target() +} + +// startFibreServer spins up an in-process Fibre gRPC server bound to an +// ephemeral localhost port. The server uses the testnode's private +// validator key for BLS signing, and keeps blob data in memory. +func startFibreServer( + t *testing.T, + ctx context.Context, + cctx testnode.Context, + appGRPCAddr string, +) *appfibre.Server { + t.Helper() + + pvKey := filepath.Join(cctx.HomeDir, "config", "priv_validator_key.json") + pvState := filepath.Join(cctx.HomeDir, "data", "priv_validator_state.json") + filePV := privval.LoadFilePV(pvKey, pvState) + + serverCfg := appfibre.DefaultServerConfig() + serverCfg.AppGRPCAddress = appGRPCAddr + serverCfg.ServerListenAddress = "127.0.0.1:0" + serverCfg.SignerFn = func(string) (core.PrivValidator, error) { + return filePV, nil + } + serverCfg.StoreFn = func(sc appfibre.StoreConfig) (*appfibre.Store, error) { + return appfibre.NewMemoryStore(sc), nil + } + + server, err := appfibre.NewServer(serverCfg) + require.NoError(t, err, "creating fibre server") + require.NoError(t, server.Start(ctx), "starting fibre server") + t.Cleanup(func() { + stopCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + _ = server.Stop(stopCtx) + }) + return server +} + +// registerValidator submits MsgSetFibreProviderInfo so the chain's +// valaddr module maps the validator's consensus address to the Fibre +// server's listen address. Without this the client's host registry +// cannot locate any FSPs. +func registerValidator( + t *testing.T, + ctx context.Context, + cctx testnode.Context, + fibreAddr string, +) { + t.Helper() + + stakingClient := stakingtypes.NewQueryClient(cctx.GRPCClient) + validators, err := stakingClient.Validators(ctx, &stakingtypes.QueryValidatorsRequest{}) + require.NoError(t, err) + require.Len(t, validators.Validators, 1, "single-validator testnode expected") + valOperator := validators.Validators[0].OperatorAddress + + txClient, err := testnode.NewTxClientFromContext(cctx) + require.NoError(t, err) + + msg := &valtypes.MsgSetFibreProviderInfo{ + Signer: valOperator, + Host: fibreAddr, + } + resp, err := txClient.SubmitTx(ctx, []sdk.Msg{msg}, user.SetGasLimit(200_000), user.SetFee(5_000)) + require.NoError(t, err, "registering validator fibre host") + require.Equal(t, uint32(0), resp.Code, "register validator tx failed") + require.NoError(t, cctx.WaitForNextBlock()) + + // Sanity-check the registration landed. + tmClient := cmtservice.NewServiceClient(cctx.GRPCClient) + valSet, err := tmClient.GetLatestValidatorSet(ctx, &cmtservice.GetLatestValidatorSetRequest{}) + require.NoError(t, err) + require.Len(t, valSet.Validators, 1) + consAddr, err := sdk.ConsAddressFromBech32(valSet.Validators[0].Address) + require.NoError(t, err) + + valAddrClient := valtypes.NewQueryClient(cctx.GRPCClient) + info, err := valAddrClient.FibreProviderInfo(ctx, &valtypes.QueryFibreProviderInfoRequest{ + ValidatorConsensusAddress: consAddr.String(), + }) + require.NoError(t, err) + require.True(t, info.Found, "fibre provider info not registered") + require.Equal(t, fibreAddr, info.Info.Host) +} + +// fundEscrow deposits enough utia into the client's escrow account to +// cover payment promises for several blob uploads. The async PFF +// settlement kicked off by adapter.Upload debits this account. +func fundEscrow(t *testing.T, ctx context.Context, cctx testnode.Context) { + t.Helper() + + info, err := cctx.Keyring.Key(clientAccount) + require.NoError(t, err, "loading client keyring entry") + addr, err := info.GetAddress() + require.NoError(t, err) + + ecfg := encoding.MakeConfig(app.ModuleEncodingRegisters...) + txClient, err := user.SetupTxClient( + ctx, cctx.Keyring, cctx.GRPCClient, ecfg, + user.WithDefaultAccount(clientAccount), + ) + require.NoError(t, err, "setting up funded-account tx client") + + amount := sdk.NewCoin(appconsts.BondDenom, sdkmath.NewInt(escrowDeposit)) + msg := &fibretypes.MsgDepositToEscrow{ + Signer: addr.String(), + Amount: amount, + } + resp, err := txClient.SubmitTx(ctx, []sdk.Msg{msg}, user.SetGasLimit(200_000), user.SetFee(5_000)) + require.NoError(t, err, "depositing to escrow") + require.Equal(t, uint32(0), resp.Code, "deposit tx failed") + require.NoError(t, cctx.WaitForNextBlock()) + + // Sanity: escrow is now visible. + queryClient := fibretypes.NewQueryClient(cctx.GRPCClient) + escrow, err := queryClient.EscrowAccount(ctx, &fibretypes.QueryEscrowAccountRequest{ + Signer: addr.String(), + }) + require.NoError(t, err) + require.True(t, escrow.Found, "escrow account not found after deposit") + require.Equal(t, amount, escrow.EscrowAccount.Balance, "escrow balance mismatch") +} diff --git a/tools/celestia-node-fiber/testing/showcase_test.go b/tools/celestia-node-fiber/testing/showcase_test.go new file mode 100644 index 000000000..5c7ec1962 --- /dev/null +++ b/tools/celestia-node-fiber/testing/showcase_test.go @@ -0,0 +1,159 @@ +//go:build fibre + +package cnfibertest_test + +import ( + "bytes" + "context" + "encoding/hex" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/celestiaorg/celestia-node/api/client" + + "github.com/evstack/ev-node/block" + cnfiber "github.com/evstack/ev-node/tools/celestia-node-fiber" + cnfibertest "github.com/evstack/ev-node/tools/celestia-node-fiber/testing" +) + +const ( + // showcaseBlobs is how many distinct-payload blobs the test pushes + // through the adapter. Large enough to surface ordering and + // duplicate-handling bugs, small enough to keep wall time reasonable. + showcaseBlobs = 10 + + // listenEventsTimeout bounds the collection window for N BlobEvents. + // The async MsgPayForFibre broadcasts serialize on the TxClient + // mutex, so the dominant cost is block_time_per_tx × N. 60s gives + // ~6s per blob which is generous for a 50ms-precommit testnode. + listenEventsTimeout = 60 * time.Second +) + +// TestShowcase spins up a single-validator Celestia chain with an +// in-process Fibre server, a celestia-node bridge, and drives the full +// adapter surface: Listen subscribes first, Upload pushes N distinct +// blobs, the async MsgPayForFibre settlements commit on-chain, the +// subscription delivers an event per blob, and Download round-trips each +// payload byte-for-byte. +func TestShowcase(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + t.Cleanup(cancel) + + network := cnfibertest.StartNetwork(t, ctx) + bridge := cnfibertest.StartBridge(t, ctx, network) + + adapter, err := cnfiber.New(ctx, cnfiber.Config{ + Client: client.Config{ + ReadConfig: client.ReadConfig{ + BridgeDAAddr: bridge.RPCAddr(), + DAAuthToken: bridge.AdminToken, + EnableDATLS: false, + }, + SubmitConfig: client.SubmitConfig{ + DefaultKeyName: network.ClientAccount, + Network: "private", + CoreGRPCConfig: client.CoreGRPCConfig{ + Addr: network.ConsensusGRPCAddr(), + }, + }, + }, + }, network.Consensus.Keyring) + require.NoError(t, err, "constructing adapter") + t.Cleanup(func() { _ = adapter.Close() }) + + // Namespace: 10 bytes of v0 ID. Uploads share a namespace so Listen + // sees every settlement event in one stream. + namespace := bytes.Repeat([]byte{0xfe}, 10) + + // Subscribe BEFORE uploading so we don't race against settlements. + events, err := adapter.Listen(ctx, namespace) + require.NoError(t, err, "starting Listen subscription") + + // Build N distinctive payloads so byte-swapping or off-by-one BlobID + // reconstruction would be caught by the download diff below. + payloads := make([][]byte, showcaseBlobs) + for i := range payloads { + payloads[i] = []byte(fmt.Sprintf( + "showcase blob %02d — payload=%s", + i, bytes.Repeat([]byte{'a' + byte(i)}, 8+i), + )) + } + + // expected maps hex(BlobID) → original payload; populated by Upload + // and consulted by Listen + Download to catch misrouted bytes. + expected := make(map[string][]byte, showcaseBlobs) + ids := make([]block.FiberBlobID, showcaseBlobs) + for i, payload := range payloads { + res, err := adapter.Upload(ctx, namespace, payload) + require.NoError(t, err, "adapter.Upload #%d", i) + require.NotEmpty(t, res.BlobID, "upload #%d returned empty BlobID", i) + key := hex.EncodeToString(res.BlobID) + _, dup := expected[key] + require.False(t, dup, "adapter.Upload #%d returned a duplicate BlobID %s", i, key) + expected[key] = payload + ids[i] = res.BlobID + t.Logf("upload[%02d] ok: blob_id=%s size=%d", i, key, len(payload)) + } + + // Drain events until every Upload has a matching BlobEvent. Order is + // not guaranteed — multiple settlements can land in the same block. + seen := make(map[string]block.FiberBlobEvent, showcaseBlobs) + deadline := time.After(listenEventsTimeout) + for len(seen) < showcaseBlobs { + select { + case ev, ok := <-events: + require.True(t, ok, + "Listen channel closed with only %d/%d events", len(seen), showcaseBlobs) + key := hex.EncodeToString(ev.BlobID) + if _, want := expected[key]; !want { + t.Logf("listen: ignoring unexpected BlobID %s", key) + continue + } + if prev, dup := seen[key]; dup { + t.Fatalf("listen: duplicate event for BlobID %s (prev height=%d new height=%d)", + key, prev.Height, ev.Height) + } + seen[key] = ev + t.Logf("listen[%02d/%02d] ok: blob_id=%s height=%d data_size=%d", + len(seen), showcaseBlobs, key, ev.Height, ev.DataSize) + case <-deadline: + missing := make([]string, 0, showcaseBlobs-len(seen)) + for k := range expected { + if _, got := seen[k]; !got { + missing = append(missing, k) + } + } + t.Fatalf("timed out after %s: got %d/%d events; missing=%v", + listenEventsTimeout, len(seen), showcaseBlobs, missing) + } + } + + // Every event must carry a real block height. DataSize is + // intentionally not asserted against the payload length: the v2 share + // contains only (fibre_blob_version + commitment), not the original + // payload bytes, so b.DataLen() — what the adapter reports today — is + // always the fixed share-data size, not len(payload). Fixing that + // needs a PaymentPromise chain lookup; tracked as a TODO on the + // adapter's Listen path. + for key, ev := range seen { + require.Greater(t, ev.Height, uint64(0), + "BlobEvent %s must carry a real block height", key) + require.Greater(t, ev.DataSize, uint64(0), + "BlobEvent %s must report a non-zero DataSize", key) + } + + // Round-trip every blob through Download and diff bytes. Walking + // ids (upload order) rather than seen (map iteration order) keeps + // log output deterministic. + for i, id := range ids { + key := hex.EncodeToString(id) + got, err := adapter.Download(ctx, id) + require.NoError(t, err, "adapter.Download #%d (%s)", i, key) + require.Equal(t, expected[key], got, + "Download #%d (%s) bytes mismatch", i, key) + t.Logf("download[%02d] ok: blob_id=%s bytes=%d", i, key, len(got)) + } +}