/
sync_listener_tcp.go
174 lines (161 loc) · 5.55 KB
/
sync_listener_tcp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package server
// sync_listener_tcp.go creates a listener for TCP requests, specifically one
// type of request that allows a hardware device to see what reports have been
// received by the server.
//
// This is important because the hardware devices send reports over UDP, which
// does not guarantee that the reports will arrive. In the event that the
// report packets are dropped, the hardware device needs some way to detect
// that they were dropped and attempt to resubmit them. This file provides the
// synchronization data.
//
// The synchronization data itself takes the form of a bitfield, one bit per
// timeslot. A '0' indicates that the server does not have a report for that
// timeslot, and a '1' indicates that the server does have a report for that
// timeslot, allowing the hardware (which runs on an IoT mobile network) to see
// what reports are missing while using minimal bandwidth.
//
// This endpoint is expected to be called roughly every 4 hours by each
// hardware device.
import (
"encoding/binary"
"io"
"net"
"time"
"github.com/glowlabs-org/gca-backend/glow"
)
// threadedListenForSyncRequests opens the TCP listener that serves sync
// requests and then accepts connections until gcas.quit is closed. Each
// accepted connection is handed to managedHandleSyncConn in its own goroutine.
//
// The listen address comes from tcpPort, which is defined outside this
// function. Once the listener is bound, it is stored in gcas.tcpListener, the
// port actually assigned is stored in gcas.tcpPort, and tcpReady is closed to
// signal that both fields are populated. A failure to bind is fatal.
//
// Accept blocks, so the quit channel alone cannot interrupt it; shutdown
// presumably also closes gcas.tcpListener elsewhere to unblock this loop —
// confirm against the shutdown path.
func (gcas *GCAServer) threadedListenForSyncRequests(tcpReady chan struct{}) {
	// Bounds for the delay inserted after a failed Accept, so that a
	// persistent failure (for example, running out of file descriptors)
	// does not turn into a busy loop that floods the log.
	const (
		minAcceptBackoff = 5 * time.Millisecond
		maxAcceptBackoff = time.Second
	)

	// Listen on TCP port
	listener, err := net.Listen("tcp", tcpPort)
	if err != nil {
		gcas.logger.Fatalf("Failed to start server: %s", err)
	}
	gcas.tcpListener = listener
	defer listener.Close()
	gcas.tcpPort = uint16(listener.Addr().(*net.TCPAddr).Port)
	close(tcpReady)

	var backoff time.Duration
	for {
		// Check for a shutdown signal before waiting on the next
		// connection.
		select {
		case <-gcas.quit:
			return
		default:
		}

		// Wait for a connection
		conn, err := listener.Accept()
		if err != nil {
			// An error that arrives after shutdown was requested is
			// expected (the listener was closed), so exit quietly
			// rather than reporting it as a failure.
			select {
			case <-gcas.quit:
				return
			default:
			}
			gcas.logger.Infof("Failed to accept connection: %s", err)

			// Back off before retrying, doubling the delay on each
			// consecutive failure up to the cap. The wait is
			// abandoned immediately if shutdown is requested.
			if backoff == 0 {
				backoff = minAcceptBackoff
			} else {
				backoff *= 2
			}
			if backoff > maxAcceptBackoff {
				backoff = maxAcceptBackoff
			}
			select {
			case <-gcas.quit:
				return
			case <-time.After(backoff):
			}
			continue
		}
		backoff = 0

		// Handle the connection in a new goroutine
		go gcas.managedHandleSyncConn(conn)
	}
}
// managedHandleSyncConn handles one incoming TCP sync request and closes the
// connection when done. The request is a 4 byte payload holding the
// little-endian ShortID of the equipment whose history is wanted.
//
// If successful, the response is:
//   - 2 bytes, a little-endian length prefix counting every byte that follows
//   - 32 bytes, containing the public key of the equipment
//   - 4 bytes, containing the reports offset (the timeslot where the history
//     starts)
//   - 504 bytes, containing the bitfield; bit i is set when report i has a
//     PowerOutput greater than zero
//   - a variable-length section that is either:
//     the serialized equipment migration minus its leading 32 bytes, when a
//     migration is registered for the equipment's public key; or
//     36 zero bytes, followed by one entry per known GCA server, followed by
//     a blank glow.Signature
//   - 8 bytes, containing a Unix timestamp for when the response was authorized
//   - a signature from the GCA server over everything after the length prefix
//
// Each GCA server entry is: 32 byte public key, 1 byte banned flag, 1 byte
// location length, the location itself, three little-endian uint16 ports
// (HTTP, TCP, UDP), and the GCA authorization.
//
// If unsuccessful, the response is a single zero byte followed by the
// connection closing. That happens when the ShortID is unknown or when the
// response would not fit in the 16 bit length prefix.
func (gcas *GCAServer) managedHandleSyncConn(conn net.Conn) {
	defer conn.Close()

	const (
		// ioTimeout bounds the whole exchange so that a client which
		// connects and then stalls cannot hold the goroutine and the
		// socket open forever.
		ioTimeout = 30 * time.Second
		// maxRespLen is the largest payload the uint16 length prefix
		// can describe.
		maxRespLen = 1<<16 - 1
		// maxLocationLen is the largest location the single length
		// byte of a server entry can describe.
		maxLocationLen = 1<<8 - 1
		// fixedLen covers the length prefix, public key, reports
		// offset and bitfield: 2 + 32 + 4 + 504.
		fixedLen = 542
	)

	if err := conn.SetDeadline(time.Now().Add(ioTimeout)); err != nil {
		gcas.logger.Infof("Unable to set connection deadline: %s", err)
		return
	}

	// writeFailure sends the single zero byte that signals an unsuccessful
	// request. The caller returns afterwards, which closes the connection.
	writeFailure := func() {
		var ded [1]byte
		if _, err := conn.Write(ded[:]); err != nil {
			gcas.logger.Infof("Failed to write failure response: %s", err)
		}
	}

	// Read the ShortID from the request.
	var req [4]byte
	if _, err := io.ReadFull(conn, req[:]); err != nil {
		gcas.logger.Infof("Unable to read request")
		return
	}
	id := binary.LittleEndian.Uint32(req[:])

	// Fetch the corresponding data. Everything read from gcas state is
	// collected under the lock so the response is built from one
	// consistent view.
	//
	// NOTE(review): assumes the reports collection holds at most 4032
	// entries (504 * 8); a longer one would index past the bitfield —
	// confirm against the declaration of equipmentReports.
	var bitfield [504]byte
	gcas.mu.Lock()
	reports, exists := gcas.equipmentReports[id]
	if exists {
		// Index rather than range by value to avoid copying every
		// report just to inspect one field.
		for i := range reports {
			if reports[i].PowerOutput > 0 {
				bitfield[i/8] |= 1 << (i % 8)
			}
		}
	}
	equipment, exists2 := gcas.equipment[id]
	reportsOffset := gcas.equipmentReportsOffset
	migration, migrationExists := gcas.equipmentMigrations[equipment.PublicKey]
	gcas.mu.Unlock()

	// If there is no hardware for the provided short id, write a zero byte
	// and close the connection.
	if !exists || !exists2 {
		writeFailure()
		return
	}

	// Prepare the response. The first two bytes are reserved for the
	// length prefix, which is filled in last. The 36 bytes past fixedLen
	// stay zero when there is no migration.
	resp := make([]byte, 578)
	copy(resp[2:34], equipment.PublicKey[:])
	binary.LittleEndian.PutUint32(resp[34:38], reportsOffset)
	copy(resp[38:fixedLen], bitfield[:])

	if migrationExists {
		// When copying in the migration, we can skip the first 32
		// bytes because it contains the equipment public key, which
		// already appears before the bitfield.
		mBytes := migration.Serialize()
		resp = resp[:fixedLen]
		resp = append(resp, mBytes[32:]...)
	} else {
		// Add the list of gcaServers.
		gcas.gcaServers.mu.Lock()
		for _, s := range gcas.gcaServers.servers {
			locationLen := len(s.Location)
			if locationLen > maxLocationLen {
				// The length byte cannot describe this location.
				// Writing a truncated length would desynchronize
				// the parser for this entry and every entry after
				// it, so leave the server out instead.
				gcas.logger.Errorf("Skipping GCA server in sync response: location is %d bytes, limit is %d", locationLen, maxLocationLen)
				continue
			}
			// 104 = 32 (public key) + 1 (banned) + 1 (location
			// length) + 6 (three ports) + 64 (authorization).
			sBytes := make([]byte, 104+locationLen)
			copy(sBytes[:32], s.PublicKey[:])
			if s.Banned {
				sBytes[32] = 1
			}
			sBytes[33] = byte(locationLen)
			copy(sBytes[34:], []byte(s.Location))
			binary.LittleEndian.PutUint16(sBytes[34+locationLen:], s.HttpPort)
			binary.LittleEndian.PutUint16(sBytes[36+locationLen:], s.TcpPort)
			binary.LittleEndian.PutUint16(sBytes[38+locationLen:], s.UdpPort)
			copy(sBytes[40+locationLen:], s.GCAAuthorization[:])
			resp = append(resp, sBytes...)
		}
		gcas.gcaServers.mu.Unlock()

		// Copy in a blank GCA signature.
		var newGCASig glow.Signature
		resp = append(resp, newGCASig[:]...)
	}

	// Copy in the unix timestamp
	var timeBytes [8]byte
	binary.LittleEndian.PutUint64(timeBytes[:], uint64(time.Now().Unix()))
	resp = append(resp, timeBytes[:]...)

	// Create the signature. It covers everything except the length prefix.
	sig := glow.Sign(resp[2:], gcas.staticPrivateKey)
	resp = append(resp, sig[:]...)

	// Fill in the length prefix, which does not count itself. A payload
	// that does not fit in 16 bits cannot be framed correctly, so report
	// failure instead of sending a prefix that has silently wrapped.
	respLen := len(resp) - 2
	if respLen > maxRespLen {
		gcas.logger.Errorf("Sync response is %d bytes, which exceeds the %d byte limit of the length prefix", respLen, maxRespLen)
		writeFailure()
		return
	}
	binary.LittleEndian.PutUint16(resp[:2], uint16(respLen))

	if _, err := conn.Write(resp); err != nil {
		gcas.logger.Errorf("Failed to write response: %v", err)
	}
}