This repository has been archived by the owner on Feb 21, 2024. It is now read-only.
/
batch.go
2001 lines (1784 loc) · 59.1 KB
/
batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2021 Molecula Corp. All rights reserved.
// Package batch provides tooling to prepare batches of records for ingest.
package batch
import (
"bytes"
"context"
"math/bits"
"sort"
"sync"
"time"
featurebase "github.com/featurebasedb/featurebase/v3"
"github.com/featurebasedb/featurebase/v3/batch/egpool"
"github.com/featurebasedb/featurebase/v3/dax"
"github.com/featurebasedb/featurebase/v3/logger"
"github.com/featurebasedb/featurebase/v3/pql"
"github.com/featurebasedb/featurebase/v3/roaring"
"github.com/pkg/errors"
)
// Batch defaults.
const (
DefaultKeyTranslateBatchSize = 100000
existenceFieldName = "_exists"
existenceViewName = "existence" // this should match top level featurebase viewExistence
)
// TODO if using column translation, column ids might get way out of
// order. Could be worth sorting everything after translation (as an
// option?). Instead of sorting all simultaneously, it might be faster
// (more cache friendly) to sort ids and save the swap ops to apply to
// everything else that needs to be sorted. Note: we're already doing
// some sorting in importValueData and importMutexData, so if we
// implement it at the top level, remember to remove it there.
// TODO support clearing values? nil values in records are ignored,
// but perhaps we could have a special type indicating that a bit or
// value should explicitly be cleared?
// RecordBatch is a Pilosa ingest interface designed to allow for
// maximum throughput on common workloads. Users should call Add()
// with a Row object until it returns ErrBatchNowFull, at which time
// they should call Import(), and then repeat.
//
// Add will not modify or otherwise retain the Row once it returns, so
// it is recommended that callers reuse the same Row with repeated
// calls to Add, just modifying its values appropriately in between
// calls. This avoids allocating a new slice of Values for each
// inserted Row.
//
// The supported types of the values in Row.Values are implementation
// defined. Similarly, the supported types for Row.ID are
// implementation defined.
type RecordBatch interface {
	// Add adds a record to the batch. It returns ErrBatchNowFull when
	// the record was accepted but filled the batch, at which point the
	// caller should call Import.
	Add(Row) error
	// Import does translation, creates the fragment files, and then,
	// if we're not using split batch mode, imports everything to
	// Pilosa. It then resets internal data structures for the next
	// batch. If we are using split batch mode, it saves the fragment
	// data to the batch, resets all other internal structures, and
	// continues.
	// Split batch mode DOES NOT CURRENTLY SUPPORT MUTEX OR INT FIELDS!
	Import() error
	// Len reports the number of records which have been added to the
	// batch since the last call to Import (or since it was created).
	Len() int
	// Flush is only applicable in split batch mode where it actually
	// imports the stored data to Pilosa. Otherwise it simply returns
	// nil.
	Flush() error
}
// agedTranslation combines a translation with a recording of when it was last used.
type agedTranslation struct {
	// id is the numeric ID the key translated to.
	id uint64
	// lastUsed is the batch cycle at which this cache entry was last read.
	lastUsed uint64
}
// Batch implements RecordBatch.
//
// It supports Values of type string, uint64, int64, float64, or nil. The
// following table describes what Pilosa field each type of value must map to.
// Fields are set up when calling "NewBatch".
//
// | type   | pilosa field type | options   |
// |--------+-------------------+-----------|
// | string | set               | keys=true |
// | uint64 | set               | any       |
// | int64  | int               | any       |
// | float64| decimal           | scale     |
// | bool   | bool              | any       |
// | nil    | any               |           |
//
// nil values are ignored.
type Batch struct {
	// importer is the client used to start/finish transactions and
	// perform the actual imports to Pilosa/FeatureBase.
	importer featurebase.Importer
	// tbl is the table (index) this batch ingests into.
	tbl *dax.Table
	// header is the ordered list of fields; positions correspond to the
	// positions of values in Row.Values.
	header []*featurebase.FieldInfo
	// headerMap indexes header by field name.
	headerMap map[string]*featurebase.FieldInfo
	// prevDuration records the time that each doImport() takes. This
	// is used to set the timeout for transactions to a reasonable
	// value based on the last import. It starts with a conservative
	// default set in NewBatch.
	prevDuration time.Duration
	// ids is a slice of length batchSize of record IDs
	ids []uint64
	// rowIDs is a map of field index (in the header) to slices of
	// length batchSize which contain row IDs.
	rowIDs map[int][]uint64
	// clearRowIDs is a map[fieldIndex][idsIndex]rowID we don't expect
	// clears to happen very often, so we store the idIndex/value
	// mapping in a map rather than a slice as we do for rowIDs. This
	// is a potentially temporary workaround to allow packed boolean
	// fields to clear "false" values. Packed fields may be more
	// completely supported by Pilosa in future.
	clearRowIDs map[int]map[int]uint64
	// rowIDSets is a map from field name to a batchSize slice of
	// slices of row IDs. When a given record can have more than one
	// value for a field, rowIDSets stores that information.
	rowIDSets map[string][][]uint64
	// values holds the values for each record of an int field
	values map[string][]int64
	// boolValues is a map[fieldName][idsIndex]bool, which holds the values for
	// each record of a bool field. It is a map of maps in order to accommodate
	// nil values (they just aren't recorded in the map[int]).
	boolValues map[string]map[int]bool
	// boolNulls holds a slice of indices into b.ids for each bool field which
	// has nil values.
	boolNulls map[string][]uint64
	// times holds a time for each record. (if any of the fields are time fields)
	times []QuantizedTime
	// nullIndices holds a slice of indices into b.ids for each
	// integer field which has nil values.
	nullIndices map[string][]uint64
	// TODO support bool fields.
	// for each field, keep a map of key to which record indexes that key mapped to
	toTranslate map[int]map[string][]int
	toTranslateClear map[int]map[string][]int
	// toTranslateSets is a map from field name to a map of string
	// keys that need to be translated to sets of record indexes which
	// those keys map to.
	toTranslateSets map[string]map[string][]int
	// toTranslateID maps each string key to a record index - this
	// will get translated into Batch.rowIDs
	toTranslateID []string
	// colTranslations caches record (column) key → ID translations,
	// aged by cycle (see getColTranslation).
	colTranslations map[string]agedTranslation
	// rowTranslations caches per-field row key → ID translations,
	// aged by cycle (see getRowTranslation).
	rowTranslations map[string]map[string]agedTranslation
	// cycle is a counter used to age translation-cache entries
	// (recorded in agedTranslation.lastUsed).
	cycle uint64
	// maxAge is the maximum age (in cycles) for cached translations;
	// defaults to 64 in NewBatch, configurable via OptCacheMaxAge.
	maxAge uint64
	// staleTime tracks the time the first record of the batch was inserted
	// plus the maxStaleness, in order to raise ErrBatchNowStale if the
	// maxStaleness has elapsed
	staleTime time.Time
	maxStaleness time.Duration
	// Maximum number of keys to translate at one time.
	keyTranslateBatchSize int
	log logger.Logger
	// experimental — only used by FlushToFragments which is an
	// alternative to Import which just builds the bitmap data for a
	// batch without actually importing it.
	splitBatchMode bool
	frags fragments
	clearFrags fragments
	useShardTransactionalEndpoint bool
}
// Len reports the number of records currently buffered in the batch.
func (b *Batch) Len() int { return len(b.ids) }
// BatchOption is a functional option for Batch objects, applied in
// order by NewBatch after the Batch is constructed.
type BatchOption func(b *Batch) error
// OptLogger sets the logger the batch uses for diagnostic output.
func OptLogger(l logger.Logger) BatchOption {
	return func(batch *Batch) error {
		batch.log = l
		return nil
	}
}
// OptSplitBatchMode toggles the experimental split batch mode, in
// which Import accumulates fragment data and Flush performs the
// actual import.
func OptSplitBatchMode(on bool) BatchOption {
	return func(batch *Batch) error {
		batch.splitBatchMode = on
		return nil
	}
}
// OptCacheMaxAge sets the maximum age (in import cycles) for entries
// in the key translation caches.
func OptCacheMaxAge(age uint64) BatchOption {
	return func(batch *Batch) error {
		batch.maxAge = age
		return nil
	}
}
// OptMaxStaleness sets how long the oldest record may sit in the
// batch before Add starts returning ErrBatchNowStale. Zero (the
// default) disables staleness checking.
func OptMaxStaleness(t time.Duration) BatchOption {
	return func(batch *Batch) error {
		batch.maxStaleness = t
		return nil
	}
}
// OptKeyTranslateBatchSize sets the maximum number of keys translated
// in a single request (default DefaultKeyTranslateBatchSize).
func OptKeyTranslateBatchSize(v int) BatchOption {
	return func(batch *Batch) error {
		batch.keyTranslateBatchSize = v
		return nil
	}
}
// OptUseShardTransactionalEndpoint tells the batch to import using
// the newer shard-transactional endpoint.
func OptUseShardTransactionalEndpoint(use bool) BatchOption {
	return func(batch *Batch) error {
		batch.useShardTransactionalEndpoint = use
		return nil
	}
}
// OptImporter overrides the Importer the batch uses to talk to
// Pilosa/FeatureBase.
func OptImporter(i featurebase.Importer) BatchOption {
	return func(batch *Batch) error {
		batch.importer = i
		return nil
	}
}
// NewBatch initializes a new Batch object which will use the given Importer,
// index, set of fields, and will take "size" records before returning
// ErrBatchNowFull. The positions of the Fields in 'fields' correspond to the
// positions of values in the Row's Values passed to Batch.Add().
func NewBatch(importer featurebase.Importer, size int, tbl *dax.Table, fields []*featurebase.FieldInfo, opts ...BatchOption) (*Batch, error) {
	if len(fields) == 0 {
		return nil, errors.New("can't batch with no fields")
	} else if size == 0 {
		return nil, errors.New("can't batch with no batch size")
	}
	headerMap := make(map[string]*featurebase.FieldInfo, len(fields))
	rowIDs := make(map[int][]uint64, len(fields))
	values := make(map[string][]int64)
	boolValues := make(map[string]map[int]bool)
	boolNulls := make(map[string][]uint64)
	tt := make(map[int]map[string][]int, len(fields))
	ttSets := make(map[string]map[string][]int)
	hasTime := false
	// The client package has a FieldTypeDefault, but featurebase does not.
	// When this code was moved from the client package to the batch
	// package, FieldTypeDefault was no longer available. It probably isn't
	// necessary, but to ensure backwards compatibility, we continue to
	// support it here with an unexported constant. (Hoisted out of the
	// loop; it was previously re-declared on every iteration.)
	const fieldTypeDefault = ""
	for i, field := range fields {
		headerMap[field.Name] = field
		// Renamed from "opts" to avoid shadowing the BatchOption
		// variadic parameter of this function.
		fieldOpts := field.Options
		switch typ := fieldOpts.Type; typ {
		case fieldTypeDefault, featurebase.FieldTypeSet, featurebase.FieldTypeTime:
			if fieldOpts.Keys {
				tt[i] = make(map[string][]int)
				ttSets[field.Name] = make(map[string][]int)
			}
			hasTime = typ == featurebase.FieldTypeTime || hasTime
		case featurebase.FieldTypeInt, featurebase.FieldTypeDecimal, featurebase.FieldTypeTimestamp:
			// tt line only needed if int field is string foreign key
			tt[i] = make(map[string][]int)
			values[field.Name] = make([]int64, 0, size)
		case featurebase.FieldTypeMutex:
			// similar to set/time fields, but no need to support sets
			// of values (hence no ttSets)
			if fieldOpts.Keys {
				tt[i] = make(map[string][]int)
			}
			rowIDs[i] = make([]uint64, 0, size)
		case featurebase.FieldTypeBool:
			boolValues[field.Name] = make(map[int]bool)
		default:
			return nil, errors.Errorf("field type '%s' is not currently supported through Batch", typ)
		}
	}
	b := &Batch{
		importer:     importer,
		header:       fields,
		headerMap:    headerMap,
		prevDuration: time.Minute * 11,
		tbl:          tbl,
		ids:          make([]uint64, 0, size),
		rowIDs:       rowIDs,
		clearRowIDs:  make(map[int]map[int]uint64),
		rowIDSets:    make(map[string][][]uint64),
		values:       values,
		boolValues:   boolValues,
		boolNulls:    boolNulls,
		nullIndices:  make(map[string][]uint64),
		toTranslate:  tt,
		toTranslateClear: make(map[int]map[string][]int),
		toTranslateSets:  ttSets,
		colTranslations:  make(map[string]agedTranslation),
		rowTranslations:  make(map[string]map[string]agedTranslation),
		maxAge:           64,
		maxStaleness:     time.Duration(0),
		keyTranslateBatchSize: DefaultKeyTranslateBatchSize,
		log:        logger.NopLogger,
		frags:      make(fragments),
		clearFrags: make(fragments),
	}
	// times is only allocated when at least one field is a time field.
	if hasTime {
		b.times = make([]QuantizedTime, 0, size)
	}
	for _, opt := range opts {
		err := opt(b)
		if err != nil {
			return nil, errors.Wrap(err, "applying options")
		}
	}
	return b, nil
}
// Row represents a single record which can be added to a Batch.
type Row struct {
	// ID is the record identifier: a uint64, a string, or a []byte
	// (treated as a string) — see Batch.Add.
	ID interface{}
	// Values map to the slice of fields in Batch.header
	Values []interface{}
	// Clears' int key is an index into Batch.header
	Clears map[int]interface{}
	// Time applies to all time fields
	Time QuantizedTime
}
// QuantizedTime represents a moment in time down to some granularity
// (year, month, day, or hour).
type QuantizedTime struct {
ymdh [10]byte
}
// Set sets the Quantized time to the given timestamp (down to hour
// granularity).
func (qt *QuantizedTime) Set(t time.Time) {
copy(qt.ymdh[:], t.Format("2006010215"))
}
// SetYear sets the quantized time's year, but leaves month, day, and
// hour untouched.
func (qt *QuantizedTime) SetYear(year string) {
copy(qt.ymdh[:4], year)
}
// SetMonth sets the QuantizedTime's month, but leaves year, day, and
// hour untouched.
func (qt *QuantizedTime) SetMonth(month string) {
copy(qt.ymdh[4:6], month)
}
// SetDay sets the QuantizedTime's day, but leaves year, month, and
// hour untouched.
func (qt *QuantizedTime) SetDay(day string) {
copy(qt.ymdh[6:8], day)
}
// SetHour sets the QuantizedTime's hour, but leaves year, month, and
// day untouched.
func (qt *QuantizedTime) SetHour(hour string) {
copy(qt.ymdh[8:10], hour)
}
func (qt *QuantizedTime) Time() (time.Time, error) {
return time.Parse("2006010215", string(qt.ymdh[:]))
}
// Reset sets the time to the zero value which generates no time views.
func (qt *QuantizedTime) Reset() {
for i := range qt.ymdh {
qt.ymdh[i] = 0
}
}
// views builds the list of Pilosa view names for this particular
// time, one per unit in the given quantum. A zero QuantizedTime
// produces no views; a unit whose segment was never set is an error.
func (qt *QuantizedTime) views(q featurebase.TimeQuantum) ([]string, error) {
	if *qt == (QuantizedTime{}) {
		return nil, nil
	}
	views := make([]string, 0, len(q))
	for _, unit := range q {
		// start is the first byte of the segment this unit requires
		// (used to detect an unset segment); end is one past the last
		// byte included in the view name.
		var start, end int
		var name string
		switch unit {
		case 'Y':
			start, end, name = 0, 4, "year"
		case 'M':
			start, end, name = 4, 6, "month"
		case 'D':
			start, end, name = 6, 8, "day"
		case 'H':
			start, end, name = 8, 10, "hour"
		default:
			// Unknown quantum units are ignored, as in the original
			// switch which simply had no matching case.
			continue
		}
		if qt.ymdh[start] == 0 {
			return nil, errors.New("no data set for " + name)
		}
		views = append(views, string(qt.ymdh[:end]))
	}
	return views, nil
}
// getColTranslation looks up the cached column-key translation for
// key. On a hit it refreshes the entry's last-used cycle and returns
// (id, true); on a miss it returns (0, false).
func (b *Batch) getColTranslation(key string) (uint64, bool) {
	trans, found := b.colTranslations[key]
	if !found {
		return 0, false
	}
	trans.lastUsed = b.cycle
	b.colTranslations[key] = trans
	return trans.id, true
}
// getRowTranslation looks up the cached row-key translation for key
// within the given field. On a hit it refreshes the entry's last-used
// cycle and returns (id, true); on a miss it returns (0, false).
func (b *Batch) getRowTranslation(field, key string) (uint64, bool) {
	trans, found := b.rowTranslations[field][key]
	if !found {
		return 0, false
	}
	trans.lastUsed = b.cycle
	b.rowTranslations[field][key] = trans
	return trans.id, true
}
// Add adds a record to the batch. Performance will be best if record
// IDs are shard-sorted. That is, all records which belong to the same
// Pilosa shard are added adjacent to each other. If the records are
// also in-order within a shard this will likely help as well. Add
// clears rec.Clears when it returns normally (either a nil error or
// BatchNowFull).
func (b *Batch) Add(rec Row) error {
	// Clear recValues and rec.Clears upon return.
	defer func() {
		for i := range rec.Values {
			rec.Values[i] = nil
		}
		for k := range rec.Clears {
			delete(rec.Clears, k)
		}
	}()
	if len(b.ids) == cap(b.ids) {
		return ErrBatchAlreadyFull
	}
	if len(rec.Values) != len(b.header) {
		return errors.Errorf("record needs to match up with batch fields, got %d fields and %d record", len(b.header), len(rec.Values))
	}
	// handleStringID records a string record ID, either resolving it
	// through the column translation cache or queueing it in
	// toTranslateID (with a 0 placeholder in b.ids) for later
	// translation by doTranslation.
	handleStringID := func(rid string) error {
		if rid == "" {
			return errors.Errorf("record identifier cannot be an empty string")
		}
		if colID, ok := b.getColTranslation(rid); ok {
			b.ids = append(b.ids, colID)
		} else {
			if b.toTranslateID == nil {
				b.toTranslateID = make([]string, cap(b.ids))
			}
			b.toTranslateID[len(b.ids)] = rid
			b.ids = append(b.ids, 0)
		}
		return nil
	}
	var err error
	switch rid := rec.ID.(type) {
	case uint64:
		b.ids = append(b.ids, rid)
	case string:
		// use "=" (not ":=") so this case doesn't shadow err and
		// matches the []byte case below.
		err = handleStringID(rid)
		if err != nil {
			return err
		}
	case []byte:
		err = handleStringID(string(rid))
		if err != nil {
			return err
		}
	default: // TODO support nil ID as being auto-allocated.
		return errors.Errorf("unsupported id type %T value %v", rid, rid)
	}
	// curPos is the current position in b.ids, rowIDs[*], etc.
	curPos := len(b.ids) - 1
	if b.times != nil {
		b.times = append(b.times, rec.Time)
	}
	for i := 0; i < len(rec.Values); i++ {
		field := b.header[i]
		switch val := rec.Values[i].(type) {
		case string:
			switch field.Options.Type {
			case featurebase.FieldTypeInt:
				if val == "" {
					// copied from the `case nil:` section for ints and decimals
					b.values[field.Name] = append(b.values[field.Name], 0)
					nullIndices, ok := b.nullIndices[field.Name]
					if !ok {
						nullIndices = make([]uint64, 0)
					}
					nullIndices = append(nullIndices, uint64(curPos))
					b.nullIndices[field.Name] = nullIndices
				} else if intVal, ok := b.getRowTranslation(field.Name, val); ok {
					// string value on an int field is a foreign-key
					// lookup; the translated ID is the int value.
					b.values[field.Name] = append(b.values[field.Name], int64(intVal))
				} else {
					ints, ok := b.toTranslate[i][val]
					if !ok {
						ints = make([]int, 0)
					}
					ints = append(ints, curPos)
					b.toTranslate[i][val] = ints
					b.values[field.Name] = append(b.values[field.Name], 0)
				}
			case featurebase.FieldTypeBool:
				// If we want to support bools as string values, we would do
				// that here.
			default:
				// nil-extend
				for len(b.rowIDs[i]) < curPos {
					b.rowIDs[i] = append(b.rowIDs[i], nilSentinel)
				}
				rowIDs := b.rowIDs[i]
				// empty string is not a valid value at this point (Pilosa refuses to translate it)
				if val == "" {
					b.rowIDs[i] = append(rowIDs, nilSentinel)
				} else if rowID, ok := b.getRowTranslation(field.Name, val); ok {
					b.rowIDs[i] = append(rowIDs, rowID)
				} else {
					ints, ok := b.toTranslate[i][val]
					if !ok {
						ints = make([]int, 0)
					}
					ints = append(ints, curPos)
					b.toTranslate[i][val] = ints
					b.rowIDs[i] = append(rowIDs, 0)
				}
			}
		case uint64:
			// nil-extend
			for len(b.rowIDs[i]) < curPos {
				b.rowIDs[i] = append(b.rowIDs[i], nilSentinel)
			}
			b.rowIDs[i] = append(b.rowIDs[i], val)
		case int64:
			b.values[field.Name] = append(b.values[field.Name], val)
		case []string:
			// note that a length of 0 can be valid, and represents an
			// empty set. an empty set counts as a non-NULL value for
			// SQL purposes -- it means the existence view bit should
			// get set.
			if val == nil {
				continue
			}
			rowIDSets, ok := b.rowIDSets[field.Name]
			if !ok {
				rowIDSets = make([][]uint64, len(b.ids)-1, cap(b.ids))
				b.rowIDSets[field.Name] = rowIDSets
			}
			for len(rowIDSets) < len(b.ids)-1 {
				rowIDSets = append(rowIDSets, nil) // nil extend
			}
			rowIDs := make([]uint64, 0, len(val))
			for _, k := range val {
				if k == "" {
					continue
				}
				if rowID, ok := b.getRowTranslation(field.Name, k); ok {
					rowIDs = append(rowIDs, rowID)
				} else {
					ttsets, ok := b.toTranslateSets[field.Name]
					if !ok {
						ttsets = make(map[string][]int)
						// BUG FIX: this previously stored a second,
						// distinct map here (make(map[string][]int)),
						// so the ttsets used for lookups below was
						// never the map actually stored on the batch.
						b.toTranslateSets[field.Name] = ttsets
					}
					ints, ok := ttsets[k]
					if !ok {
						ints = make([]int, 0, 1)
					}
					ints = append(ints, curPos)
					b.toTranslateSets[field.Name][k] = ints
				}
			}
			b.rowIDSets[field.Name] = append(rowIDSets, rowIDs)
		case []uint64:
			// note that a length of 0 can be valid, and represents an
			// empty set. an empty set counts as a non-NULL value for
			// SQL purposes -- it means the existence view bit should
			// get set.
			if val == nil {
				continue
			}
			rowIDSets, ok := b.rowIDSets[field.Name]
			if !ok {
				rowIDSets = make([][]uint64, len(b.ids)-1, cap(b.ids))
			}
			for len(rowIDSets) < len(b.ids)-1 {
				rowIDSets = append(rowIDSets, nil) // nil extend
			}
			b.rowIDSets[field.Name] = append(rowIDSets, val)
		case nil:
			switch field.Options.Type {
			case featurebase.FieldTypeInt, featurebase.FieldTypeDecimal, featurebase.FieldTypeTimestamp:
				b.values[field.Name] = append(b.values[field.Name], 0)
				nullIndices, ok := b.nullIndices[field.Name]
				if !ok {
					nullIndices = make([]uint64, 0)
				}
				nullIndices = append(nullIndices, uint64(curPos))
				b.nullIndices[field.Name] = nullIndices
			case featurebase.FieldTypeBool:
				boolNulls, ok := b.boolNulls[field.Name]
				if !ok {
					boolNulls = make([]uint64, 0)
				}
				boolNulls = append(boolNulls, uint64(curPos))
				b.boolNulls[field.Name] = boolNulls
			default:
				// only append nil to rowIDs if this field already has
				// rowIDs. Otherwise, this could be a []string or
				// []uint64 field where we've only seen nil values so
				// far. when we see a uint64 or string value, we'll
				// "nil-extend" rowIDs to make sure it's the right
				// length.
				if rowIDs, ok := b.rowIDs[i]; ok {
					b.rowIDs[i] = append(rowIDs, nilSentinel)
				}
			}
		case bool:
			b.boolValues[field.Name][curPos] = val
		case pql.Decimal:
			b.values[field.Name] = append(b.values[field.Name], val.ToInt64(field.Options.Scale))
		default:
			return errors.Errorf("Val %v Type %[1]T is not currently supported. Use string, uint64 (row id), or int64 (integer value)", val)
		}
	}
	for i, uval := range rec.Clears {
		field := b.header[i]
		if field.Options.Type == featurebase.FieldTypeMutex && uval != nil {
			return errors.Errorf("individual-bit clears not allowed on mutex fields; use nil to clear a mutex")
		}
		if _, ok := b.clearRowIDs[i]; !ok {
			b.clearRowIDs[i] = make(map[int]uint64)
		}
		switch val := uval.(type) {
		case string:
			clearRows := b.clearRowIDs[i]
			// translate val and add to clearRows
			if rowID, ok := b.getRowTranslation(field.Name, val); ok {
				clearRows[curPos] = rowID
			} else {
				_, ok := b.toTranslateClear[i]
				if !ok {
					b.toTranslateClear[i] = make(map[string][]int)
				}
				ints, ok := b.toTranslateClear[i][val]
				if !ok {
					ints = make([]int, 0)
				}
				ints = append(ints, curPos)
				b.toTranslateClear[i][val] = ints
			}
		case uint64:
			b.clearRowIDs[i][curPos] = val
		case nil:
			if field.Options.Type == featurebase.FieldTypeMutex {
				for len(b.rowIDs[i]) <= curPos {
					b.rowIDs[i] = append(b.rowIDs[i], nilSentinel)
				}
				b.rowIDs[i][len(b.rowIDs[i])-1] = clearSentinel
			}
		default:
			return errors.Errorf("Clearing a value '%v' Type %[1]T is not currently supported (field '%s')", val, field.Name)
		}
		// nil extend b.rowIDs so we don't run into a horrible bug
		// where we skip doing clears because b.rowIDs doesn't have a
		// value for this field
		for len(b.rowIDs[i]) <= curPos {
			b.rowIDs[i] = append(b.rowIDs[i], nilSentinel)
		}
	}
	if len(b.ids) == cap(b.ids) {
		return ErrBatchNowFull
	}
	if b.maxStaleness != time.Duration(0) { // set maxStaleness to 0 to disable staleness checking
		if len(b.ids) == 1 {
			b.staleTime = time.Now().Add(b.maxStaleness)
		} else if time.Now().After(b.staleTime) {
			return ErrBatchNowStale
		}
	}
	return nil
}
// ErrBatchNowFull — similar to io.EOF — is a marker error to notify the user of
// a batch that it is time to call Import. The record which triggered it WAS
// accepted.
var ErrBatchNowFull = errors.New("batch is now full - you cannot add any more records (though the one you just added was accepted)")

// ErrBatchAlreadyFull is a real error saying that Batch.Add did not
// complete because the batch was full.
var ErrBatchAlreadyFull = errors.New("batch was already full, record was rejected")

// ErrBatchNowStale indicates that the oldest record in the batch is older than
// the maxStaleness value of the batch. Like ErrBatchNowFull, the error does
// not mean the record was rejected.
var ErrBatchNowStale = errors.New("batch is stale and needs to be imported (however, record was accepted)")
// Import does translation, creates the fragment files, and then,
// if we're not using split batch mode, imports everything to
// Pilosa. It then resets internal data structures for the next
// batch. If we are using split batch mode, it saves the fragment
// data to the batch, resets all other internal structures, and
// continues. split batch mode DOES NOT CURRENTLY SUPPORT MUTEX
// OR INT FIELDS!
func (b *Batch) Import() error {
	ctx := context.Background()
	start := time.Now()
	if !b.useShardTransactionalEndpoint {
		// Size the transaction timeout from the previous import's
		// duration so a slow import doesn't hit an arbitrary limit.
		trns, err := b.importer.StartTransaction(ctx, "", b.prevDuration*10, false, time.Hour)
		if err != nil {
			return errors.Wrap(err, "starting transaction")
		}
		defer func() {
			if trns != nil {
				if trnsl, err := b.importer.FinishTransaction(ctx, trns.ID); err != nil {
					b.log.Errorf("error finishing transaction: %v. trns: %+v", err, trnsl)
				}
			}
		}()
	}
	defer func() {
		featurebase.SummaryBatchImportDurationSeconds.Observe(time.Since(start).Seconds())
	}()
	size := len(b.ids)
	transStart := time.Now()
	// first we need to translate the toTranslate, then fill out the missing row IDs
	err := b.doTranslation()
	if err != nil {
		return errors.Wrap(err, "doing Translation")
	}
	transTime := time.Now()
	b.log.Printf("translating batch of %d took: %v", size, transTime.Sub(transStart))
	frags, clearFrags, err := b.makeFragments(b.frags, b.clearFrags)
	if err != nil {
		return errors.Wrap(err, "making fragments (flush)")
	}
	if b.useShardTransactionalEndpoint {
		frags, clearFrags, err = b.makeSingleValFragments(frags, clearFrags)
		if err != nil {
			return errors.Wrap(err, "making single val fragments")
		}
	}
	makeTime := time.Now()
	b.log.Printf("making fragments for batch of %d took %v", size, makeTime.Sub(transTime))
	if b.splitBatchMode {
		// Split batch mode: keep the accumulated fragments on the
		// batch; Flush performs the actual import later.
		b.frags = frags
		b.clearFrags = clearFrags
	} else {
		b.frags = make(fragments)
		b.clearFrags = make(fragments)
		// create bitmaps out of each field in b.rowIDs and import. Also
		// import int data.
		if !b.useShardTransactionalEndpoint {
			err = b.doImport(frags, clearFrags)
			if err != nil {
				return errors.Wrap(err, "doing import")
			}
			b.log.Printf("importing fragments took %v", time.Since(makeTime))
		} else {
			err = b.doImportShardTransactional(frags, clearFrags)
			if err != nil {
				return errors.Wrap(err, "doing shard transactional import")
			}
		}
	}
	b.reset()
	return nil
}
// Flush is only applicable in split batch mode where it actually
// imports the stored data to Pilosa. Otherwise it simply returns
// nil.
func (b *Batch) Flush() error {
	ctx := context.Background()
	if !b.splitBatchMode {
		return nil
	}
	start := time.Now()
	trns, err := b.importer.StartTransaction(ctx, "", b.prevDuration*10, false, time.Hour)
	if err != nil {
		return errors.Wrap(err, "starting transaction")
	}
	defer func() {
		trnsl, err := b.importer.FinishTransaction(ctx, trns.ID)
		if err != nil {
			b.log.Errorf("error finishing transaction: %v. trns: %+v", err, trnsl)
		}
		featurebase.SummaryBatchFlushDurationSeconds.Observe(time.Since(start).Seconds())
	}()
	importStart := time.Now()
	err = b.doImport(b.frags, b.clearFrags)
	if err != nil {
		return errors.Wrap(err, "doing import (ImportFragments)")
	}
	b.log.Debugf("superbatch import took %v", time.Since(importStart))
	b.reset()
	// Fragment data has been imported; start fresh for the next superbatch.
	b.frags = make(fragments)
	b.clearFrags = make(fragments)
	return nil
}
func (b *Batch) doTranslation() error {
eg := egpool.Group{PoolSize: 20}
// Translate the column keys.
eg.Go(func() error {
// Deduplicate keys to translate.
dedup := make(map[string]struct{})
var keys []string
for _, key := range b.toTranslateID {
if key == "" {
continue
}
if _, ok := dedup[key]; ok {
continue
}
dedup[key] = struct{}{}
keys = append(keys, key)
}
if len(keys) == 0 {
// There are no column keys to translate.
return nil
}
// Create the keys.
start := time.Now()
trans, err := b.createIndexKeys(keys...)
if err != nil {
return errors.Wrap(err, "translating col keys")
}
if len(trans) != len(keys) {
return errors.Errorf("requested IDs for %d column keys but got %d back", len(keys), len(trans))
}
b.log.Debugf("translating %d column keys took %v", len(keys), time.Since(start))
// Apply keys to translation cache.
for key, id := range trans {
b.colTranslations[key] = agedTranslation{
id: id,
lastUsed: b.cycle,
}
}
// Translate remaining keys in batch.
for index, ttkey := range b.toTranslateID {
if ttkey == "" {
continue
}
b.ids[index] = trans[ttkey]
}
return nil
})
// creating a lock up here for the rowCache(s) which we get
// below. Usually this isn't needed, but sometimes I think the
// same rowCache gets used repeatedly because the same field is in
// there multiple times, and that can lead to race
// conditions. Need to understand this better, but gonna see if
// this avoids the races.
rowCacheLock := &sync.Mutex{}
// Translate the row keys.
for i, tt := range b.toTranslate {
// Skip this if there are no keys to translate.
ttc := b.toTranslateClear[i]
if len(tt) == 0 && len(ttc) == 0 {
continue
}
// Look up the associated field.
field := b.header[i]
fieldName := field.Name
// Fetch the translation cache.
rowCache := b.rowTranslations[fieldName]
if rowCache == nil {
rowCache = make(map[string]agedTranslation)
b.rowTranslations[fieldName] = rowCache
}
i, tt := i, tt
eg.Go(func() error {
// Collect the keys to translate.
keys := make([]string, 0, len(tt)+len(ttc))
for k := range tt {
keys = append(keys, k)
}
for k := range ttc {
keys = append(keys, k)
}
// Create the keys.
start := time.Now()
trans, err := b.createFieldKeys(field, keys...)
if err != nil {
return errors.Wrap(err, "translating field keys")
}
b.log.Debugf("translating %d field keys for %s took %v", len(trans), fieldName, time.Since(start))
// Apply keys to translation cache.
rowCacheLock.Lock()
for key, id := range trans {
rowCache[key] = agedTranslation{
id: id,
lastUsed: b.cycle,
}
}
rowCacheLock.Unlock()
switch ftype := field.Options.Type; ftype {
case featurebase.FieldTypeSet, featurebase.FieldTypeMutex, featurebase.FieldTypeTime:
// Fill out missing IDs in local batch records with translated IDs.
rows := b.rowIDs[i]
for key, idxs := range tt {
id, ok := trans[key]
if !ok {
return errors.Errorf("key translation missing: %q in field %q", key, fieldName)
}
for _, i := range idxs {
rows[i] = id
}
}
// Fill out missing IDs in clear lists.
clearRows := b.clearRowIDs[i]
for key, idxs := range ttc {
id, ok := trans[key]
if !ok {
return errors.Errorf("key translation missing: %q in field %q", key, fieldName)
}
for _, i := range idxs {
clearRows[i] = id
}
}
case featurebase.FieldTypeInt: