From c5b142a28f429f13485b789b1006c1a1176066c5 Mon Sep 17 00:00:00 2001 From: J Bilal rafique Date: Wed, 22 Oct 2025 17:04:51 +0500 Subject: [PATCH 1/2] Optimize supernode symbol fetch --- p2p/kademlia/dht.go | 285 ++++++++++++++++++++----------------------- p2p/kademlia/node.go | 14 +-- 2 files changed, 140 insertions(+), 159 deletions(-) diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go index a430de0b..acc4193c 100644 --- a/p2p/kademlia/dht.go +++ b/p2p/kademlia/dht.go @@ -17,6 +17,7 @@ import ( "github.com/btcsuite/btcutil/base58" "github.com/cenkalti/backoff/v4" "github.com/cosmos/cosmos-sdk/crypto/keyring" + "golang.org/x/sync/semaphore" "github.com/LumeraProtocol/lumera/x/lumeraid/securekeyx" "github.com/LumeraProtocol/supernode/v2/pkg/errors" @@ -688,11 +689,7 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, var foundLocalCount int32 hexKeys := make([]string, len(keys)) - globalClosestContacts := make(map[string]*NodeList) hashes := make([][]byte, len(keys)) - knownNodes := make(map[string]*Node) - var knownMu sync.Mutex - var closestMu sync.RWMutex defer func() { resMap.Range(func(key, value interface{}) bool { @@ -716,15 +713,6 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, } }() - for _, key := range keys { - result[key] = nil - } - - supernodeAddr, _ := s.getSupernodeAddress(ctx) - hostIP := parseSupernodeAddress(supernodeAddr) - self := &Node{ID: s.ht.self.ID, IP: hostIP, Port: s.ht.self.Port} - self.SetHashedID() - for i, key := range keys { decoded := base58.Decode(key) if len(decoded) != B/8 { @@ -732,16 +720,60 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, } hashes[i] = decoded hexKeys[i] = hex.EncodeToString(decoded) + result[key] = nil } + foundLocalCount, err = s.fetchAndAddLocalKeys(ctx, hexKeys, &resMap, required) + if err != nil { + return nil, fmt.Errorf("fetch and add local keys: %v", err) + } + // Found locally count is logged via summary below; no external metrics + + if foundLocalCount >= required { + logtrace.Debug(ctx, "DHT BatchRetrieve satisfied from local storage", logtrace.Fields{ + "txid": txID, "found_local": foundLocalCount, "required": required, + }) + return result, nil + } + + if len(localOnly) > 0 && localOnly[0] { + logtrace.Debug(ctx, "DHT BatchRetrieve local-only mode, insufficient keys", logtrace.Fields{ + "txid": txID, "found_local": foundLocalCount, "required": required, + }) + return result, fmt.Errorf("local-only: found %d, required %d", foundLocalCount, required) + } + + supernodeAddr, addrErr := s.getSupernodeAddress(ctx) + if addrErr != nil { + logtrace.Warn(ctx, "Failed to get supernode address", logtrace.Fields{ + logtrace.FieldModule: "dht", + logtrace.FieldError: addrErr.Error(), + }) + } + hostIP := parseSupernodeAddress(supernodeAddr) + self := &Node{ID: s.ht.self.ID, IP: hostIP, Port: s.ht.self.Port} + self.SetHashedID() + + knownNodes := make(map[string]*Node) + var knownMu sync.Mutex + for _, n := range s.ht.nodes() { nn := &Node{ID: n.ID, IP: n.IP, Port: n.Port} nn.SetHashedID() knownNodes[string(nn.ID)] = nn } + ignoreList := s.ignorelist.ToNodeList() + + globalClosestContacts := make(map[string]*NodeList) + var closestMu sync.RWMutex + for i := range keys { - top6 := s.ht.closestContactsWithIncludingNode(Alpha, hashes[i], s.ignorelist.ToNodeList(), nil) + if _, found := resMap.Load(hexKeys[i]); found { + continue + } + + top6 := s.ht.closestContactsWithIncludingNode(Alpha, hashes[i], ignoreList, nil) closestMu.Lock() 
globalClosestContacts[keys[i]] = top6 closestMu.Unlock() @@ -750,21 +782,12 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, delete(knownNodes, string(self.ID)) - foundLocalCount, err = s.fetchAndAddLocalKeys(ctx, hexKeys, &resMap, required) - if err != nil { - return nil, fmt.Errorf("fetch and add local keys: %v", err) - } - // Found locally count is logged via summary below; no external metrics - if foundLocalCount >= required { - return result, nil - } - batchSize := batchRetrieveSize var networkFound int32 totalBatches := int(math.Ceil(float64(required) / float64(batchSize))) parallelBatches := int(math.Min(float64(totalBatches), float64(fetchSymbolsBatchConcurrency))) - semaphore := make(chan struct{}, parallelBatches) + sem := semaphore.NewWeighted(int64(parallelBatches)) var wg sync.WaitGroup gctx, cancel := context.WithCancel(ctx) defer cancel() @@ -777,27 +800,39 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, if end > len(keys) { end = len(keys) } + if atomic.LoadInt32(&networkFound)+int32(foundLocalCount) >= int32(required) { break } wg.Add(1) - semaphore <- struct{}{} - go s.processBatch( - gctx, - keys[start:end], - hexKeys[start:end], - semaphore, &wg, - globalClosestContacts, - &closestMu, - knownNodes, &knownMu, - &resMap, - required, - foundLocalCount, - &networkFound, - cancel, - txID, - ) + go func(start, end int) { + defer wg.Done() + + if err := sem.Acquire(gctx, 1); err != nil { + return + } + defer sem.Release(1) + + if atomic.LoadInt32(&networkFound)+int32(foundLocalCount) >= int32(required) { + return + } + + s.processBatch( + gctx, + keys[start:end], + hexKeys[start:end], + globalClosestContacts, + &closestMu, + knownNodes, &knownMu, + &resMap, + required, + foundLocalCount, + &networkFound, + cancel, + txID, + ) + }(start, end) } wg.Wait() @@ -821,8 +856,6 @@ func (s *DHT) processBatch( ctx context.Context, batchKeys []string, batchHexKeys []string, - semaphore chan struct{}, - wg *sync.WaitGroup, globalClosestContacts map[string]*NodeList, closestMu *sync.RWMutex, knownNodes map[string]*Node, @@ -834,94 +867,65 @@ func (s *DHT) processBatch( cancel context.CancelFunc, txID string, ) { - defer wg.Done() - defer func() { <-semaphore }() + select { + case <-ctx.Done(): + return + default: + } - for i := 0; i < maxIterations; i++ { - select { - case <-ctx.Done(): - return - default: - } + fetchMap := make(map[string][]int) - // Build fetch map (read globalClosestContacts under RLock) - fetchMap := make(map[string][]int) - for i, key := range batchKeys { - closestMu.RLock() - nl := globalClosestContacts[key] - closestMu.RUnlock() - if nl == nil { - continue - } - for _, node := range nl.Nodes { - nodeID := string(node.ID) - fetchMap[nodeID] = append(fetchMap[nodeID], i) - } - } + closestMu.RLock() + localContacts := make(map[string]*NodeList, len(batchKeys)) + for _, key := range batchKeys { + localContacts[key] = globalClosestContacts[key] + } + closestMu.RUnlock() - foundCount, newClosestContacts, batchErr := s.iterateBatchGetValues( - ctx, knownNodes, batchKeys, batchHexKeys, fetchMap, resMap, required, foundLocalCount+atomic.LoadInt32(networkFound), - ) - if batchErr != nil { - logtrace.Error(ctx, "Iterate batch get values failed", logtrace.Fields{ - logtrace.FieldModule: "dht", "txid": txID, logtrace.FieldError: batchErr.Error(), - }) + for idx, key := range batchKeys { + nl := localContacts[key] + if nl == nil { + continue } - - atomic.AddInt32(networkFound, int32(foundCount)) - if 
atomic.LoadInt32(networkFound)+int32(foundLocalCount) >= int32(required) { - cancel() - break + for _, node := range nl.Nodes { + nodeID := string(node.ID) + fetchMap[nodeID] = append(fetchMap[nodeID], idx) } + } - changed := false - for key, nodesList := range newClosestContacts { - if nodesList == nil || nodesList.Nodes == nil { - continue - } - - closestMu.RLock() - curr := globalClosestContacts[key] - closestMu.RUnlock() - if curr == nil || curr.Nodes == nil { - logtrace.Warn(ctx, "Global contacts missing key during merge", logtrace.Fields{"key": key}) - continue - } - - if !haveAllNodes(nodesList.Nodes, curr.Nodes) { - changed = true - } - - nodesList.AddNodes(curr.Nodes) - nodesList.Sort() - nodesList.TopN(Alpha) - - s.addKnownNodesSafe(ctx, nodesList.Nodes, knownNodes, knownMu) - - closestMu.Lock() - globalClosestContacts[key] = nodesList - closestMu.Unlock() - } + foundCount, batchErr := s.iterateBatchGetValues( + ctx, knownNodes, batchHexKeys, fetchMap, resMap, required, foundLocalCount+atomic.LoadInt32(networkFound), + ) + if batchErr != nil { + logtrace.Error(ctx, "Iterate batch get values failed", logtrace.Fields{ + logtrace.FieldModule: "dht", "txid": txID, logtrace.FieldError: batchErr.Error(), + }) + } - if !changed { - break - } + atomic.AddInt32(networkFound, int32(foundCount)) + if atomic.LoadInt32(networkFound)+int32(foundLocalCount) >= int32(required) { + cancel() } } -func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node, keys []string, hexKeys []string, fetchMap map[string][]int, - resMap *sync.Map, req, alreadyFound int32) (int, map[string]*NodeList, error) { - semaphore := make(chan struct{}, storeSameSymbolsBatchConcurrency) // Limit concurrency to 1 - closestContacts := make(map[string]*NodeList) +func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node, hexKeys []string, fetchMap map[string][]int, + resMap *sync.Map, req, alreadyFound int32) (int, error) { + sem := semaphore.NewWeighted(int64(storeSameSymbolsBatchConcurrency)) var wg sync.WaitGroup - contactsMap := make(map[string]map[string][]*Node) var firstErr error var mu sync.Mutex // To protect the firstErr foundCount := int32(0) gctx, cancel := context.WithCancel(ctx) // Create a cancellable context defer cancel() - for nodeID, node := range nodes { + + for nodeID := range fetchMap { + node, ok := nodes[nodeID] + if !ok { + continue + } + + if s.ignorelist.Banned(node) { logtrace.Debug(ctx, "Ignore banned node in iterate batch get values", logtrace.Fields{ logtrace.FieldModule: "dht", @@ -930,18 +934,17 @@ func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node, continue } - contactsMap[nodeID] = make(map[string][]*Node) wg.Add(1) go func(node *Node, nodeID string) { defer wg.Done() - select { - case <-ctx.Done(): + if err := sem.Acquire(gctx, 1); err != nil { return - case <-gctx.Done(): + } + defer sem.Release(1) + + if atomic.LoadInt32(&foundCount) >= int32(req-alreadyFound) { return - case semaphore <- struct{}{}: - defer func() { <-semaphore }() } indices := fetchMap[nodeID] @@ -985,8 +988,6 @@ func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node, break } } - } else { - contactsMap[nodeID][k] = v.Closest } } @@ -1009,33 +1010,7 @@ func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node, }) } - for _, closestNodes := range contactsMap { - for key, nodes := range closestNodes { - comparator, err := hex.DecodeString(key) - if err != nil { - logtrace.Error(ctx, "Failed to decode hex 
key in closestNodes.Range", logtrace.Fields{ - logtrace.FieldModule: "dht", - "key": key, - logtrace.FieldError: err.Error(), - }) - return 0, nil, err - } - bkey := base58.Encode(comparator) - - if _, ok := closestContacts[bkey]; !ok { - closestContacts[bkey] = &NodeList{Nodes: nodes, Comparator: comparator} - } else { - closestContacts[bkey].AddNodes(nodes) - } - } - } - - for key, nodes := range closestContacts { - nodes.Sort() - nodes.TopN(Alpha) - closestContacts[key] = nodes - } - return int(foundCount), closestContacts, firstErr + return int(foundCount), firstErr } func (s *DHT) doBatchGetValuesCall(ctx context.Context, node *Node, requestKeys map[string]KeyValWithClosest) (map[string]KeyValWithClosest, error) { @@ -1413,17 +1388,25 @@ func (s *DHT) addNode(ctx context.Context, node *Node) *Node { if node != nil { fields["peer"] = node.String() } - logtrace.Debug(ctx, "Rejecting node: peer below minimum version", fields) + logtrace.Info(ctx, "Rejecting node: peer below minimum version", fields) return nil } // Allow localhost for integration testing isIntegrationTest := os.Getenv("INTEGRATION_TEST") == "true" if node.IP == "" || node.IP == "0.0.0.0" || (!isIntegrationTest && node.IP == "127.0.0.1") { - logtrace.Debug(ctx, "Trying to add invalid node", logtrace.Fields{logtrace.FieldModule: "p2p"}) + logtrace.Info(ctx, "Rejecting node: invalid IP", logtrace.Fields{ + logtrace.FieldModule: "p2p", + "ip": node.IP, + "node": node.String(), + "integration_test": isIntegrationTest, + }) return nil } if bytes.Equal(node.ID, s.ht.self.ID) { - logtrace.Debug(ctx, "Trying to add itself", logtrace.Fields{logtrace.FieldModule: "p2p"}) + logtrace.Info(ctx, "Rejecting node: is self", logtrace.Fields{ + logtrace.FieldModule: "p2p", + "node": node.String(), + }) return nil } node.SetHashedID() diff --git a/p2p/kademlia/node.go b/p2p/kademlia/node.go index 0011c8be..8a3e14f1 100644 --- a/p2p/kademlia/node.go +++ b/p2p/kademlia/node.go @@ -183,15 +183,13 @@ func (s *NodeList) DelNode(node *Node) { } func haveAllNodes(a, b []*Node) bool { + bSet := make(map[string]bool, len(b)) + for _, y := range b { + bSet[string(y.HashedID)] = true + } + for _, x := range a { - found := false - for _, y := range b { - if bytes.Equal(x.HashedID, y.HashedID) { - found = true - break - } - } - if !found { + if !bSet[string(x.HashedID)] { return false } } From e3225762e5cfff60272fe062b3ccf3314815a6d4 Mon Sep 17 00:00:00 2001 From: J Bilal rafique Date: Wed, 22 Oct 2025 17:06:53 +0500 Subject: [PATCH 2/2] log fix --- p2p/kademlia/dht.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go index acc4193c..bf7a7fd0 100644 --- a/p2p/kademlia/dht.go +++ b/p2p/kademlia/dht.go @@ -925,7 +925,6 @@ func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node, continue } - if s.ignorelist.Banned(node) { logtrace.Debug(ctx, "Ignore banned node in iterate batch get values", logtrace.Fields{ logtrace.FieldModule: "dht", @@ -1388,7 +1387,7 @@ func (s *DHT) addNode(ctx context.Context, node *Node) *Node { if node != nil { fields["peer"] = node.String() } - logtrace.Info(ctx, "Rejecting node: peer below minimum version", fields) + logtrace.Debug(ctx, "Rejecting node: peer below minimum version", fields) return nil } // Allow localhost for integration testing
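
For reference, below is a minimal, self-contained sketch of the bounded-concurrency pattern the first patch adopts: a golang.org/x/sync/semaphore weighted semaphore gates parallel batch fetches, each worker re-checks a shared atomic counter so remaining batches are skipped once enough symbols are found, and cancellation unblocks goroutines still waiting to acquire a slot. The helper fetchBatch and the batchSize/parallelBatches/required values are hypothetical stand-ins for illustration only; they are not part of the patch.

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"

	"golang.org/x/sync/semaphore"
)

// fetchBatch is a stand-in for the per-batch network lookup; here it simply
// pretends every key in the batch was found.
func fetchBatch(ctx context.Context, batch []string) int32 {
	return int32(len(batch))
}

func main() {
	keys := make([]string, 100)
	for i := range keys {
		keys[i] = fmt.Sprintf("key-%d", i)
	}

	const (
		batchSize              = 10
		parallelBatches        = 3
		required        int32  = 40 // stop early once this many symbols are found
	)

	sem := semaphore.NewWeighted(parallelBatches)
	var (
		wg    sync.WaitGroup
		found int32
	)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	for start := 0; start < len(keys); start += batchSize {
		end := start + batchSize
		if end > len(keys) {
			end = len(keys)
		}
		if atomic.LoadInt32(&found) >= required {
			break
		}

		wg.Add(1)
		go func(batch []string) {
			defer wg.Done()

			// Acquire blocks until a slot is free or the context is cancelled,
			// so cancelled work never starts.
			if err := sem.Acquire(ctx, 1); err != nil {
				return
			}
			defer sem.Release(1)

			// Re-check after waiting for a slot: another batch may already
			// have pushed the total past the threshold.
			if atomic.LoadInt32(&found) >= required {
				return
			}

			n := fetchBatch(ctx, batch)
			if atomic.AddInt32(&found, n) >= required {
				cancel() // unblock any goroutines still waiting in Acquire
			}
		}(keys[start:end])
	}

	wg.Wait()
	fmt.Println("symbols found:", atomic.LoadInt32(&found))
}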