/
proxy.go
128 lines (99 loc) · 2.58 KB
/
proxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package p2p
import (
"fmt"
"github.com/pkg/errors"
"go.uber.org/zap"
"github.com/houyanzu/yotta-go"
)
type Proxy struct {
Peer1 *Peer
Peer2 *Peer
handlers []Handler
waitingOriginHandShake bool
waitingDestinationHandShake bool
}
func NewProxy(peer1 *Peer, peer2 *Peer) *Proxy {
return &Proxy{
Peer1: peer1,
Peer2: peer2,
}
}
func (p *Proxy) RegisterHandler(handler Handler) {
p.handlers = append(p.handlers, handler)
}
func (p *Proxy) RegisterHandlers(handlers []Handler) {
p.handlers = append(p.handlers, handlers...)
}
func (p *Proxy) read(sender *Peer, receiver *Peer, errChannel chan error) {
for {
//p2pLog.Debug("Waiting for packet")
packet, err := sender.Read()
//p2pLog.Debug("Received for packet")
if err != nil {
errChannel <- errors.Wrapf(err, "read message from %s", sender.Address)
return
}
err = p.handle(packet, sender, receiver)
if err != nil {
errChannel <- err
}
}
}
func (p *Proxy) handle(packet *eos.Packet, sender *Peer, receiver *Peer) error {
_, err := receiver.Write(packet.Raw)
if err != nil {
return errors.Wrapf(err, "handleDefault")
}
switch m := packet.P2PMessage.(type) {
case *eos.GoAwayMessage:
return errors.Errorf("handling message: go away: reason [%d]", m.Reason)
}
envelope := NewEnvelope(sender, receiver, packet)
for _, handle := range p.handlers {
handle.Handle(envelope)
}
return nil
}
func triggerHandshake(peer *Peer) error {
return peer.SendHandshake(peer.handshakeInfo)
}
func (p *Proxy) ConnectAndStart() error {
p2pLog.Info("Connecting and starting proxy")
errorChannel := make(chan error)
peer1ReadyChannel := p.Peer1.Connect(errorChannel)
peer2ReadyChannel := p.Peer2.Connect(errorChannel)
peer1Ready := false
peer2Ready := false
for {
select {
case <-peer1ReadyChannel:
peer1Ready = true
case <-peer2ReadyChannel:
peer2Ready = true
case err := <-errorChannel:
return err
}
if peer1Ready && peer2Ready {
break
}
}
return p.Start()
}
func (p *Proxy) Start() error {
p2pLog.Info("Starting readers",
zap.String("peer1", p.Peer1.Address),
zap.String("peer1", p.Peer2.Address))
errorChannel := make(chan error)
go p.read(p.Peer1, p.Peer2, errorChannel)
go p.read(p.Peer2, p.Peer1, errorChannel)
if p.Peer2.handshakeInfo != nil {
err := triggerHandshake(p.Peer2)
if err != nil {
return fmt.Errorf("connect and start: trigger handshake: %s", err)
}
return errors.Wrap(triggerHandshake(p.Peer2),
"connect and start: trigger handshake")
}
//p2pLog.Info("Started")
return <-errorChannel
}