forked from eremetic-framework/eremetic
/
match.go
114 lines (94 loc) · 2.49 KB
/
match.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package mesos
import (
"errors"
"fmt"
"github.com/Sirupsen/logrus"
ogle "github.com/jacobsa/oglematchers"
"github.com/mesos/mesos-go/api/v0/mesosproto"
"github.com/klarna/eremetic"
)
// resourceMatcher checks a Mesos offer for a minimum amount of one named
// scalar resource.
type resourceMatcher struct {
	// name is compared against the name of each resource in the offer.
	name string
	// value is the smallest scalar amount that counts as a match.
	value float64
}
// attributeMatcher checks a Mesos offer against a single slave attribute
// constraint.
type attributeMatcher struct {
	// constraint holds the attribute name and the value it must have.
	constraint eremetic.SlaveConstraint
}
// Matches reports whether the offer o provides enough of the scalar resource
// described by m.
//
// o is expected to be a *mesosproto.Offer; any other value yields an error
// rather than a panic. The first resource whose name equals m.name decides the
// outcome: nil is returned when it is a scalar of at least m.value, and an
// error otherwise. An error is also returned when no resource carries that
// name. Error texts are relative clauses meant to follow Description.
func (m *resourceMatcher) Matches(o interface{}) error {
	offer, ok := o.(*mesosproto.Offer)
	if !ok {
		return fmt.Errorf("which is of type %T, not *mesosproto.Offer", o)
	}

	// The protobuf-generated getters accept nil receivers, so a nil offer or a
	// nil resource entry reads as "resource not offered" instead of crashing.
	for _, res := range offer.GetResources() {
		if res.GetName() != m.name {
			continue
		}
		if res.GetType() != mesosproto.Value_SCALAR {
			return fmt.Errorf("which offers %s as a non-scalar resource", m.name)
		}
		got := res.GetScalar().GetValue()
		if got >= m.value {
			return nil
		}
		return fmt.Errorf("which offers only %f of %s", got, m.name)
	}
	return fmt.Errorf("which does not offer %s", m.name)
}
// Description returns a human-readable summary of the requirement: the
// minimum amount followed by the name of the scalar resource.
func (m *resourceMatcher) Description() string {
	const layout = "%f of scalar resource %s"
	return fmt.Sprintf(layout, m.value, m.name)
}
// cpuAvailable returns a matcher that requires at least v of the "cpus"
// resource.
func cpuAvailable(v float64) ogle.Matcher {
	return &resourceMatcher{name: "cpus", value: v}
}
// memoryAvailable returns a matcher that requires at least v of the "mem"
// resource.
func memoryAvailable(v float64) ogle.Matcher {
	return &resourceMatcher{name: "mem", value: v}
}
// Matches reports whether the offer o satisfies the attribute constraint held
// by m.
//
// o is expected to be a *mesosproto.Offer; any other value yields an error
// rather than a panic. The first attribute whose name equals the constraint's
// AttributeName decides the outcome: nil is returned when it is a text
// attribute whose value equals AttributeValue, and an error otherwise. An
// error is also returned when no attribute carries that name. Error texts are
// relative clauses meant to follow Description.
func (m *attributeMatcher) Matches(o interface{}) error {
	offer, ok := o.(*mesosproto.Offer)
	if !ok {
		return fmt.Errorf("which is of type %T, not *mesosproto.Offer", o)
	}

	want := m.constraint
	// The protobuf-generated getters accept nil receivers, so a nil offer or a
	// nil attribute entry reads as "attribute missing" instead of crashing.
	for _, attr := range offer.GetAttributes() {
		if attr.GetName() != want.AttributeName {
			continue
		}
		if attr.GetType() != mesosproto.Value_TEXT {
			return fmt.Errorf("which has a non-text attribute %s", want.AttributeName)
		}
		if got := attr.GetText().GetValue(); got != want.AttributeValue {
			return fmt.Errorf("which has attribute %s=%s", want.AttributeName, got)
		}
		return nil
	}
	return fmt.Errorf("which has no attribute %s", want.AttributeName)
}
// Description returns a human-readable summary of the constraint in
// name=value form.
func (m *attributeMatcher) Description() string {
	name, value := m.constraint.AttributeName, m.constraint.AttributeValue
	return fmt.Sprintf("slave attribute constraint %s=%s", name, value)
}
// attributeMatch builds one attributeMatcher per entry in slaveConstraints and
// combines them with ogle.AllOf.
func attributeMatch(slaveConstraints []eremetic.SlaveConstraint) ogle.Matcher {
	var all []ogle.Matcher
	for i := range slaveConstraints {
		all = append(all, &attributeMatcher{constraint: slaveConstraints[i]})
	}
	return ogle.AllOf(all...)
}
// createMatcher assembles the matcher for task by combining its CPU
// requirement, its memory requirement and its slave constraints with
// ogle.AllOf.
func createMatcher(task eremetic.Task) ogle.Matcher {
	cpu := cpuAvailable(task.TaskCPUs)
	mem := memoryAvailable(task.TaskMem)
	attrs := attributeMatch(task.SlaveConstraints)
	return ogle.AllOf(cpu, mem, attrs)
}
// matches runs matcher against o and reports success as a boolean: true when
// Matches returns a nil error, false otherwise.
func matches(matcher ogle.Matcher, o interface{}) bool {
	return matcher.Matches(o) == nil
}
// matchOffer returns the first offer in offers that satisfies the matcher
// built from task, together with the remaining offers.
//
// The matched offer is removed in place: its slot is overwritten with the last
// element and the slice is shortened by one. The caller's backing array is
// therefore modified and the order of the remaining offers is not preserved.
// Each offer that does not match is reported at debug level. When nothing
// matches, nil is returned along with offers unchanged.
func matchOffer(task eremetic.Task, offers []*mesosproto.Offer) (*mesosproto.Offer, []*mesosproto.Offer) {
	matcher := createMatcher(task)
	for i, off := range offers {
		if matches(matcher, off) {
			last := len(offers) - 1
			offers[i] = offers[last]
			return off, offers[:last]
		}
		logrus.WithFields(logrus.Fields{
			// GetId accepts a nil receiver, so a nil entry in offers cannot
			// crash the logging path.
			"offer_id": off.GetId().GetValue(),
			"matcher":  matcher.Description(),
			"task_id":  task.ID,
		}).Debug("Unable to match offer")
	}
	return nil, offers
}