-
Notifications
You must be signed in to change notification settings - Fork 3.3k
/
delta_length_byte_array.go
148 lines (127 loc) · 5.35 KB
/
delta_length_byte_array.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package encoding
import (
"github.com/apache/arrow/go/v16/arrow/memory"
"github.com/apache/arrow/go/v16/internal/utils"
"github.com/apache/arrow/go/v16/parquet"
"golang.org/x/xerrors"
)
// DeltaLengthByteArrayEncoder encodes data by taking all of the byte array lengths
// and encoding them up front using delta encoding, followed by all of the binary data
// concatenated back to back. The expected savings come from the cheaper encoding of the lengths
// and possibly better compression of the data, which is no longer interleaved with the lengths.
//
// This encoding is always preferred over PLAIN for byte array columns where possible.
//
// For example, if the data was "Hello", "World", "Foobar", "ABCDEF" the encoded data would be:
// DeltaEncoding(5, 5, 6, 6) "HelloWorldFoobarABCDEF"
type DeltaLengthByteArrayEncoder struct {
encoder
// lengthEncoder receives the length of every value passed to Put and is
// flushed ahead of the value bytes in FlushValues.
lengthEncoder *DeltaBitPackInt32Encoder
}
// Put adds every byte array of in to the encoder. The length of each value is
// handed to the length encoder, and the raw bytes of the values are then
// written back to back into the sink, in the order they appear in in.
func (enc *DeltaLengthByteArrayEncoder) Put(in []parquet.ByteArray) {
	var (
		sizes = make([]int32, len(in))
		total int
	)
	for i := range in {
		n := in[i].Len()
		sizes[i] = int32(n)
		total += n
	}

	enc.lengthEncoder.Put(sizes)

	// Room for all value bytes is reserved once, before the UnsafeWrite calls.
	enc.sink.Reserve(total)
	for i := range in {
		enc.sink.UnsafeWrite(in[i])
	}
}
// PutSpaced behaves like Put for spaced input: when a validity bitmap is
// supplied, the values are first compacted with spacedCompress using validBits
// and validBitsOffset, and only the resulting values are written. A nil
// validBits means in is passed to Put unchanged.
func (enc *DeltaLengthByteArrayEncoder) PutSpaced(in []parquet.ByteArray, validBits []byte, validBitsOffset int64) {
	if validBits == nil {
		enc.Put(in)
		return
	}

	compacted := make([]parquet.ByteArray, len(in))
	count := spacedCompress(in, compacted, validBits, validBitsOffset)
	enc.Put(compacted[:count])
}
// Type returns the underlying parquet type handled by this encoder, which is
// always parquet.Types.ByteArray.
func (DeltaLengthByteArrayEncoder) Type() parquet.Type {
return parquet.Types.ByteArray
}
// FlushValues finishes the encoder and returns a single buffer holding the
// flushed output of the length encoder immediately followed by the value bytes
// collected in the sink. If flushing the lengths fails, nil and that error are
// returned.
func (enc *DeltaLengthByteArrayEncoder) FlushValues() (Buffer, error) {
	lengthBuf, err := enc.lengthEncoder.FlushValues()
	if err != nil {
		return nil, err
	}
	defer lengthBuf.Release()

	valueBuf := enc.sink.Finish()
	defer valueBuf.Release()

	// Both intermediate buffers are released on return; their contents are
	// copied into one buffer taken from the pool.
	headerLen := lengthBuf.Len()
	combined := bufferPool.Get().(*memory.Buffer)
	combined.ResizeNoShrink(headerLen + valueBuf.Len())

	dst := combined.Bytes()
	copy(dst, lengthBuf.Bytes())
	copy(dst[headerLen:], valueBuf.Bytes())
	return poolBuffer{combined}, nil
}
// DeltaLengthByteArrayDecoder is a decoder for handling data produced by the corresponding
// encoder which expects delta packed lengths followed by the bytes of data.
type DeltaLengthByteArrayDecoder struct {
decoder
// mem is the allocator returned by Allocator and handed to the length
// decoder that SetData constructs.
mem memory.Allocator
// lengths holds the byte array lengths decoded by SetData that have not yet
// been consumed by Decode.
lengths []int32
}
// Type returns the underlying parquet type handled by this decoder, which is
// always parquet.Types.ByteArray.
func (DeltaLengthByteArrayDecoder) Type() parquet.Type {
return parquet.Types.ByteArray
}
// Allocator returns the memory allocator stored on this decoder.
func (d *DeltaLengthByteArrayDecoder) Allocator() memory.Allocator { return d.mem }
// SetData installs the data to decode, which must be nvalues delta packed
// lengths immediately followed by the concatenated byte array data.
//
// All lengths are decoded eagerly into d.lengths, and the embedded decoder is
// then pointed at whatever bytes follow the length section. An error is
// returned if the length section cannot be set up or decoded, or if it claims
// to extend past the end of data.
func (d *DeltaLengthByteArrayDecoder) SetData(nvalues int, data []byte) error {
	dec := DeltaBitPackInt32Decoder{
		deltaBitPackDecoder: &deltaBitPackDecoder{
			decoder: newDecoderBase(d.encoding, d.descr),
			mem:     d.mem}}

	if err := dec.SetData(nvalues, data); err != nil {
		return err
	}

	d.lengths = make([]int32, dec.totalValues)
	// A failure while unpacking the lengths must not be ignored: the lengths
	// drive every slice operation performed later by Decode.
	if _, err := dec.Decode(d.lengths); err != nil {
		return err
	}

	// Guard the slice below so corrupt input yields an error, not a panic.
	consumed := int(dec.bytesRead())
	if consumed < 0 || consumed > len(data) {
		return xerrors.Errorf("parquet: delta length section consumed %d bytes but only %d are available", consumed, len(data))
	}
	return d.decoder.SetData(nvalues, data[consumed:])
}
// Decode populates out with decoded byte arrays until out is full or the
// decoder runs out of values, and returns the number of values decoded.
//
// Each returned ByteArray is a sub-slice of the decoder's data (no copy) whose
// capacity is clipped to its length, so appending to one value cannot
// overwrite the bytes of the value that follows it.
//
// An error is returned instead of panicking when the stored lengths are
// inconsistent with the data: fewer lengths than values, a negative length, or
// a length that runs past the remaining bytes. In the latter two cases the
// values decoded before the bad entry are valid, their count is returned, and
// the decoder has advanced past them.
func (d *DeltaLengthByteArrayDecoder) Decode(out []parquet.ByteArray) (int, error) {
	count := utils.Min(len(out), d.nvals)
	if count > len(d.lengths) {
		return 0, xerrors.Errorf("parquet: delta length byte array decoder has %d lengths but %d values were requested", len(d.lengths), count)
	}

	for i := 0; i < count; i++ {
		n := int(d.lengths[i])
		if n < 0 || n > len(d.data) {
			// Keep the bookkeeping consistent with the i values already handed out.
			d.nvals -= i
			d.lengths = d.lengths[i:]
			return i, xerrors.Errorf("parquet: invalid byte array length %d with %d bytes of data remaining", n, len(d.data))
		}
		out[i] = d.data[:n:n]
		d.data = d.data[n:]
	}

	d.nvals -= count
	d.lengths = d.lengths[count:]
	return count, nil
}
// DecodeSpaced is like Decode, but for spaced data: len(out)-nullCount values
// are decoded into the front of out and then spread out with spacedExpand,
// using validBits and validBitsOffset to determine where the nulls belong.
//
// It returns the result of spacedExpand on success. An error is returned if
// nullCount is negative or larger than len(out), if Decode fails, or if fewer
// values than expected could be decoded.
func (d *DeltaLengthByteArrayDecoder) DecodeSpaced(out []parquet.ByteArray, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
	// A null count outside [0, len(out)] would produce an invalid slice bound below.
	if nullCount < 0 || nullCount > len(out) {
		return 0, xerrors.Errorf("parquet: invalid null count %d for %d output values", nullCount, len(out))
	}

	toread := len(out) - nullCount
	values, err := d.Decode(out[:toread])
	if err != nil {
		return values, err
	}
	if values != toread {
		return values, xerrors.New("parquet: number of values / definition levels read did not match")
	}

	return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
}