forked from cloudfoundry/auctioneer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stop_auction.go
95 lines (77 loc) · 2.63 KB
/
stop_auction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package stop_auction_bbs
import (
"fmt"
"github.com/cloudfoundry-incubator/runtime-schema/bbs/shared"
"github.com/cloudfoundry-incubator/runtime-schema/models"
"github.com/cloudfoundry/storeadapter"
)
// RequestLRPStopAuction marks the given stop auction as pending, stamps it
// with the current time from the BBS time provider, and creates a store node
// for it at the auction's schema path. The whole operation is run through
// shared.RetryIndefinitelyOnStoreTimeout, and that helper's error is returned.
func (bbs *StopAuctionBBS) RequestLRPStopAuction(lrp models.LRPStopAuction) error {
	create := func() error {
		// State and timestamp are set inside the closure, so every invocation
		// of the closure re-stamps the auction before serializing it.
		lrp.State = models.LRPStopAuctionStatePending
		lrp.UpdatedAt = bbs.timeProvider.Time().UnixNano()

		node := storeadapter.StoreNode{
			Key:   shared.LRPStopAuctionSchemaPath(lrp),
			Value: lrp.ToJSON(),
		}
		return bbs.store.Create(node)
	}

	return shared.RetryIndefinitelyOnStoreTimeout(create)
}
// ClaimLRPStopAuction moves the given stop auction to the claimed state using
// a compare-and-swap against the store. The expected (old) value is the JSON
// of lrp exactly as passed in; the new value is the JSON after the state is
// set to claimed and UpdatedAt is refreshed. The swap is run through
// shared.RetryIndefinitelyOnStoreTimeout, and that helper's error is returned.
func (bbs *StopAuctionBBS) ClaimLRPStopAuction(lrp models.LRPStopAuction) error {
	// Serialize before mutating: this is what the store is expected to hold.
	before := lrp.ToJSON()

	lrp.State = models.LRPStopAuctionStateClaimed
	lrp.UpdatedAt = bbs.timeProvider.Time().UnixNano()
	after := lrp.ToJSON()

	swap := func() error {
		expected := storeadapter.StoreNode{
			Key:   shared.LRPStopAuctionSchemaPath(lrp),
			Value: before,
		}
		replacement := storeadapter.StoreNode{
			Key:   shared.LRPStopAuctionSchemaPath(lrp),
			Value: after,
		}
		return bbs.store.CompareAndSwap(expected, replacement)
	}

	return shared.RetryIndefinitelyOnStoreTimeout(swap)
}
// ResolveLRPStopAuction deletes the store node at the given stop auction's
// schema path. The delete is run through shared.RetryIndefinitelyOnStoreTimeout,
// and that helper's error is returned.
func (bbs *StopAuctionBBS) ResolveLRPStopAuction(lrp models.LRPStopAuction) error {
	return shared.RetryIndefinitelyOnStoreTimeout(func() error {
		return bbs.store.Delete(shared.LRPStopAuctionSchemaPath(lrp))
	})
}
// GetAllLRPStopAuctions lists everything under shared.LRPStopAuctionSchemaRoot
// and decodes each node found two levels below the root into an LRPStopAuction.
//
// If the store reports storeadapter.ErrorKeyNotFound for the root, an empty
// (non-nil) slice and a nil error are returned. Any other listing error is
// returned as-is. If a node's value cannot be parsed, the auctions decoded so
// far are returned together with an error naming the offending key.
func (bbs *StopAuctionBBS) GetAllLRPStopAuctions() ([]models.LRPStopAuction, error) {
	// Kept as an empty literal rather than a nil slice so callers continue to
	// receive a non-nil slice when nothing is stored.
	lrps := []models.LRPStopAuction{}

	root, err := bbs.store.ListRecursively(shared.LRPStopAuctionSchemaRoot)
	if err == storeadapter.ErrorKeyNotFound {
		return lrps, nil
	}
	if err != nil {
		return lrps, err
	}

	// Only nodes exactly two levels below the root are decoded; the
	// intermediate level is treated purely as a container.
	for _, group := range root.ChildNodes {
		for _, leaf := range group.ChildNodes {
			lrp, parseErr := models.NewLRPStopAuctionFromJSON(leaf.Value)
			if parseErr != nil {
				return lrps, fmt.Errorf("cannot parse lrp JSON for key %s: %s", leaf.Key, parseErr.Error())
			}
			lrps = append(lrps, lrp)
		}
	}

	return lrps, nil
}
// WatchForLRPStopAuction sets up a filtered watch on
// shared.LRPStopAuctionSchemaRoot via shared.WatchWithFilter.
//
// It returns the channel that is handed to the watch for delivering auctions,
// plus the stop and error channels returned by shared.WatchWithFilter. The
// filter accepts only create and update events whose node value parses as an
// LRPStopAuction in the pending state; everything else is rejected.
func (bbs *StopAuctionBBS) WatchForLRPStopAuction() (<-chan models.LRPStopAuction, chan<- bool, <-chan error) {
	pending := make(chan models.LRPStopAuction)

	onlyPending := func(event storeadapter.WatchEvent) (models.LRPStopAuction, bool) {
		// Other event types are rejected before event.Node is touched.
		if event.Type != storeadapter.CreateEvent && event.Type != storeadapter.UpdateEvent {
			return models.LRPStopAuction{}, false
		}

		auction, err := models.NewLRPStopAuctionFromJSON(event.Node.Value)
		if err != nil || auction.State != models.LRPStopAuctionStatePending {
			// Unparseable payloads are dropped silently, as are auctions
			// that are not pending.
			return models.LRPStopAuction{}, false
		}
		return auction, true
	}

	stop, errs := shared.WatchWithFilter(bbs.store, shared.LRPStopAuctionSchemaRoot, pending, onlyPending)
	return pending, stop, errs
}