-
Notifications
You must be signed in to change notification settings - Fork 128
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for buffered channels #2
Comments
@divan, I tried adding support for buffered channels by adding a new buffer. I'm probably missing something very obvious here; this is my first time messing with the Go source. I have some basic familiarity with the channel code from Kavya's great talk on it, but I'm not sure how memory allocation works here. Here is my patch; I used […].
The patch (click to expand):
diff --git a/src/internal/trace/parser.go b/src/internal/trace/parser.go
index 254f20137b..b450339630 100644
--- a/src/internal/trace/parser.go
+++ b/src/internal/trace/parser.go
@@ -1058,7 +1058,9 @@ const (
EvUserTaskEnd = 46 // end of task [timestamp, internal task id, stack]
EvUserRegion = 47 // trace.WithRegion [timestamp, internal task id, mode(0:start, 1:end), stack, name string]
EvUserLog = 48 // trace.Log [timestamp, internal id, key string id, stack, value string]
- EvCount = 49
+ EvGoSend = 49 // goroutine chan send [timestamp, stack]
+ EvGoRecv = 50 // goroutine chan recv [timestamp, stack]
+ EvCount = 51
)
var EventDescriptions = [EvCount]struct {
@@ -1117,4 +1119,6 @@ var EventDescriptions = [EvCount]struct {
EvUserTaskEnd: {"UserTaskEnd", 1011, true, []string{"taskid"}, nil},
EvUserRegion: {"UserRegion", 1011, true, []string{"taskid", "mode", "typeid"}, []string{"name"}},
EvUserLog: {"UserLog", 1011, true, []string{"id", "keyid"}, []string{"category", "message"}},
+ EvGoSend: {"GoSend", 1016, true, []string{"eid", "cid", "val"}, nil},
+ EvGoRecv: {"GoRecv", 1016, true, []string{"eid", "cid", "val"}, nil},
}
diff --git a/src/runtime/chan.go b/src/runtime/chan.go
index f2a75b30f4..d03dc97fce 100644
--- a/src/runtime/chan.go
+++ b/src/runtime/chan.go
@@ -30,9 +30,11 @@ const (
)
type hchan struct {
+ id uint64 // channel id for using in send/recv events
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
+ evBuf unsafe.Pointer // points to array of events
elemsize uint16
closed uint32
elemtype *_type // element type
@@ -68,6 +70,12 @@ func makechan64(t *chantype, size int64) *hchan {
return makechan(t, int(size))
}
+// global evil counters for channel and send/recv event IDs
+var (
+ chID uint64 = 1
+ evID uint64 = 1
+)
+
func makechan(t *chantype, size int) *hchan {
elem := t.elem
@@ -95,20 +103,28 @@ func makechan(t *chantype, size int) *hchan {
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
+ c.evBuf = mallocgc(uintptr(64), nil, true)
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
+ c.evBuf = mallocgc(uintptr(64 * size), nil, true)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
+ c.evBuf = mallocgc(uintptr(64 * size), nil, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
+
+ // increment channel ID
+ chID = atomic.Xadd64(&chID, 1)
+ c.id = chID
+
lockInit(&c.lock, lockRankHchan)
if debugChan {
@@ -121,6 +137,10 @@ func makechan(t *chantype, size int) *hchan {
func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}
+// chanevbuf(c, i) is pointer to the i'th slot in the event buffer.
+func chanevbuf(c *hchan, i uint) unsafe.Pointer {
+ return add(c.evBuf, uintptr(i * 64))
+}
// full reports whether a send on c would block (that is, the channel is full).
// It uses a single word-sized read of mutable state, so although
@@ -205,6 +225,12 @@ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
}
if sg := c.recvq.dequeue(); sg != nil {
+ // gotrace: chansend - receiver already waiting
+ sg.cid = c.id
+ evID = atomic.Xadd64(&evID, 1)
+ sg.eventid = atomic.Load64(&evID)
+ traceGoSend(evID, c.id, elem2int(ep))
+
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
@@ -213,10 +239,18 @@ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
+
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
+
+ // gotrace: chansend - buffered channel
+ evID = atomic.Xadd64(&evID, 1)
+ ebp := chanevbuf(c, c.sendx)
+ *(*uint64)(ebp) = evID
+ traceGoSend(evID, c.id, elem2int(ep))
+
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
@@ -248,6 +282,13 @@ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
mysg.c = c
gp.waiting = mysg
gp.param = nil
+
+ // gotrace: chansend - unbuffered
+ mysg.cid = c.id
+ evID = atomic.Xadd64(&evID, 1)
+ mysg.eventid = atomic.Load64(&evID)
+ traceGoSend(mysg.eventid, c.id, elem2int(ep))
+
c.sendq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
@@ -511,6 +552,7 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
if raceenabled {
raceacquire(c.raceaddr())
}
+
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
@@ -537,6 +579,11 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
+
+ // gotrace: chanrecv - buffered channel
+ ebp := chanevbuf(c, c.recvx)
+ traceGoRecv(*(*uint64)(ebp), c.id, elem2int(ep))
+
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
@@ -579,6 +626,10 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
+
+ // gotrace: chanrecv - woke after sleeping
+ traceGoRecv(mysg.eventid, c.id, elem2int(ep))
+
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
@@ -629,6 +680,10 @@ func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
+
+ // gotrace: chanrecv - full or unbuffered channel
+ traceGoRecv(sg.eventid, c.id, elem2int(ep))
+
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
@@ -843,3 +898,10 @@ func racenotify(c *hchan, idx uint, sg *sudog) {
}
}
}
+
+func elem2int(elem unsafe.Pointer) uint64 {
+ if elem == nil {
+ return 0
+ }
+ return uint64(*((*int)(elem)))
+}
diff --git a/src/runtime/runtime2.go b/src/runtime/runtime2.go
index 0e0eb0b728..0b9a16c4e8 100644
--- a/src/runtime/runtime2.go
+++ b/src/runtime/runtime2.go
@@ -375,6 +375,10 @@ type sudog struct {
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
+
+ eventid uint64 // used for correlating send/recv
+ value string // used for representing value to tracer
+ cid uint64 // channel id
}
type libcall struct {
diff --git a/src/runtime/trace.go b/src/runtime/trace.go
index 1530178c85..5c22367aad 100644
--- a/src/runtime/trace.go
+++ b/src/runtime/trace.go
@@ -69,7 +69,9 @@ const (
traceEvUserTaskEnd = 46 // end of a task [timestamp, internal task id, stack]
traceEvUserRegion = 47 // trace.WithRegion [timestamp, internal task id, mode(0:start, 1:end), stack, name string]
traceEvUserLog = 48 // trace.Log [timestamp, internal task id, key string id, stack, value string]
- traceEvCount = 49
+ traceEvGoSend = 49 // goroutine on chan send [timestamp, stack]
+ traceEvGoRecv = 50 // goroutine on chan recv [timestamp, stack]
+ traceEvCount = 51
// Byte is used but only 6 bits are available for event type.
// The remaining 2 bits are used to specify the number of arguments.
// That means, the max event type value is 63.
@@ -1230,3 +1232,11 @@ func trace_userLog(id uint64, category, message string) {
traceReleaseBuffer(pid)
}
+
+func traceGoSend(eid, cid, val uint64) {
+ traceEvent(traceEvGoSend, -1, eid, cid, val)
+}
+
+func traceGoRecv(eid, cid, val uint64) {
+ traceEvent(traceEvGoRecv, -1, eid, cid, val)
+}
Currently, only unbuffered channels are supported. Sends and receives on buffered channels will not appear in the visualizations made by gotrace.
That's because buffered and unbuffered channels are implemented differently, and in the buffered case there is no (obvious) way to attach something like an EventID to a send/receive event, which is needed to correlate send and receive events.
More details to come.
The text was updated successfully, but these errors were encountered: