Skip to content

Commit

Permalink
fix: kv-batch-bytes issue, update-feed-chunk-exists. closes #472, #473
Browse files Browse the repository at this point in the history
  • Loading branch information
asabya committed Apr 14, 2023
1 parent 6fe9138 commit cc7455c
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 6 deletions.
11 changes: 8 additions & 3 deletions pkg/collection/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (b *Batch) PutNumber(key float64, refValue []byte, apnd, memory bool) error
}

// Put creates an index entry given a key string and value.
func (b *Batch) Put(key string, refValue []byte, apnd, memory bool) error {
func (b *Batch) Put(key string, value []byte, apnd, memory bool) error {
if b.idx.isReadOnlyFeed() { // skipcq: TCV-001
return ErrReadOnlyIndex
}
Expand All @@ -75,8 +75,14 @@ func (b *Batch) Put(key string, refValue []byte, apnd, memory bool) error {
return ErrKVKeyNotANumber
}
stringKey = fmt.Sprintf("%020d", i)
} else if b.idx.indexType == BytesIndex {
ref, err := b.idx.client.UploadBlob(value, 0, true)
if err != nil { // skipcq: TCV-001
return err
}
value = ref
}
return b.idx.addOrUpdateStringEntry(ctx, b.memDb, stringKey, b.idx.indexType, refValue, memory, apnd)
return b.idx.addOrUpdateStringEntry(ctx, b.memDb, stringKey, b.idx.indexType, value, memory, apnd)
}

// Get extracts an index value from an index given a key.
Expand Down Expand Up @@ -192,7 +198,6 @@ func (b *Batch) mergeAndWriteManifest(diskManifest, memManifest *Manifest) (*Man
return nil, err
}
dirtyEntry.Manifest = nil
fmt.Println(atomic.LoadUint64(&b.storageCount))
}
}
diskManifest.Mutable = memManifest.Mutable
Expand Down
11 changes: 9 additions & 2 deletions pkg/collection/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"net/http"
"runtime"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -310,11 +311,17 @@ func (idx *Index) storeManifest(manifest *Manifest, encryptionPassword string) e
idx.logger.Errorf("uploadBlob failed in storeManifest : %s", err.Error())
return ErrManifestCreate
}

topic := utils.HashString(manifest.Name)
_, err = idx.feed.CreateFeed(idx.user, topic, ref, []byte(encryptionPassword))
if err != nil { // skipcq: TCV-001
return ErrManifestCreate
if strings.Contains(err.Error(), "chunk already exists") {
_, err = idx.feed.UpdateFeed(idx.user, topic, ref, []byte(encryptionPassword))
if err != nil { // skipcq: TCV-001
return ErrManifestCreate
}
} else {
return ErrManifestCreate
}
}
return nil
}
Expand Down
21 changes: 20 additions & 1 deletion pkg/feed/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package feed
import (
"context"
"fmt"
"strings"
"time"

"github.com/ethersphere/bee/pkg/crypto"
Expand All @@ -36,6 +37,8 @@ import (
const (
maxuint64 = ^uint64(0)
idLength = 32

maxUpdateRetry = 3
)

var (
Expand Down Expand Up @@ -290,6 +293,8 @@ func (a *API) UpdateFeed(user utils.Address, topic, data, encryptionPassword []b
return nil, err
}
}
retries := 0
retry:
ctx := context.Background()
f := new(Feed)
f.User = user
Expand Down Expand Up @@ -341,7 +346,21 @@ func (a *API) UpdateFeed(user utils.Address, topic, data, encryptionPassword []b
if err != nil { // skipcq: TCV-001
return nil, err
}
return a.handler.update(id, user.ToBytes(), signature, ch.Data())

address, err := a.handler.update(id, user.ToBytes(), signature, ch.Data())
if err != nil {
// updating same feed in the same second will lead to "chunk already exists" error.
// This will wait for 1 second and retry the update maxUpdateRetry times.
// It is a very dirty fix for this issue. We should find a better way to handle this.
if strings.Contains(err.Error(), "chunk already exists") && retries < maxUpdateRetry {
retries++
<-time.After(1 * time.Second)
goto retry
}
return nil, err
}

return address, nil
}

// DeleteFeed deleted the feed by updating with no data inside the SOC chunk.
Expand Down

0 comments on commit cc7455c

Please sign in to comment.