Skip to content

[BREAKING] perf(Transactions): Run transactions concurrently #7694

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
Apr 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
3771106
Introduce the idea of concurrent application of mutations
manishrjain Apr 2, 2021
3eead8a
Do not wait for StartTs. Use a stale ts to execute mutation, and then…
manishrjain Apr 3, 2021
98b1ed1
More progress in making concurrent txns work
manishrjain Apr 7, 2021
9556918
Switch to showing data in Hex.
manishrjain Apr 7, 2021
028a8c8
Fix up the count index issue, which was due to ordering change of pro…
manishrjain Apr 8, 2021
96a7982
Fix the bug in filterStringFunction
manishrjain Apr 8, 2021
8f8d73e
Fix proposal context and fix build
ahsanbarkati Apr 8, 2021
d808f17
Tested with 21M. Self-review.
manishrjain Apr 9, 2021
a7afce2
Self review
manishrjain Apr 9, 2021
1cccc33
Self review
manishrjain Apr 9, 2021
b733634
Potential fix for txn with repeated mutations.
manishrjain Apr 9, 2021
a261e35
Get rid of ludicrous mode
manishrjain Apr 9, 2021
119fec9
Clean up the commit ts map.
manishrjain Apr 9, 2021
38ad510
Some fixups
manishrjain Apr 9, 2021
bada288
Fix overflow
manishrjain Apr 9, 2021
27badd2
Fix up index_test
manishrjain Apr 9, 2021
50abe3b
Merge branch 'master' into mrjn/conc-apply-ch
manishrjain Apr 9, 2021
7811af0
Merge branch 'master' into mrjn/conc-apply-ch
ahsanbarkati Apr 9, 2021
d54f53e
Bug Fix when a mutation runs the second time, but gets reset.
manishrjain Apr 9, 2021
6e3ef5b
Add comment
manishrjain Apr 9, 2021
4208bff
Refactor code to use getProposal more widely
manishrjain Apr 9, 2021
9072aa7
Update comment
manishrjain Apr 9, 2021
179e5c4
Fix bugs causing TestSnapshot to fail.
manishrjain Apr 9, 2021
4e12a26
When receiving a snapshot, ensure max assigned is set correctly.
manishrjain Apr 10, 2021
3872f56
self review
manishrjain Apr 10, 2021
6768ef1
Self review
manishrjain Apr 10, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions compose/compose.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ type options struct {
MemLimit string
ExposePorts bool
Encryption bool
LudicrousMode bool
SnapshotAfter string
ContainerNames bool
AlphaVolumes []string
Expand Down Expand Up @@ -308,9 +307,7 @@ func getAlpha(idx int, raft string) service {
svc.Command += fmt.Sprintf(" --my=%s:%d", svc.name, internalPort)
svc.Command += fmt.Sprintf(" --zero=%s", zerosOpt)
svc.Command += fmt.Sprintf(" --logtostderr -v=%d", opts.Verbosity)
if opts.LudicrousMode {
svc.Command += ` --ludicrous "enabled=true;"`
}
svc.Command += " --expose_trace=true"

if opts.SnapshotAfter != "" {
raft = fmt.Sprintf("%s; %s", raft, opts.SnapshotAfter)
Expand Down Expand Up @@ -398,11 +395,14 @@ func getJaeger() service {
toPort(16686),
},
Environment: []string{
"SPAN_STORAGE_TYPE=badger",
"SPAN_STORAGE_TYPE=memory",
// "SPAN_STORAGE_TYPE=badger",
// Note: Badger doesn't quite work as well in Jaeger. The integration isn't well
// written.
},
Command: "--badger.ephemeral=false" +
" --badger.directory-key /working/jaeger" +
" --badger.directory-value /working/jaeger",
// Command: "--badger.ephemeral=false" +
// " --badger.directory-key /working/jaeger" +
// " --badger.directory-value /working/jaeger",
}
return svc
}
Expand Down Expand Up @@ -560,8 +560,8 @@ func main() {
"include jaeger service")
cmd.PersistentFlags().BoolVarP(&opts.Metrics, "metrics", "m", false,
"include metrics (prometheus, grafana) services")
cmd.PersistentFlags().IntVarP(&opts.PortOffset, "port_offset", "o", 100,
"port offset for alpha and, if not 100, zero as well")
cmd.PersistentFlags().IntVarP(&opts.PortOffset, "port_offset", "o", 0,
"port offset for alpha and zero")
cmd.PersistentFlags().IntVarP(&opts.Verbosity, "verbosity", "v", 2,
"glog verbosity level")
cmd.PersistentFlags().StringVarP(&opts.OutFile, "out", "O",
Expand All @@ -586,8 +586,6 @@ func main() {
"comma-separated list of pattern=N settings for file-filtered logging")
cmd.PersistentFlags().BoolVar(&opts.Encryption, "encryption", false,
"enable encryption-at-rest feature.")
cmd.PersistentFlags().BoolVar(&opts.LudicrousMode, "ludicrous", false,
"enable zeros and alphas in ludicrous mode.")
cmd.PersistentFlags().StringVar(&opts.SnapshotAfter, "snapshot_after", "",
"create a new Raft snapshot after this many number of Raft entries.")
cmd.PersistentFlags().StringVar(&opts.AlphaFlags, "extra_alpha_flags", "",
Expand Down
12 changes: 0 additions & 12 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,6 @@ they form a Raft group and provide synchronous replication.
" The liveness of a transaction is determined by its last mutation.").
String())

flag.String("ludicrous", worker.LudicrousDefaults, z.NewSuperFlagHelp(worker.LudicrousDefaults).
Head("Ludicrous options").
Flag("enabled",
"Set enabled to true to run Dgraph in Ludicrous mode.").
Flag("concurrency",
"The number of concurrent threads to use in Ludicrous mode.").
String())

flag.String("graphql", worker.GraphQLDefaults, z.NewSuperFlagHelp(worker.GraphQLDefaults).
Head("GraphQL options").
Flag("introspection",
Expand Down Expand Up @@ -681,8 +673,6 @@ func run() {
tlsServerConf, err := x.LoadServerTLSConfigForInternalPort(Alpha.Conf)
x.Check(err)

ludicrous := z.NewSuperFlag(Alpha.Conf.GetString("ludicrous")).MergeAndCheckDefault(
worker.LudicrousDefaults)
raft := z.NewSuperFlag(Alpha.Conf.GetString("raft")).MergeAndCheckDefault(worker.RaftDefaults)
x.WorkerConfig = x.WorkerOptions{
TmpDir: Alpha.Conf.GetString("tmp"),
Expand All @@ -694,8 +684,6 @@ func run() {
AclEnabled: aclKey != nil,
AbortOlderThan: abortDur,
StartTime: startTime,
Ludicrous: ludicrous,
LudicrousEnabled: ludicrous.GetBool("enabled"),
Security: security,
TLSClientConfig: tlsClientConf,
TLSServerConfig: tlsServerConf,
Expand Down
20 changes: 13 additions & 7 deletions dgraph/cmd/debug/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type flagOptions struct {
readTs uint64
sizeHistogram bool
noKeys bool
namespace uint64
key x.Sensitive

// Options related to the WAL.
Expand All @@ -93,8 +94,9 @@ func init() {
"Ignore key_. Only consider amount when calculating total.")
flag.StringVar(&opt.jepsen, "jepsen", "", "Disect Jepsen output. Can be linear/binary.")
flag.Uint64Var(&opt.readTs, "at", math.MaxUint64, "Set read timestamp for all txns.")
flag.BoolVarP(&opt.readOnly, "readonly", "o", true, "Open in read only mode.")
flag.BoolVarP(&opt.readOnly, "readonly", "o", false, "Open in read only mode.")
flag.StringVarP(&opt.predicate, "pred", "r", "", "Only output specified predicate.")
flag.Uint64VarP(&opt.namespace, "ns", "", 0, "Which namespace to use.")
flag.StringVarP(&opt.prefix, "prefix", "", "", "Uses a hex prefix.")
flag.StringVarP(&opt.keyLookup, "lookup", "l", "", "Hex of key to lookup.")
flag.StringVar(&opt.rollupKey, "rollup", "", "Hex of key to rollup.")
Expand Down Expand Up @@ -311,7 +313,7 @@ func showAllPostingsAt(db *badger.DB, readTs uint64) {
}
}
for uid, acc := range keys {
fmt.Fprintf(&buf, "Uid: %d %x Key: %d Amount: %d\n", uid, uid, acc.Key, acc.Amt)
fmt.Fprintf(&buf, "Uid: %#x Key: %d Amount: %d\n", uid, acc.Key, acc.Amt)
}
fmt.Println(buf.String())
}
Expand Down Expand Up @@ -424,7 +426,7 @@ func history(lookup []byte, itr *badger.Iterator) {
break
}
for _, uid := range uids[:num] {
fmt.Fprintf(&buf, " Uid = %d\n", uid)
fmt.Fprintf(&buf, " Uid = %#x\n", uid)
}
}
}
Expand All @@ -434,7 +436,7 @@ func history(lookup []byte, itr *badger.Iterator) {
}

func appendPosting(w io.Writer, o *pb.Posting) {
fmt.Fprintf(w, " Uid: %d Op: %d ", o.Uid, o.Op)
fmt.Fprintf(w, " Uid: %#x Op: %d ", o.Uid, o.Op)

if len(o.Value) > 0 {
fmt.Fprintf(w, " Type: %v. ", o.ValType)
Expand Down Expand Up @@ -545,7 +547,8 @@ func lookup(db *badger.DB) {
func printKeys(db *badger.DB) {
var prefix []byte
if len(opt.predicate) > 0 {
prefix = x.PredicatePrefix(opt.predicate)
pred := x.NamespaceAttr(opt.namespace, opt.predicate)
prefix = x.PredicatePrefix(pred)
} else if len(opt.prefix) > 0 {
p, err := hex.DecodeString(opt.prefix)
x.Check(err)
Expand Down Expand Up @@ -583,11 +586,14 @@ func printKeys(db *badger.DB) {
if len(pk.Term) > 0 {
fmt.Fprintf(&buf, " term: [%d] %s ", pk.Term[0], pk.Term[1:])
}
if pk.Count > 0 {
fmt.Fprintf(&buf, " count: %d ", pk.Count)
}
if pk.Uid > 0 {
fmt.Fprintf(&buf, " uid: %d ", pk.Uid)
fmt.Fprintf(&buf, " uid: %#x ", pk.Uid)
}
if pk.StartUid > 0 {
fmt.Fprintf(&buf, " startUid: %d ", pk.StartUid)
fmt.Fprintf(&buf, " startUid: %#x ", pk.StartUid)
}

if opt.itemMeta {
Expand Down
16 changes: 9 additions & 7 deletions dgraph/cmd/live/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"google.golang.org/grpc/status"

"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/badger/v3/y"
"github.com/dgraph-io/dgo/v210"
"github.com/dgraph-io/dgo/v210/protos/api"
"github.com/dgraph-io/dgraph/gql"
Expand All @@ -41,6 +42,7 @@ import (
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/dgraph/xidmap"
"github.com/dgryski/go-farm"
"github.com/dustin/go-humanize"
"github.com/dustin/go-humanize/english"
)

Expand Down Expand Up @@ -255,7 +257,7 @@ func (l *loader) conflictKeysForNQuad(nq *api.NQuad) ([]uint64, error) {
pred, found := l.schema.preds[attr]

// We dont' need to generate conflict keys for predicate with noconflict directive.
if found && pred.NoConflict || opt.ludicrousMode {
if found && pred.NoConflict {
return nil, nil
}

Expand Down Expand Up @@ -421,15 +423,15 @@ func (l *loader) printCounters() {
l.ticker = time.NewTicker(period)
start := time.Now()

var last Counter
r := y.NewRateMonitor(6) // Last 30 seconds of samples.
for range l.ticker.C {
counter := l.Counter()
rate := float64(counter.Nquads-last.Nquads) / period.Seconds()
c := l.Counter()
r.Capture(c.Nquads)
elapsed := time.Since(start).Round(time.Second)
timestamp := time.Now().Format("15:04:05Z0700")
fmt.Printf("[%s] Elapsed: %s Txns: %d N-Quads: %d N-Quads/s [last 5s]: %5.0f Aborts: %d\n",
timestamp, x.FixedDuration(elapsed), counter.TxnsDone, counter.Nquads, rate, counter.Aborts)
last = counter
fmt.Printf("[%s] Elapsed: %s Txns: %d N-Quads: %s N-Quads/s: %s Aborts: %d\n",
timestamp, x.FixedDuration(elapsed), c.TxnsDone,
humanize.Comma(int64(c.Nquads)), humanize.Comma(int64(r.Rate())), c.Aborts)
}
}

Expand Down
4 changes: 0 additions & 4 deletions dgraph/cmd/live/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ type options struct {
verbose bool
httpAddr string
bufferSize int
ludicrousMode bool
upsertPredicate string
tmpDir string
key x.Sensitive
Expand Down Expand Up @@ -181,8 +180,6 @@ func init() {
Sample flag could look like --creds user=username;password=mypass;namespace=2`)

flag.StringP("bufferSize", "m", "100", "Buffer for each thread")
flag.Bool("ludicrous", false, "Run live loader in ludicrous mode (Should "+
"only be done when alpha is under ludicrous mode)")
flag.StringP("upsertPredicate", "U", "", "run in upsertPredicate mode. the value would "+
"be used to store blank nodes as an xid")
flag.String("tmp", "t", "Directory to store temporary buffers.")
Expand Down Expand Up @@ -714,7 +711,6 @@ func run() error {
verbose: Live.Conf.GetBool("verbose"),
httpAddr: Live.Conf.GetString("http"),
bufferSize: Live.Conf.GetInt("bufferSize"),
ludicrousMode: Live.Conf.GetBool("ludicrous"),
upsertPredicate: Live.Conf.GetString("upsertPredicate"),
tmpDir: Live.Conf.GetString("tmp"),
}
Expand Down
23 changes: 3 additions & 20 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,16 +597,6 @@ func (s *Server) doMutate(ctx context.Context, qc *queryContext, resp *api.Respo
resp.Txn, err = query.ApplyMutations(ctx, m)
qc.span.Annotatef(nil, "Txn Context: %+v. Err=%v", resp.Txn, err)

if x.WorkerConfig.LudicrousEnabled {
// Mutations are automatically committed in case of ludicrous mode, so we don't
// need to manually commit.
if resp.Txn == nil {
return errors.Wrapf(err, "Txn Context is nil")
}
resp.Txn.Keys = resp.Txn.Keys[:0]
resp.Txn.CommitTs = qc.req.StartTs
return err
}
// calculateMutationMetrics calculate cost for the mutation.
calculateMutationMetrics := func() {
cost := uint64(len(newUids) + len(edges))
Expand Down Expand Up @@ -1244,13 +1234,9 @@ func (s *Server) doQuery(ctx context.Context, req *Request) (
defer annotateStartTs(qc.span, qc.req.StartTs)
// For mutations, we update the startTs if necessary.
if isMutation && req.req.StartTs == 0 {
if x.WorkerConfig.LudicrousEnabled {
req.req.StartTs = posting.Oracle().MaxAssigned()
} else {
start := time.Now()
req.req.StartTs = worker.State.GetTimestamp(false)
qc.latency.AssignTimestamp = time.Since(start)
}
start := time.Now()
req.req.StartTs = worker.State.GetTimestamp(false)
qc.latency.AssignTimestamp = time.Since(start)
}

var gqlErrs error
Expand Down Expand Up @@ -1310,9 +1296,6 @@ func processQuery(ctx context.Context, qc *queryContext) (*api.Response, error)
if ctx.Err() != nil {
return resp, ctx.Err()
}
if x.WorkerConfig.LudicrousEnabled {
qc.req.StartTs = posting.Oracle().MaxAssigned()
}
qr := query.Request{
Latency: qc.latency,
GqlQuery: &qc.gqlRes,
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.12

// replace github.com/dgraph-io/badger/v3 => /home/mrjn/go/src/github.com/dgraph-io/badger
// replace github.com/dgraph-io/ristretto => /home/mrjn/go/src/github.com/dgraph-io/ristretto

// replace github.com/dgraph-io/roaring => /home/mrjn/go/src/github.com/dgraph-io/roaring

require (
Expand All @@ -23,7 +24,7 @@ require (
github.com/dgraph-io/gqlgen v0.13.2
github.com/dgraph-io/gqlparser/v2 v2.2.0
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210223074046-e5b8b80bb4ed
github.com/dgraph-io/ristretto v0.0.4-0.20210310100713-a4346e5d1f90
github.com/dgraph-io/ristretto v0.0.4-0.20210407062338-62d2e1706f55
github.com/dgraph-io/roaring v0.5.6-0.20210227175938-766b897233a5
github.com/dgraph-io/simdjson-go v0.3.0
github.com/dgrijalva/jwt-go v3.2.0+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ github.com/dgraph-io/gqlparser/v2 v2.2.0/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210223074046-e5b8b80bb4ed h1:pgGMBoTtFhR+xkyzINaToLYRurHn+6pxMYffIGmmEPc=
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210223074046-e5b8b80bb4ed/go.mod h1:7z3c/5w0sMYYZF5bHsrh8IH4fKwG5O5Y70cPH1ZLLRQ=
github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a/go.mod h1:MIonLggsKgZLUSt414ExgwNtlOL5MuEoAJP514mwGe8=
github.com/dgraph-io/ristretto v0.0.4-0.20210310100713-a4346e5d1f90 h1:arWVlUO9NhZ/2vWprIqpe825GISUPpgJhU/b0ep3j/M=
github.com/dgraph-io/ristretto v0.0.4-0.20210310100713-a4346e5d1f90/go.mod h1:MIonLggsKgZLUSt414ExgwNtlOL5MuEoAJP514mwGe8=
github.com/dgraph-io/ristretto v0.0.4-0.20210407062338-62d2e1706f55 h1:CO2ExPUrWQ01M1zDN1Kfk0cEOowEYiqfbb+lKToPyNo=
github.com/dgraph-io/ristretto v0.0.4-0.20210407062338-62d2e1706f55/go.mod h1:MIonLggsKgZLUSt414ExgwNtlOL5MuEoAJP514mwGe8=
github.com/dgraph-io/roaring v0.5.6-0.20210227175938-766b897233a5 h1:9t3OKcvsQlxU9Cu0U55tgvNtaRYVGDr6rUb95P8cSbg=
github.com/dgraph-io/roaring v0.5.6-0.20210227175938-766b897233a5/go.mod h1:I8kxPBtSQW3OdQFWonumQdCx2DTmq2WjdnTjGXz3uTM=
github.com/dgraph-io/simdjson-go v0.3.0 h1:h71LO7vR4LHMPUhuoGN8bqGm1VNfGOlAG8BI6iDUKw0=
Expand Down
24 changes: 3 additions & 21 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (

"github.com/golang/glog"
"github.com/pkg/errors"
ostats "go.opencensus.io/stats"
otrace "go.opencensus.io/trace"

"github.com/dgraph-io/badger/v3"
Expand Down Expand Up @@ -124,11 +123,7 @@ func (txn *Txn) addIndexMutation(ctx context.Context, edge *pb.DirectedEdge, tok
}

x.AssertTrue(plist != nil)
if err = plist.addMutation(ctx, txn, edge); err != nil {
return err
}
ostats.Record(ctx, x.NumEdges.M(1))
return nil
return plist.addMutation(ctx, txn, edge)
}

// countParams is sent to updateCount function. It is used to update the count index.
Expand Down Expand Up @@ -187,12 +182,7 @@ func (txn *Txn) addReverseMutation(ctx context.Context, t *pb.DirectedEdge) erro
Op: t.Op,
Facets: t.Facets,
}
if err := plist.addMutation(ctx, txn, edge); err != nil {
return err
}

ostats.Record(ctx, x.NumEdges.M(1))
return nil
return plist.addMutation(ctx, txn, edge)
}

func (txn *Txn) addReverseAndCountMutation(ctx context.Context, t *pb.DirectedEdge) error {
Expand Down Expand Up @@ -261,14 +251,11 @@ func (txn *Txn) addReverseAndCountMutation(ctx context.Context, t *pb.DirectedEd
if err != nil {
return err
}
ostats.Record(ctx, x.NumEdges.M(1))

if hasCountIndex && cp.countAfter != cp.countBefore {
if err := txn.updateCount(ctx, cp); err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -334,11 +321,7 @@ func (txn *Txn) addCountMutation(ctx context.Context, t *pb.DirectedEdge, count

x.AssertTruef(plist != nil, "plist is nil [%s] %d",
t.Attr, t.ValueId)
if err = plist.addMutation(ctx, txn, t); err != nil {
return err
}
ostats.Record(ctx, x.NumEdges.M(1))
return nil
return plist.addMutation(ctx, txn, t)
}

func (txn *Txn) updateCount(ctx context.Context, params countParams) error {
Expand Down Expand Up @@ -485,7 +468,6 @@ func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge,
if err != nil {
return err
}
ostats.Record(ctx, x.NumEdges.M(1))
if hasCountIndex && cp.countAfter != cp.countBefore {
if err := txn.updateCount(ctx, cp); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion posting/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func addMutation(t *testing.T, l *List, edge *pb.DirectedEdge, op uint32,
default:
x.Fatalf("Unhandled op: %v", op)
}
txn := Oracle().RegisterStartTs(startTs)
txn, _ := Oracle().RegisterStartTs(startTs)
txn.cache.SetIfAbsent(string(l.key), l)
if index {
require.NoError(t, l.AddMutationWithIndex(context.Background(), edge, txn))
Expand Down
5 changes: 0 additions & 5 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,11 +452,6 @@ func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.Directed
hex.EncodeToString(l.key), mpost)
}

if x.WorkerConfig.LudicrousEnabled {
// Conflict detection is not required for ludicrous mode.
return nil
}

// We ensure that commit marks are applied to posting lists in the right
// order. We can do so by proposing them in the same order as received by the Oracle delta
// stream from Zero, instead of in goroutines.
Expand Down
Loading