/
interface.go
140 lines (128 loc) · 3.4 KB
/
interface.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package chunkenc
import (
"context"
"errors"
"fmt"
"io"
"strings"
"time"
"github.com/credativ/vali/pkg/iter"
"github.com/credativ/vali/pkg/logproto"
"github.com/credativ/vali/pkg/logql/log"
)
// Errors returned by the chunk interface.
var (
ErrChunkFull = errors.New("chunk full")
ErrOutOfOrder = errors.New("entry out of order")
ErrInvalidSize = errors.New("invalid size")
ErrInvalidFlag = errors.New("invalid flag")
ErrInvalidChecksum = errors.New("invalid chunk checksum")
)
// Encoding is the identifier for a chunk encoding.
type Encoding byte
// The different available encodings.
// Make sure to preserve the order, as these numeric values are written to the chunks!
const (
EncNone Encoding = iota
EncGZIP
EncDumb
EncLZ4_64k
EncSnappy
EncLZ4_256k
EncLZ4_1M
EncLZ4_4M
EncFlate
EncZstd
)
var supportedEncoding = []Encoding{
EncNone,
EncGZIP,
EncLZ4_64k,
EncSnappy,
EncLZ4_256k,
EncLZ4_1M,
EncLZ4_4M,
EncFlate,
EncZstd,
}
func (e Encoding) String() string {
switch e {
case EncGZIP:
return "gzip"
case EncNone:
return "none"
case EncDumb:
return "dumb"
case EncLZ4_64k:
return "lz4-64k"
case EncLZ4_256k:
return "lz4-256k"
case EncLZ4_1M:
return "lz4-1M"
case EncLZ4_4M:
return "lz4"
case EncSnappy:
return "snappy"
case EncFlate:
return "flate"
case EncZstd:
return "zstd"
default:
return "unknown"
}
}
// ParseEncoding parses an chunk encoding (compression algorithm) by its name.
func ParseEncoding(enc string) (Encoding, error) {
for _, e := range supportedEncoding {
if strings.EqualFold(e.String(), enc) {
return e, nil
}
}
return 0, fmt.Errorf("invalid encoding: %s, supported: %s", enc, SupportedEncoding())
}
// SupportedEncoding returns the list of supported Encoding.
func SupportedEncoding() string {
var sb strings.Builder
for i := range supportedEncoding {
sb.WriteString(supportedEncoding[i].String())
if i != len(supportedEncoding)-1 {
sb.WriteString(", ")
}
}
return sb.String()
}
// Chunk is the interface for the compressed logs chunk format.
type Chunk interface {
Bounds() (time.Time, time.Time)
SpaceFor(*logproto.Entry) bool
Append(*logproto.Entry) error
Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error)
SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator
// Returns the list of blocks in the chunks.
Blocks(mintT, maxtT time.Time) []Block
Size() int
Bytes() ([]byte, error)
BytesWith([]byte) ([]byte, error) // uses provided []byte for buffer instantiation
io.WriterTo
BlockCount() int
Utilization() float64
UncompressedSize() int
CompressedSize() int
Close() error
Encoding() Encoding
}
// Block is a chunk block.
type Block interface {
// MinTime is the minimum time of entries in the block
MinTime() int64
// MaxTime is the maximum time of entries in the block
MaxTime() int64
// Offset is the offset/position of the block in the chunk. Offset is unique for a given block per chunk.
Offset() int
// Entries is the amount of entries in the block.
Entries() int
// Iterator returns an entry iterator for the block.
Iterator(ctx context.Context, pipeline log.StreamPipeline) iter.EntryIterator
// SampleIterator returns a sample iterator for the block.
SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator
}