-
Notifications
You must be signed in to change notification settings - Fork 18.6k
/
main.go
209 lines (186 loc) · 6.89 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
package main
import (
"bufio"
"context"
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
"os"
"strings"
"github.com/containerd/log"
"github.com/docker/docker/libnetwork"
"github.com/docker/docker/libnetwork/diagnostic"
"github.com/docker/docker/libnetwork/drivers/overlay"
)
// URL templates for the HTTP endpoints queried by this client. Each one is a
// fmt format string: the first two verbs take the server IP (%s) and port
// (%d); any further %s verbs fill the query parameters shown in the template.
const (
// readyPath is requested first by main to check that the server answers.
readyPath = "http://%s:%d/ready"
// joinNetwork takes the network ID (nid) to join.
joinNetwork = "http://%s:%d/joinnetwork?nid=%s"
// leaveNetwork takes the network ID (nid) to leave.
leaveNetwork = "http://%s:%d/leavenetwork?nid=%s"
// clusterPeers requests the cluster-wide peer list as JSON.
clusterPeers = "http://%s:%d/clusterpeers?json"
// networkPeers requests the peers of one network (nid) as JSON.
networkPeers = "http://%s:%d/networkpeers?nid=%s&json"
// dumpTable requests the content of table tname on network nid as JSON.
dumpTable = "http://%s:%d/gettable?nid=%s&tname=%s&json"
// deleteEntry requests the deletion of entry key from table tname on network nid.
deleteEntry = "http://%s:%d/deleteentry?nid=%s&tname=%s&key=%s&json"
)
// httpIsOk consumes an HTTP response body and verifies that it contains the
// substring "OK". A read failure or a missing "OK" is reported through the
// logger's Fatalf. The body is closed before the function returns.
func httpIsOk(body io.ReadCloser) {
	defer body.Close()

	payload, readErr := io.ReadAll(body)
	if readErr != nil {
		log.G(context.TODO()).Fatalf("Failed the body parse %s", readErr)
	}

	// Happy path first: the server acknowledged the request.
	if strings.Contains(string(payload), "OK") {
		return
	}
	log.G(context.TODO()).Fatalf("Server not ready %s", payload)
}
// main runs the diagnostic client against a diagnostic server.
//
// Flow: parse the flags, check that the server answers on the ready endpoint,
// fetch the cluster peers, optionally join the target network (-a) and fetch
// its peers, inspect the table selected with -t (optionally remediating
// orphan entries with -r), and finally leave the network if this run joined it.
func main() {
	ipPtr := flag.String("ip", "127.0.0.1", "ip address")
	portPtr := flag.Int("port", 2000, "port")
	networkPtr := flag.String("net", "", "target network")
	tablePtr := flag.String("t", "", "table to process <sd/overlay>")
	remediatePtr := flag.Bool("r", false, "perform remediation deleting orphan entries")
	joinPtr := flag.Bool("a", false, "join/leave network")
	verbosePtr := flag.Bool("v", false, "verbose output")
	flag.Parse()

	if *verbosePtr {
		// Best effort: a failure to raise the log level is ignored.
		_ = log.SetLevel("debug")
	}

	// The -a flag is refused unless the DIND_CLIENT environment variable is set.
	if _, ok := os.LookupEnv("DIND_CLIENT"); !ok && *joinPtr {
		log.G(context.TODO()).Fatal("you are not using the client in docker in docker mode, the use of the -a flag can be disruptive, " +
			"please remove it (doc:https://github.com/docker/docker/libnetwork/blob/master/cmd/diagnostic/README.md)")
	}

	log.G(context.TODO()).Infof("Connecting to %s:%d checking ready", *ipPtr, *portPtr)
	resp, err := http.Get(fmt.Sprintf(readyPath, *ipPtr, *portPtr))
	if err != nil {
		log.G(context.TODO()).WithError(err).Fatalf("The connection failed")
	}
	httpIsOk(resp.Body)

	// Local names deliberately differ from the clusterPeers/networkPeers
	// URL-template constants so that those are not shadowed inside main.
	clusterNodes := fetchNodePeers(*ipPtr, *portPtr, "")
	var networkNodes map[string]string
	var joinedNetwork bool
	if *networkPtr != "" {
		if *joinPtr {
			log.G(context.TODO()).Infof("Joining the network:%q", *networkPtr)
			resp, err = http.Get(fmt.Sprintf(joinNetwork, *ipPtr, *portPtr, *networkPtr))
			if err != nil {
				log.G(context.TODO()).WithError(err).Fatalf("Failed joining the network")
			}
			httpIsOk(resp.Body)
			joinedNetwork = true
		}

		networkNodes = fetchNodePeers(*ipPtr, *portPtr, *networkPtr)
		if len(networkNodes) == 0 {
			log.G(context.TODO()).Warnf("There is no peer on network %q, check the network ID, and verify that is the non truncated version", *networkPtr)
		}
	}

	switch *tablePtr {
	case "sd":
		fetchTable(*ipPtr, *portPtr, *networkPtr, "endpoint_table", clusterNodes, networkNodes, *remediatePtr)
	case "overlay":
		fetchTable(*ipPtr, *portPtr, *networkPtr, "overlay_peer_table", clusterNodes, networkNodes, *remediatePtr)
	case "":
		// No table requested: only the peer checks above are performed.
	default:
		// Surface typos instead of silently doing nothing.
		log.G(context.TODO()).Warnf("Unknown table %q, valid values are sd and overlay", *tablePtr)
	}

	// Only undo a join that was performed by this run.
	if joinedNetwork {
		log.G(context.TODO()).Infof("Leaving the network:%q", *networkPtr)
		resp, err = http.Get(fmt.Sprintf(leaveNetwork, *ipPtr, *portPtr, *networkPtr))
		if err != nil {
			log.G(context.TODO()).WithError(err).Fatalf("Failed leaving the network")
		}
		httpIsOk(resp.Body)
	}
}
// fetchNodePeers asks the diagnostic server at ip:port for a peer list and
// returns it as a map from peer name to peer IP.
//
// With an empty network the cluster-wide list is requested (clusterPeers
// template); otherwise the peers of the given network ID are requested
// (networkPeers template). Transport, read and decoding failures, as well as
// a response without usable details, are reported through the logger's Fatalf.
func fetchNodePeers(ip string, port int, network string) map[string]string {
	var path string
	if network == "" {
		log.G(context.TODO()).Infof("Fetch cluster peers")
		path = fmt.Sprintf(clusterPeers, ip, port)
	} else {
		log.G(context.TODO()).Infof("Fetch peers network:%q", network)
		path = fmt.Sprintf(networkPeers, ip, port, network)
	}

	resp, err := http.Get(path) //nolint:gosec // G107: Potential HTTP request made with variable url
	if err != nil {
		log.G(context.TODO()).WithError(err).Fatalf("Failed fetching path")
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.G(context.TODO()).WithError(err).Fatalf("Failed the body parse")
	}

	// Details is pre-populated so that the JSON decoder fills a concrete
	// TablePeersResult instead of a generic value.
	output := diagnostic.HTTPResult{Details: &diagnostic.TablePeersResult{}}
	if err := json.Unmarshal(body, &output); err != nil {
		log.G(context.TODO()).WithError(err).Fatalf("Failed the json unmarshalling")
	}

	// A JSON null for the details resets the interface to nil, which would
	// make an unchecked type assertion panic.
	peers, ok := output.Details.(*diagnostic.TablePeersResult)
	if !ok {
		log.G(context.TODO()).Fatalf("Unexpected details in the peers response: %s", body)
	}

	log.G(context.TODO()).Debugf("Parsing JSON response")
	result := make(map[string]string, peers.Length)
	for _, v := range peers.Elements {
		log.G(context.TODO()).Debugf("name:%s ip:%s", v.Name, v.IP)
		result[v.Name] = v.IP
	}
	return result
}
// fetchTable downloads table tableName of the given network from the
// diagnostic server at ip:port and checks the owner of every entry.
//
// An entry whose owner is not a key of networkPeers is reported and recorded
// as an orphan; an entry whose owner is not a key of clusterPeers is reported
// as well. When remediate is true and orphans were found, the user is asked
// on stdin to confirm (exact answer "Yes") before the orphan keys are deleted
// through the deleteEntry endpoint.
//
// Transport, read and decoding failures of the table response are reported
// through the logger's Fatalf; failures on a single entry are logged and do
// not stop the scan.
func fetchTable(ip string, port int, network, tableName string, clusterPeers, networkPeers map[string]string, remediate bool) {
	log.G(context.TODO()).Infof("Fetch %s table and check owners", tableName)
	resp, err := http.Get(fmt.Sprintf(dumpTable, ip, port, network, tableName))
	if err != nil {
		log.G(context.TODO()).WithError(err).Fatalf("Failed fetching %s table", tableName)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.G(context.TODO()).WithError(err).Fatalf("Failed the body parse")
	}

	// Details is pre-populated so that the JSON decoder fills a concrete
	// TableEndpointsResult instead of a generic value.
	output := diagnostic.HTTPResult{Details: &diagnostic.TableEndpointsResult{}}
	if err := json.Unmarshal(body, &output); err != nil {
		log.G(context.TODO()).WithError(err).Fatalf("Failed the json unmarshalling")
	}

	// A JSON null for the details resets the interface to nil, which would
	// make an unchecked type assertion panic.
	table, ok := output.Details.(*diagnostic.TableEndpointsResult)
	if !ok {
		log.G(context.TODO()).Fatalf("Unexpected details in the %s table response: %s", tableName, body)
	}

	log.G(context.TODO()).Debug("Parsing data structures")
	var orphanKeys []string
	for _, v := range table.Elements {
		decoded, err := base64.StdEncoding.DecodeString(v.Value)
		if err != nil {
			log.G(context.TODO()).WithError(err).Errorf("Failed decoding entry")
			continue
		}

		// The decoded record is only used for debug output; the owner checks
		// below rely on the entry metadata, so a record that cannot be
		// unmarshalled is reported but still checked.
		switch tableName {
		case "endpoint_table":
			var elem libnetwork.EndpointRecord
			if err := elem.Unmarshal(decoded); err != nil {
				log.G(context.TODO()).WithError(err).Errorf("Failed unmarshalling entry key:%s", v.Key)
			} else {
				log.G(context.TODO()).Debugf("key:%s value:%+v owner:%s", v.Key, elem, v.Owner)
			}
		case "overlay_peer_table":
			var elem overlay.PeerRecord
			if err := elem.Unmarshal(decoded); err != nil {
				log.G(context.TODO()).WithError(err).Errorf("Failed unmarshalling entry key:%s", v.Key)
			} else {
				log.G(context.TODO()).Debugf("key:%s value:%+v owner:%s", v.Key, elem, v.Owner)
			}
		}

		if _, ok := networkPeers[v.Owner]; !ok {
			log.G(context.TODO()).Warnf("The element with key:%s does not belong to any node on this network", v.Key)
			orphanKeys = append(orphanKeys, v.Key)
		}
		if _, ok := clusterPeers[v.Owner]; !ok {
			log.G(context.TODO()).Warnf("The element with key:%s does not belong to any node on this cluster", v.Key)
		}
	}

	if len(orphanKeys) == 0 || !remediate {
		return
	}

	log.G(context.TODO()).Warnf("The following keys:%v results as orphan, do you want to proceed with the deletion (this operation is irreversible)? [Yes/No]", orphanKeys)
	reader := bufio.NewReader(os.Stdin)
	// The read error is intentionally not fatal: whatever was read before the
	// error is still compared, and anything other than "Yes" skips the deletion.
	text, _ := reader.ReadString('\n')
	// Strip both LF and CRLF line endings so the answer also matches on
	// terminals that send "\r\n".
	text = strings.TrimRight(text, "\r\n")
	if text != "Yes" {
		log.G(context.TODO()).Infof("Deletion skipped")
		return
	}

	for _, k := range orphanKeys {
		delResp, err := http.Get(fmt.Sprintf(deleteEntry, ip, port, network, tableName, k))
		if err != nil {
			log.G(context.TODO()).WithError(err).Errorf("Failed deleting entry k:%s", k)
			// Stop at the first failure, leaving the remaining keys untouched.
			break
		}
		delResp.Body.Close()
	}
}