diff --git a/edgraph/server.go b/edgraph/server.go index fecdf322d60..aa334999608 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -978,10 +978,14 @@ func (s *Server) doQuery(ctx context.Context, req *api.Request, doAuth AuthMode) // assigned in the processQuery function called below. defer annotateStartTs(qc.span, qc.req.StartTs) // For mutations, we update the startTs if necessary. - if isMutation && req.StartTs == 0 && !x.WorkerConfig.LudicrousMode { - start := time.Now() - req.StartTs = worker.State.GetTimestamp(false) - qc.latency.AssignTimestamp = time.Since(start) + if isMutation && req.StartTs == 0 { + if x.WorkerConfig.LudicrousMode { + req.StartTs = posting.Oracle().MaxAssigned() + } else { + start := time.Now() + req.StartTs = worker.State.GetTimestamp(false) + qc.latency.AssignTimestamp = time.Since(start) + } } if resp, rerr = processQuery(ctx, qc); rerr != nil { diff --git a/query/mutation.go b/query/mutation.go index d110a646304..76cbef53855 100644 --- a/query/mutation.go +++ b/query/mutation.go @@ -63,7 +63,6 @@ func expandEdges(ctx context.Context, m *pb.Mutations) ([]*pb.DirectedEdge, erro sg := &SubGraph{} sg.DestUIDs = &pb.List{Uids: []uint64{edge.GetEntity()}} sg.ReadTs = m.StartTs - types, err := getNodeTypes(ctx, sg) if err != nil { return nil, err diff --git a/systest/ludicrous/upsert_test.go b/systest/ludicrous/upsert_test.go index 129b695c883..a2681eea748 100644 --- a/systest/ludicrous/upsert_test.go +++ b/systest/ludicrous/upsert_test.go @@ -18,7 +18,6 @@ package main import ( "context" - "encoding/json" "fmt" "sync" "testing" @@ -30,19 +29,6 @@ import ( "github.com/stretchr/testify/require" ) -type Person struct { - Name string `json:"name,omitempty"` - count int -} - -type Data struct { - Name string `json:"name,omitempty"` - Counts []int `json:"count,omitempty"` -} -type ResponseData struct { - All []Data `json:"all,omitempty"` -} - func InitData(t *testing.T) { dg, err := testutil.DgraphClient(testutil.SockAddr) require.NoError(t, err) @@ -55,15 +41,11 @@ func InitData(t *testing.T) { err = dg.Alter(context.Background(), &api.Operation{Schema: schema}) require.NoError(t, err) - p := Person{ - Name: "Alice", - count: 1, - } - pb, err := json.Marshal(p) - require.NoError(t, err) - mu := &api.Mutation{ - SetJson: pb, + SetNquads: []byte(` + _:a "Alice" . + _:a "1" . + `), CommitNow: true, } txn := dg.NewTxn() @@ -89,7 +71,7 @@ func TestConcurrentUpdate(t *testing.T) { defer wg.Done() query := `query { user as var(func: eq(name, "Alice")) - }` + }` mu := &api.Mutation{ SetNquads: []byte(fmt.Sprintf(`uid(user) "%d" .`, i)), } @@ -110,7 +92,6 @@ func TestConcurrentUpdate(t *testing.T) { q := `query all($a: string) { all(func: eq(name, $a)) { - name count } }` @@ -118,11 +99,7 @@ func TestConcurrentUpdate(t *testing.T) { txn := dg.NewTxn() res, err := txn.QueryWithVars(ctx, q, map[string]string{"$a": "Alice"}) require.NoError(t, err) - var dat ResponseData - err = json.Unmarshal(res.Json, &dat) - require.NoError(t, err) - - require.Equal(t, 10, len(dat.All[0].Counts)) + require.JSONEq(t, `{"all":[{"count":[0,4,5,2,8,1,3,9,6,7]}]}`, string(res.GetJson())) } func TestSequentialUpdate(t *testing.T) { @@ -159,7 +136,6 @@ func TestSequentialUpdate(t *testing.T) { q := `query all($a: string) { all(func: eq(name, $a)) { - name count } }` @@ -167,9 +143,63 @@ func TestSequentialUpdate(t *testing.T) { txn := dg.NewTxn() res, err := txn.QueryWithVars(ctx, q, map[string]string{"$a": "Alice"}) require.NoError(t, err) - var dat ResponseData - err = json.Unmarshal(res.Json, &dat) + require.JSONEq(t, `{"all":[{"count":[0,4,5,2,8,1,3,9,6,7]}]}`, string(res.GetJson())) +} + +func TestDelete(t *testing.T) { + dg, err := testutil.DgraphClient(testutil.SockAddr) + require.NoError(t, err) + testutil.DropAll(t, dg) + schema := ` + name: string . + xid: string . + type MyType { + name + xid + } + ` + + err = dg.Alter(context.Background(), &api.Operation{Schema: schema}) + require.NoError(t, err) + + mu := &api.Mutation{ + SetNquads: []byte(` + _:n "MyType" . + _:n "Alice" . + _:n "10" . + `), + + CommitNow: true, + } + txn := dg.NewTxn() + ctx := context.Background() + defer txn.Discard(ctx) + + res, err := txn.Mutate(ctx, mu) require.NoError(t, err) + uid := res.Uids["n"] + + ctx = context.Background() + query := func() string { + q := `{ q(func: uid(` + uid + `)){ dgraph.type name xid } }` + res, err := dg.NewTxn().Query(ctx, q) + require.NoError(t, err) + return string(res.GetJson()) + } + time.Sleep(time.Second) + expected := `{"q":[{"dgraph.type":["MyType"],"name":"Alice","xid":"10"}]}` + require.JSONEq(t, expected, query()) + + mu = &api.Mutation{ + DelNquads: []byte(`<` + uid + `> * * .`), + CommitNow: true, + } + txn = dg.NewTxn() + ctx = context.Background() + defer txn.Discard(ctx) - require.Equal(t, 10, len(dat.All[0].Counts)) + _, err = txn.Mutate(ctx, mu) + require.NoError(t, err) + time.Sleep(time.Second) + require.JSONEq(t, `{"q":[]}`, query()) } diff --git a/worker/draft.go b/worker/draft.go index 1c8eaed077b..080481ab7fc 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -661,7 +661,8 @@ func (n *node) processApplyCh() { psz := proposal.Size() totalSize += int64(psz) - // In case of upserts the startTs would be > 0, so, no need to check startTs is 0 + // Ignore the start ts in case of ludicrous mode. We get a new ts and use that as the + // commit ts. if x.WorkerConfig.LudicrousMode && proposal.Mutations != nil { proposal.Mutations.StartTs = State.GetTimestamp(false) }