/
tracer.go
242 lines (224 loc) · 6.98 KB
/
tracer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
// Copyright 2019 GRAIL, Inc. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
package exec
import (
"fmt"
"io"
"strings"
"sync"
"time"
"github.com/grailbio/bigslice/internal/trace"
)
// A tracer tracks a set of trace events associated with objects in
// Bigslice. Trace events are logged in the Chrome tracing format and
// can be visualized using its built-in visualization tool
// (chrome://tracing). Each machine is represented as a Chrome "process",
// and individual task or invocation events are tracked by the machine
// they are run on.
//
// To produce visualizations that are easier to interpret, tracer assigns
// generated virtual "thread IDs" to trace events, and events are also
// coalesced into "complete events" (X) at the time of rendering.
//
// TODO(marius): garbage collection of old events.
type tracer struct {
// mu is held by Event and Marshal while they access the fields below.
mu sync.Mutex
// events holds events that are emitted as-is when marshaling; Event
// appends the "process_name" metadata event for each newly seen
// machine here.
events []trace.Event
// taskEvents holds, per task, the events logged with that task as
// their subject, in the order they were logged.
taskEvents map[*Task][]trace.Event
// compileEvents holds, per (machine address, invocation index), the
// events logged with an invocation as their subject, in the order they
// were logged.
compileEvents map[compileKey][]trace.Event
// machinePids maps each machine seen so far to the process ID assigned
// to it. Pids are assigned starting at 1; pid 0 is reserved for
// evaluator events.
machinePids map[*sliceMachine]int
// machineTidPools maps each machine to the pool from which its virtual
// thread IDs are allocated. Pools are created lazily on first use.
machineTidPools map[*sliceMachine]*tidPool
// firstEvent is used to store the time of the first observed
// event so that the offsets in the trace are meaningful.
firstEvent time.Time
}
// tidPool is a pool of (virtual) thread IDs that we use to assign Tids to
// events. This makes visualization with the Chrome tracing tool much nicer, as
// concurrent events are shown on their own rows. The pool grows whenever a
// Tid is requested while none is available, so its length is the maximum
// number of Tids that were allocated at the same time. Tids are 1-based: the
// element at index i describes Tid i+1, and its value indicates whether that
// Tid is currently available for allocation (true) or in use (false).
type tidPool []bool
// compileKey is the key used for compilation events, which are scoped to a
// (machine, invocation). Values are constructed positionally, so the field
// order is significant.
type compileKey struct {
// addr is the Addr of the machine on which the event was logged.
addr string
// inv is the index of the invocation the event refers to.
inv uint64
}
// newTracer returns a tracer with no recorded events. All of its maps
// are allocated so that Event can store into them directly.
func newTracer() *tracer {
	t := new(tracer)
	t.taskEvents = map[*Task][]trace.Event{}
	t.compileEvents = map[compileKey][]trace.Event{}
	t.machinePids = map[*sliceMachine]int{}
	t.machineTidPools = map[*sliceMachine]*tidPool{}
	return t
}
// Event logs an event on the provided machine with the given
// subject, type (ph), and arguments. The event's subject must be
// either a *Task or an execInvocation; ph is as in Chrome's
// tracing format. Arguments is a list of interleaved key-value pairs
// that are attached as event metadata; keys are converted to strings
// with fmt.Sprint.
//
// If mach is nil, the event is assigned to the evaluator: it keeps
// pid 0, and invocation events are keyed by the empty machine address.
//
// Event is a no-op when called on a nil tracer. It panics if args is
// of odd length or if subject is of an unsupported type.
func (t *tracer) Event(mach *sliceMachine, subject interface{}, ph string, args ...interface{}) {
	if t == nil {
		return
	}
	if len(args)%2 != 0 {
		panic("trace.Event: invalid arguments")
	}
	var event trace.Event
	event.Ph = ph
	event.Args = make(map[string]interface{}, len(args)/2)
	for i := 0; i < len(args); i += 2 {
		event.Args[fmt.Sprint(args[i])] = args[i+1]
	}

	t.mu.Lock()
	defer t.mu.Unlock()

	// Timestamps are microsecond offsets from the first observed event.
	if t.firstEvent.IsZero() {
		t.firstEvent = time.Now()
		event.Ts = 0
	} else {
		event.Ts = time.Since(t.firstEvent).Nanoseconds() / 1e3
	}

	// addr stays empty for evaluator events (mach == nil), so that the
	// invocation case below never dereferences a nil machine.
	var addr string
	if mach != nil {
		addr = mach.Addr
		pid, ok := t.machinePids[mach]
		if !ok {
			pid = len(t.machinePids) + 1 // pid=0 is reserved for evaluator events
			t.machinePids[mach] = pid
			// Attach "process" name metadata so we can identify where a task is running.
			t.events = append(t.events, trace.Event{
				Pid:  pid,
				Ts:   event.Ts,
				Ph:   "M",
				Name: "process_name",
				Args: map[string]interface{}{
					"name": addr,
				},
			})
		}
		event.Pid = pid
	}

	switch arg := subject.(type) {
	case *Task:
		event.Name = arg.Name.String()
		event.Cat = "task"
		t.assignTid(mach, ph, t.taskEvents[arg], &event)
		t.taskEvents[arg] = append(t.taskEvents[arg], event)
	case execInvocation:
		// Exclusive invocations are marked with an "[x]" suffix.
		var name strings.Builder
		fmt.Fprint(&name, arg.Index)
		if arg.Exclusive {
			name.WriteString("[x]")
		}
		event.Name = name.String()
		event.Cat = "invocation"
		key := compileKey{addr, arg.Index}
		t.assignTid(mach, ph, t.compileEvents[key], &event)
		t.compileEvents[key] = append(t.compileEvents[key], event)
	default:
		panic(fmt.Sprintf("unsupported subject type %T", subject))
	}
}
// assignTid sets event.Tid based on the event type ph, drawing from the
// tid pool of mach (which is created on first use). events is the slice
// of existing relevant events, e.g. t.taskEvents[arg].
//
// A "B" event acquires a fresh tid from the pool. An "E" event whose
// immediately preceding event is a "B" reuses that event's tid and
// returns it to the pool. In every other case the tid is left at 0.
func (t *tracer) assignTid(mach *sliceMachine, ph string, events []trace.Event, event *trace.Event) {
	event.Tid = 0
	pool, found := t.machineTidPools[mach]
	if !found {
		pool = new(tidPool)
		t.machineTidPools[mach] = pool
	}
	if ph == "B" {
		event.Tid = pool.Acquire()
		return
	}
	if ph != "E" || len(events) == 0 {
		return
	}
	// Only an "E" that directly follows a "B" closes a thread.
	if prev := events[len(events)-1]; prev.Ph == "B" {
		event.Tid = prev.Tid
		pool.Release(event.Tid)
	}
}
// Marshal encodes the trace captured by t to w in Chrome's event
// tracing format. The set of events is assembled while holding t's
// lock: the standalone events are copied, and the per-task and
// per-invocation events are coalesced onto that copy. Encoding happens
// after the lock has been released.
func (t *tracer) Marshal(w io.Writer) error {
	t.mu.Lock()
	merged := make([]trace.Event, len(t.events))
	copy(merged, t.events)
	coalesce := func(group []trace.Event) {
		merged = appendCoalesce(merged, group, t.firstEvent)
	}
	for _, group := range t.taskEvents {
		coalesce(group)
	}
	for _, group := range t.compileEvents {
		coalesce(group)
	}
	t.mu.Unlock()

	out := trace.T{Events: merged}
	return out.Encode(w)
}
// appendCoalesce appends events to list and returns the extended list.
// While appending, a "B" event and the next "E" event that follows it
// are merged into a single "X" (complete) event, which produces more
// visually compact (and useful) trace visualizations. The merged event
// keeps the "B" event's fields, gets a duration of at least 1, and
// gains any arguments of the "E" event whose keys it does not already
// have (these are written into the "B" event's Args map).
//
// "E" events with no open "B" are dropped, and a "B" that is still
// open when events is exhausted is removed from the result. The
// firstEvent parameter is not used.
func appendCoalesce(list []trace.Event, events []trace.Event, firstEvent time.Time) []trace.Event {
	open := -1 // index in list of the pending "B" event, or -1 if none
	for _, ev := range events {
		if ev.Ph == "B" && open < 0 {
			// The event itself is appended by the switch below.
			open = len(list)
		}
		switch {
		case ev.Ph == "E" && open >= 0:
			beg := &list[open]
			beg.Ph = "X"
			beg.Dur = ev.Ts - beg.Ts
			if beg.Dur == 0 {
				beg.Dur = 1
			}
			for key, val := range ev.Args {
				if _, present := beg.Args[key]; !present {
					beg.Args[key] = val
				}
			}
			// Close the pending begin, so that a task that fails and is
			// retried on the same machine is captured as two distinct
			// events.
			open = -1
		case ev.Ph != "E":
			list = append(list, ev)
		}
		// Any remaining case is an unmatched "E", which is dropped.
	}
	if open >= 0 {
		// The last "B" was never matched. Drop it.
		list = append(list[:open], list[open+1:]...)
	}
	return list
}
// Acquire returns a thread ID from pool p and marks it as in use. The
// lowest available ID is preferred; if none is available, the pool
// grows by one entry. Thread IDs are sequential and 1-indexed, which
// preserves 0 for events without a meaningful thread ID.
func (p *tidPool) Acquire() int {
	pool := *p
	for i := range pool {
		if !pool[i] {
			continue
		}
		pool[i] = false
		return i + 1
	}
	// Every existing ID is in use: add a new one, already taken.
	*p = append(pool, false)
	return len(pool) + 1
}
// Release releases tid, a thread ID previously acquired in Acquire. This
// makes it available to be returned from a future call to Acquire.
//
// Release panics with "releasing unallocated tid" if tid is not currently
// allocated from p. This covers both a tid that was already released and a
// tid outside the pool's range (including 0, which Acquire never returns).
func (p *tidPool) Release(tid int) {
	// Check the range first so that a bogus tid is reported with the same
	// descriptive panic rather than a bare index-out-of-range error.
	if tid < 1 || tid > len(*p) || (*p)[tid-1] {
		panic("releasing unallocated tid")
	}
	(*p)[tid-1] = true
}