/
auction_runner.go
136 lines (117 loc) · 4.28 KB
/
auction_runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package auctionrunner
import (
"os"
"time"
"code.cloudfoundry.org/bbs/trace"
"code.cloudfoundry.org/clock"
"code.cloudfoundry.org/lager/v3"
"code.cloudfoundry.org/auction/auctiontypes"
"code.cloudfoundry.org/auctioneer"
"code.cloudfoundry.org/workpool"
)
type auctionRunner struct {
logger lager.Logger
delegate auctiontypes.AuctionRunnerDelegate
metricEmitter auctiontypes.AuctionMetricEmitterDelegate
batch *Batch
clock clock.Clock
workPool *workpool.WorkPool
binPackFirstFitWeight float64
startingContainerWeight float64
startingContainerCountMaximum int
}
func New(
logger lager.Logger,
delegate auctiontypes.AuctionRunnerDelegate,
metricEmitter auctiontypes.AuctionMetricEmitterDelegate,
clock clock.Clock,
workPool *workpool.WorkPool,
binPackFirstFitWeight float64,
startingContainerWeight float64,
startingContainerCountMaximum int,
) *auctionRunner {
return &auctionRunner{
logger: logger,
delegate: delegate,
metricEmitter: metricEmitter,
batch: NewBatch(clock),
clock: clock,
workPool: workPool,
binPackFirstFitWeight: binPackFirstFitWeight,
startingContainerWeight: startingContainerWeight,
startingContainerCountMaximum: startingContainerCountMaximum,
}
}
func (a *auctionRunner) Run(signals <-chan os.Signal, ready chan<- struct{}) error {
close(ready)
var hasWork chan Work
hasWork = a.batch.HasWork
for {
select {
case work := <-hasWork:
logger := trace.LoggerWithTraceInfo(a.logger, work.TraceID).Session("auction")
logger.Info("fetching-cell-reps")
clients, err := a.delegate.FetchCellReps(logger, work.TraceID)
if err != nil {
logger.Error("failed-to-fetch-reps", err)
time.Sleep(time.Second)
hasWork = make(chan Work, 1)
hasWork <- work
break
}
logger.Info("fetched-cell-reps", lager.Data{"cell-reps-count": len(clients)})
hasWork = a.batch.HasWork
logger.Info("fetching-zone-state")
fetchStatesStartTime := time.Now()
zones := FetchStateAndBuildZones(logger, a.workPool, clients, a.metricEmitter, a.binPackFirstFitWeight)
fetchStateDuration := time.Since(fetchStatesStartTime)
err = a.metricEmitter.FetchStatesCompleted(fetchStateDuration)
if err != nil {
logger.Error("failed-sending-fetch-states-completed-metric", err)
}
cellCount := 0
for zone, cells := range zones {
logger.Info("zone-state", lager.Data{"zone": zone, "cell-count": len(cells)})
cellCount += len(cells)
}
logger.Info("fetched-zone-state", lager.Data{
"cell-state-count": cellCount,
"num-failed-requests": len(clients) - cellCount,
"duration": fetchStateDuration.String(),
})
logger.Info("fetching-auctions")
lrpAuctions, taskAuctions := a.batch.DedupeAndDrain()
logger.Info("fetched-auctions", lager.Data{
"lrp-start-auctions": len(lrpAuctions),
"task-auctions": len(taskAuctions),
})
if len(lrpAuctions) == 0 && len(taskAuctions) == 0 {
logger.Info("nothing-to-auction")
break
}
logger.Info("scheduling")
auctionRequest := auctiontypes.AuctionRequest{
LRPs: lrpAuctions,
Tasks: taskAuctions,
}
scheduler := NewScheduler(a.workPool, zones, a.clock, logger, a.binPackFirstFitWeight, a.startingContainerWeight, a.startingContainerCountMaximum)
auctionResults := scheduler.Schedule(auctionRequest)
logger.Info("scheduled", lager.Data{
"successful-lrp-start-auctions": len(auctionResults.SuccessfulLRPs),
"successful-task-auctions": len(auctionResults.SuccessfulTasks),
"failed-lrp-start-auctions": len(auctionResults.FailedLRPs),
"failed-task-auctions": len(auctionResults.FailedTasks),
})
a.metricEmitter.AuctionCompleted(auctionResults)
a.delegate.AuctionCompleted(logger, work.TraceID, auctionResults)
case <-signals:
return nil
}
}
}
func (a *auctionRunner) ScheduleLRPsForAuctions(lrpStarts []auctioneer.LRPStartRequest, traceID string) {
a.batch.AddLRPStarts(lrpStarts, traceID)
}
func (a *auctionRunner) ScheduleTasksForAuctions(tasks []auctioneer.TaskStartRequest, traceID string) {
a.batch.AddTasks(tasks, traceID)
}