Skip to content

Commit

Permalink
[bee #48, #49, #40]: introduce netstore (#104)
Browse files Browse the repository at this point in the history
* netstore: introduce netstore
  • Loading branch information
acud committed Apr 21, 2020
1 parent 7459b90 commit 052f751
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 17 deletions.
80 changes: 80 additions & 0 deletions pkg/netstore/netstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package netstore

import (
"context"
"errors"

"github.com/ethersphere/bee/pkg/retrieval"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)

type store struct {
storage.Storer

retrieval retrieval.Interface
validators []swarm.ChunkValidator
}

// New returns a new NetStore that wraps a given Storer.
func New(s storage.Storer, r retrieval.Interface, validators ...swarm.ChunkValidator) storage.Storer {
return &store{Storer: s, retrieval: r, validators: validators}
}

// Get retrieves a given chunk address.
// It will request a chunk from the network whenever it cannot be found locally.
func (s *store) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address) (ch swarm.Chunk, err error) {
ch, err = s.Storer.Get(ctx, mode, addr)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
// request from network
data, err := s.retrieval.RetrieveChunk(ctx, addr)
if err != nil {
return nil, err
}

ch = swarm.NewChunk(addr, data)
if err != nil {
return nil, err
}

if !s.valid(ch) {
return nil, storage.ErrInvalidChunk
}

_, err = s.Storer.Put(ctx, storage.ModePutRequest, ch)
if err != nil {
return nil, err
}
return ch, nil
}
return nil, err
}
return ch, nil
}

// Put stores a given chunk in the local storage.
// returns a storage.ErrInvalidChunk error when
// encountering an invalid chunk.
func (s *store) Put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err error) {
for _, ch := range chs {
if !s.valid(ch) {
return nil, storage.ErrInvalidChunk
}
}
return s.Storer.Put(ctx, mode, chs...)
}

// checks if a particular chunk is valid using the built in validators
func (s *store) valid(ch swarm.Chunk) (ok bool) {
for _, v := range s.validators {
if ok = v.Validate(ch); ok {
return true
}
}
return false
}
116 changes: 116 additions & 0 deletions pkg/netstore/netstore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package netstore_test

import (
"bytes"
"context"
"sync/atomic"
"testing"

"github.com/ethersphere/bee/pkg/netstore"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
)

var chunkData = []byte("mockdata")

type mockValidator struct{}

func (_ mockValidator) Validate(_ swarm.Chunk) bool { return true }

// TestNetstoreRetrieval verifies that a chunk is asked from the network whenever
// it is not found locally
func TestNetstoreRetrieval(t *testing.T) {
retrieve, store, nstore := newRetrievingNetstore()
addr := swarm.MustParseHexAddress("000001")
_, err := nstore.Get(context.Background(), storage.ModeGetRequest, addr)
if err != nil {
t.Fatal(err)
}
if !retrieve.called {
t.Fatal("retrieve request not issued")
}
if retrieve.callCount != 1 {
t.Fatalf("call count %d", retrieve.callCount)
}
if !retrieve.addr.Equal(addr) {
t.Fatalf("addresses not equal. got %s want %s", retrieve.addr, addr)
}

// store should have the chunk now
d, err := store.Get(context.Background(), storage.ModeGetRequest, addr)
if err != nil {
t.Fatal(err)
}

if !bytes.Equal(d.Data(), chunkData) {
t.Fatal("chunk data not equal to expected data")
}

// check that the second call does not result in another retrieve request
d, err = nstore.Get(context.Background(), storage.ModeGetRequest, addr)
if err != nil {
t.Fatal(err)
}

if retrieve.callCount != 1 {
t.Fatalf("call count %d", retrieve.callCount)
}
if !bytes.Equal(d.Data(), chunkData) {
t.Fatal("chunk data not equal to expected data")
}

}

// TestNetstoreNoRetrieval verifies that a chunk is not requested from the network
// whenever it is found locally.
func TestNetstoreNoRetrieval(t *testing.T) {
retrieve, store, nstore := newRetrievingNetstore()
addr := swarm.MustParseHexAddress("000001")

// store should have the chunk in advance
_, err := store.Put(context.Background(), storage.ModePutUpload, swarm.NewChunk(addr, chunkData))
if err != nil {
t.Fatal(err)
}

c, err := nstore.Get(context.Background(), storage.ModeGetRequest, addr)
if err != nil {
t.Fatal(err)
}
if retrieve.called {
t.Fatal("retrieve request issued but shouldn't")
}
if retrieve.callCount != 0 {
t.Fatalf("call count %d", retrieve.callCount)
}
if !bytes.Equal(c.Data(), chunkData) {
t.Fatal("chunk data mismatch")
}
}

// returns a mock retrieval protocol, a mock local storage and a netstore
func newRetrievingNetstore() (ret *retrievalMock, mockStore storage.Storer, ns storage.Storer) {
retrieve := &retrievalMock{}
store := mock.NewStorer()
nstore := netstore.New(store, retrieve, mockValidator{})

return retrieve, store, nstore
}

type retrievalMock struct {
called bool
callCount int32
addr swarm.Address
}

func (r *retrievalMock) RetrieveChunk(ctx context.Context, addr swarm.Address) (data []byte, err error) {
r.called = true
atomic.AddInt32(&r.callCount, 1)
r.addr = addr
return chunkData, nil
}
14 changes: 13 additions & 1 deletion pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,17 @@ import (
"github.com/ethersphere/bee/pkg/localstore"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/metrics"
"github.com/ethersphere/bee/pkg/netstore"
"github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/ethersphere/bee/pkg/pingpong"
"github.com/ethersphere/bee/pkg/retrieval"
"github.com/ethersphere/bee/pkg/statestore/leveldb"
mockinmem "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology/full"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/ethersphere/bee/pkg/validator"
ma "github.com/multiformats/go-multiaddr"
)

Expand Down Expand Up @@ -190,12 +193,21 @@ func NewBee(o Options) (*Bee, error) {
}
b.localstoreCloser = storer

retrieve := retrieval.New(retrieval.Options{
Streamer: p2ps,
ChunkPeerer: topologyDriver,
Storer: storer,
Logger: logger,
})

ns := netstore.New(storer, retrieve, validator.NewContentAddressValidator())

var apiService api.Service
if o.APIAddr != "" {
// API server
apiService = api.New(api.Options{
Pingpong: pingPong,
Storer: storer,
Storer: ns,
Logger: logger,
Tracer: tracer,
})
Expand Down
21 changes: 10 additions & 11 deletions pkg/retrieval/pb/retrieval.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions pkg/retrieval/retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ const (
streamName = "retrieval"
)

var _ Interface = (*Service)(nil)

type Interface interface {
RetrieveChunk(ctx context.Context, addr swarm.Address) (data []byte, err error)
}

type Service struct {
streamer p2p.Streamer
peerSuggester topology.ClosestPeerer
Expand All @@ -37,9 +43,6 @@ type Options struct {
Logger logging.Logger
}

type Storer interface {
}

func New(o Options) *Service {
return &Service{
streamer: o.Streamer,
Expand Down
4 changes: 2 additions & 2 deletions pkg/validator/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"encoding/binary"
"hash"

"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bmt"
"github.com/ethersphere/bee/pkg/logging"
bmtlegacy "github.com/ethersphere/bmt/legacy"
"golang.org/x/crypto/sha3"
)
Expand All @@ -30,7 +30,7 @@ type ContentAddressValidator struct {
}

// New constructs a new ContentAddressValidator
func NewContentAddressValidator() *ContentAddressValidator {
func NewContentAddressValidator() swarm.ChunkValidator {
p := bmtlegacy.NewTreePool(hashFunc, swarm.SectionSize, bmtlegacy.PoolSize)

return &ContentAddressValidator{
Expand Down

0 comments on commit 052f751

Please sign in to comment.