Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package node
import (
"context"
"crypto/ecdsa"
"encoding/hex"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -277,7 +278,7 @@ func NewBee(
}
}

var changedOverlay bool
var changedOverlay, resetReserve bool
if targetNeighborhood != "" {
neighborhood, err := swarm.ParseBitStrAddress(targetNeighborhood)
if err != nil {
Expand All @@ -293,21 +294,19 @@ func NewBee(
}

if nonceExists {
logger.Info("Override nonce %x to %x and clean state for neighborhood %s", nonce, newNonce, targetNeighborhood)
logger.Info("Override nonce and clean state for neighborhood", "old_none", hex.EncodeToString(nonce), "new_nonce", hex.EncodeToString(newNonce))
logger.Warning("you have another 10 seconds to change your mind and kill this process with CTRL-C...")
time.Sleep(10 * time.Second)

dirsToNuke := []string{ioutil.DataPathLocalstore, ioutil.DataPathKademlia}
for _, dir := range dirsToNuke {
err := ioutil.RemoveContent(filepath.Join(o.DataDir, dir))
if err != nil {
return nil, fmt.Errorf("delete %s: %w", dir, err)
}
err := ioutil.RemoveContent(filepath.Join(o.DataDir, ioutil.DataPathKademlia))
if err != nil {
return nil, fmt.Errorf("delete %s: %w", ioutil.DataPathKademlia, err)
}

if err := stateStore.ClearForHopping(); err != nil {
return nil, fmt.Errorf("clearing stateStore %w", err)
}
resetReserve = true
}

swarmAddress = newSwarmAddress
Expand Down Expand Up @@ -736,6 +735,14 @@ func NewBee(
b.localstoreCloser = localStore
evictFn = func(id []byte) error { return localStore.EvictBatch(context.Background(), id) }

if resetReserve {
logger.Warning("resetting the reserve")
err := localStore.ResetReserve(ctx)
if err != nil {
return nil, fmt.Errorf("reset reserve: %w", err)
}
}

actLogic := accesscontrol.NewLogic(session)
accesscontrol := accesscontrol.NewController(actLogic)
b.accesscontrolCloser = accesscontrol
Expand Down
126 changes: 65 additions & 61 deletions pkg/storer/internal/chunkstamp/chunkstamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
)

var (
// errMarshalInvalidChunkStampItemNamespace is returned during marshaling if the namespace is not set.
errMarshalInvalidChunkStampItemNamespace = errors.New("marshal chunkstamp.Item: invalid namespace")
// errMarshalInvalidChunkStampItemScope is returned during marshaling if the scope is not set.
errMarshalInvalidChunkStampItemScope = errors.New("marshal chunkstamp.Item: invalid scope")
// errMarshalInvalidChunkStampAddress is returned during marshaling if the address is zero.
errMarshalInvalidChunkStampItemAddress = errors.New("marshal chunkstamp.item: invalid address")
// errUnmarshalInvalidChunkStampAddress is returned during unmarshaling if the address is not set.
Expand All @@ -29,55 +29,59 @@ var (
errUnmarshalInvalidChunkStampItemSize = errors.New("unmarshal chunkstamp.item: invalid size")
)

var _ storage.Item = (*item)(nil)
var _ storage.Item = (*Item)(nil)

// item is the index used to represent a stamp for a chunk.
// Item is the index used to represent a stamp for a chunk.
//
// Going ahead we will support multiple stamps on chunks. This item will allow
// Going ahead we will support multiple stamps on chunks. This Item will allow
// mapping multiple stamps to a single address. For this reason, the address is
// part of the Namespace and can be used to iterate on all the stamps for this
// address.
type item struct {
namespace []byte // The namespace of other related item.
address swarm.Address
stamp swarm.Stamp
type Item struct {
scope []byte // The scope of other related item.
address swarm.Address
stamp swarm.Stamp
}

// ID implements the storage.Item interface.
func (i *item) ID() string {
func (i *Item) ID() string {
return storageutil.JoinFields(string(i.stamp.BatchID()), string(i.stamp.Index()))
}

// Namespace implements the storage.Item interface.
func (i *item) Namespace() string {
return storageutil.JoinFields("chunkStamp", string(i.namespace), i.address.ByteString())
func (i *Item) Namespace() string {
return storageutil.JoinFields("chunkStamp", string(i.scope), i.address.ByteString())
}

func (i *Item) SetScope(ns []byte) {
i.scope = ns
}

// Marshal implements the storage.Item interface.
// address is not part of the payload which is stored, as address is part of the
// prefix, hence already known before querying this object. This will be reused
// during unmarshaling.
func (i *item) Marshal() ([]byte, error) {
func (i *Item) Marshal() ([]byte, error) {
// The address is not part of the payload, but it is used to create the
// namespace so it is better if we check that the address is correctly
// scope so it is better if we check that the address is correctly
// set here before it is stored in the underlying storage.

switch {
case len(i.namespace) == 0:
return nil, errMarshalInvalidChunkStampItemNamespace
case len(i.scope) == 0:
return nil, errMarshalInvalidChunkStampItemScope
case i.address.IsZero():
return nil, errMarshalInvalidChunkStampItemAddress
case i.stamp == nil:
return nil, errMarshalInvalidChunkStampItemStamp
}

buf := make([]byte, 8+len(i.namespace)+postage.StampSize)
buf := make([]byte, 8+len(i.scope)+postage.StampSize)

l := 0
binary.LittleEndian.PutUint64(buf[l:l+8], uint64(len(i.namespace)))
binary.LittleEndian.PutUint64(buf[l:l+8], uint64(len(i.scope)))
l += 8
copy(buf[l:l+len(i.namespace)], i.namespace)
l += len(i.namespace)
copy(buf[l:l+len(i.scope)], i.scope)
l += len(i.scope)
data, err := i.stamp.MarshalBinary()
if err != nil {
return nil, fmt.Errorf("unable to marshal chunkstamp.item: %w", err)
Expand All @@ -88,7 +92,7 @@ func (i *item) Marshal() ([]byte, error) {
}

// Unmarshal implements the storage.Item interface.
func (i *item) Unmarshal(bytes []byte) error {
func (i *Item) Unmarshal(bytes []byte) error {
if len(bytes) < 8 {
return errUnmarshalInvalidChunkStampItemSize
}
Expand All @@ -102,9 +106,9 @@ func (i *item) Unmarshal(bytes []byte) error {
return errUnmarshalInvalidChunkStampItemAddress
}

ni := &item{address: i.address.Clone()}
ni := &Item{address: i.address.Clone()}
l := 8
ni.namespace = append(make([]byte, 0, nsLen), bytes[l:l+nsLen]...)
ni.scope = append(make([]byte, 0, nsLen), bytes[l:l+nsLen]...)
l += nsLen
stamp := new(postage.Stamp)
if err := stamp.UnmarshalBinary(bytes[l : l+postage.StampSize]); err != nil {
Expand All @@ -119,13 +123,13 @@ func (i *item) Unmarshal(bytes []byte) error {
}

// Clone implements the storage.Item interface.
func (i *item) Clone() storage.Item {
func (i *Item) Clone() storage.Item {
if i == nil {
return nil
}
clone := &item{
namespace: append([]byte(nil), i.namespace...),
address: i.address.Clone(),
clone := &Item{
scope: append([]byte(nil), i.scope...),
address: i.address.Clone(),
}
if i.stamp != nil {
clone.stamp = i.stamp.Clone()
Expand All @@ -134,31 +138,31 @@ func (i *item) Clone() storage.Item {
}

// String implements the storage.Item interface.
func (i item) String() string {
func (i Item) String() string {
return storageutil.JoinFields(i.Namespace(), i.ID())
}

// Load returns first found swarm.Stamp related to the given address.
func Load(s storage.Reader, namespace string, addr swarm.Address) (swarm.Stamp, error) {
return LoadWithBatchID(s, namespace, addr, nil)
func Load(s storage.Reader, scope string, addr swarm.Address) (swarm.Stamp, error) {
return LoadWithBatchID(s, scope, addr, nil)
}

// LoadWithBatchID returns swarm.Stamp related to the given address and batchID.
func LoadWithBatchID(s storage.Reader, namespace string, addr swarm.Address, batchID []byte) (swarm.Stamp, error) {
func LoadWithBatchID(s storage.Reader, scope string, addr swarm.Address, batchID []byte) (swarm.Stamp, error) {
var stamp swarm.Stamp

found := false
err := s.Iterate(
storage.Query{
Factory: func() storage.Item {
return &item{
namespace: []byte(namespace),
address: addr,
return &Item{
scope: []byte(scope),
address: addr,
}
},
},
func(res storage.Result) (bool, error) {
item := res.Entry.(*item)
item := res.Entry.(*Item)
if batchID == nil || bytes.Equal(batchID, item.stamp.BatchID()) {
stamp = item.stamp
found = true
Expand All @@ -178,12 +182,12 @@ func LoadWithBatchID(s storage.Reader, namespace string, addr swarm.Address, bat
}

// Store creates new or updated an existing stamp index
// record related to the given namespace and chunk.
func Store(s storage.IndexStore, namespace string, chunk swarm.Chunk) error {
item := &item{
namespace: []byte(namespace),
address: chunk.Address(),
stamp: chunk.Stamp(),
// record related to the given scope and chunk.
func Store(s storage.IndexStore, scope string, chunk swarm.Chunk) error {
item := &Item{
scope: []byte(scope),
address: chunk.Address(),
stamp: chunk.Stamp(),
}
if err := s.Put(item); err != nil {
return fmt.Errorf("unable to put chunkstamp.item %s: %w", item, err)
Expand All @@ -192,19 +196,19 @@ func Store(s storage.IndexStore, namespace string, chunk swarm.Chunk) error {
}

// DeleteAll removes all swarm.Stamp related to the given address.
func DeleteAll(s storage.IndexStore, namespace string, addr swarm.Address) error {
func DeleteAll(s storage.IndexStore, scope string, addr swarm.Address) error {
var stamps []swarm.Stamp
err := s.Iterate(
storage.Query{
Factory: func() storage.Item {
return &item{
namespace: []byte(namespace),
address: addr,
return &Item{
scope: []byte(scope),
address: addr,
}
},
},
func(res storage.Result) (bool, error) {
stamps = append(stamps, res.Entry.(*item).stamp)
stamps = append(stamps, res.Entry.(*Item).stamp)
return false, nil
},
)
Expand All @@ -216,41 +220,41 @@ func DeleteAll(s storage.IndexStore, namespace string, addr swarm.Address) error
for _, stamp := range stamps {
errs = errors.Join(
errs,
s.Delete(&item{
namespace: []byte(namespace),
address: addr,
stamp: stamp,
s.Delete(&Item{
scope: []byte(scope),
address: addr,
stamp: stamp,
}),
)
}
return errs
}

// Delete removes a stamp associated with an chunk and batchID.
func Delete(s storage.IndexStore, namespace string, addr swarm.Address, batchId []byte) error {
stamp, err := LoadWithBatchID(s, namespace, addr, batchId)
func Delete(s storage.IndexStore, scope string, addr swarm.Address, batchId []byte) error {
stamp, err := LoadWithBatchID(s, scope, addr, batchId)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return nil
}
return err
}
return s.Delete(&item{
namespace: []byte(namespace),
address: addr,
stamp: stamp,
return s.Delete(&Item{
scope: []byte(scope),
address: addr,
stamp: stamp,
})
}

func DeleteWithStamp(
writer storage.Writer,
namespace string,
scope string,
addr swarm.Address,
stamp swarm.Stamp,
) error {
return writer.Delete(&item{
namespace: []byte(namespace),
address: addr,
stamp: stamp,
return writer.Delete(&Item{
scope: []byte(scope),
address: addr,
stamp: stamp,
})
}
6 changes: 2 additions & 4 deletions pkg/storer/internal/chunkstamp/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ package chunkstamp

import "github.com/ethersphere/bee/v2/pkg/swarm"

type Item = item

func (i *Item) WithNamespace(ns string) *Item {
i.namespace = []byte(ns)
i.scope = []byte(ns)
return i
}

Expand All @@ -24,7 +22,7 @@ func (i *Item) WithStamp(stamp swarm.Stamp) *Item {
}

var (
ErrMarshalInvalidChunkStampItemNamespace = errMarshalInvalidChunkStampItemNamespace
ErrMarshalInvalidChunkStampItemNamespace = errMarshalInvalidChunkStampItemScope
ErrMarshalInvalidChunkStampItemAddress = errMarshalInvalidChunkStampItemAddress
ErrUnmarshalInvalidChunkStampItemAddress = errUnmarshalInvalidChunkStampItemAddress
ErrMarshalInvalidChunkStampItemStamp = errMarshalInvalidChunkStampItemStamp
Expand Down
Loading