-
Notifications
You must be signed in to change notification settings - Fork 106
/
proxy.go
174 lines (157 loc) · 4.44 KB
/
proxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package monitor
import (
"encoding/json"
"fmt"
"io"
"net"
"strconv"
"gopkg.in/dedis/cothority.v0/lib/dbg"
)
// This file implements a simple proxy:
// A <-> D <-> B
// D is the proxy. It listens for incoming connections coming from the B side
// and connects to A.
// Package-level state shared by the proxy functions in this file.
var (
	// serverConn is the connection object to the server.
	serverConn net.Conn

	// serverEnc is used to write the measures back to the server.
	serverEnc *json.Encoder
	// serverDec is the JSON decoder paired with serverEnc.
	serverDec *json.Decoder

	readyCount int64

	// proxyConns holds the proxy connections opened, as JSON encoders keyed
	// by a string.
	proxyConns map[string]*json.Encoder

	proxyDone chan bool
)

// init allocates proxyDone as an unbuffered channel at package load time.
func init() {
	proxyDone = make(chan bool)
}
// Proxy connects to the sink at redirection and then launches the routines
// that wait for incoming connections. It listens on the Sink host, on the port
// one lower than the port of redirection, so that the user does not need to
// differentiate between connecting to a proxy or directly to the sink.
//
// Proxy returns as soon as the listener is set up. Accepting connections and
// counting them happens in two background goroutines; every accepted
// connection is handed to proxyConnection. When the number of open
// connections drops back to zero, an "end" measure is sent to the sink and
// both the sink connection and the listener are closed.
//
// An error is returned if the sink cannot be contacted, if the port cannot be
// extracted from redirection, or if the listening address cannot be bound. In
// the latter two cases the sink connection is closed before returning.
func Proxy(redirection string) error {
	// Connect to the sink
	if err := connectToSink(redirection); err != nil {
		return err
	}
	dbg.Lvl2("Proxy connected to sink", redirection)

	// abort closes the sink connection before reporting a setup failure, so
	// that a failed call does not leak the connection opened above.
	abort := func(err error) error {
		if cerr := serverConn.Close(); cerr != nil {
			dbg.Error("Couldn't close server connection:", cerr)
		}
		return err
	}

	// The proxy listens on the port one lower than the one of redirection
	_, port, err := net.SplitHostPort(redirection)
	if err != nil {
		return abort(fmt.Errorf("Couldn't get port-number from %s: %v", redirection, err))
	}
	portNbr, err := strconv.Atoi(port)
	if err != nil {
		return abort(fmt.Errorf("Couldn't convert %s to a number: %v", port, err))
	}
	sinkAddr := Sink + ":" + strconv.Itoa(portNbr-1)
	ln, err := net.Listen("tcp", sinkAddr)
	if err != nil {
		return abort(fmt.Errorf("Error while binding proxy to addr %s: %v", sinkAddr, err))
	}
	dbg.Lvl2("Proxy listening on", sinkAddr)

	newConn := make(chan bool)
	closeConn := make(chan bool)
	// finished is closed by the counting goroutine once the proxy has shut
	// down. A channel is used instead of a shared bool so that both
	// goroutines can observe the shutdown without a data race.
	finished := make(chan struct{})
	// NOTE(review): this local map shadows the package-level proxyConns,
	// which is therefore not populated here - confirm which one is intended.
	proxyConns := make(map[string]*json.Encoder)

	// Listen for incoming connections
	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				operr, ok := err.(*net.OpError)
				// the listener is closed
				if ok && operr.Op == "accept" {
					return
				}
				dbg.Lvl1("Error proxy accepting connection:", err)
				// Do not keep retrying once the proxy has shut down.
				select {
				case <-finished:
					return
				default:
				}
				continue
			}
			dbg.Lvl3("Proxy accepting incoming connection from:", conn.RemoteAddr().String())
			select {
			case newConn <- true:
			case <-finished:
				// The proxy shut down between Accept and the registration
				// of this connection: nobody counts connections anymore, so
				// drop it instead of blocking forever on newConn.
				if err := conn.Close(); err != nil {
					dbg.Error("Couldn't close connection:", err)
				}
				return
			}
			proxyConns[conn.RemoteAddr().String()] = json.NewEncoder(conn)
			go proxyConnection(conn, closeConn)
		}
	}()

	go func() {
		// notify every new connection and every end of connection. When all
		// connections are closed, send an "end" measure to the sink.
		defer close(finished)
		nconn := 0
		for {
			select {
			case <-newConn:
				nconn++
			case <-closeConn:
				nconn--
				if nconn != 0 {
					continue
				}
				// everything is finished
				if err := serverEnc.Encode(NewSingleMeasure("end", 0)); err != nil {
					dbg.Error("Couldn't send 'end' message:", err)
				}
				if err := serverConn.Close(); err != nil {
					dbg.Error("Couldn't close server connection:", err)
				}
				// Closing the listener makes the accepting goroutine return.
				if err := ln.Close(); err != nil {
					dbg.Error("Couldn't close listener:", err)
				}
				return
			}
		}
	}()
	return nil
}
// connectToSink dials the server at redirection over TCP. On success the
// connection is stored in serverConn, and a JSON encoder and decoder wrapping
// it are stored in serverEnc and serverDec. On failure those variables are
// left untouched and an error naming the address is returned.
func connectToSink(redirection string) error {
	sink, dialErr := net.Dial("tcp", redirection)
	if dialErr != nil {
		return fmt.Errorf("Proxy connection to server %s failed: %v", redirection, dialErr)
	}
	serverConn, serverEnc, serverDec = sink, json.NewEncoder(sink), json.NewDecoder(sink)
	return nil
}
// proxyConnection is the core of the file: it reads SingleMeasure values as
// JSON from conn and writes each of them to the server through serverEnc.
//
// The loop stops when conn reaches EOF, when a second decoding error occurs,
// when forwarding to the server fails, or after a measure named "end" has
// been forwarded. A measure that failed to decode is not forwarded.
// Afterwards conn is closed and true is sent on done; that send blocks until
// somebody receives from done.
//
// NOTE(review): Proxy starts one proxyConnection goroutine per accepted
// connection and all of them write through the shared serverEnc without
// locking - confirm that this is acceptable.
func proxyConnection(conn net.Conn, done chan bool) {
	dec := json.NewDecoder(conn)
	nerr := 0
	for {
		m := SingleMeasure{}
		// Receive data
		if err := dec.Decode(&m); err != nil {
			if err == io.EOF {
				break
			}
			dbg.Lvl1("Error receiving data from", conn.RemoteAddr().String(), ":", err)
			nerr++
			if nerr > 1 {
				dbg.Lvl1("Too many errors from", conn.RemoteAddr().String(), ": Abort connection")
				break
			}
			// m was not decoded successfully, so it must not be proxied to
			// the server; try to read the next measure instead.
			continue
		}
		dbg.Lvl3("Proxy received", m)
		// Proxy data back to monitor
		if err := serverEnc.Encode(m); err != nil {
			dbg.Lvl2("Error proxying data :", err)
			break
		}
		if m.Name == "end" {
			// the end
			dbg.Lvl2("Proxy detected end of measurement. Closing connection.")
			break
		}
	}
	if err := conn.Close(); err != nil {
		dbg.Error("Couldn't close connection:", err)
	}
	done <- true
}
// proxyDataServer writes the raw bytes in data to the server connection
// serverConn. It panics if the write reports an error.
func proxyDataServer(data []byte) {
	if _, werr := serverConn.Write(data); werr != nil {
		panic(fmt.Errorf("Error proxying data to server: %v", werr))
	}
}