/
provider_actor.go
130 lines (118 loc) · 3.47 KB
/
provider_actor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package consul
import (
"fmt"
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/AsynkronIT/protoactor-go/cluster"
"github.com/AsynkronIT/protoactor-go/log"
"github.com/AsynkronIT/protoactor-go/scheduler"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
)
// providerActor is the actor that drives a consul-backed Provider: it
// registers the service, keeps its TTL fresh and forwards consul membership
// changes to the cluster member list.
type providerActor struct {
	// Provider is embedded so the actor can use the provider's fields and
	// helpers (clusterName, refreshTTL, registerService, ...) directly.
	*Provider
	// Behavior holds the currently active message handler; it is switched
	// between init and running via Become.
	actor.Behavior
	// refreshCanceller stops the repeating UpdateTTL timer. It is assigned
	// in init and invoked when the actor receives Stopping in running.
	refreshCanceller scheduler.CancelFunc
}
// RegisterService is the message the provider actor sends to itself to
// trigger (or retry) registration of the service with consul.
type RegisterService struct{}
// UpdateTTL is the periodic message that makes the provider actor refresh
// the service's TTL while it is in the running state.
type UpdateTTL struct{}
// MemberListUpdated is sent to the provider actor when a consul watch update
// yields at least one passing member; the running handler passes both fields
// to MemberList.UpdateClusterTopology.
type MemberListUpdated struct {
	// members are the cluster members built from the passing service entries.
	members []*cluster.Member
	// index is the index value the consul watch handler was invoked with.
	index uint64
}
// Receive hands every incoming message to the embedded Behavior, which
// dispatches it to whichever handler (init or running) is currently active.
func (pa *providerActor) Receive(ctx actor.Context) {
	pa.Behavior.Receive(ctx)
}
// newProviderActor creates the actor that manages the given provider. The
// returned actor starts out in the init behavior, i.e. it waits for Started
// and then attempts to register the service.
func newProviderActor(provider *Provider) actor.Actor {
	a := new(providerActor)
	a.Provider = provider
	a.Behavior = actor.NewBehavior()
	a.Become(a.init)
	return a
}
// init is the initial behavior of the provider actor.
//
// On Started it asks itself to register the service. On RegisterService it
// registers with consul, starts the consul watch, schedules the periodic TTL
// refresh and switches to the running behavior. All other messages are
// ignored while in this state.
func (pa *providerActor) init(ctx actor.Context) {
	switch ctx.Message().(type) {
	case *actor.Started:
		ctx.Send(ctx.Self(), &RegisterService{})
	case *RegisterService:
		if err := pa.registerService(); err != nil {
			plog.Error("Failed to register service to consul, will retry", log.Error(err))
			// Retry after a delay instead of immediately: re-sending to
			// self right away turns an unreachable consul into a busy loop
			// that floods the log. refreshTTL is reused as the back-off
			// interval so no additional configuration is required.
			scheduler.NewTimerScheduler(ctx).SendOnce(pa.refreshTTL, ctx.Self(), &RegisterService{})
			return
		}
		plog.Info("Registered service to consul")

		// Start the watch before the TTL timer. If the watch cannot be
		// set up the actor stays in init, where UpdateTTL is not handled,
		// so a timer started earlier would run forever without effect and
		// never be cancelled. startWatch logs its own failure.
		if err := pa.startWatch(ctx); err != nil {
			return
		}

		refreshScheduler := scheduler.NewTimerScheduler(ctx)
		pa.refreshCanceller = refreshScheduler.SendRepeatedly(0, pa.refreshTTL, ctx.Self(), &UpdateTTL{})
		pa.Become(pa.running)
	}
}
// running is the steady-state behavior, active once the service has been
// registered and the consul watch has been started.
//
//   - MemberListUpdated: pushes the new member set to the cluster member list.
//   - UpdateTTL: refreshes the TTL; a failure is only logged as a warning.
//   - Stopping: cancels the TTL refresh timer and deregisters the service.
func (pa *providerActor) running(ctx actor.Context) {
	switch m := ctx.Message().(type) {
	case *MemberListUpdated:
		pa.cluster.MemberList.UpdateClusterTopology(m.members, m.index)

	case *UpdateTTL:
		err := blockingUpdateTTL(pa.Provider)
		if err != nil {
			plog.Warn("Failed to update TTL", log.Error(err))
		}

	case *actor.Stopping:
		// Stop the periodic refresh first so no further UpdateTTL
		// messages are scheduled while shutting down.
		pa.refreshCanceller()

		err := pa.deregisterService()
		if err != nil {
			plog.Error("Failed to deregister service from consul", log.Error(err))
			return
		}
		plog.Info("De-registered service from consul")
	}
}
// startWatch sets up a consul "service" watch for the cluster name and runs
// it on a separate goroutine. Every watch update is handed to
// processConsulUpdate together with ctx.
//
// It returns an error only if the watch definition cannot be parsed (the
// error is also logged). A failure of the watch itself is reported from the
// goroutine: it is logged and then panics, which is not recoverable by the
// caller of startWatch.
func (pa *providerActor) startWatch(ctx actor.Context) error {
	// passingonly is false so that entries of every health state are
	// delivered; filtering happens in processConsulUpdate.
	params := map[string]interface{}{
		"type":        "service",
		"service":     pa.clusterName,
		"passingonly": false,
	}

	plan, err := watch.Parse(params)
	if err != nil {
		plog.Error("Failed to parse consul watch definition", log.Error(err))
		return err
	}

	plan.Handler = func(index uint64, result interface{}) {
		pa.processConsulUpdate(index, result, ctx)
	}

	go func() {
		// Keep the error local to the goroutine. Assigning to the enclosing
		// function's err would write to a captured variable after
		// startWatch has already returned.
		if runErr := plan.Run(pa.consulServerAddress); runErr != nil {
			plog.Error("Failed to start consul watch", log.Error(runErr))
			panic(runErr)
		}
	}()

	return nil
}
// processConsulUpdate converts a consul watch result into cluster members
// and, if any were found, sends them to the actor as a MemberListUpdated
// message.
//
// result is expected to be a []*api.ServiceEntry; anything else is logged
// and dropped. Only entries that have at least one check and whose
// aggregated check status is passing become members. An entry without an
// "id" in its service meta gets an id of the form
// "<clusterName>@<address>:<port>".
func (pa *providerActor) processConsulUpdate(index uint64, result interface{}, ctx actor.Context) {
	entries, isEntries := result.([]*api.ServiceEntry)
	if !isEntries {
		plog.Warn("Didn't get expected data from consul watch")
		return
	}

	var alive []*cluster.Member
	for _, entry := range entries {
		// Skip entries without checks as well as those not passing.
		if len(entry.Checks) == 0 || entry.Checks.AggregatedStatus() != api.HealthPassing {
			continue
		}

		svc := entry.Service
		id := svc.Meta["id"]
		if id == "" {
			id = fmt.Sprintf("%v@%v:%v", pa.clusterName, svc.Address, svc.Port)
			plog.Info("meta['id'] was empty, fixed", log.String("id", id))
		}

		alive = append(alive, &cluster.Member{
			Id:    id,
			Host:  svc.Address,
			Port:  int32(svc.Port),
			Kinds: svc.Tags,
		})
	}

	// Hold back the first update until there is at least one member.
	if len(alive) == 0 {
		return
	}
	ctx.Send(ctx.Self(), &MemberListUpdated{
		members: alive,
		index:   index,
	})
}