-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Closed
Milestone
Description
What version of Dgraph are you using?
v20.03.0
Have you tried reproducing the issue with the latest release?
Yes
What is the hardware spec (RAM, OS)?
Windows 10 Pro, 24 GB RAM
Steps to reproduce the issue (command/config used to run Dgraph).
Start Dgraph with `dgraph zero` and `dgraph alpha --lru_mb=2048`.
Put the following code into a main.go file:
package main
import (
"bufio"
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"log"
"math/rand"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/dgraph-io/dgo/v200"
"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
// Command-line flags that configure the bank workload. Each variable is a
// pointer returned by the flag package and must be dereferenced after
// flag.Parse has run.
var (
// users is the number of accounts created by createAccounts; account keys
// run from 1 to *users.
users = flag.Int("users", 2, "Number of accounts.")
// conc is the number of worker goroutines started per client in main.
conc = flag.Int("txns", 3, "Number of concurrent transactions per client.")
// dur is parsed with time.ParseDuration inside loop.
dur = flag.String("dur", "1m", "How long to run the transactions.")
// alpha may hold several comma-separated addresses; main dials each one.
alpha = flag.String("alpha", "localhost:9080", "Address of Dgraph alpha.")
// verbose enables per-query and per-transaction log output.
verbose = flag.Bool("verbose", true, "Output all logs in verbose mode.")
// login makes main log in as groot on every client before running.
login = flag.Bool("login", true, "Login as groot. Used for ACL-enabled cluster.")
)
// startBal is the balance given to every account by createAccounts. runTotal
// expects the sum of all balances to stay equal to *users times startBal.
var startBal = 10
// account is the JSON shape used both for mutations sent to Dgraph and for
// decoding query results. Key and Bal are omitted from the encoded JSON when
// they are zero (omitempty); UID and Typ are always emitted.
type account struct {
UID string `json:"uid"`
Key int `json:"key,omitempty"`
Bal int `json:"bal,omitempty"`
Typ string `json:"typ"`
}
// state holds the counters shared by all worker goroutines. Both fields are
// updated with sync/atomic operations in loop.
type state struct {
// aborts counts calls to runTransaction that returned an error.
aborts int32
// runs counts calls to runTransaction that returned nil.
runs int32
}
// Check terminates the program through log.Fatalf when err is non-nil. The
// error is passed through errors.Wrap first and printed with the %+v verb.
// A nil err is a no-op.
func Check(err error) {
	if err == nil {
		return
	}
	log.Fatalf("%+v", errors.Wrap(err, ""))
}
// createAccounts resets the database and seeds it with the initial accounts.
//
// It issues a DropAll, installs the key/bal/typ schema, and then inserts
// accounts with keys 1..*users, each holding startBal and typ "ba", in a
// single committed transaction. Every failure along the way is fatal (Check).
func (s *state) createAccounts(dg *dgo.Dgraph) {
	ctx := context.Background()

	// The same Operation value is reused: first to drop everything, then to
	// apply the schema.
	op := api.Operation{DropAll: true}
	Check(dg.Alter(ctx, &op))
	op.DropAll = false
	op.Schema = `
key: int @index(int) @upsert .
bal: int .
typ: string @index(exact) @upsert .
`
	Check(dg.Alter(ctx, &op))

	// Left nil when *users is 0 so the marshalled payload stays unchanged.
	var seed []account
	for key := 1; key <= *users; key++ {
		seed = append(seed, account{Key: key, Bal: startBal, Typ: "ba"})
	}
	payload, err := json.Marshal(seed)
	Check(err)

	txn := dg.NewTxn()
	defer func() {
		if derr := txn.Discard(context.Background()); derr != nil {
			log.Fatalf("Discarding transaction failed: %+v\n", derr)
		}
	}()

	mu := api.Mutation{SetJson: payload}
	if *verbose {
		log.Printf("mutation: %s\n", mu.SetJson)
	}
	_, err = txn.Mutate(ctx, &mu)
	Check(err)
	Check(txn.Commit(ctx))
}
// runTotal verifies the bank invariant using a read-only transaction.
//
// It fetches every node with typ "ba", sums the balances, and terminates the
// program if more than *users accounts are returned or if the sum differs
// from *users*startBal. Query and JSON decoding errors are returned to the
// caller instead of being fatal.
func (s *state) runTotal(dg *dgo.Dgraph) error {
	const totalQuery = `
{
q(func: eq(typ, "ba")) {
uid
key
bal
}
}
`
	txn := dg.NewReadOnlyTxn()
	defer func() {
		if derr := txn.Discard(context.Background()); derr != nil {
			log.Fatalf("Discarding transaction failed: %+v\n", derr)
		}
	}()

	resp, err := txn.Query(context.Background(), totalQuery)
	if err != nil {
		return err
	}

	result := make(map[string][]account)
	if err = json.Unmarshal(resp.Json, &result); err != nil {
		return err
	}

	// Sort by key so the verbose log lists accounts in a stable order.
	accounts := result["q"]
	sort.Slice(accounts, func(a, b int) bool {
		return accounts[a].Key < accounts[b].Key
	})

	total := 0
	for idx := range accounts {
		total += accounts[idx].Bal
	}
	if *verbose {
		log.Printf("Read: %v. Total: %d\n", accounts, total)
	}

	switch {
	case len(accounts) > *users:
		log.Fatalf("len(accounts) = %d", len(accounts))
	case total != *users*startBal:
		log.Fatalf("Total = %d", total)
	}
	return nil
}
// findAccount looks up the account with the given key inside txn.
//
// A query failure is returned as an error. A response that cannot be decoded,
// or one containing more than one account for the key, terminates the
// program. When no account matches, a placeholder with the requested Key,
// typ "ba" and an empty UID is returned with a nil error.
func (s *state) findAccount(txn *dgo.Txn, key int) (account, error) {
	query := fmt.Sprintf(`{ q(func: eq(key, %d)) { key, uid, bal, typ }}`, key)
	resp, err := txn.Query(context.Background(), query)
	if err != nil {
		return account{}, err
	}

	result := make(map[string][]account)
	if uerr := json.Unmarshal(resp.Json, &result); uerr != nil {
		log.Fatal(uerr)
	}

	matches := result["q"]
	if len(matches) > 1 {
		log.Printf("Query: %s. Response: %s\n", query, resp.Json)
		log.Fatal("Found multiple accounts")
	}
	if len(matches) > 0 {
		return matches[0], nil
	}

	// Nothing stored under this key: hand back a UID-less placeholder.
	if *verbose {
		log.Printf("Unable to find account for K_%02d. JSON: %s\n", key, resp.Json)
	}
	return account{Key: key, Typ: "ba"}, nil
}
// runTransaction moves a random amount between two distinct, randomly chosen
// accounts inside one Dgraph transaction and writes a trace of what it did
// into buf (framed by "==>" and "---" lines).
//
// The source account is only mutated when it already has a UID; it is sent as
// a delete when its balance drops to zero and as a set otherwise. The
// destination is always sent as a set. Any error from the lookups, mutations
// or commit is returned; JSON encoding failures are fatal (Check).
func (s *state) runTransaction(dg *dgo.Dgraph, buf *bytes.Buffer) error {
	out := bufio.NewWriter(buf)
	fmt.Fprintf(out, "==>\n")
	defer func() {
		fmt.Fprintf(out, "---\n")
		_ = out.Flush()
	}()

	ctx := context.Background()
	txn := dg.NewTxn()
	defer func() {
		if derr := txn.Discard(context.Background()); derr != nil {
			log.Fatalf("Discarding transaction failed: %+v\n", derr)
		}
	}()

	// Draw key pairs until both are non-zero and different from each other.
	var srcKey, dstKey int
	for {
		srcKey = rand.Intn(*users + 1)
		dstKey = rand.Intn(*users + 1)
		if srcKey != 0 && dstKey != 0 && srcKey != dstKey {
			break
		}
	}

	src, err := s.findAccount(txn, srcKey)
	if err != nil {
		return err
	}
	dst, err := s.findAccount(txn, dstKey)
	if err != nil {
		return err
	}
	if src.Key == dst.Key {
		return nil
	}

	// Never move more than the source currently holds.
	amount := rand.Intn(10)
	if amount >= src.Bal {
		amount = src.Bal
	}
	fmt.Fprintf(out, "Moving [$%d, K_%02d -> K_%02d]. Src:%+v. Dst: %+v\n",
		amount, src.Key, dst.Key, src, dst)
	src.Bal -= amount
	dst.Bal += amount

	// A source without a UID was never stored, so there is nothing to mutate.
	if src.UID != "" {
		srcJSON, merr := json.Marshal(src)
		Check(merr)

		var srcMu api.Mutation
		if src.Bal == 0 {
			srcMu.DeleteJson = srcJSON
			fmt.Fprintf(out, "Deleting K_%02d: %s\n", src.Key, srcMu.DeleteJson)
		} else {
			srcMu.SetJson = srcJSON
		}
		if _, merr = txn.Mutate(ctx, &srcMu); merr != nil {
			fmt.Fprintf(out, "Error while mutate: %v", merr)
			return merr
		}
	}

	dstJSON, err := json.Marshal(dst)
	Check(err)
	assigned, err := txn.Mutate(ctx, &api.Mutation{SetJson: dstJSON})
	if err != nil {
		fmt.Fprintf(out, "Error while mutate: %v", err)
		return err
	}
	if err := txn.Commit(ctx); err != nil {
		return err
	}

	// Record the UID handed out for a newly created destination node.
	if uids := assigned.GetUids(); len(uids) > 0 {
		fmt.Fprintf(out, "CREATED K_%02d: %+v for %+v\n", dst.Key, uids, dst)
		for _, uid := range uids {
			dst.UID = uid
		}
	}
	fmt.Fprintf(out, "MOVED [$%d, K_%02d -> K_%02d]. Src:%+v. Dst: %+v\n",
		amount, src.Key, dst.Key, src, dst)
	return nil
}
// loop is the body of one worker goroutine. It runs until the duration given
// by the -dur flag has elapsed, then signals wg.
//
// Every fifth iteration (starting with the first) runs the runTotal
// consistency check instead of a transfer; errors from it are only logged.
// All other iterations call runTransaction and count the outcome in s.runs or
// s.aborts. A progress line is printed each time the shared run counter
// reaches a multiple of 100.
//
// The deadline is evaluated after every transfer attempt, whether it
// committed or aborted. Checking it only after a successful commit would keep
// a worker spinning forever once every transaction fails (for example when
// the server has become unreachable), and wg.Wait in the caller would never
// return.
func (s *state) loop(dg *dgo.Dgraph, wg *sync.WaitGroup) {
	defer wg.Done()

	// Named runFor so the package-level dur flag is not shadowed.
	runFor, err := time.ParseDuration(*dur)
	Check(err)
	end := time.Now().Add(runFor)

	var buf bytes.Buffer
	for i := 0; ; i++ {
		if i%5 == 0 {
			if err := s.runTotal(dg); err != nil {
				log.Printf("Error while runTotal: %v", err)
			}
			continue
		}

		buf.Reset()
		err := s.runTransaction(dg, &buf)
		if *verbose {
			log.Printf("Final error: %v. %s", err, buf.String())
		}

		if err != nil {
			atomic.AddInt32(&s.aborts, 1)
		} else if r := atomic.AddInt32(&s.runs, 1); r%100 == 0 {
			fmt.Printf("Runs: %d. Aborts: %d\n", r, atomic.LoadInt32(&s.aborts))
		}

		if time.Now().After(end) {
			return
		}
	}
}
func main() {
flag.Parse()
all := strings.Split(*alpha, ",")
var clients []*dgo.Dgraph
for _, one := range all {
conn, err := grpc.Dial(one, grpc.WithInsecure())
Check(err)
dc := api.NewDgraphClient(conn)
dg := dgo.NewDgraphClient(dc)
if *login {
// login as groot to perform the DropAll operation later
Check(dg.Login(context.Background(), "groot", "password"))
}
clients = append(clients, dg)
}
s := state{}
s.createAccounts(clients[0])
var wg sync.WaitGroup
for i := 0; i < *conc; i++ {
for _, dg := range clients {
wg.Add(1)
go s.loop(dg, &wg)
}
}
wg.Wait()
fmt.Println()
fmt.Println("Total aborts", s.aborts)
fmt.Println("Total success", s.runs)
if err := s.runTotal(clients[0]); err != nil {
log.Fatal(err)
}
}
Then, in your command prompt, run: `go run main.go --dur=24h --login=false`
Expected behaviour and actual result.
Wait a few seconds/minutes and you'll start seeing the following output:
E0403 21:24:18.995075 15588 draft.go:593] Applying proposal. Error: cannot retrieve posting for UID 18446744073709551615 from list with key 0000036b6579000000000000005a0d: readTs: 130315 less than minTs: 130316 for key: "\x00\x00\x03key\x00\x00\x00\x00\x00\x00\x00Z\r". Proposal: "mutations:<group_id:1 start_ts:130315 edges:<entity:23053 attr:\"bal\" value:\"\\013\\000\\000\\000\\000\\000\\000\\000\" value_type:INT value_id:18446744073709551615 > edges:<entity:23053 attr:\"key\" value:\"\\002\\000\\000\\000\\000\\000\\000\\000\" value_type:INT > edges:<entity:23053 attr:\"typ\" value:\"ba\" value_type:STRING > metadata:<pred_hints:<key:\"bal\" value:SINGLE > pred_hints:<key:\"key\" value:SINGLE > pred_hints:<key:\"typ\" value:SINGLE > > > key:\"01-6465654083867107638\" index:254307 ".
E0403 21:24:30.510410 15588 draft.go:593] Applying proposal. Error: cannot retrieve posting for UID 18446744073709551615 from list with key 0000036b6579000000000000005bea: readTs: 133351 less than minTs: 133352 for key: "\x00\x00\x03key\x00\x00\x00\x00\x00\x00\x00[\xea". Proposal: "mutations:<group_id:1 start_ts:133351 edges:<entity:23530 attr:\"bal\" value:\"\\022\\000\\000\\000\\000\\000\\000\\000\" value_type:INT value_id:18446744073709551615 > edges:<entity:23530 attr:\"key\" value:\"\\002\\000\\000\\000\\000\\000\\000\\000\" value_type:INT > edges:<entity:23530 attr:\"typ\" value:\"ba\" value_type:STRING > metadata:<pred_hints:<key:\"bal\" value:SINGLE > pred_hints:<key:\"key\" value:SINGLE > pred_hints:<key:\"typ\" value:SINGLE > > > key:\"01-9145524679231515354\" index:260213 ".
E0403 21:24:55.417702 15588 draft.go:593] Applying proposal. Error: cannot retrieve posting for UID 18446744073709551615 from list with key 000003747970000000000000005f64: readTs: 139080 less than minTs: 139081 for key: "\x00\x00\x03typ\x00\x00\x00\x00\x00\x00\x00_d". Proposal: "mutations:<group_id:1 start_ts:139080 edges:<entity:24420 attr:\"bal\" value:\"\\005\\000\\000\\000\\000\\000\\000\\000\" value_type:INT value_id:18446744073709551615 > edges:<entity:24420 attr:\"key\" value:\"\\002\\000\\000\\000\\000\\000\\000\\000\" value_type:INT value_id:18446744073709551615 > edges:<entity:24420 attr:\"typ\" value:\"ba\" value_type:STRING > metadata:<pred_hints:<key:\"bal\" value:SINGLE > pred_hints:<key:\"key\" value:SINGLE > pred_hints:<key:\"typ\" value:SINGLE > > > key:\"01-8411719662243846299\" index:271522 ".
E0403 21:25:03.015936 15588 draft.go:593] Applying proposal. Error: cannot retrieve posting for UID 18446744073709551615 from list with key 0000036b65790000000000000060a2: readTs: 141232 less than minTs: 141233 for key: "\x00\x00\x03key\x00\x00\x00\x00\x00\x00\x00`\xa2". Proposal: "mutations:<group_id:1 start_ts:141232 edges:<entity:24738 attr:\"bal\" value:\"\\024\\000\\000\\000\\000\\000\\000\\000\" value_type:INT value_id:18446744073709551615 > edges:<entity:24738 attr:\"key\" value:\"\\001\\000\\000\\000\\000\\000\\000\\000\" value_type:INT > edges:<entity:24738 attr:\"typ\" value:\"ba\" value_type:STRING > metadata:<pred_hints:<key:\"bal\" value:SINGLE > pred_hints:<key:\"key\" value:SINGLE > pred_hints:<key:\"typ\" value:SINGLE > > > key:\"01-10967020125233178942\" index:275745 ".
E0403 21:25:07.564530 15588 draft.go:593] Applying proposal. Error: cannot retrieve posting for UID 18446744073709551615 from list with key 0000036b657900000000000000614f: readTs: 142472 less than minTs: 142473 for key: "\x00\x00\x03key\x00\x00\x00\x00\x00\x00\x00aO". Proposal: "mutations:<group_id:1 start_ts:142472 edges:<entity:24911 attr:\"bal\" value:\"\\013\\000\\000\\000\\000\\000\\000\\000\" value_type:INT value_id:18446744073709551615 > edges:<entity:24911 attr:\"key\" value:\"\\001\\000\\000\\000\\000\\000\\000\\000\" value_type:INT > edges:<entity:24911 attr:\"typ\" value:\"ba\" value_type:STRING > metadata:<pred_hints:<key:\"bal\" value:SINGLE > pred_hints:<key:\"key\" value:SINGLE > pred_hints:<key:\"typ\" value:SINGLE > > > key:\"01-9730193241761714502\" index:278242 ".
E0403 21:25:09.204460 15588 draft.go:593] Applying proposal. Error: cannot retrieve posting for UID 18446744073709551615 from list with key 0000037479700000000000000061a0: readTs: 142953 less than minTs: 142954 for key: "\x00\x00\x03typ\x00\x00\x00\x00\x00\x00\x00a\xa0". Proposal: "mutations:<group_id:1 start_ts:142953 edges:<entity:24992 attr:\"bal\" value:\"\\010\\000\\000\\000\\000\\000\\000\\000\" value_type:INT value_id:18446744073709551615 > edges:<entity:24992 attr:\"key\" value:\"\\002\\000\\000\\000\\000\\000\\000\\000\" value_type:INT value_id:18446744073709551615 > edges:<entity:24992 attr:\"typ\" value:\"ba\" value_type:STRING > metadata:<pred_hints:<key:\"bal\" value:SINGLE > pred_hints:<key:\"key\" value:SINGLE > pred_hints:<key:\"typ\" value:SINGLE > > > key:\"01-1614230704698162120\" index:279173 ".
E0403 21:25:09.207459 15588 draft.go:593] Applying proposal. Error: cannot retrieve posting for UID 18446744073709551615 from list with key 0000036b65790000000000000061a0: readTs: 142952 less than minTs: 142954 for key: "\x00\x00\x03key\x00\x00\x00\x00\x00\x00\x00a\xa0". Proposal: "mutations:<group_id:1 start_ts:142952 edges:<entity:24992 attr:\"bal\" value:\"\\013\\000\\000\\000\\000\\000\\000\\000\" value_type:INT value_id:18446744073709551615 > edges:<entity:24992 attr:\"key\" value:\"\\002\\000\\000\\000\\000\\000\\000\\000\" value_type:INT > edges:<entity:24992 attr:\"typ\" value:\"ba\" value_type:STRING > metadata:<pred_hints:<key:\"bal\" value:SINGLE > pred_hints:<key:\"key\" value:SINGLE > pred_hints:<key:\"typ\" value:SINGLE > > > key:\"01-18082252721299465293\" index:279174 ".
E0403 21:25:12.017062 15588 draft.go:593] Applying proposal. Error: cannot retrieve posting for UID 18446744073709551615 from list with key 0000036b6579000000000000006218: readTs: 143795 less than minTs: 143796 for key: "\x00\x00\x03key\x00\x00\x00\x00\x00\x00\x00b\x18". Proposal: "mutations:<group_id:1 start_ts:143795 edges:<entity:25112 attr:\"bal\" value:\"\\016\\000\\000\\000\\000\\000\\000\\000\" value_type:INT value_id:18446744073709551615 > edges:<entity:25112 attr:\"key\" value:\"\\001\\000\\000\\000\\000\\000\\000\\000\" value_type:INT > edges:<entity:25112 attr:\"typ\" value:\"ba\" value_type:STRING > metadata:<pred_hints:<key:\"bal\" value:SINGLE > pred_hints:<key:\"key\" value:SINGLE > pred_hints:<key:\"typ\" value:SINGLE > > > key:\"01-1966592156063837312\" index:280799 ".
I0403 21:25:14.928982 15588 draft.go:523] Creating snapshot at index: 282287. ReadTs: 144551.
I0403 21:25:15.831924 15588 log.go:34] Got compaction priority: {level:0 score:1 dropPrefix:[]}
I0403 21:25:15.831924 15588 log.go:34] Running for level: 0
I0403 21:25:16.396598 15588 log.go:34] LOG Compact 0->1, del 6 tables, add 1 tables, took 564.6743ms
I0403 21:25:16.396598 15588 log.go:34] Compaction for level: 0 DONE
E0403 21:25:27.330277 15588 draft.go:593] Applying proposal. Error: cannot retrieve posting for UID 18446744073709551615 from list with key 0000036b6579000000000000006466: readTs: 148025 less than minTs: 148026 for key: "\x00\x00\x03key\x00\x00\x00\x00\x00\x00\x00df". Proposal: "mutations:<group_id:1 start_ts:148025 edges:<entity:25702 attr:\"bal\" value:\"\\024\\000\\000\\000\\000\\000\\000\\000\" value_type:INT value_id:18446744073709551615 > edges:<entity:25702 attr:\"key\" value:\"\\002\\000\\000\\000\\000\\000\\000\\000\" value_type:INT > edges:<entity:25702 attr:\"typ\" value:\"ba\" value_type:STRING > metadata:<pred_hints:<key:\"bal\" value:SINGLE > pred_hints:<key:\"key\" value:SINGLE > pred_hints:<key:\"typ\" value:SINGLE > > > key:\"01-15821457914345571877\" index:288991 ".
This bank code is almost line for line from the Dgraph bank example https://github.com/dgraph-io/dgraph/blob/master/contrib/integration/bank/main.go
Any ideas?
Metadata
Metadata
Assignees
Labels
No labels