-
Notifications
You must be signed in to change notification settings - Fork 291
/
resource_collect.go
112 lines (91 loc) · 2.57 KB
/
resource_collect.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package master
import (
"time"
"github.com/chrislusf/gleam/pb"
)
// UpdateAgentInformation records a heartbeat from an agent and folds the
// reported figures into the rack, data center, and topology totals.
//
// The data center and rack named in ai.Location are created and registered
// the first time they are seen. For an agent that is already known, only the
// difference from its previously recorded Resource and Allocated values is
// applied to the totals. For a new agent the full reported Resource and
// Allocated values are applied, so that a later deleteAgentInformation, which
// subtracts the agent's full recorded values, leaves the totals balanced.
//
// ai.Location, ai.Resource, and ai.Allocated are dereferenced without a nil
// check, so all three must be set by the caller.
func (tp *Topology) UpdateAgentInformation(ai *pb.Heartbeat) {
	dc, hasDc := tp.GetDataCenter(ai.Location.DataCenter)
	if !hasDc {
		dc = NewDataCenter(ai.Location.DataCenter)
		tp.AddDataCenter(dc)
	}

	rack, hasRack := dc.GetRack(ai.Location.Rack)
	if !hasRack {
		rack = NewRack(ai.Location.Rack)
		dc.AddRack(rack)
	}

	oldInfo, hasOldInfo := rack.GetAgent(ai.Location.URL())

	// Start from the full reported values: a new agent contributes all of
	// them, while a known agent contributes only the change since its last
	// heartbeat (subtracted below).
	deltaResource := *ai.Resource
	deltaAllocated := *ai.Allocated

	if hasOldInfo {
		deltaResource = deltaResource.Minus(oldInfo.Resource)
		if !deltaResource.IsZero() {
			oldInfo.Resource = *ai.Resource
		}
		oldInfo.LastHeartBeat = time.Now()
	} else {
		rack.AddAgent(&AgentInformation{
			Location:      *ai.Location,
			LastHeartBeat: time.Now(),
			Resource:      *ai.Resource,
			Allocated:     *ai.Allocated,
		})
	}

	tp.Lock()
	defer tp.Unlock()

	if !deltaResource.IsZero() {
		rack.Resource = rack.Resource.Plus(deltaResource)
		dc.Resource = dc.Resource.Plus(deltaResource)
		tp.Resource = tp.Resource.Plus(deltaResource)
	}

	if hasOldInfo {
		deltaAllocated = deltaAllocated.Minus(oldInfo.Allocated)
		oldInfo.Allocated = *ai.Allocated
	}
	// Previously the aggregate Allocated totals were only adjusted for known
	// agents, so a new agent's initial allocation was stored on the agent
	// but never counted in the totals.
	if !deltaAllocated.IsZero() {
		rack.Allocated = rack.Allocated.Plus(deltaAllocated)
		dc.Allocated = dc.Allocated.Plus(deltaAllocated)
		tp.Allocated = tp.Allocated.Plus(deltaAllocated)
	}
}
// deleteAgentInformation removes the agent registered at location from its
// rack and subtracts the agent's last recorded Resource and Allocated values
// from the rack, data center, and topology totals.
//
// It does nothing when the data center, rack, or agent cannot be found.
func (tp *Topology) deleteAgentInformation(location *pb.Location) {
	dc, hasDc := tp.GetDataCenter(location.DataCenter)
	if !hasDc {
		return
	}
	rack, hasRack := dc.GetRack(location.Rack)
	if !hasRack {
		return
	}
	oldInfo, hasOldInfo := rack.GetAgent(location.URL())
	if !hasOldInfo {
		return
	}

	tp.Lock()
	defer tp.Unlock()

	deltaResource := oldInfo.Resource
	deltaAllocated := oldInfo.Allocated

	// Drop the agent unconditionally. Previously the drop was nested under
	// the non-zero Resource check, so an agent recorded with zero resources
	// was never removed and its Allocated amount was never released.
	rack.DropAgent(location)

	if !deltaResource.IsZero() {
		rack.Resource = rack.Resource.Minus(deltaResource)
		dc.Resource = dc.Resource.Minus(deltaResource)
		tp.Resource = tp.Resource.Minus(deltaResource)
	}
	if !deltaAllocated.IsZero() {
		rack.Allocated = rack.Allocated.Minus(deltaAllocated)
		dc.Allocated = dc.Allocated.Minus(deltaAllocated)
		tp.Allocated = tp.Allocated.Minus(deltaAllocated)
	}
}
// findAgentInformation looks up the agent registered at location by walking
// data center, then rack, then agent URL. The boolean result is false when
// any level of the lookup fails, in which case the returned pointer is nil
// for a missing data center or rack, or whatever GetAgent returned for a
// missing agent.
func (tp *Topology) findAgentInformation(location *pb.Location) (*AgentInformation, bool) {
	if dataCenter, found := tp.GetDataCenter(location.DataCenter); found {
		if rack, found := dataCenter.GetRack(location.Rack); found {
			return rack.GetAgent(location.URL())
		}
	}
	return nil, false
}