Commit fe7b749

Improve efficiency of readonly transactions by reusing the same read ts (#2604)
Currently, every transaction gets a new timestamp from Zero, which then has to propagate the allocated timestamp to every Alpha, so that each Alpha knows when to service that transaction. An Alpha applies all the updates up until that ts, so the txn always returns consistent results. If a user is only doing read-only transactions, this system causes a lot of unnecessary work, because each txn still gets a new ts from Zero.

This PR changes that by allocating a dedicated read-only timestamp, which can be reused across many read-only transactions as long as no read-write txns are going on in the system. This speeds up reads, because it avoids the wait for ts propagation from Zero to the Alpha leader and on to its followers.

Commits:

* Logic to retrieve read-only timestamps from the Dgraph cluster. This makes read-only queries more efficient by reusing the same read-only timestamp, instead of having to lease and propagate a new MaxAssigned each time.
* Working code for read-only txns.
* Add a test to ensure that all read-only txns get the same timestamp. Also, avoid streaming MaxAssigned if nothing new was assigned.
1 parent 8f24674 commit fe7b749
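
As a usage sketch (not part of this commit), here is roughly how a client would exercise the new read-only path with the dgo API that the diffs below already use; the address and predicate are placeholders. With this change, repeated NewReadOnlyTxn() queries should come back with the same start ts as long as no read-write transaction commits in between.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/dgraph-io/dgo"
	"github.com/dgraph-io/dgo/protos/api"
	"google.golang.org/grpc"
)

func main() {
	// Placeholder address; point this at your own Alpha.
	conn, err := grpc.Dial("localhost:9080", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))

	// Placeholder predicate; the increment tool below uses counter.val.
	const q = `{ q(func: has(counter.val)) { uid, val: counter.val }}`
	for i := 0; i < 3; i++ {
		txn := dg.NewReadOnlyTxn()
		resp, err := txn.Query(context.Background(), q)
		if err != nil {
			log.Fatal(err)
		}
		// The start ts printed here should repeat across iterations while no
		// read-write txn commits, instead of incrementing every time.
		fmt.Printf("read-only start ts: %d\n", resp.GetTxn().GetStartTs())
		txn.Discard(context.Background())
	}
}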

File tree

17 files changed: +1015 −664 lines


conn/node.go

Lines changed: 1 addition & 1 deletion
@@ -452,7 +452,7 @@ func (n *Node) ProposePeerRemoval(ctx context.Context, id uint64) error {
 }
 
 type linReadReq struct {
-	// A one-shot chan which we send a raft index upon
+	// A one-shot chan which we send a raft index upon.
 	indexCh chan<- uint64
 }
 

contrib/integration/bank/main.go

Lines changed: 1 addition & 1 deletion
@@ -96,7 +96,7 @@ func (s *State) runTotal(dg *dgo.Dgraph) error {
 			}
 		}
 	`
-	txn := dg.NewTxn()
+	txn := dg.NewReadOnlyTxn()
	defer txn.Discard(context.Background())
	resp, err := txn.Query(context.Background(), query)
	if err != nil {

contrib/integration/increment/main.go

Lines changed: 32 additions & 11 deletions
@@ -26,36 +26,54 @@ import (
 var (
 	addr = flag.String("addr", "localhost:9080", "Address of Dgraph server.")
 	num  = flag.Int("num", 1, "How many times to run.")
+	ro   = flag.Bool("ro", false, "Only read the counter value, don't update it.")
 	wait = flag.String("wait", "0", "How long to wait.")
 	pred = flag.String("pred", "counter.val", "Predicate to use for storing the counter.")
 )
 
 type Counter struct {
 	Uid string `json:"uid"`
 	Val int    `json:"val"`
+
+	startTs uint64 // Only used for internal testing.
 }
 
-func increment(dg *dgo.Dgraph) (int, error) {
+func queryCounter(txn *dgo.Txn) (Counter, error) {
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()
 
-	txn := dg.NewTxn()
+	var counter Counter
 	query := fmt.Sprintf("{ q(func: has(%s)) { uid, val: %s }}", *pred, *pred)
 	resp, err := txn.Query(ctx, query)
 	if err != nil {
-		return 0, err
+		return counter, err
 	}
 	m := make(map[string][]Counter)
 	if err := json.Unmarshal(resp.Json, &m); err != nil {
-		return 0, err
+		return counter, err
 	}
-	var counter Counter
 	if len(m["q"]) == 0 {
 		// Do nothing.
 	} else if len(m["q"]) == 1 {
 		counter = m["q"][0]
 	} else {
-		log.Fatalf("Invalid response: %q", resp.Json)
+		panic(fmt.Sprintf("Invalid response: %q", resp.Json))
+	}
+	counter.startTs = resp.GetTxn().GetStartTs()
+	return counter, nil
+}
+
+func process(dg *dgo.Dgraph, readOnly bool) (Counter, error) {
+	if readOnly {
+		txn := dg.NewReadOnlyTxn()
+		defer txn.Discard(nil)
+		return queryCounter(txn)
+	}
+
+	txn := dg.NewTxn()
+	counter, err := queryCounter(txn)
+	if err != nil {
+		return Counter{}, err
 	}
 	counter.Val += 1
 
@@ -64,11 +82,14 @@ func increment(dg *dgo.Dgraph) (int, error) {
 		counter.Uid = "_:new"
 	}
 	mu.SetNquads = []byte(fmt.Sprintf(`<%s> <%s> "%d"^^<xs:int> .`, counter.Uid, *pred, counter.Val))
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
 	_, err = txn.Mutate(ctx, &mu)
 	if err != nil {
-		return 0, err
+		return Counter{}, err
 	}
-	return counter.Val, txn.Commit(ctx)
+	return counter, txn.Commit(ctx)
 }
 
 func main() {
@@ -87,13 +108,13 @@ func main() {
 	}
 
 	for *num > 0 {
-		val, err := increment(dg)
+		cnt, err := process(dg, *ro)
 		if err != nil {
-			fmt.Printf("While trying to increment counter: %v. Retrying...\n", err)
+			fmt.Printf("While trying to process counter: %v. Retrying...\n", err)
			time.Sleep(time.Second)
			continue
 		}
-		fmt.Printf("Counter SET OK: %d\n", val)
+		fmt.Printf("Counter VAL: %d [ Ts: %d ]\n", cnt.Val, cnt.startTs)
		*num -= 1
		time.Sleep(waitDur)
 	}
Lines changed: 105 additions & 0 deletions
@@ -0,0 +1,105 @@
+package main
+
+import (
+	"context"
+	"log"
+	"strings"
+	"sync"
+	"testing"
+
+	"github.com/dgraph-io/dgo"
+	"github.com/dgraph-io/dgo/protos/api"
+	"github.com/dgraph-io/dgraph/x"
+	"github.com/stretchr/testify/require"
+	"google.golang.org/grpc"
+)
+
+const N = 10
+
+func increment(t *testing.T, dg *dgo.Dgraph) int {
+	var max int
+	var mu sync.Mutex
+	storeMax := func(a int) {
+		mu.Lock()
+		if max < a {
+			max = a
+		}
+		mu.Unlock()
+	}
+
+	var wg sync.WaitGroup
+	// N goroutines, process N times each goroutine.
+	for i := 0; i < N; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for i := 0; i < N; i++ {
+				cnt, err := process(dg, false)
+				if err != nil {
+					if strings.Index(err.Error(), "Transaction has been aborted") >= 0 {
+						// pass
+					} else {
+						t.Logf("Error while incrementing: %v\n", err)
+					}
+				} else {
+					storeMax(cnt.Val)
+				}
+			}
+		}()
+	}
+	wg.Wait()
+	return max
+}
+
+func read(t *testing.T, dg *dgo.Dgraph, expected int) {
+	cnt, err := process(dg, true)
+	require.NoError(t, err)
+	ts := cnt.startTs
+	t.Logf("Readonly stage counter: %+v\n", cnt)
+
+	var wg sync.WaitGroup
+	for i := 0; i < N; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for i := 0; i < N; i++ {
+				cnt, err := process(dg, true)
+				if err != nil {
+					t.Logf("Error while reading: %v\n", err)
+				} else {
+					require.Equal(t, expected, cnt.Val)
+					require.Equal(t, ts, cnt.startTs)
+				}
+			}
+		}()
+	}
+	wg.Wait()
+}
+
+func TestIncrement(t *testing.T) {
+	conn, err := grpc.Dial("localhost:9180", grpc.WithInsecure())
+	if err != nil {
+		log.Fatal(err)
+	}
+	dc := api.NewDgraphClient(conn)
+	dg := dgo.NewDgraphClient(dc)
+
+	op := api.Operation{DropAll: true}
+	x.Check(dg.Alter(context.Background(), &op))
+
+	cnt, err := process(dg, false)
+	if err != nil {
+		t.Logf("Error while reading: %v\n", err)
+	} else {
+		t.Logf("Initial value: %d\n", cnt.Val)
+	}
+
+	val := increment(t, dg)
+	t.Logf("Increment stage done. Got value: %d\n", val)
+	read(t, dg, val)
+	t.Logf("Read stage done with value: %d\n", val)
+	val = increment(t, dg)
+	t.Logf("Increment stage done. Got value: %d\n", val)
+	read(t, dg, val)
+	t.Logf("Read stage done with value: %d\n", val)
+}

dgraph/cmd/bulk/loader.go

Lines changed: 1 addition & 2 deletions
@@ -24,7 +24,6 @@ import (
 
 	"github.com/dgraph-io/badger"
 	bo "github.com/dgraph-io/badger/options"
-	"github.com/dgraph-io/dgo/protos/api"
 	"github.com/dgraph-io/dgraph/protos/intern"
 	"github.com/dgraph-io/dgraph/schema"
 	"github.com/dgraph-io/dgraph/x"
@@ -180,7 +179,7 @@ func findRDFFiles(dir string) []string {
 }
 
 type uidRangeResponse struct {
-	uids *api.AssignedIds
+	uids *intern.AssignedIds
 	err  error
 }
 
dgraph/cmd/zero/assign.go

Lines changed: 47 additions & 13 deletions
@@ -12,14 +12,14 @@ import (
 
 	"golang.org/x/net/context"
 
-	"github.com/dgraph-io/dgo/protos/api"
 	"github.com/dgraph-io/dgraph/protos/intern"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/golang/glog"
 )
 
 var (
 	emptyNum         intern.Num
-	emptyAssignedIds api.AssignedIds
+	emptyAssignedIds intern.AssignedIds
 )
 
 const (
@@ -48,11 +48,13 @@ func (s *Server) maxTxnTs() uint64 {
 	return s.state.MaxTxnTs
 }
 
+var servedFromMemory = errors.New("Lease was served from memory.")
+
 // lease would either allocate ids or timestamps.
 // This function is triggered by an RPC call. We ensure that only leader can assign new UIDs,
 // so we can tackle any collisions that might happen with the leasemanager
 // In essence, we just want one server to be handing out new uids.
-func (s *Server) lease(ctx context.Context, num *intern.Num, txn bool) (*api.AssignedIds, error) {
+func (s *Server) lease(ctx context.Context, num *intern.Num, txn bool) (*intern.AssignedIds, error) {
 	node := s.Node
 	// TODO: Fix when we move to linearizable reads, need to check if we are the leader, might be
 	// based on leader leases. If this node gets partitioned and unless checkquorum is enabled, this
@@ -61,14 +63,36 @@ func (s *Server) lease(ctx context.Context, num *intern.Num, txn bool) (*api.Ass
 		return &emptyAssignedIds, x.Errorf("Assigning IDs is only allowed on leader.")
 	}
 
-	val := int(num.Val)
-	if val == 0 {
-		return &emptyAssignedIds, x.Errorf("Nothing to be marked or assigned")
+	if num.Val == 0 && !num.ReadOnly {
+		return &emptyAssignedIds, x.Errorf("Nothing to be leased")
+	}
+	if glog.V(3) {
+		glog.Infof("Got lease request for txn: %v. Num: %+v\n", txn, num)
 	}
 
 	s.leaseLock.Lock()
 	defer s.leaseLock.Unlock()
 
+	if txn {
+		if num.Val == 0 && num.ReadOnly {
+			// If we're only asking for a readonly timestamp, we can potentially
+			// service it directly.
+			if glog.V(3) {
+				glog.Infof("Attempting to serve read only txn ts [%d, %d]",
+					s.readOnlyTs, s.nextTxnTs)
+			}
+			if s.readOnlyTs > 0 && s.readOnlyTs == s.nextTxnTs-1 {
+				return &intern.AssignedIds{ReadOnly: s.readOnlyTs}, servedFromMemory
+			}
+		}
+		// We couldn't service it. So, let's request an extra timestamp for
+		// readonly transactions, if needed.
+	}
+
+	// If we're asking for more ids than the standard lease bandwidth, then we
+	// should set howMany generously, so we can service future requests from
+	// memory, without asking for another lease. Only used if we need to renew
+	// our lease.
 	howMany := leaseBandwidth
 	if num.Val > leaseBandwidth {
 		howMany = num.Val + leaseBandwidth
@@ -81,6 +105,8 @@ func (s *Server) lease(ctx context.Context, num *intern.Num, txn bool) (*api.Ass
 	var maxLease, available uint64
 	var proposal intern.ZeroProposal
 
+	// Calculate how many ids do we have available in memory, before we need to
+	// renew our lease.
 	if txn {
 		maxLease = s.maxTxnTs()
 		available = maxLease - s.nextTxnTs + 1
@@ -91,19 +117,27 @@ func (s *Server) lease(ctx context.Context, num *intern.Num, txn bool) (*api.Ass
 		proposal.MaxLeaseId = maxLease + howMany
 	}
 
-	if available < num.Val {
+	// If we have less available than what we need, we need to renew our lease.
+	if available < num.Val+1 { // +1 for a potential readonly ts.
 		// Blocking propose to get more ids or timestamps.
 		if err := s.Node.proposeAndWait(ctx, &proposal); err != nil {
			return nil, err
 		}
 	}
 
-	out := &api.AssignedIds{}
+	out := &intern.AssignedIds{}
 	if txn {
-		out.StartId = s.nextTxnTs
-		out.EndId = out.StartId + num.Val - 1
-		s.nextTxnTs = out.EndId + 1
-		s.orc.doneUntil.Begin(out.EndId)
+		if num.Val > 0 {
+			out.StartId = s.nextTxnTs
+			out.EndId = out.StartId + num.Val - 1
+			s.nextTxnTs = out.EndId + 1
+		}
+		if num.ReadOnly {
+			s.readOnlyTs = s.nextTxnTs
+			s.nextTxnTs++
+			out.ReadOnly = s.readOnlyTs
+		}
+		s.orc.doneUntil.Begin(x.Max(out.EndId, out.ReadOnly))
 	} else {
 		out.StartId = s.nextLeaseId
 		out.EndId = out.StartId + num.Val - 1
@@ -114,7 +148,7 @@ func (s *Server) lease(ctx context.Context, num *intern.Num, txn bool) (*api.Ass
 
 // AssignUids is used to assign new uids by communicating with the leader of the RAFT group
 // responsible for handing out uids.
-func (s *Server) AssignUids(ctx context.Context, num *intern.Num) (*api.AssignedIds, error) {
+func (s *Server) AssignUids(ctx context.Context, num *intern.Num) (*intern.AssignedIds, error) {
 	if ctx.Err() != nil {
 		return &emptyAssignedIds, ctx.Err()
 	}
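
To make the reuse condition in lease easier to follow, here is a small standalone sketch of the invariant readOnlyTs == nextTxnTs-1; this is illustrative code with made-up names, not Dgraph's implementation. The cached read-only timestamp stays valid until a newer transactional timestamp is handed out, at which point one extra timestamp is burned for read-only use, mirroring the num.ReadOnly branch above.

package main

import (
	"fmt"
	"sync"
)

// tsOracle is a toy stand-in for Zero's timestamp state, not Dgraph code.
type tsOracle struct {
	mu         sync.Mutex
	nextTxnTs  uint64 // next timestamp to hand to a read-write txn
	readOnlyTs uint64 // last read-only timestamp handed out
}

// assignTxnTs hands out a fresh timestamp for a read-write transaction.
func (o *tsOracle) assignTxnTs() uint64 {
	o.mu.Lock()
	defer o.mu.Unlock()
	ts := o.nextTxnTs
	o.nextTxnTs++
	return ts
}

// readOnly mirrors the check in Server.lease: reuse the cached read-only ts
// while nothing newer has been assigned; otherwise burn one new timestamp.
func (o *tsOracle) readOnly() uint64 {
	o.mu.Lock()
	defer o.mu.Unlock()
	if o.readOnlyTs > 0 && o.readOnlyTs == o.nextTxnTs-1 {
		return o.readOnlyTs // served from memory, no new lease needed
	}
	o.readOnlyTs = o.nextTxnTs
	o.nextTxnTs++
	return o.readOnlyTs
}

func main() {
	o := &tsOracle{nextTxnTs: 1}
	fmt.Println(o.readOnly(), o.readOnly()) // prints "1 1": the ts is reused
	o.assignTxnTs()                         // a read-write txn bumps nextTxnTs to 3
	fmt.Println(o.readOnly())               // prints "3": a fresh read-only ts
}

The new -ro flag on the increment tool takes this read-only path, and the test added in this commit checks that concurrent read-only txns observe the same startTs.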

0 commit comments
