Skip to content

Commit 0561801

Browse files
authored
[BREAKING] feat(zero): Make zero lease out namespace IDs (#7341)
This change makes zero lease out the namespace IDs. It changes the `AssignUids()` API to `AssignIds()`. The new API takes in the lease type and accordingly leases out.
1 parent 28b75cf commit 0561801

File tree

20 files changed

+632
-446
lines changed

20 files changed

+632
-446
lines changed

dgraph/cmd/alpha/run_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1694,7 +1694,8 @@ func TestMain(m *testing.M) {
16941694
log.Fatal(err)
16951695
}
16961696
zc := pb.NewZeroClient(conn)
1697-
if _, err := zc.AssignUids(context.Background(), &pb.Num{Val: 1e6}); err != nil {
1697+
if _, err := zc.AssignIds(context.Background(),
1698+
&pb.Num{Val: 1e6, Type: pb.Num_UID}); err != nil {
16981699
log.Fatal(err)
16991700
}
17001701
httpToken := testutil.GrootHttpLogin(addr + "/admin")

dgraph/cmd/debug/run.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -853,8 +853,10 @@ func printZeroProposal(buf *bytes.Buffer, zpr *pb.ZeroProposal) {
853853
fmt.Fprintf(buf, " Member: %+v .", zpr.Member)
854854
case zpr.Tablet != nil:
855855
fmt.Fprintf(buf, " Tablet: %+v .", zpr.Tablet)
856-
case zpr.MaxLeaseId > 0:
857-
fmt.Fprintf(buf, " MaxLeaseId: %d .", zpr.MaxLeaseId)
856+
case zpr.MaxUID > 0:
857+
fmt.Fprintf(buf, " MaxUID: %d .", zpr.MaxUID)
858+
case zpr.MaxNsID > 0:
859+
fmt.Fprintf(buf, " MaxNsID: %d .", zpr.MaxNsID)
858860
case zpr.MaxRaftId > 0:
859861
fmt.Fprintf(buf, " MaxRaftId: %d .", zpr.MaxRaftId)
860862
case zpr.MaxTxnTs > 0:

dgraph/cmd/zero/assign.go

Lines changed: 56 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -36,24 +36,30 @@ const (
3636
func (s *Server) updateLeases() {
3737
var startTs uint64
3838
s.Lock()
39-
s.nextLeaseId = s.state.MaxLeaseId + 1
40-
s.nextTxnTs = s.state.MaxTxnTs + 1
41-
startTs = s.nextTxnTs
42-
glog.Infof("Updated Lease id: %d. Txn Ts: %d", s.nextLeaseId, s.nextTxnTs)
39+
s.nextLease[pb.Num_UID] = s.state.MaxUID + 1
40+
s.nextLease[pb.Num_TXN_TS] = s.state.MaxTxnTs + 1
41+
s.nextLease[pb.Num_NS_ID] = s.state.MaxNsID + 1
42+
43+
startTs = s.nextLease[pb.Num_TXN_TS]
44+
glog.Infof("Updated UID: %d. Txn Ts: %d. NsID: %d.",
45+
s.nextLease[pb.Num_UID], s.nextLease[pb.Num_TXN_TS], s.nextLease[pb.Num_NS_ID])
4346
s.Unlock()
4447
s.orc.updateStartTxnTs(startTs)
4548
}
4649

47-
func (s *Server) maxLeaseId() uint64 {
48-
s.RLock()
49-
defer s.RUnlock()
50-
return s.state.MaxLeaseId
51-
}
52-
53-
func (s *Server) maxTxnTs() uint64 {
50+
func (s *Server) maxLease(typ pb.NumLeaseType) uint64 {
5451
s.RLock()
5552
defer s.RUnlock()
56-
return s.state.MaxTxnTs
53+
var maxlease uint64
54+
switch typ {
55+
case pb.Num_UID:
56+
maxlease = s.state.MaxUID
57+
case pb.Num_TXN_TS:
58+
maxlease = s.state.MaxTxnTs
59+
case pb.Num_NS_ID:
60+
maxlease = s.state.MaxNsID
61+
}
62+
return maxlease
5763
}
5864

5965
var errServedFromMemory = errors.New("Lease was served from memory")
@@ -62,7 +68,8 @@ var errServedFromMemory = errors.New("Lease was served from memory")
6268
// This function is triggered by an RPC call. We ensure that only leader can assign new UIDs,
6369
// so we can tackle any collisions that might happen with the leasemanager
6470
// In essence, we just want one server to be handing out new uids.
65-
func (s *Server) lease(ctx context.Context, num *pb.Num, txn bool) (*pb.AssignedIds, error) {
71+
func (s *Server) lease(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error) {
72+
typ := num.GetType()
6673
node := s.Node
6774
// TODO: Fix when we move to linearizable reads, need to check if we are the leader, might be
6875
// based on leader leases. If this node gets partitioned and unless checkquorum is enabled, this
@@ -75,21 +82,21 @@ func (s *Server) lease(ctx context.Context, num *pb.Num, txn bool) (*pb.Assigned
7582
return &emptyAssignedIds, errors.Errorf("Nothing to be leased")
7683
}
7784
if glog.V(3) {
78-
glog.Infof("Got lease request for txn: %v. Num: %+v\n", txn, num)
85+
glog.Infof("Got lease request for Type: %v. Num: %+v\n", typ, num)
7986
}
8087

8188
s.leaseLock.Lock()
8289
defer s.leaseLock.Unlock()
8390

84-
if txn {
91+
if typ == pb.Num_TXN_TS {
8592
if num.Val == 0 && num.ReadOnly {
8693
// If we're only asking for a readonly timestamp, we can potentially
8794
// service it directly.
8895
if glog.V(3) {
8996
glog.Infof("Attempting to serve read only txn ts [%d, %d]",
90-
s.readOnlyTs, s.nextTxnTs)
97+
s.readOnlyTs, s.nextLease[pb.Num_TXN_TS])
9198
}
92-
if s.readOnlyTs > 0 && s.readOnlyTs == s.nextTxnTs-1 {
99+
if s.readOnlyTs > 0 && s.readOnlyTs == s.nextLease[pb.Num_TXN_TS]-1 {
93100
return &pb.AssignedIds{ReadOnly: s.readOnlyTs}, errServedFromMemory
94101
}
95102
}
@@ -106,23 +113,24 @@ func (s *Server) lease(ctx context.Context, num *pb.Num, txn bool) (*pb.Assigned
106113
howMany = num.Val + leaseBandwidth
107114
}
108115

109-
if s.nextLeaseId == 0 || s.nextTxnTs == 0 {
116+
if s.nextLease[pb.Num_UID] == 0 || s.nextLease[pb.Num_TXN_TS] == 0 ||
117+
s.nextLease[pb.Num_NS_ID] == 0 {
110118
return nil, errors.New("Server not initialized")
111119
}
112120

113-
var maxLease, available uint64
114121
var proposal pb.ZeroProposal
115122

116123
// Calculate how many ids do we have available in memory, before we need to
117124
// renew our lease.
118-
if txn {
119-
maxLease = s.maxTxnTs()
120-
available = maxLease - s.nextTxnTs + 1
125+
maxLease := s.maxLease(typ)
126+
available := maxLease - s.nextLease[typ] + 1
127+
switch typ {
128+
case pb.Num_TXN_TS:
121129
proposal.MaxTxnTs = maxLease + howMany
122-
} else {
123-
maxLease = s.maxLeaseId()
124-
available = maxLease - s.nextLeaseId + 1
125-
proposal.MaxLeaseId = maxLease + howMany
130+
case pb.Num_UID:
131+
proposal.MaxUID = maxLease + howMany
132+
case pb.Num_NS_ID:
133+
proposal.MaxNsID = maxLease + howMany
126134
}
127135

128136
// If we have less available than what we need, we need to renew our lease.
@@ -134,48 +142,55 @@ func (s *Server) lease(ctx context.Context, num *pb.Num, txn bool) (*pb.Assigned
134142
}
135143

136144
out := &pb.AssignedIds{}
137-
if txn {
145+
if typ == pb.Num_TXN_TS {
138146
if num.Val > 0 {
139-
out.StartId = s.nextTxnTs
147+
out.StartId = s.nextLease[pb.Num_TXN_TS]
140148
out.EndId = out.StartId + num.Val - 1
141-
s.nextTxnTs = out.EndId + 1
149+
s.nextLease[pb.Num_TXN_TS] = out.EndId + 1
142150
}
143151
if num.ReadOnly {
144-
s.readOnlyTs = s.nextTxnTs
145-
s.nextTxnTs++
152+
s.readOnlyTs = s.nextLease[pb.Num_TXN_TS]
153+
s.nextLease[pb.Num_TXN_TS]++
146154
out.ReadOnly = s.readOnlyTs
147155
}
148156
s.orc.doneUntil.Begin(x.Max(out.EndId, out.ReadOnly))
149-
} else {
150-
out.StartId = s.nextLeaseId
157+
} else if typ == pb.Num_UID {
158+
out.StartId = s.nextLease[pb.Num_UID]
159+
out.EndId = out.StartId + num.Val - 1
160+
s.nextLease[pb.Num_UID] = out.EndId + 1
161+
} else if typ == pb.Num_NS_ID {
162+
out.StartId = s.nextLease[pb.Num_NS_ID]
151163
out.EndId = out.StartId + num.Val - 1
152-
s.nextLeaseId = out.EndId + 1
164+
s.nextLease[pb.Num_NS_ID] = out.EndId + 1
165+
166+
} else {
167+
return out, errors.Errorf("Unknown lease type: %v\n", typ)
153168
}
154169
return out, nil
155170
}
156171

157-
// AssignUids is used to assign new uids by communicating with the leader of the RAFT group
158-
// responsible for handing out uids.
159-
func (s *Server) AssignUids(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error) {
172+
// AssignIds is used to assign new ids (UIDs, NsIDs) by communicating with the leader of the
173+
// RAFT group responsible for handing out ids.
174+
func (s *Server) AssignIds(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error) {
160175
if ctx.Err() != nil {
161176
return &emptyAssignedIds, ctx.Err()
162177
}
163-
ctx, span := otrace.StartSpan(ctx, "Zero.AssignUids")
178+
ctx, span := otrace.StartSpan(ctx, "Zero.AssignIds")
164179
defer span.End()
165180

166181
reply := &emptyAssignedIds
167182
lease := func() error {
168183
var err error
169184
if s.Node.AmLeader() {
170185
span.Annotatef(nil, "Zero leader leasing %d ids", num.GetVal())
171-
reply, err = s.lease(ctx, num, false)
186+
reply, err = s.lease(ctx, num)
172187
return err
173188
}
174189
span.Annotate(nil, "Not Zero leader")
175190
// I'm not the leader and this request was forwarded to me by a peer, who thought I'm the
176191
// leader.
177192
if num.Forwarded {
178-
return errors.Errorf("Invalid Zero received AssignUids request forward. Please retry")
193+
return errors.Errorf("Invalid Zero received AssignIds request forward. Please retry")
179194
}
180195
// This is an original request. Forward it to the leader.
181196
pl := s.Leader(0)
@@ -185,7 +200,7 @@ func (s *Server) AssignUids(ctx context.Context, num *pb.Num) (*pb.AssignedIds,
185200
span.Annotatef(nil, "Sending request to %v", pl.Addr)
186201
zc := pb.NewZeroClient(pl.Get())
187202
num.Forwarded = true
188-
reply, err = zc.AssignUids(ctx, num)
203+
reply, err = zc.AssignIds(ctx, num)
189204
return err
190205
}
191206

dgraph/cmd/zero/http.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,17 @@ func (st *state) assign(w http.ResponseWriter, r *http.Request) {
7373
what := r.URL.Query().Get("what")
7474
switch what {
7575
case "uids":
76-
ids, err = st.zero.AssignUids(ctx, num)
76+
num.Type = pb.Num_UID
77+
ids, err = st.zero.AssignIds(ctx, num)
7778
case "timestamps":
79+
num.Type = pb.Num_TXN_TS
7880
if num.Val == 0 {
7981
num.ReadOnly = true
8082
}
8183
ids, err = st.zero.Timestamps(ctx, num)
84+
case "nsids":
85+
num.Type = pb.Num_NS_ID
86+
ids, err = st.zero.AssignIds(ctx, num)
8287
default:
8388
x.SetStatus(w, x.Error,
8489
fmt.Sprintf("Invalid what: [%s]. Must be one of uids or timestamps", what))

dgraph/cmd/zero/oracle.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -402,8 +402,8 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
402402
return s.proposeTxn(ctx, src)
403403
}
404404

405-
num := pb.Num{Val: 1}
406-
assigned, err := s.lease(ctx, &num, true)
405+
num := pb.Num{Val: 1, Type: pb.Num_TXN_TS}
406+
assigned, err := s.lease(ctx, &num)
407407
if err != nil {
408408
return err
409409
}
@@ -512,7 +512,8 @@ func (s *Server) Timestamps(ctx context.Context, num *pb.Num) (*pb.AssignedIds,
512512
return &emptyAssignedIds, ctx.Err()
513513
}
514514

515-
reply, err := s.lease(ctx, num, true)
515+
num.Type = pb.Num_TXN_TS
516+
reply, err := s.lease(ctx, num)
516517
span.Annotatef(nil, "Response: %+v. Error: %v", reply, err)
517518

518519
switch err {

dgraph/cmd/zero/raft.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -418,15 +418,18 @@ func (n *node) applyProposal(e raftpb.Entry) (uint64, error) {
418418
}
419419

420420
switch {
421-
case p.MaxLeaseId > state.MaxLeaseId:
422-
state.MaxLeaseId = p.MaxLeaseId
421+
case p.MaxUID > state.MaxUID:
422+
state.MaxUID = p.MaxUID
423423
case p.MaxTxnTs > state.MaxTxnTs:
424424
state.MaxTxnTs = p.MaxTxnTs
425-
case p.MaxLeaseId != 0 || p.MaxTxnTs != 0:
425+
case p.MaxNsID > state.MaxNsID:
426+
state.MaxNsID = p.MaxNsID
427+
case p.MaxUID != 0 || p.MaxTxnTs != 0 || p.MaxNsID != 0:
426428
// Could happen after restart when some entries were there in WAL and did not get
427429
// snapshotted.
428-
glog.Infof("Could not apply proposal, ignoring: p.MaxLeaseId=%v, p.MaxTxnTs=%v maxLeaseId=%d"+
429-
" maxTxnTs=%d\n", p.MaxLeaseId, p.MaxTxnTs, state.MaxLeaseId, state.MaxTxnTs)
430+
glog.Infof("Could not apply proposal, ignoring: p.MaxUID=%v, p.MaxTxnTs=%v"+
431+
"p.MaxNsID=%v, maxUID=%d maxTxnTs=%d maxNsID=%d\n",
432+
p.MaxUID, p.MaxTxnTs, p.MaxNsID, state.MaxUID, state.MaxTxnTs, state.MaxNsID)
430433
}
431434
if p.Txn != nil {
432435
n.server.orc.updateCommitStatus(e.Index, p.Txn)

dgraph/cmd/zero/zero.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,9 @@ type Server struct {
5959
state *pb.MembershipState
6060
nextRaftId uint64
6161

62-
nextLeaseId uint64
63-
nextTxnTs uint64
64-
readOnlyTs uint64
65-
leaseLock sync.Mutex // protects nextLeaseId, nextTxnTs and corresponding proposals.
62+
nextLease map[pb.NumLeaseType]uint64
63+
readOnlyTs uint64
64+
leaseLock sync.Mutex // protects nextUID, nextTxnTs, nextNsID and corresponding proposals.
6665

6766
// groupMap map[uint32]*Group
6867
nextGroup uint32
@@ -90,9 +89,11 @@ func (s *Server) Init() {
9089
Groups: make(map[uint32]*pb.Group),
9190
Zeros: make(map[uint64]*pb.Member),
9291
}
92+
s.nextLease = make(map[pb.NumLeaseType]uint64)
9393
s.nextRaftId = 1
94-
s.nextLeaseId = 1
95-
s.nextTxnTs = 1
94+
s.nextLease[pb.Num_UID] = 1
95+
s.nextLease[pb.Num_TXN_TS] = 1
96+
s.nextLease[pb.Num_NS_ID] = 1
9697
s.nextGroup = 1
9798
s.leaderChangeCh = make(chan struct{}, 1)
9899
s.closer = z.NewCloser(2) // grpc and http

ee/backup/run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ func runRestoreCmd() error {
246246
if result.MaxLeaseUid > 0 {
247247
ctx, cancelUid := context.WithTimeout(context.Background(), time.Minute)
248248
defer cancelUid()
249-
if _, err = zc.AssignUids(ctx, &pb.Num{Val: result.MaxLeaseUid}); err != nil {
249+
if _, err = zc.AssignIds(ctx, &pb.Num{Val: result.MaxLeaseUid, Type: pb.Num_UID}); err != nil {
250250
fmt.Printf("Failed to assign maxLeaseId %d in Zero: %v\n", result.MaxLeaseUid, err)
251251
return err
252252
}

graphql/admin/admin.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,8 @@ const (
140140
counter: Int
141141
groups: [ClusterGroup]
142142
zeros: [Member]
143-
maxLeaseId: Int
143+
maxUID: Int
144+
maxNsID: Int
144145
maxTxnTs: Int
145146
maxRaftId: Int
146147
removed: [Member]

graphql/admin/state.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,16 @@ import (
1515
)
1616

1717
type membershipState struct {
18-
Counter uint64 `json:"counter,omitempty"`
19-
Groups []clusterGroup `json:"groups,omitempty"`
20-
Zeros []*pb.Member `json:"zeros,omitempty"`
21-
MaxLeaseId uint64 `json:"maxLeaseId,omitempty"`
22-
MaxTxnTs uint64 `json:"maxTxnTs,omitempty"`
23-
MaxRaftId uint64 `json:"maxRaftId,omitempty"`
24-
Removed []*pb.Member `json:"removed,omitempty"`
25-
Cid string `json:"cid,omitempty"`
26-
License *pb.License `json:"license,omitempty"`
18+
Counter uint64 `json:"counter,omitempty"`
19+
Groups []clusterGroup `json:"groups,omitempty"`
20+
Zeros []*pb.Member `json:"zeros,omitempty"`
21+
MaxUID uint64 `json:"maxUID,omitempty"`
22+
MaxNsID uint64 `json:"maxNsID,omitempty"`
23+
MaxTxnTs uint64 `json:"maxTxnTs,omitempty"`
24+
MaxRaftId uint64 `json:"maxRaftId,omitempty"`
25+
Removed []*pb.Member `json:"removed,omitempty"`
26+
Cid string `json:"cid,omitempty"`
27+
License *pb.License `json:"license,omitempty"`
2728
}
2829

2930
type clusterGroup struct {
@@ -98,8 +99,9 @@ func convertToGraphQLResp(ms pb.MembershipState) membershipState {
9899
for _, v := range ms.Zeros {
99100
state.Zeros = append(state.Zeros, v)
100101
}
101-
state.MaxLeaseId = ms.MaxLeaseId
102+
state.MaxUID = ms.MaxUID
102103
state.MaxTxnTs = ms.MaxTxnTs
104+
state.MaxNsID = ms.MaxNsID
103105
state.MaxRaftId = ms.MaxRaftId
104106
state.Removed = ms.Removed
105107
state.Cid = ms.Cid

0 commit comments

Comments
 (0)