-
Notifications
You must be signed in to change notification settings - Fork 513
/
appender.go
113 lines (91 loc) · 2.43 KB
/
appender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package v2
import (
"hash"
"sync"
"github.com/cespare/xxhash"
"github.com/grafana/tempo/tempodb/encoding/common"
)
// Appender is capable of tracking objects and ids that are added to it
type Appender interface {
Append(common.ID, []byte) error
Complete() error
Records() []common.Record
RecordsForID(common.ID) []common.Record
Length() int
DataLength() uint64
}
type appender struct {
dataWriter common.DataWriter
records map[uint64][]common.Record
recordsMtx sync.RWMutex
hash hash.Hash64
currentOffset uint64
}
// NewAppender returns an appender. This appender simply appends new objects
// to the provided dataWriter.
func NewAppender(dataWriter common.DataWriter) Appender {
return &appender{
dataWriter: dataWriter,
records: map[uint64][]common.Record{},
hash: xxhash.New(),
}
}
// Append appends the id/object to the writer. Note that the caller is giving up ownership of the two byte arrays backing the slices.
// Copies should be made and passed in if this is a problem
func (a *appender) Append(id common.ID, b []byte) error {
_, err := a.dataWriter.Write(id, b)
if err != nil {
return err
}
bytesWritten, err := a.dataWriter.CutPage()
if err != nil {
return err
}
a.hash.Reset()
_, _ = a.hash.Write(id)
hash := a.hash.Sum64()
a.addRecord(hash, id, bytesWritten)
a.currentOffset += uint64(bytesWritten)
return nil
}
func (a *appender) addRecord(hash uint64, id common.ID, bytesWritten int) {
new := common.Record{
ID: id,
Start: a.currentOffset,
Length: uint32(bytesWritten),
}
a.recordsMtx.Lock()
defer a.recordsMtx.Unlock()
records := a.records[hash]
records = append(records, new)
a.records[hash] = records
}
func (a *appender) Records() []common.Record {
a.recordsMtx.RLock()
sliceRecords := make([]common.Record, 0, len(a.records))
for _, r := range a.records {
sliceRecords = append(sliceRecords, r...)
}
a.recordsMtx.RUnlock()
common.SortRecords(sliceRecords)
return sliceRecords
}
func (a *appender) RecordsForID(id common.ID) []common.Record {
hasher := xxhash.New()
_, _ = hasher.Write(id)
hash := hasher.Sum64()
a.recordsMtx.RLock()
defer a.recordsMtx.RUnlock()
return a.records[hash]
}
func (a *appender) Length() int {
a.recordsMtx.Lock()
defer a.recordsMtx.RUnlock()
return len(a.records)
}
func (a *appender) DataLength() uint64 {
return a.currentOffset
}
func (a *appender) Complete() error {
return a.dataWriter.Complete()
}