/
endpoint_writer.go
129 lines (112 loc) · 3.22 KB
/
endpoint_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package remote
import (
"github.com/hei6775/light-protoactor-go/actor"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
func newEndpointWriter(address string, config *remoteConfig) actor.Producer {
return func() actor.Actor {
return &endpointWriter{
address: address,
config: config,
}
}
}
type endpointWriter struct {
config *remoteConfig
address string
conn *grpc.ClientConn
stream Remoting_ReceiveClient
}
func (state *endpointWriter) initialize() {
err := state.initializeInternal()
if err != nil {
logger.Error("EndpointWriter failed to connect, address=[%v], %s", state.address, err)
}
}
func (state *endpointWriter) initializeInternal() error {
logger.Info("Started EndpointWriter, address=[%v]", state.address)
logger.Info("EndpointWatcher connecting, address=[%v]", state.address)
conn, err := grpc.Dial(state.address, state.config.dialOptions...)
if err != nil {
return err
}
state.conn = conn
c := NewRemotingClient(conn)
// log.Printf("Getting stream from address %v", mgr.address)
stream, err := c.Receive(context.Background(), state.config.callOptions...)
if err != nil {
return err
}
go func() {
_, err := stream.Recv()
if err != nil {
//logger.Info("EndpointWriter lost connection to address, address=[%v]", state.address)
actor.Tell(endpointManagerPID, &EndpointTerminatedEvent{Address: state.address})
}
}()
logger.Info("EndpointWriter connected, address=[%v]", state.address)
state.stream = stream
return nil
}
func (state *endpointWriter) sendEnvelopes(msg []interface{}, ctx actor.Context) {
envelopes := make([]*MessageEnvelope, len(msg))
//type name uniqueness map name string to type index
typeNames := make(map[string]int32)
typeNamesArr := make([]string, 0)
targetNames := make(map[string]int32)
targetNamesArr := make([]string, 0)
var typeID int32
var targetID int32
for i, tmp := range msg {
rd := tmp.(*remoteDeliver)
bytes, typeName, _ := serialize(rd.message)
typeID, typeNamesArr = addToLookup(typeNames, typeName, typeNamesArr)
targetID, targetNamesArr = addToLookup(targetNames, rd.target.Id, targetNamesArr)
envelopes[i] = &MessageEnvelope{
MessageData: bytes,
Sender: rd.sender,
Target: targetID,
TypeId: typeID,
}
}
batch := &MessageBatch{
TypeNames: typeNamesArr,
TargetNames: targetNamesArr,
Envelopes: envelopes,
}
err := state.stream.Send(batch)
if err != nil {
//ctx.Stash()
logger.Debug("gRPC Failed to send, address=[%v]", state.address)
//panic("restart it")
actor.Tell(endpointManagerPID, &EndpointTerminatedEvent{Address: state.address})
}
}
func addToLookup(m map[string]int32, name string, a []string) (int32, []string) {
max := int32(len(m))
id, ok := m[name]
if !ok {
m[name] = max
id = max
a = append(a, name)
}
return id, a
}
func (state *endpointWriter) Receive(ctx actor.Context) {
switch msg := ctx.Message().(type) {
case *actor.Started:
state.initialize()
case *actor.Stopping:
// pass
case *actor.Stopped:
state.conn.Close()
logger.Info("Stopped EndpointWriter, address=[%v]", state.address)
case *actor.Restarting:
state.conn.Close()
case []interface{}:
state.sendEnvelopes(msg, ctx)
default:
logger.Error("Unknown message[%#v]", msg, msg)
}
}