diff --git a/core/src/main/java/bisq/core/dao/monitoring/network/RequestStateHashesHandler.java b/core/src/main/java/bisq/core/dao/monitoring/network/RequestStateHashesHandler.java index ea59281d22f..fc7f23d176a 100644 --- a/core/src/main/java/bisq/core/dao/monitoring/network/RequestStateHashesHandler.java +++ b/core/src/main/java/bisq/core/dao/monitoring/network/RequestStateHashesHandler.java @@ -121,7 +121,7 @@ public void requestStateHashes(int fromHeight) { TIMEOUT); } - log.info("We send to peer {} a {}.", nodeAddress, getStateHashesRequest); + log.debug("We send to peer {} a {}.", nodeAddress, getStateHashesRequest); networkNode.addMessageListener(this); SettableFuture future = networkNode.sendMessage(nodeAddress, getStateHashesRequest); Futures.addCallback(future, new FutureCallback<>() { diff --git a/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java b/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java index 25e85eb3159..f19412a3e1e 100644 --- a/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java +++ b/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java @@ -19,6 +19,7 @@ import bisq.core.dao.node.messages.GetBlocksResponse; import bisq.core.dao.node.messages.NewBlockBroadcastMessage; +import bisq.core.dao.state.model.blockchain.BaseTx; import bisq.network.p2p.NodeAddress; import bisq.network.p2p.network.CloseConnectionReason; @@ -26,6 +27,7 @@ import bisq.network.p2p.network.ConnectionListener; import bisq.network.p2p.network.MessageListener; import bisq.network.p2p.network.NetworkNode; +import bisq.network.p2p.peers.Broadcaster; import bisq.network.p2p.peers.PeerManager; import bisq.network.p2p.seed.SeedNodeRepository; @@ -44,6 +46,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; @@ -87,6 +90,7 @@ public interface Listener { private final NetworkNode networkNode; private final PeerManager peerManager; + private final Broadcaster broadcaster; private final Collection seedNodeAddresses; private final List listeners = new CopyOnWriteArrayList<>(); @@ -95,6 +99,7 @@ public interface Listener { private final Map, RequestBlocksHandler> requestBlocksHandlerMap = new HashMap<>(); private Timer retryTimer; private boolean stopped; + private Set receivedBlocks = new HashSet<>(); /////////////////////////////////////////////////////////////////////////////////////////// @@ -104,9 +109,11 @@ public interface Listener { @Inject public LiteNodeNetworkService(NetworkNode networkNode, PeerManager peerManager, + Broadcaster broadcaster, SeedNodeRepository seedNodesRepository) { this.networkNode = networkNode; this.peerManager = peerManager; + this.broadcaster = broadcaster; // seedNodeAddresses can be empty (in case there is only 1 seed node, the seed node starting up has no other seed nodes) this.seedNodeAddresses = new HashSet<>(seedNodesRepository.getSeedNodeAddresses()); } @@ -219,11 +226,23 @@ public void onAwakeFromStandby() { @Override public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { if (networkEnvelope instanceof NewBlockBroadcastMessage) { - log.info("We received blocks from peer {}", connection.getPeersNodeAddressOptional()); - listeners.forEach(listener -> listener.onNewBlockReceived((NewBlockBroadcastMessage) networkEnvelope)); + NewBlockBroadcastMessage newBlockBroadcastMessage = (NewBlockBroadcastMessage) networkEnvelope; + // We combine blockHash and txId list in case we receive blocks with different transactions. + List txIds = newBlockBroadcastMessage.getBlock().getRawTxs().stream().map(BaseTx::getId).collect(Collectors.toList()); + String extBlockId = newBlockBroadcastMessage.getBlock().getHash() + ":" + txIds; + if (!receivedBlocks.contains(extBlockId)) { + log.info("We received a new message from peer {} and broadcast it to our peers. extBlockId={}", + connection.getPeersNodeAddressOptional(), extBlockId); + receivedBlocks.add(extBlockId); + broadcaster.broadcast(newBlockBroadcastMessage, networkNode.getNodeAddress(), null, false); + listeners.forEach(listener -> listener.onNewBlockReceived(newBlockBroadcastMessage)); + } else { + log.debug("We had that message already and do not further broadcast it. extBlockId={}", extBlockId); + } } } + /////////////////////////////////////////////////////////////////////////////////////////// // RequestData ///////////////////////////////////////////////////////////////////////////////////////////