package agent

import (
	"context"
	"io"
	"time"

	"github.com/fnproject/fn/api/common"
	"github.com/fnproject/fn/api/common/singleflight"
	"github.com/fnproject/fn/api/models"
	"github.com/patrickmn/go-cache"
)

// ReadDataAccess represents the read operations required to operate a load balancer node.
type ReadDataAccess interface {
	GetAppID(ctx context.Context, appName string) (string, error)
	// GetAppByID abstracts querying the datastore for an app.
	GetAppByID(ctx context.Context, appID string) (*models.App, error)
	GetTriggerBySource(ctx context.Context, appId string, triggerType, source string) (*models.Trigger, error)
	GetFnByID(ctx context.Context, fnId string) (*models.Fn, error)
}

// DequeueDataAccess abstracts the underlying dequeue for async runners.
type DequeueDataAccess interface {
	// Dequeue will query the queue for the next available Call that can be run
	// by this Agent, and reserve it (ultimately forwards to mq.Reserve).
	Dequeue(ctx context.Context) (*models.Call, error)
}

// EnqueueDataAccess abstracts the underlying enqueue for async queueing.
type EnqueueDataAccess interface {
	// Enqueue will add a Call to the queue (ultimately forwards to mq.Push).
	Enqueue(ctx context.Context, mCall *models.Call) error
}

// CallHandler consumes the start and finish events for a call.
// It is effectively a callback that is also allowed to read the call's logs.
// TODO: Deprecate this - it could be a CallListener except that it also consumes logs.
type CallHandler interface {
	// Start will attempt to start the provided Call within an appropriate
	// context.
	Start(ctx context.Context, mCall *models.Call) error
	// Finish will notify the system that the Call has been processed, and
	// fulfill the reservation in the queue if the call came from a queue.
	Finish(ctx context.Context, mCall *models.Call, stderr io.Reader, async bool) error
}

// DataAccess is the combination of read, dequeue and call-handling operations
// currently required by an agent.
type DataAccess interface {
	ReadDataAccess
	DequeueDataAccess
	CallHandler
}

// cachedDataAccess wraps a ReadDataAccess and caches the results of GetAppByID.
type cachedDataAccess struct {
	ReadDataAccess
	cache        *cache.Cache
	singleflight singleflight.SingleFlight
}

// NewCachedDataAccess wraps a ReadDataAccess with a short-lived in-memory
// cache for app lookups.
func NewCachedDataAccess(da ReadDataAccess) ReadDataAccess {
	cda := &cachedDataAccess{
		ReadDataAccess: da,
		cache:          cache.New(5*time.Second, 1*time.Minute),
	}
	return cda
}
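
// Usage sketch (illustrative only; `ds` stands in for any concrete
// ReadDataAccess, e.g. one backed by the datastore):
//
//	cached := NewCachedDataAccess(ds)
//	app, err := cached.GetAppByID(ctx, appID) // served from cache for up to 5s
//	if err != nil {
//		// handle lookup error
//	}
//	_ = app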

func appIDCacheKey(appID string) string {
	return "a:" + appID
}

// GetAppID is not cached; name-to-ID resolution is delegated to the wrapped
// ReadDataAccess.
func (da *cachedDataAccess) GetAppID(ctx context.Context, appName string) (string, error) {
	return da.ReadDataAccess.GetAppID(ctx, appName)
}

func (da *cachedDataAccess) GetAppByID(ctx context.Context, appID string) (*models.App, error) {
	key := appIDCacheKey(appID)
	app, ok := da.cache.Get(key)
	if ok {
		return app.(*models.App), nil
	}

	// Cache miss: collapse concurrent lookups for the same app into a single
	// datastore query via singleflight.
	resp, err := da.singleflight.Do(key,
		func() (interface{}, error) {
			return da.ReadDataAccess.GetAppByID(ctx, appID)
		})
	if err != nil {
		return nil, err
	}

	app = resp.(*models.App)
	da.cache.Set(key, app, cache.DefaultExpiration)
	return app.(*models.App), nil
}

type directDataAccess struct {
	mq models.MessageQueue
	ls models.LogStore
}

type directDequeue struct {
	mq models.MessageQueue
}

func (ddq *directDequeue) Dequeue(ctx context.Context) (*models.Call, error) {
	return ddq.mq.Reserve(ctx)
}

// NewDirectDequeueAccess returns a DequeueDataAccess backed directly by the
// given message queue.
func NewDirectDequeueAccess(mq models.MessageQueue) DequeueDataAccess {
	return &directDequeue{
		mq: mq,
	}
}

type directEnqueue struct {
	mq models.MessageQueue
}

// NewDirectEnqueueAccess returns an EnqueueDataAccess backed directly by the
// given message queue.
func NewDirectEnqueueAccess(mq models.MessageQueue) EnqueueDataAccess {
	return &directEnqueue{
		mq: mq,
	}
}

func (da *directEnqueue) Enqueue(ctx context.Context, mCall *models.Call) error {
	// TODO: Insert a call in the datastore with the 'queued' state
	_, err := da.mq.Push(ctx, mCall)
	return err
}
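
// Wiring sketch (illustrative only; assumes `mq` is a models.MessageQueue
// constructed elsewhere from the server's configured queue backend):
//
//	enq := NewDirectEnqueueAccess(mq)
//	deq := NewDirectDequeueAccess(mq)
//	if err := enq.Enqueue(ctx, call); err != nil {
//		// handle enqueue failure
//	}
//	next, err := deq.Dequeue(ctx) // reserves the next runnable call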

// NewDirectCallDataAccess returns a CallHandler that records calls and logs to
// the given LogStore and manages queue messages on the given MessageQueue.
func NewDirectCallDataAccess(ls models.LogStore, mq models.MessageQueue) CallHandler {
	da := &directDataAccess{
		mq: mq,
		ls: ls,
	}
	return da
}
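
// Lifecycle sketch (illustrative only; `call` is a *models.Call, `stderr` is an
// io.Reader over the call's log output, and `isAsync` is a stand-in flag saying
// whether the call came from the async queue):
//
//	handler := NewDirectCallDataAccess(ls, mq)
//	if err := handler.Start(ctx, call); err != nil {
//		// handle start error
//	}
//	// ... run the call ...
//	if err := handler.Finish(ctx, call, stderr, isAsync); err != nil {
//		// handle finish error
//	}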

func (da *directDataAccess) Enqueue(ctx context.Context, mCall *models.Call) error {
	// TODO: Insert a call in the datastore with the 'queued' state
	_, err := da.mq.Push(ctx, mCall)
	return err
}

func (da *directDataAccess) Start(ctx context.Context, mCall *models.Call) error {
	// TODO: Access the datastore and try a Compare-And-Swap to set the call to
	// 'running'. If it fails, delete the message from the MQ and return an
	// error. If it is successful, don't do anything - the message will be
	// removed when the call Finish'es.
	// At the moment we don't have the queued/running/finished mechanics, so we
	// remove the message here.
	return da.mq.Delete(ctx, mCall)
}

func (da *directDataAccess) Finish(ctx context.Context, mCall *models.Call, stderr io.Reader, async bool) error {
	// This means that we could potentially store an error / timeout status for a
	// call that ran successfully (from the user's perspective).
	// TODO: this should really be an update.
	if err := da.ls.InsertCall(ctx, mCall); err != nil {
		common.Logger(ctx).WithError(err).Error("error inserting call into datastore")
		// note: not returning err here since the job could have already finished successfully.
	}

	if err := da.ls.InsertLog(ctx, mCall, stderr); err != nil {
		common.Logger(ctx).WithError(err).Error("error uploading log")
		// note: not returning err here since the job could have already finished successfully.
	}

	if async {
		// XXX (reed): delete MQ message, eventually
		// YYY (hhexo): yes, once we have the queued/running/finished mechanics
		// return cda.mq.Delete(ctx, mCall)
	}
	return nil
}

type noAsyncEnqueueAccess struct{}

func (noAsyncEnqueueAccess) Enqueue(ctx context.Context, mCall *models.Call) error {
	return models.ErrAsyncUnsupported
}

// NewUnsupportedAsyncEnqueueAccess is a backstop that errors when you try to
// enqueue an async operation on a server that doesn't support async.
func NewUnsupportedAsyncEnqueueAccess() EnqueueDataAccess {
	return &noAsyncEnqueueAccess{}
}
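
// Illustrative only: a server built without an async backend can wire in this
// backstop so that enqueue attempts fail fast:
//
//	enq := NewUnsupportedAsyncEnqueueAccess()
//	err := enq.Enqueue(ctx, call) // always returns models.ErrAsyncUnsupported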