-
Notifications
You must be signed in to change notification settings - Fork 1
/
proxyxx.go
124 lines (96 loc) · 2.53 KB
/
proxyxx.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package p2p
import (
"fmt"
"go.uber.org/zap"
"github.com/eosforce/goforceio"
)
type Proxy_Test struct {
Peer1 *Peer
Peer2 *Peer
handlers []Handler
waitingOriginHandShake bool
waitingDestinationHandShake bool
}
func NewProxy_test(peer1 *Peer, peer2 *Peer) *Proxy_Test {
return &Proxy_Test{
Peer1: peer1,
Peer2: peer2,
}
}
func (p *Proxy_Test) RegisterHandler(handler Handler) {
p.handlers = append(p.handlers, handler)
}
func (p *Proxy_Test) RegisterHandlers(handlers []Handler) {
p.handlers = append(p.handlers, handlers...)
}
func (p *Proxy_Test) read(sender *Peer, receiver *Peer, errChannel chan error) {
for {
//p2pLog.Debug("Waiting for packet")
packet, err := sender.Read()
//p2pLog.Debug("Received for packet")
if err != nil {
errChannel <- fmt.Errorf("read message from %s: %s", sender.Address, err)
return
}
err = p.handle(packet, sender, receiver)
if err != nil {
errChannel <- err
}
}
}
func (p *Proxy_Test) handle(packet *eos.Packet, sender *Peer, receiver *Peer) error {
_, err := receiver.Write(packet.Raw)
if err != nil {
return fmt.Errorf("handleDefault: %s", err)
}
switch m := packet.P2PMessage.(type) {
case *eos.GoAwayMessage:
return fmt.Errorf("handling message: go away: reason [%d]", m.Reason)
}
envelope := NewEnvelope(sender, receiver, packet)
for _, handle := range p.handlers {
handle.Handle(envelope)
}
return nil
}
// func triggerHandshake(peer *Peer) error {
// return peer.SendHandshake(peer.handshakeInfo)
// }
func (p *Proxy_Test) ConnectAndStart() error {
p2pLog.Info("Connecting and starting proxy")
errorChannel := make(chan error)
peer1ReadyChannel := p.Peer1.Connect(errorChannel)
peer2ReadyChannel := p.Peer2.Connect(errorChannel)
peer1Ready := false
peer2Ready := false
for {
select {
case <-peer1ReadyChannel:
peer1Ready = true
case <-peer2ReadyChannel:
peer2Ready = true
case err := <-errorChannel:
return err
}
if peer1Ready && peer2Ready {
break
}
}
return p.Start()
}
func (p *Proxy_Test) Start() error {
p2pLog.Info("Starting readers",
zap.String("peer1", p.Peer1.Address),
zap.String("peer1", p.Peer2.Address))
errorChannel := make(chan error)
go p.read(p.Peer1, p.Peer2, errorChannel)
go p.read(p.Peer2, p.Peer1, errorChannel)
if p.Peer2.handshakeInfo != nil {
err := triggerHandshake(p.Peer2)
if err != nil {
return fmt.Errorf("connect and start: trigger handshake: %s", err)
}
}
//p2pLog.Info("Started")
return <-errorChannel
}