Permalink
Browse files

Fixes and enhancements for IPFS chunkstore

Bump up noms version number
Change implementation of Has() so that chunks that can't be retrieved within 5 secs return false.
Fix bug in HasMany() to copy variable in for loop that's used in embedded goroutine.
  • Loading branch information...
willhite committed Oct 28, 2017
1 parent 00e7a20 commit 7ebaacf69029809d80c288f1dd47ed18bb985580
Showing with 16 additions and 16 deletions.
  1. +16 −16 go/ipfs/cs.go
View
@@ -12,16 +12,15 @@ import (
"sync"
"time"
cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid"
cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format"
mh "gx/ipfs/QmU9a9NV9RdPNwZQDYd5uKsm6N6LJLSvLbywDDYFbaaC6P/go-multihash"
blocks "gx/ipfs/QmVA4mafxbfH5aEvNz8fyoxC6J1xhAtw88B4GerPznSZBg/go-block-format"
"github.com/attic-labs/noms/go/chunks"
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/hash"
"github.com/attic-labs/noms/samples/go/decent/dbg"
"github.com/ipfs/go-ipfs/blocks/blockstore"
"github.com/ipfs/go-ipfs/blockservice"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/repo"
"github.com/ipfs/go-ipfs/repo/config"
@@ -112,12 +111,13 @@ func (cs *chunkStore) limitRateF() func() {
}
}
func (cs *chunkStore) Get(h hash.Hash) chunks.Chunk {
defer dbg.BoxF("ipfsCS Get, h: %s, cid: %s, cs.local: %t", h, NomsHashToCID(h), cs.local)()
func (cs *chunkStore) get(h hash.Hash, timeout time.Duration) chunks.Chunk {
dbg.Debug("starting ipfsCS Get, h: %s, cid: %s, cs.local: %t", h, NomsHashToCID(h), cs.local)
var b blocks.Block
defer cs.limitRateF()()
getBlock := func(chunkId *cid.Cid) (b blocks.Block, err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*200)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if cs.local {
@@ -126,26 +126,25 @@ func (cs *chunkStore) Get(h hash.Hash) chunks.Chunk {
return
}
} else {
dbg.Debug("ipfs GetBlock, h: %s, cid: %s", h, chunkId)
b, err = cs.node.Blocks.GetBlock(ctx, chunkId)
dbg.Debug("ipfs GetBlock returned, h: %s, cid: %s, err: %s", h, chunkId, err)
}
return
}
chunkId := NomsHashToCID(h)
b, err := getBlock(chunkId)
if err == nil {
dbg.Debug("finished ipfsCS Get, h: %s, cid: %s, cs.local: %t, len(b.RawData): %d", h, NomsHashToCID(h), cs.local, len(b.RawData()))
return chunks.NewChunkWithHash(h, b.RawData())
}
if err == blockservice.ErrNotFound {
return chunks.EmptyChunk
}
dbg.Debug("ipfsCS Get, EmptyChunk for h: %s, cid: %s", h, NomsHashToCID(h))
dbg.Debug("ipfsCS Get, EmptyChunk for h: %s, cid: %s, err: %s, b: %v", h, NomsHashToCID(h), err, b)
return chunks.EmptyChunk
}
func (cs *chunkStore) Get(h hash.Hash) chunks.Chunk {
return cs.get(h, time.Second*200)
}
func (cs *chunkStore) GetMany(hashes hash.HashSet, foundChunks chan *chunks.Chunk) {
defer dbg.BoxF("ipfs chunkstore GetMany, cs.local: %t", cs.local)()
ctx, cancel := context.WithCancel(context.Background())
@@ -182,13 +181,13 @@ func (cs *chunkStore) Has(h hash.Hash) bool {
return ok
} else {
// BlockService doesn't have Has(), neither does underlying Exchange()
c := cs.Get(h)
c := cs.get(h, time.Second*5)
return !c.IsEmpty()
}
}
func (cs *chunkStore) HasMany(hashes hash.HashSet) hash.HashSet {
defer dbg.BoxF("HashMany, len(hashes): %d", len(hashes))()
defer dbg.BoxF("HasMany, len(hashes): %d", len(hashes))()
misses := hash.HashSet{}
if cs.local {
for h := range hashes {
@@ -201,6 +200,7 @@ func (cs *chunkStore) HasMany(hashes hash.HashSet) hash.HashSet {
wg := sync.WaitGroup{}
wg.Add(len(hashes))
for h := range hashes {
h := h
go func() {
defer wg.Done()
ok := cs.Has(h)
@@ -234,7 +234,7 @@ func (cs *chunkStore) Put(c chunks.Chunk) {
func (cs *chunkStore) Version() string {
// TODO: Store this someplace in the DB root
return "7.15"
return "7.18"
}
func (cs *chunkStore) Rebase() {

0 comments on commit 7ebaacf

Please sign in to comment.