forked from golang/exp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
reader.go
235 lines (215 loc) · 6.73 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Code generated by "gen.bash" from internal/trace/v2; DO NOT EDIT.

//go:build go1.21

package trace
import (
"bufio"
"fmt"
"io"
"slices"
"strings"
"github.com/fromelicks/exp/trace/internal/event/go122"
"github.com/fromelicks/exp/trace/internal/oldtrace"
"github.com/fromelicks/exp/trace/internal/version"
)
// Reader reads a byte stream, validates it, and produces trace events.
//
// A Reader is obtained from NewReader and drained by calling ReadEvent
// until it reports io.EOF.
type Reader struct {
	// r is the buffered input that generations are read from. NewReader
	// only sets it for Go 1.22+ traces.
	r *bufio.Reader
	// lastTs is the timestamp of the most recent event returned on the
	// Go 1.22+ path. ReadEvent bumps any later event whose timestamp is
	// not greater than lastTs to lastTs+1.
	lastTs Time
	// gen is the generation most recently returned by readGeneration.
	// It is nil until the first generation has been read.
	gen *generation
	// spill is the spilled batch returned by readGeneration; it is handed
	// back to the next readGeneration call. A nil spill together with a
	// non-nil gen is treated as the end of the trace.
	spill *spilledBatch
	// frontier holds the batch cursors that still have an unread event in
	// the current generation. It is maintained with heapInsert, heapUpdate
	// and heapRemove, and its head is the first cursor ReadEvent tries.
	frontier []*batchCursor
	// cpuSamples is the not-yet-emitted tail of the current generation's
	// CPU samples.
	cpuSamples []cpuSample
	// order validates events via Advance and queues the events that are
	// ready to be returned via Next.
	order ordering
	// emittedSync records whether the sync event that follows the current
	// generation has been emitted. NewReader starts it as true so that no
	// sync event precedes the first generation.
	emittedSync bool
	// go121Events is non-nil only for pre-Go 1.22 traces; when set,
	// ReadEvent serves events exclusively from it.
	go121Events *oldTraceConverter
}
// NewReader builds a Reader for the trace carried by r.
//
// The trace header is consumed immediately to determine the format version.
// Traces from Go 1.11, 1.19 and 1.21 are parsed in full right away and
// converted to the current event model; Go 1.22 and 1.23 traces are parsed
// incrementally by ReadEvent. Any other version yields an error.
func NewReader(r io.Reader) (*Reader, error) {
	buffered := bufio.NewReader(r)
	ver, err := version.ReadHeader(buffered)
	if err != nil {
		return nil, err
	}

	// Legacy formats: parse everything up front and wrap the result in a
	// converter that ReadEvent will drain.
	if ver == version.Go111 || ver == version.Go119 || ver == version.Go121 {
		legacy, err := oldtrace.Parse(buffered, ver)
		if err != nil {
			return nil, err
		}
		rd := new(Reader)
		rd.go121Events = convertOldFormat(legacy)
		return rd, nil
	}

	if ver != version.Go122 && ver != version.Go123 {
		return nil, fmt.Errorf("unknown or unsupported version go 1.%d", ver)
	}

	// Current format: events are streamed generation by generation.
	rd := &Reader{
		r: buffered,
		// Starting out as "already emitted" suppresses a sync event ahead
		// of the very first generation.
		emittedSync: true,
	}
	rd.order.mStates = make(map[ThreadID]*mState)
	rd.order.pStates = make(map[ProcID]*pState)
	rd.order.gStates = make(map[GoID]*gState)
	rd.order.activeTasks = make(map[TaskID]taskState)
	return rd, nil
}
// ReadEvent returns the next event in the trace.
//
// Once the stream has been exhausted, the result is an invalid (zero)
// event together with io.EOF.
func (r *Reader) ReadEvent() (e Event, err error) {
	// Pre-Go 1.22 traces were converted when the Reader was created, so
	// all that is left to do is hand out the converted events.
	if old := r.go121Events; old != nil {
		converted, nextErr := old.next()
		if nextErr != nil {
			// XXX do we have to emit an EventSync when the trace is done?
			return Event{}, nextErr
		}
		return converted, nil
	}

	// Outline of the Go 1.22+ parsing algorithm:
	//
	// (1) Load every batch belonging to the next generation.
	//     (a) The size field in each header lets batches be located quickly.
	// (2) Extract strings, stacks, CPU samples, and timestamp conversion data.
	// (3) Group event batches per M, sorted by timestamp; a batchCursor
	//     represents one such group.
	// (4) Keep the batchCursors in a min-heap keyed on each M's next event
	//     timestamp.
	// (5) Attempt to advance the next event of the M at the top of the heap.
	//     (a) If that works, that M is selected.
	//     (b) If not, sort the heap and try the remaining Ms in order,
	//         selecting the first one that advances.
	//     (c) If nothing can advance, go back to (1).
	// (6) Take the newest event of the selected M and prepare to return it.
	// (7) Read the selected M's following event and fix up the heap.
	// (8) Return the selected event; the next call resumes at (5).

	// Every successfully returned event passes through this hook, which
	// validates its table IDs and forces timestamps to strictly increase.
	defer func() {
		if err != nil {
			return
		}
		err = e.validateTableIDs()
		if err != nil {
			return
		}
		if r.lastTs >= e.base.time {
			e.base.time = r.lastTs + 1
		}
		r.lastTs = e.base.time
	}()

	// Events already queued by the ordering take priority.
	if queued, found := r.order.Next(); found {
		return queued, nil
	}

	// An empty frontier with no pending CPU samples means the current
	// generation (if any) is finished.
	if len(r.frontier) == 0 && len(r.cpuSamples) == 0 {
		switch {
		case !r.emittedSync:
			// Mark the end of the generation with a sync event first.
			r.emittedSync = true
			return syncEvent(r.gen.evTable, r.lastTs), nil
		case r.gen != nil && r.spill == nil:
			// A previous generation exists and no batch spilled over from
			// it, so no further generation follows: the trace is done.
			return Event{}, io.EOF
		}

		// Load the next generation.
		r.gen, r.spill, err = readGeneration(r.r, r.spill)
		if err != nil {
			return Event{}, err
		}

		// Point the CPU sample cursor at the new generation's samples.
		r.cpuSamples = r.gen.cpuSamples

		// Rebuild the frontier with one cursor per M that has events.
		for m, batches := range r.gen.batches {
			cursor := &batchCursor{m: m}
			more, nextErr := cursor.nextEvent(batches, r.gen.freq)
			if nextErr != nil {
				return Event{}, nextErr
			}
			if more {
				r.frontier = heapInsert(r.frontier, cursor)
			}
			// Otherwise these batches held no events at all; skip the M.
		}

		// The new generation still owes a sync event once it is drained.
		r.emittedSync = false
	}

	// advance feeds the pending event of frontier[idx] to the ordering.
	// When the ordering accepts it, the cursor is moved to its next event
	// and the heap is repaired (or the cursor dropped if it ran dry).
	advance := func(idx int) (bool, error) {
		cursor := r.frontier[idx]
		accepted, advErr := r.order.Advance(&cursor.ev, r.gen.evTable, cursor.m, r.gen.gen)
		if advErr != nil || !accepted {
			return accepted, advErr
		}
		more, nextErr := cursor.nextEvent(r.gen.batches[cursor.m], r.gen.freq)
		switch {
		case nextErr != nil:
			return false, nextErr
		case more:
			// The cursor has a fresh event; restore the heap property.
			heapUpdate(r.frontier, idx)
		default:
			// The cursor is exhausted; take it out of the frontier.
			r.frontier = heapRemove(r.frontier, idx)
		}
		return true, nil
	}

	// Emit a CPU sample when it precedes the earliest frontier event, or
	// when the frontier is already empty.
	if len(r.cpuSamples) != 0 && (len(r.frontier) == 0 || r.cpuSamples[0].time < r.frontier[0].ev.time) {
		sample := r.cpuSamples[0].asEvent(r.gen.evTable)
		r.cpuSamples = r.cpuSamples[1:]
		return sample, nil
	}

	// The head of the frontier carries the minimum timestamp, and advancing
	// it is expected to succeed in the overwhelming majority of calls.
	if len(r.frontier) == 0 {
		return Event{}, fmt.Errorf("broken trace: frontier is empty:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
	}
	headOK, headErr := advance(0)
	if headErr != nil {
		return Event{}, headErr
	}
	if !headOK {
		// Rare path: walk the rest of the frontier in timestamp order.
		// Sorting keeps the slice a valid min-heap while making in-order
		// iteration possible.
		slices.SortFunc(r.frontier, (*batchCursor).compare)
		progressed := false
		for i := 1; i < len(r.frontier) && !progressed; i++ {
			ok, advErr := advance(i)
			if advErr != nil {
				return Event{}, advErr
			}
			progressed = ok
		}
		if !progressed {
			return Event{}, fmt.Errorf("broken trace: failed to advance: frontier:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
		}
	}

	// A successful advance guarantees the ordering has an event queued.
	next, found := r.order.Next()
	if !found {
		panic("invariant violation: advance successful, but queue is empty")
	}
	return next, nil
}
// dumpFrontier renders the pending event of every cursor in frontier as
// text, one line per cursor, for inclusion in "broken trace" errors.
//
// Each line has the form "M <m> [<event name> time=<t> <arg>=<value>...]".
func dumpFrontier(frontier []*batchCursor) string {
	var out strings.Builder
	for _, cursor := range frontier {
		ev := &cursor.ev
		spec := go122.Specs()[ev.typ]
		fmt.Fprintf(&out, "M %d [%s time=%d", cursor.m, spec.Name, ev.time)
		// The first entry of spec.Args is skipped and the remaining names
		// are paired with ev.args starting at index 0; the time was
		// already printed from ev.time above.
		names := spec.Args[1:]
		for idx := range names {
			fmt.Fprintf(&out, " %s=%d", names[idx], ev.args[idx])
		}
		out.WriteString("]\n")
	}
	return out.String()
}