forked from DiceDB/dice
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathasync_tcp.go
191 lines (156 loc) · 4.91 KB
/
async_tcp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package server
import (
"log"
"net"
"os"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/dicedb/dice/config"
"github.com/dicedb/dice/core"
)
var cronFrequency time.Duration = 1 * time.Second
var lastCronExecTime time.Time = time.Now()
const EngineStatus_WAITING int32 = 1 << 1
const EngineStatus_BUSY int32 = 1 << 2
const EngineStatus_SHUTTING_DOWN int32 = 1 << 3
const EngineStatus_TRANSACTION int32 = 1 << 4
var eStatus int32 = EngineStatus_WAITING
var connectedClients map[int]*core.Client
func init() {
connectedClients = make(map[int]*core.Client)
}
func WaitForSignal(wg *sync.WaitGroup, sigs chan os.Signal) {
defer wg.Done()
<-sigs
// if server is busy continue to wait
for atomic.LoadInt32(&eStatus) == EngineStatus_BUSY {
}
// CRITICAL TO HANDLE
// We do not want server to ever go back to BUSY state
// when control flow is here ->
// immediately set the status to be SHUTTING DOWN
// the only place where we can set the status to be SHUTTING DOWN
atomic.StoreInt32(&eStatus, EngineStatus_SHUTTING_DOWN)
// if server is in any other state, initiate a shutdown
core.Shutdown()
os.Exit(0)
}
func RunAsyncTCPServer(wg *sync.WaitGroup) error {
defer wg.Done()
defer func() {
atomic.StoreInt32(&eStatus, EngineStatus_SHUTTING_DOWN)
}()
log.Println("starting an asynchronous TCP server on", config.Host, config.Port)
max_clients := 20000
// Create EPOLL Event Objects to hold events
var events []syscall.EpollEvent = make([]syscall.EpollEvent, max_clients)
// Create a socket
serverFD, err := syscall.Socket(syscall.AF_INET, syscall.O_NONBLOCK|syscall.SOCK_STREAM, 0)
if err != nil {
return err
}
defer syscall.Close(serverFD)
// Set the Socket operate in a non-blocking mode
if err = syscall.SetNonblock(serverFD, true); err != nil {
return err
}
// Bind the IP and the port
ip4 := net.ParseIP(config.Host)
if err = syscall.Bind(serverFD, &syscall.SockaddrInet4{
Port: config.Port,
Addr: [4]byte{ip4[0], ip4[1], ip4[2], ip4[3]},
}); err != nil {
return err
}
// Start listening
if err = syscall.Listen(serverFD, max_clients); err != nil {
return err
}
// AsyncIO starts here!!
// creating EPOLL instance
epollFD, err := syscall.EpollCreate1(0)
if err != nil {
log.Fatal(err)
}
defer syscall.Close(epollFD)
// Specify the events we want to get hints about
// and set the socket on which
var socketServerEvent syscall.EpollEvent = syscall.EpollEvent{
Events: syscall.EPOLLIN,
Fd: int32(serverFD),
}
// Listen to read events on the Server itself
if err = syscall.EpollCtl(epollFD, syscall.EPOLL_CTL_ADD, serverFD, &socketServerEvent); err != nil {
return err
}
// loop until the server is not shutting down
for atomic.LoadInt32(&eStatus) != EngineStatus_SHUTTING_DOWN {
if time.Now().After(lastCronExecTime.Add(cronFrequency)) {
core.DeleteExpiredKeys()
lastCronExecTime = time.Now()
}
// Say, the Engine triggered SHUTTING down when the control flow is here ->
// Current: Engine status == WAITING
// Update: Engine status = SHUTTING_DOWN
// Then we have to exit (handled in Signal Handler)
// see if any FD is ready for an IO
nevents, e := syscall.EpollWait(epollFD, events[:], -1)
if e != nil {
continue
}
// Here, we do not want server to go back from SHUTTING DOWN
// to BUSY
// If the engine status == SHUTTING_DOWN over here ->
// We have to exit
// hence the only legal transitiion is from WAITING to BUSY
// if that does not happen then we can exit.
// mark engine as BUSY only when it is in the waiting state
if !atomic.CompareAndSwapInt32(&eStatus, EngineStatus_WAITING, EngineStatus_BUSY) {
// if swap unsuccessful then the existing status is not WAITING, but something else
switch eStatus {
case EngineStatus_SHUTTING_DOWN:
return nil
}
}
for i := 0; i < nevents; i++ {
// if the socket server itself is ready for an IO
if int(events[i].Fd) == serverFD {
// accept the incoming connection from a client
fd, _, err := syscall.Accept(serverFD)
if err != nil {
log.Println("err", err)
continue
}
connectedClients[fd] = core.NewClient(fd)
syscall.SetNonblock(fd, true)
// add this new TCP connection to be monitored
var socketClientEvent syscall.EpollEvent = syscall.EpollEvent{
Events: syscall.EPOLLIN,
Fd: int32(fd),
}
if err := syscall.EpollCtl(epollFD, syscall.EPOLL_CTL_ADD, fd, &socketClientEvent); err != nil {
log.Fatal(err)
}
} else {
comm := connectedClients[int(events[i].Fd)]
if comm == nil {
continue
}
cmds, maliciousFlag, err := readCommands(comm)
if err != nil || maliciousFlag {
syscall.Close(int(events[i].Fd))
delete(connectedClients, int(events[i].Fd))
continue
}
respond(cmds, comm)
}
}
// mark engine as WAITING
// no contention as the signal handler is blocked until
// the engine is BUSY
atomic.StoreInt32(&eStatus, EngineStatus_WAITING)
}
return nil
}