Skip to content

Commit aab613a

Browse files
committed
fix: 🐛 寻找入库问题
1 parent cebdc16 commit aab613a

File tree

1 file changed

+24
-6
lines changed

1 file changed

+24
-6
lines changed

internal/core/node/node.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,16 +69,30 @@ func CloseNodePool() error {
6969
return nil
7070
}
7171

72+
type nameNode struct {
73+
Name string
74+
}
75+
7276
func Add(node *[]nodeModel.Base) int {
7377
var nodesToProcess []nodeModel.Base
7478

7579
for _, n := range *node {
80+
var nameNode nameNode
81+
if err := yaml.Unmarshal(n.Raw, &nameNode); err != nil {
82+
log.Warnf("yaml.Unmarshal failed: %v", err)
83+
continue
84+
}
7685
if !nodeExist.Exist(n.UniqueKey) && !nodeProcess.Exist(n.UniqueKey) {
7786
nodeProcess.Add(n.UniqueKey)
7887
nodesToProcess = append(nodesToProcess, n)
88+
log.Debugf("add process node: %s", nameNode.Name)
89+
} else {
90+
log.Debugf("node already exist: %s", nameNode.Name)
7991
}
8092
}
8193

94+
log.Debugf("add %d nodes to process", len(nodesToProcess))
95+
8296
if len(nodesToProcess) > 0 {
8397
go func() {
8498
for _, node := range nodesToProcess {
@@ -116,14 +130,14 @@ func Add(node *[]nodeModel.Base) int {
116130
var info nodeModel.Info
117131
info.Delay.Update(uint16(time.Since(start).Milliseconds()))
118132
info.SetAliveStatus(nodeModel.Alive, true)
119-
// Copy raw to avoid pinning the whole subscription buffer
120133
rawCopy := append([]byte(nil), n.Raw...)
121134
n.Raw = rawCopy
122135
validMutex.Lock()
123136
validNodes = append(validNodes, nodeModel.Data{
124137
Base: n,
125138
Info: &info,
126139
})
140+
log.Debugf("node: %s test end, Delay: %d", raw["name"].(string), info.Delay.Average())
127141
validMutex.Unlock()
128142
})
129143

@@ -134,11 +148,12 @@ func Add(node *[]nodeModel.Base) int {
134148
go func() {
135149
time.Sleep(time.Second * 5)
136150
wgSync.Wait()
151+
mergedNodes := 0
137152
if len(validNodes) > 0 {
138-
mergeNodesToPool(validNodes)
153+
mergedNodes = mergeNodesToPool(validNodes)
139154
RefreshInfo()
140155
}
141-
log.Infof("Receipt successful, %d new nodes added", len(validNodes))
156+
log.Infof("Receipt successful, %d new nodes added", mergedNodes)
142157
validNodes = validNodes[:0]
143158
wgStatus = false
144159
}()
@@ -226,7 +241,7 @@ func GetByFilter(filter nodeModel.Filter) *[]nodeModel.Data {
226241
return &result
227242
}
228243

229-
func mergeNodesToPool(newNodes []nodeModel.Data) {
244+
func mergeNodesToPool(newNodes []nodeModel.Data) int {
230245
sort.Slice(newNodes, func(i, j int) bool {
231246
return newNodes[i].Info.Delay.Average() < newNodes[j].Info.Delay.Average()
232247
})
@@ -244,7 +259,7 @@ func mergeNodesToPool(newNodes []nodeModel.Data) {
244259
for _, node := range newNodes {
245260
nodeExist.Add(node.Base.UniqueKey)
246261
}
247-
return
262+
return len(newNodes)
248263
} else {
249264
pool = append(pool, newNodes[:remainingCap]...)
250265
for _, node := range newNodes[:remainingCap] {
@@ -261,14 +276,17 @@ func mergeNodesToPool(newNodes []nodeModel.Data) {
261276
newNodeIndex := 0
262277
for i := len(pool) - 1; i >= 0 && newNodeIndex < len(newNodes); i-- {
263278
if newNodes[newNodeIndex].Info.Delay.Average() < pool[i].Info.Delay.Average() {
279+
log.Debugf("new node delay %dms < old delay %dms,merge", newNodes[newNodeIndex].Info.Delay.Average(), pool[i].Info.Delay.Average())
264280
nodeExist.Remove(pool[i].Base.UniqueKey)
265281
pool[i] = newNodes[newNodeIndex]
266282
nodeExist.Add(newNodes[newNodeIndex].Base.UniqueKey)
267283
newNodeIndex++
268284
} else {
269-
return
285+
log.Debugf("new node delay %dms > old delay %dms,not merge", newNodes[newNodeIndex].Info.Delay.Average(), pool[i].Info.Delay.Average())
286+
return newNodeIndex
270287
}
271288
}
289+
return 0
272290
}
273291

274292
func GetSubInfo(subID uint16) nodeModel.SimpleInfo {

0 commit comments

Comments
 (0)