diff --git a/dgraph/cmd/live/batch.go b/dgraph/cmd/live/batch.go index 10cf43cf648..bda05a5dfa1 100644 --- a/dgraph/cmd/live/batch.go +++ b/dgraph/cmd/live/batch.go @@ -37,6 +37,7 @@ import ( "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" "github.com/dgraph-io/dgraph/xidmap" + "github.com/dustin/go-humanize/english" ) var ( @@ -78,6 +79,7 @@ type loader struct { // To get time elapsed start time.Time + reqNum uint64 reqs chan api.Mutation zeroconn *grpc.ClientConn } @@ -125,34 +127,42 @@ type Counter struct { // server expects TLS and our certificate does not match or the host name is not verified. When // the node certificate is created the name much match the request host name. e.g., localhost not // 127.0.0.1. -func handleError(err error) { +func handleError(err error, reqNum uint64, isRetry bool) { s := status.Convert(err) switch { case s.Code() == codes.Internal, s.Code() == codes.Unavailable: x.Fatalf(s.Message()) case strings.Contains(s.Message(), "x509"): x.Fatalf(s.Message()) + case s.Code() == codes.Aborted: + if !isRetry { + fmt.Printf("Transaction #%d aborted. Will retry in background.\n", reqNum) + } case strings.Contains(s.Message(), "Server overloaded."): dur := time.Duration(1+rand.Intn(10)) * time.Minute - fmt.Printf("Server is overloaded. Will retry after %s.", dur.Round(time.Minute)) + fmt.Printf("Server is overloaded. Will retry after %s.\n", dur.Round(time.Minute)) time.Sleep(dur) - case err != y.ErrAborted && err != y.ErrConflict: + case err != y.ErrConflict: fmt.Printf("Error while mutating: %v\n", s.Message()) } } -func (l *loader) infinitelyRetry(req api.Mutation) { +func (l *loader) infinitelyRetry(req api.Mutation, reqNum uint64) { defer l.retryRequestsWg.Done() + nretries := 1 for i := time.Millisecond; ; i *= 2 { txn := l.dc.NewTxn() req.CommitNow = true _, err := txn.Mutate(l.opts.Ctx, &req) if err == nil { + fmt.Printf("Transaction #%d succeeded after %s.\n", + reqNum, english.Plural(nretries, "retry", "retries")) atomic.AddUint64(&l.nquads, uint64(len(req.Set))) atomic.AddUint64(&l.txns, 1) return } - handleError(err) + nretries++ + handleError(err, reqNum, true) atomic.AddUint64(&l.aborts, 1) if i >= 10*time.Second { i = 10 * time.Second @@ -161,7 +171,7 @@ func (l *loader) infinitelyRetry(req api.Mutation) { } } -func (l *loader) request(req api.Mutation) { +func (l *loader) request(req api.Mutation, reqNum uint64) { txn := l.dc.NewTxn() req.CommitNow = true _, err := txn.Mutate(l.opts.Ctx, &req) @@ -171,10 +181,10 @@ func (l *loader) request(req api.Mutation) { atomic.AddUint64(&l.txns, 1) return } - handleError(err) + handleError(err, reqNum, false) atomic.AddUint64(&l.aborts, 1) l.retryRequestsWg.Add(1) - go l.infinitelyRetry(req) + go l.infinitelyRetry(req, reqNum) } // makeRequests can receive requests from batchNquads or directly from BatchSetWithMark. @@ -183,7 +193,8 @@ func (l *loader) request(req api.Mutation) { func (l *loader) makeRequests() { defer l.requestsWg.Done() for req := range l.reqs { - l.request(req) + reqNum := atomic.AddUint64(&l.reqNum, 1) + l.request(req, reqNum) } }