diff --git a/client/react-native/gomobile/core/bot.go b/client/react-native/gomobile/core/bot.go index 1fb97883ee..c368e8b600 100644 --- a/client/react-native/gomobile/core/bot.go +++ b/client/react-native/gomobile/core/bot.go @@ -8,7 +8,7 @@ import ( func IsBotRunning() bool { defer panicHandler() waitDaemon(accountName) - currentAccount, _ := account.Get(accountName) + currentAccount, _ := account.Get(rootContext, accountName) return currentAccount.BotRunning } @@ -16,7 +16,7 @@ func IsBotRunning() bool { func StartBot() error { defer panicHandler() waitDaemon(accountName) - currentAccount, _ := account.Get(accountName) + currentAccount, _ := account.Get(rootContext, accountName) if currentAccount.BotRunning { return errors.New("bot is already started") @@ -29,13 +29,13 @@ func StartBot() error { // return errors.Wrap(err, "state DB save failed") // } - return currentAccount.StartBot() + return currentAccount.StartBot(rootContext) } func StopBot() error { defer panicHandler() waitDaemon(accountName) - currentAccount, _ := account.Get(accountName) + currentAccount, _ := account.Get(rootContext, accountName) if !currentAccount.BotRunning { return errors.New("bot is already stopped") @@ -48,5 +48,5 @@ func StopBot() error { // return errors.Wrap(err, "state DB save failed") // } - return currentAccount.StopBot() + return currentAccount.StopBot(rootContext) } diff --git a/client/react-native/gomobile/core/core.go b/client/react-native/gomobile/core/core.go index de2bcb4764..36a90c6f0a 100644 --- a/client/react-native/gomobile/core/core.go +++ b/client/react-native/gomobile/core/core.go @@ -1,6 +1,7 @@ package core import ( + "context" "encoding/json" "fmt" "net" @@ -16,6 +17,7 @@ import ( var ( accountName = "" appConfig *account.StateDB + rootContext = context.Background() ) func logger() *zap.Logger { @@ -29,7 +31,7 @@ func panicHandler() { if accountName == "" { return } - a, err := account.Get(accountName) + a, err := account.Get(rootContext, accountName) if err != nil { return } @@ -56,10 +58,9 @@ func Panic() { } func GetPort() (int, error) { - defer panicHandler() - a, err := account.Get(accountName) + a, err := account.Get(rootContext, accountName) if err != nil { return 0, err } @@ -67,7 +68,6 @@ func GetPort() (int, error) { } func Initialize(loggerNative Logger) error { - defer panicHandler() if err := setupLogger("debug", loggerNative); err != nil { @@ -80,10 +80,9 @@ func Initialize(loggerNative Logger) error { } func ListAccounts(datastorePath string) (string, error) { - defer panicHandler() - accounts, err := account.List(datastorePath) + accounts, err := account.List(rootContext, datastorePath) if err != nil { return "", err } @@ -118,12 +117,11 @@ func initOrRestoreAppState(datastorePath string) error { } func Start(nickname, datastorePath string, loggerNative Logger) error { - defer panicHandler() accountName = nickname - a, _ := account.Get(nickname) + a, _ := account.Get(rootContext, nickname) if a != nil { // daemon already started, no errors to return return nil @@ -139,10 +137,9 @@ func Start(nickname, datastorePath string, loggerNative Logger) error { } func Restart() error { - defer panicHandler() - currentAccount, _ := account.Get(accountName) + currentAccount, _ := account.Get(rootContext, accountName) if currentAccount != nil { currentAccount.ErrChan() <- nil } @@ -152,14 +149,13 @@ func Restart() error { } func DropDatabase(datastorePath string) error { - defer panicHandler() - currentAccount, err := account.Get(accountName) + currentAccount, err := account.Get(rootContext, accountName) if err != nil { return err } - err = currentAccount.DropDatabase() + err = currentAccount.DropDatabase(rootContext) if err != nil { return err } @@ -181,7 +177,7 @@ func run(nickname, datastorePath string, loggerNative Logger) { } func waitDaemon(nickname string) { - currentAccount, _ := account.Get(nickname) + currentAccount, _ := account.Get(rootContext, nickname) if currentAccount == nil || currentAccount.GQLBind == "" { logger().Debug("waiting for daemon to start") time.Sleep(time.Second) @@ -231,17 +227,17 @@ func daemon(nickname, datastorePath string, loggerNative Logger) error { accountOptions = append(accountOptions, account.WithBot()) } - a, err = account.New(accountOptions...) + a, err = account.New(rootContext, accountOptions...) if err != nil { return err } - defer account.Delete(a) + defer account.Delete(rootContext, a) - err = a.Open() + err = a.Open(rootContext) if err != nil { return err } - defer a.Close() + defer a.Close(rootContext) if appConfig.LocalGRPC { err := StartLocalGRPC() diff --git a/client/react-native/gomobile/core/local_grpc.go b/client/react-native/gomobile/core/local_grpc.go index 0e46e8f294..8fa0742036 100644 --- a/client/react-native/gomobile/core/local_grpc.go +++ b/client/react-native/gomobile/core/local_grpc.go @@ -67,7 +67,7 @@ func GetLocalGRPCInfos() string { func StartLocalGRPC() error { defer panicHandler() waitDaemon(accountName) - currentAccount, _ := account.Get(accountName) + currentAccount, _ := account.Get(rootContext, accountName) if localListener != nil { return errors.New("local gRPC is already running") diff --git a/client/react-native/gomobile/core/network.go b/client/react-native/gomobile/core/network.go index fc0f331716..ebfeac4cd8 100644 --- a/client/react-native/gomobile/core/network.go +++ b/client/react-native/gomobile/core/network.go @@ -76,7 +76,7 @@ func GetNetworkConfig() string { func UpdateNetworkConfig(jsonConf string) error { defer panicHandler() waitDaemon(accountName) - currentAccount, _ := account.Get(accountName) + currentAccount, _ := account.Get(rootContext, accountName) var newNetworkConfig networkConfig if err := json.Unmarshal([]byte(jsonConf), &newNetworkConfig); err != nil { @@ -88,7 +88,7 @@ func UpdateNetworkConfig(jsonConf string) error { if err != nil { return err } - if err := currentAccount.UpdateP2PNetwork(netConf); err != nil { + if err := currentAccount.UpdateP2PNetwork(rootContext, netConf); err != nil { return err } diff --git a/core/Makefile b/core/Makefile index 79f5a4b957..21d42b3f78 100644 --- a/core/Makefile +++ b/core/Makefile @@ -31,7 +31,8 @@ EXT_LDFLAGS ?= -ldflags "-X berty.tech/core.GitSha=$(GIT_SHA) -X berty.tech/core RUN_DAEMON_OPTS ?= --log-level=debug TEST_PATHS ?= ./... TEST_OPTS ?= -v -TEST_CMD ?= go test -test.timeout 3m $(TEST_OPTS) $(TEST_PATHS) +TEST_TIMEOUT ?= 3m +TEST_CMD ?= go test -test.timeout $(TEST_TIMEOUT) $(TEST_OPTS) $(TEST_PATHS) PROTOC ?= protoc GQLGEN ?= gqlgen GIN_OPTS ?= --immediate --port=2999 --appPort=1337 --build=./cmd/berty --excludeDir vendor @@ -88,7 +89,7 @@ integration: install .PHONY: lint lint: generate - golangci-lint run $(TEST_PATHS) + $(BUILD_ENV) golangci-lint run $(TEST_PATHS) .PHONY: generate generate: .generated diff --git a/core/api/protobuf/graphql/logger.gen.go b/core/api/protobuf/graphql/logger.gen.go new file mode 100644 index 0000000000..f11bd928d3 --- /dev/null +++ b/core/api/protobuf/graphql/logger.gen.go @@ -0,0 +1,9 @@ +// Code generated by berty.tech/core/.scripts/generate-logger.sh + +package graphql + +import "go.uber.org/zap" + +func logger() *zap.Logger { + return zap.L().Named("core.api.protobuf.graphql") +} diff --git a/core/bot/bot.go b/core/bot/bot.go index 42c4362fd3..ad273b00ce 100644 --- a/core/bot/bot.go +++ b/core/bot/bot.go @@ -8,16 +8,24 @@ import ( "berty.tech/core/api/client" "berty.tech/core/api/node" + "berty.tech/core/pkg/tracing" + opentracing "github.com/opentracing/opentracing-go" ) type Bot struct { - client *client.Client - handlers []Handler + client *client.Client + handlers []Handler + rootContext context.Context } -func New(opts ...Option) (*Bot, error) { +func New(ctx context.Context, opts ...Option) (*Bot, error) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx) + defer span.Finish() + bot := Bot{ - handlers: make([]Handler, 0), + handlers: make([]Handler, 0), + rootContext: ctx, } for _, opt := range opts { @@ -34,8 +42,10 @@ func New(opts ...Option) (*Bot, error) { } func (b *Bot) Start() error { - ctx := context.Background() - stream, err := b.client.Node().EventStream(ctx, &node.EventStreamInput{}) + span, _ := tracing.EnterFunc(b.rootContext) + defer span.Finish() + + stream, err := b.client.Node().EventStream(b.rootContext, &node.EventStreamInput{}) if err != nil { return err } diff --git a/core/cmd/berty/client.go b/core/cmd/berty/client.go index f1fec39e5f..92acb6bdd9 100644 --- a/core/cmd/berty/client.go +++ b/core/cmd/berty/client.go @@ -74,7 +74,7 @@ func clientServerStream(opts *clientOptions) error { } if jaegerAddr != "" { - tracer, closer, err := jaeger.InitTracer(jaegerAddr, "berty-client") + tracer, closer, err := jaeger.InitTracer(jaegerAddr, jaegerName+":client") if err != nil { return err } @@ -145,7 +145,7 @@ func clientUnary(opts *clientOptions) error { } if jaegerAddr != "" { - tracer, closer, err := jaeger.InitTracer(jaegerAddr, "berty-client") + tracer, closer, err := jaeger.InitTracer(jaegerAddr, jaegerName+":client") if err != nil { return err } diff --git a/core/cmd/berty/daemon.go b/core/cmd/berty/daemon.go index f2b65fbfb3..6a927a4c4d 100644 --- a/core/cmd/berty/daemon.go +++ b/core/cmd/berty/daemon.go @@ -2,6 +2,7 @@ package main import ( "bufio" + "context" "fmt" "io" "os" @@ -38,12 +39,10 @@ type daemonOptions struct { SwarmKeyPath string `mapstructure:"swarm-key"` nickname string `mapstructure:"nickname"` - password string `mapstructure:"password"` } func daemonSetupFlags(flags *pflag.FlagSet, opts *daemonOptions) { flags.StringVar(&opts.nickname, "nickname", "berty-daemon", "set account nickname") - flags.StringVar(&opts.password, "password", "secure", "set account password") flags.BoolVar(&opts.dropDatabase, "drop-database", false, "drop database to force a reinitialization") flags.BoolVar(&opts.hideBanner, "hide-banner", false, "hide banner") flags.BoolVar(&opts.initOnly, "init-only", false, "stop after node initialization (useful for integration tests") @@ -57,7 +56,7 @@ func daemonSetupFlags(flags *pflag.FlagSet, opts *daemonOptions) { flags.StringVar(&opts.gqlBind, "gql-bind", ":8700", "Bind graphql api") flags.StringVarP(&opts.identity, "p2p-identity", "i", "", "set p2p identity") flags.StringSliceVar(&opts.bootstrap, "bootstrap", p2p.DefaultBootstrap, "boostrap peers") - // flags.StringSliceVar(&opts.bindP2P, "bind-p2p", []string{"/ip4/0.0.0.0/tcp/0", "/ble/00000000-0000-0000-0000-000000000000"}, "p2p listening address") + // flags.StringSliceVar(&opts.bindP2P, "bind-p2p", []string{"/ip4/0.0.0.0/tcp/0", "/ble/00000000-0000-0000-0000-000000000000"}, "p2p listening address") flags.StringSliceVar(&opts.bindP2P, "bind-p2p", []string{"/ip4/0.0.0.0/tcp/0"}, "p2p listening address") flags.StringSliceVar(&opts.transportP2P, "transport-p2p", []string{"default"}, "p2p transport to enable") _ = viper.BindPFlags(flags) @@ -84,30 +83,29 @@ func newDaemonCommand() *cobra.Command { } func daemon(opts *daemonOptions) error { + ctx := context.Background() var err error a := &account.Account{} defer a.PanicHandler() accountOptions := account.Options{ + account.WithJaegerAddrName(jaegerAddr, jaegerName+":node"), account.WithRing(ring), account.WithName(opts.nickname), - account.WithPassphrase(opts.password), + account.WithPassphrase(opts.sql.key), account.WithDatabase(&account.DatabaseOptions{ - Path: "/tmp", - Drop: opts.dropDatabase, - JaegerAddr: jaegerAddr, + Path: "/tmp", + Drop: opts.dropDatabase, }), account.WithBanner(banner.Quote()), account.WithGrpcServer(&account.GrpcServerOptions{ Bind: opts.grpcBind, Interceptors: true, - JaegerAddr: jaegerAddr, }), account.WithGQL(&account.GQLOptions{ Bind: opts.gqlBind, Interceptors: true, - JaegerAddr: jaegerAddr, }), } if !opts.noP2P { @@ -149,14 +147,14 @@ func daemon(opts *daemonOptions) error { if opts.initOnly { accountOptions = append(accountOptions, account.WithInitOnly()) } - a, err = account.New(accountOptions...) + a, err = account.New(ctx, accountOptions...) if err != nil { return err } - defer a.Close() + defer a.Close(ctx) - err = a.Open() + err = a.Open(ctx) if err != nil { return err } diff --git a/core/cmd/berty/root.go b/core/cmd/berty/root.go index cb0c4f6570..2c5184a217 100644 --- a/core/cmd/berty/root.go +++ b/core/cmd/berty/root.go @@ -80,11 +80,12 @@ func newRootCommand() *cobra.Command { return nil }, } - + defaultJaegerName := os.Getenv("USER") + "@" + os.Getenv("HOST") cmd.PersistentFlags().BoolP("help", "h", false, "Print usage") cmd.PersistentFlags().StringP("log-level", "", "info", "log level (debug, info, warn, error)") cmd.PersistentFlags().StringP("log-namespaces", "", "core.*,vendor.gorm*", "logger namespaces to enable (supports wildcard)") cmd.PersistentFlags().StringP("jaeger-address", "", "127.0.0.1:6831", "ip address / hostname and port of jaeger-agent: :") + cmd.PersistentFlags().StringP("jaeger-name", "", defaultJaegerName, "tracer name") cmd.PersistentFlags().Int64P("rand-seed", "", 0, "seed used to initialize the default rand source") cmd.AddCommand( @@ -188,10 +189,17 @@ func setupLogger(cmd *cobra.Command, args []string) error { return nil } -var jaegerAddr string +var ( + jaegerAddr string + jaegerName string +) func setupJaeger(cmd *cobra.Command, args []string) (err error) { jaegerAddr, err = cmd.Flags().GetString("jaeger-address") + if err != nil { + return err + } + jaegerName, err = cmd.Flags().GetString("jaeger-name") return } diff --git a/core/crypto/keypair/keypair.pb.go b/core/crypto/keypair/keypair.pb.go index 1e223643ca..3c0f5c452c 100644 --- a/core/crypto/keypair/keypair.pb.go +++ b/core/crypto/keypair/keypair.pb.go @@ -67,9 +67,9 @@ var PublicKeyAlgorithm_name = map[int32]string{ } var PublicKeyAlgorithm_value = map[string]int32{ "UNKNOWN_PUBLIC_KEY_ALGORITHM": 0, - "RSA": 1, - "DSA": 2, - "ECDSA": 3, + "RSA": 1, + "DSA": 2, + "ECDSA": 3, } func (x PublicKeyAlgorithm) String() string { diff --git a/core/manager/account/account.go b/core/manager/account/account.go index bf9f3a8ab6..d3f2d174fd 100644 --- a/core/manager/account/account.go +++ b/core/manager/account/account.go @@ -1,6 +1,7 @@ package account import ( + "context" "fmt" "io" "net" @@ -19,18 +20,23 @@ import ( "berty.tech/core/network" "berty.tech/core/network/netutil" "berty.tech/core/node" + "berty.tech/core/pkg/tracing" "berty.tech/core/pkg/zapring" "berty.tech/core/sql" "berty.tech/core/sql/sqlcipher" "github.com/jinzhu/gorm" reuse "github.com/libp2p/go-reuseport" + opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "go.uber.org/zap" "google.golang.org/grpc" ) // Info is used in berty.node.DeviceInfos -func Info() map[string]string { +func Info(ctx context.Context) map[string]string { + span, _ := tracing.EnterFunc(ctx) + defer span.Finish() + if len(list) < 1 { return map[string]string{"accounts": "none!"} } @@ -73,8 +79,10 @@ type Account struct { withBot bool BotRunning bool - serverTracerCloser io.Closer - dialTracerCloser io.Closer + tracer opentracing.Tracer + tracingCloser io.Closer + rootContext context.Context + rootSpan opentracing.Span errChan chan error @@ -86,9 +94,14 @@ var list []*Account type NewOption func(*Account) error type Options []NewOption -func New(opts ...NewOption) (*Account, error) { +func New(ctx context.Context, opts ...NewOption) (*Account, error) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx) + a := &Account{ - errChan: make(chan error, 1), + errChan: make(chan error, 1), + rootSpan: span, + rootContext: ctx, } for _, opt := range opts { @@ -105,7 +118,10 @@ func New(opts ...NewOption) (*Account, error) { return a, nil } -func Get(name string) (*Account, error) { +func Get(ctx context.Context, name string) (*Account, error) { + span, _ := tracing.EnterFunc(ctx, name) + defer span.Finish() + for _, account := range list { if account != nil && account.Name == name { return account, nil @@ -114,7 +130,10 @@ func Get(name string) (*Account, error) { return nil, errors.New("account with name " + name + " isn't opened") } -func List(datastorePath string) ([]string, error) { +func List(ctx context.Context, datastorePath string) ([]string, error) { + span, _ := tracing.EnterFunc(ctx, datastorePath) + defer span.Finish() + var names []string err := filepath.Walk(datastorePath, func(path string, info os.FileInfo, err error) error { @@ -136,17 +155,25 @@ func List(datastorePath string) ([]string, error) { return names, nil } -func Delete(a *Account) { - ForEach(func(i int, current *Account) { +func Delete(ctx context.Context, a *Account) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, a) + defer span.Finish() + + ForEach(ctx, func(ctx context.Context, i int, current *Account) { if a == current { list = append(list[:i], list[i+1:]...) } }) } -func ForEach(callback func(int, *Account)) { +func ForEach(ctx context.Context, callback func(context.Context, int, *Account)) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx) + defer span.Finish() + for index, account := range list { - callback(index, account) + callback(ctx, index, account) } } @@ -167,9 +194,13 @@ func (a *Account) Validate() error { return nil } -func (a *Account) Open() error { - if err := a.initNode(); err != nil { - a.Close() +func (a *Account) Open(ctx context.Context) error { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx) + defer span.Finish() + + if err := a.initNode(ctx); err != nil { + a.Close(ctx) return err } if a.initOnly { @@ -177,24 +208,24 @@ func (a *Account) Open() error { } // start - if err := a.startNetwork(); err != nil { - a.Close() + if err := a.startNetwork(ctx); err != nil { + a.Close(ctx) return err } - if err := a.startGrpcServer(); err != nil { - a.Close() + if err := a.startGrpcServer(ctx); err != nil { + a.Close(ctx) return err } - if err := a.startGQL(); err != nil { - a.Close() + if err := a.startGQL(ctx); err != nil { + a.Close(ctx) return err } - if err := a.startNode(); err != nil { - a.Close() + if err := a.startNode(ctx); err != nil { + a.Close(ctx) return err } if a.withBot { - if err := a.StartBot(); err != nil { + if err := a.StartBot(ctx); err != nil { return err } } @@ -218,21 +249,22 @@ func (a *Account) Open() error { return nil } -func (a *Account) Close() { +func (a *Account) Close(ctx context.Context) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx) + defer span.Finish() + if a.node != nil { _ = a.node.Close() } if a.network != nil { - _ = a.network.Close() + _ = a.network.Close(ctx) } if a.db != nil { _ = a.db.Close() } - if a.serverTracerCloser != nil { - _ = a.serverTracerCloser.Close() - } - if a.dialTracerCloser != nil { - _ = a.dialTracerCloser.Close() + if a.tracingCloser != nil { + _ = a.tracingCloser.Close() } if a.ioGrpc != nil { _ = a.ioGrpc.Listener().Close() @@ -241,7 +273,7 @@ func (a *Account) Close() { a.grpcListener.Close() } if a.BotRunning { - _ = a.StopBot() + _ = a.StopBot(ctx) } if a.ring != nil { a.ring.Close() @@ -253,7 +285,11 @@ func (a *Account) dbPath() string { return a.dbDir + "/berty." + a.Name + ".db" } -func (a *Account) openDatabase() error { +func (a *Account) openDatabase(ctx context.Context) error { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx) + defer span.Finish() + var err error a.db, err = sqlcipher.Open(a.dbPath(), []byte(a.Passphrase)) if err != nil { @@ -264,7 +300,7 @@ func (a *Account) openDatabase() error { return errors.Wrap(err, "failed to initialize sql") } if a.dbDrop { - if err = a.DropDatabase(); err != nil { + if err = a.DropDatabase(ctx); err != nil { return errors.Wrap(err, "failed to drop database") } } @@ -274,28 +310,39 @@ func (a *Account) openDatabase() error { return nil } -func (a *Account) DropDatabase() error { +func (a *Account) DropDatabase(ctx context.Context) error { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx) + defer span.Finish() + var err error if err = sql.DropDatabase(a.db); err != nil { return errors.Wrap(err, "failed to drop database") } - return a.openDatabase() + return a.openDatabase(ctx) } -func (a *Account) startNetwork() error { +func (a *Account) startNetwork(ctx context.Context) error { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx) + defer span.Finish() + if a.network == nil { return nil } go func() { defer a.PanicHandler() - a.errChan <- a.network.Start() + a.errChan <- a.network.Start(ctx) }() return nil } -func (a *Account) startGrpcServer() error { +func (a *Account) startGrpcServer(ctx context.Context) error { + span, _ := tracing.EnterFunc(ctx) + defer span.Finish() + var err error addr, err := net.ResolveTCPAddr("tcp", a.GrpcBind) @@ -321,7 +368,11 @@ func (a *Account) startGrpcServer() error { return nil } -func (a *Account) StartBot() error { +func (a *Account) StartBot(ctx context.Context) error { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx) + defer span.Finish() + options := append( []bot.Option{ bot.WithTCPDaemon(a.GrpcBind), @@ -329,7 +380,7 @@ func (a *Account) StartBot() error { }, bot.GenericOptions()..., ) - b, err := bot.New(options...) + b, err := bot.New(ctx, options...) if err != nil { return errors.Wrap(err, "failed to initialize bot") } @@ -348,13 +399,19 @@ func (a *Account) StartBot() error { return nil } -func (a *Account) StopBot() error { +func (a *Account) StopBot(ctx context.Context) error { + span, _ := tracing.EnterFunc(ctx) + defer span.Finish() + // TODO: implement bot closing function // Then set a.BotRunning = false return errors.New("stop bot not implemented yet") } -func (a *Account) startGQL() error { +func (a *Account) startGQL(ctx context.Context) error { + span, _ := tracing.EnterFunc(ctx) + defer span.Finish() + var err error if a.gqlHandler == nil { @@ -392,11 +449,15 @@ func (a *Account) startGQL() error { return nil } -func (a *Account) initNode() error { +func (a *Account) initNode(ctx context.Context) error { + span, _ := tracing.EnterFunc(ctx) + defer span.Finish() + var err error // initialize node a.node, err = node.New( + a.rootContext, node.WithP2PGrpcServer(a.GrpcServer), node.WithNodeGrpcServer(a.GrpcServer), node.WithSQL(a.db), @@ -414,13 +475,14 @@ func (a *Account) initNode() error { return nil } -func (a *Account) startNode() error { +func (a *Account) startNode(ctx context.Context) error { + span, _ := tracing.EnterFunc(ctx) + defer span.Finish() // start node go func() { defer a.PanicHandler() - a.errChan <- a.node.Start(true, true) - + a.errChan <- a.node.Start(a.rootContext, true, true) }() // show banner @@ -466,6 +528,9 @@ func (a *Account) ErrChan() chan error { } func (a *Account) PanicHandler() { + span, _ := tracing.EnterFunc(a.rootContext) + defer span.Finish() + r := recover() if r != nil { err := errors.New(fmt.Sprintf("%+v", r)) diff --git a/core/manager/account/db.go b/core/manager/account/db.go index 4ea8364695..a8b1c4f5ee 100644 --- a/core/manager/account/db.go +++ b/core/manager/account/db.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" + "berty.tech/core/pkg/tracing" "github.com/jinzhu/gorm" opentracing "github.com/opentracing/opentracing-go" ) @@ -46,7 +47,9 @@ func OpenStateDB(path string, initialState StateDB) (*StateDB, error) { if state.StartCounter == 0 { state = initialState state.gorm = db - state.Save() + if err := state.Save(); err != nil { + return nil, err + } } else { state.gorm = db } @@ -74,11 +77,12 @@ func (state *StateDB) Close() { func gormCreateSubSpan(scope *gorm.Scope, operationName string) { if span, ok := scope.Get("rootSpan"); ok { rootSpan := span.(opentracing.Span) - operationName = fmt.Sprintf("db-%s", operationName) + operationName = fmt.Sprintf("gorm::%s", operationName) subSpan := rootSpan.Tracer().StartSpan( operationName, opentracing.ChildOf(rootSpan.Context()), ) + subSpan.SetTag("component", "gorm") scope.Set("subSpan", subSpan) } } @@ -93,6 +97,9 @@ func gormFinishSubSpan(scope *gorm.Scope) { func WithDatabase(opts *DatabaseOptions) NewOption { return func(a *Account) error { + span, ctx := tracing.EnterFunc(a.rootContext, opts) + defer span.Finish() + if opts == nil { opts = &DatabaseOptions{} } @@ -103,11 +110,11 @@ func WithDatabase(opts *DatabaseOptions) NewOption { } a.dbDrop = opts.Drop - if err := a.openDatabase(); err != nil { + if err := a.openDatabase(ctx); err != nil { return err } - if opts.JaegerAddr != "" { + if a.tracer != nil { // create a.db.Callback().Create().Before("gorm:before_create").Register("jaeger:before_create", func(scope *gorm.Scope) { gormCreateSubSpan(scope, fmt.Sprintln("insert", scope.TableName())) }) a.db.Callback().Create().Before("gorm:after_create").Register("jaeger:after_create", func(scope *gorm.Scope) { gormFinishSubSpan(scope) }) diff --git a/core/manager/account/options.go b/core/manager/account/options.go index 843dd0f07c..494a895a09 100644 --- a/core/manager/account/options.go +++ b/core/manager/account/options.go @@ -13,6 +13,7 @@ import ( "berty.tech/core/network/mock" "berty.tech/core/network/netutil" "berty.tech/core/pkg/jaeger" + "berty.tech/core/pkg/tracing" "berty.tech/core/pkg/zapring" "github.com/99designs/gqlgen/graphql" gqlhandler "github.com/99designs/gqlgen/handler" @@ -22,7 +23,6 @@ import ( grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" grpc_ot "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" - opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/rs/cors" "go.uber.org/zap" @@ -58,9 +58,8 @@ func WithPassphrase(passphrase string) NewOption { } type DatabaseOptions struct { - Path string - Drop bool - JaegerAddr string + Path string + Drop bool } func WithBanner(banner string) NewOption { @@ -72,7 +71,10 @@ func WithBanner(banner string) NewOption { func WithEnqueurNetwork() NewOption { return func(a *Account) error { - a.network = mock.NewEnqueuer() + span, ctx := tracing.EnterFunc(a.rootContext) + defer span.Finish() + + a.network = mock.NewEnqueuer(ctx) return nil } } @@ -84,16 +86,21 @@ func WithInitOnly() NewOption { } } +func WithJaegerAddrName(addr string, name string) NewOption { + return func(a *Account) error { + var err error + a.tracer, a.tracingCloser, err = jaeger.InitTracer(addr, name) + return err + } +} + type GrpcServerOptions struct { Bind string Interceptors bool - JaegerAddr string } func WithGrpcServer(opts *GrpcServerOptions) NewOption { return func(a *Account) error { - var err error - if opts == nil { opts = &GrpcServerOptions{} } @@ -121,14 +128,8 @@ func WithGrpcServer(opts *GrpcServerOptions) NewOption { grpc_recovery.UnaryServerInterceptor(), ) - if opts.JaegerAddr != "" { - var tracer opentracing.Tracer - tracer, a.serverTracerCloser, err = jaeger.InitTracer(opts.JaegerAddr, "berty-account-"+a.Name+"-server") - if err != nil { - return err - } - - tracerOpts := grpc_ot.WithTracer(tracer) + if a.tracer != nil { + tracerOpts := grpc_ot.WithTracer(a.tracer) serverStreamOpts = append(serverStreamOpts, grpc_ot.StreamServerInterceptor(tracerOpts)) serverUnaryOpts = append(serverUnaryOpts, grpc_ot.UnaryServerInterceptor(tracerOpts)) } @@ -153,7 +154,6 @@ func WithGrpcServer(opts *GrpcServerOptions) NewOption { type GQLOptions struct { Bind string Interceptors bool - JaegerAddr string } func WithGQL(opts *GQLOptions) NewOption { @@ -173,14 +173,8 @@ func WithGQL(opts *GQLOptions) NewOption { grpc_zap.UnaryClientInterceptor(gqlLogger), } - if opts.JaegerAddr != "" { - var tracer opentracing.Tracer - tracer, a.dialTracerCloser, err = jaeger.InitTracer(opts.JaegerAddr, "berty-account-"+a.Name+"-dial") - if err != nil { - return err - } - - tracerOpts := grpc_ot.WithTracer(tracer) + if a.tracer != nil { + tracerOpts := grpc_ot.WithTracer(a.tracer) clientStreamOpts = append(clientStreamOpts, grpc_ot.StreamClientInterceptor(tracerOpts)) clientUnaryOpts = append(clientUnaryOpts, grpc_ot.UnaryClientInterceptor(tracerOpts)) } diff --git a/core/manager/account/p2p.go b/core/manager/account/p2p.go index 95207ea605..2a15c84b9f 100644 --- a/core/manager/account/p2p.go +++ b/core/manager/account/p2p.go @@ -7,8 +7,10 @@ import ( "berty.tech/core/network" "berty.tech/core/network/p2p" + "berty.tech/core/pkg/tracing" "github.com/jinzhu/gorm" p2pcrypto "github.com/libp2p/go-libp2p-crypto" + opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" ) @@ -23,7 +25,11 @@ type P2PNetworkOptions struct { Identity string } -func createP2PNetwork(opts *P2PNetworkOptions, db *gorm.DB) (network.Driver, network.Metrics, error) { +func createP2PNetwork(ctx context.Context, opts *P2PNetworkOptions, db *gorm.DB) (network.Driver, network.Metrics, error) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx) + defer span.Finish() + if opts == nil { opts = &P2PNetworkOptions{} } @@ -88,14 +94,14 @@ func createP2PNetwork(opts *P2PNetworkOptions, db *gorm.DB) (network.Driver, net p2pOptions = append(p2pOptions, p2p.WithSwarmKey(opts.SwarmKey)) } - driver, err := p2p.NewDriver(context.Background(), p2pOptions...) + driver, err := p2p.NewDriver(ctx, p2pOptions...) if err != nil { return nil, nil, err } var metrics network.Metrics if opts.Metrics { - metrics = p2p.NewMetrics(driver) + metrics = p2p.NewMetrics(ctx, driver) } return driver, metrics, nil @@ -103,9 +109,12 @@ func createP2PNetwork(opts *P2PNetworkOptions, db *gorm.DB) (network.Driver, net func WithP2PNetwork(opts *P2PNetworkOptions) NewOption { return func(a *Account) error { + span, ctx := tracing.EnterFunc(a.rootContext) + defer span.Finish() + var err error - a.network, a.metrics, err = createP2PNetwork(opts, a.db) + a.network, a.metrics, err = createP2PNetwork(ctx, opts, a.db) if err != nil { return err } @@ -114,20 +123,22 @@ func WithP2PNetwork(opts *P2PNetworkOptions) NewOption { } } -func (a *Account) UpdateP2PNetwork(opts *P2PNetworkOptions) error { +func (a *Account) UpdateP2PNetwork(ctx context.Context, opts *P2PNetworkOptions) error { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx) + defer span.Finish() + var err error - err = a.network.Close() + err = a.network.Close(ctx) if err != nil { return err } - a.network, a.metrics, err = createP2PNetwork(opts, a.db) + a.network, a.metrics, err = createP2PNetwork(ctx, opts, a.db) if err != nil { return err } - a.node.UseNetworkDriver(a.network) - - return nil + return a.node.UseNetworkDriver(ctx, a.network) } diff --git a/core/network/ble/darwin.go b/core/network/ble/darwin.go index 88a53551a2..e17d6786a7 100644 --- a/core/network/ble/darwin.go +++ b/core/network/ble/darwin.go @@ -116,7 +116,7 @@ func NewListener(lAddr ma.Multiaddr, hostID peer.ID, t *Transport) (*Listener, e incomingBLEUUID: make(chan string), incomingPeerID: make(chan string), transport: t, - closer: make(chan struct{}), + closer: make(chan struct{}), } listeners[t.ID] = listerner diff --git a/core/network/driver.go b/core/network/driver.go index 6cdeec2d2e..1c66a9e085 100644 --- a/core/network/driver.go +++ b/core/network/driver.go @@ -11,7 +11,7 @@ import ( type Metrics interface { // Return a list of peers - Peers() *p2p.Peers + Peers(context.Context) *p2p.Peers // Monitor connected/disconnected peers MonitorPeers(func(*p2p.Peer, error) error) @@ -29,7 +29,7 @@ type Metrics interface { type Driver interface { // Return driver current id - ID() *p2p.Peer + ID(context.Context) *p2p.Peer // Emit sends an envelope to a channel Emit(context.Context, *p2p.Envelope) error @@ -47,11 +47,11 @@ type Driver interface { PingOtherNode(ctx context.Context, destination string) error // Start start service listener - Start() error + Start(context.Context) error // Return the supported protocols of the given peer - Protocols(*p2p.Peer) ([]string, error) + Protocols(context.Context, *p2p.Peer) ([]string, error) // Close cleanups things - Close() error + Close(context.Context) error } diff --git a/core/network/mock/euqueuer.go b/core/network/mock/euqueuer.go index 5b5f36e643..d5a4986374 100644 --- a/core/network/mock/euqueuer.go +++ b/core/network/mock/euqueuer.go @@ -5,6 +5,7 @@ import ( "berty.tech/core/api/p2p" "berty.tech/core/network" + "berty.tech/core/pkg/tracing" ) type Enqueuer struct { @@ -14,7 +15,10 @@ type Enqueuer struct { pingQueue chan string } -func NewEnqueuer() *Enqueuer { +func NewEnqueuer(ctx context.Context) *Enqueuer { + span, _ := tracing.EnterFunc(ctx) + defer span.Finish() + return &Enqueuer{ queue: make(chan *p2p.Envelope, 100), pingQueue: make(chan string, 100), @@ -25,15 +29,19 @@ func (e *Enqueuer) Queue() chan *p2p.Envelope { return e.queue } -func (e *Enqueuer) Emit(_ context.Context, envelope *p2p.Envelope) error { +func (e *Enqueuer) Emit(ctx context.Context, envelope *p2p.Envelope) error { + span, _ := tracing.EnterFunc(ctx) + defer span.Finish() + e.queue <- envelope return nil } -func (e *Enqueuer) Start() error { - for true { - } - return nil +func (e *Enqueuer) Start(ctx context.Context) error { + span, _ := tracing.EnterFunc(ctx) + defer span.Finish() + + select {} // wait forever } func (e *Enqueuer) OnEnvelopeHandler(_ func(context.Context, *p2p.Envelope) (*p2p.Void, error)) { @@ -41,15 +49,23 @@ func (e *Enqueuer) OnEnvelopeHandler(_ func(context.Context, *p2p.Envelope) (*p2 } func (e *Enqueuer) PingOtherNode(ctx context.Context, destination string) error { - e.pingQueue <- destination + span, _ := tracing.EnterFunc(ctx) + defer span.Finish() + e.pingQueue <- destination return nil } -func (e *Enqueuer) Join(_ context.Context, _ string) error { +func (e *Enqueuer) Join(ctx context.Context, _ string) error { + span, _ := tracing.EnterFunc(ctx) + defer span.Finish() + return nil } -func (e *Enqueuer) Close() error { +func (e *Enqueuer) Close(ctx context.Context) error { + span, _ := tracing.EnterFunc(ctx) + defer span.Finish() + return nil } diff --git a/core/network/p2p/discovery.go b/core/network/p2p/discovery.go index af385a1cb9..89c599ca44 100644 --- a/core/network/p2p/discovery.go +++ b/core/network/p2p/discovery.go @@ -3,6 +3,7 @@ package p2p import ( "context" + "berty.tech/core/pkg/tracing" cid "github.com/ipfs/go-cid" inet "github.com/libp2p/go-libp2p-net" peer "github.com/libp2p/go-libp2p-peer" @@ -20,17 +21,24 @@ var _ discovery.Notifee = (*driverDiscoveryNotify)(nil) type driverDiscoveryNotify Driver -func DiscoveryNotify(d *Driver) discovery.Notifee { +func DiscoveryNotify(ctx context.Context, d *Driver) discovery.Notifee { + span, _ := tracing.EnterFunc(ctx, d) + defer span.Finish() return (*driverDiscoveryNotify)(d) } -func Notify(d *Driver) inet.Notifiee { +func Notify(ctx context.Context, d *Driver) inet.Notifiee { + span, _ := tracing.EnterFunc(ctx, d) + defer span.Finish() return (*driverDiscoveryNotify)(d) } // Driver Notify func (ddn *driverDiscoveryNotify) HandlePeerFound(pi pstore.PeerInfo) { - if err := ddn.host.Connect(context.Background(), pi); err != nil { + span, ctx := tracing.EnterFunc(ddn.rootContext, pi) + defer span.Finish() + + if err := ddn.host.Connect(ctx, pi); err != nil { logger().Warn("mdns discovery failed", zap.String("remoteID", pi.ID.Pretty()), zap.Error(err)) } else { // absorb addresses into peerstore @@ -38,18 +46,33 @@ func (ddn *driverDiscoveryNotify) HandlePeerFound(pi pstore.PeerInfo) { } } -func (ddn *driverDiscoveryNotify) Listen(net inet.Network, a ma.Multiaddr) {} -func (ddn *driverDiscoveryNotify) ListenClose(net inet.Network, a ma.Multiaddr) {} -func (ddn *driverDiscoveryNotify) OpenedStream(net inet.Network, s inet.Stream) {} -func (ddn *driverDiscoveryNotify) ClosedStream(net inet.Network, s inet.Stream) {} +func (ddn *driverDiscoveryNotify) Listen(net inet.Network, a ma.Multiaddr) { + span, _ := tracing.EnterFunc(ddn.rootContext, net, a) + defer span.Finish() +} +func (ddn *driverDiscoveryNotify) ListenClose(net inet.Network, a ma.Multiaddr) { + span, _ := tracing.EnterFunc(ddn.rootContext, net, a) + defer span.Finish() +} +func (ddn *driverDiscoveryNotify) OpenedStream(net inet.Network, s inet.Stream) { + span, _ := tracing.EnterFunc(ddn.rootContext, net, s) + defer span.Finish() +} +func (ddn *driverDiscoveryNotify) ClosedStream(net inet.Network, s inet.Stream) { + span, _ := tracing.EnterFunc(ddn.rootContext, net, s) + defer span.Finish() +} func (ddn *driverDiscoveryNotify) Connected(s inet.Network, c inet.Conn) { + span, ctx := tracing.EnterFunc(ddn.rootContext, s, c) + defer span.Finish() + go func(id peer.ID) { ddn.muSubs.Lock() if len(ddn.subsStack) > 0 { var newSubsStack []cid.Cid for _, c := range ddn.subsStack { - if err := ddn.dht.Provide(context.Background(), c, true); err != nil { + if err := ddn.dht.Provide(ctx, c, true); err != nil { // stack peer if no peer found logger().Warn("discover: provide err:", zap.Error(err)) newSubsStack = append(newSubsStack, c) @@ -64,4 +87,7 @@ func (ddn *driverDiscoveryNotify) Connected(s inet.Network, c inet.Conn) { }(c.RemotePeer()) } -func (ddn *driverDiscoveryNotify) Disconnected(s inet.Network, c inet.Conn) {} +func (ddn *driverDiscoveryNotify) Disconnected(s inet.Network, c inet.Conn) { + span, _ := tracing.EnterFunc(ddn.rootContext, s, c) + defer span.Finish() +} diff --git a/core/network/p2p/driver.go b/core/network/p2p/driver.go index 3148e7743e..e167415e94 100644 --- a/core/network/p2p/driver.go +++ b/core/network/p2p/driver.go @@ -3,7 +3,6 @@ package p2p import ( "context" "fmt" - "io" "net" "sync" "time" @@ -12,6 +11,7 @@ import ( "berty.tech/core/network" "berty.tech/core/network/p2p/p2putil" "berty.tech/core/network/p2p/protocol/service/p2pgrpc" + "berty.tech/core/pkg/tracing" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" @@ -31,6 +31,7 @@ import ( mdns "github.com/libp2p/go-libp2p/p2p/discovery" ma "github.com/multiformats/go-multiaddr" mh "github.com/multiformats/go-multihash" + opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "go.uber.org/zap" "google.golang.org/grpc" @@ -74,14 +75,17 @@ type Driver struct { listener net.Listener gs *grpc.Server - serverTracerClose io.Closer - dialTracerCloser io.Closer + rootContext context.Context } // Driver is a network.Driver var _ network.Driver = (*Driver)(nil) func NewDriver(ctx context.Context, opts ...Option) (*Driver, error) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx) + defer span.Finish() + var cfg driverConfig if err := cfg.Apply(opts...); err != nil { return nil, err @@ -92,13 +96,18 @@ func NewDriver(ctx context.Context, opts ...Option) (*Driver, error) { // New create a new driver func newDriver(ctx context.Context, cfg driverConfig) (*Driver, error) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, cfg) + defer span.Finish() + host, err := libp2p.New(ctx, cfg.libp2pOpt...) if err != nil { return nil, err } driver := &Driver{ - host: host, + host: host, + rootContext: ctx, } ds := syncdatastore.MutexWrap(datastore.NewMapDatastore()) @@ -107,7 +116,7 @@ func newDriver(ctx context.Context, cfg driverConfig) (*Driver, error) { cfg.dhtOpts = []dhtopt.Option{dhtopt.Datastore(ds)} } - driver.dht = dht.NewDHT(context.Background(), host, ds) + driver.dht = dht.NewDHT(ctx, host, ds) if err != nil { return nil, err } @@ -129,11 +138,11 @@ func newDriver(ctx context.Context, cfg driverConfig) (*Driver, error) { if err != nil { logger().Warn("Failed to enable MDNS", zap.Error(err)) } else { - sa.RegisterNotifee(DiscoveryNotify(driver)) + sa.RegisterNotifee(DiscoveryNotify(ctx, driver)) } } - host.Network().Notify(Notify(driver)) + host.Network().Notify(Notify(ctx, driver)) if len(cfg.bootstrap) > 0 { if err := driver.Bootstrap(ctx, cfg.bootstrapSync, cfg.bootstrap...); err != nil { @@ -180,19 +189,22 @@ func newDriver(ctx context.Context, cfg driverConfig) (*Driver, error) { dialOpts := append([]grpc.DialOption{ grpc.WithInsecure(), - grpc.WithDialer(sgrpc.NewDialer(ID)), + grpc.WithDialer(sgrpc.NewDialer(ctx, ID)), }, p2pInterceptorsClient...) driver.ccmanager = p2putil.NewNetManager(dialOpts...) p2p.RegisterServiceServer(driver.gs, ServiceServer(driver)) - driver.listener = sgrpc.NewListener(ID) + driver.listener = sgrpc.NewListener(ctx, ID) driver.logHostInfos() return driver, nil } -func (d *Driver) Start() error { +func (d *Driver) Start(ctx context.Context) error { + span, _ := tracing.EnterFunc(ctx) + defer span.Finish() + if err := d.gs.Serve(d.listener); err != nil { logger().Error("Listen error", zap.Error(err)) return err @@ -210,7 +222,10 @@ func (d *Driver) logHostInfos() { logger().Debug("Host", zap.String("ID", d.host.ID().Pretty()), zap.Strings("Addrs", addrs)) } -func (d *Driver) getPeerInfo(addr string) (*pstore.PeerInfo, error) { +func (d *Driver) getPeerInfo(ctx context.Context, addr string) (*pstore.PeerInfo, error) { + span, _ := tracing.EnterFunc(ctx, addr) + defer span.Finish() + iaddr, err := ipfsaddr.ParseString(addr) if err != nil { return nil, err @@ -219,7 +234,10 @@ func (d *Driver) getPeerInfo(addr string) (*pstore.PeerInfo, error) { return pstore.InfoFromP2pAddr(iaddr.Multiaddr()) } -func (d *Driver) Protocols(p *p2p.Peer) ([]string, error) { +func (d *Driver) Protocols(ctx context.Context, p *p2p.Peer) ([]string, error) { + span, _ := tracing.EnterFunc(ctx, p) + defer span.Finish() + peerid, err := peer.IDB58Decode(p.ID) if err != nil { return nil, fmt.Errorf("get protocols error: `%s`", err) @@ -232,7 +250,10 @@ func (d *Driver) Addrs() []ma.Multiaddr { return d.host.Addrs() } -func (d *Driver) ID() *p2p.Peer { +func (d *Driver) ID(ctx context.Context) *p2p.Peer { + span, _ := tracing.EnterFunc(ctx) + defer span.Finish() + addrs := make([]string, len(d.host.Addrs())) for i, addr := range d.host.Addrs() { addrs[i] = addr.String() @@ -245,7 +266,10 @@ func (d *Driver) ID() *p2p.Peer { } } -func (d *Driver) Close() error { +func (d *Driver) Close(ctx context.Context) error { + span, _ := tracing.EnterFunc(ctx) + defer span.Finish() + // FIXME: save cache to speedup next connections var err error // close dht @@ -271,18 +295,25 @@ func (d *Driver) Close() error { return nil } -func (d *Driver) Peerstore() pstore.Peerstore { +func (d *Driver) Peerstore(ctx context.Context) pstore.Peerstore { + span, _ := tracing.EnterFunc(ctx) + defer span.Finish() + return d.host.Peerstore() } func (d *Driver) Bootstrap(ctx context.Context, sync bool, addrs ...string) error { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, sync, addrs) + defer span.Finish() + bf := d.BootstrapPeerAsync if sync { bf = d.BootstrapPeer } for _, addr := range addrs { - if err := bf(context.Background(), addr); err != nil { + if err := bf(ctx, addr); err != nil { return err } } @@ -291,6 +322,10 @@ func (d *Driver) Bootstrap(ctx context.Context, sync bool, addrs ...string) erro } func (d *Driver) BootstrapPeerAsync(ctx context.Context, addr string) error { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, addr) + defer span.Finish() + go func() { if err := d.BootstrapPeer(ctx, addr); err != nil { logger().Warn("Bootstrap error", zap.String("addr", addr), zap.Error(err)) @@ -301,11 +336,15 @@ func (d *Driver) BootstrapPeerAsync(ctx context.Context, addr string) error { } func (d *Driver) BootstrapPeer(ctx context.Context, bootstrapAddr string) error { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, bootstrapAddr) + defer span.Finish() + if bootstrapAddr == "" { return nil } - pinfo, err := d.getPeerInfo(bootstrapAddr) + pinfo, err := d.getPeerInfo(ctx, bootstrapAddr) if err != nil { return err } else if err = d.host.Connect(ctx, *pinfo); err != nil { @@ -348,6 +387,10 @@ func (d *Driver) Connect(ctx context.Context, pi pstore.PeerInfo) error { } func (d *Driver) Dial(ctx context.Context, peerID string, pid protocol.ID) (net.Conn, error) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, peerID, pid) + defer span.Finish() + return p2putil.NewDialer(d.host, pid)(ctx, peerID) } diff --git a/core/network/p2p/metrics.go b/core/network/p2p/metrics.go index cb83bd3f96..dbec50f6c1 100644 --- a/core/network/p2p/metrics.go +++ b/core/network/p2p/metrics.go @@ -1,6 +1,7 @@ package p2p import ( + "context" "fmt" "sync" "time" @@ -12,10 +13,12 @@ import ( pstore "github.com/libp2p/go-libp2p-peerstore" protocol "github.com/libp2p/go-libp2p-protocol" ma "github.com/multiformats/go-multiaddr" + opentracing "github.com/opentracing/opentracing-go" "go.uber.org/zap" "berty.tech/core/api/p2p" "berty.tech/core/network" + "berty.tech/core/pkg/tracing" ) // Metrics is a network.Metrics @@ -29,21 +32,31 @@ type Metrics struct { bw *bw.BandwidthCounter driver *Driver + + rootContext context.Context } -func NewMetrics(d *Driver) network.Metrics { +func NewMetrics(ctx context.Context, d *Driver) network.Metrics { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx) + defer span.Finish() + m := &Metrics{ host: d.host, driver: d, peersHandlers: make([]func(*p2p.Peer, error) error, 0), bw: bw.NewBandwidthCounter(), + rootContext: ctx, } m.host.Network().Notify(m) return m } -func (m *Metrics) Peers() *p2p.Peers { +func (m *Metrics) Peers(ctx context.Context) *p2p.Peers { + span, _ := tracing.EnterFunc(ctx) + defer span.Finish() + peers := m.peers() pis := &p2p.Peers{ List: make([]*p2p.Peer, len(peers)), @@ -67,8 +80,8 @@ func (m *Metrics) bandwidthToStats(b bw.Stats) *p2p.BandwidthStats { func (m *Metrics) MonitorPeers(handler func(*p2p.Peer, error) error) { m.muHPeers.Lock() + defer m.muHPeers.Unlock() m.peersHandlers = append(m.peersHandlers, handler) - m.muHPeers.Unlock() } func (m *Metrics) MonitorBandwidth(interval time.Duration, handler func(*p2p.BandwidthStats, error) error) { @@ -112,7 +125,9 @@ func (m *Metrics) MonitorBandwidthProtocol(id string, interval time.Duration, ha func (m *Metrics) MonitorBandwidthPeer(id string, interval time.Duration, handler func(*p2p.BandwidthStats, error) error) { peerid, err := peer.IDFromString(id) if err != nil { - handler(nil, fmt.Errorf("monitor bandwidth peer: %s", err)) + if err := handler(nil, fmt.Errorf("monitor bandwidth peer: %s", err)); err != nil { + logger().Error("failed to call handler", zap.Error(err)) + } return } @@ -134,7 +149,10 @@ func (m *Metrics) MonitorBandwidthPeer(id string, interval time.Duration, handle }() } -func (m *Metrics) handlePeer(id peer.ID) { +func (m *Metrics) handlePeer(ctx context.Context, id peer.ID) { + span, _ := tracing.EnterFunc(ctx, id) + defer span.Finish() + pi := m.host.Peerstore().PeerInfo(id) peer := m.peerInfoToPeer(pi) @@ -189,9 +207,9 @@ func (m *Metrics) OpenedStream(net inet.Network, s inet.Stream) {} func (m *Metrics) ClosedStream(net inet.Network, s inet.Stream) {} func (m *Metrics) Connected(s inet.Network, c inet.Conn) { - go m.handlePeer(c.RemotePeer()) + go m.handlePeer(m.rootContext, c.RemotePeer()) } func (m *Metrics) Disconnected(s inet.Network, c inet.Conn) { - go m.handlePeer(c.RemotePeer()) + go m.handlePeer(m.rootContext, c.RemotePeer()) } diff --git a/core/network/p2p/p2putil/conn.go b/core/network/p2p/p2putil/conn.go index fde9f21245..753d4c3ac0 100644 --- a/core/network/p2p/p2putil/conn.go +++ b/core/network/p2p/p2putil/conn.go @@ -24,7 +24,7 @@ func (pa *ProtocolAddr) Network() string { } func (pa *ProtocolAddr) String() string { - return string(pa.Address) + return pa.Address } // conn must implement net.conn diff --git a/core/network/p2p/protocol/service/p2pgrpc/grpc.go b/core/network/p2p/protocol/service/p2pgrpc/grpc.go index 32b0f5c0f7..477227a878 100644 --- a/core/network/p2p/protocol/service/p2pgrpc/grpc.go +++ b/core/network/p2p/protocol/service/p2pgrpc/grpc.go @@ -7,9 +7,11 @@ import ( host "github.com/libp2p/go-libp2p-host" protocol "github.com/libp2p/go-libp2p-protocol" + opentracing "github.com/opentracing/opentracing-go" "go.uber.org/zap" "berty.tech/core/network/p2p/p2putil" + "berty.tech/core/pkg/tracing" ) const ID = "/berty/grpc" @@ -36,7 +38,10 @@ func (pg *P2Pgrpc) hasProtocol(proto string) bool { return false } -func (pg *P2Pgrpc) NewListener(proto string) net.Listener { +func (pg *P2Pgrpc) NewListener(ctx context.Context, proto string) net.Listener { + span, _ := tracing.EnterFunc(ctx, proto) + defer span.Finish() + id := GetGrpcID(proto) if pg.hasProtocol(id) { @@ -56,11 +61,15 @@ func (pg *P2Pgrpc) NewListener(proto string) net.Listener { return l } -func (pg *P2Pgrpc) NewDialer(proto string) func(string, time.Duration) (net.Conn, error) { +func (pg *P2Pgrpc) NewDialer(ctx context.Context, proto string) func(string, time.Duration) (net.Conn, error) { pid := protocol.ID(GetGrpcID(proto)) return func(target string, timeout time.Duration) (net.Conn, error) { - ctx, cancel := context.WithTimeout(context.Background(), timeout) + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, proto, target, timeout) + defer span.Finish() + + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() return p2putil.NewDialer(pg.host, pid)(ctx, target) diff --git a/core/network/p2p/test/p2p_test.go b/core/network/p2p/test/p2p_test.go index 9a735bb6b2..b83aef72ac 100644 --- a/core/network/p2p/test/p2p_test.go +++ b/core/network/p2p/test/p2p_test.go @@ -26,7 +26,7 @@ func getBoostrap(d *p2p.Driver) []string { bootstrap := make([]string, len(addrs)) for i, a := range addrs { - bootstrap[i] = fmt.Sprintf("%s/ipfs/%s", a.String(), d.ID().ID) + bootstrap[i] = fmt.Sprintf("%s/ipfs/%s", a.String(), d.ID(context.Background()).ID) } return bootstrap @@ -49,7 +49,7 @@ func setupDriver(bootstrap ...string) (*p2p.Driver, error) { return nil, err } go func() { - if err = driver.Start(); err != nil { + if err = driver.Start(context.Background()); err != nil { logger().Error("driver start error", zap.Error(err)) } }() @@ -88,7 +88,7 @@ func TestP2PNetwork(t *testing.T) { if d != nil { _d := d go func() { - if err := _d.Close(); err != nil { + if err := _d.Close(context.Background()); err != nil { logger().Warn("error while closing", zap.Error(err)) } }() diff --git a/core/node/config.go b/core/node/config.go index c68e1cdb4f..6bbf16f8b7 100644 --- a/core/node/config.go +++ b/core/node/config.go @@ -1,6 +1,7 @@ package node import ( + "context" "encoding/base64" "github.com/gogo/protobuf/proto" @@ -11,6 +12,7 @@ import ( "berty.tech/core/api/p2p" "berty.tech/core/crypto/sigchain" "berty.tech/core/entity" + "berty.tech/core/pkg/tracing" "berty.tech/core/pkg/zapring" ) @@ -22,8 +24,11 @@ func WithRing(ring *zapring.Ring) NewNodeOption { func WithInitConfig() NewNodeOption { return func(n *Node) { + span, ctx := tracing.EnterFunc(n.rootContext) + defer span.Finish() + // get config from sql - config, err := n.Config() + config, err := n.Config(ctx) if err != nil { ID, err := uuid.NewV4() @@ -36,7 +41,7 @@ func WithInitConfig() NewNodeOption { ID: ID.String(), } - if err = n.sql(nil).Create(config).Error; err != nil { + if err = n.sql(ctx).Create(config).Error; err != nil { logger().Error("node.WithInitConfig", zap.Error(errors.Wrap(err, "failed to save empty config"))) return } @@ -48,8 +53,11 @@ func WithInitConfig() NewNodeOption { func WithConfig() NewNodeOption { return func(n *Node) { + span, ctx := tracing.EnterFunc(n.rootContext) + defer span.Finish() + // get config from sql - config, err := n.Config() + config, err := n.Config(ctx) if err != nil { zap.Error(errors.Wrap(err, "failed to load existing config")) @@ -59,14 +67,14 @@ func WithConfig() NewNodeOption { if config.Validate() != nil { logger().Debug("config is missing from sql, creating a new one") - if _, err = n.initConfig(); err != nil { + if _, err = n.initConfig(ctx); err != nil { zap.Error(errors.Wrap(err, "failed to initialize config")) return } } - config, err = n.Config() + config, err = n.Config(ctx) if err != nil { logger().Error("WithConfig", zap.Error(errors.Wrap(err, "unable to load node config"))) @@ -82,7 +90,10 @@ func WithConfig() NewNodeOption { } } -func (n *Node) initConfig() (*entity.Config, error) { +func (n *Node) initConfig(ctx context.Context) (*entity.Config, error) { + span, _ := tracing.EnterFunc(ctx) + defer span.Finish() + if n.crypto == nil { return nil, errors.New("unable to get crypto instance") } @@ -111,7 +122,7 @@ func (n *Node) initConfig() (*entity.Config, error) { Sigchain: scBytes, } - if err := n.sql(nil).Create(myself).Error; err != nil { + if err := n.sql(ctx).Create(myself).Error; err != nil { return nil, errors.Wrap(err, "unable to save myself") } @@ -124,7 +135,7 @@ func (n *Node) initConfig() (*entity.Config, error) { ContactID: myself.ID, } - if err := n.sql(nil).Create(currentDevice).Error; err != nil { + if err := n.sql(ctx).Create(currentDevice).Error; err != nil { return nil, errors.Wrap(err, "unable to save config") } @@ -133,7 +144,7 @@ func (n *Node) initConfig() (*entity.Config, error) { n.config.Myself = myself n.config.MyselfID = n.config.Myself.ID - if err := n.sql(nil). + if err := n.sql(ctx). Save(&n.config). Error; err != nil { return nil, errors.Wrap(err, "failed to save config") @@ -143,10 +154,13 @@ func (n *Node) initConfig() (*entity.Config, error) { } // Config gets config from database -func (n *Node) Config() (*entity.Config, error) { +func (n *Node) Config(ctx context.Context) (*entity.Config, error) { + span, _ := tracing.EnterFunc(ctx) + defer span.Finish() + var config []*entity.Config - if err := n.sql(nil).Preload("CurrentDevice").Preload("Myself").Preload("Myself.Devices").Find(&config, &entity.Config{}).Error; err != nil { + if err := n.sql(ctx).Preload("CurrentDevice").Preload("Myself").Preload("Myself.Devices").Find(&config, &entity.Config{}).Error; err != nil { // if err := n.sql.First(config).Error; err != nil { return nil, errors.Wrap(err, "unable to get config") } diff --git a/core/node/crypto.go b/core/node/crypto.go index dfd6d5bf62..7e0caf2964 100644 --- a/core/node/crypto.go +++ b/core/node/crypto.go @@ -10,6 +10,7 @@ import ( "go.uber.org/zap" "berty.tech/core/crypto/keypair" + "berty.tech/core/pkg/tracing" ) // WithCrypto set the underlying crypto (keypair.Interface) object inside Node @@ -21,6 +22,9 @@ func WithCrypto(cryptoImpl keypair.Interface) NewNodeOption { func WithSoftwareCrypto() NewNodeOption { return func(n *Node) { + span, ctx := tracing.EnterFunc(n.rootContext) + defer span.Finish() + var privBytes []byte if bytes.Compare(n.config.CryptoParams, []byte{}) == 0 { @@ -38,9 +42,9 @@ func WithSoftwareCrypto() NewNodeOption { } n.config.CryptoParams = privBytes - if err = n.sql(nil).Save(n.config).Error; err != nil { + if err = n.sql(ctx).Save(n.config).Error; err != nil { err := errors.Wrap(err, "failed to save RSA key") - n.LogBackgroundError(errors.Wrap(err, "node.WithSoftwareCrypto")) + n.LogBackgroundError(ctx, errors.Wrap(err, "node.WithSoftwareCrypto")) return } diff --git a/core/node/event.go b/core/node/event.go index aff344447d..f5502b14b1 100644 --- a/core/node/event.go +++ b/core/node/event.go @@ -6,32 +6,39 @@ import ( "fmt" "time" + opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "go.uber.org/zap" "berty.tech/core/api/node" "berty.tech/core/api/p2p" "berty.tech/core/entity" + "berty.tech/core/pkg/tracing" ) -func (n *Node) AsyncWait() { - n.asyncWaitGroup.Wait() +func (n *Node) AsyncWait(ctx context.Context) { + span, _ := tracing.EnterFunc(ctx) + defer span.Finish() + + n.asyncWaitGroupInst.Wait() } // HandleEvent implements berty.p2p.HandleEvent (synchronous unary) func (n *Node) HandleEvent(ctx context.Context, input *p2p.Event) (*node.Void, error) { - n.asyncWaitGroup.Add(1) - defer n.asyncWaitGroup.Done() - err := n.handleEvent(ctx, input) - //time.Sleep(100 * time.Millisecond) - return &node.Void{}, err + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, input) + defer span.Finish() + defer n.asyncWaitGroup(ctx)() + + return &node.Void{}, n.handleEvent(ctx, input) } func (n *Node) handleEvent(ctx context.Context, input *p2p.Event) error { - n.asyncWaitGroup.Add(1) - defer n.asyncWaitGroup.Done() - n.handleMutex.Lock() - defer n.handleMutex.Unlock() + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, input) + defer span.Finish() + defer n.asyncWaitGroup(ctx)() + n.handleMutex(ctx)() if input.SenderID == n.UserID() { logger().Debug("skipping event created by myself", @@ -92,11 +99,11 @@ func (n *Node) handleEvent(ctx context.Context, input *p2p.Event) error { // emits the event to the client (UI) if handlingError == nil { - if err := n.EnqueueClientEvent(input); err != nil { + if err := n.EnqueueClientEvent(ctx, input); err != nil { return err } } else { - n.LogBackgroundError(errors.Wrap(handlingError, "p2p.Handle event")) + n.LogBackgroundError(ctx, errors.Wrap(handlingError, "p2p.Handle event")) } if err := sql.Save(input).Error; err != nil { @@ -104,12 +111,12 @@ func (n *Node) handleEvent(ctx context.Context, input *p2p.Event) error { } // asynchronously ack, maybe we can ignore this one? - ack := n.NewContactEvent(&entity.Contact{ID: input.SenderID}, p2p.Kind_Ack) + ack := n.NewContactEvent(ctx, &entity.Contact{ID: input.SenderID}, p2p.Kind_Ack) ack.AckedAt = &now if err := ack.SetAttrs(&p2p.AckAttrs{IDs: []string{input.ID}}); err != nil { return err } - if err := n.EnqueueOutgoingEvent(ack); err != nil { + if err := n.EnqueueOutgoingEvent(ctx, ack); err != nil { return err } diff --git a/core/node/event_handlers.go b/core/node/event_handlers.go index 87ffb54187..b42740bed4 100644 --- a/core/node/event_handlers.go +++ b/core/node/event_handlers.go @@ -63,7 +63,7 @@ func (n *Node) handleContactRequestAccepted(ctx context.Context, input *p2p.Even } // send my contact - if err := n.contactShareMe(contact); err != nil { + if err := n.contactShareMe(ctx, contact); err != nil { return err } diff --git a/core/node/mainloop.go b/core/node/mainloop.go index ff1ea5ddcc..f1d706177b 100644 --- a/core/node/mainloop.go +++ b/core/node/mainloop.go @@ -6,17 +6,23 @@ import ( "time" "github.com/gogo/protobuf/proto" + opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "go.uber.org/zap" "berty.tech/core/api/node" "berty.tech/core/api/p2p" "berty.tech/core/crypto/keypair" + "berty.tech/core/pkg/tracing" ) // EventsRetry updates SentAt and requeue an event -func (n *Node) EventRequeue(event *p2p.Event) error { - sql := n.sql(nil) +func (n *Node) EventRequeue(ctx context.Context, event *p2p.Event) error { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx) + defer span.Finish() + + sql := n.sql(ctx) now := time.Now() event.SentAt = &now @@ -29,9 +35,12 @@ func (n *Node) EventRequeue(event *p2p.Event) error { } // EventsRetry sends events which lack an AckedAt value emitted before the supplied time value -func (n *Node) EventsRetry(before time.Time) ([]*p2p.Event, error) { - sql := n.sql(nil) +func (n *Node) EventsRetry(ctx context.Context, before time.Time) ([]*p2p.Event, error) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx) + defer span.Finish() + sql := n.sql(ctx) var retriedEvents []*p2p.Event destinations, err := p2p.FindNonAcknowledgedEventDestinations(sql, before) @@ -43,15 +52,13 @@ func (n *Node) EventsRetry(before time.Time) ([]*p2p.Event, error) { events, err := p2p.FindNonAcknowledgedEventsForDestination(sql, destination) if err != nil { - n.LogBackgroundError(errors.Wrap(err, "error while retrieving events for dst")) + n.LogBackgroundError(ctx, errors.Wrap(err, "error while retrieving events for dst")) continue } for _, event := range events { - err := n.EventRequeue(event) - - if err != nil { - n.LogBackgroundError(errors.Wrap(err, "error while enqueuing event")) + if err := n.EventRequeue(ctx, event); err != nil { + n.LogBackgroundError(ctx, errors.Wrap(err, "error while enqueuing event")) continue } retriedEvents = append(retriedEvents, event) @@ -61,35 +68,41 @@ func (n *Node) EventsRetry(before time.Time) ([]*p2p.Event, error) { return retriedEvents, nil } +func (n *Node) cron(ctx context.Context) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx) + defer span.Finish() + for true { + before := time.Now().Add(-time.Second * 60 * 10) + if _, err := n.EventsRetry(ctx, before); err != nil { + n.LogBackgroundError(ctx, err) + } + + time.Sleep(time.Second * 60) + } +} + // Start is the node's mainloop -func (n *Node) Start(withCron, withNodeEvents bool) error { - ctx := context.Background() +func (n *Node) Start(ctx context.Context, withCron, withNodeEvents bool) error { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx) + defer span.Finish() if withCron { - go func() { - for true { - before := time.Now().Add(-time.Second * 60 * 10) - _, err := n.EventsRetry(before) - if err != nil { - n.LogBackgroundError(err) - } - - time.Sleep(time.Second * 60) - } - }() + go n.cron(ctx) } if withNodeEvents { // "node started" event go func() { time.Sleep(time.Second) - n.EnqueueNodeEvent(node.Kind_NodeStarted, nil) + n.EnqueueNodeEvent(ctx, node.Kind_NodeStarted, nil) }() // "node is alive" event go func() { for { - n.EnqueueNodeEvent(node.Kind_NodeIsAlive, nil) + n.EnqueueNodeEvent(ctx, node.Kind_NodeIsAlive, nil) time.Sleep(30 * time.Second) } }() @@ -102,7 +115,7 @@ func (n *Node) Start(withCron, withNodeEvents bool) error { envelope := p2p.Envelope{} eventBytes, err := proto.Marshal(event) if err != nil { - n.LogBackgroundError(errors.Wrap(err, "failed to marshal outgoing event")) + n.LogBackgroundError(ctx, errors.Wrap(err, "failed to marshal outgoing event")) continue } @@ -110,21 +123,21 @@ func (n *Node) Start(withCron, withNodeEvents bool) error { switch { case event.ReceiverID != "": // ContactEvent - envelope.Source = n.aliasEnvelopeForContact(&envelope, event) + envelope.Source = n.aliasEnvelopeForContact(ctx, &envelope, event) envelope.ChannelID = event.ReceiverID envelope.EncryptedEvent = eventBytes // FIXME: encrypt for receiver case event.ConversationID != "": //ConversationEvent - envelope.Source = n.aliasEnvelopeForConversation(&envelope, event) + envelope.Source = n.aliasEnvelopeForConversation(ctx, &envelope, event) envelope.ChannelID = event.ConversationID envelope.EncryptedEvent = eventBytes // FIXME: encrypt for conversation default: - n.LogBackgroundError(fmt.Errorf("unhandled event type")) + n.LogBackgroundError(ctx, fmt.Errorf("unhandled event type")) } if envelope.Signature, err = keypair.Sign(n.crypto, &envelope); err != nil { - n.LogBackgroundError(errors.Wrap(err, "failed to sign envelope")) + n.LogBackgroundError(ctx, errors.Wrap(err, "failed to sign envelope")) continue } @@ -135,7 +148,7 @@ func (n *Node) Start(withCron, withNodeEvents bool) error { go func() { // FIXME: make something smarter, i.e., grouping events by contact or network driver if err := n.networkDriver.Emit(ctx, &envelope); err != nil { - n.LogBackgroundError(errors.Wrap(err, "failed to emit envelope on network")) + n.LogBackgroundError(ctx, errors.Wrap(err, "failed to emit envelope on network")) } done <- true }() diff --git a/core/node/metrics.go b/core/node/metrics.go index f303f39126..53af014bf0 100644 --- a/core/node/metrics.go +++ b/core/node/metrics.go @@ -6,18 +6,24 @@ import ( "berty.tech/core/api/node" "berty.tech/core/api/p2p" + "berty.tech/core/pkg/tracing" ) const BandwidthInterval = time.Second // Return a list of peers -func (n *Node) Peers(_ context.Context, _ *node.Void) (*p2p.Peers, error) { - return n.networkMetrics.Peers(), nil +func (n *Node) Peers(ctx context.Context, _ *node.Void) (*p2p.Peers, error) { + return n.networkMetrics.Peers(ctx), nil } func (n *Node) MonitorPeers(_ *node.Void, stream node.Service_MonitorPeersServer) error { + span, ctx := tracing.EnterFunc(stream.Context()) + defer span.Finish() + cerr := make(chan error, 1) n.networkMetrics.MonitorPeers(func(p *p2p.Peer, err error) error { + span, _ := tracing.EnterFunc(ctx, p, err) + defer span.Finish() if err != nil { cerr <- err return err @@ -36,9 +42,15 @@ func (n *Node) MonitorPeers(_ *node.Void, stream node.Service_MonitorPeersServer // Monitor bandwidth globally with the given interval func (n *Node) MonitorBandwidth(input *p2p.BandwidthStats, stream node.Service_MonitorBandwidthServer) error { + span, ctx := tracing.EnterFunc(stream.Context(), input) + defer span.Finish() + cerr := make(chan error, 1) handler := func(bs *p2p.BandwidthStats, err error) error { + span, _ := tracing.EnterFunc(ctx, bs, err) + defer span.Finish() + if err != nil { cerr <- err return err diff --git a/core/node/network.go b/core/node/network.go index 2c58548423..b3fef90ce8 100644 --- a/core/node/network.go +++ b/core/node/network.go @@ -5,6 +5,8 @@ import ( "berty.tech/core/entity" "berty.tech/core/network" + "berty.tech/core/pkg/tracing" + opentracing "github.com/opentracing/opentracing-go" "go.uber.org/zap" ) @@ -20,14 +22,18 @@ func WithNetworkMetrics(metrics network.Metrics) NewNodeOption { } } -func (n *Node) UseNetworkDriver(driver network.Driver) error { +func (n *Node) UseNetworkDriver(ctx context.Context, driver network.Driver) error { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, driver) + defer span.Finish() + // FIXME: use a locking system n.networkDriver = driver // configure network n.networkDriver.OnEnvelopeHandler(n.HandleEnvelope) - if err := n.networkDriver.Join(context.Background(), n.UserID()); err != nil { + if err := n.networkDriver.Join(ctx, n.UserID()); err != nil { logger().Warn("failed to join user channel", zap.String("id", n.UserID()), zap.Error(err), @@ -38,16 +44,16 @@ func (n *Node) UseNetworkDriver(driver network.Driver) error { // var devices []entity.Device // n.sql.Table("device").Select("id").Find(&devices) // for _, device := range devices { - // if err := n.networkDriver.Join(context.Background(), device.ID); err != nil { + // if err := n.networkDriver.Join(ctx, device.ID); err != nil { // logger().Warn(err.Error()) // } // } var conversations []entity.Conversation - sql := n.sql(nil) + sql := n.sql(ctx) sql.Table("conversation").Select("id").Find(&conversations) for _, conversation := range conversations { - if err := n.networkDriver.Join(context.Background(), conversation.ID); err != nil { + if err := n.networkDriver.Join(ctx, conversation.ID); err != nil { logger().Warn(err.Error()) } } diff --git a/core/node/node.go b/core/node/node.go index f29acbfe31..627fbd4586 100644 --- a/core/node/node.go +++ b/core/node/node.go @@ -1,6 +1,7 @@ package node import ( + "context" "encoding/base64" "fmt" "sync" @@ -8,6 +9,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/jinzhu/gorm" + opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" uuid "github.com/satori/go.uuid" @@ -16,6 +18,7 @@ import ( "berty.tech/core/crypto/sigchain" "berty.tech/core/entity" "berty.tech/core/network" + "berty.tech/core/pkg/tracing" "berty.tech/core/pkg/zapring" ) @@ -28,15 +31,17 @@ type Node struct { sqlDriver *gorm.DB config *entity.Config initDevice *entity.Device - handleMutex sync.Mutex + handleMutexInst sync.Mutex networkDriver network.Driver networkMetrics network.Metrics - asyncWaitGroup sync.WaitGroup + asyncWaitGroupInst sync.WaitGroup pubkey []byte // FIXME: use a crypto instance, i.e., enclave b64pubkey string // FIXME: same as above sigchain *sigchain.SigChain crypto keypair.Interface ring *zapring.Ring // log ring buffer + rootSpan opentracing.Span + rootContext context.Context // only used for tracing // devtools createdAt time.Time // used for uptime calculation @@ -46,12 +51,17 @@ type Node struct { } // New initializes a new Node object -func New(opts ...NewNodeOption) (*Node, error) { +func New(ctx context.Context, opts ...NewNodeOption) (*Node, error) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx) + n := &Node{ // FIXME: fetch myself from db outgoingEvents: make(chan *p2p.Event, 100), clientEvents: make(chan *p2p.Event, 100), createdAt: time.Now().UTC(), + rootSpan: span, + rootContext: ctx, } // apply optioners @@ -83,7 +93,7 @@ func New(opts ...NewNodeOption) (*Node, error) { // configure network if n.networkDriver != nil { - if err := n.UseNetworkDriver(n.networkDriver); err != nil { + if err := n.UseNetworkDriver(n.rootContext, n.networkDriver); err != nil { return nil, errors.Wrap(err, "failed to setup network driver") } } @@ -95,6 +105,7 @@ func New(opts ...NewNodeOption) (*Node, error) { // // it should be called in a defer from the caller of New() func (n *Node) Close() error { + n.rootSpan.Finish() return nil } @@ -138,3 +149,19 @@ func (n *Node) UserID() string { func (n *Node) PubKey() string { return n.b64pubkey } + +func (n *Node) handleMutex(ctx context.Context) func() { + span, _ := tracing.EnterFunc(ctx) + defer span.Finish() + + n.handleMutexInst.Lock() + return n.handleMutexInst.Unlock +} + +func (n *Node) asyncWaitGroup(ctx context.Context) func() { + span, _ := tracing.EnterFunc(ctx) + defer span.Finish() + + n.asyncWaitGroupInst.Add(1) + return n.asyncWaitGroupInst.Done +} diff --git a/core/node/nodeapi.go b/core/node/nodeapi.go index 756b683e32..32325cbbd5 100644 --- a/core/node/nodeapi.go +++ b/core/node/nodeapi.go @@ -7,7 +7,9 @@ import ( "berty.tech/core/api/node" "berty.tech/core/api/p2p" "berty.tech/core/entity" + "berty.tech/core/pkg/tracing" bsql "berty.tech/core/sql" + opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "google.golang.org/grpc" ) @@ -28,7 +30,10 @@ func WithNodeGrpcServer(gs *grpc.Server) NewNodeOption { // EventList implements berty.node.EventList func (n *Node) EventList(input *node.EventListInput, stream node.Service_EventListServer) error { - sql := n.sql(stream.Context()) + span, ctx := tracing.EnterFunc(stream.Context(), input) + defer span.Finish() + n.handleMutex(ctx)() + sql := n.sql(ctx) // prepare query query := sql.Model(p2p.Event{}).Where(input.Filter) @@ -46,9 +51,6 @@ func (n *Node) EventList(input *node.EventListInput, stream node.Service_EventLi return errors.Wrap(err, "pagination error") } - n.handleMutex.Lock() - defer n.handleMutex.Unlock() - // perform query var events []*p2p.Event if err := query.Find(&events).Error; err != nil { @@ -65,6 +67,11 @@ func (n *Node) EventList(input *node.EventListInput, stream node.Service_EventLi } func (n *Node) EventSeen(ctx context.Context, input *node.EventIDInput) (*p2p.Event, error) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, input) + defer span.Finish() + n.handleMutex(ctx)() + event := &p2p.Event{} count := 0 @@ -87,12 +94,15 @@ func (n *Node) EventSeen(ctx context.Context, input *node.EventIDInput) (*p2p.Ev } // GetEvent implements berty.node.GetEvent -func (n *Node) GetEvent(ctx context.Context, event *p2p.Event) (*p2p.Event, error) { - n.handleMutex.Lock() - defer n.handleMutex.Unlock() +func (n *Node) GetEvent(ctx context.Context, input *p2p.Event) (*p2p.Event, error) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, input) + defer span.Finish() + n.handleMutex(ctx)() sql := n.sql(ctx) - if err := sql.First(event, "ID = ?", event.ID).Error; err != nil { + var event *p2p.Event + if err := sql.First(event, "ID = ?", input.ID).Error; err != nil { return nil, errors.Wrap(err, "failed to get event from database") } @@ -105,8 +115,10 @@ func (n *Node) GetEvent(ctx context.Context, event *p2p.Event) (*p2p.Event, erro // ContactAcceptRequest implements berty.node.ContactAcceptRequest func (n *Node) ContactAcceptRequest(ctx context.Context, input *entity.Contact) (*entity.Contact, error) { - n.handleMutex.Lock() - defer n.handleMutex.Unlock() + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, input) + defer span.Finish() + n.handleMutex(ctx)() // input validation if err := input.Validate(); err != nil { @@ -125,16 +137,16 @@ func (n *Node) ContactAcceptRequest(ctx context.Context, input *entity.Contact) } // send ContactRequestAccepted event - event := n.NewContactEvent(contact, p2p.Kind_ContactRequestAccepted) + event := n.NewContactEvent(ctx, contact, p2p.Kind_ContactRequestAccepted) if err != nil { return nil, err } - if err := n.EnqueueOutgoingEvent(event); err != nil { + if err := n.EnqueueOutgoingEvent(ctx, event); err != nil { return nil, err } // send ContactShareMe event - if err := n.contactShareMe(contact); err != nil { + if err := n.contactShareMe(ctx, contact); err != nil { return nil, err } @@ -143,8 +155,10 @@ func (n *Node) ContactAcceptRequest(ctx context.Context, input *entity.Contact) // ContactRequest implements berty.node.ContactRequest func (n *Node) ContactRequest(ctx context.Context, req *node.ContactRequestInput) (*entity.Contact, error) { - n.handleMutex.Lock() - defer n.handleMutex.Unlock() + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, req) + defer span.Finish() + n.handleMutex(ctx)() // input validation if err := req.Contact.Validate(); err != nil { @@ -164,7 +178,7 @@ func (n *Node) ContactRequest(ctx context.Context, req *node.ContactRequestInput } // send request to peer - event := n.NewContactEvent(contact, p2p.Kind_ContactRequest) + event := n.NewContactEvent(ctx, contact, p2p.Kind_ContactRequest) if err := event.SetAttrs(&p2p.ContactRequestAttrs{ Me: &entity.Contact{ ID: n.UserID(), @@ -174,7 +188,7 @@ func (n *Node) ContactRequest(ctx context.Context, req *node.ContactRequestInput }); err != nil { return nil, err } - if err := n.EnqueueOutgoingEvent(event); err != nil { + if err := n.EnqueueOutgoingEvent(ctx, event); err != nil { return nil, err } @@ -183,8 +197,10 @@ func (n *Node) ContactRequest(ctx context.Context, req *node.ContactRequestInput // ContactUpdate implements berty.node.ContactUpdate func (n *Node) ContactUpdate(ctx context.Context, contact *entity.Contact) (*entity.Contact, error) { - n.handleMutex.Lock() - defer n.handleMutex.Unlock() + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, contact) + defer span.Finish() + n.handleMutex(ctx)() // input validation if contact == nil || contact.ID == "" { @@ -202,8 +218,10 @@ func (n *Node) ContactUpdate(ctx context.Context, contact *entity.Contact) (*ent // ContactRemove implements berty.node.ContactRemove func (n *Node) ContactRemove(ctx context.Context, contact *entity.Contact) (*entity.Contact, error) { - n.handleMutex.Lock() - defer n.handleMutex.Unlock() + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, contact) + defer span.Finish() + n.handleMutex(ctx)() // input validation if contact == nil || contact.ID == "" { @@ -223,7 +241,11 @@ func (n *Node) ContactRemove(ctx context.Context, contact *entity.Contact) (*ent // ContactList implements berty.node.ContactList func (n *Node) ContactList(input *node.ContactListInput, stream node.Service_ContactListServer) error { - sql := n.sql(stream.Context()) + span, ctx := tracing.EnterFunc(stream.Context(), input) + defer span.Finish() + n.handleMutex(ctx)() + + sql := n.sql(ctx) // prepare query query := sql.Model(entity.Contact{}).Where(input.Filter) @@ -235,9 +257,6 @@ func (n *Node) ContactList(input *node.ContactListInput, stream node.Service_Con return errors.Wrap(err, "pagination error") } - n.handleMutex.Lock() - defer n.handleMutex.Unlock() - // perform query var contacts []*entity.Contact if err := query.Find(&contacts).Error; err != nil { @@ -254,12 +273,15 @@ func (n *Node) ContactList(input *node.ContactListInput, stream node.Service_Con } // GetContact implements berty.node.GetContact -func (n *Node) GetContact(ctx context.Context, contact *entity.Contact) (*entity.Contact, error) { - n.handleMutex.Lock() - defer n.handleMutex.Unlock() +func (n *Node) GetContact(ctx context.Context, input *entity.Contact) (*entity.Contact, error) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, input) + defer span.Finish() + n.handleMutex(ctx)() sql := n.sql(ctx) - if err := sql.First(contact, "ID = ?", contact.ID).Error; err != nil { + var contact *entity.Contact + if err := sql.First(contact, "ID = ?", input.ID).Error; err != nil { return nil, errors.Wrap(err, "failed to get contact from database") } @@ -271,13 +293,19 @@ func (n *Node) GetContact(ctx context.Context, contact *entity.Contact) (*entity // func (n *Node) ConversationCreate(ctx context.Context, input *node.ConversationCreateInput) (*entity.Conversation, error) { - n.handleMutex.Lock() - defer n.handleMutex.Unlock() + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, input) + defer span.Finish() + n.handleMutex(ctx)() return n.conversationCreate(ctx, input) } func (n *Node) conversationCreate(ctx context.Context, input *node.ConversationCreateInput) (*entity.Conversation, error) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, input) + defer span.Finish() + members := []*entity.ConversationMember{ { ID: n.NewID(), @@ -318,7 +346,7 @@ func (n *Node) conversationCreate(ctx context.Context, input *node.ConversationC done := make(chan bool, 1) go func() { if err := n.networkDriver.Join(ctx, conversation.ID); err != nil { - n.LogBackgroundWarn(errors.Wrap(err, "failed to join conversation")) + n.LogBackgroundWarn(ctx, errors.Wrap(err, "failed to join conversation")) } done <- true }() @@ -334,13 +362,13 @@ func (n *Node) conversationCreate(ctx context.Context, input *node.ConversationC // skipping myself continue } - event := n.NewContactEvent(member.Contact, p2p.Kind_ConversationInvite) + event := n.NewContactEvent(ctx, member.Contact, p2p.Kind_ConversationInvite) if err := event.SetAttrs(&p2p.ConversationInviteAttrs{ Conversation: filtered, }); err != nil { return nil, err } - if err := n.EnqueueOutgoingEvent(event); err != nil { + if err := n.EnqueueOutgoingEvent(ctx, event); err != nil { return nil, err } } @@ -348,30 +376,40 @@ func (n *Node) conversationCreate(ctx context.Context, input *node.ConversationC return conversation, err } -func (n *Node) ConversationAcceptInvite(_ context.Context, conversation *entity.Conversation) (*entity.Conversation, error) { - n.handleMutex.Lock() - defer n.handleMutex.Unlock() +func (n *Node) ConversationAcceptInvite(ctx context.Context, input *entity.Conversation) (*entity.Conversation, error) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, input) + defer span.Finish() + n.handleMutex(ctx)() return nil, ErrNotImplemented } -func (n *Node) ConversationInvite(context.Context, *node.ConversationManageMembersInput) (*entity.Conversation, error) { - n.handleMutex.Lock() - defer n.handleMutex.Unlock() +func (n *Node) ConversationInvite(ctx context.Context, input *node.ConversationManageMembersInput) (*entity.Conversation, error) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, input) + defer span.Finish() + n.handleMutex(ctx)() return nil, ErrNotImplemented } -func (n *Node) ConversationExclude(context.Context, *node.ConversationManageMembersInput) (*entity.Conversation, error) { - n.handleMutex.Lock() - defer n.handleMutex.Unlock() +func (n *Node) ConversationExclude(ctx context.Context, input *node.ConversationManageMembersInput) (*entity.Conversation, error) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, input) + defer span.Finish() + n.handleMutex(ctx)() return nil, ErrNotImplemented } func (n *Node) ConversationList(input *node.ConversationListInput, stream node.Service_ConversationListServer) error { + span, ctx := tracing.EnterFunc(stream.Context(), input) + defer span.Finish() + n.handleMutex(ctx)() + // prepare query - sql := n.sql(stream.Context()) + sql := n.sql(ctx) query := sql.Model(entity.Conversation{}).Where(input.Filter) // pagination @@ -381,9 +419,6 @@ func (n *Node) ConversationList(input *node.ConversationListInput, stream node.S return errors.Wrap(err, "pagination error") } - n.handleMutex.Lock() - defer n.handleMutex.Unlock() - // perform query var conversations []*entity.Conversation if err := query.Find(&conversations).Error; err != nil { @@ -399,29 +434,34 @@ func (n *Node) ConversationList(input *node.ConversationListInput, stream node.S return nil } -func (n *Node) ConversationAddMessage(_ context.Context, input *node.ConversationAddMessageInput) (*p2p.Event, error) { - n.handleMutex.Lock() - defer n.handleMutex.Unlock() +func (n *Node) ConversationAddMessage(ctx context.Context, input *node.ConversationAddMessageInput) (*p2p.Event, error) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, input) + defer span.Finish() + n.handleMutex(ctx)() - event := n.NewConversationEvent(input.Conversation, p2p.Kind_ConversationNewMessage) + event := n.NewConversationEvent(ctx, input.Conversation, p2p.Kind_ConversationNewMessage) if err := event.SetAttrs(&p2p.ConversationNewMessageAttrs{ Message: input.Message, }); err != nil { return nil, err } - if err := n.EnqueueOutgoingEvent(event); err != nil { + if err := n.EnqueueOutgoingEvent(ctx, event); err != nil { return nil, err } return event, nil } // GetConversation implements berty.node.GetConversation -func (n *Node) GetConversation(ctx context.Context, conversation *entity.Conversation) (*entity.Conversation, error) { - n.handleMutex.Lock() - defer n.handleMutex.Unlock() +func (n *Node) GetConversation(ctx context.Context, input *entity.Conversation) (*entity.Conversation, error) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, input) + defer span.Finish() + n.handleMutex(ctx)() sql := n.sql(ctx) - if err := sql.First(conversation, "ID = ?", conversation.ID).Error; err != nil { + var conversation *entity.Conversation + if err := sql.First(conversation, "ID = ?", input.ID).Error; err != nil { return nil, errors.Wrap(err, "failed to get conversation from database") } @@ -429,12 +469,15 @@ func (n *Node) GetConversation(ctx context.Context, conversation *entity.Convers } // GetConversationMember implements berty.node.GetConversationMember -func (n *Node) GetConversationMember(ctx context.Context, conversationMember *entity.ConversationMember) (*entity.ConversationMember, error) { - n.handleMutex.Lock() - defer n.handleMutex.Unlock() +func (n *Node) GetConversationMember(ctx context.Context, input *entity.ConversationMember) (*entity.ConversationMember, error) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, input) + defer span.Finish() + n.handleMutex(ctx)() sql := n.sql(ctx) - if err := sql.First(conversationMember, "ID = ?", conversationMember.ID).Error; err != nil { + var conversationMember *entity.ConversationMember + if err := sql.First(conversationMember, "ID = ?", input.ID).Error; err != nil { return nil, errors.Wrap(err, "failed to get conversationMember from database") } @@ -442,10 +485,10 @@ func (n *Node) GetConversationMember(ctx context.Context, conversationMember *en } func (n *Node) DebugPing(ctx context.Context, input *node.PingDestination) (*node.Void, error) { - n.handleMutex.Lock() - defer n.handleMutex.Unlock() - - err := n.networkDriver.PingOtherNode(ctx, input.Destination) + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, input) + defer span.Finish() + n.handleMutex(ctx)() - return &node.Void{}, err + return &node.Void{}, n.networkDriver.PingOtherNode(ctx, input.Destination) } diff --git a/core/node/nodeapi_devtools.go b/core/node/nodeapi_devtools.go index 0b0dbfaf67..47903fb209 100644 --- a/core/node/nodeapi_devtools.go +++ b/core/node/nodeapi_devtools.go @@ -13,6 +13,11 @@ import ( "strings" "time" + "github.com/brianvoe/gofakeit" + "github.com/gogo/protobuf/proto" + opentracing "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" + "berty.tech/core" "berty.tech/core/api/node" "berty.tech/core/api/p2p" @@ -20,106 +25,132 @@ import ( "berty.tech/core/crypto/sigchain" "berty.tech/core/entity" "berty.tech/core/pkg/deviceinfo" + "berty.tech/core/pkg/tracing" "berty.tech/core/sql" "berty.tech/core/testrunner" - "github.com/brianvoe/gofakeit" - "github.com/gogo/protobuf/proto" - "github.com/pkg/errors" ) +func (n *Node) generateFakeContact(ctx context.Context) (*entity.Contact, error) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx) + defer span.Finish() + + var ( + username = gofakeit.Username() + devicename = fmt.Sprintf("%s's phone", username) + ) + if rand.Intn(3) > 0 { + username = fmt.Sprintf("%s %s", gofakeit.FirstName(), gofakeit.LastName()) + } + + priv, err := rsa.GenerateKey(crand.Reader, 512) + if err != nil { + return nil, errors.Wrap(err, "failed to generate rsa key") + } + privBytes, err := x509.MarshalPKCS8PrivateKey(priv) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal private key") + } + pubBytes, err := x509.MarshalPKIXPublicKey(priv.Public()) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal public key") + } + kp := keypair.InsecureCrypto{} + if err := kp.SetPrivateKeyData(privBytes); err != nil { + return nil, errors.Wrap(err, "failed to set private key in kp") + } + sc := sigchain.SigChain{} + if err := sc.Init(&kp, pubBytes); err != nil { + return nil, errors.Wrap(err, "failed to init sigchain") + } + scBytes, err := proto.Marshal(&sc) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal sigchain") + } + contact := &entity.Contact{ + ID: base64.StdEncoding.EncodeToString(pubBytes), + DisplayName: username, + Status: entity.Contact_Status(rand.Intn(5) + 1), + Sigchain: scBytes, + Devices: []*entity.Device{ + { + ID: base64.StdEncoding.EncodeToString(pubBytes), + Name: devicename, + Status: entity.Device_Status(rand.Intn(3) + 1), + ApiVersion: p2p.Version, + }, + }, + } + sql := n.sql(ctx) + if err := sql.Set("gorm:association_autoupdate", true).Save(&contact).Error; err != nil { + return nil, errors.Wrap(err, "failed to save contacts") + } + return contact, nil +} + func (n *Node) GenerateFakeData(ctx context.Context, input *node.Void) (*node.Void, error) { - // FIXME: enable mutext, but allow calling submethod, i.e., node.CreateConversation + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, input) + defer span.Finish() + + // FIXME: enable mutex, but allow calling submethod, i.e., node.CreateConversation //n.handleMutex.Lock() //defer n.handleMutex.Unlock() contacts := []*entity.Contact{} for i := 0; i < 10; i++ { - var ( - username = gofakeit.Username() - devicename = fmt.Sprintf("%s's phone", username) - ) - if rand.Intn(3) > 0 { - username = fmt.Sprintf("%s %s", gofakeit.FirstName(), gofakeit.LastName()) - } - - priv, err := rsa.GenerateKey(crand.Reader, 512) - if err != nil { - return nil, errors.Wrap(err, "failed to generate rsa key") - } - privBytes, err := x509.MarshalPKCS8PrivateKey(priv) - if err != nil { - return nil, errors.Wrap(err, "failed to marshal private key") - } - pubBytes, err := x509.MarshalPKIXPublicKey(priv.Public()) - if err != nil { - return nil, errors.Wrap(err, "failed to marshal public key") - } - kp := keypair.InsecureCrypto{} - if err := kp.SetPrivateKeyData(privBytes); err != nil { - return nil, errors.Wrap(err, "failed to set private key in kp") - } - sc := sigchain.SigChain{} - if err := sc.Init(&kp, pubBytes); err != nil { - return nil, errors.Wrap(err, "failed to init sigchain") - } - scBytes, err := proto.Marshal(&sc) + contact, err := n.generateFakeContact(ctx) if err != nil { - return nil, errors.Wrap(err, "failed to marshal sigchain") - } - contact := &entity.Contact{ - ID: base64.StdEncoding.EncodeToString(pubBytes), - DisplayName: username, - Status: entity.Contact_Status(rand.Intn(5) + 1), - Sigchain: scBytes, - Devices: []*entity.Device{ - { - ID: base64.StdEncoding.EncodeToString(pubBytes), - Name: devicename, - Status: entity.Device_Status(rand.Intn(3) + 1), - ApiVersion: p2p.Version, - }, - }, - } - sql := n.sql(ctx) - if err := sql.Set("gorm:association_autoupdate", true).Save(&contact).Error; err != nil { - return nil, errors.Wrap(err, "failed to save contacts") + return nil, err } contacts = append(contacts, contact) } for i := 0; i < 10; i++ { - contactsMembers := []*entity.Contact{} - for j := 0; j < rand.Intn(2)+1; j++ { - contactsMembers = append(contactsMembers, &entity.Contact{ - ID: contacts[rand.Intn(len(contacts))].ID, - }) - } - if _, err := n.conversationCreate(context.Background(), &node.ConversationCreateInput{ - Contacts: contactsMembers, - Title: strings.Title(fmt.Sprintf("%s %s", gofakeit.HipsterWord(), gofakeit.HackerNoun())), - Topic: gofakeit.HackerPhrase(), - }); err != nil { - return nil, errors.Wrap(err, "failed to create conversation") + if err := func() error { + var span opentracing.Span + span, ctx = opentracing.StartSpanFromContext(ctx, "new conversation") + defer span.Finish() + + contactsMembers := []*entity.Contact{} + for j := 0; j < rand.Intn(2)+1; j++ { + contactsMembers = append(contactsMembers, &entity.Contact{ + ID: contacts[rand.Intn(len(contacts))].ID, + }) + } + if _, err := n.conversationCreate(ctx, &node.ConversationCreateInput{ + Contacts: contactsMembers, + Title: strings.Title(fmt.Sprintf("%s %s", gofakeit.HipsterWord(), gofakeit.HackerNoun())), + Topic: gofakeit.HackerPhrase(), + }); err != nil { + return errors.Wrap(err, "failed to create conversation") + } + return nil + }(); err != nil { + return nil, err } } - // enqueue fake incoming event - in := n.NewContactEvent(&entity.Contact{ID: "abcde"}, p2p.Kind_DevtoolsMapset) - if err := n.EnqueueClientEvent(in); err != nil { - return nil, err - } + /* + // enqueue fake incoming event + in := n.NewContactEvent(&entity.Contact{ID: "abcde"}, p2p.Kind_DevtoolsMapset) + if err := n.EnqueueClientEvent(in); err != nil { + return nil, err + } - // enqueue fake outgoing event - out := n.NewContactEvent(&entity.Contact{ID: "abcde"}, p2p.Kind_DevtoolsMapset) - if err := n.EnqueueOutgoingEvent(out); err != nil { - return nil, err - } + // enqueue fake outgoing event + out := n.NewContactEvent(&entity.Contact{ID: "abcde"}, p2p.Kind_DevtoolsMapset) + if err := n.EnqueueOutgoingEvent(out); err != nil { + return nil, err + } + */ return &node.Void{}, nil } func (n *Node) NodeInfos(ctx context.Context) (map[string]string, error) { - db := n.sql(ctx) + span, _ := tracing.EnterFunc(ctx) + defer span.Finish() infos := map[string]string{} @@ -129,6 +160,7 @@ func (n *Node) NodeInfos(ctx context.Context) (map[string]string, error) { infos["time: node uptime"] = fmt.Sprintf("%s", time.Since(n.createdAt)) infos["time: node db creation"] = fmt.Sprintf("%s", time.Since(n.config.CreatedAt)) + db := n.sql(ctx) sqlStats := []string{} for _, table := range sql.AllTables() { var count uint32 @@ -151,6 +183,10 @@ func (n *Node) NodeInfos(ctx context.Context) (map[string]string, error) { } func (n *Node) DeviceInfos(ctx context.Context, input *node.Void) (*deviceinfo.DeviceInfos, error) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, input) + defer span.Finish() + entries, err := n.NodeInfos(ctx) if err != nil { return nil, err @@ -165,6 +201,9 @@ func (n *Node) DeviceInfos(ctx context.Context, input *node.Void) (*deviceinfo.D } func (n *Node) RunIntegrationTests(ctx context.Context, input *node.IntegrationTestInput) (*node.IntegrationTestOutput, error) { + span, _ := tracing.EnterFunc(ctx, input) + defer span.Finish() + tests := listIntegrationTests() output := &node.IntegrationTestOutput{ @@ -186,31 +225,44 @@ func (n *Node) RunIntegrationTests(ctx context.Context, input *node.IntegrationT return output, nil } -func (n *Node) AppVersion(_ context.Context, input *node.Void) (*node.AppVersionOutput, error) { +func (n *Node) AppVersion(ctx context.Context, input *node.Void) (*node.AppVersionOutput, error) { + span, _ := tracing.EnterFunc(ctx, input) + defer span.Finish() + return &node.AppVersionOutput{Version: core.Version}, nil } -func (n *Node) Panic(_ context.Context, input *node.Void) (*node.Void, error) { +func (n *Node) Panic(ctx context.Context, input *node.Void) (*node.Void, error) { + span, _ := tracing.EnterFunc(ctx, input) + defer span.Finish() + panic("panic from client") } func (n *Node) DebugRequeueEvent(ctx context.Context, input *node.EventIDInput) (*p2p.Event, error) { - event := p2p.Event{} + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, input) + defer span.Finish() sql := n.sql(ctx) + var event p2p.Event if err := sql.First(&event, "ID = ?", input.EventID).Error; err != nil { return nil, errors.Wrap(err, "unable to fetch event") } - if err := n.EventRequeue(&event); err != nil { + if err := n.EventRequeue(ctx, &event); err != nil { return nil, errors.Wrap(err, "unable to requeue event") } return &event, nil } -func (n *Node) DebugRequeueAll(_ context.Context, _ *node.Void) (*node.Void, error) { - if _, err := n.EventsRetry(time.Now()); err != nil { +func (n *Node) DebugRequeueAll(ctx context.Context, _ *node.Void) (*node.Void, error) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx) + defer span.Finish() + + if _, err := n.EventsRetry(ctx, time.Now()); err != nil { return nil, errors.Wrap(err, "unable to requeue events") } @@ -218,6 +270,9 @@ func (n *Node) DebugRequeueAll(_ context.Context, _ *node.Void) (*node.Void, err } func (n *Node) LogStream(input *node.LogStreamInput, stream node.Service_LogStreamServer) error { + span, _ := tracing.EnterFunc(stream.Context(), input) + defer span.Finish() + if n.ring == nil { return fmt.Errorf("ring not configured") } @@ -230,7 +285,7 @@ func (n *Node) LogStream(input *node.LogStreamInput, stream node.Service_LogStre r, w := io.Pipe() go func() { - n.ring.WriteTo(w) + _, _ = n.ring.WriteTo(w) w.Close() }() diff --git a/core/node/nodeapi_test.go b/core/node/nodeapi_test.go index 7e3d07af68..ab34f1699a 100644 --- a/core/node/nodeapi_test.go +++ b/core/node/nodeapi_test.go @@ -55,7 +55,7 @@ func TestPagination(t *testing.T) { gs.Serve(ic.Listener()) }() go func() { - alice.Start(false, false) + alice.Start(context.Background(), false, false) }() }) diff --git a/core/node/nodeclient.go b/core/node/nodeclient.go index 4172c83635..12c23972cc 100644 --- a/core/node/nodeclient.go +++ b/core/node/nodeclient.go @@ -1,17 +1,23 @@ package node import ( + "context" + "berty.tech/core/api/node" "berty.tech/core/api/p2p" + "berty.tech/core/pkg/tracing" "github.com/pkg/errors" "go.uber.org/zap" ) -func (n *Node) EnqueueClientEvent(event *p2p.Event) error { +func (n *Node) EnqueueClientEvent(ctx context.Context, event *p2p.Event) error { + span, _ := tracing.EnterFunc(ctx, event) + defer span.Finish() + if err := event.Validate(); err != nil { return errors.Wrap(err, "invalid event") } - sql := n.sql(nil) + sql := n.sql(ctx) if err := sql.Create(event).Error; err != nil { return errors.Wrap(err, "failed to write event to db") } @@ -26,6 +32,9 @@ type clientEventSubscriber struct { // EventStream implements berty.node.EventStream func (n *Node) EventStream(input *node.EventStreamInput, stream node.Service_EventStreamServer) error { + span, _ := tracing.EnterFunc(stream.Context(), input) + defer span.Finish() + logger().Debug("EventStream connected", zap.Stringer("input", input)) sub := clientEventSubscriber{ diff --git a/core/node/nodeevent.go b/core/node/nodeevent.go index 3a5a6397e9..c6ef5b911a 100644 --- a/core/node/nodeevent.go +++ b/core/node/nodeevent.go @@ -1,13 +1,19 @@ package node import ( + "context" + "github.com/gogo/protobuf/proto" "go.uber.org/zap" "berty.tech/core/api/node" + "berty.tech/core/pkg/tracing" ) -func (n *Node) EnqueueNodeEvent(kind node.Kind, attributes proto.Message) { +func (n *Node) EnqueueNodeEvent(ctx context.Context, kind node.Kind, attributes proto.Message) { + span, _ := tracing.EnterFunc(ctx, kind, attributes) + defer span.Finish() + event, err := node.NewEvent(kind, attributes) if err != nil { logger().Error("failed to create node.NodeStarted event") @@ -16,23 +22,23 @@ func (n *Node) EnqueueNodeEvent(kind node.Kind, attributes proto.Message) { } } -func (n *Node) LogBackgroundError(err error) { +func (n *Node) LogBackgroundError(ctx context.Context, err error) { logger().Error("background error", zap.Error(err)) - n.EnqueueNodeEvent(node.Kind_BackgroundError, &node.BackgroundErrorAttrs{ + n.EnqueueNodeEvent(ctx, node.Kind_BackgroundError, &node.BackgroundErrorAttrs{ ErrMsg: err.Error(), }) } -func (n *Node) LogBackgroundWarn(err error) { +func (n *Node) LogBackgroundWarn(ctx context.Context, err error) { logger().Warn("background warn", zap.Error(err)) - n.EnqueueNodeEvent(node.Kind_BackgroundWarn, &node.BackgroundWarnAttrs{ + n.EnqueueNodeEvent(ctx, node.Kind_BackgroundWarn, &node.BackgroundWarnAttrs{ ErrMsg: err.Error(), }) } -func (n *Node) LogBackgroundDebug(msg string) { +func (n *Node) LogBackgroundDebug(ctx context.Context, msg string) { logger().Debug("background debug", zap.String("msg", msg)) - n.EnqueueNodeEvent(node.Kind_Debug, &node.DebugAttrs{ + n.EnqueueNodeEvent(ctx, node.Kind_Debug, &node.DebugAttrs{ Msg: msg, }) } diff --git a/core/node/p2papi.go b/core/node/p2papi.go index f7df19b512..2401fa2767 100644 --- a/core/node/p2papi.go +++ b/core/node/p2papi.go @@ -2,21 +2,20 @@ package node import ( "context" - - "berty.tech/core/entity" - "berty.tech/core/errorcodes" + "crypto/x509" + "encoding/base64" "github.com/gogo/protobuf/proto" + opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "google.golang.org/grpc" - "encoding/base64" - - "crypto/x509" - "berty.tech/core/api/node" "berty.tech/core/api/p2p" "berty.tech/core/crypto/keypair" + "berty.tech/core/entity" + "berty.tech/core/errorcodes" + "berty.tech/core/pkg/tracing" ) // WithP2PGrpcServer registers the Node as a 'berty.p2p' protobuf server implementation @@ -27,11 +26,20 @@ func WithP2PGrpcServer(gs *grpc.Server) NewNodeOption { } func (n *Node) ID(ctx context.Context, _ *node.Void) (*p2p.Peer, error) { - return n.networkDriver.ID(), nil + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx) + defer span.Finish() + + return n.networkDriver.ID(ctx), nil } func (n *Node) Protocols(ctx context.Context, p *p2p.Peer) (*node.ProtocolsOutput, error) { - pids, err := n.networkDriver.Protocols(p) + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, p) + defer span.Finish() + defer n.asyncWaitGroup(ctx)() + + pids, err := n.networkDriver.Protocols(ctx, p) if err != nil { return nil, err } @@ -42,20 +50,28 @@ func (n *Node) Protocols(ctx context.Context, p *p2p.Peer) (*node.ProtocolsOutpu } func (n *Node) HandleEnvelope(ctx context.Context, input *p2p.Envelope) (*p2p.Void, error) { - n.asyncWaitGroup.Add(1) - defer n.asyncWaitGroup.Done() + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, input) + defer span.Finish() + defer n.asyncWaitGroup(ctx)() + return &p2p.Void{}, n.handleEnvelope(ctx, input) } func (n *Node) Ping(ctx context.Context, _ *p2p.Void) (*p2p.Void, error) { + span, _ := tracing.EnterFunc(ctx) + defer span.Finish() + return &p2p.Void{}, nil } func (n *Node) handleEnvelope(ctx context.Context, input *p2p.Envelope) error { - n.asyncWaitGroup.Add(1) - defer n.asyncWaitGroup.Done() - event, err := n.OpenEnvelope(input) + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, input) + defer span.Finish() + defer n.asyncWaitGroup(ctx)() + event, err := n.OpenEnvelope(ctx, input) if err == errorcodes.ErrorUntrustedEnvelope { // ignored error } else if err != nil { @@ -65,12 +81,14 @@ func (n *Node) handleEnvelope(ctx context.Context, input *p2p.Envelope) error { return n.handleEvent(ctx, event) } -func (n *Node) OpenEnvelope(envelope *p2p.Envelope) (*p2p.Event, error) { - n.asyncWaitGroup.Add(1) - defer n.asyncWaitGroup.Done() +func (n *Node) OpenEnvelope(ctx context.Context, envelope *p2p.Envelope) (*p2p.Event, error) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, envelope) + defer span.Finish() + defer n.asyncWaitGroup(ctx)() trusted := false - sql := n.sql(nil) + sql := n.sql(ctx) device, err := envelope.GetDeviceForEnvelope(sql) if err == errorcodes.ErrorNoDeviceFoundForEnvelope { diff --git a/core/node/p2papi_test.go b/core/node/p2papi_test.go index 4fb01a4e11..6e743d5561 100644 --- a/core/node/p2papi_test.go +++ b/core/node/p2papi_test.go @@ -1,6 +1,7 @@ package node import ( + "context" "testing" "crypto/rand" @@ -57,7 +58,7 @@ func testNode(t *testing.T, opts ...NewNodeOption) (*Node, error) { opts = append(opts, WithSoftwareCrypto()) opts = append(opts, WithConfig()) - node, err := New(opts...) + node, err := New(context.Background(), opts...) if err != nil { t.Fatalf("%s", err) @@ -82,9 +83,9 @@ func TestOpenEnvelope(t *testing.T) { charlieAlias := "ch4rli3_4l145" - alice.RegisterDevice(aliceDevice2) - bob.RegisterDevice(bobDevice2) - charlie.RegisterDevice(charlieDevice2) + alice.RegisterDevice(context.Background(), aliceDevice2) + bob.RegisterDevice(context.Background(), bobDevice2) + charlie.RegisterDevice(context.Background(), charlieDevice2) aliceSigChainBytes, err := alice.sigchain.Marshal() bobSigChainBytes, err := bob.sigchain.Marshal() @@ -140,7 +141,7 @@ func TestOpenEnvelope(t *testing.T) { t.Fatalf("failed to sign envelope") } - if decodedEvent, err = bob.OpenEnvelope(envelope); err != nil { + if decodedEvent, err = bob.OpenEnvelope(context.Background(), envelope); err != nil { t.Error(err) } @@ -172,7 +173,7 @@ func TestOpenEnvelope(t *testing.T) { t.Fatalf("failed to sign envelope") } - if decodedEvent, err = alice.OpenEnvelope(envelope); err != nil { + if decodedEvent, err = alice.OpenEnvelope(context.Background(), envelope); err != nil { t.Fatal(err) } @@ -204,7 +205,7 @@ func TestOpenEnvelope(t *testing.T) { t.Fatalf("failed to sign envelope") } - if decodedEvent, err = bob.OpenEnvelope(envelope); err != nil { + if decodedEvent, err = bob.OpenEnvelope(context.Background(), envelope); err != nil { t.Fatal(err) } @@ -214,7 +215,7 @@ func TestOpenEnvelope(t *testing.T) { t.Fatalf("wrong event data") } - if decodedEvent, err = alice.OpenEnvelope(envelope); err == nil { + if decodedEvent, err = alice.OpenEnvelope(context.Background(), envelope); err == nil { t.Fatalf("alice should not be able to check this signature") } diff --git a/core/node/p2pclient.go b/core/node/p2pclient.go index b15ec2c57f..daf17d8ab4 100644 --- a/core/node/p2pclient.go +++ b/core/node/p2pclient.go @@ -1,29 +1,43 @@ package node import ( + "context" + "berty.tech/core/api/p2p" "berty.tech/core/entity" + "berty.tech/core/pkg/tracing" + opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" ) -func (n *Node) NewContactEvent(destination *entity.Contact, kind p2p.Kind) *p2p.Event { +func (n *Node) NewContactEvent(ctx context.Context, destination *entity.Contact, kind p2p.Kind) *p2p.Event { + span, _ := tracing.EnterFunc(ctx, destination, kind) + defer span.Finish() + event := p2p.NewOutgoingEvent(n.b64pubkey, destination.ID, kind) event.ID = n.NewID() return event } -func (n *Node) NewConversationEvent(destination *entity.Conversation, kind p2p.Kind) *p2p.Event { +func (n *Node) NewConversationEvent(ctx context.Context, destination *entity.Conversation, kind p2p.Kind) *p2p.Event { + span, _ := tracing.EnterFunc(ctx, destination, kind) + defer span.Finish() + event := p2p.NewOutgoingEvent(n.b64pubkey, "", kind) event.ConversationID = destination.ID event.ID = n.NewID() return event } -func (n *Node) EnqueueOutgoingEvent(event *p2p.Event) error { +func (n *Node) EnqueueOutgoingEvent(ctx context.Context, event *p2p.Event) error { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, event) + defer span.Finish() + if err := event.Validate(); err != nil { return errors.Wrap(err, "invalid event") } - sql := n.sql(nil) + sql := n.sql(ctx) if err := sql.Create(event).Error; err != nil { return errors.Wrap(err, "failed to write event to db") } @@ -31,18 +45,25 @@ func (n *Node) EnqueueOutgoingEvent(event *p2p.Event) error { return nil } -func (n *Node) contactShareMe(to *entity.Contact) error { - event := n.NewContactEvent(to, p2p.Kind_ContactShareMe) +func (n *Node) contactShareMe(ctx context.Context, to *entity.Contact) error { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, to) + defer span.Finish() + + event := n.NewContactEvent(ctx, to, p2p.Kind_ContactShareMe) if err := event.SetAttrs(&p2p.ContactShareMeAttrs{Me: n.config.Myself.Filtered()}); err != nil { return err } - if err := n.EnqueueOutgoingEvent(event); err != nil { + if err := n.EnqueueOutgoingEvent(ctx, event); err != nil { return err } return nil } -func (n *Node) NewSenderAliasEvent(destination string, aliases []*entity.SenderAlias) (*p2p.Event, error) { +func (n *Node) NewSenderAliasEvent(ctx context.Context, destination string, aliases []*entity.SenderAlias) (*p2p.Event, error) { + span, _ := tracing.EnterFunc(ctx, destination, aliases) + defer span.Finish() + event := p2p.NewOutgoingEvent(n.b64pubkey, destination, p2p.Kind_SenderAliasUpdate) event.ID = n.NewID() if err := event.SetAttrs(&p2p.SenderAliasUpdateAttrs{Aliases: aliases}); err != nil { diff --git a/core/node/sender_aliases.go b/core/node/sender_aliases.go index 7825899e69..02c48ccfb8 100644 --- a/core/node/sender_aliases.go +++ b/core/node/sender_aliases.go @@ -1,16 +1,18 @@ package node import ( + "context" + "berty.tech/core/api/p2p" "berty.tech/core/entity" "github.com/pkg/errors" "go.uber.org/zap" ) -func (node *Node) SenderAliasesRenew() error { +func (node *Node) SenderAliasesRenew(ctx context.Context) error { var aliases []*entity.SenderAlias - sql := node.sql(nil) + sql := node.sql(ctx) if err := sql.Where("used = 1 AND status = ?", entity.SenderAlias_SENT_AND_ACKED).Find(&aliases).Error; err != nil { return errors.Wrap(err, "unable to fetch used aliases") } @@ -23,7 +25,7 @@ func (node *Node) SenderAliasesRenew() error { continue } - if err := node.senderAliasSave(replacement, alias); err != nil { + if err := node.senderAliasSave(ctx, replacement, alias); err != nil { logger().Error("node.SenderAliasesRenew", zap.Error(err)) continue } @@ -33,13 +35,13 @@ func (node *Node) SenderAliasesRenew() error { destination = alias.ConversationID } - evt, err := node.NewSenderAliasEvent(destination, []*entity.SenderAlias{replacement}) + evt, err := node.NewSenderAliasEvent(ctx, destination, []*entity.SenderAlias{replacement}) if err != nil { return errors.Wrap(err, "unable to create event") } - if err := node.EnqueueOutgoingEvent(evt); err != nil { + if err := node.EnqueueOutgoingEvent(ctx, evt); err != nil { logger().Error("node.SenderAliasesRenew", zap.Error(err)) return errors.Wrap(err, "unable to emit event") } @@ -48,34 +50,34 @@ func (node *Node) SenderAliasesRenew() error { return nil } -func (node *Node) GenerateAliasForContact(contactID string) (*entity.SenderAlias, error) { +func (node *Node) GenerateAliasForContact(ctx context.Context, contactID string) (*entity.SenderAlias, error) { alias, err := entity.SenderAliasGenerateRandom(node.b64pubkey, contactID, "") if err != nil { return nil, errors.Wrap(err, "unable to generate sender alias for contact") } - err = node.senderAliasSave(alias, nil) + err = node.senderAliasSave(ctx, alias, nil) if err != nil { return nil, errors.Wrap(err, "unable to generate sender alias for contact") } - evt, err := node.NewSenderAliasEvent(contactID, []*entity.SenderAlias{alias}) + evt, err := node.NewSenderAliasEvent(ctx, contactID, []*entity.SenderAlias{alias}) if err != nil { return nil, errors.Wrap(err, "unable to generate sender alias for contact") } - if err := node.EnqueueOutgoingEvent(evt); err != nil { + if err := node.EnqueueOutgoingEvent(ctx, evt); err != nil { return nil, errors.Wrap(err, "unable to generate sender alias for contact") } return alias, nil } -func (node *Node) senderAliasSave(senderAlias *entity.SenderAlias, previous *entity.SenderAlias) error { - sql := node.sql(nil) +func (node *Node) senderAliasSave(ctx context.Context, senderAlias *entity.SenderAlias, previous *entity.SenderAlias) error { + sql := node.sql(ctx) tx := sql.Begin() if err := tx.Save(senderAlias).Error; err != nil { @@ -101,8 +103,8 @@ func (node *Node) senderAliasSave(senderAlias *entity.SenderAlias, previous *ent return nil } -func (node *Node) aliasEnvelopeForContact(envelope *p2p.Envelope, event *p2p.Event) string { - sql := node.sql(nil) +func (node *Node) aliasEnvelopeForContact(ctx context.Context, envelope *p2p.Envelope, event *p2p.Event) string { + sql := node.sql(ctx) alias, err := entity.GetAliasForContact(sql, event.ReceiverID) if err == nil && alias != "" { @@ -114,8 +116,8 @@ func (node *Node) aliasEnvelopeForContact(envelope *p2p.Envelope, event *p2p.Eve return node.b64pubkey } -func (node *Node) aliasEnvelopeForConversation(envelope *p2p.Envelope, event *p2p.Event) string { - sql := node.sql(nil) +func (node *Node) aliasEnvelopeForConversation(ctx context.Context, envelope *p2p.Envelope, event *p2p.Event) string { + sql := node.sql(ctx) alias, err := entity.GetAliasForConversation(sql, event.ConversationID) if err == nil && alias != "" { diff --git a/core/node/sigchain.go b/core/node/sigchain.go index ca9abaa4ce..68d2a560da 100644 --- a/core/node/sigchain.go +++ b/core/node/sigchain.go @@ -1,36 +1,49 @@ package node import ( + "context" + "berty.tech/core/entity" + "berty.tech/core/pkg/tracing" + opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" ) -func (n *Node) RegisterDevice(device *entity.Device) error { +func (n *Node) RegisterDevice(ctx context.Context, device *entity.Device) error { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, device) + defer span.Finish() + err := n.sigchain.AddDevice(n.crypto, n.pubkey, []byte(device.ID), []byte(device.ID)) if err != nil { return errors.Wrap(err, "unable to add device to sigchain") } - return n.persistSigChain() + return n.persistSigChain(ctx) } -func (n *Node) RevokeDevice(device *entity.Device) error { +func (n *Node) RevokeDevice(ctx context.Context, device *entity.Device) error { + span, _ := tracing.EnterFunc(ctx, device) + defer span.Finish() + // TODO: implement return errors.New("unimplemented") - //return n.persistSigChain() } -func (n *Node) persistSigChain() error { - data, err := n.sigchain.Marshal() +func (n *Node) persistSigChain(ctx context.Context) error { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx) + defer span.Finish() + data, err := n.sigchain.Marshal() if err != nil { return errors.Wrap(err, "unable to serialize sigchain") } n.config.Myself.Sigchain = data - sql := n.sql(nil) + sql := n.sql(ctx) return sql.Save(n.config.Myself).Error } diff --git a/core/node/sql.go b/core/node/sql.go index d6f24dcad8..de0b29d9b9 100644 --- a/core/node/sql.go +++ b/core/node/sql.go @@ -14,11 +14,10 @@ func WithSQL(sql *gorm.DB) NewNodeOption { } } +// sql returns a gorm.DB object with opentracing context func (n *Node) sql(ctx context.Context) *gorm.DB { - // FIXME: check if jaeger is enable before losing time calling complex functions if ctx == nil { - // FIXME: create a special span for non-contexted streams - return n.sqlDriver + return n.sqlDriver.Set("rootSpan", n.rootSpan) } if span := opentracing.SpanFromContext(ctx); span != nil { return n.sqlDriver.Set("rootSpan", span) diff --git a/core/pkg/banner/logger.gen.go b/core/pkg/banner/logger.gen.go new file mode 100644 index 0000000000..73776ae3fc --- /dev/null +++ b/core/pkg/banner/logger.gen.go @@ -0,0 +1,9 @@ +// Code generated by berty.tech/core/.scripts/generate-logger.sh + +package banner + +import "go.uber.org/zap" + +func logger() *zap.Logger { + return zap.L().Named("core.pkg.banner") +} diff --git a/core/pkg/deviceinfo/logger.gen.go b/core/pkg/deviceinfo/logger.gen.go new file mode 100644 index 0000000000..03c810d88a --- /dev/null +++ b/core/pkg/deviceinfo/logger.gen.go @@ -0,0 +1,9 @@ +// Code generated by berty.tech/core/.scripts/generate-logger.sh + +package deviceinfo + +import "go.uber.org/zap" + +func logger() *zap.Logger { + return zap.L().Named("core.pkg.deviceinfo") +} diff --git a/core/pkg/jaeger/jaeger.go b/core/pkg/jaeger/jaeger.go index 07dc3f3dc6..9c0c498d3f 100644 --- a/core/pkg/jaeger/jaeger.go +++ b/core/pkg/jaeger/jaeger.go @@ -8,9 +8,9 @@ import ( "go.uber.org/zap" ) -func InitTracer(address, service string) (opentracing.Tracer, io.Closer, error) { +func InitTracer(address, name string) (opentracing.Tracer, io.Closer, error) { cfg := &config.Configuration{ - ServiceName: service, + ServiceName: name, Sampler: &config.SamplerConfig{ Type: "const", Param: 1, @@ -21,8 +21,9 @@ func InitTracer(address, service string) (opentracing.Tracer, io.Closer, error) }, } + logger := zap.L().Named("vendor.jaeger") tracer, closer, err := cfg.NewTracer( - config.Logger(&jaegerLogger{logger: zap.L().Named("vendor.jaeger")}), + config.Logger(&jaegerLogger{logger: logger}), ) if err != nil { return nil, nil, err @@ -30,5 +31,10 @@ func InitTracer(address, service string) (opentracing.Tracer, io.Closer, error) opentracing.SetGlobalTracer(tracer) + logger.Debug("jaeger tracer started", + zap.String("addr", address), + zap.String("name", name), + ) + return tracer, closer, nil } diff --git a/core/pkg/quote/logger.gen.go b/core/pkg/quote/logger.gen.go new file mode 100644 index 0000000000..649026229a --- /dev/null +++ b/core/pkg/quote/logger.gen.go @@ -0,0 +1,9 @@ +// Code generated by berty.tech/core/.scripts/generate-logger.sh + +package quote + +import "go.uber.org/zap" + +func logger() *zap.Logger { + return zap.L().Named("core.pkg.quote") +} diff --git a/core/pkg/tracing/logger.gen.go b/core/pkg/tracing/logger.gen.go new file mode 100644 index 0000000000..20d9721320 --- /dev/null +++ b/core/pkg/tracing/logger.gen.go @@ -0,0 +1,9 @@ +// Code generated by berty.tech/core/.scripts/generate-logger.sh + +package tracing + +import "go.uber.org/zap" + +func logger() *zap.Logger { + return zap.L().Named("core.pkg.tracing") +} diff --git a/core/pkg/tracing/tracing.go b/core/pkg/tracing/tracing.go new file mode 100644 index 0000000000..4c4f7386d6 --- /dev/null +++ b/core/pkg/tracing/tracing.go @@ -0,0 +1,40 @@ +package tracing + +import ( + "context" + "encoding/json" + "fmt" + "runtime" + "strings" + + opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/log" +) + +func EnterFunc(ctx context.Context, args ...interface{}) (opentracing.Span, context.Context) { + // FIXME: add a way to completely disable the following behavior + function, _, _, _ := runtime.Caller(1) + funcName := runtime.FuncForPC(function).Name() + topic := fmt.Sprintf("call::%s()", funcName) + topic = strings.Replace(topic, "call::berty.tech/core/", "call::./", 1) + if ctx == nil { + ctx = context.Background() + logger().Warn("context is not set") + } + span, ctx := opentracing.StartSpanFromContext(ctx, topic) + if len(args) > 0 { + for idx, arg := range args { + outBytes, err := json.Marshal(arg) + out := string(outBytes) + argName := fmt.Sprintf("arg%d", idx) + + if err != nil { + out = fmt.Sprintf("%v", arg) + } + span.LogFields(log.String(argName, out)) + + } + } + span.SetTag("component", "core.call") + return span, ctx +} diff --git a/core/pkg/zapring/logger.gen.go b/core/pkg/zapring/logger.gen.go new file mode 100644 index 0000000000..d5a6cd3685 --- /dev/null +++ b/core/pkg/zapring/logger.gen.go @@ -0,0 +1,9 @@ +// Code generated by berty.tech/core/.scripts/generate-logger.sh + +package zapring + +import "go.uber.org/zap" + +func logger() *zap.Logger { + return zap.L().Named("core.pkg.zapring") +} diff --git a/core/pkg/zapring/zapring.go b/core/pkg/zapring/zapring.go index 33ad11d8f2..5bee61bfa9 100644 --- a/core/pkg/zapring/zapring.go +++ b/core/pkg/zapring/zapring.go @@ -9,7 +9,7 @@ import ( "go.uber.org/zap/zapcore" ) -const PtrSize = 32 << uintptr(^uintptr(0)>>63) +const PtrSize = 32 << (^uintptr(0) >> 63) type Ring struct { zapcore.Core @@ -41,7 +41,9 @@ func (r *Ring) Write(entry zapcore.Entry, fields []zapcore.Field) error { if err != nil { return err } - r.buffer.Write(buff.Bytes()) + if _, err = r.buffer.Write(buff.Bytes()); err != nil { + return err + } } return r.Core.Write(entry, fields) diff --git a/core/test/app_mock.go b/core/test/app_mock.go index 94ef45b6ef..8670853899 100644 --- a/core/test/app_mock.go +++ b/core/test/app_mock.go @@ -118,7 +118,10 @@ func (a *AppMock) Open() error { } } + a.ctx, a.cancel = context.WithCancel(context.Background()) + if a.node, err = node.New( + a.ctx, node.WithSQL(a.db), node.WithP2PGrpcServer(gs), node.WithNodeGrpcServer(gs), @@ -142,7 +145,7 @@ func (a *AppMock) Open() error { }() go func() { - if err := a.node.Start(false, false); err != nil { + if err := a.node.Start(a.ctx, false, false); err != nil { logger().Error("node routine error", zap.Error(err)) } }() @@ -153,8 +156,6 @@ func (a *AppMock) Open() error { } a.client = client.New(a.clientConn) - a.ctx, a.cancel = context.WithCancel(context.Background()) - return nil } diff --git a/core/test/e2e_test.go b/core/test/e2e_test.go index c82a297f85..b07039e7a3 100644 --- a/core/test/e2e_test.go +++ b/core/test/e2e_test.go @@ -50,15 +50,15 @@ func TestWithEnqueuer(t *testing.T) { Convey("Initialize nodes", FailureHalts, func() { shouldIContinue(t) - alice, err = NewAppMock(&entity.Device{Name: "Alice's iPhone"}, mock.NewEnqueuer()) + alice, err = NewAppMock(&entity.Device{Name: "Alice's iPhone"}, mock.NewEnqueuer(context.Background())) So(err, ShouldBeNil) So(alice.InitEventStream(), ShouldBeNil) - bob, err = NewAppMock(&entity.Device{Name: "iPhone de Bob"}, mock.NewEnqueuer()) + bob, err = NewAppMock(&entity.Device{Name: "iPhone de Bob"}, mock.NewEnqueuer(context.Background())) So(err, ShouldBeNil) So(bob.InitEventStream(), ShouldBeNil) - eve, err = NewAppMock(&entity.Device{Name: "Eve"}, mock.NewEnqueuer()) + eve, err = NewAppMock(&entity.Device{Name: "Eve"}, mock.NewEnqueuer(context.Background())) So(err, ShouldBeNil) So(eve.InitEventStream(), ShouldBeNil) @@ -70,7 +70,7 @@ func TestWithEnqueuer(t *testing.T) { for i := 0; i < 100; i++ { { - event := alice.node.NewContactEvent(&entity.Contact{ID: bob.node.DeviceID()}, p2p.Kind_DevtoolsMapset) + event := alice.node.NewContactEvent(alice.ctx, &entity.Contact{ID: bob.node.DeviceID()}, p2p.Kind_DevtoolsMapset) So(event.SetAttrs(&p2p.DevtoolsMapsetAttrs{Key: "test", Val: fmt.Sprintf("%d", i)}), ShouldBeNil) res, err := bob.node.HandleEvent(alice.ctx, event.Copy()) So(err, ShouldBeNil) @@ -79,7 +79,7 @@ func TestWithEnqueuer(t *testing.T) { { So(bob.node.DevtoolsMapget("test"), ShouldEqual, fmt.Sprintf("%d", i)) envelope := <-bob.networkDriver.(*mock.Enqueuer).Queue() - event, err := alice.node.OpenEnvelope(envelope) + event, err := alice.node.OpenEnvelope(alice.ctx, envelope) So(err, ShouldBeIn, nil, errorcodes.ErrorUntrustedEnvelope) So(event.Kind, ShouldEqual, p2p.Kind_Ack) //jsonPrintIndent(event) @@ -211,7 +211,7 @@ func TestWithEnqueuer(t *testing.T) { shouldIContinue(t) envelope := <-alice.networkDriver.(*mock.Enqueuer).Queue() - event, err := alice.node.OpenEnvelope(envelope) + event, err := alice.node.OpenEnvelope(alice.ctx, envelope) So(err, ShouldBeNil) So(event.Author(), ShouldEqual, alice.node.UserID()) @@ -269,7 +269,7 @@ func TestWithEnqueuer(t *testing.T) { shouldIContinue(t) envelope := <-bob.networkDriver.(*mock.Enqueuer).Queue() - event, err := alice.node.OpenEnvelope(envelope) + event, err := alice.node.OpenEnvelope(alice.ctx, envelope) So(err, ShouldEqual, errorcodes.ErrorUntrustedEnvelope) So(event.Author(), ShouldEqual, bob.node.UserID()) @@ -332,7 +332,7 @@ func TestWithEnqueuer(t *testing.T) { shouldIContinue(t) envelope := <-bob.networkDriver.(*mock.Enqueuer).Queue() - event, err := alice.node.OpenEnvelope(envelope) + event, err := alice.node.OpenEnvelope(alice.ctx, envelope) So(err, ShouldEqual, errorcodes.ErrorUntrustedEnvelope) So(event.Kind, ShouldEqual, p2p.Kind_ContactRequestAccepted) @@ -375,7 +375,7 @@ func TestWithEnqueuer(t *testing.T) { shouldIContinue(t) envelope := <-bob.networkDriver.(*mock.Enqueuer).Queue() - event, err := alice.node.OpenEnvelope(envelope) + event, err := alice.node.OpenEnvelope(alice.ctx, envelope) So(err, ShouldEqual, errorcodes.ErrorUntrustedEnvelope) So(event.Kind, ShouldEqual, p2p.Kind_ContactShareMe) @@ -419,7 +419,7 @@ func TestWithEnqueuer(t *testing.T) { shouldIContinue(t) envelope := <-alice.networkDriver.(*mock.Enqueuer).Queue() - event, err := alice.node.OpenEnvelope(envelope) + event, err := alice.node.OpenEnvelope(alice.ctx, envelope) So(err, ShouldBeNil) So(event.SenderID, ShouldEqual, alice.node.UserID()) @@ -444,7 +444,7 @@ func TestWithEnqueuer(t *testing.T) { shouldIContinue(t) envelope := <-alice.networkDriver.(*mock.Enqueuer).Queue() - event, err := alice.node.OpenEnvelope(envelope) + event, err := alice.node.OpenEnvelope(alice.ctx, envelope) So(err, ShouldBeNil) So(event.SenderID, ShouldEqual, alice.node.UserID()) @@ -467,7 +467,7 @@ func TestWithEnqueuer(t *testing.T) { shouldIContinue(t) envelope := <-alice.networkDriver.(*mock.Enqueuer).Queue() - event, err := alice.node.OpenEnvelope(envelope) + event, err := alice.node.OpenEnvelope(alice.ctx, envelope) So(err, ShouldBeNil) So(event.SenderID, ShouldEqual, alice.node.UserID()) @@ -490,7 +490,7 @@ func TestWithEnqueuer(t *testing.T) { shouldIContinue(t) envelope := <-bob.networkDriver.(*mock.Enqueuer).Queue() - event, err := alice.node.OpenEnvelope(envelope) + event, err := alice.node.OpenEnvelope(alice.ctx, envelope) So(err, ShouldEqual, errorcodes.ErrorUntrustedEnvelope) So(event.Kind, ShouldEqual, p2p.Kind_Ack) @@ -728,7 +728,7 @@ func TestAliasesFlow(t *testing.T) { Convey("Alice send initial aliases to Bob and sends an aliased message to Bob", FailureHalts, func() { shouldIContinue(t) - alias, err := alice.node.GenerateAliasForContact(bob.node.UserID()) + alias, err := alice.node.GenerateAliasForContact(alice.ctx, bob.node.UserID()) So(err, ShouldBeNil) time.Sleep(timeBetweenSteps) @@ -758,7 +758,7 @@ func TestAliasesFlow(t *testing.T) { envelope = alice.networkDriver.(*mock.SimpleDriver).GetLastSentEnvelope() So(envelope.Source, ShouldEqual, alias.AliasIdentifier) - err = alice.node.SenderAliasesRenew() + err = alice.node.SenderAliasesRenew(alice.ctx) So(err, ShouldBeNil) @@ -823,7 +823,7 @@ func setupP2PNetwork(bootstrap ...string) (*p2pnet.Driver, error) { return nil, err } go func() { - if err := driver.Start(); err != nil { + if err := driver.Start(context.Background()); err != nil { logger().Error("driver start error", zap.Error(err)) } }() @@ -835,7 +835,7 @@ func getBoostrap(d *p2pnet.Driver) []string { bootstrap := make([]string, len(addrs)) for i, a := range addrs { - bootstrap[i] = fmt.Sprintf("%s/ipfs/%s", a.String(), d.ID().ID) + bootstrap[i] = fmt.Sprintf("%s/ipfs/%s", a.String(), d.ID(context.Background()).ID) } return bootstrap diff --git a/core/test/mainloop_test.go b/core/test/mainloop_test.go index f4b42f2519..2df5a4aa2b 100644 --- a/core/test/mainloop_test.go +++ b/core/test/mainloop_test.go @@ -1,6 +1,7 @@ package test import ( + "context" "fmt" "testing" "time" @@ -14,7 +15,7 @@ import ( ) func setupNonAcknowledgedEventDestinations() (*AppMock, time.Time, time.Time, time.Time) { - n, err := NewAppMock(&entity.Device{Name: "Alice's iPhone"}, networkmock.NewEnqueuer(), WithUnencryptedDb()) + n, err := NewAppMock(&entity.Device{Name: "Alice's iPhone"}, networkmock.NewEnqueuer(context.Background()), WithUnencryptedDb()) if err != nil { panic(err) @@ -54,7 +55,7 @@ func TestEventRetry(t *testing.T) { "Event8": false, } - events, err := appMock.node.EventsRetry(now) + events, err := appMock.node.EventsRetry(context.Background(), now) if err != nil { t.Error(err) diff --git a/core/test/node_test.go b/core/test/node_test.go index b182353ad5..6cab69427b 100644 --- a/core/test/node_test.go +++ b/core/test/node_test.go @@ -1,6 +1,7 @@ package test import ( + "context" "io" "testing" "time" @@ -42,12 +43,12 @@ func TestNodeHelpers(t *testing.T) { Convey("Testing Node", t, func() { Convey("Testing Node.EventStream", FailureContinues, func(c C) { t.Skip("see https://github.com/berty/berty/issues/252") - app, err := NewAppMock(&entity.Device{Name: "test phone"}, mock.NewEnqueuer()) + app, err := NewAppMock(&entity.Device{Name: "test phone"}, mock.NewEnqueuer(context.Background())) So(err, ShouldBeNil) defer app.Close() - So(app.node.EnqueueClientEvent(&p2p.Event{}), ShouldBeNil) - So(app.node.EnqueueClientEvent(&p2p.Event{}), ShouldBeNil) + So(app.node.EnqueueClientEvent(app.ctx, &p2p.Event{}), ShouldBeNil) + So(app.node.EnqueueClientEvent(app.ctx, &p2p.Event{}), ShouldBeNil) // streamA accepts everything queueA := make(chan eventStreamEntry, 100) @@ -58,8 +59,8 @@ func TestNodeHelpers(t *testing.T) { time.Sleep(50 * time.Millisecond) So(len(queueA), ShouldEqual, 0) - So(app.node.EnqueueClientEvent(&p2p.Event{}), ShouldBeNil) - So(app.node.EnqueueClientEvent(&p2p.Event{}), ShouldBeNil) + So(app.node.EnqueueClientEvent(app.ctx, &p2p.Event{}), ShouldBeNil) + So(app.node.EnqueueClientEvent(app.ctx, &p2p.Event{}), ShouldBeNil) time.Sleep(50 * time.Millisecond) So(len(queueA), ShouldEqual, 2) @@ -70,8 +71,8 @@ func TestNodeHelpers(t *testing.T) { So(err, ShouldBeNil) go streamToQueue(queueB, streamB, c) - So(app.node.EnqueueClientEvent(&p2p.Event{}), ShouldBeNil) - So(app.node.EnqueueClientEvent(&p2p.Event{}), ShouldBeNil) + So(app.node.EnqueueClientEvent(app.ctx, &p2p.Event{}), ShouldBeNil) + So(app.node.EnqueueClientEvent(app.ctx, &p2p.Event{}), ShouldBeNil) time.Sleep(50 * time.Millisecond) So(len(queueA), ShouldEqual, 4) @@ -97,7 +98,7 @@ func TestNodeHelpers(t *testing.T) { So(err, ShouldBeNil) go streamToQueue(queueD, streamD, c) - So(app.node.EnqueueClientEvent(&p2p.Event{ + So(app.node.EnqueueClientEvent(app.ctx, &p2p.Event{ Kind: p2p.Kind_Ack, ConversationID: "bbbb", }), ShouldBeNil) @@ -107,7 +108,7 @@ func TestNodeHelpers(t *testing.T) { So(len(queueC), ShouldEqual, 0) So(len(queueD), ShouldEqual, 0) - So(app.node.EnqueueClientEvent(&p2p.Event{ + So(app.node.EnqueueClientEvent(app.ctx, &p2p.Event{ Kind: p2p.Kind_Ping, ConversationID: "abcde", }), ShouldBeNil) @@ -127,7 +128,7 @@ func TestNodeHelpers(t *testing.T) { } for i := 0; i < 50; i++ { - So(app.node.EnqueueClientEvent(&p2p.Event{ + So(app.node.EnqueueClientEvent(app.ctx, &p2p.Event{ Kind: p2p.Kind_Ping, ConversationID: "bbbb", }), ShouldBeNil) diff --git a/core/test/test_test.go b/core/test/test_test.go index 32600022ce..d0ec0483b4 100644 --- a/core/test/test_test.go +++ b/core/test/test_test.go @@ -1,6 +1,7 @@ package test import ( + "context" "encoding/json" "fmt" "log" @@ -60,7 +61,7 @@ func nodeChansLens(apps ...*AppMock) []int { time.Sleep(10 * time.Millisecond) // we let a few time to the queue to be filled out := []int{} for _, app := range apps { - app.node.AsyncWait() + app.node.AsyncWait(context.Background()) out = append(out, len(app.networkDriver.(*mock.Enqueuer).Queue())) out = append(out, len(app.eventStream)) if len(app.eventStream) > 99 {