Skip to content

Commit

Permalink
feat: blobsub
Browse files Browse the repository at this point in the history
  • Loading branch information
distractedm1nd committed Jun 28, 2024
1 parent 1fa5aa6 commit d237fcb
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 16 deletions.
44 changes: 44 additions & 0 deletions blob/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,20 @@ type Service struct {
shareGetter share.Getter
// headerGetter fetches header by the provided height
headerGetter func(context.Context, uint64) (*header.ExtendedHeader, error)
headerSub func(ctx context.Context) (<-chan *header.ExtendedHeader, error)
}

func NewService(
submitter Submitter,
getter share.Getter,
headerGetter func(context.Context, uint64) (*header.ExtendedHeader, error),
headerSub func(ctx context.Context) (<-chan *header.ExtendedHeader, error),
) *Service {
return &Service{
blobSubmitter: submitter,
shareGetter: getter,
headerGetter: headerGetter,
headerSub: headerSub,
}
}

Expand All @@ -86,6 +89,47 @@ func DefaultSubmitOptions() *SubmitOptions {
}
}

type BlobsubResponse struct {
blobs []*Blob
height uint64
}

func (s *Service) Subscribe(ctx context.Context, ns share.Namespace) (<-chan *BlobsubResponse, error) {
headerCh, err := s.headerSub(ctx)
if err != nil {
return nil, err
}

blobCh := make(chan *BlobsubResponse)
go func() {
defer close(blobCh)

for {
select {
case header, ok := <-headerCh:
if !ok {
log.Errorw("header channel closed for blobsub", "namespace", ns.ID())
return
}
blobs, err := s.GetAll(ctx, header.Height(), []share.Namespace{ns})
if err != nil {
log.Errorw("failed to get blobs", "height", header.Height(), "namespace", ns.ID(), "err", err)
continue
}

select {
case <-ctx.Done():
return
case blobCh <- &BlobsubResponse{blobs: blobs, height: header.Height()}:
}
case <-ctx.Done():
return
}
}
}()
return blobCh, nil
}

// Submit sends PFB transaction and reports the height at which it was included.
// Allows sending multiple Blobs atomically synchronously.
// Uses default wallet registered on the Node.
Expand Down
28 changes: 20 additions & 8 deletions blob/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,10 @@ func TestService_GetSingleBlobWithoutPadding(t *testing.T) {
fn := func(ctx context.Context, height uint64) (*header.ExtendedHeader, error) {
return headerStore.GetByHeight(ctx, height)
}
service := NewService(nil, getters.NewIPLDGetter(bs), fn)
fn2 := func(ctx context.Context) (<-chan *header.ExtendedHeader, error) {
return nil, fmt.Errorf("not implemented")
}
service := NewService(nil, getters.NewIPLDGetter(bs), fn, fn2)

newBlob, err := service.Get(ctx, 1, blobs[1].Namespace(), blobs[1].Commitment)
require.NoError(t, err)
Expand Down Expand Up @@ -473,8 +476,10 @@ func TestService_GetAllWithoutPadding(t *testing.T) {
fn := func(ctx context.Context, height uint64) (*header.ExtendedHeader, error) {
return h, nil
}

service := NewService(nil, getters.NewIPLDGetter(bs), fn)
fn2 := func(ctx context.Context) (<-chan *header.ExtendedHeader, error) {
return nil, fmt.Errorf("not implemented")
}
service := NewService(nil, getters.NewIPLDGetter(bs), fn, fn2)

newBlobs, err := service.GetAll(ctx, 1, []share.Namespace{blobs[0].Namespace()})
require.NoError(t, err)
Expand Down Expand Up @@ -520,8 +525,10 @@ func TestAllPaddingSharesInEDS(t *testing.T) {
fn := func(ctx context.Context, height uint64) (*header.ExtendedHeader, error) {
return h, nil
}

service := NewService(nil, getters.NewIPLDGetter(bs), fn)
fn2 := func(ctx context.Context) (<-chan *header.ExtendedHeader, error) {
return nil, fmt.Errorf("not implemented")
}
service := NewService(nil, getters.NewIPLDGetter(bs), fn, fn2)
_, err = service.GetAll(ctx, 1, []share.Namespace{nid})
require.Error(t, err)
}
Expand Down Expand Up @@ -562,8 +569,10 @@ func TestSkipPaddingsAndRetrieveBlob(t *testing.T) {
fn := func(ctx context.Context, height uint64) (*header.ExtendedHeader, error) {
return h, nil
}

service := NewService(nil, getters.NewIPLDGetter(bs), fn)
fn2 := func(ctx context.Context) (<-chan *header.ExtendedHeader, error) {
return nil, fmt.Errorf("not implemented")
}
service := NewService(nil, getters.NewIPLDGetter(bs), fn, fn2)
newBlob, err := service.GetAll(ctx, 1, []share.Namespace{nid})
require.NoError(t, err)
require.Len(t, newBlob, 1)
Expand Down Expand Up @@ -614,5 +623,8 @@ func createService(ctx context.Context, t testing.TB, blobs []*Blob) *Service {
fn := func(ctx context.Context, height uint64) (*header.ExtendedHeader, error) {
return headerStore.GetByHeight(ctx, height)
}
return NewService(nil, getters.NewIPLDGetter(bs), fn)
fn2 := func(ctx context.Context) (<-chan *header.ExtendedHeader, error) {
return nil, fmt.Errorf("not implemented")
}
return NewService(nil, getters.NewIPLDGetter(bs), fn, fn2)
}
20 changes: 15 additions & 5 deletions nodebuilder/blob/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,18 @@ type Module interface {
// Included checks whether a blob's given commitment(Merkle subtree root) is included at
// given height and under the namespace.
Included(_ context.Context, height uint64, _ share.Namespace, _ *blob.Proof, _ blob.Commitment) (bool, error)
// Subscribe to published blobs from the given namespace as they are included.
Subscribe(_ context.Context, _ share.Namespace) (<-chan *blob.BlobsubResponse, error)
}

type API struct {
Internal struct {
Submit func(context.Context, []*blob.Blob, blob.GasPrice) (uint64, error) `perm:"write"`
Get func(context.Context, uint64, share.Namespace, blob.Commitment) (*blob.Blob, error) `perm:"read"`
GetAll func(context.Context, uint64, []share.Namespace) ([]*blob.Blob, error) `perm:"read"`
GetProof func(context.Context, uint64, share.Namespace, blob.Commitment) (*blob.Proof, error) `perm:"read"`
Included func(context.Context, uint64, share.Namespace, *blob.Proof, blob.Commitment) (bool, error) `perm:"read"`
Submit func(context.Context, []*blob.Blob, blob.GasPrice) (uint64, error) `perm:"write"`
Get func(context.Context, uint64, share.Namespace, blob.Commitment) (*blob.Blob, error) `perm:"read"`
GetAll func(context.Context, uint64, []share.Namespace) ([]*blob.Blob, error) `perm:"read"`
GetProof func(context.Context, uint64, share.Namespace, blob.Commitment) (*blob.Proof, error) `perm:"read"`
Included func(context.Context, uint64, share.Namespace, *blob.Proof, blob.Commitment) (bool, error) `perm:"read"`
Subscribe func(context.Context, share.Namespace) (<-chan *blob.BlobsubResponse, error) `perm:"read"`
}
}

Expand Down Expand Up @@ -73,3 +76,10 @@ func (api *API) Included(
) (bool, error) {
return api.Internal.Included(ctx, height, namespace, proof, commitment)
}

func (api *API) Subscribe(
ctx context.Context,
namespace share.Namespace,
) (<-chan *blob.BlobsubResponse, error) {
return api.Internal.Subscribe(ctx, namespace)
}
14 changes: 11 additions & 3 deletions nodebuilder/blob/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,20 @@ func ConstructModule() fx.Option {
fx.Provide(
func(service headerService.Module) func(context.Context, uint64) (*header.ExtendedHeader, error) {
return service.GetByHeight
}),
},
),
fx.Provide(
func(service headerService.Module) func(context.Context) (<-chan *header.ExtendedHeader, error) {
return service.Subscribe
},
),
fx.Provide(func(
state state.Module,
sGetter share.Getter,
getByHeightFn func(context.Context, uint64) (*header.ExtendedHeader, error),
subscribeFn func(context.Context) (<-chan *header.ExtendedHeader, error),
) Module {
return blob.NewService(state, sGetter, getByHeightFn)
}))
return blob.NewService(state, sGetter, getByHeightFn, subscribeFn)
}),
)
}

0 comments on commit d237fcb

Please sign in to comment.