-
Notifications
You must be signed in to change notification settings - Fork 3
/
scan.go
46 lines (37 loc) · 1.1 KB
/
scan.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package main
import (
"bufio"
"github.com/RediSearch/ftsb/benchmark_runner"
"log"
"sync"
)
type decoder struct {
scanner *bufio.Scanner
}
// Reads and returns a text line that encodes a databuild point for a specif field name.
// Since scanning happens in a single thread, we hold off on transforming it
// to an INSERT statement until it's being processed concurrently by a worker.
func (d *decoder) Decode(_ *bufio.Reader) *benchmark_runner.DocHolder {
ok := d.scanner.Scan()
if !ok && d.scanner.Err() == nil { // nothing scanned & no error = EOF
return nil
} else if !ok {
log.Fatalf("scan error: %v", d.scanner.Err())
}
return benchmark_runner.NewDocument(d.scanner.Text())
}
type eventsBatch struct {
rows []string
}
func (eb *eventsBatch) Len() int {
return len(eb.rows)
}
func (eb *eventsBatch) Append(item *benchmark_runner.DocHolder) {
that := item.Data.(string)
eb.rows = append(eb.rows, that)
}
var ePool = &sync.Pool{New: func() interface{} { return &eventsBatch{rows: []string{}} }}
type factory struct{}
func (f *factory) New() benchmark_runner.Batch {
return ePool.Get().(*eventsBatch)
}