Modifications to ipfs-chat and ipfs chunkstore (#3691)

* Modifications to ipfs-chat and ipfs chunkstore
 * Change ipfs paths to include the directory where the ipfs repo is stored.
 * Rework ipfs-chat to create ipfs chunkstores manually rather than
   relying on Spec.ForDataset. This enables creating two chunkstores
   (one local and one network) using the same IpfsNode (ipfs repo); see
   the sketch after this list.
 * Create a separate replicate function for the daemon and a mergeMessage
   function for the client, to experiment with slightly different
   behaviors for each.
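
For illustration, the two-store setup described above looks roughly like
the sketch below. The OpenIPFSRepo and ChunkStoreFromIPFSNode calls match
the signatures in this commit's diff; the import path, the repo path, and
the meaning of OpenIPFSRepo's second argument are guesses, so treat this
as a sketch rather than working ipfs-chat code.

```go
package main

import (
	"math"

	// The import path here is a guess; only the function names and
	// signatures below appear in this commit's diff.
	ipfs "github.com/attic-labs/noms/tools/ipfs"
)

func main() {
	// A single IPFS repo (one IpfsNode) backs both chunkstores.
	node := ipfs.OpenIPFSRepo("/path/to/ipfs-repo", -1) // -1 copied from the diff; its meaning is assumed

	// Network chunkstore: Get/Put go through the node's block service
	// and may touch the IPFS network.
	networkCS := ipfs.ChunkStoreFromIPFSNode("/path/to/ipfs-repo", false, node, math.MaxInt32)

	// Local chunkstore: Get/Put go straight to the node's local blockstore.
	localCS := ipfs.ChunkStoreFromIPFSNode("/path/to/ipfs-repo", true, node, math.MaxInt32)

	_, _ = networkCS, localCS
}
```

Passing math.MaxInt32 effectively removes the concurrency cap, matching
what NewChunkStore itself does in the diff.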

* Re-organization of code to remove duplication; for example, the paired
  RateLimitAdd/RateLimitSub calls become a single limitRateF helper that
  returns its own release function (see the sketch after the main points below).

The main points are:
* added an event loop to process events synchronously
* more aggressive about not re-processing messages from other nodes
  that we've already processed
* fixed bug in ipfs chunkstore HasMany()
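
For reference, the de-duplicated rate limiting works like the standalone
sketch below: a buffered channel sized by the new maxConcurrentRequests
parameter acts as a counting semaphore, and limitRateF returns the release
function so call sites can simply write defer cs.limitRateF()(). The
limiter type and the main function here are illustrative stand-ins, not
code from this commit.

```go
package main

import (
	"fmt"
	"sync"
)

// limiter stands in for the rateLimit channel that lives on chunkStore in
// this commit: a buffered channel used as a counting semaphore.
type limiter struct {
	slots chan struct{}
}

func newLimiter(maxConcurrentRequests int) *limiter {
	return &limiter{slots: make(chan struct{}, maxConcurrentRequests)}
}

// limitRateF blocks until a slot is free, then returns the function that
// releases the slot, so callers can write: defer l.limitRateF()()
func (l *limiter) limitRateF() func() {
	l.slots <- struct{}{}
	return func() { <-l.slots }
}

func main() {
	l := newLimiter(2) // allow at most 2 concurrent requests
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// The slot is acquired here, when the defer statement runs;
			// it is released when this goroutine returns.
			defer l.limitRateF()()
			fmt.Println("request", i)
		}(i)
	}
	wg.Wait()
}
```

Because the slot is acquired when the defer statement is evaluated, the cap
is enforced before any of the request's work starts.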

* Add go-base58 library
willhite authored and aboodman committed Sep 20, 2017
1 parent 1e7db1d commit 742f0681c36b5fcc1ab4569570a2b1d10cf68c3b
@@ -4,11 +4,13 @@ import (
"context"
"fmt"
"io/ioutil"
"math"
"os"
"path"
"reflect"
"strings"
"sync"
"time"
cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid"
mh "gx/ipfs/QmU9a9NV9RdPNwZQDYd5uKsm6N6LJLSvLbywDDYFbaaC6P/go-multihash"
@@ -17,6 +19,7 @@ import (
"github.com/attic-labs/noms/go/chunks"
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/hash"
"github.com/attic-labs/noms/samples/go/ipfs-chat/dbg"
"github.com/ipfs/go-ipfs/blocks/blockstore"
"github.com/ipfs/go-ipfs/blockservice"
"github.com/ipfs/go-ipfs/core"
@@ -48,18 +51,18 @@ import (
// blocks stored will be exposed to the entire IPFS network.
func NewChunkStore(p string, local bool) *chunkStore {
node := OpenIPFSRepo(p, -1)
return ChunkStoreFromIPFSNode(p, local, node)
return ChunkStoreFromIPFSNode(p, local, node, math.MaxInt32)
}
// Creates a new chunkStore using a pre-existing IpfsNode. This is currently
// used to create a second 'local' chunkStore using the same IpfsNode as another
// non-local chunkStore.
func ChunkStoreFromIPFSNode(p string, local bool, node *core.IpfsNode) *chunkStore {
func ChunkStoreFromIPFSNode(p string, local bool, node *core.IpfsNode, maxConcurrentRequests int) *chunkStore {
return &chunkStore{
node: node,
name: p,
local: local,
rateLimit: make(chan struct{}, 1024),
rateLimit: make(chan struct{}, maxConcurrentRequests),
}
}
@@ -100,77 +103,80 @@ type chunkStore struct {
name string
rateLimit chan struct{}
local bool
test bool
}
func (cs *chunkStore) RateLimitAdd() {
func (cs *chunkStore) limitRateF() func() {
cs.rateLimit <- struct{}{}
cs.test = true
}
func (cs *chunkStore) RateLimitSub() {
cs.test = false
<-cs.rateLimit
return func() {
<-cs.rateLimit
}
}
func (cs *chunkStore) Get(h hash.Hash) chunks.Chunk {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cs.RateLimitAdd()
defer cs.RateLimitSub()
defer dbg.BoxF("ipfsCS Get, h: %s, cid: %s, cs.local: %t", h, NomsHashToCID(h), cs.local)()
defer cs.limitRateF()()
var b blocks.Block
var err error
c := nomsHashToCID(h)
if cs.local {
b, err = cs.node.Blockstore.Get(c)
if err == blockstore.ErrNotFound {
return chunks.EmptyChunk
}
} else {
b, err = cs.node.Blocks.GetBlock(ctx, c)
if err == blockservice.ErrNotFound {
return chunks.EmptyChunk
getBlock := func(chunkId *cid.Cid) (b blocks.Block, err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*200)
defer cancel()
if cs.local {
b, err = cs.node.Blockstore.Get(chunkId)
if err == blockstore.ErrNotFound {
return
}
} else {
dbg.Debug("ipfs GetBlock, h: %s, cid: %s", h, chunkId)
b, err = cs.node.Blocks.GetBlock(ctx, chunkId)
dbg.Debug("ipfs GetBlock returned, h: %s, cid: %s, err: %s", h, chunkId, err)
}
return
}
chunkId := NomsHashToCID(h)
b, err := getBlock(chunkId)
if err == nil {
return chunks.NewChunkWithHash(h, b.RawData())
}
if err == blockservice.ErrNotFound {
return chunks.EmptyChunk
}
d.PanicIfError(err)
return chunks.NewChunkWithHash(h, b.RawData())
dbg.Debug("ipfsCS Get, EmptyChunk for h: %s, cid: %s", h, NomsHashToCID(h))
return chunks.EmptyChunk
}
func (cs *chunkStore) GetMany(hashes hash.HashSet, foundChunks chan *chunks.Chunk) {
defer dbg.BoxF("ipfs chunkstore GetMany, cs.local: %t", cs.local)()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cs.RateLimitAdd()
defer cs.RateLimitSub()
defer cs.limitRateF()()
cids := make([]*cid.Cid, 0, len(hashes))
for h := range hashes {
c := nomsHashToCID(h)
c := NomsHashToCID(h)
cids = append(cids, c)
}
if cs.local {
for _, cid := range cids {
b, err := cs.node.Blockstore.Get(cid)
d.PanicIfError(err)
c := chunks.NewChunkWithHash(cidToNomsHash(b.Cid()), b.RawData())
c := chunks.NewChunkWithHash(CidToNomsHash(b.Cid()), b.RawData())
foundChunks <- &c
}
} else {
for b := range cs.node.Blocks.GetBlocks(ctx, cids) {
c := chunks.NewChunkWithHash(cidToNomsHash(b.Cid()), b.RawData())
c := chunks.NewChunkWithHash(CidToNomsHash(b.Cid()), b.RawData())
foundChunks <- &c
}
}
}
func (cs *chunkStore) Has(h hash.Hash) bool {
cs.RateLimitAdd()
defer cs.RateLimitSub()
id := nomsHashToCID(h)
id := NomsHashToCID(h)
if cs.local {
defer cs.limitRateF()()
ok, err := cs.node.Blockstore.Has(id)
d.PanicIfError(err)
return ok
@@ -182,6 +188,7 @@ func (cs *chunkStore) Has(h hash.Hash) bool {
}
func (cs *chunkStore) HasMany(hashes hash.HashSet) hash.HashSet {
defer dbg.BoxF("HashMany, len(hashes): %d", len(hashes))()
misses := hash.HashSet{}
if cs.local {
for h := range hashes {
@@ -195,8 +202,6 @@ func (cs *chunkStore) HasMany(hashes hash.HashSet) hash.HashSet {
wg.Add(len(hashes))
for h := range hashes {
go func() {
cs.RateLimitAdd()
defer cs.RateLimitSub()
defer wg.Done()
ok := cs.Has(h)
if !ok {
@@ -206,21 +211,15 @@ func (cs *chunkStore) HasMany(hashes hash.HashSet) hash.HashSet {
}
}()
}
wg.Wait()
}
return misses
}
func nomsHashToCID(nh hash.Hash) *cid.Cid {
mhb, err := mh.Encode(nh[:], mh.SHA2_512)
d.PanicIfError(err)
return cid.NewCidV1(cid.Raw, mhb)
}
func (cs *chunkStore) Put(c chunks.Chunk) {
cs.RateLimitAdd()
defer cs.RateLimitSub()
defer cs.limitRateF()()
cid := nomsHashToCID(c.Hash())
cid := NomsHashToCID(c.Hash())
b, err := blocks.NewBlockWithCid(c.Data(), cid)
d.PanicIfError(err)
if cs.local {
@@ -251,7 +250,7 @@ func (cs *chunkStore) Rebase() {
if sp != "" {
cid, err := cid.Decode(sp)
d.PanicIfError(err)
h = cidToNomsHash(cid)
h = CidToNomsHash(cid)
}
cs.root = &h
}
@@ -263,14 +262,21 @@ func (cs *chunkStore) Root() (h hash.Hash) {
return *cs.root
}
func cidToNomsHash(id *cid.Cid) (h hash.Hash) {
func CidToNomsHash(id *cid.Cid) (h hash.Hash) {
dmh, err := mh.Decode([]byte(id.Hash()))
d.PanicIfError(err)
copy(h[:], dmh.Digest)
return
}
func NomsHashToCID(nh hash.Hash) *cid.Cid {
mhb, err := mh.Encode(nh[:], mh.SHA2_512)
d.PanicIfError(err)
return cid.NewCidV1(cid.Raw, mhb)
}
func (cs *chunkStore) Commit(current, last hash.Hash) bool {
defer dbg.BoxF("chunkstore Commit")()
// TODO: In a more realistic implementation this would flush queued chunks to storage.
if cs.root != nil && *cs.root == current {
fmt.Println("eep, asked to commit current value?")
@@ -279,7 +285,7 @@ func (cs *chunkStore) Commit(current, last hash.Hash) bool {
// TODO: Optimistic concurrency?
cid := nomsHashToCID(current)
cid := NomsHashToCID(current)
if cs.local {
err := ioutil.WriteFile(cs.getLocalNameFile(true), []byte(cid.String()), 0644)
d.PanicIfError(err)