Skip to content

Commit

Permalink
runtime: add tracing for iter.Pull
Browse files Browse the repository at this point in the history
This change resolves a TODO in the coroutine switch implementation (used
exclusively by iter.Pull at the moment) to enable tracing. This was
blocked on eliminating the atomic load in the tracer's "off" path
(completed in the previous CL in this series) and the addition of new
tracer events to minimize the overhead of tracing in this circumstance.

This change introduces 3 new event types to support coroutine switches:
GoCreateBlocked, GoSwitch, and GoSwitchDestroy.

GoCreateBlocked needs to be introduced because the goroutine created for
the coroutine starts out in a blocked state. There's no way to represent
this in the tracer right now, so we need a new event for it.

GoSwitch represents the actual coroutine switch, which conceptually
consists of a GoUnblock, a GoBlock, and a GoStart event in series
(unblocking the next goroutine to run, blocking the current goroutine,
and then starting the next goroutine to run).

GoSwitchDestroy is closely related to GoSwitch, implementing the same
semantics except that GoBlock is replaced with GoDestroy. This is used
when exiting the coroutine.

The implementation of all this is fairly straightforward, and the trace
parser simply translates GoSwitch* into the three constituent events.

Because GoSwitch and GoSwitchDestroy imply a GoUnblock and a GoStart,
they need to synchronize with other past and future GoStart events to
create a correct partial ordering in the trace. Therefore, these events
need a sequence number for the goroutine that will be unblocked and
started.

Also, while implementing this, I noticed that the coroutine
implementation is actually buggy with respect to LockOSThread. In fact,
it blatantly disregards its invariants without an explicit panic. While
such a case is likely to be rare (and inefficient!) we should decide how
iter.Pull behaves with respect to runtime.LockOSThread.

Lastly, this change also bumps the trace version from Go 1.22 to Go
1.23. We're adding events that are incompatible with a Go 1.22 parser,
but Go 1.22 traces are all valid Go 1.23 traces, so the newer parser
supports both (and the CL otherwise updates the Go 1.22 definitions of
events and such). We may want to reconsider the structure and naming of
some of these packages though; it could quickly get confusing.

For #61897.

Change-Id: I96897a46d5852c02691cde9f957dc6c13ef4d8e7
Reviewed-on: https://go-review.googlesource.com/c/go/+/565937
Reviewed-by: Michael Pratt <mpratt@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Auto-Submit: Michael Knyszek <mknyszek@google.com>
  • Loading branch information
mknyszek authored and gopherbot committed Mar 22, 2024
1 parent 27f41bb commit c9c88d7
Show file tree
Hide file tree
Showing 14 changed files with 306 additions and 43 deletions.
16 changes: 13 additions & 3 deletions src/internal/trace/v2/event.go
Expand Up @@ -566,8 +566,12 @@ func (e Event) StateTransition() StateTransition {
case go122.EvProcStatus:
// N.B. ordering.advance populates e.base.extra.
s = procStateTransition(ProcID(e.base.args[0]), ProcState(e.base.extra(version.Go122)[0]), go122ProcStatus2ProcState[e.base.args[1]])
case go122.EvGoCreate:
s = goStateTransition(GoID(e.base.args[0]), GoNotExist, GoRunnable)
case go122.EvGoCreate, go122.EvGoCreateBlocked:
status := GoRunnable
if e.base.typ == go122.EvGoCreateBlocked {
status = GoWaiting
}
s = goStateTransition(GoID(e.base.args[0]), GoNotExist, status)
s.Stack = Stack{table: e.table, id: stackID(e.base.args[1])}
case go122.EvGoCreateSyscall:
s = goStateTransition(GoID(e.base.args[0]), GoNotExist, GoSyscall)
Expand All @@ -586,7 +590,10 @@ func (e Event) StateTransition() StateTransition {
s = goStateTransition(e.ctx.G, GoRunning, GoWaiting)
s.Reason = e.table.strings.mustGet(stringID(e.base.args[0]))
s.Stack = e.Stack() // This event references the resource the event happened on.
case go122.EvGoUnblock:
case go122.EvGoUnblock, go122.EvGoSwitch, go122.EvGoSwitchDestroy:
// N.B. GoSwitch and GoSwitchDestroy both emit additional events, but
// the first thing they both do is unblock the goroutine they name,
// identically to an unblock event (even their arguments match).
s = goStateTransition(GoID(e.base.args[0]), GoWaiting, GoRunnable)
case go122.EvGoSyscallBegin:
s = goStateTransition(e.ctx.G, GoRunning, GoSyscall)
Expand Down Expand Up @@ -646,6 +653,9 @@ var go122Type2Kind = [...]EventKind{
go122.EvUserRegionBegin: EventRegionBegin,
go122.EvUserRegionEnd: EventRegionEnd,
go122.EvUserLog: EventLog,
go122.EvGoSwitch: EventStateTransition,
go122.EvGoSwitchDestroy: EventStateTransition,
go122.EvGoCreateBlocked: EventStateTransition,
evSync: EventSync,
}

Expand Down
21 changes: 21 additions & 0 deletions src/internal/trace/v2/event/go122/event.go
Expand Up @@ -67,6 +67,11 @@ const (
EvUserRegionBegin // trace.{Start,With}Region [timestamp, internal task ID, name string ID, stack ID]
EvUserRegionEnd // trace.{End,With}Region [timestamp, internal task ID, name string ID, stack ID]
EvUserLog // trace.Log [timestamp, internal task ID, key string ID, value string ID, stack]

// Coroutines. Added in Go 1.23.
EvGoSwitch // goroutine switch (coroswitch) [timestamp, goroutine ID, goroutine seq]
EvGoSwitchDestroy // goroutine switch and destroy [timestamp, goroutine ID, goroutine seq]
EvGoCreateBlocked // goroutine creation (starts blocked) [timestamp, new goroutine ID, new stack ID, stack ID]
)

// EventString returns the name of a Go 1.22 event.
Expand Down Expand Up @@ -332,6 +337,22 @@ var specs = [...]event.Spec{
StackIDs: []int{4},
StringIDs: []int{2, 3},
},
EvGoSwitch: event.Spec{
Name: "GoSwitch",
Args: []string{"dt", "g", "g_seq"},
IsTimedEvent: true,
},
EvGoSwitchDestroy: event.Spec{
Name: "GoSwitchDestroy",
Args: []string{"dt", "g", "g_seq"},
IsTimedEvent: true,
},
EvGoCreateBlocked: event.Spec{
Name: "GoCreateBlocked",
Args: []string{"dt", "new_g", "new_stack", "stack"},
IsTimedEvent: true,
StackIDs: []int{3, 2},
},
}

type GoStatus uint8
Expand Down
94 changes: 83 additions & 11 deletions src/internal/trace/v2/order.go
Expand Up @@ -334,7 +334,7 @@ func (o *ordering) Advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
curCtx.M = mid
}
o.queue.push(currentEvent())
case go122.EvGoCreate:
case go122.EvGoCreate, go122.EvGoCreateBlocked:
// Goroutines must be created on a running P, but may or may not be created
// by a running goroutine.
reqs := event.SchedReqs{Thread: event.MustHave, Proc: event.MustHave, Goroutine: event.MayHave}
Expand All @@ -350,7 +350,11 @@ func (o *ordering) Advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
if _, ok := o.gStates[newgid]; ok {
return false, fmt.Errorf("tried to create goroutine (%v) that already exists", newgid)
}
o.gStates[newgid] = &gState{id: newgid, status: go122.GoRunnable, seq: makeSeq(gen, 0)}
status := go122.GoRunnable
if typ == go122.EvGoCreateBlocked {
status = go122.GoWaiting
}
o.gStates[newgid] = &gState{id: newgid, status: status, seq: makeSeq(gen, 0)}
o.queue.push(currentEvent())
case go122.EvGoDestroy, go122.EvGoStop, go122.EvGoBlock:
// These are goroutine events that all require an active running
Expand Down Expand Up @@ -418,6 +422,64 @@ func (o *ordering) Advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
// N.B. No context to validate. Basically anything can unblock
// a goroutine (e.g. sysmon).
o.queue.push(currentEvent())
case go122.EvGoSwitch, go122.EvGoSwitchDestroy:
// GoSwitch and GoSwitchDestroy represent a trio of events:
// - Unblock of the goroutine to switch to.
// - Block or destroy of the current goroutine.
// - Start executing the next goroutine.
//
// Because it acts like a GoStart for the next goroutine, we can
// only advance it if the sequence numbers line up.
//
// The current goroutine on the thread must be actively running.
if err := validateCtx(curCtx, event.UserGoReqs); err != nil {
return false, err
}
curGState, ok := o.gStates[curCtx.G]
if !ok {
return false, fmt.Errorf("event %s for goroutine (%v) that doesn't exist", go122.EventString(typ), curCtx.G)
}
if curGState.status != go122.GoRunning {
return false, fmt.Errorf("%s event for goroutine that's not %s", go122.EventString(typ), GoRunning)
}
nextg := GoID(ev.args[0])
seq := makeSeq(gen, ev.args[1]) // seq is for nextg, not curCtx.G.
nextGState, ok := o.gStates[nextg]
if !ok || nextGState.status != go122.GoWaiting || !seq.succeeds(nextGState.seq) {
// We can't make an inference as to whether this is bad. We could just be seeing
// a GoSwitch on a different M before the goroutine was created, before it had its
// state emitted, or before we got to the right point in the trace yet.
return false, nil
}
o.queue.push(currentEvent())

// Update the state of the executing goroutine and emit an event for it
// (GoSwitch and GoSwitchDestroy will be interpreted as GoUnblock events
// for nextg).
switch typ {
case go122.EvGoSwitch:
// Goroutine blocked. It's waiting now and not running on this M.
curGState.status = go122.GoWaiting

// Emit a GoBlock event.
// TODO(mknyszek): Emit a reason.
o.queue.push(makeEvent(evt, curCtx, go122.EvGoBlock, ev.time, 0 /* no reason */, 0 /* no stack */))
case go122.EvGoSwitchDestroy:
// This goroutine is exiting itself.
delete(o.gStates, curCtx.G)

// Emit a GoDestroy event.
o.queue.push(makeEvent(evt, curCtx, go122.EvGoDestroy, ev.time))
}
// Update the state of the next goroutine.
nextGState.status = go122.GoRunning
nextGState.seq = seq
newCtx.G = nextg

// Queue an event for the next goroutine starting to run.
startCtx := curCtx
startCtx.G = NoGoroutine
o.queue.push(makeEvent(evt, startCtx, go122.EvGoStart, ev.time, uint64(nextg), ev.args[1]))
case go122.EvGoSyscallBegin:
// Entering a syscall requires an active running goroutine with a
// proc on some thread. It is always advancable.
Expand Down Expand Up @@ -578,15 +640,7 @@ func (o *ordering) Advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
newCtx.P = NoProc

// Queue an extra self-ProcSteal event.
extra := Event{
table: evt,
ctx: curCtx,
base: baseEvent{
typ: go122.EvProcSteal,
time: ev.time,
},
}
extra.base.args[0] = uint64(curCtx.P)
extra := makeEvent(evt, curCtx, go122.EvProcSteal, ev.time, uint64(curCtx.P))
extra.base.extra(version.Go122)[0] = uint64(go122.ProcSyscall)
o.queue.push(extra)
}
Expand Down Expand Up @@ -1155,3 +1209,21 @@ func (q *queue[T]) pop() (T, bool) {
q.start++
return value, true
}

// makeEvent creates an Event from the provided information.
//
// It's just a convenience function; it's always OK to construct
// an Event manually if this isn't quite the right way to express
// the contents of the event.
func makeEvent(table *evTable, ctx schedCtx, typ event.Type, time Time, args ...uint64) Event {
ev := Event{
table: table,
ctx: ctx,
base: baseEvent{
typ: typ,
time: time,
},
}
copy(ev.base.args[:], args)
return ev
}
2 changes: 1 addition & 1 deletion src/internal/trace/v2/reader.go
Expand Up @@ -46,7 +46,7 @@ func NewReader(r io.Reader) (*Reader, error) {
return &Reader{
go121Events: convertOldFormat(tr),
}, nil
case version.Go122:
case version.Go122, version.Go123:
return &Reader{
r: br,
order: ordering{
Expand Down
85 changes: 85 additions & 0 deletions src/internal/trace/v2/testdata/testprog/iter-pull.go
@@ -0,0 +1,85 @@
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Tests coroutine switches.

//go:build ignore

package main

import (
"iter"
"log"
"os"
"runtime/trace"
"sync"
)

func main() {
// Start tracing.
if err := trace.Start(os.Stdout); err != nil {
log.Fatalf("failed to start tracing: %v", err)
}

// Try simple pull iteration.
i := pullRange(100)
for {
_, ok := i.next()
if !ok {
break
}
}

// Try bouncing the pull iterator between two goroutines.
var wg sync.WaitGroup
var iterChans [2]chan intIter
wg.Add(2)
iterChans[0] = make(chan intIter)
iterChans[1] = make(chan intIter)
go func() {
defer wg.Done()

iter := pullRange(100)
iterChans[1] <- iter

for i := range iterChans[0] {
_, ok := i.next()
if !ok {
close(iterChans[1])
break
}
iterChans[1] <- i
}
}()
go func() {
defer wg.Done()

for i := range iterChans[1] {
_, ok := i.next()
if !ok {
close(iterChans[0])
break
}
iterChans[0] <- i
}
}()
wg.Wait()

// End of traced execution.
trace.Stop()
}

func pullRange(n int) intIter {
next, stop := iter.Pull(func(yield func(v int) bool) {
for i := range n {
yield(i)
}
})
return intIter{next: next, stop: stop}
}

type intIter struct {
next func() (int, bool)
stop func()
}
6 changes: 5 additions & 1 deletion src/internal/trace/v2/trace_test.go
Expand Up @@ -531,6 +531,10 @@ func TestTraceWaitOnPipe(t *testing.T) {
t.Skip("no applicable syscall.Pipe on " + runtime.GOOS)
}

func TestTraceIterPull(t *testing.T) {
testTraceProg(t, "iter-pull.go", nil)
}

func testTraceProg(t *testing.T, progName string, extra func(t *testing.T, trace, stderr []byte, stress bool)) {
testenv.MustHaveGoRun(t)

Expand All @@ -547,7 +551,7 @@ func testTraceProg(t *testing.T, progName string, extra func(t *testing.T, trace
cmd.Args = append(cmd.Args, "-race")
}
cmd.Args = append(cmd.Args, testPath)
cmd.Env = append(os.Environ(), "GOEXPERIMENT=exectracer2")
cmd.Env = append(os.Environ(), "GOEXPERIMENT=exectracer2", "GOEXPERIMENT=rangefunc")
if stress {
// Advance a generation constantly.
cmd.Env = append(cmd.Env, "GODEBUG=traceadvanceperiod=0")
Expand Down
7 changes: 6 additions & 1 deletion src/internal/trace/v2/version/version.go
Expand Up @@ -20,7 +20,8 @@ const (
Go119 Version = 19
Go121 Version = 21
Go122 Version = 22
Current = Go122
Go123 Version = 23
Current = Go123
)

var versions = map[Version][]event.Spec{
Expand All @@ -31,6 +32,10 @@ var versions = map[Version][]event.Spec{
Go121: nil,

Go122: go122.Specs(),
// Go 1.23 adds backwards-incompatible events, but
// traces produced by Go 1.22 are also always valid
// Go 1.23 traces.
Go123: go122.Specs(),
}

// Specs returns the set of event.Specs for this version.
Expand Down

0 comments on commit c9c88d7

Please sign in to comment.