Skip to content

Commit

Permalink
Merge pull request #1840 from gfanton/dev/override-ipfs-discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
moul committed Apr 16, 2020
2 parents 81ba55e + c0087cd commit 4d75d34
Show file tree
Hide file tree
Showing 37 changed files with 767 additions and 319 deletions.
5 changes: 4 additions & 1 deletion go.mod

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

166 changes: 147 additions & 19 deletions go/cmd/berty/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"path"
"strings"
"time"

"berty.tech/berty/v2/go/cmd/berty/mini"
"berty.tech/berty/v2/go/internal/grpcutil"
Expand All @@ -23,14 +24,25 @@ import (
datastore "github.com/ipfs/go-datastore"
sync_ds "github.com/ipfs/go-datastore/sync"
badger "github.com/ipfs/go-ds-badger"
"github.com/ipfs/go-ipfs/core"
iface "github.com/ipfs/interface-go-ipfs-core"
"github.com/juju/fslock"
ma "github.com/multiformats/go-multiaddr"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
peer "github.com/libp2p/go-libp2p-core/peer"

"github.com/oklog/run"
"github.com/peterbourgon/ff"
"github.com/peterbourgon/ff/ffcli"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

ipfs_cfg "github.com/ipfs/go-ipfs-config"
ipfs_log "github.com/ipfs/go-log"

"github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
Expand All @@ -42,21 +54,31 @@ import (
"moul.io/srand"
)

// Default ipfs bootstrap & rendezvous point server

const DevRendezVousPoint = "/ip4/167.99.223.55/tcp/4040/p2p/QmTo3RS6Uc8aCS5Cxx8EBHkNCe4C7vKRanbMEboxkA92Cn"

var DefaultBootstrap = ipfs_cfg.DefaultBootstrapAddresses

func main() {
log.SetFlags(0)

var (
logger *zap.Logger
globalFlags = flag.NewFlagSet("berty", flag.ExitOnError)
globalDebug = globalFlags.Bool("debug", false, "debug mode")
globalLogToFile = globalFlags.String("logfile", "", "if specified, will log everything in JSON into a file and nothing on stderr")
logger *zap.Logger
globalFlags = flag.NewFlagSet("berty", flag.ExitOnError)
globalDebug = globalFlags.Bool("debug", false, "berty debug mode")
globalLibp2pDebug = globalFlags.Bool("debug-p2p", false, "libp2p debug mode")
globalOrbitDebug = globalFlags.Bool("debug-odb", false, "orbitdb debug mode")
globalLogToFile = globalFlags.String("logfile", "", "if specified, will log everything in JSON into a file and nothing on stderr")

bannerFlags = flag.NewFlagSet("banner", flag.ExitOnError)
bannerLight = bannerFlags.Bool("light", false, "light mode")

clientProtocolFlags = flag.NewFlagSet("protocol client", flag.ExitOnError)
clientProtocolListeners = clientProtocolFlags.String("l", "/ip4/127.0.0.1/tcp/9091/grpc", "client listeners")
clientProtocolPath = clientProtocolFlags.String("d", cacheleveldown.InMemoryDirectory, "datastore base directory")
clientProtocolRDVP = clientProtocolFlags.String("rdvp", DevRendezVousPoint, "rendezvous point maddr")
clientProtocolRDVPFroce = clientProtocolFlags.Bool("force-rdvp", false, "force connect to rendezvous point")

clientDemoFlags = flag.NewFlagSet("demo client", flag.ExitOnError)
clientDemoDirectory = clientDemoFlags.String("d", ":memory:", "orbit db directory")
Expand All @@ -67,10 +89,14 @@ func main() {
miniClientDemoPath = miniClientDemoFlags.String("d", cacheleveldown.InMemoryDirectory, "datastore base directory")
miniClientDemoPort = miniClientDemoFlags.Uint("p", 0, "default IPFS listen port")
miniClientDemoRemoteAddr = miniClientDemoFlags.String("r", "", "remote berty daemon")
miniClientDemoRDVP = miniClientDemoFlags.String("rdvp", DevRendezVousPoint, "rendezvous point maddr")
)

globalPreRun := func() error {
mrand.Seed(srand.Secure())
isDebugEnabled := *globalDebug || *globalOrbitDebug || *globalLibp2pDebug

// setup zap config
var config zap.Config
if *globalLogToFile != "" {
config = zap.NewProductionConfig()
Expand All @@ -80,17 +106,26 @@ func main() {
config.DisableStacktrace = true
config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
}
if *globalDebug {

if isDebugEnabled {
config.Level.SetLevel(zap.DebugLevel)
} else {
config.Level.SetLevel(zap.InfoLevel)
}

var err error
logger, err = config.Build()
if err != nil {
if logger, err = config.Build(); err != nil {
return errcode.TODO.Wrap(err)
}
logger.Debug("logger initialized")

if *globalLibp2pDebug {
ipfs_log.SetDebugLogging()
}

if *globalOrbitDebug {
zap.ReplaceGlobals(logger)
}

return nil
}

Expand Down Expand Up @@ -144,12 +179,19 @@ func main() {
remoteAddr = *miniClientDemoRemoteAddr
}

l := zap.NewNop()
if *globalLogToFile != "" {
l = logger
}

mini.Main(&mini.Opts{
RemoteAddr: remoteAddr,
GroupInvitation: *miniClientDemoGroup,
Port: *miniClientDemoPort,
RootDS: rootDS,
Logger: logger,
RemoteAddr: remoteAddr,
GroupInvitation: *miniClientDemoGroup,
Port: *miniClientDemoPort,
RootDS: rootDS,
Logger: l,
Bootstrap: DefaultBootstrap,
RendezVousServerMAddr: *miniClientDemoRDVP,
})
return nil
},
Expand All @@ -166,15 +208,66 @@ func main() {

ctx := context.Background()

// protocol
var protocol bertyprotocol.Service
var api iface.CoreAPI
{
api, node, err := ipfsutil.NewInMemoryCoreAPI(ctx)
var err error
var opts = ipfsutil.IpfsOpts{}

opts.Bootstrap = DefaultBootstrap

var rdvpeer *peer.AddrInfo
var mardv ma.Multiaddr
var crouting <-chan *ipfsutil.RoutingOut

if *clientProtocolRDVP != "" {
if mardv, err = ma.NewMultiaddr(*clientProtocolRDVP); err != nil {
logger.Warn("failed to parse rdvp multiaddr", zap.String("maddr", *clientProtocolRDVP), zap.Error(err))
} else { // should be a valid rendezvous peer
rdvpeer, err = peer.AddrInfoFromP2pAddr(mardv)
if err != nil {
return errcode.TODO.Wrap(err)
}

opts.Routing, crouting = ipfsutil.NewTinderRouting(logger, rdvpeer.ID, false)
opts.Bootstrap = append(opts.Bootstrap, mardv.String())
}
}

var node *core.IpfsNode

api, node, err = ipfsutil.NewInMemoryCoreAPI(ctx, &opts)
if err != nil {
return errcode.TODO.Wrap(err)
}
defer node.Close()

if crouting != nil {
routingOut := <-crouting
defer routingOut.IpfsDHT.Close()

if *clientProtocolRDVPFroce {
go func() {
// monitor rdv peer
monitorPeers(logger, node.PeerHost, rdvpeer.ID)
}()

for {
if err := node.PeerHost.Connect(ctx, *rdvpeer); err != nil {
logger.Error("cannot dial rendez-vous point", zap.Error(err))
} else {
break
}
time.Sleep(time.Second)
}
}

}
}

// protocol
var protocol bertyprotocol.Service
{

rootDS, dsLock, err := getRootDatastore(clientProtocolPath)
if err != nil {
return errcode.TODO.Wrap(err)
Expand All @@ -186,7 +279,6 @@ func main() {
defer rootDS.Close()

deviceDS := ipfsutil.NewDatastoreKeystore(ipfsutil.NewNamespacedDatastore(rootDS, datastore.NewKey("account")))

mk := bertyprotocol.NewMessageKeystore(ipfsutil.NewNamespacedDatastore(rootDS, datastore.NewKey("messages")))

// initialize new protocol client
Expand Down Expand Up @@ -292,12 +384,27 @@ func main() {
{
var err error

api, node, err := ipfsutil.NewInMemoryCoreAPI(ctx)
mardv := multiaddr.StringCast(DevRendezVousPoint)
rdvpeer, err := peer.AddrInfoFromP2pAddr(mardv)
if err != nil {
return errcode.TODO.Wrap(err)
}

routingOpts, crouting := ipfsutil.NewTinderRouting(logger, rdvpeer.ID, false)
ipfsOpts := &ipfsutil.IpfsOpts{
Bootstrap: append(DefaultBootstrap, DevRendezVousPoint),
Routing: routingOpts,
}

api, node, err := ipfsutil.NewInMemoryCoreAPI(ctx, ipfsOpts)
if err != nil {
return errcode.TODO.Wrap(err)
}
defer node.Close()

routing := <-crouting
defer routing.IpfsDHT.Close()

demo, err = bertydemo.New(&bertydemo.Opts{
Logger: logger,
CoreAPI: api,
Expand Down Expand Up @@ -434,3 +541,24 @@ func parseAddr(addr string) (maddr ma.Multiaddr, err error) {

return
}

func monitorPeers(l *zap.Logger, h host.Host, peers ...peer.ID) error {
currentStates := make([]network.Connectedness, len(peers))
for {
time.Sleep(time.Second)

for i, p := range peers {
nextState := h.Network().Connectedness(p)
if nextState != currentStates[i] {
switch nextState {
case network.Connected:
l.Info("peer Connected", zap.String("ID", p.String()))
case network.NotConnected:
l.Info("peer NotConnected", zap.String("ID", p.String()))
}

currentStates[i] = nextState
}
}
}
}

0 comments on commit 4d75d34

Please sign in to comment.