-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
connections_tracker.go
146 lines (127 loc) · 4.29 KB
/
connections_tracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
// Package listeners implements the StatsdListener interfaces.
package listeners
import (
"net"
"time"
"go.uber.org/atomic"
"github.com/DataDog/datadog-agent/pkg/trace/log"
)
// ConnectionTracker tracks connections and closes them gracefully.
type ConnectionTracker struct {
connections map[net.Conn]struct{}
connToTrack chan net.Conn
connToClose chan net.Conn
stopChan chan struct{}
stoppedChan chan struct{}
closeDelay time.Duration
activeConnections *atomic.Int32
name string
}
// NewConnectionTracker creates a new ConnectionTracker.
// closeDelay is the time to wait before closing a connection. First it will be shutdown for write, which
// will notify the client that we are disconnecting, then it will be closed. This gives some time to
// consume the remaining packets.
func NewConnectionTracker(name string, closeDelay time.Duration) *ConnectionTracker {
return &ConnectionTracker{
connections: make(map[net.Conn]struct{}),
connToTrack: make(chan net.Conn),
connToClose: make(chan net.Conn),
stopChan: make(chan struct{}),
stoppedChan: make(chan struct{}),
closeDelay: closeDelay,
activeConnections: atomic.NewInt32(0),
name: name,
}
}
// Start starts the connection tracker.
func (t *ConnectionTracker) Start() {
go t.HandleConnections()
}
// Track tracks a connection.
func (t *ConnectionTracker) Track(conn net.Conn) {
t.connToTrack <- conn
}
// Close closes a connection.
func (t *ConnectionTracker) Close(conn net.Conn) {
t.connToClose <- conn
}
// HandleConnections handles connections.
func (t *ConnectionTracker) HandleConnections() {
requestStop := false
for stop := false; !stop; {
select {
case conn := <-t.connToTrack:
log.Debugf("dogstatsd-%s: tracking new connection %s", t.name, conn.RemoteAddr().String())
if requestStop {
//Close it immediately if we are shutting down.
conn.Close()
} else {
t.connections[conn] = struct{}{}
t.activeConnections.Inc()
}
case conn := <-t.connToClose:
err := conn.Close()
log.Infof("dogstatsd-%s: failed to close connection: %v", t.name, err)
if _, ok := t.connections[conn]; !ok {
log.Warnf("dogstatsd-%s: connection wasn't tracked", t.name)
} else {
delete(t.connections, conn)
t.activeConnections.Dec()
}
case <-t.stopChan:
log.Infof("dogstatsd-%s: stopping connections", t.name)
requestStop = true
var err error
for c := range t.connections {
// First, when possible, we close the write end of the connection to notify
// the client that we are shutting down.
switch c := c.(type) {
case *net.TCPConn:
err = c.CloseWrite()
case *net.UnixConn:
err = c.CloseWrite()
}
log.Debugf("dogstatsd-%s: failed to shutdown connection: %v", t.name, err)
}
time.Sleep(t.closeDelay)
for c := range t.connections {
// First, when possible, we close the write end of the connection to notify
// the client that we are shutting down.
switch c := c.(type) {
case *net.TCPConn:
err = c.CloseRead()
case *net.UnixConn:
err = c.CloseRead()
default:
// We don't have a choice, setting a 0 timeout would likely be a retryable error.
err = c.Close()
}
log.Debugf("dogstatsd-%s: failed to shutdown connection: %v", t.name, err)
}
case <-time.After(1 * time.Second):
// We don't want to block forever on the select, so we add a timeout.
}
// Stop if we are requested to stop and all connections are closed. We might drop
// some work in the channels but that should be fine.
if requestStop && len(t.connections) == 0 {
log.Debugf("dogstatsd-%s: all connections closed", t.name)
stop = true
}
}
t.stoppedChan <- struct{}{}
}
// Stop stops the connection tracker.
// To be called one the listener is stopped, after the server socket has been close.
func (t *ConnectionTracker) Stop() {
if t.activeConnections.Load() == 0 {
return
}
// Request closing connections
t.stopChan <- struct{}{}
// Wait until all connections are closed
<-t.stoppedChan
}