Skip to content

Commit

Permalink
feat(tablet): add uncompressed size to pb.Tablet (#7095)
Browse files Browse the repository at this point in the history
- Add UncompressedBytes to pb.Tablet denoting the uncompressed size of tablet.
- Rename pb.Tablet.Space -> pb.Tablet.OnDiskBytes
- Simplify tablet size calculation as badger does not have internal keys now.
  • Loading branch information
NamanJain8 committed Dec 10, 2020
1 parent 55390aa commit 843fe1f
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 48 deletions.
22 changes: 12 additions & 10 deletions dgraph/cmd/zero/tablet.go
Expand Up @@ -103,8 +103,9 @@ func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) erro
if tab == nil {
return errors.Errorf("Tablet to be moved: [%v] is not being served", predicate)
}
msg := fmt.Sprintf("Going to move predicate: [%v], size: [%v] from group %d to %d\n", predicate,
humanize.Bytes(uint64(tab.Space)), srcGroup, dstGroup)
msg := fmt.Sprintf("Going to move predicate: [%v], size: [ondisk: %v, uncompressed: %v]"+
" from group %d to %d\n", predicate, humanize.IBytes(uint64(tab.OnDiskBytes)),
humanize.IBytes(uint64(tab.UncompressedBytes)), srcGroup, dstGroup)
glog.Info(msg)
span.Annotate([]otrace.Attribute{otrace.StringAttribute("tablet", predicate)}, msg)

Expand Down Expand Up @@ -139,11 +140,12 @@ func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) erro

p := &pb.ZeroProposal{}
p.Tablet = &pb.Tablet{
GroupId: dstGroup,
Predicate: predicate,
Space: tab.Space,
Force: true,
MoveTs: in.TxnTs,
GroupId: dstGroup,
Predicate: predicate,
OnDiskBytes: tab.OnDiskBytes,
UncompressedBytes: tab.UncompressedBytes,
Force: true,
MoveTs: in.TxnTs,
}
msg = fmt.Sprintf("Move at Alpha done. Now proposing: %+v", p)
span.Annotate(nil, msg)
Expand Down Expand Up @@ -197,7 +199,7 @@ func (s *Server) chooseTablet() (predicate string, srcGroup uint32, dstGroup uin
for k, v := range s.state.Groups {
space := int64(0)
for _, tab := range v.Tablets {
space += tab.Space
space += tab.OnDiskBytes
}
groups = append(groups, kv{k, space})
}
Expand Down Expand Up @@ -233,9 +235,9 @@ func (s *Server) chooseTablet() (predicate string, srcGroup uint32, dstGroup uin

// Finds a tablet as big a possible such that on moving it dstGroup's size is
// less than or equal to srcGroup.
if tab.Space <= sizeDiff/2 && tab.Space > size {
if tab.OnDiskBytes <= sizeDiff/2 && tab.OnDiskBytes > size {
predicate = tab.Predicate
size = tab.Space
size = tab.OnDiskBytes
}
}
if len(predicate) > 0 {
Expand Down
4 changes: 2 additions & 2 deletions dgraph/cmd/zero/zero.go
Expand Up @@ -388,8 +388,8 @@ func (s *Server) createProposals(dst *pb.Group) ([]*pb.ZeroProposal, error) {
continue
}

s := float64(srcTablet.Space)
d := float64(dstTablet.Space)
s := float64(srcTablet.OnDiskBytes)
d := float64(dstTablet.OnDiskBytes)
if dstTablet.Remove || (s == 0 && d > 0) || (s > 0 && math.Abs(d/s-1) > 0.1) {
dstTablet.Force = false
proposal := &pb.ZeroProposal{
Expand Down
5 changes: 3 additions & 2 deletions protos/pb.proto
Expand Up @@ -202,10 +202,11 @@ message Tablet {
uint32 group_id = 1 [(gogoproto.jsontag) = "groupId,omitempty"]; // Served by which group.
string predicate = 2;
bool force = 3; // Used while moving predicate.
int64 space = 7;
int64 on_disk_bytes = 7;
bool remove = 8;
bool read_only = 9 [(gogoproto.jsontag) = "readOnly,omitempty"]; // If true, do not ask zero to serve any tablets.
uint64 move_ts = 10 [(gogoproto.jsontag) = "moveTs,omitempty"];
uint64 move_ts = 10 [(gogoproto.jsontag) = "moveTs,omitempty"];
int64 uncompressed_bytes = 11; // Estimated uncompressed size of tablet in bytes
}

message DirectedEdge {
Expand Down
62 changes: 49 additions & 13 deletions protos/pb/pb.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion telemetry/telemetry.go
Expand Up @@ -67,7 +67,7 @@ func NewZero(ms *pb.MembershipState) *Telemetry {
t.NumAlphas += len(g.GetMembers())
for _, tablet := range g.GetTablets() {
t.NumTablets++
t.DiskUsageMB += tablet.GetSpace()
t.DiskUsageMB += tablet.GetOnDiskBytes()
}
}
t.DiskUsageMB /= (1 << 20)
Expand Down
40 changes: 20 additions & 20 deletions worker/draft.go
Expand Up @@ -37,6 +37,7 @@ import (
"go.opencensus.io/tag"
otrace "go.opencensus.io/trace"

"github.com/dgraph-io/badger/v2"
bpb "github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/dgraph/cmd/zero"
Expand Down Expand Up @@ -1313,49 +1314,48 @@ func (n *node) calculateTabletSizes() {
}
var total int64
tablets := make(map[string]*pb.Tablet)
updateSize := func(pred string, size int64) {
updateSize := func(tinfo badger.TableInfo) {
// The error has already been checked by caller.
left, _ := x.Parse(tinfo.Left)
pred := left.Attr
if pred == "" {
return
}

if tablet, ok := tablets[pred]; ok {
tablet.Space += size
tablet.OnDiskBytes += int64(tinfo.OnDiskSize)
tablet.UncompressedBytes += int64(tinfo.UncompressedSize)
} else {
tablets[pred] = &pb.Tablet{
GroupId: n.gid,
Predicate: pred,
Space: size,
GroupId: n.gid,
Predicate: pred,
OnDiskBytes: int64(tinfo.OnDiskSize),
UncompressedBytes: int64(tinfo.UncompressedSize),
}
}
total += size
total += int64(tinfo.OnDiskSize)
}

tableInfos := pstore.Tables()
previousLeft := ""
var previousSize int64
glog.V(2).Infof("Calculating tablet sizes. Found %d tables\n", len(tableInfos))
for _, tinfo := range tableInfos {
left, err := x.Parse(tinfo.Left)
if err != nil {
glog.V(3).Infof("Unable to parse key: %v", err)
continue
}
right, err := x.Parse(tinfo.Right)
if err != nil {
glog.V(3).Infof("Unable to parse key: %v", err)
continue
}

if left.Attr == previousLeft {
// Dgraph cannot depend on the right end of the table to know if the table belongs
// to a single predicate because there might be Badger-specific keys.
// Instead, Dgraph only counts the previous table if the current one belongs to the
// same predicate.
// We could later specifically iterate over these tables to get their estimated sizes.
updateSize(previousLeft, previousSize)
// Count the table only if it is occupied by a single predicate.
if left.Attr == right.Attr {
updateSize(tinfo)
} else {
glog.V(3).Info("Skipping table not owned by one predicate")
}
previousLeft = left.Attr
previousSize = int64(tinfo.OnDiskSize)
}
// The last table has not been counted. Assign it to the predicate at the left of the table.
updateSize(previousLeft, previousSize)

if len(tablets) == 0 {
glog.V(2).Infof("No tablets found.")
Expand Down

0 comments on commit 843fe1f

Please sign in to comment.