forked from sourcegraph/sourcegraph
/
batch_inserter.go
109 lines (91 loc) · 3.53 KB
/
batch_inserter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package sqliteutil
import (
"context"
"database/sql"
"fmt"
"strings"
)
// BatchInserter batches insertions to a single column in a SQLite database.
//
// The benchmark tests provided in this package show that 50% more rows can be
// inserted in the same time it takes for them to be inserted individually within
// a transaction.
//
// BenchmarkSQLiteInsertion-8 40417 29440 ns/op
// BenchmarkSQLiteInsertionInTransaction-8 214681 5542 ns/op
// BenchmarkSQLiteInsertionWithBatchInserter-8 324998 3701 ns/op
type BatchInserter struct {
	db                Execable      // destination; either a *sql.DB or a *sql.Tx
	numColumns        int           // number of values per inserted row
	maxBatchSize      int           // capacity of batch: largest multiple of numColumns <= MaxNumSqliteParameters
	batch             []interface{} // queued values, flattened row-major; flushed when full
	queryPrefix       string        // `INSERT INTO "table" (cols...) VALUES ` — shared by every flush
	queryPlaceholders []string      // one "(?,...,?)" group per row of a full batch; sliced down for partial flushes
}
// MaxNumSqliteParameters is the number of `?` placeholders that can be sent to SQLite without error.
// NOTE(review): 999 matches SQLite's historical default SQLITE_MAX_VARIABLE_NUMBER —
// confirm it matches the limit compiled into the linked SQLite build.
const MaxNumSqliteParameters = 999
// Execable is the minimal common interface over sql.DB and sql.Tx required
// by BatchInserter.
type Execable interface {
	// ExecContext executes a query without returning any rows.
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}
// NewBatchInserter creates a new batch inserter that writes rows of the given
// columns into tableName via db. The batch size is the largest multiple of
// len(columnNames) that does not exceed MaxNumSqliteParameters, so every
// flushed statement stays within SQLite's placeholder limit.
//
// NOTE(review): calling this with zero columnNames panics with an integer
// divide by zero — confirm all callers pass at least one column.
func NewBatchInserter(db Execable, tableName string, columnNames ...string) *BatchInserter {
	numColumns := len(columnNames)
	maxBatchSize := (MaxNumSqliteParameters / numColumns) * numColumns

	placeholders := make([]string, numColumns)
	quotedColumnNames := make([]string, numColumns)
	for i, columnName := range columnNames {
		placeholders[i] = "?"
		quotedColumnNames[i] = fmt.Sprintf(`"%s"`, columnName)
	}

	queryPrefix := fmt.Sprintf(`INSERT INTO "%s" (%s) VALUES `, tableName, strings.Join(quotedColumnNames, ","))

	// The "(?,...,?)" group is identical for every row; build it once instead
	// of re-joining inside the loop below.
	placeholderRow := fmt.Sprintf("(%s)", strings.Join(placeholders, ","))
	queryPlaceholders := make([]string, maxBatchSize/numColumns)
	for i := range queryPlaceholders {
		queryPlaceholders[i] = placeholderRow
	}

	return &BatchInserter{
		db:                db,
		numColumns:        numColumns,
		maxBatchSize:      maxBatchSize,
		batch:             make([]interface{}, 0, maxBatchSize),
		queryPrefix:       queryPrefix,
		queryPlaceholders: queryPlaceholders,
	}
}
// Insert enqueues the values of a single row for insertion. The given values must match up
// with the columnNames given at construction of the inserter.
func (bi *BatchInserter) Insert(ctx context.Context, values ...interface{}) error {
	if got := len(values); got != bi.numColumns {
		return fmt.Errorf("expected %d values, got %d", bi.numColumns, got)
	}

	bi.batch = append(bi.batch, values...)

	// Write out eagerly once a full batch has accumulated; partial batches
	// remain queued until the next Insert or an explicit Flush.
	if len(bi.batch) < bi.maxBatchSize {
		return nil
	}
	return bi.Flush(ctx)
}
// Flush ensures that all queued rows are inserted. This method must be invoked at the end
// of insertion to ensure that all records are flushed to the underlying Execable.
func (bi *BatchInserter) Flush(ctx context.Context) error {
	batch := bi.pop()
	if len(batch) == 0 {
		return nil
	}

	// Assemble an INSERT with exactly one "(?,...)" group per queued row.
	// Every call but possibly the last uses the full queryPlaceholders slice.
	numRows := len(batch) / bi.numColumns
	query := bi.queryPrefix + strings.Join(bi.queryPlaceholders[:numRows], ",")

	_, err := bi.db.ExecContext(ctx, query, batch...)
	return err
}
// pop removes and returns up to maxBatchSize queued values. Any overflow past
// a full batch stays queued for a subsequent flush; when fewer values than a
// full batch are queued, all of them are returned and the queue is emptied
// (its backing storage is retained for reuse).
func (bi *BatchInserter) pop() (batch []interface{}) {
	if len(bi.batch) < bi.maxBatchSize {
		batch = bi.batch
		bi.batch = bi.batch[:0]
		return batch
	}

	batch = bi.batch[:bi.maxBatchSize]
	bi.batch = bi.batch[bi.maxBatchSize:]
	return batch
}