Skip to content

Commit

Permalink
feat(go): add persistence for ipfs & protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
gfanton authored and n0izn0iz committed May 11, 2020
1 parent da360f0 commit 85b34fd
Show file tree
Hide file tree
Showing 9 changed files with 502 additions and 88 deletions.
1 change: 1 addition & 0 deletions go.mod

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 2 additions & 7 deletions go/cmd/berty/mini/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,16 @@ func newService(logger *zap.Logger, ctx context.Context, opts *Opts) (bertyproto
rootDS := sync_ds.MutexWrap(opts.RootDS)
ipfsDS := ipfsutil.NewNamespacedDatastore(rootDS, datastore.NewKey("ipfs"))
routingOpt, crouting := ipfsutil.NewTinderRouting(logger, opts.RendezVousPeer, false)
cfg, err := ipfsutil.CreateBuildConfig(&ipfsutil.CoreAPIConfig{
api, node, err := ipfsutil.NewCoreAPIFromDatastore(ctx, ipfsDS, &ipfsutil.CoreAPIConfig{
BootstrapAddrs: opts.Bootstrap,
SwarmAddrs: swarmAddresses,
Datastore: ipfsDS,
Routing: routingOpt,
Options: []ipfsutil.CoreAPIOption{ipfsutil.OptionMDNSDiscovery},
})
if err != nil {
panicUnlockFS(err, lock)
}

api, node, err := ipfsutil.NewConfigurableCoreAPI(ctx, cfg, ipfsutil.OptionMDNSDiscovery)
if err != nil {
panicUnlockFS(err, lock)
}

// wait to get routing
routing := <-crouting

Expand Down
158 changes: 145 additions & 13 deletions go/framework/bertybridge/bridge_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package bertybridge

import (
"context"
"os"
"path/filepath"

"berty.tech/berty/v2/go/internal/config"
"berty.tech/berty/v2/go/internal/ipfsutil"
Expand All @@ -12,6 +14,8 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
"github.com/juju/fslock"

"github.com/libp2p/go-libp2p-core/peer"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/pkg/errors"
Expand All @@ -22,8 +26,13 @@ import (

"go.uber.org/zap"

keystore "github.com/ipfs/go-ipfs-keystore"
"github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"

badger_opts "github.com/dgraph-io/badger/options"
ipfs_badger "github.com/ipfs/go-ds-badger"
"github.com/ipfs/go-ipfs/core"
ipfs_repo "github.com/ipfs/go-ipfs/repo"
ipfs_interface "github.com/ipfs/interface-go-ipfs-core"
)

Expand All @@ -36,6 +45,13 @@ type Protocol struct {
node *core.IpfsNode
dht *dht.IpfsDHT
service bertyprotocol.Service

// protocol datastore
ds datastore.Batching
dslock *fslock.Lock

// ipfs repo
repo ipfs_repo.Repo
}

type ProtocolConfig struct {
Expand All @@ -44,8 +60,8 @@ type ProtocolConfig struct {
dLogger NativeLoggerDriver
loglevel string

swarmListeners []string
orbitDBDirectory string
swarmListeners []string
rootDirectory string

// internal
coreAPI ipfs_interface.CoreAPI
Expand All @@ -57,8 +73,8 @@ func NewProtocolConfig() *ProtocolConfig {
}
}

func (pc *ProtocolConfig) OrbitDBDirectory(dir string) {
pc.orbitDBDirectory = dir
func (pc *ProtocolConfig) RootDirectory(dir string) {
pc.rootDirectory = dir
}

func (pc *ProtocolConfig) LogLevel(level string) {
Expand Down Expand Up @@ -100,10 +116,17 @@ func newProtocolBridge(logger *zap.Logger, config *ProtocolConfig) (*Protocol, e
var api ipfs_interface.CoreAPI
var node *core.IpfsNode
var dht *dht.IpfsDHT
var repo ipfs_repo.Repo
{
var err error

if api = config.coreAPI; api == nil {
// load repo

if repo, err = getIPFSRepo(config.rootDirectory); err != nil {
return nil, errors.Wrap(err, "failed to get ipfs repo")
}

var bopts = ipfsutil.CoreAPIConfig{}
bopts.BootstrapAddrs = defaultProtocolBootstrap

Expand All @@ -121,7 +144,7 @@ func newProtocolBridge(logger *zap.Logger, config *ProtocolConfig) (*Protocol, e
bopts.SwarmAddrs = config.swarmListeners
}

api, node, err = ipfsutil.NewCoreAPI(ctx, &bopts)
api, node, err = ipfsutil.NewCoreAPIFromRepo(ctx, repo, &bopts)
if err != nil {
return nil, errcode.TODO.Wrap(err)
}
Expand All @@ -131,17 +154,32 @@ func newProtocolBridge(logger *zap.Logger, config *ProtocolConfig) (*Protocol, e
}
}

// load datastore
var rootds datastore.Batching
var rootdslock *fslock.Lock
{
var err error

if rootds, rootdslock, err = getRootDatastore(config.rootDirectory); err != nil {
return nil, errcode.TODO.Wrap(err)
}

}

// setup protocol
var service bertyprotocol.Service
{
var err error
odb_dir, err := getOrbitDBDirectory(config.rootDirectory)
if err != nil {
return nil, errcode.TODO.Wrap(err)
}

// initialize new protocol client
protocolOpts := bertyprotocol.Opts{
Logger: logger.Named("bertyprotocol"),
IpfsCoreAPI: api,
DeviceKeystore: bertyprotocol.NewDeviceKeystore(keystore.NewMemKeystore()),
MessageKeystore: bertyprotocol.NewInMemMessageKeystore(),
Logger: logger.Named("bertyprotocol"),
OrbitDirectory: odb_dir,
RootDatastore: rootds,
IpfsCoreAPI: api,
}

service, err = bertyprotocol.New(protocolOpts)
Expand Down Expand Up @@ -204,16 +242,29 @@ func newProtocolBridge(logger *zap.Logger, config *ProtocolConfig) (*Protocol, e
service: service,
node: node,
dht: dht,

ds: rootds,
dslock: rootdslock,
}, nil
}

func (p *Protocol) newServiceClient() (bertyprotocol.ProtocolServiceClient, error) {
cl, err := p.Bridge.NewGRPCClient()
if err != nil {
return nil, err
}

return bertyprotocol.NewProtocolServiceClient(cl.grpcClient), nil
}

func (p *Protocol) Close() (err error) {
// Close bridge
err = p.Bridge.Close()
p.Bridge.Close()

// close service
p.service.Close()
err = p.service.Close() // keep service error

/// close other services
if p.dht != nil {
p.dht.Close()
}
Expand All @@ -222,5 +273,86 @@ func (p *Protocol) Close() (err error) {
p.node.Close()
}

if p.ds != nil {
p.ds.Close()
}

if p.dslock != nil {
p.dslock.Unlock()
}

return
}

func getRootDatastore(path string) (datastore.Batching, *fslock.Lock, error) {
if path == "" || path == ":memory:" {
baseds := ds_sync.MutexWrap(datastore.NewMapDatastore())
return baseds, nil, nil
}

basepath := filepath.Join(path, "store")
_, err := os.Stat(basepath)
if err != nil {
if !os.IsNotExist(err) {
return nil, nil, errors.Wrap(err, "unable get directory")
}
if err := os.MkdirAll(basepath, 0700); err != nil {
return nil, nil, errors.Wrap(err, "unable to create datastore directory")
}
}

lock := fslock.New(filepath.Join(basepath, "lock"))
err = lock.TryLock()
if err != nil {
return nil, nil, errors.Wrap(err, "unable to get lock file")
}

baseds, err := ipfs_badger.NewDatastore(basepath, &ipfs_badger.Options{
Options: ipfs_badger.DefaultOptions.WithValueLogLoadingMode(badger_opts.FileIO),
})

if err != nil {
return nil, nil, errors.Wrapf(err, "failed to load datastore on: `%s`", basepath)
}

return baseds, lock, nil
}

func getOrbitDBDirectory(path string) (string, error) {
if path == "" || path == ":memory:" {
return path, nil
}

basePath := filepath.Join(path, "orbitdb")
_, err := os.Stat(basePath)
if err != nil {
if !os.IsNotExist(err) {
return "", errors.Wrap(err, "unable get orbitdb directory")
}
if err := os.MkdirAll(basePath, 0700); err != nil {
return "", errors.Wrap(err, "unable to create orbitdb directory")
}
}

return basePath, nil
}

func getIPFSRepo(path string) (ipfs_repo.Repo, error) {
if path == "" || path == ":memory:" {
repods := ds_sync.MutexWrap(datastore.NewMapDatastore())
return ipfsutil.CreateMockedRepo(repods)
}

basepath := filepath.Join(path, "ipfs")
_, err := os.Stat(basepath)
if err != nil {
if !os.IsNotExist(err) {
return nil, errors.Wrap(err, "unable get orbitdb directory")
}
if err := os.MkdirAll(basepath, 0700); err != nil {
return nil, errors.Wrap(err, "unable to create orbitdb directory")
}
}

return ipfsutil.LoadRepoFromPath(basepath)
}
83 changes: 83 additions & 0 deletions go/framework/bertybridge/bridge_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package bertybridge

import (
"io/ioutil"
"os"
"testing"

"berty.tech/berty/v2/go/internal/ipfsutil"
Expand All @@ -9,6 +11,7 @@ import (
"berty.tech/berty/v2/go/pkg/bertyprotocol"
"berty.tech/berty/v2/go/pkg/bertytypes"
"github.com/gogo/protobuf/proto"
p2p_peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
Expand Down Expand Up @@ -93,6 +96,86 @@ func TestProtocolBridge(t *testing.T) {
//}
}

func TestPersistenceProtocol(t *testing.T) {
var err error //results [][]byte

const n_try = 4

testutil.SkipSlow(t)

ctx := context.Background()

logger := testutil.Logger(t)
rootdir, err := ioutil.TempDir("", "ipfs")
require.NoError(t, err)

defer os.RemoveAll(rootdir)

// coreAPI, cleanup := ipfsutil.TestingCoreAPI(ctx, t)
// defer cleanup()

config := NewProtocolConfig()
config.RootDirectory(rootdir)

var node_id_1 p2p_peer.ID
var device_pk_1 []byte
{
protocol, err := newProtocolBridge(logger, config)
require.NoError(t, err)

// get grpc client
client, err := protocol.newServiceClient()
if !assert.NoError(t, err) {
protocol.Close()
assert.FailNow(t, "unable to create client")
}

// get node id
node_id_1 = protocol.node.Identity
assert.NotEmpty(t, node_id_1)

res, err := client.InstanceGetConfiguration(ctx, &bertytypes.InstanceGetConfiguration_Request{})
assert.NoError(t, err)

device_pk_1 = res.DevicePK
assert.NotEmpty(t, device_pk_1)

err = protocol.Close()
require.NoError(t, err)
}

var node_id_2 p2p_peer.ID
var device_pk_2 []byte
{

protocol, err := newProtocolBridge(logger, config)
require.NoError(t, err)

// get grpc client
client, err := protocol.newServiceClient()
if !assert.NoError(t, err) {
protocol.Close()
assert.FailNow(t, "unable to create client")
}

// get node id
node_id_2 = protocol.node.Identity
assert.NotEmpty(t, node_id_2)

res, err := client.InstanceGetConfiguration(ctx, &bertytypes.InstanceGetConfiguration_Request{})
assert.NoError(t, err)

device_pk_2 = res.DevicePK
assert.NotEmpty(t, device_pk_2)

err = protocol.Close()
require.NoError(t, err)
}

assert.Equal(t, node_id_1, node_id_2, "IPFS node should have the same ID after reboot")
assert.Equal(t, device_pk_1, device_pk_2, "Device should have the same PK after reboot")
}

func TestDemoBridge(t *testing.T) {
var (
err error
Expand Down

0 comments on commit 85b34fd

Please sign in to comment.