From 612bf29cb2fb987fefc36df806ed1df68f73f3e5 Mon Sep 17 00:00:00 2001 From: aeddi Date: Thu, 13 Jun 2019 15:01:23 +0200 Subject: [PATCH] feat: implement local record manager --- .../service/entity/local_record.pb.json | 23 ++ core/entity/local_record.pb.go | 351 ++++++++++++++++++ core/entity/local_record.proto | 9 + core/entity/local_record.validate.gen.go | 3 + core/network/localrecord.go | 142 +++++++ 5 files changed, 528 insertions(+) create mode 100644 client/react-native/app/bridge/service/entity/local_record.pb.json create mode 100644 core/entity/local_record.pb.go create mode 100644 core/entity/local_record.proto create mode 100644 core/entity/local_record.validate.gen.go create mode 100644 core/network/localrecord.go diff --git a/client/react-native/app/bridge/service/entity/local_record.pb.json b/client/react-native/app/bridge/service/entity/local_record.pb.json new file mode 100644 index 0000000000..4fd7f03db0 --- /dev/null +++ b/client/react-native/app/bridge/service/entity/local_record.pb.json @@ -0,0 +1,23 @@ +{ + "nested": { + "berty": { + "nested": { + "entity": { + "options": { + "go_package": "berty.tech/core/entity" + }, + "nested": { + "LocalRecord": { + "fields": { + "contactId": { + "type": "string", + "id": 1 + } + } + } + } + } + } + } + } +} \ No newline at end of file diff --git a/core/entity/local_record.pb.go b/core/entity/local_record.pb.go new file mode 100644 index 0000000000..5961bd3065 --- /dev/null +++ b/core/entity/local_record.pb.go @@ -0,0 +1,351 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: entity/local_record.proto + +package entity + +import ( + fmt "fmt" + io "io" + math "math" + + proto "github.com/golang/protobuf/proto" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type LocalRecord struct { + ContactId string `protobuf:"bytes,1,opt,name=contact_id,json=contactId,proto3" json:"contact_id,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *LocalRecord) Reset() { *m = LocalRecord{} } +func (m *LocalRecord) String() string { return proto.CompactTextString(m) } +func (*LocalRecord) ProtoMessage() {} +func (*LocalRecord) Descriptor() ([]byte, []int) { + return fileDescriptor_b561157065c2b28c, []int{0} +} +func (m *LocalRecord) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *LocalRecord) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_LocalRecord.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *LocalRecord) XXX_Merge(src proto.Message) { + xxx_messageInfo_LocalRecord.Merge(m, src) +} +func (m *LocalRecord) XXX_Size() int { + return m.Size() +} +func (m *LocalRecord) XXX_DiscardUnknown() { + xxx_messageInfo_LocalRecord.DiscardUnknown(m) +} + +var xxx_messageInfo_LocalRecord proto.InternalMessageInfo + +func (m *LocalRecord) GetContactId() string { + if m != nil { + return m.ContactId + } + return "" +} + +func init() { + proto.RegisterType((*LocalRecord)(nil), "berty.entity.LocalRecord") +} + +func init() { proto.RegisterFile("entity/local_record.proto", fileDescriptor_b561157065c2b28c) } + +var fileDescriptor_b561157065c2b28c = []byte{ + // 137 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x4c, 0xcd, 0x2b, 0xc9, + 0x2c, 0xa9, 0xd4, 0xcf, 0xc9, 0x4f, 0x4e, 0xcc, 0x89, 0x2f, 0x4a, 0x4d, 0xce, 0x2f, 0x4a, 0xd1, + 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x49, 0x4a, 0x2d, 0x2a, 0xa9, 0xd4, 0x83, 0x28, 0x50, + 0xd2, 0xe1, 0xe2, 0xf6, 0x01, 0xa9, 0x09, 0x02, 0x2b, 0x11, 0x92, 0xe5, 0xe2, 0x4a, 0xce, 0xcf, + 0x2b, 0x49, 0x4c, 0x2e, 0x89, 0xcf, 0x4c, 0x91, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0c, 0xe2, 0x84, + 0x8a, 0x78, 0xa6, 0x38, 0x69, 0x9c, 0x78, 0x24, 0xc7, 0x78, 0xe1, 0x91, 0x1c, 0xe3, 0x83, 0x47, + 0x72, 0x8c, 0x33, 0x1e, 0xcb, 0x31, 0x44, 0x89, 0x41, 0x4c, 0x2b, 0x49, 0x4d, 0xce, 0xd0, 0x4f, + 0xce, 0x2f, 0x4a, 0xd5, 0x87, 0x98, 0x9b, 0xc4, 0x06, 0xb6, 0xcc, 0x18, 0x10, 0x00, 0x00, 0xff, + 0xff, 0x10, 0xc8, 0x42, 0xb4, 0x89, 0x00, 0x00, 0x00, +} + +func (m *LocalRecord) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *LocalRecord) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.ContactId) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintLocalRecord(dAtA, i, uint64(len(m.ContactId))) + i += copy(dAtA[i:], m.ContactId) + } + if m.XXX_unrecognized != nil { + i += copy(dAtA[i:], m.XXX_unrecognized) + } + return i, nil +} + +func encodeVarintLocalRecord(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *LocalRecord) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.ContactId) + if l > 0 { + n += 1 + l + sovLocalRecord(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovLocalRecord(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozLocalRecord(x uint64) (n int) { + return sovLocalRecord(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *LocalRecord) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLocalRecord + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: LocalRecord: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: LocalRecord: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ContactId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLocalRecord + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthLocalRecord + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthLocalRecord + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ContactId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipLocalRecord(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLocalRecord + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLocalRecord + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipLocalRecord(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowLocalRecord + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowLocalRecord + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowLocalRecord + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthLocalRecord + } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthLocalRecord + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowLocalRecord + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipLocalRecord(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthLocalRecord + } + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthLocalRecord = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowLocalRecord = fmt.Errorf("proto: integer overflow") +) diff --git a/core/entity/local_record.proto b/core/entity/local_record.proto new file mode 100644 index 0000000000..ed4723a1c8 --- /dev/null +++ b/core/entity/local_record.proto @@ -0,0 +1,9 @@ +syntax = "proto3"; + +package berty.entity; + +option go_package = "berty.tech/core/entity"; + +message LocalRecord { + string contact_id = 1; +} diff --git a/core/entity/local_record.validate.gen.go b/core/entity/local_record.validate.gen.go new file mode 100644 index 0000000000..1eb628fc11 --- /dev/null +++ b/core/entity/local_record.validate.gen.go @@ -0,0 +1,3 @@ +// this file was generated by protoc-gen-gotemplate + +package entity diff --git a/core/network/localrecord.go b/core/network/localrecord.go new file mode 100644 index 0000000000..4ac3385977 --- /dev/null +++ b/core/network/localrecord.go @@ -0,0 +1,142 @@ +package network + +import ( + "context" + "fmt" + "io" + onet "net" + "regexp" + "time" + + "berty.tech/core/entity" + ggio "github.com/gogo/protobuf/io" + inet "github.com/libp2p/go-libp2p-net" + peer "github.com/libp2p/go-libp2p-peer" + protocol "github.com/libp2p/go-libp2p-protocol" + filter "github.com/libp2p/go-maddr-filter" + ma "github.com/multiformats/go-multiaddr" + "go.uber.org/zap" +) + +var ( + recProtocolID = protocol.ID("berty/p2p/localrecord") + privateIPCIDRs = []string{ + "127.0.0.0/8", // IPv4 loopback + "10.0.0.0/8", // RFC1918 + "172.16.0.0/12", // RFC1918 + "192.168.0.0/16", // RFC1918 + "::1/128", // IPv6 loopback + "fe80::/10", // IPv6 link-local + "fc00::/7", // IPv6 unique local addr + } + privateIPFilters = filter.NewFilters() +) + +func init() { + // Init private IP filters + privateIPFilters.DefaultAction = filter.ActionDeny + + for _, privateIPCIDR := range privateIPCIDRs { + _, ipnet, err := onet.ParseCIDR(privateIPCIDR) + if err != nil { + logger().Fatal("parsing CIDR failed", zap.Error(err), zap.String("CIDR", privateIPCIDR)) + } + + privateIPFilters.AddFilter(*ipnet, filter.ActionAccept) + } +} + +type LocalRecordManager struct { + net *Network +} + +func NewLocalRecordManager(net *Network) *LocalRecordManager { + lrm := &LocalRecordManager{ + net: net, + } + + net.host.Network().Notify(lrm) + net.host.SetStreamHandler(recProtocolID, lrm.handleLocalRecord) + + return lrm +} + +func (lrm *LocalRecordManager) Connected(net inet.Network, c inet.Conn) { + isPrivateIP := !privateIPFilters.AddrBlocked(c.RemoteMultiaddr()) + isBLEMultiaddr, err := regexp.Match(`^/ble/.+`, c.RemoteMultiaddr().Bytes()) + if err != nil { + logger().Fatal("parsing regex failed", zap.Error(err)) + } + + if isBLEMultiaddr || isPrivateIP { + if err := lrm.sendLocalRecord(context.Background(), c.RemotePeer()); err != nil { + logger().Error("sending local record failed", zap.Error(err)) + } else { + logger().Debug("sending local record succeeded", + zap.String("peerID", c.RemotePeer().Pretty()), + ) + } + } +} + +// Unused notifees +func (lrm *LocalRecordManager) Disconnected(net inet.Network, c inet.Conn) {} +func (lrm *LocalRecordManager) Listen(n inet.Network, addr ma.Multiaddr) {} +func (lrm *LocalRecordManager) ListenClose(n inet.Network, addr ma.Multiaddr) {} +func (lrm *LocalRecordManager) OpenedStream(n inet.Network, s inet.Stream) {} +func (lrm *LocalRecordManager) ClosedStream(n inet.Network, s inet.Stream) {} + +func (lrm *LocalRecordManager) handleLocalRecord(s inet.Stream) { + logger().Debug("receiving local record", + zap.String("peerID", s.Conn().RemotePeer().Pretty()), + ) + + pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax) + for { + lr := &entity.LocalRecord{} + switch err := pbr.ReadMsg(lr); err { + case io.EOF: + s.Close() + return + case nil: // do noting, everything fine + default: + s.Reset() + logger().Error("invalid local record", zap.Error(err)) + return + } + + logger().Debug("saving local record in cache", + zap.String("peerID", s.Conn().RemotePeer().Pretty()), + zap.String("contactID", lr.ContactId), + ) + + peerInfo := lrm.net.host.Peerstore().PeerInfo(s.Conn().RemotePeer()) + lrm.net.cache.UpdateCache(lr.ContactId, peerInfo) + } +} + +func (lrm *LocalRecordManager) sendLocalRecord(ctx context.Context, pID peer.ID) error { + tctx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + + logger().Debug("sending local record", zap.String("peerID", pID.Pretty())) + + if pID == lrm.net.host.ID() { + return fmt.Errorf("cannot dial to self") + } + + s, err := lrm.net.host.NewStream(tctx, pID, recProtocolID) + if err != nil { + return fmt.Errorf("new stream failed: `%s`", err.Error()) + } + + lr := &entity.LocalRecord{ContactId: lrm.net.contactID} + pbw := ggio.NewDelimitedWriter(s) + if err := pbw.WriteMsg(lr); err != nil { + return fmt.Errorf("write stream: `%s`", err.Error()) + } + + go inet.FullClose(s) + + return nil +}