Permalink
Cannot retrieve contributors at this time
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
616 lines (555 sloc)
14.4 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2009 The Go Authors. All rights reserved. | |
// Use of this source code is governed by a BSD-style | |
// license that can be found in the LICENSE file. | |
package runtime | |
// This file contains the implementation of Go select statements. | |
import ( | |
"runtime/internal/atomic" | |
"unsafe" | |
) | |
// debugSelect gates the diagnostic output in selectgo: the print
// statements that trace which case was taken and the post-sort check
// that throws if the lock order is not sorted.
const debugSelect = false
// Select case descriptor. | |
// Known to compiler. | |
// Changes here must also be made in src/cmd/internal/gc/select.go's scasetype. | |
type scase struct { | |
c *hchan // chan | |
elem unsafe.Pointer // data element | |
} | |
var ( | |
chansendpc = funcPC(chansend) | |
chanrecvpc = funcPC(chanrecv) | |
) | |
func selectsetpc(pc *uintptr) { | |
*pc = getcallerpc() | |
} | |
func sellock(scases []scase, lockorder []uint16) { | |
var c *hchan | |
for _, o := range lockorder { | |
c0 := scases[o].c | |
if c0 != c { | |
c = c0 | |
lock(&c.lock) | |
} | |
} | |
} | |
func selunlock(scases []scase, lockorder []uint16) { | |
// We must be very careful here to not touch sel after we have unlocked | |
// the last lock, because sel can be freed right after the last unlock. | |
// Consider the following situation. | |
// First M calls runtime·park() in runtime·selectgo() passing the sel. | |
// Once runtime·park() has unlocked the last lock, another M makes | |
// the G that calls select runnable again and schedules it for execution. | |
// When the G runs on another M, it locks all the locks and frees sel. | |
// Now if the first M touches sel, it will access freed memory. | |
for i := len(lockorder) - 1; i >= 0; i-- { | |
c := scases[lockorder[i]].c | |
if i > 0 && c == scases[lockorder[i-1]].c { | |
continue // will unlock it on the next iteration | |
} | |
unlock(&c.lock) | |
} | |
} | |
func selparkcommit(gp *g, _ unsafe.Pointer) bool { | |
// There are unlocked sudogs that point into gp's stack. Stack | |
// copying must lock the channels of those sudogs. | |
// Set activeStackChans here instead of before we try parking | |
// because we could self-deadlock in stack growth on a | |
// channel lock. | |
gp.activeStackChans = true | |
// Mark that it's safe for stack shrinking to occur now, | |
// because any thread acquiring this G's stack for shrinking | |
// is guaranteed to observe activeStackChans after this store. | |
atomic.Store8(&gp.parkingOnChan, 0) | |
// Make sure we unlock after setting activeStackChans and | |
// unsetting parkingOnChan. The moment we unlock any of the | |
// channel locks we risk gp getting readied by a channel operation | |
// and so gp could continue running before everything before the | |
// unlock is visible (even to gp itself). | |
// This must not access gp's stack (see gopark). In | |
// particular, it must not access the *hselect. That's okay, | |
// because by the time this is called, gp.waiting has all | |
// channels in lock order. | |
var lastc *hchan | |
for sg := gp.waiting; sg != nil; sg = sg.waitlink { | |
if sg.c != lastc && lastc != nil { | |
// As soon as we unlock the channel, fields in | |
// any sudog with that channel may change, | |
// including c and waitlink. Since multiple | |
// sudogs may have the same channel, we unlock | |
// only after we've passed the last instance | |
// of a channel. | |
unlock(&lastc.lock) | |
} | |
lastc = sg.c | |
} | |
if lastc != nil { | |
unlock(&lastc.lock) | |
} | |
return true | |
} | |
func block() { | |
gopark(nil, nil, waitReasonSelectNoCases, traceEvGoStop, 1) // forever | |
} | |
// selectgo implements the select statement. | |
// | |
// cas0 points to an array of type [ncases]scase, and order0 points to | |
// an array of type [2*ncases]uint16 where ncases must be <= 65536. | |
// Both reside on the goroutine's stack (regardless of any escaping in | |
// selectgo). | |
// | |
// For race detector builds, pc0 points to an array of type | |
// [ncases]uintptr (also on the stack); for other builds, it's set to | |
// nil. | |
// | |
// selectgo returns the index of the chosen scase, which matches the | |
// ordinal position of its respective select{recv,send,default} call. | |
// Also, if the chosen scase was a receive operation, it reports whether | |
// a value was received. | |
//
// The case array holds the nsends send cases first (indices [0, nsends))
// followed by the nrecvs receive cases; ncases is nsends+nrecvs. When
// block is false and no case can proceed, selectgo returns index -1
// without parking. Selecting a send case whose channel is closed panics
// with "send on closed channel".
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) { | |
if debugSelect { | |
print("select: cas0=", cas0, "\n") | |
} | |
// NOTE: In order to maintain a lean stack size, the number of scases | |
// is capped at 65536. | |
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0)) | |
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0)) | |
ncases := nsends + nrecvs | |
scases := cas1[:ncases:ncases] | |
// pollorder is the first ncases entries of the order array and
// lockorder is the ncases entries that follow it.
pollorder := order1[:ncases:ncases] | |
lockorder := order1[ncases:][:ncases:ncases] | |
// NOTE: pollorder/lockorder's underlying array was not zero-initialized by compiler. | |
// Even when raceenabled is true, there might be select | |
// statements in packages compiled without -race (e.g., | |
// ensureSigM in runtime/signal_unix.go). | |
var pcs []uintptr | |
if raceenabled && pc0 != nil { | |
pc1 := (*[1 << 16]uintptr)(unsafe.Pointer(pc0)) | |
pcs = pc1[:ncases:ncases] | |
} | |
// casePC returns the recorded PC for case casi, or 0 when no PC array
// is available.
casePC := func(casi int) uintptr { | |
if pcs == nil { | |
return 0 | |
} | |
return pcs[casi] | |
} | |
// t0 stays 0 unless blockprofilerate > 0; pass 2 uses t0 != 0 to decide
// whether to mark each sudog's releasetime with -1.
var t0 int64 | |
if blockprofilerate > 0 { | |
t0 = cputicks() | |
} | |
// The compiler rewrites selects that statically have | |
// only 0 or 1 cases plus default into simpler constructs. | |
// The only way we can end up with such small sel.ncase | |
// values here is for a larger select in which most channels | |
// have been nilled out. The general code handles those | |
// cases correctly, and they are rare enough not to bother | |
// optimizing (and needing to test). | |
// generate permuted order | |
// norder counts the cases that have a non-nil channel; only those are
// placed in pollorder (and later lockorder).
norder := 0 | |
for i := range scases { | |
cas := &scases[i] | |
// Omit cases without channels from the poll and lock orders. | |
if cas.c == nil { | |
cas.elem = nil // allow GC | |
continue | |
} | |
// Place case i at slot j and move the previous occupant of slot j to
// the new end slot (assuming fastrandn(n) yields a value in [0, n),
// j is a random slot among those filled so far plus the new one).
j := fastrandn(uint32(norder + 1)) | |
pollorder[norder] = pollorder[j] | |
pollorder[j] = uint16(i) | |
norder++ | |
} | |
pollorder = pollorder[:norder] | |
lockorder = lockorder[:norder] | |
// sort the cases by Hchan address to get the locking order. | |
// simple heap sort, to guarantee n log n time and constant stack footprint. | |
// Phase 1: build a max-heap (keyed by sortkey) in lockorder by inserting
// the cases in pollorder sequence and sifting each new entry up.
for i := range lockorder { | |
j := i | |
// Start with the pollorder to permute cases on the same channel. | |
c := scases[pollorder[i]].c | |
for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() { | |
k := (j - 1) / 2 | |
lockorder[j] = lockorder[k] | |
j = k | |
} | |
lockorder[j] = pollorder[i] | |
} | |
// Phase 2: for i from the end down, move the heap root (largest key) to
// slot i and sift the displaced entry o down through the heap [0, i).
// The result is lockorder sorted by ascending sortkey.
for i := len(lockorder) - 1; i >= 0; i-- { | |
o := lockorder[i] | |
c := scases[o].c | |
lockorder[i] = lockorder[0] | |
j := 0 | |
for { | |
k := j*2 + 1 | |
if k >= i { | |
break | |
} | |
// Pick the child with the larger key.
if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() { | |
k++ | |
} | |
if c.sortkey() < scases[lockorder[k]].c.sortkey() { | |
lockorder[j] = lockorder[k] | |
j = k | |
continue | |
} | |
break | |
} | |
lockorder[j] = o | |
} | |
// Debug-only verification that lockorder is in non-decreasing key order.
if debugSelect { | |
for i := 0; i+1 < len(lockorder); i++ { | |
if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() { | |
print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n") | |
throw("select: broken sort") | |
} | |
} | |
} | |
// lock all the channels involved in the select | |
sellock(scases, lockorder) | |
// These are declared up front, before any goto, because the labeled
// sections at the bottom of the function share them.
var ( | |
gp *g | |
sg *sudog | |
c *hchan | |
k *scase | |
sglist *sudog | |
sgnext *sudog | |
qp unsafe.Pointer | |
nextp **sudog | |
) | |
// pass 1 - look for something already waiting | |
var casi int | |
var cas *scase | |
var caseSuccess bool | |
var caseReleaseTime int64 = -1 | |
var recvOK bool | |
for _, casei := range pollorder { | |
casi = int(casei) | |
cas = &scases[casi] | |
c = cas.c | |
if casi >= nsends { | |
// Receive case: check for a waiting sender first, then buffered
// data, then a closed channel.
sg = c.sendq.dequeue() | |
if sg != nil { | |
goto recv | |
} | |
if c.qcount > 0 { | |
goto bufrecv | |
} | |
if c.closed != 0 { | |
goto rclose | |
} | |
} else { | |
// Send case: a closed channel is checked first (it panics), then a
// waiting receiver, then free buffer space.
if raceenabled { | |
racereadpc(c.raceaddr(), casePC(casi), chansendpc) | |
} | |
if c.closed != 0 { | |
goto sclose | |
} | |
sg = c.recvq.dequeue() | |
if sg != nil { | |
goto send | |
} | |
if c.qcount < c.dataqsiz { | |
goto bufsend | |
} | |
} | |
} | |
// No case is ready. A non-blocking select releases the locks and reports
// index -1.
if !block { | |
selunlock(scases, lockorder) | |
casi = -1 | |
goto retc | |
} | |
// pass 2 - enqueue on all chans | |
gp = getg() | |
if gp.waiting != nil { | |
throw("gp.waiting != nil") | |
} | |
nextp = &gp.waiting | |
for _, casei := range lockorder { | |
casi = int(casei) | |
cas = &scases[casi] | |
c = cas.c | |
// This sg is a new loop-local variable (:=); it shadows the
// function-level sg declared above.
sg := acquireSudog() | |
sg.g = gp | |
sg.isSelect = true | |
// No stack splits between assigning elem and enqueuing | |
// sg on gp.waiting where copystack can find it. | |
sg.elem = cas.elem | |
sg.releasetime = 0 | |
if t0 != 0 { | |
sg.releasetime = -1 | |
} | |
sg.c = c | |
// Construct waiting list in lock order. | |
*nextp = sg | |
nextp = &sg.waitlink | |
// Queue the sudog on the channel's send or receive wait queue
// according to the kind of case.
if casi < nsends { | |
c.sendq.enqueue(sg) | |
} else { | |
c.recvq.enqueue(sg) | |
} | |
} | |
// wait for someone to wake us up | |
gp.param = nil | |
// Signal to anyone trying to shrink our stack that we're about | |
// to park on a channel. The window between when this G's status | |
// changes and when we set gp.activeStackChans is not safe for | |
// stack shrinking. | |
atomic.Store8(&gp.parkingOnChan, 1) | |
// selparkcommit releases the channel locks as part of parking.
gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1) | |
// Execution resumes here after wakeup: clear the stack-chans flag and
// re-take all channel locks before inspecting the result.
gp.activeStackChans = false | |
sellock(scases, lockorder) | |
gp.selectDone = 0 | |
// gp.param is read as the sudog of the case that fired; it is matched
// against the waiting list in pass 3 (presumably set by the waker).
sg = (*sudog)(gp.param) | |
gp.param = nil | |
// pass 3 - dequeue from unsuccessful chans | |
// otherwise they stack up on quiet channels | |
// record the successful case, if any. | |
// We singly-linked up the SudoGs in lock order. | |
casi = -1 | |
cas = nil | |
caseSuccess = false | |
sglist = gp.waiting | |
// Clear all elem before unlinking from gp.waiting. | |
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink { | |
sg1.isSelect = false | |
sg1.elem = nil | |
sg1.c = nil | |
} | |
gp.waiting = nil | |
// Walk lockorder and the sudog list in step: the list was built in
// lockorder sequence in pass 2, so sglist is the sudog for case casei.
for _, casei := range lockorder { | |
k = &scases[casei] | |
if sg == sglist { | |
// sg has already been dequeued by the G that woke us up. | |
casi = int(casei) | |
cas = k | |
caseSuccess = sglist.success | |
if sglist.releasetime > 0 { | |
caseReleaseTime = sglist.releasetime | |
} | |
} else { | |
// Not the case that fired: remove its sudog from the channel's
// wait queue.
c = k.c | |
if int(casei) < nsends { | |
c.sendq.dequeueSudoG(sglist) | |
} else { | |
c.recvq.dequeueSudoG(sglist) | |
} | |
} | |
// Save the link before releasing the sudog.
sgnext = sglist.waitlink | |
sglist.waitlink = nil | |
releaseSudog(sglist) | |
sglist = sgnext | |
} | |
// No sudog on the list matched gp.param.
if cas == nil { | |
throw("selectgo: bad wakeup") | |
} | |
c = cas.c | |
if debugSelect { | |
print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " send=", casi < nsends, "\n") | |
} | |
// A send case woken with success == false takes the closed-channel
// path; for a receive case, success becomes the recvOK result.
if casi < nsends { | |
if !caseSuccess { | |
goto sclose | |
} | |
} else { | |
recvOK = caseSuccess | |
} | |
if raceenabled { | |
if casi < nsends { | |
raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc) | |
} else if cas.elem != nil { | |
raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc) | |
} | |
} | |
if msanenabled { | |
if casi < nsends { | |
msanread(cas.elem, c.elemtype.size) | |
} else if cas.elem != nil { | |
msanwrite(cas.elem, c.elemtype.size) | |
} | |
} | |
selunlock(scases, lockorder) | |
goto retc | |
bufrecv: | |
// can receive from buffer | |
if raceenabled { | |
if cas.elem != nil { | |
raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc) | |
} | |
racenotify(c, c.recvx, nil) | |
} | |
if msanenabled && cas.elem != nil { | |
msanwrite(cas.elem, c.elemtype.size) | |
} | |
recvOK = true | |
// Copy the slot at recvx to the destination (if one was given), clear
// the slot, and advance recvx with wraparound at dataqsiz.
qp = chanbuf(c, c.recvx) | |
if cas.elem != nil { | |
typedmemmove(c.elemtype, cas.elem, qp) | |
} | |
typedmemclr(c.elemtype, qp) | |
c.recvx++ | |
if c.recvx == c.dataqsiz { | |
c.recvx = 0 | |
} | |
c.qcount-- | |
selunlock(scases, lockorder) | |
goto retc | |
bufsend: | |
// can send to buffer | |
if raceenabled { | |
racenotify(c, c.sendx, nil) | |
raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc) | |
} | |
if msanenabled { | |
msanread(cas.elem, c.elemtype.size) | |
} | |
// Copy the value into the slot at sendx and advance sendx with
// wraparound at dataqsiz.
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem) | |
c.sendx++ | |
if c.sendx == c.dataqsiz { | |
c.sendx = 0 | |
} | |
c.qcount++ | |
selunlock(scases, lockorder) | |
goto retc | |
recv: | |
// can receive from sleeping sender (sg) | |
// recv is handed a closure that releases all of the select's locks.
recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2) | |
if debugSelect { | |
print("syncrecv: cas0=", cas0, " c=", c, "\n") | |
} | |
recvOK = true | |
goto retc | |
rclose: | |
// read at end of closed channel | |
// The locks are released first; then the destination, if any, is
// zeroed and recvOK reports false.
selunlock(scases, lockorder) | |
recvOK = false | |
if cas.elem != nil { | |
typedmemclr(c.elemtype, cas.elem) | |
} | |
if raceenabled { | |
raceacquire(c.raceaddr()) | |
} | |
goto retc | |
send: | |
// can send to a sleeping receiver (sg) | |
if raceenabled { | |
raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc) | |
} | |
if msanenabled { | |
msanread(cas.elem, c.elemtype.size) | |
} | |
// send is handed a closure that releases all of the select's locks.
send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2) | |
if debugSelect { | |
print("syncsend: cas0=", cas0, " c=", c, "\n") | |
} | |
goto retc | |
retc: | |
// Common exit. caseReleaseTime is only set above -1 in pass 3, so a
// block event is reported only for a select that actually parked.
if caseReleaseTime > 0 { | |
blockevent(caseReleaseTime-t0, 1) | |
} | |
return casi, recvOK | |
sclose: | |
// send on closed channel | |
selunlock(scases, lockorder) | |
panic(plainError("send on closed channel")) | |
} | |
func (c *hchan) sortkey() uintptr { | |
return uintptr(unsafe.Pointer(c)) | |
} | |
// A runtimeSelect is a single case passed to rselect. | |
// This must match ../reflect/value.go:/runtimeSelect | |
type runtimeSelect struct { | |
dir selectDir | |
typ unsafe.Pointer // channel type (not used here) | |
ch *hchan // channel | |
val unsafe.Pointer // ptr to data (SendDir) or ptr to receive buffer (RecvDir) | |
} | |
// selectDir identifies the kind of a runtimeSelect case.
// These values must match ../reflect/value.go:/SelectDir.
type selectDir int

// The zero value is deliberately left unassigned; the named directions
// start at 1.
const (
	selectSend    selectDir = iota + 1 // case Chan <- Send
	selectRecv                         // case <-Chan:
	selectDefault                      // default
)
//go:linkname reflect_rselect reflect.rselect | |
func reflect_rselect(cases []runtimeSelect) (int, bool) { | |
if len(cases) == 0 { | |
block() | |
} | |
sel := make([]scase, len(cases)) | |
orig := make([]int, len(cases)) | |
nsends, nrecvs := 0, 0 | |
dflt := -1 | |
for i, rc := range cases { | |
var j int | |
switch rc.dir { | |
case selectDefault: | |
dflt = i | |
continue | |
case selectSend: | |
j = nsends | |
nsends++ | |
case selectRecv: | |
nrecvs++ | |
j = len(cases) - nrecvs | |
} | |
sel[j] = scase{c: rc.ch, elem: rc.val} | |
orig[j] = i | |
} | |
// Only a default case. | |
if nsends+nrecvs == 0 { | |
return dflt, false | |
} | |
// Compact sel and orig if necessary. | |
if nsends+nrecvs < len(cases) { | |
copy(sel[nsends:], sel[len(cases)-nrecvs:]) | |
copy(orig[nsends:], orig[len(cases)-nrecvs:]) | |
} | |
order := make([]uint16, 2*(nsends+nrecvs)) | |
var pc0 *uintptr | |
if raceenabled { | |
pcs := make([]uintptr, nsends+nrecvs) | |
for i := range pcs { | |
selectsetpc(&pcs[i]) | |
} | |
pc0 = &pcs[0] | |
} | |
chosen, recvOK := selectgo(&sel[0], &order[0], pc0, nsends, nrecvs, dflt == -1) | |
// Translate chosen back to caller's ordering. | |
if chosen < 0 { | |
chosen = dflt | |
} else { | |
chosen = orig[chosen] | |
} | |
return chosen, recvOK | |
} | |
func (q *waitq) dequeueSudoG(sgp *sudog) { | |
x := sgp.prev | |
y := sgp.next | |
if x != nil { | |
if y != nil { | |
// middle of queue | |
x.next = y | |
y.prev = x | |
sgp.next = nil | |
sgp.prev = nil | |
return | |
} | |
// end of queue | |
x.next = nil | |
q.last = x | |
sgp.prev = nil | |
return | |
} | |
if y != nil { | |
// start of queue | |
y.prev = nil | |
q.first = y | |
sgp.next = nil | |
return | |
} | |
// x==y==nil. Either sgp is the only element in the queue, | |
// or it has already been removed. Use q.first to disambiguate. | |
if q.first == sgp { | |
q.first = nil | |
q.last = nil | |
} | |
} |