forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.go
129 lines (105 loc) · 3.83 KB
/
index.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package tsdb
import (
"fmt"
"os"
"regexp"
"sort"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/uber-go/zap"
)
type Index interface {
Open() error
Close() error
WithLogger(zap.Logger)
MeasurementExists(name []byte) (bool, error)
MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
DropMeasurement(name []byte) error
ForEachMeasurementName(fn func(name []byte) error) error
InitializeSeries(key, name []byte, tags models.Tags) error
CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
DropSeries(key []byte) error
SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
SeriesN() int64
HasTagKey(name, key []byte) (bool, error)
TagSets(name []byte, options influxql.IteratorOptions) ([]*influxql.TagSet, error)
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
MeasurementTagKeyValuesByExpr(name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error)
ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
TagKeyCardinality(name, key []byte) int
// InfluxQL system iterators
MeasurementSeriesKeysByExpr(name []byte, condition influxql.Expr) ([][]byte, error)
ForEachMeasurementSeriesByExpr(name []byte, expr influxql.Expr, fn func(tags models.Tags) error) error
SeriesPointIterator(opt influxql.IteratorOptions) (influxql.Iterator, error)
// Sets a shared fieldset from the engine.
SetFieldSet(fs *MeasurementFieldSet)
// Creates hard links inside path for snapshotting.
SnapshotTo(path string) error
// To be removed w/ tsi1.
SetFieldName(measurement []byte, name string)
AssignShard(k string, shardID uint64)
UnassignShard(k string, shardID uint64) error
RemoveShard(shardID uint64)
Type() string
}
// IndexFormat represents the format for an index.
type IndexFormat int
const (
// InMemFormat is the format used by the original in-memory shared index.
InMemFormat IndexFormat = 1
// TSI1Format is the format used by the tsi1 index.
TSI1Format IndexFormat = 2
)
// NewIndexFunc creates a new index.
type NewIndexFunc func(id uint64, database, path string, options EngineOptions) Index
// newIndexFuncs is a lookup of index constructors by name.
var newIndexFuncs = make(map[string]NewIndexFunc)
// RegisterIndex registers a storage index initializer by name.
func RegisterIndex(name string, fn NewIndexFunc) {
if _, ok := newIndexFuncs[name]; ok {
panic("index already registered: " + name)
}
newIndexFuncs[name] = fn
}
// RegisteredIndexs returns the slice of currently registered indexes.
func RegisteredIndexes() []string {
a := make([]string, 0, len(newIndexFuncs))
for k := range newIndexFuncs {
a = append(a, k)
}
sort.Strings(a)
return a
}
// NewIndex returns an instance of an index based on its format.
// If the path does not exist then the DefaultFormat is used.
func NewIndex(id uint64, database, path string, options EngineOptions) (Index, error) {
format := options.IndexVersion
// Use default format unless existing directory exists.
_, err := os.Stat(path)
if os.IsNotExist(err) {
// nop, use default
} else if err != nil {
return nil, err
} else if err == nil {
format = "tsi1"
}
// Lookup index by format.
fn := newIndexFuncs[format]
if fn == nil {
return nil, fmt.Errorf("invalid index format: %q", format)
}
return fn(id, database, path, options), nil
}
func MustOpenIndex(id uint64, database, path string, options EngineOptions) Index {
idx, err := NewIndex(id, database, path, options)
if err != nil {
panic(err)
} else if err := idx.Open(); err != nil {
panic(err)
}
return idx
}