Skip to content

Commit

Permalink
feat: postage stamp indexing (#1625)
Browse files Browse the repository at this point in the history
This PR adds postage stamp indexing to enforce overissuance 
on the storer node side. This allows for slot overriding and reusage.
It also adds immutable stamps functionality. 

Co-authored-by: Esad Akar <esadakar@gmail.com>
  • Loading branch information
zelig and istae committed Jun 11, 2021
1 parent 6d2b270 commit 3709726
Show file tree
Hide file tree
Showing 49 changed files with 866 additions and 266 deletions.
2 changes: 1 addition & 1 deletion .github/patches/listener.patch
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
166c166
168c168
< chainUpdateInterval := (time.Duration(l.blockTime) * time.Second) / 2
---
> chainUpdateInterval := (time.Duration(l.blockTime) * time.Second) / 5
2 changes: 1 addition & 1 deletion .github/patches/postagecontract.patch
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
25c25
26c26
< BucketDepth = uint8(16)
---
> BucketDepth = uint8(10)
2 changes: 1 addition & 1 deletion .github/workflows/beekeeper.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
env:
REPLICA: 3
RUN_TYPE: "PR RUN"
SETUP_CONTRACT_IMAGE_TAG: "0.1.0"
SETUP_CONTRACT_IMAGE_TAG: "0.2.0"
runs-on: ubuntu-latest
steps:
- name: Checkout
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/coreos/go-semver v0.3.0
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/ethereum/go-ethereum v1.10.3
github.com/ethersphere/go-storage-incentives-abi v0.2.0
github.com/ethersphere/go-storage-incentives-abi v0.3.0
github.com/ethersphere/go-sw3-abi v0.4.0
github.com/ethersphere/langos v1.0.0
github.com/gogo/protobuf v1.3.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/ethereum/go-ethereum v1.10.3 h1:SEYOYARvbWnoDl1hOSks3ZJQpRiiRJe8ubaQGJQwq0s=
github.com/ethereum/go-ethereum v1.10.3/go.mod h1:99onQmSd1GRGOziyGldI41YQb7EESX3Q4H41IfJgIQQ=
github.com/ethersphere/go-storage-incentives-abi v0.2.0 h1:TZ15auzGsdzuzUR2b5dLAMpFixorb4uKUDGF0QnVmmU=
github.com/ethersphere/go-storage-incentives-abi v0.2.0/go.mod h1:SXvJVtM4sEsaSKD0jc1ClpDLw8ErPoROZDme4Wrc/Nc=
github.com/ethersphere/go-storage-incentives-abi v0.3.0 h1:Y1OyNMI1JjqOmVJlgzR70PPe2Czuh4BglCV/nD3UHIA=
github.com/ethersphere/go-storage-incentives-abi v0.3.0/go.mod h1:SXvJVtM4sEsaSKD0jc1ClpDLw8ErPoROZDme4Wrc/Nc=
github.com/ethersphere/go-sw3-abi v0.4.0 h1:T3ANY+ktWrPAwe2U0tZi+DILpkHzto5ym/XwV/Bbz8g=
github.com/ethersphere/go-sw3-abi v0.4.0/go.mod h1:BmpsvJ8idQZdYEtWnvxA8POYQ8Rl/NhyCdF0zLMOOJU=
github.com/ethersphere/langos v1.0.0 h1:NBtNKzXTTRSue95uOlzPN4py7Aofs0xWPzyj4AI1Vcc=
Expand Down
12 changes: 9 additions & 3 deletions pkg/api/postage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ import (
)

const (
gasPriceHeader = "Gas-Price"
errBadGasPrice = "bad gas price"
gasPriceHeader = "Gas-Price"
immutableHeader = "Immutable"
errBadGasPrice = "bad gas price"
)

type batchID []byte
Expand Down Expand Up @@ -65,7 +66,12 @@ func (s *server) postageCreateHandler(w http.ResponseWriter, r *http.Request) {
ctx = sctx.SetGasPrice(ctx, p)
}

batchID, err := s.postageContract.CreateBatch(ctx, amount, uint8(depth), label)
var immutable bool
if val, ok := r.Header[immutableHeader]; ok {
immutable, _ = strconv.ParseBool(val[0])
}

batchID, err := s.postageContract.CreateBatch(ctx, amount, uint8(depth), immutable, label)
if err != nil {
if errors.Is(err, postagecontract.ErrInsufficientFunds) {
s.logger.Debugf("create batch: out of funds: %v", err)
Expand Down
36 changes: 31 additions & 5 deletions pkg/api/postage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestPostageCreateStamp(t *testing.T) {

t.Run("ok", func(t *testing.T) {
contract := contractMock.New(
contractMock.WithCreateBatchFunc(func(ctx context.Context, ib *big.Int, d uint8, l string) ([]byte, error) {
contractMock.WithCreateBatchFunc(func(ctx context.Context, ib *big.Int, d uint8, i bool, l string) ([]byte, error) {
if ib.Cmp(big.NewInt(initialBalance)) != 0 {
return nil, fmt.Errorf("called with wrong initial balance. wanted %d, got %d", initialBalance, ib)
}
Expand All @@ -60,7 +60,7 @@ func TestPostageCreateStamp(t *testing.T) {

t.Run("with-custom-gas", func(t *testing.T) {
contract := contractMock.New(
contractMock.WithCreateBatchFunc(func(ctx context.Context, ib *big.Int, d uint8, l string) ([]byte, error) {
contractMock.WithCreateBatchFunc(func(ctx context.Context, ib *big.Int, d uint8, i bool, l string) ([]byte, error) {
if ib.Cmp(big.NewInt(initialBalance)) != 0 {
return nil, fmt.Errorf("called with wrong initial balance. wanted %d, got %d", initialBalance, ib)
}
Expand Down Expand Up @@ -90,7 +90,7 @@ func TestPostageCreateStamp(t *testing.T) {

t.Run("with-error", func(t *testing.T) {
contract := contractMock.New(
contractMock.WithCreateBatchFunc(func(ctx context.Context, ib *big.Int, d uint8, l string) ([]byte, error) {
contractMock.WithCreateBatchFunc(func(ctx context.Context, ib *big.Int, d uint8, i bool, l string) ([]byte, error) {
return nil, errors.New("err")
}),
)
Expand All @@ -108,7 +108,7 @@ func TestPostageCreateStamp(t *testing.T) {

t.Run("out-of-funds", func(t *testing.T) {
contract := contractMock.New(
contractMock.WithCreateBatchFunc(func(ctx context.Context, ib *big.Int, d uint8, l string) ([]byte, error) {
contractMock.WithCreateBatchFunc(func(ctx context.Context, ib *big.Int, d uint8, i bool, l string) ([]byte, error) {
return nil, postagecontract.ErrInsufficientFunds
}),
)
Expand Down Expand Up @@ -137,7 +137,7 @@ func TestPostageCreateStamp(t *testing.T) {

t.Run("depth less than bucket depth", func(t *testing.T) {
contract := contractMock.New(
contractMock.WithCreateBatchFunc(func(ctx context.Context, ib *big.Int, d uint8, l string) ([]byte, error) {
contractMock.WithCreateBatchFunc(func(ctx context.Context, ib *big.Int, d uint8, i bool, l string) ([]byte, error) {
return nil, postagecontract.ErrInvalidDepth
}),
)
Expand All @@ -163,6 +163,32 @@ func TestPostageCreateStamp(t *testing.T) {
}),
)
})

t.Run("immutable header", func(t *testing.T) {

var immutable bool
contract := contractMock.New(
contractMock.WithCreateBatchFunc(func(ctx context.Context, _ *big.Int, _ uint8, i bool, _ string) ([]byte, error) {
immutable = i
return batchID, nil
}),
)
client, _, _ := newTestServer(t, testServerOptions{
PostageContract: contract,
})

jsonhttptest.Request(t, client, http.MethodPost, "/stamps/1000/24", http.StatusCreated,
jsonhttptest.WithRequestHeader("Immutable", "true"),
jsonhttptest.WithExpectedJSONResponse(&api.PostageCreateResponse{
BatchID: batchID,
}),
)

if !immutable {
t.Fatalf("want true, got %v", immutable)
}

})
}

func TestPostageGetStamps(t *testing.T) {
Expand Down
8 changes: 7 additions & 1 deletion pkg/localstore/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
// about exported data format version
exportVersionFilename = ".swarm-export-version"
// current export format version
currentExportVersion = "2"
currentExportVersion = "3"
)

// Export writes a tar structured data to the writer of
Expand Down Expand Up @@ -71,6 +71,12 @@ func (db *DB) Export(w io.Writer) (count int64, err error) {
if _, err := tw.Write(item.BatchID); err != nil {
return false, err
}
if _, err := tw.Write(item.Index); err != nil {
return false, err
}
if _, err := tw.Write(item.Timestamp); err != nil {
return false, err
}
if _, err := tw.Write(item.Sig); err != nil {
return false, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/localstore/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestExportImport(t *testing.T) {
}
got := append(stamp, ch.Data()...)
if !bytes.Equal(got, want) {
t.Fatalf("chunk %s: got stamp+data %x, want %x", addr, got, want)
t.Fatalf("chunk %s: got stamp+data %x, want %x", addr, got[:256], want[:256])
}
}
}
5 changes: 5 additions & 0 deletions pkg/localstore/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
if err != nil {
return 0, false, err
}
err = db.postageIndexIndex.DeleteInBatch(batch, item)
if err != nil {
return 0, false, err
}

}
if gcSize-collectedCount > target {
done = false
Expand Down
10 changes: 10 additions & 0 deletions pkg/localstore/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ func testDBCollectGarbageWorker(t *testing.T) {

t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget)))

t.Run("postage index count", newItemsCountTest(db.postageIndexIndex, int(gcTarget)))

t.Run("gc index count", newItemsCountTest(db.gcIndex, int(gcTarget)))

t.Run("gc size", newIndexGCSizeTest(db))
Expand Down Expand Up @@ -231,6 +233,8 @@ func TestPinGC(t *testing.T) {

t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget)+pinChunksCount))

t.Run("postage index count", newItemsCountTest(db.postageIndexIndex, int(gcTarget)+pinChunksCount))

t.Run("gc index count", newItemsCountTest(db.gcIndex, int(gcTarget)))

t.Run("gc size", newIndexGCSizeTest(db))
Expand Down Expand Up @@ -310,6 +314,8 @@ func TestGCAfterPin(t *testing.T) {

t.Run("gc index count", newItemsCountTest(db.gcIndex, int(0)))

t.Run("postage index count", newItemsCountTest(db.postageIndexIndex, chunkCount))

for _, hash := range pinAddrs {
_, err := db.Get(context.Background(), storage.ModeGetRequest, hash)
if err != nil {
Expand Down Expand Up @@ -436,6 +442,8 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {

t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget)))

t.Run("postage index count", newItemsCountTest(db.postageIndexIndex, int(gcTarget)))

t.Run("gc index count", newItemsCountTest(db.gcIndex, int(gcTarget)))

t.Run("gc size", newIndexGCSizeTest(db))
Expand Down Expand Up @@ -898,6 +906,8 @@ func TestGC_NoEvictDirty(t *testing.T) {

t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget)))

t.Run("postage index count", newItemsCountTest(db.postageIndexIndex, int(gcTarget)))

t.Run("gc index count", newItemsCountTest(db.gcIndex, int(gcTarget)))

t.Run("gc size", newIndexGCSizeTest(db))
Expand Down
67 changes: 53 additions & 14 deletions pkg/localstore/localstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,12 @@ type DB struct {
// postage chunks index
postageChunksIndex shed.Index

// postage chunks index
// postage radius index
postageRadiusIndex shed.Index

// postage index index
postageIndexIndex shed.Index

// field that stores number of intems in gc index
gcSize shed.Uint64Field

Expand Down Expand Up @@ -258,7 +261,7 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB

// Index storing actual chunk address, data and bin id.
headerSize := 16 + postage.StampSize
db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|BinID|BatchID|Sig|Data", shed.IndexFuncs{
db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|BinID|BatchID|BatchIndex|Sig|Data", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
return fields.Address, nil
},
Expand All @@ -270,7 +273,7 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
b := make([]byte, headerSize)
binary.BigEndian.PutUint64(b[:8], fields.BinID)
binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
stamp, err := postage.NewStamp(fields.BatchID, fields.Sig).MarshalBinary()
stamp, err := postage.NewStamp(fields.BatchID, fields.Index, fields.Timestamp, fields.Sig).MarshalBinary()
if err != nil {
return nil, err
}
Expand All @@ -286,6 +289,8 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
return e, err
}
e.BatchID = stamp.BatchID()
e.Index = stamp.Index()
e.Timestamp = stamp.Timestamp()
e.Sig = stamp.Sig()
e.Data = value[headerSize:]
return e, nil
Expand Down Expand Up @@ -382,7 +387,7 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
// create a push syncing triggers used by SubscribePush function
db.pushTriggers = make([]chan<- struct{}, 0)
// gc index for removable chunk ordered by ascending last access time
db.gcIndex, err = db.shed.NewIndex("AccessTimestamp|BinID|Hash->BatchID", shed.IndexFuncs{
db.gcIndex, err = db.shed.NewIndex("AccessTimestamp|BinID|Hash->BatchID|BatchIndex", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
b := make([]byte, 16, 16+len(fields.Address))
binary.BigEndian.PutUint64(b[:8], uint64(fields.AccessTimestamp))
Expand All @@ -397,14 +402,16 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
return e, nil
},
EncodeValue: func(fields shed.Item) (value []byte, err error) {
value = make([]byte, 32)
value = make([]byte, 40)
copy(value, fields.BatchID)
copy(value[32:], fields.Index)
return value, nil
},
DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
e = keyItem
e.BatchID = make([]byte, 32)
copy(e.BatchID, value)
copy(e.BatchID, value[:32])
e.Index = make([]byte, postage.IndexSize)
copy(e.Index, value[32:])
return e, nil
},
})
Expand Down Expand Up @@ -481,6 +488,34 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
return nil, err
}

db.postageIndexIndex, err = db.shed.NewIndex("BatchID|BatchIndex->Hash|Timestamp", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
key = make([]byte, 40)
copy(key[:32], fields.BatchID)
copy(key[32:40], fields.Index)
return key, nil
},
DecodeKey: func(key []byte) (e shed.Item, err error) {
e.BatchID = key[:32]
e.Index = key[32:40]
return e, nil
},
EncodeValue: func(fields shed.Item) (value []byte, err error) {
value = make([]byte, 40)
copy(value, fields.Address)
copy(value[32:], fields.Timestamp)
return value, nil
},
DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
e.Address = value[:32]
e.Timestamp = value[32:]
return e, nil
},
})
if err != nil {
return nil, err
}

// start garbage collection worker
go db.collectGarbageWorker()
return db, nil
Expand Down Expand Up @@ -553,13 +588,17 @@ func (db *DB) DebugIndices() (indexInfo map[string]int, err error) {
// chunkToItem creates new Item with data provided by the Chunk.
func chunkToItem(ch swarm.Chunk) shed.Item {
return shed.Item{
Address: ch.Address().Bytes(),
Data: ch.Data(),
Tag: ch.TagID(),
BatchID: ch.Stamp().BatchID(),
Sig: ch.Stamp().Sig(),
Depth: ch.Depth(),
Radius: ch.Radius(),
Address: ch.Address().Bytes(),
Data: ch.Data(),
Tag: ch.TagID(),
BatchID: ch.Stamp().BatchID(),
Index: ch.Stamp().Index(),
Timestamp: ch.Stamp().Timestamp(),
Sig: ch.Stamp().Sig(),
Depth: ch.Depth(),
Radius: ch.Radius(),
BucketDepth: ch.BucketDepth(),
Immutable: ch.Immutable(),
}
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/localstore/localstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func newPullIndexTest(db *DB, ch swarm.Chunk, binID uint64, wantError error) fun
t.Errorf("got error %v, want %v", err, wantError)
}
if err == nil {
validateItem(t, item, ch.Address().Bytes(), nil, 0, 0, postage.NewStamp(ch.Stamp().BatchID(), nil))
validateItem(t, item, ch.Address().Bytes(), nil, 0, 0, postage.NewStamp(ch.Stamp().BatchID(), nil, nil, nil))
}
}
}
Expand All @@ -318,7 +318,7 @@ func newPushIndexTest(db *DB, ch swarm.Chunk, storeTimestamp int64, wantError er
t.Errorf("got error %v, want %v", err, wantError)
}
if err == nil {
validateItem(t, item, ch.Address().Bytes(), nil, storeTimestamp, 0, postage.NewStamp(nil, nil))
validateItem(t, item, ch.Address().Bytes(), nil, storeTimestamp, 0, postage.NewStamp(nil, nil, nil, nil))
}
}
}
Expand Down Expand Up @@ -356,7 +356,7 @@ func newPinIndexTest(db *DB, chunk swarm.Chunk, wantError error) func(t *testing
t.Errorf("got error %v, want %v", err, wantError)
}
if err == nil {
validateItem(t, item, chunk.Address().Bytes(), nil, 0, 0, postage.NewStamp(nil, nil))
validateItem(t, item, chunk.Address().Bytes(), nil, 0, 0, postage.NewStamp(nil, nil, nil, nil))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/localstore/mode_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (db *DB) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address)
return nil, err
}
return swarm.NewChunk(swarm.NewAddress(out.Address), out.Data).
WithStamp(postage.NewStamp(out.BatchID, out.Sig)), nil
WithStamp(postage.NewStamp(out.BatchID, out.Index, out.Timestamp, out.Sig)), nil
}

// get returns Item from the retrieval index
Expand Down
Loading

0 comments on commit 3709726

Please sign in to comment.