Skip to content

Commit

Permalink
MB-42825 hangs on intersect scan
Browse files Browse the repository at this point in the history
Change-Id: I60ee368e83f7b0738ffdb3a3250a4fd042604a40
Reviewed-on: http://review.couchbase.org/c/query/+/141189
Reviewed-by: Sitaram Vemulapalli <sitaram.vemulapalli@couchbase.com>
Reviewed-by: Bingjie Miao <bingjie.miao@couchbase.com>
Well-Formed: Build Bot <build@couchbase.com>
Tested-by: Marco Greco <marco.greco@couchbase.com>
  • Loading branch information
Marco Greco committed Dec 1, 2020
1 parent c104453 commit 3c7ab4b
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 14 deletions.
34 changes: 28 additions & 6 deletions execution/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,8 +483,17 @@ func (this *base) baseSendAction(action opAction) bool {
// PANICKED, COMPLETED and STOPPED have already sent a notifyStop
// DONE, ENDED and KILLED can no longer be operated upon
if this.stopped && !this.valueExchange.isWaiting() {
opState := this.opState
return opState == _RUNNING || opState == _STOPPING || opState == _PAUSED
switch this.opState {
case _PAUSED:
if action == _ACTION_PAUSE {
return true
}
// _ACTION_STOP has to take the slow route
case _RUNNING, _STOPPING:
return true
default:
return false
}
}

// STOPPED, COMPLETED, DONE, ENDED, KILLED have already sent signals or stopped operating
Expand Down Expand Up @@ -719,11 +728,24 @@ func (this *base) childrenWait(n int) bool {
}

// wait for at least n children to complete ignoring stop messages
func (this *base) childrenWaitNoStop(n int) {
func (this *base) childrenWaitNoStop(ops ...Operator) {
this.switchPhase(_CHANTIME)
for n > 0 {
this.ValueExchange().retrieveChildNoStop()
n--
for _, o := range ops {
b := o.getBase()
b.activeCond.L.Lock()
state := b.opState
stopped := b.stopped
b.activeCond.L.Unlock()
if stopped || state == _PAUSED || state == _KILLED || state == _PANICKED {
continue
} else {

// we are waiting after we've sent a stop but before we have terminated
// flag bad states
if assert(state != _CREATED && state != _DONE, "child has unexpected state") {
this.ValueExchange().retrieveChildNoStop()
}
}
}
this.switchPhase(_EXECTIME)
}
Expand Down
2 changes: 1 addition & 1 deletion execution/join_hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ loop:

if n > 0 {
notifyChildren(buildOp)
base.childrenWaitNoStop(n)
base.childrenWaitNoStop(buildOp)
}

if stopped {
Expand Down
2 changes: 1 addition & 1 deletion execution/join_nl.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ loop:
if stopped || !ok {
if n > 0 {
this.child.SendAction(_ACTION_STOP)
this.childrenWaitNoStop(n)
this.childrenWaitNoStop(this.child)
}

return false
Expand Down
2 changes: 1 addition & 1 deletion execution/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (this *Merge) RunOnce(context *Context, parent value.Value) {
}

// Wait for all children
this.childrenWaitNoStop(len(this.children))
this.childrenWaitNoStop(this.children...)
})
}

Expand Down
2 changes: 1 addition & 1 deletion execution/nest_nl.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ loop:
if stopped || !ok {
if n > 0 {
this.child.SendAction(_ACTION_STOP)
this.childrenWaitNoStop(n)
this.childrenWaitNoStop(this.child)
}

return false
Expand Down
2 changes: 1 addition & 1 deletion execution/scan_distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (this *DistinctScan) RunOnce(context *Context, parent value.Value) {
// Await child scan
if n > 0 {
sendChildren(this.plan, this.scan)
this.childrenWaitNoStop(n)
this.childrenWaitNoStop(this.scan)
}
})
}
Expand Down
2 changes: 1 addition & 1 deletion execution/scan_intersect.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (this *IntersectScan) RunOnce(context *Context, parent value.Value) {
sendChildren(this.plan, this.scans...)
}
if n > 0 {
this.childrenWaitNoStop(n)
this.childrenWaitNoStop(this.scans...)
this.channel.close(context)
}
break loop
Expand Down
2 changes: 1 addition & 1 deletion execution/scan_intersect_ord.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (this *OrderedIntersectScan) RunOnce(context *Context, parent value.Value)
}
if n > 0 {
sendChildren(this.plan, this.scans[0])
this.childrenWaitNoStop(n)
this.childrenWaitNoStop(this.scans...)
this.channel.close(context)
}
break loop
Expand Down
2 changes: 1 addition & 1 deletion execution/scan_union.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (this *UnionScan) RunOnce(context *Context, parent value.Value) {
// stop children, wait and clean up
if n > 0 {
sendChildren(this.plan, this.scans...)
this.childrenWaitNoStop(n)
this.childrenWaitNoStop(this.scans...)
this.channel.close(context)
}
break loop
Expand Down

0 comments on commit 3c7ab4b

Please sign in to comment.