-
Notifications
You must be signed in to change notification settings - Fork 0
/
benchmark.go
240 lines (207 loc) · 5.22 KB
/
benchmark.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
// Copyright (C) 2023 DeepSquare Association
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package benchmark
import (
"context"
"embed"
"fmt"
"time"
"github.com/deepsquare-io/grid/supervisor/logger"
"github.com/deepsquare-io/grid/supervisor/pkg/job/scheduler"
"github.com/deepsquare-io/grid/supervisor/pkg/utils/hash"
"github.com/deepsquare-io/grid/supervisor/pkg/utils/validate"
"go.uber.org/zap"
)
// templates holds the files matching templates/*.tmpl, embedded into the
// binary at build time.
//
//go:embed templates/*.tmpl
var templates embed.FS
// GBtoMB is the constant 1000.
// NOTE(review): presumably the decimal gigabyte-to-megabyte factor; its use
// is not visible in this file.
const GBtoMB = 1000

// jobNameFormat is the layout of generated job names. GetJobName fills the
// two %s verbs with the benchmark name and a hash of the supervisor address.
const jobNameFormat = "benchmark-%s-%s"

// DefaultTimeLimit is the time limit a launcher uses unless WithTimeLimit
// overrides it.
const DefaultTimeLimit = 24 * time.Hour
// Benchmark is a benchmark job definition. It has the same underlying type as
// scheduler.JobDefinition, so a *Benchmark converts directly to a
// *scheduler.JobDefinition when it is submitted.
type Benchmark scheduler.JobDefinition
// options collects the settings that Option functions write to.
type options struct {
	// Cluster specification, set together by WithClusterSpecs.
	// NOTE(review): the unit of memPerNode is not visible here — confirm.
	nodes, cpusPerNode, gpusPerNode, memPerNode uint64

	// image is set by WithImage.
	image string
	// secret has no setter in this file.
	secret string
	// supervisorPublicAddress is set by WithSupervisorPublicAddress.
	supervisorPublicAddress string

	// blockSize is set by WithBlockSize.
	blockSize uint64
	// memoryPercent is set by WithMemoryPercent.
	memoryPercent float64

	// ucx is switched on by WithUCX, which also fills in the affinity and
	// transport values.
	ucx                       bool
	ucxAffinity, ucxTransport string

	// additionalEnv maps environment variable names to values; entries are
	// added by WithAdditionalEnv.
	additionalEnv map[string]string

	// trace enables benchmark trace logging.
	trace bool
}
// Option is a function that modifies a set of benchmark options in place.
// Values are produced by constructors such as WithImage and WithUCX.
type Option func(*options)
// WithBlockSize returns an Option that stores nb as the block size.
func WithBlockSize(nb uint64) Option {
	return func(opts *options) { opts.blockSize = nb }
}
// WithMemoryPercent returns an Option that stores memoryPercent in the
// options.
func WithMemoryPercent(memoryPercent float64) Option {
	return func(opts *options) { opts.memoryPercent = memoryPercent }
}
// WithClusterSpecs returns an Option that records the cluster specification:
// the node count and the per-node CPU, GPU and memory figures.
func WithClusterSpecs(nodes, cpusPerNode, gpusPerNode, memPerNode uint64) Option {
	return func(opts *options) {
		opts.nodes, opts.cpusPerNode = nodes, cpusPerNode
		opts.gpusPerNode, opts.memPerNode = gpusPerNode, memPerNode
	}
}
// WithImage returns an Option that stores image in the options.
func WithImage(image string) Option {
	return func(opts *options) { opts.image = image }
}
// WithTrace returns an Option that turns the trace flag on.
func WithTrace() Option {
	return func(opts *options) { opts.trace = true }
}
// WithSupervisorPublicAddress returns an Option that stores
// supervisorPublicAddress in the options.
func WithSupervisorPublicAddress(supervisorPublicAddress string) Option {
	return func(opts *options) {
		opts.supervisorPublicAddress = supervisorPublicAddress
	}
}
// WithUCX returns an Option that switches the ucx flag on and records the
// given affinity and transport values.
func WithUCX(affinity, transport string) Option {
	return func(opts *options) {
		opts.ucxAffinity, opts.ucxTransport = affinity, transport
		opts.ucx = true
	}
}
// WithAdditionalEnv returns an Option that adds the key/value pair to the
// additional environment variables.
//
// The map is allocated on first use. A key rejected by
// validate.EnvVarNameValidator is not stored; an error is logged instead and
// the remaining options are left as they were.
func WithAdditionalEnv(key string, value string) Option {
	return func(opts *options) {
		if opts.additionalEnv == nil {
			opts.additionalEnv = map[string]string{}
		}
		if validate.EnvVarNameValidator(key) {
			opts.additionalEnv[key] = value
			return
		}
		logger.I.Error(
			"environment variable key is not valid",
			zap.String("key", key),
			zap.String("value", value),
		)
	}
}
// Launcher launches and cancels benchmark jobs addressed by name.
type Launcher interface {
	// Cancel cancels the benchmark job that was launched under name.
	Cancel(ctx context.Context, name string) error
	// GetJobName returns the generated job name for the benchmark called name.
	GetJobName(name string) string
	// Launch starts benchmark under the given name and reports any error
	// from doing so.
	Launch(ctx context.Context, name string, benchmark *Benchmark) error
}
// launcher is the Launcher implementation built by NewLauncher.
type launcher struct {
	// supervisorPublicAddress is hashed into generated job names; user is
	// passed to the scheduler on submit and cancel.
	supervisorPublicAddress, user string
	// scheduler receives the Submit and CancelJob calls.
	scheduler scheduler.Scheduler
	// timeLimit is written, in whole minutes, onto every launched benchmark.
	timeLimit time.Duration
	// wait is copied into the Wait field of every launched benchmark.
	wait bool
}
// LauncherOption is a function that adjusts a launcher in place. NewLauncher
// applies each one, in order, after setting its defaults.
type LauncherOption func(*launcher)
// WithNoWait returns a LauncherOption that clears the launcher's wait flag,
// which NewLauncher sets to true by default.
func WithNoWait() LauncherOption {
	return func(target *launcher) { target.wait = false }
}
// WithTimeLimit returns a LauncherOption that replaces the launcher's time
// limit (DefaultTimeLimit unless overridden) with timeLimit.
func WithTimeLimit(timeLimit time.Duration) LauncherOption {
	return func(target *launcher) { target.timeLimit = timeLimit }
}
// NewLauncher builds a Launcher that submits jobs to scheduler as user.
//
// The launcher starts with DefaultTimeLimit and waiting enabled; each option
// in opts is then applied in the order given and may override those defaults.
func NewLauncher(
	user string,
	supervisorPublicAddress string,
	scheduler scheduler.Scheduler,
	opts ...LauncherOption,
) Launcher {
	instance := launcher{
		supervisorPublicAddress: supervisorPublicAddress,
		user:                    user,
		scheduler:               scheduler,
		timeLimit:               DefaultTimeLimit,
		wait:                    true,
	}
	for i := range opts {
		opts[i](&instance)
	}
	return &instance
}
// GetJobName returns the scheduler job name for the benchmark called name.
// It formats jobNameFormat with name and the result of
// hash.GenerateAlphanumeric applied to the supervisor public address.
func (l *launcher) GetJobName(name string) string {
	return fmt.Sprintf(
		jobNameFormat,
		name,
		hash.GenerateAlphanumeric(l.supervisorPublicAddress),
	)
}
// Cancel asks the scheduler to cancel the job whose generated name
// corresponds to name, acting as the launcher's user. It returns the
// scheduler's error unchanged.
func (l *launcher) Cancel(ctx context.Context, name string) error {
	jobName := l.GetJobName(name)
	return l.scheduler.CancelJob(ctx, jobName, l.user)
}
// Launch submits benchmark to the scheduler under the generated job name for
// name and blocks until the Submit call returns.
//
// Before submitting, it overwrites benchmark.Wait and benchmark.TimeLimit with
// the launcher's settings; the time limit is truncated to whole minutes.
// While the submission is in flight, a progress message is logged every
// 15 seconds.
//
// It returns an error if benchmark is nil; otherwise it returns whatever
// error the scheduler's Submit call produced (nil on success).
func (l *launcher) Launch(
	ctx context.Context,
	name string,
	benchmark *Benchmark,
) error {
	// The field writes below would panic on a nil definition, so reject it
	// with an error the caller can handle.
	if benchmark == nil {
		return fmt.Errorf("launching benchmark %q: benchmark definition is nil", name)
	}

	benchmark.Wait = l.wait
	benchmark.TimeLimit = uint64(l.timeLimit.Minutes())

	ticker := time.NewTicker(15 * time.Second)
	defer ticker.Stop()

	// Buffered with capacity 1 so the goroutine can always deliver its result
	// and exit without waiting for a receiver.
	errC := make(chan error, 1)
	go func() {
		_, err := l.scheduler.Submit(ctx, &scheduler.SubmitRequest{
			Name:          l.GetJobName(name),
			User:          l.user,
			Prefix:        "benchmark",
			JobDefinition: (*scheduler.JobDefinition)(benchmark),
		})
		errC <- err
	}()
	logger.I.Info("benchmark started", zap.String("name", name))

	for {
		select {
		case err := <-errC:
			if err != nil {
				logger.I.Error("benchmark failed", zap.String("name", name), zap.Error(err))
			} else {
				logger.I.Info("benchmark succeeded", zap.String("name", name))
			}
			return err
		case <-ticker.C:
			// Periodic heartbeat while Submit is still in flight.
			logger.I.Info("benchmark is still running", zap.String("name", name))
		}
	}
}