Skip to content

Commit

Permalink
[BREAKING] Fix(OOM): Don't unmarshal pb.Proposals until we need them (#…
Browse files Browse the repository at this point in the history
…7059)

In worker/draft.go, when Raft.Ready() returned committed entries, we were unmarshalling them into pb.Proposals upfront. Instead, we now push the entries to applyCh as they are and unmarshal them only when we need them. This should fix an OOM issue we see where unmarshalling pb.Proposals takes up a lot of memory.

We achieve this by switching proposal.Key from a string to a uint64 key, which can be written directly into the first 8 bytes of the Raft entry.Data. The remaining bytes hold the marshalled proposal.

Additional changes:
* Bump up max pending proposal size to 256 MB to increase proposal throughput.

Co-authored-by: Ahsan Barkati <ahsanbarkati@gmail.com>
Co-authored-by: NamanJain8 <jnaman806@gmail.com>
Co-authored-by: Daniel Mai <daniel@dgraph.io>
  • Loading branch information
4 people committed Dec 10, 2020
1 parent bc637af commit 2e5499f
Show file tree
Hide file tree
Showing 8 changed files with 384 additions and 453 deletions.
20 changes: 10 additions & 10 deletions conn/raft_server.go
Expand Up @@ -65,17 +65,17 @@ type ProposalCtx struct {

type proposals struct {
sync.RWMutex
all map[string]*ProposalCtx
all map[uint64]*ProposalCtx
}

func (p *proposals) Store(key string, pctx *ProposalCtx) bool {
if len(key) == 0 {
func (p *proposals) Store(key uint64, pctx *ProposalCtx) bool {
if key == 0 {
return false
}
p.Lock()
defer p.Unlock()
if p.all == nil {
p.all = make(map[string]*ProposalCtx)
p.all = make(map[uint64]*ProposalCtx)
}
if _, has := p.all[key]; has {
return false
Expand All @@ -84,30 +84,30 @@ func (p *proposals) Store(key string, pctx *ProposalCtx) bool {
return true
}

func (p *proposals) Ctx(key string) context.Context {
func (p *proposals) Ctx(key uint64) context.Context {
if pctx := p.Get(key); pctx != nil {
return pctx.Ctx
}
return context.Background()
}

func (p *proposals) Get(key string) *ProposalCtx {
func (p *proposals) Get(key uint64) *ProposalCtx {
p.RLock()
defer p.RUnlock()
return p.all[key]
}

func (p *proposals) Delete(key string) {
if len(key) == 0 {
func (p *proposals) Delete(key uint64) {
if key == 0 {
return
}
p.Lock()
defer p.Unlock()
delete(p.all, key)
}

func (p *proposals) Done(key string, err error) {
if len(key) == 0 {
func (p *proposals) Done(key uint64, err error) {
if key == 0 {
return
}
p.Lock()
Expand Down
15 changes: 9 additions & 6 deletions dgraph/cmd/debug/wal.go
Expand Up @@ -35,23 +35,26 @@ import (

func printEntry(es raftpb.Entry, pending map[uint64]bool) {
var buf bytes.Buffer
defer func() {
fmt.Printf("%s\n", buf.Bytes())
}()
fmt.Fprintf(&buf, "%d . %d . %v . %-6s .", es.Term, es.Index, es.Type,
humanize.Bytes(uint64(es.Size())))
if es.Type == raftpb.EntryConfChange {
fmt.Printf("%s\n", buf.Bytes())
return
}
if len(es.Data) == 0 {
return
}
var pr pb.Proposal
var zpr pb.ZeroProposal
if err := pr.Unmarshal(es.Data); err == nil {
if err := pr.Unmarshal(es.Data[8:]); err == nil {
printAlphaProposal(&buf, &pr, pending)
} else if err := zpr.Unmarshal(es.Data); err == nil {
} else if err := zpr.Unmarshal(es.Data[8:]); err == nil {
printZeroProposal(&buf, &zpr)
} else {
fmt.Printf("%s Unable to parse Proposal: %v\n", buf.Bytes(), err)
return
fmt.Fprintf(&buf, " Unable to parse Proposal: %v", err)
}
fmt.Printf("%s\n", buf.Bytes())
}

type RaftStore interface {
Expand Down
60 changes: 32 additions & 28 deletions dgraph/cmd/zero/raft.go
Expand Up @@ -18,6 +18,7 @@ package zero

import (
"context"
"encoding/binary"
"fmt"
"log"
"math"
Expand Down Expand Up @@ -73,8 +74,8 @@ func (n *node) AmLeader() bool {
return time.Since(n.lastQuorum) <= 5*time.Second
}

func (n *node) uniqueKey() string {
return fmt.Sprintf("z%x-%d", n.Id, n.Rand.Uint64())
func (n *node) uniqueKey() uint64 {
return uint64(n.Id)<<32 | uint64(n.Rand.Uint32())
}

var errInternalRetry = errors.New("Retry Raft proposal internally")
Expand Down Expand Up @@ -120,13 +121,15 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.ZeroProposal) er
key := n.uniqueKey()
x.AssertTruef(n.Proposals.Store(key, pctx), "Found existing proposal with key: [%v]", key)
defer n.Proposals.Delete(key)
proposal.Key = key
span.Annotatef(nil, "Proposing with key: %s. Timeout: %v", key, timeout)
span.Annotatef(nil, "Proposing with key: %d. Timeout: %v", key, timeout)

data, err := proposal.Marshal()
data := make([]byte, 8+proposal.Size())
binary.BigEndian.PutUint64(data[:8], key)
sz, err := proposal.MarshalToSizedBuffer(data[8:])
if err != nil {
return err
}
data = data[:8+sz]
// Propose the change.
if err := n.Raft().Propose(cctx, data); err != nil {
span.Annotatef(nil, "Error while proposing via Raft: %v", err)
Expand Down Expand Up @@ -322,19 +325,18 @@ func (n *node) applySnapshot(snap *pb.ZeroSnapshot) error {
return nil
}

func (n *node) applyProposal(e raftpb.Entry) (string, error) {
func (n *node) applyProposal(e raftpb.Entry) (uint64, error) {
x.AssertTrue(len(e.Data) > 0)

var p pb.ZeroProposal
// Raft commits empty entry on becoming a leader.
if len(e.Data) == 0 {
return p.Key, nil
}
if err := p.Unmarshal(e.Data); err != nil {
return p.Key, err
key := binary.BigEndian.Uint64(e.Data[:8])
if err := p.Unmarshal(e.Data[8:]); err != nil {
return key, err
}
if p.Key == "" {
return p.Key, errInvalidProposal
if key == 0 {
return key, errInvalidProposal
}
span := otrace.FromContext(n.Proposals.Ctx(p.Key))
span := otrace.FromContext(n.Proposals.Ctx(key))

n.server.Lock()
defer n.server.Unlock()
Expand All @@ -343,13 +345,13 @@ func (n *node) applyProposal(e raftpb.Entry) (string, error) {
state.Counter = e.Index
if len(p.Cid) > 0 {
if len(state.Cid) > 0 {
return p.Key, errInvalidProposal
return key, errInvalidProposal
}
state.Cid = p.Cid
}
if p.MaxRaftId > 0 {
if p.MaxRaftId <= state.MaxRaftId {
return p.Key, errInvalidProposal
return key, errInvalidProposal
}
state.MaxRaftId = p.MaxRaftId
n.server.nextRaftId = x.Max(n.server.nextRaftId, p.MaxRaftId+1)
Expand All @@ -365,14 +367,14 @@ func (n *node) applyProposal(e raftpb.Entry) (string, error) {
if err := n.handleMemberProposal(p.Member); err != nil {
span.Annotatef(nil, "While applying membership proposal: %+v", err)
glog.Errorf("While applying membership proposal: %+v", err)
return p.Key, err
return key, err
}
}
if p.Tablet != nil {
if err := n.handleTabletProposal(p.Tablet); err != nil {
span.Annotatef(nil, "While applying tablet proposal: %v", err)
glog.Errorf("While applying tablet proposal: %v", err)
return p.Key, err
return key, err
}
}
if p.License != nil {
Expand All @@ -383,7 +385,7 @@ func (n *node) applyProposal(e raftpb.Entry) (string, error) {
numNodes += len(group.GetMembers())
}
if uint64(numNodes) > p.GetLicense().GetMaxNodes() {
return p.Key, errInvalidProposal
return key, errInvalidProposal
}
state.License = p.License
// Check expiry and set enabled accordingly.
Expand Down Expand Up @@ -411,7 +413,7 @@ func (n *node) applyProposal(e raftpb.Entry) (string, error) {
n.server.orc.updateCommitStatus(e.Index, p.Txn)
}

return p.Key, nil
return key, nil
}

func (n *node) applyConfChange(e raftpb.Entry) {
Expand Down Expand Up @@ -516,12 +518,11 @@ func (n *node) checkForCIDInEntries() (bool, error) {
batch = entries[len(entries)-1].Index + 1

for _, entry := range entries {
if entry.Type != raftpb.EntryNormal {
if entry.Type != raftpb.EntryNormal || len(entry.Data) == 0 {
continue
}
var proposal pb.ZeroProposal
err = proposal.Unmarshal(entry.Data)
if err != nil {
if err = proposal.Unmarshal(entry.Data[8:]); err != nil {
return false, err
}
if len(proposal.Cid) > 0 {
Expand Down Expand Up @@ -749,11 +750,11 @@ func (n *node) calculateAndProposeSnapshot() error {
break
}
for _, entry := range entries {
if entry.Type != raftpb.EntryNormal {
if entry.Type != raftpb.EntryNormal || len(entry.Data) == 0 {
continue
}
var p pb.ZeroProposal
if err := p.Unmarshal(entry.Data); err != nil {
if err := p.Unmarshal(entry.Data[8:]); err != nil {
span.Annotatef(nil, "Error: %v", err)
return err
}
Expand Down Expand Up @@ -876,6 +877,10 @@ func (n *node) Run() {
n.applyConfChange(entry)
glog.Infof("Done applying conf change at %#x", n.Id)

case len(entry.Data) == 0:
// Raft commits empty entry on becoming a leader.
// Do nothing.

case entry.Type == raftpb.EntryNormal:
start := time.Now()
key, err := n.applyProposal(entry)
Expand All @@ -886,11 +891,10 @@ func (n *node) Run() {
if took := time.Since(start); took > time.Second {
var p pb.ZeroProposal
// Raft commits empty entry on becoming a leader.
if err := p.Unmarshal(entry.Data); err == nil {
if err := p.Unmarshal(entry.Data[8:]); err == nil {
glog.V(2).Infof("Proposal took %s to apply: %+v\n",
took.Round(time.Second), p)
}

}

default:
Expand Down
4 changes: 2 additions & 2 deletions protos/pb.proto
Expand Up @@ -150,14 +150,14 @@ message License {
}

message ZeroProposal {
reserved 8; // Was used for string key.
map<uint32, uint64> snapshot_ts = 1; // Group ID -> Snapshot Ts.
Member member = 2;
Tablet tablet = 3;
uint64 maxLeaseId = 4;
uint64 maxTxnTs = 5;
uint64 maxRaftId = 6;
api.TxnContext txn = 7;
string key = 8; // Used as unique identifier for proposal id.
string cid = 9; // Used as unique identifier for the cluster.
License license = 10;
ZeroSnapshot snapshot = 11; // Used to make Zeros take a snapshot.
Expand Down Expand Up @@ -302,11 +302,11 @@ message RestoreRequest {
}

message Proposal {
reserved 7; // Was used for string key.
Mutations mutations = 2;
repeated badgerpb2.KV kv = 4;
MembershipState state = 5;
string clean_predicate = 6; // Delete the predicate which was moved to other group.
string key = 7;
OracleDelta delta = 8;
Snapshot snapshot = 9; // Used to tell the group when to take snapshot.
uint64 index = 10; // Used to store Raft index, in raft.Ready.
Expand Down

0 comments on commit 2e5499f

Please sign in to comment.