From 417e43844886d964bb44d49f9bd85f8d76373c47 Mon Sep 17 00:00:00 2001 From: Guilhem Fanton Date: Fri, 30 Nov 2018 18:21:37 +0100 Subject: [PATCH] feat(event): Add event span --- client/react-native/common/schema.graphql | 12 + core/api/node/graphql/gqlgen.gen.yml | 19 + .../graphql/graph/generated/generated.gen.go | 337 ++++++++++++++- core/api/node/graphql/resolver.go | 2 +- core/api/node/graphql/service.gen.graphql | 12 + core/api/p2p/event.go | 87 +++- core/api/p2p/event.pb.go | 388 +++++++++++++++--- core/api/p2p/event.proto | 8 + core/cmd/berty/client.go | 2 +- core/go.mod | 2 +- core/manager/account/p2p.go | 2 + core/network/p2p/driver.go | 57 ++- core/network/p2p/options.go | 2 +- core/network/p2p/p2putil/manager.go | 10 + core/node/event.go | 1 + core/node/mainloop.go | 133 +++--- core/node/p2pclient.go | 15 +- core/pkg/tracing/tracing.go | 13 +- experiment/network/real/docker-compose.yml | 6 +- 19 files changed, 972 insertions(+), 136 deletions(-) diff --git a/client/react-native/common/schema.graphql b/client/react-native/common/schema.graphql index b585e5564d..b133b2d01a 100644 --- a/client/react-native/common/schema.graphql +++ b/client/react-native/common/schema.graphql @@ -388,6 +388,11 @@ type BertyP2pEvent implements Node { attributes: [Byte!], conversationId: ID! seenAt: GoogleProtobufTimestamp + metadata: [BertyP2pMetadataKeyValue] +} +type BertyP2pMetadataKeyValue { + key: String! + values: [String!] } @@ -494,6 +499,10 @@ type BertyP2pPeerPayload { addrs: [String!] connection: Enum } +input BertyP2pMetadataKeyValueInput { + key: String! + values: [String!] +} input BertyP2pEventInput { id: ID! senderId: String! @@ -510,6 +519,7 @@ input BertyP2pEventInput { attributes: [Byte!], conversationId: ID! seenAt: GoogleProtobufTimestampInput + metadata: [BertyP2pMetadataKeyValueInput] } type BertyP2pEventPayload { id: ID! @@ -527,6 +537,7 @@ type BertyP2pEventPayload { attributes: [Byte!], conversationId: ID! seenAt: GoogleProtobufTimestamp + metadata: [BertyP2pMetadataKeyValue] } input BertyNodePaginationInput { orderBy: String! @@ -677,6 +688,7 @@ type Query { attributes: [Byte!], conversationId: ID! seenAt: GoogleProtobufTimestampInput + metadata: [BertyP2pMetadataKeyValueInput] ): BertyP2pEventPayload ContactList( filter: BertyEntityContactInput diff --git a/core/api/node/graphql/gqlgen.gen.yml b/core/api/node/graphql/gqlgen.gen.yml index 1c09b10825..a06ad10090 100644 --- a/core/api/node/graphql/gqlgen.gen.yml +++ b/core/api/node/graphql/gqlgen.gen.yml @@ -1173,6 +1173,7 @@ models: conversationId: resolver: true seenAt: + metadata: BertyP2pEventInput: model: berty.tech/core/api/p2p.Event fields: @@ -1194,6 +1195,7 @@ models: conversationId: resolver: true seenAt: + metadata: BertyP2pEventPayload: model: berty.tech/core/api/p2p.Event fields: @@ -1215,6 +1217,23 @@ models: conversationId: resolver: true seenAt: + metadata: + + BertyP2pMetadataKeyValue: + model: berty.tech/core/api/p2p.MetadataKeyValue + fields: + key: + values: + BertyP2pMetadataKeyValueInput: + model: berty.tech/core/api/p2p.MetadataKeyValue + fields: + key: + values: + BertyP2pMetadataKeyValuePayload: + model: berty.tech/core/api/p2p.MetadataKeyValue + fields: + key: + values: BertyP2pBandwidthStats: diff --git a/core/api/node/graphql/graph/generated/generated.gen.go b/core/api/node/graphql/graph/generated/generated.gen.go index f297684f0b..d92e18db24 100644 --- a/core/api/node/graphql/graph/generated/generated.gen.go +++ b/core/api/node/graphql/graph/generated/generated.gen.go @@ -345,6 +345,7 @@ type ComplexityRoot struct { Attributes func(childComplexity int) int ConversationId func(childComplexity int) int SeenAt func(childComplexity int) int + Metadata func(childComplexity int) int } BertyP2pEventPayload struct { @@ -363,6 +364,12 @@ type ComplexityRoot struct { Attributes func(childComplexity int) int ConversationId func(childComplexity int) int SeenAt func(childComplexity int) int + Metadata func(childComplexity int) int + } + + BertyP2pMetadataKeyValue struct { + Key func(childComplexity int) int + Values func(childComplexity int) int } BertyP2pNodeAttrs struct { @@ -640,7 +647,7 @@ type ComplexityRoot struct { Node func(childComplexity int, id string) int Id func(childComplexity int, T bool) int EventList func(childComplexity int, filter *p2p.Event, onlyWithoutAckedAt *int32, orderBy string, orderDesc bool, first *int32, after *string, last *int32, before *string) int - GetEvent func(childComplexity int, id string, senderId string, createdAt *time.Time, updatedAt *time.Time, sentAt *time.Time, receivedAt *time.Time, ackedAt *time.Time, direction *int32, senderApiVersion uint32, receiverApiVersion uint32, receiverId string, kind *int32, attributes []byte, conversationId string, seenAt *time.Time) int + GetEvent func(childComplexity int, id string, senderId string, createdAt *time.Time, updatedAt *time.Time, sentAt *time.Time, receivedAt *time.Time, ackedAt *time.Time, direction *int32, senderApiVersion uint32, receiverApiVersion uint32, receiverId string, kind *int32, attributes []byte, conversationId string, seenAt *time.Time, metadata []*p2p.MetadataKeyValue) int ContactList func(childComplexity int, filter *entity.Contact, orderBy string, orderDesc bool, first *int32, after *string, last *int32, before *string) int GetContact func(childComplexity int, id string, createdAt *time.Time, updatedAt *time.Time, sigchain []byte, status *int32, devices []*entity.Device, displayName string, displayStatus string, overrideDisplayName string, overrideDisplayStatus string) int ConversationList func(childComplexity int, filter *entity.Conversation, orderBy string, orderDesc bool, first *int32, after *string, last *int32, before *string) int @@ -733,7 +740,7 @@ type QueryResolver interface { Node(ctx context.Context, id string) (models.Node, error) ID(ctx context.Context, T bool) (*p2p.Peer, error) EventList(ctx context.Context, filter *p2p.Event, onlyWithoutAckedAt *int32, orderBy string, orderDesc bool, first *int32, after *string, last *int32, before *string) (*node.EventListConnection, error) - GetEvent(ctx context.Context, id string, senderId string, createdAt *time.Time, updatedAt *time.Time, sentAt *time.Time, receivedAt *time.Time, ackedAt *time.Time, direction *int32, senderApiVersion uint32, receiverApiVersion uint32, receiverId string, kind *int32, attributes []byte, conversationId string, seenAt *time.Time) (*p2p.Event, error) + GetEvent(ctx context.Context, id string, senderId string, createdAt *time.Time, updatedAt *time.Time, sentAt *time.Time, receivedAt *time.Time, ackedAt *time.Time, direction *int32, senderApiVersion uint32, receiverApiVersion uint32, receiverId string, kind *int32, attributes []byte, conversationId string, seenAt *time.Time, metadata []*p2p.MetadataKeyValue) (*p2p.Event, error) ContactList(ctx context.Context, filter *entity.Contact, orderBy string, orderDesc bool, first *int32, after *string, last *int32, before *string) (*node.ContactListConnection, error) GetContact(ctx context.Context, id string, createdAt *time.Time, updatedAt *time.Time, sigchain []byte, status *int32, devices []*entity.Device, displayName string, displayStatus string, overrideDisplayName string, overrideDisplayStatus string) (*entity.Contact, error) ConversationList(ctx context.Context, filter *entity.Conversation, orderBy string, orderDesc bool, first *int32, after *string, last *int32, before *string) (*node.ConversationListConnection, error) @@ -1765,6 +1772,30 @@ func field_Query_GetEvent_args(rawArgs map[string]interface{}) (map[string]inter } } args["seenAt"] = arg14 + var arg15 []*p2p.MetadataKeyValue + if tmp, ok := rawArgs["metadata"]; ok { + var err error + var rawIf1 []interface{} + if tmp != nil { + if tmp1, ok := tmp.([]interface{}); ok { + rawIf1 = tmp1 + } else { + rawIf1 = []interface{}{tmp} + } + } + arg15 = make([]*p2p.MetadataKeyValue, len(rawIf1)) + for idx1 := range rawIf1 { + var ptr2 p2p.MetadataKeyValue + if rawIf1[idx1] != nil { + ptr2, err = UnmarshalBertyP2pMetadataKeyValueInput(rawIf1[idx1]) + arg15[idx1] = &ptr2 + } + } + if err != nil { + return nil, err + } + } + args["metadata"] = arg15 return args, nil } @@ -3696,6 +3727,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.BertyP2pEvent.SeenAt(childComplexity), true + case "BertyP2pEvent.metadata": + if e.complexity.BertyP2pEvent.Metadata == nil { + break + } + + return e.complexity.BertyP2pEvent.Metadata(childComplexity), true + case "BertyP2pEventPayload.id": if e.complexity.BertyP2pEventPayload.Id == nil { break @@ -3801,6 +3839,27 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.BertyP2pEventPayload.SeenAt(childComplexity), true + case "BertyP2pEventPayload.metadata": + if e.complexity.BertyP2pEventPayload.Metadata == nil { + break + } + + return e.complexity.BertyP2pEventPayload.Metadata(childComplexity), true + + case "BertyP2pMetadataKeyValue.key": + if e.complexity.BertyP2pMetadataKeyValue.Key == nil { + break + } + + return e.complexity.BertyP2pMetadataKeyValue.Key(childComplexity), true + + case "BertyP2pMetadataKeyValue.values": + if e.complexity.BertyP2pMetadataKeyValue.Values == nil { + break + } + + return e.complexity.BertyP2pMetadataKeyValue.Values(childComplexity), true + case "BertyP2pNodeAttrs.kind": if e.complexity.BertyP2pNodeAttrs.Kind == nil { break @@ -4990,7 +5049,7 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return 0, false } - return e.complexity.Query.GetEvent(childComplexity, args["id"].(string), args["senderId"].(string), args["createdAt"].(*time.Time), args["updatedAt"].(*time.Time), args["sentAt"].(*time.Time), args["receivedAt"].(*time.Time), args["ackedAt"].(*time.Time), args["direction"].(*int32), args["senderApiVersion"].(uint32), args["receiverApiVersion"].(uint32), args["receiverId"].(string), args["kind"].(*int32), args["attributes"].([]byte), args["conversationId"].(string), args["seenAt"].(*time.Time)), true + return e.complexity.Query.GetEvent(childComplexity, args["id"].(string), args["senderId"].(string), args["createdAt"].(*time.Time), args["updatedAt"].(*time.Time), args["sentAt"].(*time.Time), args["receivedAt"].(*time.Time), args["ackedAt"].(*time.Time), args["direction"].(*int32), args["senderApiVersion"].(uint32), args["receiverApiVersion"].(uint32), args["receiverId"].(string), args["kind"].(*int32), args["attributes"].([]byte), args["conversationId"].(string), args["seenAt"].(*time.Time), args["metadata"].([]*p2p.MetadataKeyValue)), true case "Query.ContactList": if e.complexity.Query.ContactList == nil { @@ -10297,6 +10356,8 @@ func (ec *executionContext) _BertyP2pEvent(ctx context.Context, sel ast.Selectio }(i, field) case "seenAt": out.Values[i] = ec._BertyP2pEvent_seenAt(ctx, field, obj) + case "metadata": + out.Values[i] = ec._BertyP2pEvent_metadata(ctx, field, obj) default: panic("unknown field " + strconv.Quote(field.Name)) } @@ -10651,6 +10712,63 @@ func (ec *executionContext) _BertyP2pEvent_seenAt(ctx context.Context, field gra return models.MarshalTime(*res) } +// nolint: vetshadow +func (ec *executionContext) _BertyP2pEvent_metadata(ctx context.Context, field graphql.CollectedField, obj *p2p.Event) graphql.Marshaler { + rctx := &graphql.ResolverContext{ + Object: "BertyP2pEvent", + Args: nil, + Field: field, + } + ctx = graphql.WithResolverContext(ctx, rctx) + resTmp := ec.FieldMiddleware(ctx, obj, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Metadata, nil + }) + if resTmp == nil { + return graphql.Null + } + res := resTmp.([]*p2p.MetadataKeyValue) + rctx.Result = res + + arr1 := make(graphql.Array, len(res)) + var wg sync.WaitGroup + + isLen1 := len(res) == 1 + if !isLen1 { + wg.Add(len(res)) + } + + for idx1 := range res { + idx1 := idx1 + rctx := &graphql.ResolverContext{ + Index: &idx1, + Result: res[idx1], + } + ctx := graphql.WithResolverContext(ctx, rctx) + f := func(idx1 int) { + if !isLen1 { + defer wg.Done() + } + arr1[idx1] = func() graphql.Marshaler { + + if res[idx1] == nil { + return graphql.Null + } + + return ec._BertyP2pMetadataKeyValue(ctx, field.Selections, res[idx1]) + }() + } + if isLen1 { + f(idx1) + } else { + go f(idx1) + } + + } + wg.Wait() + return arr1 +} + var bertyP2pEventPayloadImplementors = []string{"BertyP2pEventPayload"} // nolint: gocyclo, errcheck, gas, goconst @@ -10726,6 +10844,8 @@ func (ec *executionContext) _BertyP2pEventPayload(ctx context.Context, sel ast.S }(i, field) case "seenAt": out.Values[i] = ec._BertyP2pEventPayload_seenAt(ctx, field, obj) + case "metadata": + out.Values[i] = ec._BertyP2pEventPayload_metadata(ctx, field, obj) default: panic("unknown field " + strconv.Quote(field.Name)) } @@ -11080,6 +11200,147 @@ func (ec *executionContext) _BertyP2pEventPayload_seenAt(ctx context.Context, fi return models.MarshalTime(*res) } +// nolint: vetshadow +func (ec *executionContext) _BertyP2pEventPayload_metadata(ctx context.Context, field graphql.CollectedField, obj *p2p.Event) graphql.Marshaler { + rctx := &graphql.ResolverContext{ + Object: "BertyP2pEventPayload", + Args: nil, + Field: field, + } + ctx = graphql.WithResolverContext(ctx, rctx) + resTmp := ec.FieldMiddleware(ctx, obj, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Metadata, nil + }) + if resTmp == nil { + return graphql.Null + } + res := resTmp.([]*p2p.MetadataKeyValue) + rctx.Result = res + + arr1 := make(graphql.Array, len(res)) + var wg sync.WaitGroup + + isLen1 := len(res) == 1 + if !isLen1 { + wg.Add(len(res)) + } + + for idx1 := range res { + idx1 := idx1 + rctx := &graphql.ResolverContext{ + Index: &idx1, + Result: res[idx1], + } + ctx := graphql.WithResolverContext(ctx, rctx) + f := func(idx1 int) { + if !isLen1 { + defer wg.Done() + } + arr1[idx1] = func() graphql.Marshaler { + + if res[idx1] == nil { + return graphql.Null + } + + return ec._BertyP2pMetadataKeyValue(ctx, field.Selections, res[idx1]) + }() + } + if isLen1 { + f(idx1) + } else { + go f(idx1) + } + + } + wg.Wait() + return arr1 +} + +var bertyP2pMetadataKeyValueImplementors = []string{"BertyP2pMetadataKeyValue"} + +// nolint: gocyclo, errcheck, gas, goconst +func (ec *executionContext) _BertyP2pMetadataKeyValue(ctx context.Context, sel ast.SelectionSet, obj *p2p.MetadataKeyValue) graphql.Marshaler { + fields := graphql.CollectFields(ctx, sel, bertyP2pMetadataKeyValueImplementors) + + out := graphql.NewOrderedMap(len(fields)) + invalid := false + for i, field := range fields { + out.Keys[i] = field.Alias + + switch field.Name { + case "__typename": + out.Values[i] = graphql.MarshalString("BertyP2pMetadataKeyValue") + case "key": + out.Values[i] = ec._BertyP2pMetadataKeyValue_key(ctx, field, obj) + if out.Values[i] == graphql.Null { + invalid = true + } + case "values": + out.Values[i] = ec._BertyP2pMetadataKeyValue_values(ctx, field, obj) + default: + panic("unknown field " + strconv.Quote(field.Name)) + } + } + + if invalid { + return graphql.Null + } + return out +} + +// nolint: vetshadow +func (ec *executionContext) _BertyP2pMetadataKeyValue_key(ctx context.Context, field graphql.CollectedField, obj *p2p.MetadataKeyValue) graphql.Marshaler { + rctx := &graphql.ResolverContext{ + Object: "BertyP2pMetadataKeyValue", + Args: nil, + Field: field, + } + ctx = graphql.WithResolverContext(ctx, rctx) + resTmp := ec.FieldMiddleware(ctx, obj, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Key, nil + }) + if resTmp == nil { + if !ec.HasError(rctx) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + rctx.Result = res + return models.MarshalString(res) +} + +// nolint: vetshadow +func (ec *executionContext) _BertyP2pMetadataKeyValue_values(ctx context.Context, field graphql.CollectedField, obj *p2p.MetadataKeyValue) graphql.Marshaler { + rctx := &graphql.ResolverContext{ + Object: "BertyP2pMetadataKeyValue", + Args: nil, + Field: field, + } + ctx = graphql.WithResolverContext(ctx, rctx) + resTmp := ec.FieldMiddleware(ctx, obj, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Values, nil + }) + if resTmp == nil { + return graphql.Null + } + res := resTmp.([]string) + rctx.Result = res + + arr1 := make(graphql.Array, len(res)) + + for idx1 := range res { + arr1[idx1] = func() graphql.Marshaler { + return models.MarshalString(res[idx1]) + }() + } + + return arr1 +} + var bertyP2pNodeAttrsImplementors = []string{"BertyP2pNodeAttrs"} // nolint: gocyclo, errcheck, gas, goconst @@ -18184,7 +18445,7 @@ func (ec *executionContext) _Query_GetEvent(ctx context.Context, field graphql.C ctx = graphql.WithResolverContext(ctx, rctx) resTmp := ec.FieldMiddleware(ctx, nil, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return ec.resolvers.Query().GetEvent(rctx, args["id"].(string), args["senderId"].(string), args["createdAt"].(*time.Time), args["updatedAt"].(*time.Time), args["sentAt"].(*time.Time), args["receivedAt"].(*time.Time), args["ackedAt"].(*time.Time), args["direction"].(*int32), args["senderApiVersion"].(uint32), args["receiverApiVersion"].(uint32), args["receiverId"].(string), args["kind"].(*int32), args["attributes"].([]byte), args["conversationId"].(string), args["seenAt"].(*time.Time)) + return ec.resolvers.Query().GetEvent(rctx, args["id"].(string), args["senderId"].(string), args["createdAt"].(*time.Time), args["updatedAt"].(*time.Time), args["sentAt"].(*time.Time), args["receivedAt"].(*time.Time), args["ackedAt"].(*time.Time), args["direction"].(*int32), args["senderApiVersion"].(uint32), args["receiverApiVersion"].(uint32), args["receiverId"].(string), args["kind"].(*int32), args["attributes"].([]byte), args["conversationId"].(string), args["seenAt"].(*time.Time), args["metadata"].([]*p2p.MetadataKeyValue)) }) if resTmp == nil { return graphql.Null @@ -20639,6 +20900,62 @@ func UnmarshalBertyP2pEventInput(v interface{}) (p2p.Event, error) { it.SeenAt = &ptr1 } + if err != nil { + return it, err + } + case "metadata": + var err error + var rawIf1 []interface{} + if v != nil { + if tmp1, ok := v.([]interface{}); ok { + rawIf1 = tmp1 + } else { + rawIf1 = []interface{}{v} + } + } + it.Metadata = make([]*p2p.MetadataKeyValue, len(rawIf1)) + for idx1 := range rawIf1 { + var ptr2 p2p.MetadataKeyValue + if rawIf1[idx1] != nil { + ptr2, err = UnmarshalBertyP2pMetadataKeyValueInput(rawIf1[idx1]) + it.Metadata[idx1] = &ptr2 + } + } + if err != nil { + return it, err + } + } + } + + return it, nil +} + +func UnmarshalBertyP2pMetadataKeyValueInput(v interface{}) (p2p.MetadataKeyValue, error) { + var it p2p.MetadataKeyValue + var asMap = v.(map[string]interface{}) + + for k, v := range asMap { + switch k { + case "key": + var err error + it.Key, err = models.UnmarshalString(v) + if err != nil { + return it, err + } + case "values": + var err error + var rawIf1 []interface{} + if v != nil { + if tmp1, ok := v.([]interface{}); ok { + rawIf1 = tmp1 + } else { + rawIf1 = []interface{}{v} + } + } + it.Values = make([]string, len(rawIf1)) + for idx1 := range rawIf1 { + it.Values[idx1], err = models.UnmarshalString(rawIf1[idx1]) + } if err != nil { return it, err } @@ -21062,6 +21379,11 @@ type BertyP2pEvent implements Node { attributes: [Byte!], conversationId: ID! seenAt: GoogleProtobufTimestamp + metadata: [BertyP2pMetadataKeyValue] +} +type BertyP2pMetadataKeyValue { + key: String! + values: [String!] } @@ -21168,6 +21490,10 @@ type BertyP2pPeerPayload { addrs: [String!] connection: Enum } +input BertyP2pMetadataKeyValueInput { + key: String! + values: [String!] +} input BertyP2pEventInput { id: ID! senderId: String! @@ -21184,6 +21510,7 @@ input BertyP2pEventInput { attributes: [Byte!], conversationId: ID! seenAt: GoogleProtobufTimestampInput + metadata: [BertyP2pMetadataKeyValueInput] } type BertyP2pEventPayload { id: ID! @@ -21201,6 +21528,7 @@ type BertyP2pEventPayload { attributes: [Byte!], conversationId: ID! seenAt: GoogleProtobufTimestamp + metadata: [BertyP2pMetadataKeyValue] } input BertyNodePaginationInput { orderBy: String! @@ -21351,6 +21679,7 @@ type Query { attributes: [Byte!], conversationId: ID! seenAt: GoogleProtobufTimestampInput + metadata: [BertyP2pMetadataKeyValueInput] ): BertyP2pEventPayload ContactList( filter: BertyEntityContactInput diff --git a/core/api/node/graphql/resolver.go b/core/api/node/graphql/resolver.go index b8f6f8ac36..dca5d8c4e1 100644 --- a/core/api/node/graphql/resolver.go +++ b/core/api/node/graphql/resolver.go @@ -379,7 +379,7 @@ func (r *queryResolver) EventList(ctx context.Context, filter *p2p.Event, rawOnl return output, nil } -func (r *queryResolver) GetEvent(ctx context.Context, id string, senderID string, createdAt *time.Time, updatedAt *time.Time, sentAt *time.Time, receivedAt *time.Time, ackedAt *time.Time, direction *int32, senderAPIVersion uint32, receiverAPIVersion uint32, receiverID string, kind *int32, attributes []byte, conversationID string, seenAt *time.Time) (*p2p.Event, error) { +func (r *queryResolver) GetEvent(ctx context.Context, id string, senderID string, createdAt *time.Time, updatedAt *time.Time, sentAt *time.Time, receivedAt *time.Time, ackedAt *time.Time, direction *int32, senderAPIVersion uint32, receiverAPIVersion uint32, receiverID string, kind *int32, attributes []byte, conversationID string, seenAt *time.Time, metadata []*p2p.MetadataKeyValue) (*p2p.Event, error) { return r.client.GetEvent(ctx, &p2p.Event{ ID: strings.SplitN(id, ":", 2)[1], }) diff --git a/core/api/node/graphql/service.gen.graphql b/core/api/node/graphql/service.gen.graphql index b585e5564d..b133b2d01a 100644 --- a/core/api/node/graphql/service.gen.graphql +++ b/core/api/node/graphql/service.gen.graphql @@ -388,6 +388,11 @@ type BertyP2pEvent implements Node { attributes: [Byte!], conversationId: ID! seenAt: GoogleProtobufTimestamp + metadata: [BertyP2pMetadataKeyValue] +} +type BertyP2pMetadataKeyValue { + key: String! + values: [String!] } @@ -494,6 +499,10 @@ type BertyP2pPeerPayload { addrs: [String!] connection: Enum } +input BertyP2pMetadataKeyValueInput { + key: String! + values: [String!] +} input BertyP2pEventInput { id: ID! senderId: String! @@ -510,6 +519,7 @@ input BertyP2pEventInput { attributes: [Byte!], conversationId: ID! seenAt: GoogleProtobufTimestampInput + metadata: [BertyP2pMetadataKeyValueInput] } type BertyP2pEventPayload { id: ID! @@ -527,6 +537,7 @@ type BertyP2pEventPayload { attributes: [Byte!], conversationId: ID! seenAt: GoogleProtobufTimestamp + metadata: [BertyP2pMetadataKeyValue] } input BertyNodePaginationInput { orderBy: String! @@ -677,6 +688,7 @@ type Query { attributes: [Byte!], conversationId: ID! seenAt: GoogleProtobufTimestampInput + metadata: [BertyP2pMetadataKeyValueInput] ): BertyP2pEventPayload ContactList( filter: BertyEntityContactInput diff --git a/core/api/p2p/event.go b/core/api/p2p/event.go index e5a17b95eb..81dd2518c0 100644 --- a/core/api/p2p/event.go +++ b/core/api/p2p/event.go @@ -2,14 +2,23 @@ package p2p import ( "encoding/json" + "fmt" "strings" "time" + "berty.tech/core/pkg/tracing" "github.com/jinzhu/gorm" + opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" + "go.uber.org/zap" + context "golang.org/x/net/context" ) -func NewOutgoingEvent(sender, receiver string, kind Kind) *Event { +func NewOutgoingEvent(ctx context.Context, sender, receiver string, kind Kind) *Event { + var span opentracing.Span + span, _ = tracing.EnterFunc(ctx, sender, receiver, kind) + defer span.Finish() + return &Event{ SenderAPIVersion: Version, CreatedAt: time.Now().UTC(), @@ -18,6 +27,7 @@ func NewOutgoingEvent(sender, receiver string, kind Kind) *Event { ReceiverID: receiver, Direction: Event_Outgoing, } + } func (e Event) Validate() error { @@ -54,8 +64,83 @@ func (e Event) ToJSON() string { return string(out) } +func (e Event) CreateSpan(ctx context.Context) (opentracing.Span, context.Context) { + tracer := opentracing.GlobalTracer() + caller := tracing.GetCallerName("call", 1) + topic := fmt.Sprintf("event::/%s/%s", e.Direction.String(), e.Kind.String()) + + // Retrieve span context inside the event if needed + spanctx, err := tracer.Extract(opentracing.TextMap, e.TextMapReader()) + + var span opentracing.Span + if err != nil { + span = opentracing.StartSpan(topic, opentracing.ChildOf(spanctx)) + } else { + span = opentracing.StartSpan(topic) + } + + if e.ID != "" { + span.SetTag("event.id", e.ID) + } + + span.SetTag("caller", caller) + span.SetTag("event.kind", e.Kind.String()) + span.SetTag("event.SenderID", e.SenderID) + span.SetTag("event.DestinationID", e.ReceiverID) + span.SetTag("event.Direction", e.Direction.String()) + + if err := tracer.Inject(span.Context(), opentracing.TextMap, e.TextMapWriter()); err != nil { + logger().Error("failed to inject span context", zap.Error(err)) + } + + return span, opentracing.ContextWithSpan(ctx, span) +} + func (e Event) IsNode() {} // required by gqlgen +func (e Event) TextMapWriter() opentracing.TextMapWriter { + return (EventTextMap)(e) +} + +func (e Event) TextMapReader() opentracing.TextMapReader { + return (EventTextMap)(e) +} + +var _ opentracing.TextMapWriter = (*EventTextMap)(nil) +var _ opentracing.TextMapReader = (*EventTextMap)(nil) + +type EventTextMap Event + +func (tm EventTextMap) Set(key, val string) { + for _, data := range tm.Metadata { + if data.Key == key { + data.Values = []string{val} + return + } + } + + tm.Metadata = append(tm.Metadata, &MetadataKeyValue{ + Key: key, + Values: []string{val}, + }) +} + +func (tm EventTextMap) ForeachKey(handler func(key, val string) error) (err error) { + for _, data := range tm.Metadata { + if len(data.Values) > 0 { + err = handler(data.Key, data.Values[0]) + } else { + err = handler(data.Key, "") + } + + if err != nil { + return + } + } + + return +} + // FindNonAcknowledgedEventDestinations finds non acknowledged event destinations emitted before the supplied time value func FindNonAcknowledgedEventDestinations(db *gorm.DB, before time.Time) ([]*Event, error) { var events []*Event diff --git a/core/api/p2p/event.pb.go b/core/api/p2p/event.pb.go index 635d7d4b69..675395958f 100644 --- a/core/api/p2p/event.pb.go +++ b/core/api/p2p/event.pb.go @@ -60,7 +60,7 @@ func (x Event_Direction) String() string { return proto.EnumName(Event_Direction_name, int32(x)) } func (Event_Direction) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_event_349323b62019054f, []int{0, 0} + return fileDescriptor_event_ce557309c9b43243, []int{0, 0} } type Event struct { @@ -98,17 +98,19 @@ type Event struct { // ConversationID needs to be set if the event belongs to a conversation. ConversationID string `protobuf:"bytes,15,opt,name=conversation_id,json=conversationId,proto3" json:"conversation_id,omitempty"` // SeenAt is set to the date when the event has been displayed on the user's screen - SeenAt *time.Time `protobuf:"bytes,16,opt,name=seen_at,json=seenAt,stdtime" json:"seen_at,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + SeenAt *time.Time `protobuf:"bytes,16,opt,name=seen_at,json=seenAt,stdtime" json:"seen_at,omitempty"` + // Additional metadata + Metadata []*MetadataKeyValue `protobuf:"bytes,99,rep,name=metadata" json:"metadata,omitempty" gorm:"-"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *Event) Reset() { *m = Event{} } func (m *Event) String() string { return proto.CompactTextString(m) } func (*Event) ProtoMessage() {} func (*Event) Descriptor() ([]byte, []int) { - return fileDescriptor_event_349323b62019054f, []int{0} + return fileDescriptor_event_ce557309c9b43243, []int{0} } func (m *Event) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -242,8 +244,71 @@ func (m *Event) GetSeenAt() *time.Time { return nil } +func (m *Event) GetMetadata() []*MetadataKeyValue { + if m != nil { + return m.Metadata + } + return nil +} + +type MetadataKeyValue struct { + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + Values []string `protobuf:"bytes,2,rep,name=values" json:"values,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *MetadataKeyValue) Reset() { *m = MetadataKeyValue{} } +func (m *MetadataKeyValue) String() string { return proto.CompactTextString(m) } +func (*MetadataKeyValue) ProtoMessage() {} +func (*MetadataKeyValue) Descriptor() ([]byte, []int) { + return fileDescriptor_event_ce557309c9b43243, []int{1} +} +func (m *MetadataKeyValue) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *MetadataKeyValue) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_MetadataKeyValue.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (dst *MetadataKeyValue) XXX_Merge(src proto.Message) { + xxx_messageInfo_MetadataKeyValue.Merge(dst, src) +} +func (m *MetadataKeyValue) XXX_Size() int { + return m.Size() +} +func (m *MetadataKeyValue) XXX_DiscardUnknown() { + xxx_messageInfo_MetadataKeyValue.DiscardUnknown(m) +} + +var xxx_messageInfo_MetadataKeyValue proto.InternalMessageInfo + +func (m *MetadataKeyValue) GetKey() string { + if m != nil { + return m.Key + } + return "" +} + +func (m *MetadataKeyValue) GetValues() []string { + if m != nil { + return m.Values + } + return nil +} + func init() { proto.RegisterType((*Event)(nil), "berty.p2p.Event") + proto.RegisterType((*MetadataKeyValue)(nil), "berty.p2p.MetadataKeyValue") proto.RegisterEnum("berty.p2p.Event_Direction", Event_Direction_name, Event_Direction_value) } func (m *Event) Marshal() (dAtA []byte, err error) { @@ -369,6 +434,62 @@ func (m *Event) MarshalTo(dAtA []byte) (int, error) { } i += n6 } + if len(m.Metadata) > 0 { + for _, msg := range m.Metadata { + dAtA[i] = 0x9a + i++ + dAtA[i] = 0x6 + i++ + i = encodeVarintEvent(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + if m.XXX_unrecognized != nil { + i += copy(dAtA[i:], m.XXX_unrecognized) + } + return i, nil +} + +func (m *MetadataKeyValue) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *MetadataKeyValue) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Key) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintEvent(dAtA, i, uint64(len(m.Key))) + i += copy(dAtA[i:], m.Key) + } + if len(m.Values) > 0 { + for _, s := range m.Values { + dAtA[i] = 0x12 + i++ + l = len(s) + for l >= 1<<7 { + dAtA[i] = uint8(uint64(l)&0x7f | 0x80) + l >>= 7 + i++ + } + dAtA[i] = uint8(l) + i++ + i += copy(dAtA[i:], s) + } + } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) } @@ -442,6 +563,34 @@ func (m *Event) Size() (n int) { l = github_com_gogo_protobuf_types.SizeOfStdTime(*m.SeenAt) n += 2 + l + sovEvent(uint64(l)) } + if len(m.Metadata) > 0 { + for _, e := range m.Metadata { + l = e.Size() + n += 2 + l + sovEvent(uint64(l)) + } + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *MetadataKeyValue) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Key) + if l > 0 { + n += 1 + l + sovEvent(uint64(l)) + } + if len(m.Values) > 0 { + for _, s := range m.Values { + l = len(s) + n += 1 + l + sovEvent(uint64(l)) + } + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -905,6 +1054,146 @@ func (m *Event) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 99: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Metadata", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvent + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthEvent + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Metadata = append(m.Metadata, &MetadataKeyValue{}) + if err := m.Metadata[len(m.Metadata)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipEvent(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthEvent + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *MetadataKeyValue) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvent + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: MetadataKeyValue: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: MetadataKeyValue: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvent + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthEvent + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Key = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Values", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvent + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthEvent + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Values = append(m.Values, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipEvent(dAtA[iNdEx:]) @@ -1032,46 +1321,51 @@ var ( ErrIntOverflowEvent = fmt.Errorf("proto: integer overflow") ) -func init() { proto.RegisterFile("api/p2p/event.proto", fileDescriptor_event_349323b62019054f) } +func init() { proto.RegisterFile("api/p2p/event.proto", fileDescriptor_event_ce557309c9b43243) } -var fileDescriptor_event_349323b62019054f = []byte{ - // 607 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0x4d, 0x6f, 0xd3, 0x30, - 0x18, 0x9e, 0x4b, 0x59, 0x13, 0x6f, 0xeb, 0x22, 0x53, 0x8d, 0xae, 0x87, 0xa6, 0x2a, 0x1c, 0xca, - 0x25, 0x41, 0xe5, 0xc2, 0xc7, 0x61, 0x4a, 0x57, 0x04, 0x11, 0x12, 0xa0, 0xf0, 0x71, 0xe0, 0x52, - 0xa5, 0xb1, 0xc9, 0xac, 0xae, 0xb6, 0x71, 0xdd, 0xa1, 0xfd, 0x0b, 0x8e, 0xfc, 0xa4, 0x1d, 0xf9, - 0x05, 0x61, 0x0a, 0x17, 0x8e, 0x13, 0xbf, 0x00, 0xd9, 0x49, 0xba, 0x72, 0x40, 0x94, 0x53, 0x93, - 0xe7, 0x7d, 0x9f, 0xc7, 0x8f, 0xdf, 0xf7, 0x69, 0xe0, 0xad, 0x58, 0x50, 0x5f, 0x0c, 0x85, 0x4f, - 0xce, 0x08, 0x53, 0x9e, 0x90, 0x5c, 0x71, 0x64, 0x4f, 0x89, 0x54, 0xe7, 0x9e, 0x18, 0x8a, 0x4e, - 0xdf, 0xd4, 0x35, 0x3a, 0x5d, 0x7e, 0xf4, 0x53, 0x19, 0x8b, 0x93, 0x4f, 0xa7, 0xd5, 0x6f, 0xd1, - 0xde, 0x41, 0x95, 0xc6, 0x8c, 0x32, 0x5c, 0x62, 0xad, 0x94, 0xa7, 0xdc, 0x3c, 0xfa, 0xfa, 0xa9, - 0x44, 0xdd, 0x94, 0xf3, 0xf4, 0x94, 0x5c, 0x0b, 0x2a, 0x3a, 0x27, 0x0b, 0x15, 0xcf, 0x45, 0xd1, - 0xd0, 0xbf, 0x68, 0xc0, 0x9b, 0x4f, 0xb5, 0x13, 0x74, 0x1f, 0xd6, 0x28, 0x6e, 0x83, 0x1e, 0x18, - 0xd8, 0xa3, 0xde, 0xe5, 0xd5, 0x21, 0xc8, 0x33, 0xb7, 0x16, 0x8e, 0x7f, 0x65, 0x2e, 0x4a, 0xb9, - 0x9c, 0x3f, 0xee, 0x0b, 0x49, 0xe7, 0xb1, 0x3c, 0x9f, 0xcc, 0xc8, 0x79, 0x3f, 0xaa, 0x51, 0x8c, - 0x8e, 0xa0, 0xbd, 0x20, 0x0c, 0x13, 0x39, 0xa1, 0xb8, 0x5d, 0x33, 0xc4, 0x7e, 0x9e, 0xb9, 0xd6, - 0x1b, 0x03, 0xfe, 0x95, 0x6a, 0x15, 0xa4, 0x10, 0xa3, 0x63, 0x08, 0x13, 0x49, 0x62, 0x45, 0xf0, - 0x24, 0x56, 0xed, 0x1b, 0x3d, 0x30, 0xd8, 0x19, 0x76, 0xbc, 0xc2, 0xb2, 0x57, 0x59, 0xf6, 0xde, - 0x56, 0x96, 0x47, 0xd6, 0x45, 0xe6, 0x6e, 0x7d, 0xf9, 0xee, 0x82, 0xc8, 0x2e, 0x79, 0x81, 0xd2, - 0x22, 0x4b, 0x81, 0x2b, 0x91, 0xfa, 0xff, 0x88, 0x94, 0xbc, 0x40, 0xa1, 0x47, 0xb0, 0xb1, 0x20, - 0x4c, 0x69, 0x85, 0xed, 0x7f, 0x2a, 0xd4, 0x0d, 0x7b, 0x5b, 0x13, 0x02, 0x85, 0x02, 0xb8, 0x23, - 0x49, 0x42, 0xe8, 0x59, 0x61, 0xa0, 0xb1, 0x21, 0x1d, 0x56, 0xa4, 0x40, 0xa1, 0x27, 0xd0, 0x8a, - 0x93, 0x59, 0xc1, 0xb7, 0x36, 0xe4, 0x37, 0x0c, 0x23, 0x50, 0xe8, 0x21, 0xb4, 0x31, 0x95, 0x24, - 0x51, 0x94, 0xb3, 0xb6, 0xdd, 0x03, 0x83, 0xe6, 0xb0, 0xe3, 0xad, 0xf2, 0xe4, 0x99, 0xe5, 0x7a, - 0xe3, 0xaa, 0x23, 0xba, 0x6e, 0x46, 0x23, 0x88, 0xca, 0xfd, 0xc5, 0x82, 0x4e, 0xce, 0x88, 0x5c, - 0x68, 0x09, 0xd8, 0x03, 0x83, 0xbd, 0x51, 0x2b, 0xcf, 0x5c, 0xa7, 0x58, 0x64, 0xf0, 0x3a, 0x7c, - 0x5f, 0xd4, 0x22, 0xa7, 0xe8, 0x0f, 0x04, 0x2d, 0x11, 0xf4, 0x1c, 0xb6, 0xca, 0x8b, 0xfc, 0xa9, - 0xb2, 0x63, 0x54, 0x0e, 0xf2, 0xcc, 0x45, 0x51, 0x59, 0x5f, 0xd3, 0x41, 0x15, 0x67, 0x4d, 0xc9, - 0x5f, 0xcd, 0xd1, 0xe4, 0x69, 0xd7, 0xe4, 0xa9, 0x99, 0x67, 0x2e, 0xac, 0x04, 0xc2, 0xf1, 0x6a, - 0x6a, 0x3a, 0x3d, 0x77, 0x60, 0x5d, 0xe7, 0xbf, 0xbd, 0x67, 0xee, 0xbc, 0xbf, 0x76, 0xe7, 0x17, - 0x94, 0xe1, 0xc8, 0x14, 0xd1, 0x5d, 0x08, 0x63, 0xa5, 0x24, 0x9d, 0x2e, 0x15, 0x59, 0xb4, 0x9b, - 0x3d, 0x30, 0xd8, 0x1d, 0xd5, 0x7f, 0x5e, 0x1d, 0x82, 0x68, 0x0d, 0x47, 0x47, 0x70, 0x3f, 0xe1, - 0x4c, 0x7b, 0x8f, 0xf5, 0x64, 0xf4, 0xf9, 0xfb, 0xe6, 0xfc, 0x83, 0xf2, 0x8f, 0xd0, 0x3c, 0x5e, - 0x2b, 0x87, 0xe3, 0xa8, 0xb9, 0xde, 0x1e, 0xe2, 0x22, 0x3f, 0x84, 0xe9, 0x05, 0x3a, 0x9b, 0xe7, - 0x87, 0xb0, 0x40, 0xf5, 0x9f, 0x41, 0x7b, 0xb5, 0x1d, 0xd4, 0x82, 0xce, 0x3b, 0x36, 0x63, 0xfc, - 0x33, 0x5b, 0x61, 0xce, 0x16, 0xda, 0x85, 0x56, 0xc8, 0x12, 0x3e, 0xa7, 0x2c, 0x75, 0x80, 0x7e, - 0x7b, 0xb5, 0x54, 0x29, 0xd7, 0x6f, 0x35, 0x64, 0xc1, 0xfa, 0x4b, 0x8e, 0x89, 0x93, 0x8c, 0xee, - 0x5d, 0xe4, 0x5d, 0xf0, 0x2d, 0xef, 0x82, 0xcb, 0xbc, 0x0b, 0xbe, 0xfe, 0xe8, 0x6e, 0x7d, 0xb8, - 0x5d, 0x8c, 0x44, 0x91, 0xe4, 0xc4, 0x4f, 0xb8, 0x24, 0x7e, 0xf9, 0xdd, 0x98, 0x6e, 0x1b, 0x57, - 0x0f, 0x7e, 0x07, 0x00, 0x00, 0xff, 0xff, 0xec, 0xac, 0x1d, 0x5a, 0x8d, 0x04, 0x00, 0x00, +var fileDescriptor_event_ce557309c9b43243 = []byte{ + // 683 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x94, 0xcd, 0x6e, 0xd3, 0x4a, + 0x14, 0xc7, 0xeb, 0x24, 0x37, 0xb5, 0x27, 0x69, 0x6a, 0xcd, 0x8d, 0x7a, 0xdd, 0x5c, 0x29, 0xb6, + 0x7c, 0xef, 0x22, 0x2c, 0x70, 0x50, 0xd8, 0xf0, 0x25, 0x55, 0x4e, 0x53, 0x41, 0x54, 0xf1, 0x21, + 0x03, 0x5d, 0xb0, 0x89, 0x26, 0xf6, 0xe0, 0x8e, 0xd2, 0xcc, 0x18, 0x67, 0x12, 0x94, 0xa7, 0x80, + 0x25, 0x8f, 0xd4, 0x25, 0x4f, 0x60, 0x2a, 0xb3, 0x61, 0x59, 0xf5, 0x09, 0xd0, 0x8c, 0xed, 0xd4, + 0x20, 0x21, 0xca, 0xca, 0x33, 0xe7, 0x9c, 0xdf, 0x7f, 0xce, 0x99, 0x73, 0x3c, 0xe0, 0x6f, 0x14, + 0x91, 0x7e, 0x34, 0x88, 0xfa, 0x78, 0x85, 0x29, 0x77, 0xa2, 0x98, 0x71, 0x06, 0xb5, 0x29, 0x8e, + 0xf9, 0xda, 0x89, 0x06, 0x51, 0xc7, 0x96, 0x7e, 0x61, 0x9d, 0x2e, 0xdf, 0xf6, 0xc3, 0x18, 0x45, + 0xa7, 0xef, 0xce, 0x8a, 0x6f, 0x16, 0xde, 0x81, 0x85, 0xc6, 0x8c, 0xd0, 0x20, 0xb7, 0xb5, 0x43, + 0x16, 0x32, 0xb9, 0xec, 0x8b, 0x55, 0x6e, 0x35, 0x43, 0xc6, 0xc2, 0x33, 0x7c, 0x2d, 0xc8, 0xc9, + 0x1c, 0x2f, 0x38, 0x9a, 0x47, 0x59, 0x80, 0xfd, 0x41, 0x05, 0x7f, 0x1d, 0x89, 0x4c, 0xe0, 0x1d, + 0x50, 0x21, 0x81, 0xa1, 0x58, 0x4a, 0x4f, 0x1b, 0x5a, 0x17, 0x97, 0xfb, 0x4a, 0x9a, 0x98, 0x95, + 0xf1, 0xe8, 0x2a, 0x31, 0x61, 0xc8, 0xe2, 0xf9, 0x03, 0x3b, 0x8a, 0xc9, 0x1c, 0xc5, 0xeb, 0xc9, + 0x0c, 0xaf, 0x6d, 0xaf, 0x42, 0x02, 0x78, 0x00, 0xb4, 0x05, 0xa6, 0x01, 0x8e, 0x27, 0x24, 0x30, + 0x2a, 0x12, 0xb4, 0xd3, 0xc4, 0x54, 0x5f, 0x4a, 0xe3, 0x2f, 0x51, 0x35, 0x83, 0xc6, 0x01, 0x3c, + 0x04, 0xc0, 0x8f, 0x31, 0xe2, 0x38, 0x98, 0x20, 0x6e, 0x54, 0x2d, 0xa5, 0xd7, 0x18, 0x74, 0x9c, + 0x2c, 0x65, 0xa7, 0x48, 0xd9, 0x79, 0x55, 0xa4, 0x3c, 0x54, 0xcf, 0x13, 0x73, 0xeb, 0xe3, 0x17, + 0x53, 0xf1, 0xb4, 0x9c, 0x73, 0xb9, 0x10, 0x59, 0x46, 0x41, 0x21, 0x52, 0xfb, 0x13, 0x91, 0x9c, + 0x73, 0x39, 0xbc, 0x0f, 0xb6, 0x17, 0x98, 0x72, 0xa1, 0x50, 0xff, 0xad, 0x42, 0x4d, 0xd2, 0x75, + 0x01, 0xb8, 0x1c, 0xba, 0xa0, 0x11, 0x63, 0x1f, 0x93, 0x55, 0x96, 0xc0, 0xf6, 0x0d, 0x71, 0x50, + 0x40, 0x2e, 0x87, 0x0f, 0x81, 0x8a, 0xfc, 0x59, 0xc6, 0xab, 0x37, 0xe4, 0xb7, 0x25, 0xe1, 0x72, + 0x78, 0x0f, 0x68, 0x01, 0x89, 0xb1, 0xcf, 0x09, 0xa3, 0x86, 0x66, 0x29, 0xbd, 0xd6, 0xa0, 0xe3, + 0x6c, 0xe6, 0xc9, 0x91, 0xcd, 0x75, 0x46, 0x45, 0x84, 0x77, 0x1d, 0x0c, 0x87, 0x00, 0xe6, 0xfd, + 0x43, 0x11, 0x99, 0xac, 0x70, 0xbc, 0x10, 0x12, 0xc0, 0x52, 0x7a, 0x3b, 0xc3, 0x76, 0x9a, 0x98, + 0x7a, 0xd6, 0x48, 0xf7, 0xc5, 0xf8, 0x24, 0xf3, 0x79, 0x7a, 0x16, 0xef, 0x46, 0x24, 0xb7, 0xc0, + 0x27, 0xa0, 0x9d, 0x17, 0xf2, 0xa3, 0x4a, 0x43, 0xaa, 0xec, 0xa5, 0x89, 0x09, 0xbd, 0xdc, 0x5f, + 0xd2, 0x81, 0x05, 0x53, 0x52, 0xea, 0x6f, 0xee, 0x51, 0xce, 0x53, 0x53, 0xce, 0x53, 0x2b, 0x4d, + 0x4c, 0x50, 0x08, 0x8c, 0x47, 0x9b, 0x5b, 0x13, 0xd3, 0xf3, 0x1f, 0xa8, 0x89, 0xf9, 0x37, 0x76, + 0x64, 0xcd, 0xbb, 0xa5, 0x9a, 0x8f, 0x09, 0x0d, 0x3c, 0xe9, 0x84, 0xff, 0x03, 0x80, 0x38, 0x8f, + 0xc9, 0x74, 0xc9, 0xf1, 0xc2, 0x68, 0x59, 0x4a, 0xaf, 0x39, 0xac, 0x7d, 0xbb, 0xdc, 0x57, 0xbc, + 0x92, 0x1d, 0x1e, 0x80, 0x5d, 0x9f, 0x51, 0x91, 0x3b, 0x12, 0x37, 0x23, 0xce, 0xdf, 0x95, 0xe7, + 0xef, 0xe5, 0x3f, 0x42, 0xeb, 0xb0, 0xe4, 0x1e, 0x8f, 0xbc, 0x56, 0x39, 0x7c, 0x1c, 0x64, 0xf3, + 0x83, 0xa9, 0x68, 0xa0, 0x7e, 0xf3, 0xf9, 0xc1, 0xd4, 0xe5, 0xf0, 0x08, 0xa8, 0x73, 0xcc, 0x51, + 0x80, 0x38, 0x32, 0x7c, 0xab, 0xda, 0x6b, 0x0c, 0xfe, 0x2d, 0x95, 0xf2, 0x34, 0x77, 0x1d, 0xe3, + 0xf5, 0x09, 0x3a, 0x5b, 0xe2, 0x61, 0xf3, 0x2a, 0x31, 0xd5, 0xec, 0xaf, 0xba, 0x6d, 0x7b, 0x1b, + 0xd4, 0x7e, 0x0c, 0xb4, 0x4d, 0x93, 0x61, 0x1b, 0xe8, 0xaf, 0xe9, 0x8c, 0xb2, 0xf7, 0x74, 0x63, + 0xd3, 0xb7, 0x60, 0x13, 0xa8, 0x63, 0xea, 0xb3, 0x39, 0xa1, 0xa1, 0xae, 0x88, 0xdd, 0xf3, 0x25, + 0x0f, 0x99, 0xd8, 0x55, 0xa0, 0x0a, 0x6a, 0xcf, 0x58, 0x80, 0x75, 0xdf, 0x7e, 0x04, 0xf4, 0x9f, + 0x0f, 0x85, 0x3a, 0xa8, 0xce, 0xf0, 0x3a, 0x7b, 0x1c, 0x3c, 0xb1, 0x84, 0x7b, 0xa0, 0xbe, 0x12, + 0xae, 0x85, 0x51, 0xb1, 0xaa, 0x3d, 0xcd, 0xcb, 0x77, 0xc3, 0x5b, 0xe7, 0x69, 0x57, 0xf9, 0x9c, + 0x76, 0x95, 0x8b, 0xb4, 0xab, 0x7c, 0xfa, 0xda, 0xdd, 0x7a, 0xf3, 0x4f, 0x56, 0x0c, 0xc7, 0xfe, + 0x69, 0xdf, 0x67, 0x31, 0xee, 0xe7, 0x8f, 0xd7, 0xb4, 0x2e, 0xaf, 0xe6, 0xee, 0xf7, 0x00, 0x00, + 0x00, 0xff, 0xff, 0xbc, 0x54, 0xf7, 0xde, 0x12, 0x05, 0x00, 0x00, } diff --git a/core/api/p2p/event.proto b/core/api/p2p/event.proto index 0b4ece2244..482d7117e5 100644 --- a/core/api/p2p/event.proto +++ b/core/api/p2p/event.proto @@ -60,6 +60,9 @@ message Event { // SeenAt is set to the date when the event has been displayed on the user's screen google.protobuf.Timestamp seen_at = 16 [(gogoproto.stdtime) = true]; + // Additional metadata + repeated MetadataKeyValue metadata = 99 [(gogoproto.moretags) = "gorm:\"-\""]; + // // enums // @@ -76,3 +79,8 @@ message Event { Node = 99; } } + +message MetadataKeyValue { + string key = 1; + repeated string values = 2; +} diff --git a/core/cmd/berty/client.go b/core/cmd/berty/client.go index 4752b5aef8..d5d05c1920 100644 --- a/core/cmd/berty/client.go +++ b/core/cmd/berty/client.go @@ -251,7 +251,7 @@ func getClient(opts *clientOptions) (*client.Client, context.Context, error) { } if jaegerAddr != "" { - tracer, closer, err := jaeger.InitTracer(jaegerAddr, jaegerName+":client") + tracer, closer, err := jaeger.InitTracer(jaegerAddr, jaegerName+"client") if err != nil { return nil, nil, err } diff --git a/core/go.mod b/core/go.mod index 8807765c29..494a7672ec 100644 --- a/core/go.mod +++ b/core/go.mod @@ -68,7 +68,7 @@ require ( github.com/libp2p/go-buffer-pool v0.1.1 // indirect github.com/libp2p/go-conn-security v0.1.13 // indirect github.com/libp2p/go-conn-security-multistream v0.1.13 // indirect - github.com/libp2p/go-floodsub v0.10.2 // indirect + github.com/libp2p/go-floodsub v0.10.2 github.com/libp2p/go-flow-metrics v0.2.0 // indirect github.com/libp2p/go-libp2p v6.0.19+incompatible github.com/libp2p/go-libp2p-blankhost v0.3.13 // indirect diff --git a/core/manager/account/p2p.go b/core/manager/account/p2p.go index 2a15c84b9f..999f11ea91 100644 --- a/core/manager/account/p2p.go +++ b/core/manager/account/p2p.go @@ -8,6 +8,7 @@ import ( "berty.tech/core/network" "berty.tech/core/network/p2p" "berty.tech/core/pkg/tracing" + grpc_ot "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/jinzhu/gorm" p2pcrypto "github.com/libp2p/go-libp2p-crypto" opentracing "github.com/opentracing/opentracing-go" @@ -74,6 +75,7 @@ func createP2PNetwork(ctx context.Context, opts *P2PNetworkOptions, db *gorm.DB) // @TODO: Allow static identity loaded from a file (useful for relay // server for creating static endpoint for bootstrap) // p2p.WithIdentity(), + p2p.WithJaeger(grpc_ot.WithTracer(span.Tracer())), p2p.WithNATPortMap(), // @TODO: Is this a pb on mobile? p2p.WithListenAddrStrings(opts.Bind...), p2p.WithBootstrap(opts.Bootstrap...), diff --git a/core/network/p2p/driver.go b/core/network/p2p/driver.go index e167415e94..67637b893d 100644 --- a/core/network/p2p/driver.go +++ b/core/network/p2p/driver.go @@ -45,7 +45,7 @@ var ProtocolID = protocol.ID(p2pgrpc.GetGrpcID(ID)) type driverConfig struct { libp2pOpt []libp2p.Option - jaeger *grpc_ot.Option + jaeger []grpc_ot.Option bootstrap []string bootstrapSync bool @@ -168,10 +168,10 @@ func newDriver(ctx context.Context, cfg driverConfig) (*Driver, error) { ) if cfg.jaeger != nil { - serverStreamOpts = append(serverStreamOpts, grpc_ot.StreamServerInterceptor(*cfg.jaeger)) - serverUnaryOpts = append(serverUnaryOpts, grpc_ot.UnaryServerInterceptor(*cfg.jaeger)) - clientStreamOpts = append(clientStreamOpts, grpc_ot.StreamClientInterceptor(*cfg.jaeger)) - clientUnaryOpts = append(clientUnaryOpts, grpc_ot.UnaryClientInterceptor(*cfg.jaeger)) + serverStreamOpts = append(serverStreamOpts, grpc_ot.StreamServerInterceptor(cfg.jaeger...)) + serverUnaryOpts = append(serverUnaryOpts, grpc_ot.UnaryServerInterceptor(cfg.jaeger...)) + clientStreamOpts = append(clientStreamOpts, grpc_ot.StreamClientInterceptor(cfg.jaeger...)) + clientUnaryOpts = append(clientUnaryOpts, grpc_ot.UnaryClientInterceptor(cfg.jaeger...)) } p2pInterceptorsServer := []grpc.ServerOption{ @@ -408,10 +408,19 @@ func (d *Driver) Find(ctx context.Context, pid peer.ID) (pstore.PeerInfo, error) } func (d *Driver) Emit(ctx context.Context, e *p2p.Envelope) error { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, e) + defer span.Finish() + return d.EmitTo(ctx, e.GetChannelID(), e) } func (d *Driver) EmitTo(ctx context.Context, channel string, e *p2p.Envelope) error { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, channel, e) + + defer span.Finish() + ss, err := d.FindSubscribers(ctx, channel) if err != nil { return err @@ -421,30 +430,56 @@ func (d *Driver) EmitTo(ctx context.Context, channel string, e *p2p.Envelope) er return fmt.Errorf("no subscribers found") } - for _, s := range ss { - go func(pi pstore.PeerInfo) { + success := make([]chan bool, len(ss)) + for i, s := range ss { + success[i] = make(chan bool, 1) + go func(pi pstore.PeerInfo, index int, done chan bool) { + var gospan opentracing.Span + var goctx context.Context + gospan, goctx = tracing.EnterFunc(ctx, index) + defer gospan.Finish() + peerID := pi.ID.Pretty() if pi.ID == d.host.ID() { + done <- false return } - if err := d.Connect(ctx, pi); err != nil { + if err := d.Connect(goctx, pi); err != nil { logger().Warn("failed to connect", zap.String("id", peerID), zap.Error(err)) + done <- false + return } - c, err := d.ccmanager.GetConn(ctx, peerID) + c, err := d.ccmanager.GetConn(goctx, peerID) if err != nil { logger().Warn("failed to dial", zap.String("id", peerID), zap.Error(err)) + done <- false + return } sc := p2p.NewServiceClient(c) - _, err = sc.HandleEnvelope(ctx, e) + _, err = sc.HandleEnvelope(goctx, e) if err != nil { logger().Error("failed to send envelope", zap.String("envelope", fmt.Sprintf("%+v", e)), zap.String("error", err.Error())) + done <- false + return } - }(s) + + done <- true + }(s, i, success[i]) + } + + var ok bool + for _, cc := range success { + ok = ok || <-cc + } + + if !ok { + return fmt.Errorf("failed to emit envelope") } + return nil } diff --git a/core/network/p2p/options.go b/core/network/p2p/options.go index d8ab99d67e..8a450c4bcb 100644 --- a/core/network/p2p/options.go +++ b/core/network/p2p/options.go @@ -89,7 +89,7 @@ func WithBootstrap(addrs ...string) Option { } // WithJaeger configure boostrap connection -func WithJaeger(jaeger *grpc_ot.Option) Option { +func WithJaeger(jaeger ...grpc_ot.Option) Option { return func(dc *driverConfig) error { dc.jaeger = jaeger return nil diff --git a/core/network/p2p/p2putil/manager.go b/core/network/p2p/p2putil/manager.go index 92fbfa00c7..463e98ecbd 100644 --- a/core/network/p2p/p2putil/manager.go +++ b/core/network/p2p/p2putil/manager.go @@ -6,9 +6,11 @@ import ( "net" "sync" + "berty.tech/core/pkg/tracing" host "github.com/libp2p/go-libp2p-host" peer "github.com/libp2p/go-libp2p-peer" protocol "github.com/libp2p/go-libp2p-protocol" + opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "go.uber.org/zap" "google.golang.org/grpc" @@ -44,6 +46,10 @@ func newClient(m *Manager, target string) *client { } func (c *client) GetClient(ctx context.Context) (*grpc.ClientConn, error) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx) + defer span.Finish() + c.mu.Lock() defer c.mu.Unlock() @@ -71,6 +77,10 @@ func (c *client) Close() (err error) { } func (m *Manager) GetConn(ctx context.Context, target string) (*grpc.ClientConn, error) { + var span opentracing.Span + span, ctx = tracing.EnterFunc(ctx, target) + defer span.Finish() + m.mu.Lock() var c *client diff --git a/core/node/event.go b/core/node/event.go index f5502b14b1..cf0a09bb3f 100644 --- a/core/node/event.go +++ b/core/node/event.go @@ -36,6 +36,7 @@ func (n *Node) HandleEvent(ctx context.Context, input *p2p.Event) (*node.Void, e func (n *Node) handleEvent(ctx context.Context, input *p2p.Event) error { var span opentracing.Span span, ctx = tracing.EnterFunc(ctx, input) + defer span.Finish() defer n.asyncWaitGroup(ctx)() n.handleMutex(ctx)() diff --git a/core/node/mainloop.go b/core/node/mainloop.go index f1d706177b..1790e9bd9f 100644 --- a/core/node/mainloop.go +++ b/core/node/mainloop.go @@ -72,7 +72,7 @@ func (n *Node) cron(ctx context.Context) { var span opentracing.Span span, ctx = tracing.EnterFunc(ctx) defer span.Finish() - for true { + for { before := time.Now().Add(-time.Second * 60 * 10) if _, err := n.EventsRetry(ctx, before); err != nil { n.LogBackgroundError(ctx, err) @@ -82,6 +82,78 @@ func (n *Node) cron(ctx context.Context) { } } +func (n *Node) handleClientEvent(event *p2p.Event) { + logger().Debug("client event", zap.Stringer("event", event)) + + // @FIXME: Don't create a span here for now + // span, _ := event.CreateSpan(context.Background()) + // defer span.Finish() + + n.clientEventsMutex.Lock() + for _, sub := range n.clientEventsSubscribers { + if sub.filter(event) { + sub.queue <- event + } + } + n.clientEventsMutex.Unlock() +} + +func (n *Node) handleOutgoingEvent(event *p2p.Event) { + logger().Debug("outgoing event", zap.Stringer("event", event)) + + span, ctx := event.CreateSpan(context.Background()) + + envelope := p2p.Envelope{} + eventBytes, err := proto.Marshal(event) + if err != nil { + n.LogBackgroundError(ctx, errors.Wrap(err, "failed to marshal outgoing event")) + span.Finish() + return + } + + event.SenderID = n.b64pubkey + + switch { + case event.ReceiverID != "": // ContactEvent + envelope.Source = n.aliasEnvelopeForContact(ctx, &envelope, event) + envelope.ChannelID = event.ReceiverID + envelope.EncryptedEvent = eventBytes // FIXME: encrypt for receiver + + case event.ConversationID != "": //ConversationEvent + envelope.Source = n.aliasEnvelopeForConversation(ctx, &envelope, event) + envelope.ChannelID = event.ConversationID + envelope.EncryptedEvent = eventBytes // FIXME: encrypt for conversation + + default: + n.LogBackgroundError(ctx, fmt.Errorf("unhandled event type")) + } + + if envelope.Signature, err = keypair.Sign(n.crypto, &envelope); err != nil { + n.LogBackgroundError(ctx, errors.Wrap(err, "failed to sign envelope")) + span.Finish() + return + } + + // Async subscribe to conversation + // wait for 1s to simulate a sync subscription, + // if too long, the task will be done in background + done := make(chan bool, 1) + go func() { + // FIXME: make something smarter, i.e., grouping events by contact or network driver + if err := n.networkDriver.Emit(ctx, &envelope); err != nil { + n.LogBackgroundError(ctx, errors.Wrap(err, "failed to emit envelope on network")) + } + done <- true + span.Finish() + }() + select { + case <-done: + case <-time.After(1 * time.Second): + } + // push the outgoing event on the client stream + n.clientEvents <- event +} + // Start is the node's mainloop func (n *Node) Start(ctx context.Context, withCron, withNodeEvents bool) error { var span opentracing.Span @@ -111,65 +183,10 @@ func (n *Node) Start(ctx context.Context, withCron, withNodeEvents bool) error { for { select { case event := <-n.outgoingEvents: - logger().Debug("outgoing event", zap.Stringer("event", event)) - envelope := p2p.Envelope{} - eventBytes, err := proto.Marshal(event) - if err != nil { - n.LogBackgroundError(ctx, errors.Wrap(err, "failed to marshal outgoing event")) - continue - } - - event.SenderID = n.b64pubkey - - switch { - case event.ReceiverID != "": // ContactEvent - envelope.Source = n.aliasEnvelopeForContact(ctx, &envelope, event) - envelope.ChannelID = event.ReceiverID - envelope.EncryptedEvent = eventBytes // FIXME: encrypt for receiver - - case event.ConversationID != "": //ConversationEvent - envelope.Source = n.aliasEnvelopeForConversation(ctx, &envelope, event) - envelope.ChannelID = event.ConversationID - envelope.EncryptedEvent = eventBytes // FIXME: encrypt for conversation - - default: - n.LogBackgroundError(ctx, fmt.Errorf("unhandled event type")) - } - - if envelope.Signature, err = keypair.Sign(n.crypto, &envelope); err != nil { - n.LogBackgroundError(ctx, errors.Wrap(err, "failed to sign envelope")) - continue - } - - // Async subscribe to conversation - // wait for 1s to simulate a sync subscription, - // if too long, the task will be done in background - done := make(chan bool, 1) - go func() { - // FIXME: make something smarter, i.e., grouping events by contact or network driver - if err := n.networkDriver.Emit(ctx, &envelope); err != nil { - n.LogBackgroundError(ctx, errors.Wrap(err, "failed to emit envelope on network")) - } - done <- true - }() - select { - case <-done: - case <-time.After(1 * time.Second): - } - - // push the outgoing event on the client stream - n.clientEvents <- event - + n.handleOutgoingEvent(event) // emit the outgoing event on the node event stream case event := <-n.clientEvents: - logger().Debug("client event", zap.Stringer("event", event)) - n.clientEventsMutex.Lock() - for _, sub := range n.clientEventsSubscribers { - if sub.filter(event) { - sub.queue <- event - } - } - n.clientEventsMutex.Unlock() + n.handleClientEvent(event) } } } diff --git a/core/node/p2pclient.go b/core/node/p2pclient.go index daf17d8ab4..b92fdadaa9 100644 --- a/core/node/p2pclient.go +++ b/core/node/p2pclient.go @@ -11,21 +11,22 @@ import ( ) func (n *Node) NewContactEvent(ctx context.Context, destination *entity.Contact, kind p2p.Kind) *p2p.Event { - span, _ := tracing.EnterFunc(ctx, destination, kind) + span, ctx := tracing.EnterFunc(ctx, destination, kind) defer span.Finish() - event := p2p.NewOutgoingEvent(n.b64pubkey, destination.ID, kind) + event := p2p.NewOutgoingEvent(ctx, n.b64pubkey, destination.ID, kind) event.ID = n.NewID() return event } func (n *Node) NewConversationEvent(ctx context.Context, destination *entity.Conversation, kind p2p.Kind) *p2p.Event { - span, _ := tracing.EnterFunc(ctx, destination, kind) + span, ctx := tracing.EnterFunc(ctx, destination, kind) defer span.Finish() - event := p2p.NewOutgoingEvent(n.b64pubkey, "", kind) + event := p2p.NewOutgoingEvent(ctx, n.b64pubkey, "", kind) event.ConversationID = destination.ID event.ID = n.NewID() + return event } @@ -42,6 +43,7 @@ func (n *Node) EnqueueOutgoingEvent(ctx context.Context, event *p2p.Event) error return errors.Wrap(err, "failed to write event to db") } n.outgoingEvents <- event + return nil } @@ -57,14 +59,15 @@ func (n *Node) contactShareMe(ctx context.Context, to *entity.Contact) error { if err := n.EnqueueOutgoingEvent(ctx, event); err != nil { return err } + return nil } func (n *Node) NewSenderAliasEvent(ctx context.Context, destination string, aliases []*entity.SenderAlias) (*p2p.Event, error) { - span, _ := tracing.EnterFunc(ctx, destination, aliases) + span, ctx := tracing.EnterFunc(ctx, destination, aliases) defer span.Finish() - event := p2p.NewOutgoingEvent(n.b64pubkey, destination, p2p.Kind_SenderAliasUpdate) + event := p2p.NewOutgoingEvent(ctx, n.b64pubkey, destination, p2p.Kind_SenderAliasUpdate) event.ID = n.NewID() if err := event.SetAttrs(&p2p.SenderAliasUpdateAttrs{Aliases: aliases}); err != nil { return nil, err diff --git a/core/pkg/tracing/tracing.go b/core/pkg/tracing/tracing.go index 4c4f7386d6..de942381fa 100644 --- a/core/pkg/tracing/tracing.go +++ b/core/pkg/tracing/tracing.go @@ -11,12 +11,17 @@ import ( "github.com/opentracing/opentracing-go/log" ) +func GetCallerName(prefix string, skip int) string { + function, _, _, _ := runtime.Caller(skip + 1) + funcName := runtime.FuncForPC(function).Name() + caller := strings.Replace(funcName, "berty.tech/core/", "./", 1) + return fmt.Sprintf("%s::%s()", prefix, caller) +} + func EnterFunc(ctx context.Context, args ...interface{}) (opentracing.Span, context.Context) { // FIXME: add a way to completely disable the following behavior - function, _, _, _ := runtime.Caller(1) - funcName := runtime.FuncForPC(function).Name() - topic := fmt.Sprintf("call::%s()", funcName) - topic = strings.Replace(topic, "call::berty.tech/core/", "call::./", 1) + topic := GetCallerName("call", 1) + if ctx == nil { ctx = context.Background() logger().Warn("context is not set") diff --git a/experiment/network/real/docker-compose.yml b/experiment/network/real/docker-compose.yml index 1945071b7d..547350245d 100644 --- a/experiment/network/real/docker-compose.yml +++ b/experiment/network/real/docker-compose.yml @@ -38,7 +38,11 @@ services: - "1338-1438:1337" depends_on: - tracer-service - command: daemon --log-level=debug --mdns=false --jaeger-address=tracer-service:6831 + entrypoint: + - /bin/sh + - -c + - | + USER=node_$${HOSTNAME} berty daemon --log-level=debug --mdns=false --jaeger-address=tracer-service:6831 networks: tools: