/
logic.go
executable file
·576 lines (514 loc) · 20.5 KB
/
logic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
package broker
import (
"context"
"encoding/json"
"github.com/golang/glog"
osb "github.com/pmorie/go-open-service-broker-client/v2"
"github.com/pmorie/osb-broker-lib/pkg/broker"
"strings"
)
type BusinessLogic struct {
ActionBase
storage Storage
namePrefix string
}
func NewBusinessLogic(ctx context.Context, o Options) (*BusinessLogic, error) {
storage, namePrefix, err := InitFromOptions(ctx, o)
if err != nil {
return nil, err
}
bl := BusinessLogic{
storage: storage,
namePrefix: namePrefix,
}
bl.AddActions("list_backups", "backups", "GET", bl.ActionListBackups)
bl.AddActions("get_backup", "backups/{backup}", "GET", bl.ActionGetBackup)
bl.AddActions("create_backup", "backups", "POST", bl.ActionCreateBackup)
bl.AddActions("restore_backup", "backups/{backup}", "PUT", bl.ActionRestoreBackup)
bl.AddActions("flush", "flush", "POST", bl.ActionFlushData)
bl.AddActions("stats", "stats", "POST", bl.ActionGetStats)
bl.AddActions("restart", "restart", "POST", bl.ActionRestart)
return &bl, nil
}
func (b *BusinessLogic) ActionRestoreBackup(InstanceID string, vars map[string]string, context *broker.RequestContext) (interface{}, error) {
instance, err := b.GetInstanceById(InstanceID)
if err != nil {
return nil, NotFound()
}
provider, err := GetProviderByPlan(b.namePrefix, instance.Plan)
if err != nil {
glog.Errorf("Unable to restore backups, cannot find provider (GetProviderByPlan failed): %s\n", err.Error())
return nil, InternalServerError()
}
_, err = provider.GetBackup(instance, vars["backup"])
if err != nil {
glog.Errorf("Unable to find backup to restore: %s: %s\n", vars["backup"], err.Error())
return nil, NotFound()
}
byteData, err := json.Marshal(RestoreTaskMetadata{Backup: vars["backup"]})
if err != nil {
glog.Errorf("Error: failed to marshal webhook task metadata: %s\n", err)
return nil, InternalServerError()
}
if _, err = b.storage.AddTask(instance.Id, RestoreTask, string(byteData)); err != nil {
glog.Errorf("Error: Unable to schedule restore backup! (%s): %s\n", instance.Name, err.Error())
return nil, InternalServerError()
}
return map[string]interface{}{"status": "OK"}, nil
}
func (b *BusinessLogic) ActionCreateBackup(InstanceID string, vars map[string]string, context *broker.RequestContext) (interface{}, error) {
instance, err := b.GetInstanceById(InstanceID)
if err != nil {
return nil, NotFound()
}
if !CanBeModified(instance.Status) {
return nil, UnprocessableEntityWithMessage("ServiceNotYetAvailable", "A backup cannot be created while this service is under maintenance.")
}
provider, err := GetProviderByPlan(b.namePrefix, instance.Plan)
if err != nil {
glog.Errorf("Unable to create backup, cannot find provider (GetProviderByPlan failed): %s\n", err.Error())
return nil, InternalServerError()
}
backup, err := provider.CreateBackup(instance)
if err != nil {
glog.Errorf("Unable to create backup, create backup failed: %s\n", err.Error())
return nil, InternalServerError()
}
return backup, nil
}
func (b *BusinessLogic) ActionListBackups(InstanceID string, vars map[string]string, context *broker.RequestContext) (interface{}, error) {
instance, err := b.GetInstanceById(InstanceID)
if err != nil {
return nil, NotFound()
}
provider, err := GetProviderByPlan(b.namePrefix, instance.Plan)
if err != nil {
glog.Errorf("Unable to list backups, cannot find provider (GetProviderByPlan failed): %s\n", err.Error())
return nil, InternalServerError()
}
backups, err := provider.ListBackups(instance)
if err != nil {
glog.Errorf("Unable to list backups, create backup failed: %s\n", err.Error())
return nil, InternalServerError()
}
return backups, nil
}
func (b *BusinessLogic) ActionGetBackup(InstanceID string, vars map[string]string, context *broker.RequestContext) (interface{}, error) {
instance, err := b.GetInstanceById(InstanceID)
if err != nil {
return nil, NotFound()
}
provider, err := GetProviderByPlan(b.namePrefix, instance.Plan)
if err != nil {
glog.Errorf("Unable to create backup, cannot find provider (GetProviderByPlan failed): %s\n", err.Error())
return nil, InternalServerError()
}
backup, err := provider.GetBackup(instance, vars["backup"])
if err != nil && err.Error() == "Not found" {
return nil, NotFound()
} else if err != nil {
glog.Errorf("Unable to get backup, get backup failed: %s\n", err.Error())
return nil, InternalServerError()
}
return backup, nil
}
func (b *BusinessLogic) GetCatalog(c *broker.RequestContext) (*broker.CatalogResponse, error) {
response := &broker.CatalogResponse{}
services, err := b.storage.GetServices()
if err != nil {
return nil, err
}
osbResponse := &osb.CatalogResponse{Services: services}
response.CatalogResponse = *osbResponse
return response, nil
}
func (b *BusinessLogic) ActionFlushData(InstanceID string, vars map[string]string, context *broker.RequestContext) (interface{}, error) {
Instance, err := b.GetInstanceById(InstanceID)
if err != nil {
return nil, NotFound()
}
provider, err := GetProviderByPlan(b.namePrefix, Instance.Plan)
if err != nil {
return nil, InternalServerError()
}
provider.Flush(Instance)
return map[string]string{"flush_all": "ok"}, nil
}
func (b *BusinessLogic) ActionGetStats(InstanceID string, vars map[string]string, context *broker.RequestContext) (interface{}, error) {
Instance, err := b.GetInstanceById(InstanceID)
if err != nil {
return nil, NotFound()
}
provider, err := GetProviderByPlan(b.namePrefix, Instance.Plan)
if err != nil {
return nil, InternalServerError()
}
result, err := provider.Stats(Instance)
if err != nil {
glog.Errorf("Unable to pull stats: %s\n", err.Error())
return nil, InternalServerError()
}
return map[string][]Stat{"stats": result}, nil
}
func (b *BusinessLogic) ActionRestart(InstanceID string, vars map[string]string, context *broker.RequestContext) (interface{}, error) {
Instance, err := b.GetInstanceById(InstanceID)
if err != nil {
return nil, NotFound()
}
provider, err := GetProviderByPlan(b.namePrefix, Instance.Plan)
if err != nil {
return nil, InternalServerError()
}
if err := provider.Restart(Instance); err != nil {
glog.Errorf("Unable to restart: %s\n", err.Error())
return nil, InternalServerError()
}
return map[string]string{"restart": "ok"}, nil
}
func GetInstanceById(namePrefix string, storage Storage, Id string) (*Instance, error) {
entry, err := storage.GetInstance(Id)
if err != nil {
return nil, err
}
plan, err := storage.GetPlanByID(entry.PlanId)
if err != nil {
return nil, err
}
provider, err := GetProviderByPlan(namePrefix, plan)
if err != nil {
return nil, err
}
Instance, err := provider.GetInstance(entry.Name, plan)
if err != nil {
return nil, err
}
Instance.Id = entry.Id
Instance.Username = entry.Username
Instance.Password = entry.Password
Instance.Plan = plan
return Instance, nil
}
func (b *BusinessLogic) GetInstanceById(Id string) (*Instance, error) {
return GetInstanceById(b.namePrefix, b.storage, Id)
}
func (b *BusinessLogic) GetUnclaimedInstance(PlanId string, InstanceId string) (*Instance, error) {
Entry, err := b.storage.GetUnclaimedInstance(PlanId, InstanceId)
if err != nil {
return nil, err
}
Instance, err := b.GetInstanceById(Entry.Id)
if err != nil {
if err = b.storage.ReturnClaimedInstance(Entry.Id); err != nil {
return nil, err
}
return nil, err
}
return Instance, nil
}
// A peice of advice, never try to make this syncronous by waiting for a to return a response. The problem is
// that can take up to 10 minutes in my experience (depending on the provider), and aside from the API call timing
// out the other issue is it can cause the mutex lock to make the entire API unresponsive.
func (b *BusinessLogic) Provision(request *osb.ProvisionRequest, c *broker.RequestContext) (*broker.ProvisionResponse, error) {
b.Lock()
defer b.Unlock()
response := broker.ProvisionResponse{}
if !request.AcceptsIncomplete {
return nil, UnprocessableEntityWithMessage("AsyncRequired", "The query parameter accepts_incomplete=true MUST be included the request.")
}
if request.InstanceID == "" {
return nil, UnprocessableEntityWithMessage("InstanceRequired", "The instance ID was not provided.")
}
// Ensure we are not trying to provision a UUID that has ever been used before.
if err := b.storage.ValidateInstanceID(request.InstanceID); err != nil {
return nil, UnprocessableEntityWithMessage("InstanceInvalid", "The instance ID was either already in-use or invalid.")
}
plan, err := b.storage.GetPlanByID(request.PlanID)
if err != nil && err.Error() == "Not found" {
return nil, NotFound()
} else if err != nil {
glog.Errorf("Unable to provision (GetPlanByID failed): %s\n", err.Error())
return nil, InternalServerError()
}
Instance, err := b.GetInstanceById(request.InstanceID)
if err == nil {
if Instance.Plan.ID != request.PlanID {
return nil, ConflictErrorWithMessage("InstanceID in use")
}
response.Exists = true
} else if err != nil && err.Error() == "Cannot find resource instance" {
response.Exists = false
Instance, err = b.GetUnclaimedInstance(request.PlanID, request.InstanceID)
if err != nil && err.Error() == "Cannot find resource instance" {
// Create a new one
provider, err := GetProviderByPlan(b.namePrefix, plan)
if err != nil {
glog.Errorf("Unable to provision, cannot find provider (GetProviderByPlan failed): %s\n", err.Error())
return nil, InternalServerError()
}
Instance, err = provider.Provision(request.InstanceID, plan, request.OrganizationGUID)
if err != nil {
glog.Errorf("Error provisioning resource: %s\n", err.Error())
return nil, InternalServerError()
}
if err = b.storage.AddInstance(Instance); err != nil {
glog.Errorf("Error inserting record into provisioned table: %s\n", err.Error())
if err = provider.Deprovision(Instance, false); err != nil {
glog.Errorf("Error cleaning up (deprovision failed) after insert record failed but provision succeeded (Resource Id:%s Name: %s) %s\n", Instance.Id, Instance.Name, err.Error())
if _, err = b.storage.AddTask(Instance.Id, DeleteTask, Instance.Name); err != nil {
glog.Errorf("Error: Unable to add task to delete instance, WE HAVE AN ORPHAN! (%s): %s\n", Instance.Name, err.Error())
}
}
return nil, InternalServerError()
}
if !IsAvailable(Instance.Status) {
if _, err = b.storage.AddTask(Instance.Id, PerformPostProvisionTask, ""); err != nil {
glog.Errorf("Error: Unable to schedule resync from provider! (%s): %s\n", Instance.Name, err.Error())
}
// This is a hack to support callbacks, hopefully this will become an OSB standard.
if c != nil && c.Request != nil && c.Request.URL != nil && c.Request.URL.Query().Get("webhook") != "" && c.Request.URL.Query().Get("secret") != "" {
// Schedule a callback
byteData, err := json.Marshal(WebhookTaskMetadata{Url: c.Request.URL.Query().Get("webhook"), Secret: c.Request.URL.Query().Get("secret")})
if err != nil {
glog.Errorf("Error: failed to marshal webhook task metadata: %s\n", err)
}
if _, err = b.storage.AddTask(Instance.Id, NotifyCreateServiceWebhookTask, string(byteData)); err != nil {
glog.Errorf("Error: Unable to schedule resync from provider! (%s): %s\n", Instance.Name, err.Error())
}
}
}
} else if err != nil {
glog.Errorf("Got fatal error from unclaimed instance endpoint: %s\n", err.Error())
return nil, InternalServerError()
}
} else {
glog.Errorf("Unable to get instances: %s\n", err.Error())
return nil, InternalServerError()
}
if request.AcceptsIncomplete && Instance.Ready == false {
opkey := osb.OperationKey(request.InstanceID)
response.Async = !Instance.Ready
response.OperationKey = &opkey
} else if request.AcceptsIncomplete && Instance.Ready == true {
response.Async = false
}
response.ExtensionAPIs = b.ConvertActionsToExtensions(Instance.Id)
return &response, nil
}
func (b *BusinessLogic) Deprovision(request *osb.DeprovisionRequest, c *broker.RequestContext) (*broker.DeprovisionResponse, error) {
b.Lock()
defer b.Unlock()
response := broker.DeprovisionResponse{}
Instance, err := b.GetInstanceById(request.InstanceID)
if err != nil && err.Error() == "Cannot find resource instance" {
return nil, NotFound()
} else if err != nil {
glog.Errorf("Error finding instance id (during deprovision) from provisioned table: %s\n", err.Error())
return nil, InternalServerError()
}
provider, err := GetProviderByPlan(b.namePrefix, Instance.Plan)
if err != nil {
glog.Errorf("Unable to provision, cannot find provider (GetProviderByPlan failed): %s\n", err.Error())
return nil, InternalServerError()
}
if err = provider.Deprovision(Instance, true); err != nil {
glog.Errorf("Error failed to deprovision: (Id: %s Name: %s) %s\n", Instance.Id, Instance.Name, err.Error())
if _, err = b.storage.AddTask(Instance.Id, DeleteTask, Instance.Name); err != nil {
glog.Errorf("Error: Unable to schedule delete from provider! (%s): %s\n", Instance.Name, err.Error())
return nil, InternalServerError()
} else {
glog.Errorf("Successfully scheduled db to be removed.")
response.Async = true
return &response, nil
}
}
if err = b.storage.DeleteInstance(Instance); err != nil {
glog.Errorf("Error removing record from provisioned table: %s\n", err.Error())
return nil, InternalServerError()
}
response.Async = false
return &response, nil
}
func (b *BusinessLogic) Update(request *osb.UpdateInstanceRequest, c *broker.RequestContext) (*broker.UpdateInstanceResponse, error) {
response := broker.UpdateInstanceResponse{}
if !request.AcceptsIncomplete {
return nil, UnprocessableEntity()
}
Instance, err := b.GetInstanceById(request.InstanceID)
if err != nil && err.Error() == "Cannot find resource instance" {
return nil, NotFound()
} else if err != nil {
glog.Errorf("Error finding instance id (during deprovision) from provisioned table: %s\n", err.Error())
return nil, InternalServerError()
}
if request.PlanID == nil {
return nil, UnprocessableEntity()
}
if !IsAvailable(Instance.Status) {
return nil, UnprocessableEntityWithMessage("ConcurrencyError", "Clients MUST wait until pending requests have completed for the specified resources.")
}
if strings.ToLower(*request.PlanID) == strings.ToLower(Instance.Plan.ID) {
return nil, UnprocessableEntityWithMessage("UpgradeError", "Cannot upgrade to the same plan.")
}
target_plan, err := b.storage.GetPlanByID(*request.PlanID)
if err != nil {
glog.Errorf("Unable to provision resource (GetPlanByID failed): %s\n", err.Error())
return nil, err
}
if (Instance.Plan.Provider == target_plan.Provider) || (Instance.Plan.Provider != target_plan.Provider && Instance.Engine == "memcached") {
byteData, err := json.Marshal(ChangePlansTaskMetadata{Plan: *request.PlanID})
if err != nil {
glog.Errorf("Unable to marshal change plans task meta data: %s\n", err.Error())
return nil, err
}
if _, err = b.storage.AddTask(Instance.Id, ChangePlansTask, string(byteData)); err != nil {
glog.Errorf("Error: Unable to schedule upgrade of a plan! (%s): %s\n", Instance.Name, err.Error())
return nil, err
}
response.Async = true
return &response, nil
} else {
return nil, UnprocessableEntityWithMessage("UpgradeError", "Cannot upgrade or change redis plans across provider types.")
}
}
func (b *BusinessLogic) LastOperation(request *osb.LastOperationRequest, c *broker.RequestContext) (*broker.LastOperationResponse, error) {
response := broker.LastOperationResponse{}
upgrading, err := b.storage.IsUpgrading(request.InstanceID)
if err != nil {
glog.Errorf("Unable to get resource (%s) status, IsUpgrading failed: %s\n", request.InstanceID, err.Error())
return nil, InternalServerError()
}
restoring, err := b.storage.IsRestoring(request.InstanceID)
if err != nil {
glog.Errorf("Unable to get resource (%s) status, IsRestoring failed: %s\n", request.InstanceID, err.Error())
return nil, InternalServerError()
}
if upgrading {
desc := "upgrading"
Instance, err := b.GetInstanceById(request.InstanceID)
if err == nil && !IsAvailable(Instance.Status) {
desc = Instance.Status
}
response.Description = &desc
response.State = osb.StateInProgress
return &response, nil
} else if restoring {
desc := "restoring"
Instance, err := b.GetInstanceById(request.InstanceID)
if err == nil && !IsAvailable(Instance.Status) {
desc = Instance.Status
}
response.Description = &desc
response.State = osb.StateInProgress
return &response, nil
}
Instance, err := b.GetInstanceById(request.InstanceID)
if err != nil && err.Error() == "Cannot find resource instance" {
return nil, NotFound()
} else if err != nil {
glog.Errorf("Unable to get resource (%s) status: %s\n", request.InstanceID, err.Error())
return nil, InternalServerError()
}
b.storage.UpdateInstance(Instance, Instance.Plan.ID)
if Instance.Ready == true {
response.Description = &Instance.Status
response.State = osb.StateSucceeded
} else if InProgress(Instance.Status) {
response.Description = &Instance.Status
response.State = osb.StateInProgress
} else {
response.Description = &Instance.Status
response.State = osb.StateFailed
}
return &response, nil
}
func (b *BusinessLogic) Bind(request *osb.BindRequest, c *broker.RequestContext) (*broker.BindResponse, error) {
b.Lock()
defer b.Unlock()
Instance, err := b.GetInstanceById(request.InstanceID)
if err != nil && err.Error() == "Cannot find resource instance" {
return nil, NotFound()
} else if err != nil {
glog.Errorf("Error finding instance id (during getbinding): %s\n", err.Error())
return nil, InternalServerError()
}
if Instance.Ready == false {
return nil, UnprocessableEntity()
}
provider, err := GetProviderByPlan(b.namePrefix, Instance.Plan)
if err != nil {
glog.Errorf("Unable to provision, cannot find provider (GetProviderByPlan failed): %s\n", err.Error())
return nil, InternalServerError()
}
if request.BindResource != nil && request.BindResource.AppGUID != nil {
if err = provider.Tag(Instance, "Binding", request.BindingID); err != nil {
glog.Errorf("Error tagging: %s with %s, got %s\n", request.InstanceID, *request.BindResource.AppGUID, err.Error())
return nil, InternalServerError()
}
if err = provider.Tag(Instance, "App", *request.BindResource.AppGUID); err != nil {
glog.Errorf("Error tagging: %s with %s, got %s\n", request.InstanceID, *request.BindResource.AppGUID, err.Error())
return nil, InternalServerError()
}
}
return &broker.BindResponse{
BindResponse: osb.BindResponse{
Async: false,
Credentials: provider.GetUrl(Instance),
},
}, nil
}
func (b *BusinessLogic) Unbind(request *osb.UnbindRequest, c *broker.RequestContext) (*broker.UnbindResponse, error) {
b.Lock()
defer b.Unlock()
Instance, err := b.GetInstanceById(request.InstanceID)
if err != nil && err.Error() == "Cannot find resource instance" {
return nil, NotFound()
} else if err != nil {
glog.Errorf("Error finding instance id (during getbinding): %s\n", err.Error())
return nil, InternalServerError()
}
if Instance.Ready == false {
return nil, UnprocessableEntity()
}
provider, err := GetProviderByPlan(b.namePrefix, Instance.Plan)
if err != nil {
glog.Errorf("Unable to provision, cannot find provider (GetProviderByPlan failed): %s\n", err.Error())
return nil, InternalServerError()
}
if err = provider.Untag(Instance, "Binding"); err != nil {
glog.Errorf("Error untagging: %s\n", err.Error())
return nil, InternalServerError()
}
if err = provider.Untag(Instance, "App"); err != nil {
glog.Errorf("Error untagging: got %s\n", err.Error())
return nil, InternalServerError()
}
return &broker.UnbindResponse{
UnbindResponse: osb.UnbindResponse{
Async: false,
},
}, nil
}
func (b *BusinessLogic) ValidateBrokerAPIVersion(version string) error {
return nil
}
func (b *BusinessLogic) GetBinding(request *osb.GetBindingRequest, context *broker.RequestContext) (*osb.GetBindingResponse, error) {
Instance, err := b.GetInstanceById(request.InstanceID)
if err == nil && !CanGetBindings(Instance.Status) {
return nil, UnprocessableEntityWithMessage("ServiceNotYetAvailable", "The service requested is not yet available.")
}
if err != nil && err.Error() == "Cannot find resource instance" {
return nil, NotFound()
} else if err != nil {
glog.Errorf("Error finding instance id (during getbinding): %s\n", err.Error())
return nil, err
}
provider, err := GetProviderByPlan(b.namePrefix, Instance.Plan)
if err != nil {
glog.Errorf("Unable to provision, cannot find provider (GetProviderByPlan failed): %s\n", err.Error())
return nil, InternalServerError()
}
return &osb.GetBindingResponse{
Credentials: provider.GetUrl(Instance),
}, nil
}
var _ broker.Interface = &BusinessLogic{}