forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
engine.go
164 lines (134 loc) · 5.05 KB
/
engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package tsdb
import (
"errors"
"fmt"
"io"
"os"
"regexp"
"sort"
"time"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/influxdata/influxdb/pkg/limiter"
"github.com/uber-go/zap"
)
var (
// ErrFormatNotFound is returned when no format can be determined from a path.
ErrFormatNotFound = errors.New("format not found")
// ErrUnknownEngineFormat is returned when the engine format is
// unknown. ErrUnknownEngineFormat is currently returned if a format
// other than tsm1 is encountered.
ErrUnknownEngineFormat = errors.New("unknown engine format")
)
// Engine represents a swappable storage engine for the shard.
type Engine interface {
Open() error
Close() error
SetEnabled(enabled bool)
SetCompactionsEnabled(enabled bool)
WithLogger(zap.Logger)
LoadMetadataIndex(shardID uint64, index Index) error
CreateSnapshot() (string, error)
Backup(w io.Writer, basePath string, since time.Time) error
Restore(r io.Reader, basePath string) error
Import(r io.Reader, basePath string) error
CreateIterator(measurement string, opt influxql.IteratorOptions) (influxql.Iterator, error)
WritePoints(points []models.Point) error
CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
DeleteSeriesRange(keys [][]byte, min, max int64) error
SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
SeriesN() int64
MeasurementExists(name []byte) (bool, error)
MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
MeasurementFields(measurement []byte) *MeasurementFields
ForEachMeasurementName(fn func(name []byte) error) error
DeleteMeasurement(name []byte) error
// TagKeys(name []byte) ([][]byte, error)
HasTagKey(name, key []byte) (bool, error)
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
MeasurementTagKeyValuesByExpr(name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error)
ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
TagKeyCardinality(name, key []byte) int
// InfluxQL iterators
MeasurementSeriesKeysByExpr(name []byte, condition influxql.Expr) ([][]byte, error)
ForEachMeasurementSeriesByExpr(name []byte, expr influxql.Expr, fn func(tags models.Tags) error) error
SeriesPointIterator(opt influxql.IteratorOptions) (influxql.Iterator, error)
// Statistics will return statistics relevant to this engine.
Statistics(tags map[string]string) []models.Statistic
LastModified() time.Time
DiskSize() int64
IsIdle() bool
io.WriterTo
}
// EngineFormat represents the format for an engine.
type EngineFormat int
const (
// TSM1Format is the format used by the tsm1 engine.
TSM1Format EngineFormat = 2
)
// NewEngineFunc creates a new engine.
type NewEngineFunc func(id uint64, i Index, database, path string, walPath string, options EngineOptions) Engine
// newEngineFuncs is a lookup of engine constructors by name.
var newEngineFuncs = make(map[string]NewEngineFunc)
// RegisterEngine registers a storage engine initializer by name.
func RegisterEngine(name string, fn NewEngineFunc) {
if _, ok := newEngineFuncs[name]; ok {
panic("engine already registered: " + name)
}
newEngineFuncs[name] = fn
}
// RegisteredEngines returns the slice of currently registered engines.
func RegisteredEngines() []string {
a := make([]string, 0, len(newEngineFuncs))
for k := range newEngineFuncs {
a = append(a, k)
}
sort.Strings(a)
return a
}
// NewEngine returns an instance of an engine based on its format.
// If the path does not exist then the DefaultFormat is used.
func NewEngine(id uint64, i Index, database, path string, walPath string, options EngineOptions) (Engine, error) {
// Create a new engine
if _, err := os.Stat(path); os.IsNotExist(err) {
return newEngineFuncs[options.EngineVersion](id, i, database, path, walPath, options), nil
}
// If it's a dir then it's a tsm1 engine
format := DefaultEngine
if fi, err := os.Stat(path); err != nil {
return nil, err
} else if !fi.Mode().IsDir() {
return nil, ErrUnknownEngineFormat
} else {
format = "tsm1"
}
// Lookup engine by format.
fn := newEngineFuncs[format]
if fn == nil {
return nil, fmt.Errorf("invalid engine format: %q", format)
}
return fn(id, i, database, path, walPath, options), nil
}
// EngineOptions represents the options used to initialize the engine.
type EngineOptions struct {
EngineVersion string
IndexVersion string
ShardID uint64
InmemIndex interface{} // shared in-memory index
CompactionLimiter limiter.Fixed
Config Config
}
// NewEngineOptions returns the default options.
func NewEngineOptions() EngineOptions {
return EngineOptions{
EngineVersion: DefaultEngine,
IndexVersion: DefaultIndex,
Config: NewConfig(),
}
}
// NewInmemIndex returns a new "inmem" index type.
var NewInmemIndex func(name string) (interface{}, error)