Skip to content

Commit fadfe05

Browse files
author
Pawan Rawal
committed
Aggregate uids stored in a variable over levels for recurse query.
1 parent 65c285c commit fadfe05

File tree

3 files changed

+127
-68
lines changed

3 files changed

+127
-68
lines changed

query/query.go

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1423,6 +1423,8 @@ func (sg *SubGraph) populateUidValVar(doneVars map[string]varValue, sgPath []*Su
14231423
return nil
14241424
}
14251425

1426+
var v varValue
1427+
var ok bool
14261428
if sg.Attr == "_predicate_" {
14271429
// This is a predicates list.
14281430
doneVars[sg.Params.Var] = varValue{
@@ -1444,10 +1446,22 @@ func (sg *SubGraph) populateUidValVar(doneVars map[string]varValue, sgPath []*Su
14441446
}
14451447
} else if len(sg.DestUIDs.Uids) != 0 {
14461448
// This implies it is a entity variable.
1447-
doneVars[sg.Params.Var] = varValue{
1448-
Uids: sg.DestUIDs,
1449-
path: sgPath,
1449+
if v, ok = doneVars[sg.Params.Var]; !ok {
1450+
doneVars[sg.Params.Var] = varValue{
1451+
Uids: sg.DestUIDs,
1452+
path: sgPath,
1453+
}
1454+
return nil
14501455
}
1456+
// For a recurse query this can happen. We don't allow using the same variable more than
1457+
// once otherwise.
1458+
uids := v.Uids
1459+
lists := make([]*protos.List, 0, 2)
1460+
lists = append(lists, uids, sg.DestUIDs)
1461+
v.Uids = algo.MergeSorted(lists)
1462+
doneVars[sg.Params.Var] = v
1463+
1464+
// This implies it is a value variable.
14511465
} else if len(sg.valueMatrix) != 0 && sg.SrcUIDs != nil && len(sgPath) != 0 {
14521466
if sg.Attr == "_uid_" {
14531467
// Its still an entity variable if its _uid_.
@@ -1458,21 +1472,33 @@ func (sg *SubGraph) populateUidValVar(doneVars map[string]varValue, sgPath []*Su
14581472
return nil
14591473
}
14601474

1461-
// This implies it is a value variable.
1462-
// NOTE: Value variables cannot be defined and used in the same query block. so
1463-
// checking len(sgPath) is okay.
1475+
// We reach here, if a uid variable was already set and now a uid edge has no dest uids.
1476+
// We don't want to set the variable as empty in this case.
1477+
if _, ok := doneVars[sg.Params.Var]; ok {
1478+
return nil
1479+
}
1480+
14641481
doneVars[sg.Params.Var] = varValue{
14651482
Vals: make(map[uint64]types.Val),
14661483
path: sgPath,
14671484
}
14681485
for idx, uid := range sg.SrcUIDs.Uids {
1486+
if len(sg.valueMatrix[idx].Values) > 1 {
1487+
return x.Errorf("Value variables not supported for predicate with list type.")
1488+
}
1489+
14691490
val, err := convertWithBestEffort(sg.valueMatrix[idx].Values[0], sg.Attr)
14701491
if err != nil {
14711492
continue
14721493
}
14731494
doneVars[sg.Params.Var].Vals[uid] = val
14741495
}
14751496
} else {
1497+
// If the variable already existed and now we see it again without any DestUIDs or
1498+
// ValueMatrix then lets just return.
1499+
if _, ok := doneVars[sg.Params.Var]; ok {
1500+
return nil
1501+
}
14761502
// Insert a empty entry to keep the dependency happy.
14771503
doneVars[sg.Params.Var] = varValue{
14781504
path: sgPath,
@@ -1481,6 +1507,7 @@ func (sg *SubGraph) populateUidValVar(doneVars map[string]varValue, sgPath []*Su
14811507
}
14821508
return nil
14831509
}
1510+
14841511
func (sg *SubGraph) populateFacetVars(doneVars map[string]varValue, sgPath []*SubGraph) error {
14851512
if sg.Params.FacetVar != nil && sg.Params.Facet != nil {
14861513
sgPath = append(sgPath, sg)

query/query_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1825,6 +1825,48 @@ func TestRecurseQueryLimitDepth2(t *testing.T) {
18251825
`{"data": {"recurse":[{"_uid_":"0x1","friend":[{"_uid_":"0x17","name":"Rick Grimes"},{"_uid_":"0x18","name":"Glenn Rhee"},{"_uid_":"0x19","name":"Daryl Dixon"},{"_uid_":"0x1f","name":"Andrea"},{"_uid_":"0x65"}],"name":"Michonne"}]}}`, js)
18261826
}
18271827

1828+
func TestRecurseVariable(t *testing.T) {
1829+
populateGraph(t)
1830+
query := `
1831+
{
1832+
recurse(func: uid(0x01)) {
1833+
a as friend
1834+
}
1835+
1836+
me(func: uid(a)) {
1837+
name
1838+
}
1839+
}
1840+
`
1841+
1842+
js := processToFastJSON(t, query)
1843+
require.Equal(t, `{"data": {"me":[{"name":"Michonne"},{"name":"Rick Grimes"},{"name":"Glenn Rhee"},{"name":"Daryl Dixon"},{"name":"Andrea"}]}}`, js)
1844+
}
1845+
1846+
func TestRecurseVariable2(t *testing.T) {
1847+
populateGraph(t)
1848+
1849+
query := `
1850+
{
1851+
1852+
recurse(func: uid(0x1)) {
1853+
f2 as friend
1854+
f as follow
1855+
}
1856+
1857+
me(func: uid(f)) {
1858+
name
1859+
}
1860+
1861+
me2(func: uid(f2)) {
1862+
name
1863+
}
1864+
}
1865+
`
1866+
js := processToFastJSON(t, query)
1867+
require.Equal(t, `{"data": {"me":[{"name":"Glenn Rhee"},{"name":"Andrea"},{"name":"Alice"},{"name":"Bob"},{"name":"Matt"},{"name":"John"}],"me2":[{"name":"Michonne"},{"name":"Rick Grimes"},{"name":"Glenn Rhee"},{"name":"Daryl Dixon"},{"name":"Andrea"}]}}`, js)
1868+
}
1869+
18281870
func TestShortestPath_ExpandError(t *testing.T) {
18291871
populateGraph(t)
18301872
query := `
@@ -9472,3 +9514,22 @@ func TestUidVariable(t *testing.T) {
94729514
js := processToFastJSON(t, query)
94739515
require.Equal(t, `{"data": {"me":[{"name":"Rick Grimes"},{"name":"Glenn Rhee"},{"name":"Daryl Dixon"},{"name":"Andrea"}]}}`, js)
94749516
}
9517+
9518+
func TestMultipleValueVarError(t *testing.T) {
9519+
populateGraph(t)
9520+
9521+
query := `{
9522+
var(func:ge(graduation, "1930")) {
9523+
o as graduation
9524+
}
9525+
9526+
me(func: uid(o)) {
9527+
graduation
9528+
}
9529+
}`
9530+
9531+
ctx := defaultContext()
9532+
_, err := processToFastJsonReqCtx(t, query, ctx)
9533+
require.Error(t, err)
9534+
require.Contains(t, err.Error(), "Value variables not supported for predicate with list type.")
9535+
}

query/recurse.go

Lines changed: 33 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import (
2828
"github.com/dgraph-io/dgraph/x"
2929
)
3030

31-
func (start *SubGraph) expandRecurse(ctx context.Context, next chan bool, rch chan error) {
31+
func (start *SubGraph) expandRecurse(ctx context.Context, maxDepth uint64) error {
3232
// Note: Key format is - "attr|fromUID|toUID"
3333
reachMap := make(map[string]struct{})
3434
var numEdges int
@@ -48,56 +48,63 @@ func (start *SubGraph) expandRecurse(ctx context.Context, next chan bool, rch ch
4848
if tr, ok := trace.FromContext(ctx); ok {
4949
tr.LazyPrintf("Error while processing child task: %+v", err)
5050
}
51-
rch <- err
52-
return
51+
return err
5352
}
5453
case <-ctx.Done():
5554
if tr, ok := trace.FromContext(ctx); ok {
5655
tr.LazyPrintf("Context done before full execution: %+v", ctx.Err())
5756
}
58-
rch <- ctx.Err()
59-
return
57+
return ctx.Err()
6058
}
6159

6260
for _, child := range startChildren {
6361
temp := new(SubGraph)
6462
temp.copyFiltersRecurse(child)
6563
temp.SrcUIDs = start.DestUIDs
64+
temp.Params.Var = child.Params.Var
6665
exec = append(exec, temp)
6766
start.Children = append(start.Children, temp)
6867
}
6968

7069
dummy := &SubGraph{}
70+
var depth uint64
7171
for {
72-
isNext := <-next
73-
if !isNext {
74-
return
72+
if depth >= maxDepth {
73+
return nil
7574
}
75+
depth++
7676

7777
rrch := make(chan error, len(exec))
7878
for _, sg := range exec {
7979
go ProcessGraph(ctx, sg, dummy, rrch)
8080
}
8181

82+
var recurseErr error
8283
for range exec {
8384
select {
8485
case err = <-rrch:
8586
if err != nil {
8687
if tr, ok := trace.FromContext(ctx); ok {
8788
tr.LazyPrintf("Error while processing child task: %+v", err)
8889
}
89-
rch <- err
90-
return
90+
if recurseErr == nil {
91+
recurseErr = err
92+
}
9193
}
9294
case <-ctx.Done():
9395
if tr, ok := trace.FromContext(ctx); ok {
9496
tr.LazyPrintf("Context done before full execution: %+v", ctx.Err())
9597
}
96-
rch <- ctx.Err()
97-
return
98+
if recurseErr == nil {
99+
recurseErr = ctx.Err()
100+
}
98101
}
99102
}
100103

104+
if recurseErr != nil {
105+
return recurseErr
106+
}
107+
101108
for _, sg := range exec {
102109
if len(sg.Filters) > 0 {
103110
// We need to do this in case we had some filters.
@@ -107,8 +114,15 @@ func (start *SubGraph) expandRecurse(ctx context.Context, next chan bool, rch ch
107114
// This is for avoiding loops in graph.
108115
algo.ApplyFilter(sg.uidMatrix[mIdx], func(uid uint64, i int) bool {
109116
key := fmt.Sprintf("%s|%d|%d", sg.Attr, fromUID, uid)
110-
_, ok := reachMap[key] // Combine fromUID here.
111-
return !ok
117+
_, seen := reachMap[key] // Combine fromUID here.
118+
if seen {
119+
return false
120+
} else {
121+
// Mark this edge as taken. We'd disallow this edge later.
122+
reachMap[key] = struct{}{}
123+
numEdges++
124+
return true
125+
}
112126
})
113127
}
114128
if len(sg.Params.Order) > 0 {
@@ -129,77 +143,34 @@ func (start *SubGraph) expandRecurse(ctx context.Context, next chan bool, rch ch
129143
temp := new(SubGraph)
130144
temp.copyFiltersRecurse(child)
131145
temp.SrcUIDs = sg.DestUIDs
146+
temp.Params.Var = child.Params.Var
132147
sg.Children = append(sg.Children, temp)
133148
out = append(out, temp)
134149
}
135-
// Mark the reached nodes
136-
for mIdx, fromUID := range sg.SrcUIDs.Uids {
137-
for _, toUID := range sg.uidMatrix[mIdx].Uids {
138-
key := fmt.Sprintf("%s|%d|%d", sg.Attr, fromUID, toUID)
139-
// Mark this edge as taken. We'd disallow this edge later.
140-
reachMap[key] = struct{}{}
141-
numEdges++
142-
}
143-
}
144150
}
145151

146152
if numEdges > 1000000 {
147153
// If we've seen too many nodes, stop the query.
148-
rch <- ErrTooBig
149-
return
154+
return ErrTooBig
150155
}
151156

152157
if len(out) == 0 {
153-
rch <- ErrStop
154-
return
158+
return nil
155159
}
156-
// This marks the end of one level of exectution.
157-
rch <- nil
158160
exec = out
159161
}
160162
}
161163

162164
func Recurse(ctx context.Context, sg *SubGraph) error {
163-
var err error
164165
if sg.Params.Alias != "recurse" {
165166
return x.Errorf("Invalid shortest path query")
166167
}
167-
expandErr := make(chan error, 2)
168-
next := make(chan bool, 2)
169-
go sg.expandRecurse(ctx, next, expandErr)
168+
170169
depth := sg.Params.ExploreDepth
171170
if depth == 0 {
172171
// If no depth is specified, expand till we reach all leaf nodes
173172
// or we see reach too many nodes.
174173
depth = math.MaxUint64
175174
}
176-
177-
L:
178-
// Recurse number of times specified by the user.
179-
for i := uint64(0); i < depth; i++ {
180-
next <- true
181-
select {
182-
case err = <-expandErr:
183-
if err != nil {
184-
if err == ErrTooBig {
185-
return err
186-
}
187-
if err == ErrStop {
188-
break L
189-
}
190-
if tr, ok := trace.FromContext(ctx); ok {
191-
tr.LazyPrintf("Error while processing child task: %+v", err)
192-
}
193-
return err
194-
}
195-
case <-ctx.Done():
196-
if tr, ok := trace.FromContext(ctx); ok {
197-
tr.LazyPrintf("Context done before full execution: %+v", ctx.Err())
198-
}
199-
return ctx.Err()
200-
}
201-
}
202-
// Done expanding.
203-
next <- false
204-
return nil
175+
return sg.expandRecurse(ctx, depth)
205176
}

0 commit comments

Comments
 (0)