From 843fe1f2c203150286727ed2f12061031cadd1fd Mon Sep 17 00:00:00 2001 From: Naman Jain Date: Thu, 10 Dec 2020 12:52:49 +0530 Subject: [PATCH] feat(tablet): add uncompressed size to pb.Tablet (#7095) - Add UncompressedBytes to pb.Tablet denoting the uncompressed size of tablet. - Rename pb.Tablet.Space -> pb.Tablet.OnDiskBytes - Simplify tablet size calculation as badger does not have internal keys now. --- dgraph/cmd/zero/tablet.go | 22 +++++++------- dgraph/cmd/zero/zero.go | 4 +-- protos/pb.proto | 5 ++-- protos/pb/pb.pb.go | 62 +++++++++++++++++++++++++++++++-------- telemetry/telemetry.go | 2 +- worker/draft.go | 40 ++++++++++++------------- 6 files changed, 87 insertions(+), 48 deletions(-) diff --git a/dgraph/cmd/zero/tablet.go b/dgraph/cmd/zero/tablet.go index fd06907b9bc..a6afd909d7d 100644 --- a/dgraph/cmd/zero/tablet.go +++ b/dgraph/cmd/zero/tablet.go @@ -103,8 +103,9 @@ func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) erro if tab == nil { return errors.Errorf("Tablet to be moved: [%v] is not being served", predicate) } - msg := fmt.Sprintf("Going to move predicate: [%v], size: [%v] from group %d to %d\n", predicate, - humanize.Bytes(uint64(tab.Space)), srcGroup, dstGroup) + msg := fmt.Sprintf("Going to move predicate: [%v], size: [ondisk: %v, uncompressed: %v]"+ + " from group %d to %d\n", predicate, humanize.IBytes(uint64(tab.OnDiskBytes)), + humanize.IBytes(uint64(tab.UncompressedBytes)), srcGroup, dstGroup) glog.Info(msg) span.Annotate([]otrace.Attribute{otrace.StringAttribute("tablet", predicate)}, msg) @@ -139,11 +140,12 @@ func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) erro p := &pb.ZeroProposal{} p.Tablet = &pb.Tablet{ - GroupId: dstGroup, - Predicate: predicate, - Space: tab.Space, - Force: true, - MoveTs: in.TxnTs, + GroupId: dstGroup, + Predicate: predicate, + OnDiskBytes: tab.OnDiskBytes, + UncompressedBytes: tab.UncompressedBytes, + Force: true, + MoveTs: in.TxnTs, } msg = fmt.Sprintf("Move at Alpha done. Now proposing: %+v", p) span.Annotate(nil, msg) @@ -197,7 +199,7 @@ func (s *Server) chooseTablet() (predicate string, srcGroup uint32, dstGroup uin for k, v := range s.state.Groups { space := int64(0) for _, tab := range v.Tablets { - space += tab.Space + space += tab.OnDiskBytes } groups = append(groups, kv{k, space}) } @@ -233,9 +235,9 @@ func (s *Server) chooseTablet() (predicate string, srcGroup uint32, dstGroup uin // Finds a tablet as big a possible such that on moving it dstGroup's size is // less than or equal to srcGroup. - if tab.Space <= sizeDiff/2 && tab.Space > size { + if tab.OnDiskBytes <= sizeDiff/2 && tab.OnDiskBytes > size { predicate = tab.Predicate - size = tab.Space + size = tab.OnDiskBytes } } if len(predicate) > 0 { diff --git a/dgraph/cmd/zero/zero.go b/dgraph/cmd/zero/zero.go index 36941f86709..14444d4a157 100644 --- a/dgraph/cmd/zero/zero.go +++ b/dgraph/cmd/zero/zero.go @@ -388,8 +388,8 @@ func (s *Server) createProposals(dst *pb.Group) ([]*pb.ZeroProposal, error) { continue } - s := float64(srcTablet.Space) - d := float64(dstTablet.Space) + s := float64(srcTablet.OnDiskBytes) + d := float64(dstTablet.OnDiskBytes) if dstTablet.Remove || (s == 0 && d > 0) || (s > 0 && math.Abs(d/s-1) > 0.1) { dstTablet.Force = false proposal := &pb.ZeroProposal{ diff --git a/protos/pb.proto b/protos/pb.proto index 0be33dd81bb..66bc56c3be7 100644 --- a/protos/pb.proto +++ b/protos/pb.proto @@ -202,10 +202,11 @@ message Tablet { uint32 group_id = 1 [(gogoproto.jsontag) = "groupId,omitempty"]; // Served by which group. string predicate = 2; bool force = 3; // Used while moving predicate. - int64 space = 7; + int64 on_disk_bytes = 7; bool remove = 8; bool read_only = 9 [(gogoproto.jsontag) = "readOnly,omitempty"]; // If true, do not ask zero to serve any tablets. - uint64 move_ts = 10 [(gogoproto.jsontag) = "moveTs,omitempty"]; + uint64 move_ts = 10 [(gogoproto.jsontag) = "moveTs,omitempty"]; + int64 uncompressed_bytes = 11; // Estimated uncompressed size of tablet in bytes } message DirectedEdge { diff --git a/protos/pb/pb.pb.go b/protos/pb/pb.pb.go index 8ffc1f7524d..dbdb65cc032 100644 --- a/protos/pb/pb.pb.go +++ b/protos/pb/pb.pb.go @@ -7,6 +7,10 @@ import ( context "context" encoding_binary "encoding/binary" fmt "fmt" + io "io" + math "math" + math_bits "math/bits" + pb "github.com/dgraph-io/badger/v2/pb" api "github.com/dgraph-io/dgo/v200/protos/api" _ "github.com/gogo/protobuf/gogoproto" @@ -14,9 +18,6 @@ import ( grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" - io "io" - math "math" - math_bits "math/bits" ) // Reference imports to suppress errors if they are not otherwise used. @@ -1751,10 +1752,11 @@ type Tablet struct { GroupId uint32 `protobuf:"varint,1,opt,name=group_id,json=groupId,proto3" json:"groupId,omitempty"` Predicate string `protobuf:"bytes,2,opt,name=predicate,proto3" json:"predicate,omitempty"` Force bool `protobuf:"varint,3,opt,name=force,proto3" json:"force,omitempty"` - Space int64 `protobuf:"varint,7,opt,name=space,proto3" json:"space,omitempty"` + OnDiskBytes int64 `protobuf:"varint,7,opt,name=on_disk_bytes,json=onDiskBytes,proto3" json:"on_disk_bytes,omitempty"` Remove bool `protobuf:"varint,8,opt,name=remove,proto3" json:"remove,omitempty"` ReadOnly bool `protobuf:"varint,9,opt,name=read_only,json=readOnly,proto3" json:"readOnly,omitempty"` MoveTs uint64 `protobuf:"varint,10,opt,name=move_ts,json=moveTs,proto3" json:"moveTs,omitempty"` + UncompressedBytes int64 `protobuf:"varint,11,opt,name=uncompressed_bytes,json=uncompressedBytes,proto3" json:"uncompressed_bytes,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -1814,9 +1816,9 @@ func (m *Tablet) GetForce() bool { return false } -func (m *Tablet) GetSpace() int64 { +func (m *Tablet) GetOnDiskBytes() int64 { if m != nil { - return m.Space + return m.OnDiskBytes } return 0 } @@ -1842,6 +1844,13 @@ func (m *Tablet) GetMoveTs() uint64 { return 0 } +func (m *Tablet) GetUncompressedBytes() int64 { + if m != nil { + return m.UncompressedBytes + } + return 0 +} + type DirectedEdge struct { Entity uint64 `protobuf:"fixed64,1,opt,name=entity,proto3" json:"entity,omitempty"` Attr string `protobuf:"bytes,2,opt,name=attr,proto3" json:"attr,omitempty"` @@ -8106,6 +8115,11 @@ func (m *Tablet) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } + if m.UncompressedBytes != 0 { + i = encodeVarintPb(dAtA, i, uint64(m.UncompressedBytes)) + i-- + dAtA[i] = 0x58 + } if m.MoveTs != 0 { i = encodeVarintPb(dAtA, i, uint64(m.MoveTs)) i-- @@ -8131,8 +8145,8 @@ func (m *Tablet) MarshalToSizedBuffer(dAtA []byte) (int, error) { i-- dAtA[i] = 0x40 } - if m.Space != 0 { - i = encodeVarintPb(dAtA, i, uint64(m.Space)) + if m.OnDiskBytes != 0 { + i = encodeVarintPb(dAtA, i, uint64(m.OnDiskBytes)) i-- dAtA[i] = 0x38 } @@ -11563,8 +11577,8 @@ func (m *Tablet) Size() (n int) { if m.Force { n += 2 } - if m.Space != 0 { - n += 1 + sovPb(uint64(m.Space)) + if m.OnDiskBytes != 0 { + n += 1 + sovPb(uint64(m.OnDiskBytes)) } if m.Remove { n += 2 @@ -11575,6 +11589,9 @@ func (m *Tablet) Size() (n int) { if m.MoveTs != 0 { n += 1 + sovPb(uint64(m.MoveTs)) } + if m.UncompressedBytes != 0 { + n += 1 + sovPb(uint64(m.UncompressedBytes)) + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -16874,9 +16891,9 @@ func (m *Tablet) Unmarshal(dAtA []byte) error { m.Force = bool(v != 0) case 7: if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Space", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field OnDiskBytes", wireType) } - m.Space = 0 + m.OnDiskBytes = 0 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowPb @@ -16886,7 +16903,7 @@ func (m *Tablet) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.Space |= int64(b&0x7F) << shift + m.OnDiskBytes |= int64(b&0x7F) << shift if b < 0x80 { break } @@ -16950,6 +16967,25 @@ func (m *Tablet) Unmarshal(dAtA []byte) error { break } } + case 11: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field UncompressedBytes", wireType) + } + m.UncompressedBytes = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPb + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.UncompressedBytes |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipPb(dAtA[iNdEx:]) diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go index ebdf587a133..b8f9f1de826 100644 --- a/telemetry/telemetry.go +++ b/telemetry/telemetry.go @@ -67,7 +67,7 @@ func NewZero(ms *pb.MembershipState) *Telemetry { t.NumAlphas += len(g.GetMembers()) for _, tablet := range g.GetTablets() { t.NumTablets++ - t.DiskUsageMB += tablet.GetSpace() + t.DiskUsageMB += tablet.GetOnDiskBytes() } } t.DiskUsageMB /= (1 << 20) diff --git a/worker/draft.go b/worker/draft.go index fb458825032..e3f65858719 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -37,6 +37,7 @@ import ( "go.opencensus.io/tag" otrace "go.opencensus.io/trace" + "github.com/dgraph-io/badger/v2" bpb "github.com/dgraph-io/badger/v2/pb" "github.com/dgraph-io/dgraph/conn" "github.com/dgraph-io/dgraph/dgraph/cmd/zero" @@ -1313,26 +1314,28 @@ func (n *node) calculateTabletSizes() { } var total int64 tablets := make(map[string]*pb.Tablet) - updateSize := func(pred string, size int64) { + updateSize := func(tinfo badger.TableInfo) { + // The error has already been checked by caller. + left, _ := x.Parse(tinfo.Left) + pred := left.Attr if pred == "" { return } - if tablet, ok := tablets[pred]; ok { - tablet.Space += size + tablet.OnDiskBytes += int64(tinfo.OnDiskSize) + tablet.UncompressedBytes += int64(tinfo.UncompressedSize) } else { tablets[pred] = &pb.Tablet{ - GroupId: n.gid, - Predicate: pred, - Space: size, + GroupId: n.gid, + Predicate: pred, + OnDiskBytes: int64(tinfo.OnDiskSize), + UncompressedBytes: int64(tinfo.UncompressedSize), } } - total += size + total += int64(tinfo.OnDiskSize) } tableInfos := pstore.Tables() - previousLeft := "" - var previousSize int64 glog.V(2).Infof("Calculating tablet sizes. Found %d tables\n", len(tableInfos)) for _, tinfo := range tableInfos { left, err := x.Parse(tinfo.Left) @@ -1340,22 +1343,19 @@ func (n *node) calculateTabletSizes() { glog.V(3).Infof("Unable to parse key: %v", err) continue } + right, err := x.Parse(tinfo.Right) + if err != nil { + glog.V(3).Infof("Unable to parse key: %v", err) + continue + } - if left.Attr == previousLeft { - // Dgraph cannot depend on the right end of the table to know if the table belongs - // to a single predicate because there might be Badger-specific keys. - // Instead, Dgraph only counts the previous table if the current one belongs to the - // same predicate. - // We could later specifically iterate over these tables to get their estimated sizes. - updateSize(previousLeft, previousSize) + // Count the table only if it is occupied by a single predicate. + if left.Attr == right.Attr { + updateSize(tinfo) } else { glog.V(3).Info("Skipping table not owned by one predicate") } - previousLeft = left.Attr - previousSize = int64(tinfo.OnDiskSize) } - // The last table has not been counted. Assign it to the predicate at the left of the table. - updateSize(previousLeft, previousSize) if len(tablets) == 0 { glog.V(2).Infof("No tablets found.")