forked from ghetzel/pivot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
backends.go
109 lines (91 loc) · 3.02 KB
/
backends.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package backends
import (
"fmt"
"time"
"github.com/alexcesaro/statsd"
"github.com/PerformLine/go-stockutil/log"
"github.com/PerformLine/pivot/v3/dal"
"github.com/PerformLine/pivot/v3/filter"
)
var querylog = log.Logger()
var stats, _ = statsd.New()
var DefaultAutoregister = false
var AutopingTimeout = 5 * time.Second
type BackendFeature int
const (
PartialSearch BackendFeature = iota
CompositeKeys
Constraints
)
type Backend interface {
Initialize() error
SetIndexer(dal.ConnectionString) error
RegisterCollection(*dal.Collection)
GetConnectionString() *dal.ConnectionString
Exists(collection string, id interface{}) bool
Retrieve(collection string, id interface{}, fields ...string) (*dal.Record, error)
Insert(collection string, records *dal.RecordSet) error
Update(collection string, records *dal.RecordSet, target ...string) error
Delete(collection string, ids ...interface{}) error
CreateCollection(definition *dal.Collection) error
DeleteCollection(collection string) error
ListCollections() ([]string, error)
GetCollection(collection string) (*dal.Collection, error)
WithSearch(collection *dal.Collection, filters ...*filter.Filter) Indexer
WithAggregator(collection *dal.Collection) Aggregator
Flush() error
Ping(time.Duration) error
String() string
Supports(feature ...BackendFeature) bool
}
var NotImplementedError = fmt.Errorf("Not Implemented")
type BackendFunc func(dal.ConnectionString) Backend
var backendMap = map[string]BackendFunc{
`dynamodb`: NewDynamoBackend,
`file`: NewFileBackend,
`fs`: NewFilesystemBackend,
`mongodb`: NewMongoBackend,
`mongo`: NewMongoBackend,
`mysql`: NewSqlBackend,
`postgres`: NewSqlBackend,
`postgresql`: NewSqlBackend,
`psql`: NewSqlBackend,
`sqlite`: NewSqlBackend,
`redis`: NewRedisBackend,
}
// Register a new or replacement backend for the given connection string scheme.
// For example, registering backend "foo" will allow Pivot to handle "foo://"
// connection strings.
func RegisterBackend(name string, fn BackendFunc) {
backendMap[name] = fn
}
func startPeriodicPinger(interval time.Duration, backend Backend) {
for {
if err := backend.Ping(AutopingTimeout); err != nil {
log.Warningf("%v: ping failed with error: %v", backend, err)
}
time.Sleep(interval)
}
}
// Instantiate the appropriate Backend for the given connection string.
func MakeBackend(connection dal.ConnectionString) (Backend, error) {
var autopingInterval time.Duration
backendName := connection.Backend()
log.Infof("Creating backend: %v", connection.String())
if fn, ok := backendMap[backendName]; ok {
if i := connection.OptDuration(`ping`, 0); i > 0 {
autopingInterval = i
}
connection.ClearOpt(`ping`)
if backend := fn(connection); backend != nil {
if autopingInterval > 0 {
go startPeriodicPinger(autopingInterval, backend)
}
return backend, nil
} else {
return nil, fmt.Errorf("Error occurred instantiating backend %q", backendName)
}
} else {
return nil, fmt.Errorf("Unknown backend type %q", backendName)
}
}