/
server.go
113 lines (103 loc) · 3.38 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package host
import (
"fmt"
"go.uber.org/zap"
"os"
"os/signal"
"sync"
"syscall"
"time"
drshproto "github.com/dmhacker/drsh/internal/proto"
)
// Represents a host on the network that accepts connections & pings from any clients
// that wish to communicate with it. On a successful handshake with a client, a server will
// spawn a separate "session" through which all communication is encrypted. Because the session
// handles most of the connection legwork, the actual server class is fairly light.
type Server struct {
Host *RedisHost
Sessions sync.Map
}
// NewServer creates a new server and its underlying connection to Redis. It is not actively
// receiving, sending, or processing messages at this point; that is only enabled upon start.
func NewServer(hostname string, uri string, logger *zap.SugaredLogger) (*Server, error) {
serv := Server{}
hst, err := NewRedisHost("se-"+hostname, uri, logger)
if err != nil {
return nil, err
}
serv.Host = hst
return &serv, nil
}
func (serv *Server) handlePing(sender string) {
serv.Host.SendPublicMessage(sender, drshproto.PublicMessage{
Type: drshproto.PublicMessage_PING_RESPONSE,
Sender: serv.Host.Hostname,
})
}
func (serv *Server) handleSession(sender string, keyPart []byte) {
resp := drshproto.PublicMessage{
Type: drshproto.PublicMessage_SESSION_RESPONSE,
Sender: serv.Host.Hostname,
}
session, err := serv.NewSession(sender, keyPart)
if err != nil {
serv.Host.Logger.Warnf("Failed to setup session with '%s': %s", sender, err)
resp.SessionCreated = false
resp.SessionError = err.Error()
serv.Host.SendPublicMessage(sender, resp)
} else {
serv.Host.Logger.Infof("'%s' has joined session %s.", sender, session.Host.Hostname)
resp.SessionCreated = true
resp.SessionKeyPart = session.Host.Encryption.PrivateKey.Bytes()
resp.SessionHostname = session.Host.Hostname
serv.Host.SendPublicMessage(sender, resp)
session.Host.Encryption.FreePrivateKeys()
session.Start()
}
}
func (serv *Server) startMessageHandler() {
for imsg := range serv.Host.incomingMessages {
pmsg := imsg.publicMessage
if pmsg == nil {
serv.Host.Logger.Warnf("Server %s only accepts public messages.", serv.Host.Hostname)
continue
}
switch pmsg.GetType() {
case drshproto.PublicMessage_PING_REQUEST:
serv.handlePing(pmsg.GetSender())
case drshproto.PublicMessage_SESSION_REQUEST:
serv.handleSession(pmsg.GetSender(), pmsg.GetSessionKeyPart())
default:
serv.Host.Logger.Warnf("Received invalid message from '%s'.", pmsg.GetSender())
}
}
}
func (serv *Server) addInterruptHandler() {
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
serv.Host.Logger.Infof("Server %s is shutting down.", serv.Host.Hostname)
// All active sessions are given the chance to properly exit
serv.Sessions.Range(func(key interface{}, value interface{}) bool {
session := value.(*Session)
if session.Host.IsOpen() {
session.handleExit(fmt.Errorf("terminated"), true)
}
return true
})
// Ensure that server has time to send termination packets
time.Sleep(100 * time.Millisecond)
os.Exit(1)
}()
}
// Non-blocking function that enables server message processing.
func (serv *Server) Start() {
serv.addInterruptHandler()
serv.Host.Start()
go serv.startMessageHandler()
}
// Destroys the server's Redis connection and perform cleanup.
func (serv *Server) Close() {
serv.Host.Close()
}