forked from timescale/tsbs
-
Notifications
You must be signed in to change notification settings - Fork 2
/
loader.go
347 lines (302 loc) · 11.9 KB
/
loader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
package load
import (
"encoding/json"
"fmt"
"github.com/benchant/tsbs/pkg/targets"
"io/ioutil"
"log"
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/spf13/pflag"
"github.com/benchant/tsbs/load/insertstrategy"
)
const (
	// defaultBatchSize - default size of batches to be inserted
	defaultBatchSize = 10000
	// DefaultChannelCapacityFlagVal is the sentinel --channel-capacity value
	// meaning "not set"; GetBenchmarkRunner derives a capacity from the
	// worker count when it sees this value.
	DefaultChannelCapacityFlagVal = 0
	// defaultChannelCapacityPerWorker is the per-worker buffer used to derive
	// a channel capacity when none was given explicitly.
	defaultChannelCapacityPerWorker = 5
	// errDBExistsFmt is the panic message used when --do-abort-on-exist is
	// set and the target database already exists.
	errDBExistsFmt = "database \"%s\" exists: aborting."
)
// change for more useful testing: these indirections let tests swap out
// printing and fatal-exit behavior (e.g. to capture output or avoid exiting).
var (
	printFn = fmt.Printf
	fatal   = log.Fatalf
)
// BenchmarkRunnerConfig contains all the configuration information required for running BenchmarkRunner.
// Field semantics mirror the command-line flags registered in AddToFlagSet.
type BenchmarkRunnerConfig struct {
	// DBName is the name of the database to load into.
	DBName string `yaml:"db-name" mapstructure:"db-name" json:"db-name"`
	// BatchSize is the number of items batched into a single insert
	// (0 falls back to defaultBatchSize).
	BatchSize uint `yaml:"batch-size" mapstructure:"batch-size" json:"batch-size"`
	// Workers is the number of parallel inserting clients.
	Workers uint `yaml:"workers" mapstructure:"workers" json:"workers"`
	// Limit is the number of items to insert (0 = all of them).
	Limit uint64 `yaml:"limit" mapstructure:"limit" json:"limit"`
	// DoLoad toggles actually writing data; false measures read speed only.
	DoLoad bool `yaml:"do-load" mapstructure:"do-load" json:"do-load"`
	// DoCreateDB toggles creating the database before loading.
	DoCreateDB bool `yaml:"do-create-db" mapstructure:"do-create-db" json:"do-create-db"`
	// DoAbortOnExist aborts the run if the database already exists.
	DoAbortOnExist bool `yaml:"do-abort-on-exist" mapstructure:"do-abort-on-exist" json:"do-abort-on-exist"`
	// ReportingPeriod is how often periodic write stats are printed (0 disables).
	ReportingPeriod time.Duration `yaml:"reporting-period" mapstructure:"reporting-period" json:"reporting-period"`
	// HashWorkers consistently hashes insert data to the same worker.
	HashWorkers bool `yaml:"hash-workers" mapstructure:"hash-workers" json:"hash-workers"`
	// NoFlowControl selects the noFlowBenchmarkRunner variant.
	NoFlowControl bool `yaml:"no-flow-control" mapstructure:"no-flow-control" json:"no-flow-control"`
	// ChannelCapacity is the worker channel buffer size
	// (DefaultChannelCapacityFlagVal = derive from worker count).
	ChannelCapacity uint `yaml:"channel-capacity" mapstructure:"channel-capacity" json:"channel-capacity"`
	// InsertIntervals configures per-worker sleep between inserts ("" = none).
	InsertIntervals string `yaml:"insert-intervals" mapstructure:"insert-intervals" json:"insert-intervals"`
	// ResultsFile, when non-empty, is where the results summary JSON is written.
	ResultsFile string `yaml:"results-file" mapstructure:"results-file" json:"results-file"`
	// deprecated, should not be used in other places other than tsbs_load_xx commands
	FileName string `yaml:"file" mapstructure:"file" json:"file"`
	// Seed is the PRNG seed passed to rand.NewSource.
	Seed int64 `yaml:"seed" mapstructure:"seed" json:"seed"`
}
// AddToFlagSet adds command line flags needed by the BenchmarkRunnerConfig to
// the flag set. Registration order is irrelevant: pflag sorts flags for usage
// output, and lookups are by name.
func (c BenchmarkRunnerConfig) AddToFlagSet(fs *pflag.FlagSet) {
	// Target database.
	fs.String("db-name", "benchmark", "Name of database")
	fs.Bool("do-create-db", true, "Whether to create the database. Disable on all but one client if running on a multi client setup.")
	fs.Bool("do-abort-on-exist", false, "Whether to abort if a database with the given name already exists.")

	// Load shape and worker behavior.
	fs.Uint("batch-size", defaultBatchSize, "Number of items to batch together in a single insert")
	fs.Uint("workers", 1, "Number of parallel clients inserting")
	fs.Uint64("limit", 0, "Number of items to insert (0 = all of them).")
	fs.Bool("do-load", true, "Whether to write data. Set this flag to false to check input read speed.")
	fs.Bool("hash-workers", false, "Whether to consistently hash insert data to the same workers (i.e., the data for a particular host always goes to the same worker)")
	fs.String("insert-intervals", "", "Time to wait between each insert, default '' => all workers insert ASAP. '1,2' = worker 1 waits 1s between inserts, worker 2 and others wait 2s")

	// Input, output, and reporting.
	fs.String("file", "", "File name to read data from")
	fs.Int64("seed", 0, "PRNG seed (default: 0, which uses the current timestamp)")
	fs.Duration("reporting-period", 10*time.Second, "Period to report write stats")
	fs.String("results-file", "", "Write the test results summary json to this file")
}
// BenchmarkRunner is the interface implemented by all loader runners: it
// exposes the target database name and runs a supplied Benchmark to completion.
type BenchmarkRunner interface {
	// DatabaseName returns the name of the database data is loaded into.
	DatabaseName() string
	// RunBenchmark executes the full load benchmark for b.
	RunBenchmark(b targets.Benchmark)
}
// CommonBenchmarkRunner is responsible for initializing and storing common
// flags across all database systems and ultimately running a supplied Benchmark
type CommonBenchmarkRunner struct {
	BenchmarkRunnerConfig
	// metricCnt and rowCnt are running totals, updated atomically by workers
	// and read by the periodic reporter and the final summary.
	metricCnt uint64
	rowCnt    uint64
	// initialRand is seeded from BenchmarkRunnerConfig.Seed and feeds the
	// sleep regulator.
	initialRand *rand.Rand
	// sleepRegulator implements the per-worker insert-interval delays.
	sleepRegulator insertstrategy.SleepRegulator
}
// GetBenchmarkRunner returns a BenchmarkRunner configured from c for use in a
// benchmark program. A zero batch size falls back to defaultBatchSize. When
// flow control is disabled (c.NoFlowControl), a noFlowBenchmarkRunner is
// returned instead, with a channel capacity derived from the worker count
// unless one was set explicitly.
// (Previous comment referred to a nonexistent GetBenchmarkRunnerWithBatchSize.)
func GetBenchmarkRunner(c BenchmarkRunnerConfig) BenchmarkRunner {
	loader := CommonBenchmarkRunner{}
	loader.BenchmarkRunnerConfig = c
	// If the configuration batch size is 0 use the default batch size.
	if loader.BatchSize == 0 {
		loader.BatchSize = defaultBatchSize
	}
	// NOTE(review): the --seed flag help promises "0 uses the current
	// timestamp", but the seed is used verbatim here — confirm the
	// substitution happens before this is called.
	loader.initialRand = rand.New(rand.NewSource(loader.Seed))
	var err error
	if c.InsertIntervals == "" {
		loader.sleepRegulator = insertstrategy.NoWait()
	} else {
		loader.sleepRegulator, err = insertstrategy.NewSleepRegulator(c.InsertIntervals, int(loader.Workers), loader.initialRand)
		if err != nil {
			panic(fmt.Sprintf("could not initialize BenchmarkRunner: %v", err))
		}
	}
	if !c.NoFlowControl {
		return &loader
	}
	// Flow control disabled: if the capacity flag was left at its default,
	// derive one — per-worker channels get a fixed buffer, a shared channel
	// gets a buffer scaled by the worker count.
	if c.ChannelCapacity == DefaultChannelCapacityFlagVal {
		if c.HashWorkers {
			loader.ChannelCapacity = defaultChannelCapacityPerWorker
		} else {
			loader.ChannelCapacity = c.Workers * defaultChannelCapacityPerWorker
		}
	}
	return &noFlowBenchmarkRunner{loader}
}
// DatabaseName reports the database this runner loads into, i.e. the value of
// the --db-name flag.
func (l *CommonBenchmarkRunner) DatabaseName() string {
	return l.DBName
}
// preRun performs the shared setup before workers start: it runs the
// benchmark's DBCreator (if any), kicks off periodic stat reporting when
// configured, and registers all workers on a WaitGroup. It returns that
// WaitGroup, the benchmark start time, and a cleanup function — which is nil
// when the benchmark supplies no DBCreator.
func (l *CommonBenchmarkRunner) preRun(b targets.Benchmark) (*sync.WaitGroup, *time.Time, func()) {
	// Create required DB
	var cleanupFn func()
	if b.GetDBCreator() != nil {
		cleanupFn = l.useDBCreator(b.GetDBCreator())
	}
	// Fire-and-forget periodic reporter; only started when a period is set.
	if l.ReportingPeriod.Nanoseconds() > 0 {
		go l.report(l.ReportingPeriod)
	}
	waitGroup := &sync.WaitGroup{}
	waitGroup.Add(int(l.Workers))
	startedAt := time.Now()
	return waitGroup, &startedAt, cleanupFn
}
// postRun waits for all workers to finish, prints the load summary, and — if a
// results file was configured — writes the results JSON with the overall
// metric and row rates.
func (l *CommonBenchmarkRunner) postRun(wg *sync.WaitGroup, start *time.Time) {
	wg.Wait()
	finishedAt := time.Now()
	elapsed := finishedAt.Sub(*start)
	l.summary(elapsed)
	if l.BenchmarkRunnerConfig.ResultsFile == "" {
		return
	}
	secs := elapsed.Seconds()
	l.saveTestResult(elapsed, *start, finishedAt, float64(l.metricCnt)/secs, float64(l.rowCnt)/secs)
}
// saveTestResult writes a LoaderTestResult summary — runner config, start/end
// times, duration, and metric/row rates — as indented JSON to the configured
// results file. Marshal or write failures are fatal.
func (l *CommonBenchmarkRunner) saveTestResult(took time.Duration, start time.Time, end time.Time, metricRate, rowRate float64) {
	totals := make(map[string]interface{})
	totals["metricRate"] = metricRate
	// Row rate is only meaningful when rows were actually counted.
	if l.rowCnt > 0 {
		totals["rowRate"] = rowRate
	}
	testResult := LoaderTestResult{
		ResultFormatVersion: LoaderTestResultVersion,
		RunnerConfig:        l.BenchmarkRunnerConfig,
		StartTime:           start.Unix(),
		EndTime:             end.Unix(),
		DurationMillis:      took.Milliseconds(),
		Totals:              totals,
	}
	// Use printFn (not fmt.Printf directly) for consistency with summary and
	// report, so tests that override printFn capture this output too.
	_, _ = printFn("Saving results json file to %s\n", l.BenchmarkRunnerConfig.ResultsFile)
	file, err := json.MarshalIndent(testResult, "", " ")
	if err != nil {
		log.Fatal(err)
	}
	err = ioutil.WriteFile(l.BenchmarkRunnerConfig.ResultsFile, file, 0644)
	if err != nil {
		log.Fatal(err)
	}
}
// RunBenchmark takes in a Benchmark b and uses it to run the load benchmark:
// it sets up the worker channels, launches the workers, streams batches from
// the data source with flow control, and then shuts everything down.
func (l *CommonBenchmarkRunner) RunBenchmark(b targets.Benchmark) {
	wg, start, cleanupFn := l.preRun(b)
	// Channel topology: with --hash-workers each worker gets its own channel
	// of capacity 1 (so hashed data always reaches the same worker);
	// otherwise all workers share one channel sized by the worker count.
	var numChannels, capacity uint
	if l.HashWorkers {
		numChannels = l.Workers
		capacity = 1
	} else {
		numChannels = 1
		capacity = l.Workers
	}
	channels := l.createChannels(numChannels, capacity)
	// Launch all worker processes in background
	for i := uint(0); i < l.Workers; i++ {
		go l.work(b, wg, channels[i%numChannels], i)
	}
	// Start scan process - actual data read process
	scanWithFlowControl(channels, l.BatchSize, l.Limit, b.GetDataSource(), b.GetBatchFactory(), b.GetPointIndexer(uint(len(channels))))
	// After scan process completed (no more data to come) - begin shutdown process
	// Close all communication channels to/from workers
	for _, ch := range channels {
		ch.close()
	}
	// preRun returns a nil cleanupFn when the benchmark has no DBCreator;
	// guard so that case no longer panics with a nil function call.
	if cleanupFn != nil {
		cleanupFn()
	}
	l.postRun(wg, start)
}
// useDBCreator handles a DBCreator by running it according to flags set by the
// user. The function returns a function that the caller should defer or run
// when the benchmark is finished
func (l *CommonBenchmarkRunner) useDBCreator(dbc targets.DBCreator) func() {
	// Empty function to 'defer' from caller
	closeFn := func() {}
	// DBCreator should still be Init'd even if -do-create-db is false since
	// it can initialize the connecting session
	dbc.Init()
	if l.DoLoad {
		// Comma-ok assertion instead of a one-case type switch (idiomatic).
		if closer, ok := dbc.(targets.DBCreatorCloser); ok {
			closeFn = closer.Close
		}
		// Check whether required DB already exists
		exists := dbc.DBExists(l.DBName)
		if exists && l.DoAbortOnExist {
			panic(fmt.Sprintf(errDBExistsFmt, l.DBName))
		}
		// Create required DB if need be
		// In case DB already exists - delete it
		if l.DoCreateDB {
			if exists {
				if err := dbc.RemoveOldDB(l.DBName); err != nil {
					panic(err)
				}
			}
			if err := dbc.CreateDB(l.DBName); err != nil {
				panic(err)
			}
		}
		if post, ok := dbc.(targets.DBCreatorPost); ok {
			if err := post.PostCreateDB(l.DBName); err != nil {
				// Log before panicking so the context is visible even if
				// the panic is recovered upstream.
				log.Println("could not execute PostCreateDB:" + err.Error())
				panic(err)
			}
		}
	}
	return closeFn
}
// createChannels create channels from which workers would receive tasks,
// each backed by a duplexChannel of the given capacity.
func (l *CommonBenchmarkRunner) createChannels(numChannels, capacity uint) []*duplexChannel {
	// Pre-size the slice: the channel count is known up front, so avoid
	// repeated append growth.
	channels := make([]*duplexChannel, 0, numChannels)
	for i := uint(0); i < numChannels; i++ {
		channels = append(channels, newDuplexChannel(int(capacity)))
	}
	return channels
}
// work is the processing function for each worker in the loader
func (l *CommonBenchmarkRunner) work(b targets.Benchmark, wg *sync.WaitGroup, c *duplexChannel, workerNum uint) {
	// Prepare processor
	proc := b.GetProcessor()
	proc.Init(int(workerNum), l.DoLoad, l.HashWorkers)
	// Process batches coming from duplexChannel.toWorker queue
	// and send ACKs into duplexChannel.toScanner queue
	for batch := range c.toWorker {
		startedWorkAt := time.Now()
		metricCnt, rowCnt := proc.ProcessBatch(batch, l.DoLoad)
		atomic.AddUint64(&l.metricCnt, metricCnt)
		atomic.AddUint64(&l.rowCnt, rowCnt)
		c.sendToScanner()
		l.timeToSleep(workerNum, startedWorkAt)
	}
	// Close proc if necessary. Comma-ok assertion replaces the previous
	// one-case type switch, which shadowed the channel parameter c.
	if closer, ok := proc.(targets.ProcessorCloser); ok {
		closer.Close(l.DoLoad)
	}
	wg.Done()
}
// timeToSleep applies the configured insert-interval delay for the given
// worker; it is a no-op when no sleep regulator was configured.
func (l *CommonBenchmarkRunner) timeToSleep(workerNum uint, startedWorkAt time.Time) {
	if l.sleepRegulator == nil {
		return
	}
	l.sleepRegulator.Sleep(int(workerNum), startedWorkAt)
}
// summary prints the summary of statistics from loading: total metrics (and
// rows, when counted) with their mean rates over the elapsed time.
func (l *CommonBenchmarkRunner) summary(took time.Duration) {
	// Duration.Seconds() already returns float64; the previous extra
	// float64(...) conversion was redundant.
	secs := took.Seconds()
	metricRate := float64(l.metricCnt) / secs
	printFn("\nSummary:\n")
	printFn("loaded %d metrics in %0.3fsec with %d workers (mean rate %0.2f metrics/sec)\n", l.metricCnt, secs, l.Workers, metricRate)
	if l.rowCnt > 0 {
		rowRate := float64(l.rowCnt) / secs
		printFn("loaded %d rows in %0.3fsec with %d workers (mean rate %0.2f rows/sec)\n", l.rowCnt, secs, l.Workers, rowRate)
	}
}
// report handles periodic reporting of loading stats as CSV lines: per-period
// and overall metric/row rates, sampled every period.
func (l *CommonBenchmarkRunner) report(period time.Duration) {
	start := time.Now()
	prevTime := start
	prevColCount := uint64(0)
	prevRowCount := uint64(0)
	printFn("time,per. metric/s,metric total,overall metric/s,per. row/s,row total,overall row/s\n")
	// The ticker is never stopped: this goroutine is started fire-and-forget
	// from preRun and lives for the remainder of the process.
	for now := range time.NewTicker(period).C {
		// Counters are written atomically by workers; read them the same way.
		cCount := atomic.LoadUint64(&l.metricCnt)
		rCount := atomic.LoadUint64(&l.rowCnt)
		sinceStart := now.Sub(start)
		took := now.Sub(prevTime)
		// Seconds() is already float64 — the previous float64(...) wrappers
		// were redundant.
		colrate := float64(cCount-prevColCount) / took.Seconds()
		overallColRate := float64(cCount) / sinceStart.Seconds()
		if rCount > 0 {
			rowrate := float64(rCount-prevRowCount) / took.Seconds()
			overallRowRate := float64(rCount) / sinceStart.Seconds()
			printFn("%d,%0.2f,%E,%0.2f,%0.2f,%E,%0.2f\n", now.Unix(), colrate, float64(cCount), overallColRate, rowrate, float64(rCount), overallRowRate)
		} else {
			printFn("%d,%0.2f,%E,%0.2f,-,-,-\n", now.Unix(), colrate, float64(cCount), overallColRate)
		}
		prevColCount = cCount
		prevRowCount = rCount
		prevTime = now
	}
}