-
Notifications
You must be signed in to change notification settings - Fork 0
/
sketch_accessor.go
146 lines (121 loc) · 3.9 KB
/
sketch_accessor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package sketches
import (
"sort"
"github.com/fluxninja/datasketches-go/sketches/util"
)
const BB_LVL_IDX = -1
type DoublesSketchAccessor interface {
SetLevel(level int32)
NumItems() int32
GetArray(fromIdx int32, numItems int32) []float64
PutArray(srcArray []float64, srcIndex, dstIndex, numItems int32)
Get(index int32) float64
Set(index int32, value float64) float64
Sort()
CopyAndSetLevel(level int32) DoublesSketchAccessor
}
type AbstractDoublesSketchAccessor struct {
DoublesSketchAccessor
sketch DoublesSketch
forceSize bool
n int64
numItems int32
currentLevel int32
offset int32
}
func (acc *AbstractDoublesSketchAccessor) NumItems() int32 {
return acc.numItems
}
func (acc *AbstractDoublesSketchAccessor) SetLevel(level int32) {
acc.currentLevel = level
if level == BB_LVL_IDX {
acc.numItems = acc.sketch.GetBaseBufferCount()
if acc.forceSize {
acc.numItems = acc.sketch.GetK() * 2
}
acc.offset = 0
if acc.sketch.IsDirect() {
acc.offset = COMBINED_BUFFER
}
} else {
util.Assert(level >= 0, "level >= 0")
acc.numItems = 0
if acc.forceSize || ((acc.sketch.GetBitPattern() & (1 << level)) > 0) {
acc.numItems = acc.sketch.GetK()
}
var levelStart int32 = (2 + acc.currentLevel) * acc.sketch.GetK()
if acc.sketch.IsCompact() {
levelStart = acc.sketch.GetBaseBufferCount() + (acc.countValidLevelsBelow(level) * acc.sketch.GetK())
}
acc.offset = levelStart
if acc.sketch.IsDirect() {
var preLongsAndExtra int32 = MAX_PRELONGS + 2
acc.offset = (preLongsAndExtra + levelStart) << 3
}
}
acc.n = acc.sketch.GetN()
}
func (acc *AbstractDoublesSketchAccessor) countValidLevelsBelow(level int32) int32 {
var count int32 = 0
var ubitPattern uint64 = uint64(acc.sketch.GetBitPattern())
for i := int32(0); (i < level) && (ubitPattern > 0); i++ {
if (ubitPattern & 1) > 0 {
count++
}
ubitPattern >>= 1
}
return count
}
func NewDoublesSketchAccessor(sketch DoublesSketch, forceSize bool) DoublesSketchAccessor {
if sketch.IsDirect() {
// TODO: FLUX-1797, implement NewDirectDoublesSketchAccessor
return NewHeapDoublesSketchAccessor(sketch, forceSize, BB_LVL_IDX)
}
return NewHeapDoublesSketchAccessor(sketch, forceSize, BB_LVL_IDX)
}
type DirectDoublesSketchAccessor struct{}
func NewDirectDoublesSketchAccessor() *DirectDoublesSketchAccessor {
return &DirectDoublesSketchAccessor{}
}
type HeapDoublesSketchAccessor struct {
*AbstractDoublesSketchAccessor
}
func (acc *HeapDoublesSketchAccessor) CopyAndSetLevel(level int32) DoublesSketchAccessor {
return NewHeapDoublesSketchAccessor(acc.sketch, acc.forceSize, level)
}
func NewHeapDoublesSketchAccessor(sketch DoublesSketch, forceSize bool, level int32) *HeapDoublesSketchAccessor {
accessor := &HeapDoublesSketchAccessor{
&AbstractDoublesSketchAccessor{
sketch: sketch,
forceSize: forceSize,
},
}
accessor.SetLevel(level)
return accessor
}
func (acc *HeapDoublesSketchAccessor) GetArray(fromIdx int32, numItems int32) []float64 {
stIdx := acc.offset + fromIdx
x := make([]float64, numItems)
copy(x, acc.sketch.GetCombinedBuffer()[stIdx:stIdx+numItems])
return x
}
func (acc *HeapDoublesSketchAccessor) PutArray(srcArray []float64, srcIndex, dstIndex, numItems int32) {
var tgtIdx int32 = acc.offset + dstIndex
copy(acc.sketch.GetCombinedBuffer()[tgtIdx:tgtIdx+numItems], srcArray[srcIndex:srcIndex+numItems])
}
func (acc *HeapDoublesSketchAccessor) Get(index int32) float64 {
return acc.sketch.GetCombinedBuffer()[acc.offset+index]
}
func (acc *HeapDoublesSketchAccessor) Set(index int32, value float64) float64 {
idxOffset := acc.offset + index
oldVal := acc.sketch.GetCombinedBuffer()[idxOffset]
acc.sketch.GetCombinedBuffer()[idxOffset] = value
return oldVal
}
func (acc *HeapDoublesSketchAccessor) Sort() {
startIdx := acc.offset
endIdx := acc.offset + acc.NumItems()
if !acc.sketch.IsCompact() {
sort.Float64s(acc.sketch.GetCombinedBuffer()[startIdx:endIdx])
}
}