From badc872533b9fe04b89a1349e7358595dbec634a Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Mon, 24 Aug 2020 00:13:14 -0500 Subject: [PATCH 01/20] Refactoring --- .../network/p2p/peers/BroadcastHandler.java | 227 ++++++++++-------- .../bisq/network/p2p/peers/Broadcaster.java | 6 +- 2 files changed, 129 insertions(+), 104 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java index cc8469d8bda..facdba5eef3 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java @@ -45,7 +45,8 @@ @Slf4j public class BroadcastHandler implements PeerManager.Listener { - private static final long TIMEOUT = 60; + // TODO check if that is not too low + private static final long BASE_TIMEOUT_MS = 60; /////////////////////////////////////////////////////////////////////////////////////////// @@ -59,14 +60,12 @@ interface ResultHandler { } public interface Listener { - @SuppressWarnings({"EmptyMethod", "UnusedParameters"}) void onBroadcasted(BroadcastMessage message, int numOfCompletedBroadcasts); void onBroadcastedToFirstPeer(BroadcastMessage message); void onBroadcastCompleted(BroadcastMessage message, int numOfCompletedBroadcasts, int numOfFailedBroadcasts); - @SuppressWarnings({"EmptyMethod", "UnusedParameters"}) void onBroadcastFailed(String errorMessage); } @@ -76,7 +75,7 @@ public interface Listener { /////////////////////////////////////////////////////////////////////////////////////////// private final NetworkNode networkNode; - public final String uid; + private final String uid; private final PeerManager peerManager; private boolean stopped = false; private int numOfCompletedBroadcasts = 0; @@ -93,11 +92,12 @@ public interface Listener { // Constructor /////////////////////////////////////////////////////////////////////////////////////////// - public BroadcastHandler(NetworkNode networkNode, PeerManager peerManager) { + BroadcastHandler(NetworkNode networkNode, PeerManager peerManager) { this.networkNode = networkNode; this.peerManager = peerManager; - peerManager.addListener(this); uid = UUID.randomUUID().toString(); + + peerManager.addListener(this); } public void cancel() { @@ -110,7 +110,9 @@ public void cancel() { // API /////////////////////////////////////////////////////////////////////////////////////////// - public void broadcast(BroadcastMessage message, @Nullable NodeAddress sender, ResultHandler resultHandler, + public void broadcast(BroadcastMessage message, + @Nullable NodeAddress sender, + ResultHandler resultHandler, @Nullable Listener listener) { this.message = message; this.resultHandler = resultHandler; @@ -118,102 +120,128 @@ public void broadcast(BroadcastMessage message, @Nullable NodeAddress sender, Re Set connectedPeersSet = networkNode.getConfirmedConnections() .stream() - .filter(connection -> !connection.getPeersNodeAddressOptional().get().equals(sender)) + .filter(connection -> connection.getPeersNodeAddressOptional().isPresent() && + !connection.getPeersNodeAddressOptional().get().equals(sender)) // We don't broadcast back to sender .collect(Collectors.toSet()); - if (!connectedPeersSet.isEmpty()) { - numOfCompletedBroadcasts = 0; - List connectedPeersList = new ArrayList<>(connectedPeersSet); - Collections.shuffle(connectedPeersList); - numPeers = connectedPeersList.size(); - int delay = 50; + if (connectedPeersSet.isEmpty()) { + onFault("Message not broadcasted because we have no available peers yet.\n\t" + + "message = " + Utilities.toTruncatedString(message), false); + return; + } - boolean isDataOwner = (sender != null) && sender.equals(networkNode.getNodeAddress()); - if (!isDataOwner) { - // for not data owner (relay nodes) we send to max. 7 nodes and use a longer delay - numPeers = Math.min(7, connectedPeersList.size()); - delay = 100; - } + numOfCompletedBroadcasts = 0; - long timeoutDelay = TIMEOUT + delay * numPeers; - timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions - String errorMessage = "Timeout: Broadcast did not complete after " + timeoutDelay + " sec."; - - log.debug(errorMessage + "\n\t" + - "numOfPeers=" + numPeers + "\n\t" + - "numOfCompletedBroadcasts=" + numOfCompletedBroadcasts + "\n\t" + - "numOfFailedBroadcasts=" + numOfFailedBroadcasts); - onFault(errorMessage, false); - }, timeoutDelay); - - log.debug("Broadcast message to {} peers out of {} total connected peers.", numPeers, connectedPeersSet.size()); - for (int i = 0; i < numPeers; i++) { - if (stopped) - break; // do not continue sending after a timeout or a cancellation - - final long minDelay = (i + 1) * delay; - final long maxDelay = (i + 2) * delay; - final Connection connection = connectedPeersList.get(i); - UserThread.runAfterRandomDelay(() -> sendToPeer(connection, message), minDelay, maxDelay, TimeUnit.MILLISECONDS); - } + List connectedPeersList = new ArrayList<>(connectedPeersSet); + Collections.shuffle(connectedPeersList); + int delay; + + // If we are the owner of the data we broadcast faster and with higher resilience + boolean isDataOwner = sender != null && sender.equals(networkNode.getNodeAddress()); + if (isDataOwner) { + numPeers = connectedPeersList.size(); + delay = 50; } else { - onFault("Message not broadcasted because we have no available peers yet.\n\t" + - "message = " + Utilities.toTruncatedString(message), false); + // Relay nodes do not broadcast to all connected nodes + numPeers = Math.min(7, connectedPeersList.size()); + delay = 100; + } + + long timeoutDelay = BASE_TIMEOUT_MS + delay * numPeers; + timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions + String errorMessage = "Timeout: Broadcast did not complete after " + timeoutDelay + " sec."; + + log.warn(errorMessage + "\n\t" + + "numOfPeers=" + numPeers + "\n\t" + + "numOfCompletedBroadcasts=" + numOfCompletedBroadcasts + "\n\t" + + "numOfFailedBroadcasts=" + numOfFailedBroadcasts); + onFault(errorMessage, true); + }, timeoutDelay); + + log.debug("Broadcast message to {} peers out of {} total connected peers.", numPeers, connectedPeersSet.size()); + for (int i = 0; i < numPeers; i++) { + if (stopped) + break; // Do not continue sending after a timeout or a cancellation + + long minDelay = (i + 1) * delay; + long maxDelay = (i + 2) * delay; + Connection connection = connectedPeersList.get(i); + UserThread.runAfterRandomDelay(() -> sendToPeer(connection, message), minDelay, maxDelay, TimeUnit.MILLISECONDS); } } private void sendToPeer(Connection connection, BroadcastMessage message) { String errorMessage = "Message not broadcasted because we have stopped the handler already.\n\t" + "message = " + Utilities.toTruncatedString(message); - if (!stopped) { - if (!connection.isStopped()) { - if (connection.noCapabilityRequiredOrCapabilityIsSupported(message)) { - NodeAddress nodeAddress = connection.getPeersNodeAddressOptional().get(); - SettableFuture future = networkNode.sendMessage(connection, message); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - numOfCompletedBroadcasts++; - if (!stopped) { - if (listener != null) - listener.onBroadcasted(message, numOfCompletedBroadcasts); - - if (listener != null && numOfCompletedBroadcasts == 1) - listener.onBroadcastedToFirstPeer(message); - - if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numPeers) { - if (listener != null) - listener.onBroadcastCompleted(message, numOfCompletedBroadcasts, numOfFailedBroadcasts); - - cleanup(); - resultHandler.onCompleted(BroadcastHandler.this); - } - } else { - // TODO investigate why that is called very often at seed nodes - onFault("stopped at onSuccess: " + errorMessage, false); - } - } - - @Override - public void onFailure(@NotNull Throwable throwable) { - numOfFailedBroadcasts++; - if (!stopped) { - log.info("Broadcast to " + nodeAddress + " failed.\n\t" + - "ErrorMessage=" + throwable.getMessage()); - if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numPeers) - onFault("stopped at onFailure: " + errorMessage); - } else { - onFault("stopped at onFailure: " + errorMessage); - } - } - }); + + if (stopped) { + onFault("Handler stopped already: " + errorMessage, false); + return; + } + + if (connection.isStopped()) { + onFault("Connection stopped already.", false); + return; + } + + if (!connection.noCapabilityRequiredOrCapabilityIsSupported(message)) { + onFault("Peer does not support requires capability.", false); + return; + } + + if (!connection.getPeersNodeAddressOptional().isPresent()) { + onFault("Peer node address is not present.", false); + return; + } + + NodeAddress nodeAddress = connection.getPeersNodeAddressOptional().get(); + SettableFuture future = networkNode.sendMessage(connection, message); + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(Connection connection) { + numOfCompletedBroadcasts++; + + if (stopped) { + // TODO investigate why that is called very often at seed nodes + onFault("stopped at onSuccess: " + errorMessage, false); + return; + } + + if (listener != null) { + listener.onBroadcasted(message, numOfCompletedBroadcasts); + + if (numOfCompletedBroadcasts == 1) { + listener.onBroadcastedToFirstPeer(message); + } + } + + + if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numPeers) { + if (listener != null) { + listener.onBroadcastCompleted(message, numOfCompletedBroadcasts, numOfFailedBroadcasts); + } + + cleanup(); + resultHandler.onCompleted(BroadcastHandler.this); } - } else { - onFault("Connection stopped already", false); } - } else { - onFault("stopped at sendToPeer: " + errorMessage, false); - } + + @Override + public void onFailure(@NotNull Throwable throwable) { + numOfFailedBroadcasts++; + + if (stopped) { + onFault("stopped at onFailure: " + errorMessage, true); + return; + } + + log.warn("Broadcast to {} failed. ErrorMessage={}", nodeAddress, throwable.getMessage()); + + if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numPeers) { + onFault("Last broadcast cause a failure: " + errorMessage, true); + } + } + }); } @@ -248,23 +276,20 @@ private void cleanup() { } } - private void onFault(String errorMessage) { - onFault(errorMessage, true); - } - private void onFault(String errorMessage, boolean logWarning) { cleanup(); - if (logWarning) + if (logWarning) { log.warn(errorMessage); - else - log.debug(errorMessage); + } - if (listener != null) + if (listener != null) { listener.onBroadcastFailed(errorMessage); - if (listener != null && (numOfCompletedBroadcasts + numOfFailedBroadcasts == numPeers || stopped)) - listener.onBroadcastCompleted(message, numOfCompletedBroadcasts, numOfFailedBroadcasts); + if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numPeers || stopped) { + listener.onBroadcastCompleted(message, numOfCompletedBroadcasts, numOfFailedBroadcasts); + } + } resultHandler.onFault(this); } @@ -282,6 +307,6 @@ public boolean equals(Object o) { @Override public int hashCode() { - return uid != null ? uid.hashCode() : 0; + return uid.hashCode(); } } diff --git a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java index 727c488b0e3..9b004583b5d 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java @@ -31,7 +31,6 @@ public class Broadcaster implements BroadcastHandler.ResultHandler { private final NetworkNode networkNode; private final PeerManager peerManager; - private final Set broadcastHandlers = new CopyOnWriteArraySet<>(); @@ -46,7 +45,7 @@ public Broadcaster(NetworkNode networkNode, PeerManager peerManager) { } public void shutDown() { - broadcastHandlers.stream().forEach(BroadcastHandler::cancel); + broadcastHandlers.forEach(BroadcastHandler::cancel); broadcastHandlers.clear(); } @@ -55,7 +54,8 @@ public void shutDown() { // API /////////////////////////////////////////////////////////////////////////////////////////// - public void broadcast(BroadcastMessage message, @Nullable NodeAddress sender, + public void broadcast(BroadcastMessage message, + @Nullable NodeAddress sender, @Nullable BroadcastHandler.Listener listener) { BroadcastHandler broadcastHandler = new BroadcastHandler(networkNode, peerManager); broadcastHandler.broadcast(message, sender, this, listener); From c4932829fe82464aa35a2eb9d0ddbf0157c7647d Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Mon, 24 Aug 2020 00:37:14 -0500 Subject: [PATCH 02/20] Refactoring --- .../core/dao/monitoring/network/StateNetworkService.java | 2 +- .../dao/node/full/network/FullNodeNetworkService.java | 2 +- .../dao/node/lite/network/LiteNodeNetworkService.java | 2 +- p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java | 5 +++++ .../java/bisq/network/p2p/storage/P2PDataStorage.java | 8 ++++---- 5 files changed, 12 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/bisq/core/dao/monitoring/network/StateNetworkService.java b/core/src/main/java/bisq/core/dao/monitoring/network/StateNetworkService.java index 69d6c7ea639..8300c76b235 100644 --- a/core/src/main/java/bisq/core/dao/monitoring/network/StateNetworkService.java +++ b/core/src/main/java/bisq/core/dao/monitoring/network/StateNetworkService.java @@ -156,7 +156,7 @@ public void requestHashesFromAllConnectedSeedNodes(int fromHeight) { public void broadcastMyStateHash(StH myStateHash) { NewStateHashMessage newStateHashMessage = getNewStateHashMessage(myStateHash); - broadcaster.broadcast(newStateHashMessage, networkNode.getNodeAddress(), null); + broadcaster.broadcast(newStateHashMessage, networkNode.getNodeAddress()); } public void requestHashes(int fromHeight, String peersAddress) { diff --git a/core/src/main/java/bisq/core/dao/node/full/network/FullNodeNetworkService.java b/core/src/main/java/bisq/core/dao/node/full/network/FullNodeNetworkService.java index bff48afb1ed..640ee94b155 100644 --- a/core/src/main/java/bisq/core/dao/node/full/network/FullNodeNetworkService.java +++ b/core/src/main/java/bisq/core/dao/node/full/network/FullNodeNetworkService.java @@ -105,7 +105,7 @@ public void publishNewBlock(Block block) { log.info("Publish new block at height={} and block hash={}", block.getHeight(), block.getHash()); RawBlock rawBlock = RawBlock.fromBlock(block); NewBlockBroadcastMessage newBlockBroadcastMessage = new NewBlockBroadcastMessage(rawBlock); - broadcaster.broadcast(newBlockBroadcastMessage, networkNode.getNodeAddress(), null); + broadcaster.broadcast(newBlockBroadcastMessage, networkNode.getNodeAddress()); } diff --git a/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java b/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java index 7fbab2f6b58..3baaa2f5b52 100644 --- a/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java +++ b/core/src/main/java/bisq/core/dao/node/lite/network/LiteNodeNetworkService.java @@ -238,7 +238,7 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { log.debug("We received a new message from peer {} and broadcast it to our peers. extBlockId={}", connection.getPeersNodeAddressOptional().orElse(null), extBlockId); receivedBlocks.add(extBlockId); - broadcaster.broadcast(newBlockBroadcastMessage, connection.getPeersNodeAddressOptional().orElse(null), null); + broadcaster.broadcast(newBlockBroadcastMessage, connection.getPeersNodeAddressOptional().orElse(null)); listeners.forEach(listener -> listener.onNewBlockReceived(newBlockBroadcastMessage)); } else { log.debug("We had that message already and do not further broadcast it. extBlockId={}", extBlockId); diff --git a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java index 9b004583b5d..919d64543b4 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java @@ -54,6 +54,11 @@ public void shutDown() { // API /////////////////////////////////////////////////////////////////////////////////////////// + public void broadcast(BroadcastMessage message, + @Nullable NodeAddress sender) { + broadcast(message, sender, null); + } + public void broadcast(BroadcastMessage message, @Nullable NodeAddress sender, @Nullable BroadcastHandler.Listener listener) { diff --git a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java index 34e29f3f5fa..2283547a1f7 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java @@ -529,7 +529,7 @@ private boolean addPersistableNetworkPayload(PersistableNetworkPayload payload, // Broadcast the payload if requested by caller if (allowBroadcast) - broadcaster.broadcast(new AddPersistableNetworkPayloadMessage(payload), sender, null); + broadcaster.broadcast(new AddPersistableNetworkPayloadMessage(payload), sender); return true; } @@ -675,7 +675,7 @@ public boolean refreshTTL(RefreshOfferMessage refreshTTLMessage, sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 1000); // Always broadcast refreshes - broadcaster.broadcast(refreshTTLMessage, sender, null); + broadcaster.broadcast(refreshTTLMessage, sender); return true; } @@ -725,9 +725,9 @@ public boolean remove(ProtectedStorageEntry protectedStorageEntry, printData("after remove"); if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry) { - broadcaster.broadcast(new RemoveMailboxDataMessage((ProtectedMailboxStorageEntry) protectedStorageEntry), sender, null); + broadcaster.broadcast(new RemoveMailboxDataMessage((ProtectedMailboxStorageEntry) protectedStorageEntry), sender); } else { - broadcaster.broadcast(new RemoveDataMessage(protectedStorageEntry), sender, null); + broadcaster.broadcast(new RemoveDataMessage(protectedStorageEntry), sender); } return true; From 5433707fa254bc7aff8797a5660d923389bf4bf1 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Tue, 25 Aug 2020 21:30:33 -0500 Subject: [PATCH 03/20] Bundle broadcast requests --- .../bisq/network/p2p/BundleOfEnvelopes.java | 8 +- .../java/bisq/network/p2p/P2PService.java | 56 ++- .../network/p2p/peers/BroadcastHandler.java | 323 +++++++++--------- .../bisq/network/p2p/peers/Broadcaster.java | 62 +++- 4 files changed, 247 insertions(+), 202 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/BundleOfEnvelopes.java b/p2p/src/main/java/bisq/network/p2p/BundleOfEnvelopes.java index d4ff8f46a83..c57e0d530eb 100644 --- a/p2p/src/main/java/bisq/network/p2p/BundleOfEnvelopes.java +++ b/p2p/src/main/java/bisq/network/p2p/BundleOfEnvelopes.java @@ -44,6 +44,10 @@ public BundleOfEnvelopes() { this(new ArrayList<>(), Version.getP2PMessageVersion()); } + public BundleOfEnvelopes(List envelopes) { + this(envelopes, Version.getP2PMessageVersion()); + } + public void add(NetworkEnvelope networkEnvelope) { envelopes.add(networkEnvelope); } @@ -67,7 +71,9 @@ public protobuf.NetworkEnvelope toProtoNetworkEnvelope() { .build(); } - public static BundleOfEnvelopes fromProto(protobuf.BundleOfEnvelopes proto, NetworkProtoResolver resolver, int messageVersion) { + public static BundleOfEnvelopes fromProto(protobuf.BundleOfEnvelopes proto, + NetworkProtoResolver resolver, + int messageVersion) { List envelopes = proto.getEnvelopesList() .stream() .map(envelope -> { diff --git a/p2p/src/main/java/bisq/network/p2p/P2PService.java b/p2p/src/main/java/bisq/network/p2p/P2PService.java index 5caab46ec40..563c3905b79 100644 --- a/p2p/src/main/java/bisq/network/p2p/P2PService.java +++ b/p2p/src/main/java/bisq/network/p2p/P2PService.java @@ -37,7 +37,6 @@ import bisq.network.p2p.storage.HashMapChangedListener; import bisq.network.p2p.storage.P2PDataStorage; import bisq.network.p2p.storage.messages.AddDataMessage; -import bisq.network.p2p.storage.messages.BroadcastMessage; import bisq.network.p2p.storage.messages.RefreshOfferMessage; import bisq.network.p2p.storage.payload.CapabilityRequiringPayload; import bisq.network.p2p.storage.payload.MailboxStoragePayload; @@ -53,7 +52,6 @@ import bisq.common.proto.ProtobufferException; import bisq.common.proto.network.NetworkEnvelope; import bisq.common.proto.persistable.PersistedDataHost; -import bisq.common.util.Utilities; import com.google.inject.Inject; @@ -77,6 +75,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -446,7 +445,8 @@ public void onAdded(Collection protectedStorageEntries) { } @Override - public void onRemoved(Collection protectedStorageEntries) { } + public void onRemoved(Collection protectedStorageEntries) { + } /////////////////////////////////////////////////////////////////////////////////////////// // DirectMessages @@ -463,7 +463,9 @@ public void sendEncryptedDirectMessage(NodeAddress peerNodeAddress, PubKeyRing p } } - private void doSendEncryptedDirectMessage(@NotNull NodeAddress peersNodeAddress, PubKeyRing pubKeyRing, NetworkEnvelope message, + private void doSendEncryptedDirectMessage(@NotNull NodeAddress peersNodeAddress, + PubKeyRing pubKeyRing, + NetworkEnvelope message, SendDirectMessageListener sendDirectMessageListener) { log.debug("Send encrypted direct message {} to peer {}", message.getClass().getSimpleName(), peersNodeAddress); @@ -667,41 +669,21 @@ private void addMailboxData(MailboxStoragePayload expirableMailboxStoragePayload BroadcastHandler.Listener listener = new BroadcastHandler.Listener() { @Override - public void onBroadcasted(BroadcastMessage message, int numOfCompletedBroadcasts) { - } - - @Override - public void onBroadcastedToFirstPeer(BroadcastMessage message) { - // The reason for that check was to separate different callback for different send calls. - // We only want to notify our sendMailboxMessageListener for the calls he is interested in. - if (message instanceof AddDataMessage && - ((AddDataMessage) message).getProtectedStorageEntry().equals(protectedMailboxStorageEntry)) { - // We delay a bit to give more time for sufficient propagation in the P2P network. - // This should help to avoid situations where a user closes the app too early and the msg - // does not arrive. - // We could use onBroadcastCompleted instead but it might take too long if one peer - // is very badly connected. - // TODO We could check for a certain threshold of no. of incoming network_messages of the same msg - // to see how well it is propagated. BitcoinJ uses such an approach for tx propagation. - UserThread.runAfter(() -> { - log.info("Broadcasted to first peer (3 sec. ago): Message = {}", Utilities.toTruncatedString(message)); - sendMailboxMessageListener.onStoredInMailbox(); - }, 3); - } + public void onSufficientlyBroadcast(List broadcastRequests) { + broadcastRequests.stream() + .filter(broadcastRequest -> broadcastRequest.getMessage() instanceof AddDataMessage) + .filter(broadcastRequest -> { + AddDataMessage addDataMessage = (AddDataMessage) broadcastRequest.getMessage(); + return addDataMessage.getProtectedStorageEntry().equals(protectedMailboxStorageEntry); + }) + .forEach(e -> sendMailboxMessageListener.onStoredInMailbox()); } @Override - public void onBroadcastCompleted(BroadcastMessage message, int numOfCompletedBroadcasts, int numOfFailedBroadcasts) { - log.info("Broadcast completed: Sent to {} peers (failed: {}). Message = {}", - numOfCompletedBroadcasts, numOfFailedBroadcasts, Utilities.toTruncatedString(message)); - if (numOfCompletedBroadcasts == 0) - sendMailboxMessageListener.onFault("Broadcast completed without any successful broadcast"); - } - - @Override - public void onBroadcastFailed(String errorMessage) { - // TODO investigate why not sending sendMailboxMessageListener.onFault. Related probably - // to the logic from BroadcastHandler.sendToPeer + public void onNotSufficientlyBroadcast(int numOfCompletedBroadcasts, int numOfFailedBroadcast) { + sendMailboxMessageListener.onFault("Message was not sufficiently broadcast.\n" + + "numOfCompletedBroadcasts: " + numOfCompletedBroadcasts + ".\n" + + "numOfFailedBroadcast=" + numOfFailedBroadcast); } }; boolean result = p2PDataStorage.addProtectedStorageEntry(protectedMailboxStorageEntry, networkNode.getNodeAddress(), listener); @@ -714,7 +696,7 @@ public void onBroadcastFailed(String errorMessage) { log.error("Unexpected state: adding mailbox message that already exists."); } } catch (CryptoException e) { - log.error("Signing at getDataWithSignedSeqNr failed. That should never happen."); + log.error("Signing at getMailboxDataWithSignedSeqNr failed."); } } else { sendMailboxMessageListener.onFault("There are no P2P network nodes connected. " + diff --git a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java index facdba5eef3..530cdbaaa64 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java @@ -17,14 +17,13 @@ package bisq.network.p2p.peers; +import bisq.network.p2p.BundleOfEnvelopes; import bisq.network.p2p.NodeAddress; import bisq.network.p2p.network.Connection; import bisq.network.p2p.network.NetworkNode; -import bisq.network.p2p.storage.messages.BroadcastMessage; import bisq.common.Timer; import bisq.common.UserThread; -import bisq.common.util.Utilities; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -33,7 +32,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -41,12 +39,10 @@ import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; @Slf4j public class BroadcastHandler implements PeerManager.Listener { - // TODO check if that is not too low - private static final long BASE_TIMEOUT_MS = 60; + private static final long BASE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(60); /////////////////////////////////////////////////////////////////////////////////////////// @@ -55,18 +51,12 @@ public class BroadcastHandler implements PeerManager.Listener { interface ResultHandler { void onCompleted(BroadcastHandler broadcastHandler); - - void onFault(BroadcastHandler broadcastHandler); } public interface Listener { - void onBroadcasted(BroadcastMessage message, int numOfCompletedBroadcasts); - - void onBroadcastedToFirstPeer(BroadcastMessage message); + void onSufficientlyBroadcast(List broadcastRequests); - void onBroadcastCompleted(BroadcastMessage message, int numOfCompletedBroadcasts, int numOfFailedBroadcasts); - - void onBroadcastFailed(String errorMessage); + void onNotSufficientlyBroadcast(int numOfCompletedBroadcasts, int numOfFailedBroadcast); } @@ -75,16 +65,12 @@ public interface Listener { /////////////////////////////////////////////////////////////////////////////////////////// private final NetworkNode networkNode; - private final String uid; private final PeerManager peerManager; - private boolean stopped = false; - private int numOfCompletedBroadcasts = 0; - private int numOfFailedBroadcasts = 0; - private BroadcastMessage message; - private ResultHandler resultHandler; - @Nullable - private Listener listener; - private int numPeers; + private final ResultHandler resultHandler; + private final String uid; + + private boolean stopped, timeoutTriggered; + private int numOfCompletedBroadcasts, numOfFailedBroadcasts, numPeers; private Timer timeoutTimer; @@ -92,156 +78,80 @@ public interface Listener { // Constructor /////////////////////////////////////////////////////////////////////////////////////////// - BroadcastHandler(NetworkNode networkNode, PeerManager peerManager) { + BroadcastHandler(NetworkNode networkNode, PeerManager peerManager, ResultHandler resultHandler) { this.networkNode = networkNode; this.peerManager = peerManager; + this.resultHandler = resultHandler; uid = UUID.randomUUID().toString(); peerManager.addListener(this); } - public void cancel() { - stopped = true; - onFault("Broadcast canceled.", false); - } - /////////////////////////////////////////////////////////////////////////////////////////// // API /////////////////////////////////////////////////////////////////////////////////////////// - public void broadcast(BroadcastMessage message, - @Nullable NodeAddress sender, - ResultHandler resultHandler, - @Nullable Listener listener) { - this.message = message; - this.resultHandler = resultHandler; - this.listener = listener; - - Set connectedPeersSet = networkNode.getConfirmedConnections() - .stream() - .filter(connection -> connection.getPeersNodeAddressOptional().isPresent() && - !connection.getPeersNodeAddressOptional().get().equals(sender)) // We don't broadcast back to sender - .collect(Collectors.toSet()); - - if (connectedPeersSet.isEmpty()) { - onFault("Message not broadcasted because we have no available peers yet.\n\t" + - "message = " + Utilities.toTruncatedString(message), false); - return; - } - - numOfCompletedBroadcasts = 0; + public void broadcast(List broadcastRequests) { + List confirmedConnections = new ArrayList<>(networkNode.getConfirmedConnections()); + Collections.shuffle(confirmedConnections); - List connectedPeersList = new ArrayList<>(connectedPeersSet); - Collections.shuffle(connectedPeersList); int delay; - - // If we are the owner of the data we broadcast faster and with higher resilience - boolean isDataOwner = sender != null && sender.equals(networkNode.getNodeAddress()); - if (isDataOwner) { - numPeers = connectedPeersList.size(); + if (requestsContainOwnMessage(broadcastRequests)) { + // The broadcastRequests contains at least 1 message we have originated, so we send to all peers and + // with shorter delay + numPeers = confirmedConnections.size(); delay = 50; } else { - // Relay nodes do not broadcast to all connected nodes - numPeers = Math.min(7, connectedPeersList.size()); + // Relay nodes only send to max 7 peers and with longer delay + numPeers = Math.min(7, confirmedConnections.size()); delay = 100; } - long timeoutDelay = BASE_TIMEOUT_MS + delay * numPeers; - timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions - String errorMessage = "Timeout: Broadcast did not complete after " + timeoutDelay + " sec."; - - log.warn(errorMessage + "\n\t" + - "numOfPeers=" + numPeers + "\n\t" + - "numOfCompletedBroadcasts=" + numOfCompletedBroadcasts + "\n\t" + - "numOfFailedBroadcasts=" + numOfFailedBroadcasts); - onFault(errorMessage, true); - }, timeoutDelay); - - log.debug("Broadcast message to {} peers out of {} total connected peers.", numPeers, connectedPeersSet.size()); - for (int i = 0; i < numPeers; i++) { - if (stopped) - break; // Do not continue sending after a timeout or a cancellation - + setupTimeoutHandler(broadcastRequests, delay); // todo missing 1 delay if we start with 1 + int iterations = numPeers; + for (int i = 0; i < iterations; i++) { long minDelay = (i + 1) * delay; long maxDelay = (i + 2) * delay; - Connection connection = connectedPeersList.get(i); - UserThread.runAfterRandomDelay(() -> sendToPeer(connection, message), minDelay, maxDelay, TimeUnit.MILLISECONDS); - } - } - - private void sendToPeer(Connection connection, BroadcastMessage message) { - String errorMessage = "Message not broadcasted because we have stopped the handler already.\n\t" + - "message = " + Utilities.toTruncatedString(message); - - if (stopped) { - onFault("Handler stopped already: " + errorMessage, false); - return; - } - - if (connection.isStopped()) { - onFault("Connection stopped already.", false); - return; - } - - if (!connection.noCapabilityRequiredOrCapabilityIsSupported(message)) { - onFault("Peer does not support requires capability.", false); - return; - } - - if (!connection.getPeersNodeAddressOptional().isPresent()) { - onFault("Peer node address is not present.", false); - return; - } - - NodeAddress nodeAddress = connection.getPeersNodeAddressOptional().get(); - SettableFuture future = networkNode.sendMessage(connection, message); - Futures.addCallback(future, new FutureCallback<>() { - @Override - public void onSuccess(Connection connection) { - numOfCompletedBroadcasts++; - + Connection connection = confirmedConnections.get(i); + UserThread.runAfterRandomDelay(() -> { if (stopped) { - // TODO investigate why that is called very often at seed nodes - onFault("stopped at onSuccess: " + errorMessage, false); return; } - if (listener != null) { - listener.onBroadcasted(message, numOfCompletedBroadcasts); + // We use broadcastRequests which have excluded the requests for messages the connection has + // originated to avoid sending back the message we received. We also remove messages not satisfying + // capability checks. + List broadcastRequestsForConnection = getBroadcastRequestsForConnection(connection, broadcastRequests); - if (numOfCompletedBroadcasts == 1) { - listener.onBroadcastedToFirstPeer(message); + // Could be empty list... + if (broadcastRequestsForConnection.isEmpty()) { + // We decrease numPeers in that case for making completion checks correct. + if (numPeers > 0) { + numPeers--; } + checkForCompletion(); + return; } - - if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numPeers) { - if (listener != null) { - listener.onBroadcastCompleted(message, numOfCompletedBroadcasts, numOfFailedBroadcasts); + if (connection.isStopped()) { + // Connection has died in the meantime. We skip it. + // We decrease numPeers in that case for making completion checks correct. + if (numPeers > 0) { + numPeers--; } - - cleanup(); - resultHandler.onCompleted(BroadcastHandler.this); - } - } - - @Override - public void onFailure(@NotNull Throwable throwable) { - numOfFailedBroadcasts++; - - if (stopped) { - onFault("stopped at onFailure: " + errorMessage, true); + checkForCompletion(); return; } - log.warn("Broadcast to {} failed. ErrorMessage={}", nodeAddress, throwable.getMessage()); + sendToPeer(connection, broadcastRequestsForConnection); + }, minDelay, maxDelay, TimeUnit.MILLISECONDS); + } + } - if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numPeers) { - onFault("Last broadcast cause a failure: " + errorMessage, true); - } - } - }); + public void cancel() { + stopped = true; + cleanup(); } @@ -251,7 +161,7 @@ public void onFailure(@NotNull Throwable throwable) { @Override public void onAllConnectionsLost() { - onFault("All connections lost", false); + cleanup(); } @Override @@ -267,33 +177,132 @@ public void onAwakeFromStandby() { // Private /////////////////////////////////////////////////////////////////////////////////////////// - private void cleanup() { - stopped = true; - peerManager.removeListener(this); - if (timeoutTimer != null) { - timeoutTimer.stop(); - timeoutTimer = null; - } + // Check if we have at least one message originated by ourselves + private boolean requestsContainOwnMessage(List broadcastRequests) { + NodeAddress myAddress = networkNode.getNodeAddress(); + if (myAddress == null) + return false; + + return broadcastRequests.stream().anyMatch(e -> myAddress.equals(e.getSender())); } - private void onFault(String errorMessage, boolean logWarning) { - cleanup(); + private void setupTimeoutHandler(List broadcastRequests, int delay) { + long timeoutDelay = BASE_TIMEOUT_MS + delay * (numPeers + 1); // We added 1 in the loop + timeoutTimer = UserThread.runAfter(() -> { + if (stopped) { + return; + } + + timeoutTriggered = true; + + String errorMessage = "Timeout: Broadcast did not complete after " + timeoutDelay + " sec." + "\n" + + "numOfPeers=" + numPeers + "\n" + + "numOfCompletedBroadcasts=" + numOfCompletedBroadcasts + "\n" + + "numOfFailedBroadcasts=" + numOfFailedBroadcasts; - if (logWarning) { log.warn(errorMessage); - } - if (listener != null) { - listener.onBroadcastFailed(errorMessage); + maybeNotifyListeners(broadcastRequests); + + cleanup(); + + }, timeoutDelay); + } + + // We exclude the requests containing a message we received from that connection + // Also we filter out messages which requires a capability but peer does not support it. + private List getBroadcastRequestsForConnection(Connection connection, + List broadcastRequests) { + return broadcastRequests.stream() + .filter(broadcastRequest -> !connection.getPeersNodeAddressOptional().isPresent() || + !connection.getPeersNodeAddressOptional().get().equals(broadcastRequest.getSender())) + .filter(broadcastRequest -> connection.noCapabilityRequiredOrCapabilityIsSupported(broadcastRequest.getMessage())) + .collect(Collectors.toList()); + } + + private BundleOfEnvelopes getBundle(List broadcastRequests) { + return new BundleOfEnvelopes(broadcastRequests.stream() + .map(Broadcaster.BroadcastRequest::getMessage) + .collect(Collectors.toList())); + } + + private void sendToPeer(Connection connection, List broadcastRequestsForConnection) { + BundleOfEnvelopes bundleForConnection = getBundle(broadcastRequestsForConnection); + SettableFuture future = networkNode.sendMessage(connection, bundleForConnection); + + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(Connection connection) { + numOfCompletedBroadcasts++; + + if (stopped) { + return; + } - if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numPeers || stopped) { - listener.onBroadcastCompleted(message, numOfCompletedBroadcasts, numOfFailedBroadcasts); + maybeNotifyListeners(broadcastRequestsForConnection); + + checkForCompletion(); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.warn("Broadcast to {} failed. ErrorMessage={}", connection.getPeersNodeAddressOptional(), + throwable.getMessage()); + + numOfFailedBroadcasts++; + + if (stopped) { + return; + } + + maybeNotifyListeners(broadcastRequestsForConnection); + + checkForCompletion(); + } + }); + } + + private void maybeNotifyListeners(List broadcastRequests) { + // We use equal checks to avoid duplicated listener calls as it would be the case with >= checks. + if (numOfCompletedBroadcasts == 3) { + // We have heard back from 3 peers so we consider the message was sufficiently broadcast. + broadcastRequests.stream() + .filter(broadcastRequest -> broadcastRequest.getListener() != null) + .map(Broadcaster.BroadcastRequest::getListener) + .forEach(listener -> listener.onSufficientlyBroadcast(broadcastRequests)); + } else { + int maxPossibleSuccessCases = numPeers - numOfFailedBroadcasts; + if (maxPossibleSuccessCases == 2) { + // We never can reach required resilience as too many numOfFailedBroadcasts occurred. + broadcastRequests.stream() + .filter(broadcastRequest -> broadcastRequest.getListener() != null) + .map(Broadcaster.BroadcastRequest::getListener) + .forEach(listener -> listener.onNotSufficientlyBroadcast(numOfCompletedBroadcasts, numOfFailedBroadcasts)); + } else if (timeoutTriggered && numOfCompletedBroadcasts < 3) { + // We did not reach resilience level and timeout prevents to reach it later + broadcastRequests.stream() + .filter(broadcastRequest -> broadcastRequest.getListener() != null) + .map(Broadcaster.BroadcastRequest::getListener) + .forEach(listener -> listener.onNotSufficientlyBroadcast(numOfCompletedBroadcasts, numOfFailedBroadcasts)); } } + } - resultHandler.onFault(this); + private void checkForCompletion() { + if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numPeers) { + cleanup(); + } } + private void cleanup() { + stopped = true; + if (timeoutTimer != null) { + timeoutTimer.stop(); + timeoutTimer = null; + } + peerManager.removeListener(this); + resultHandler.onCompleted(this); + } @Override public boolean equals(Object o) { @@ -302,7 +311,7 @@ public boolean equals(Object o) { BroadcastHandler that = (BroadcastHandler) o; - return !(uid != null ? !uid.equals(that.uid) : that.uid != null); + return uid.equals(that.uid); } @Override diff --git a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java index 919d64543b4..6640c5f9900 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java @@ -21,17 +21,31 @@ import bisq.network.p2p.network.NetworkNode; import bisq.network.p2p.storage.messages.BroadcastMessage; +import bisq.common.Timer; +import bisq.common.UserThread; + import javax.inject.Inject; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.TimeUnit; + +import lombok.Value; +import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.Nullable; +@Slf4j public class Broadcaster implements BroadcastHandler.ResultHandler { + private static final long BROADCAST_INTERVAL_MS = 1000; + private final NetworkNode networkNode; private final PeerManager peerManager; private final Set broadcastHandlers = new CopyOnWriteArraySet<>(); + private final List broadcastRequests = new ArrayList<>(); + private Timer timer; /////////////////////////////////////////////////////////////////////////////////////////// @@ -46,7 +60,10 @@ public Broadcaster(NetworkNode networkNode, PeerManager peerManager) { public void shutDown() { broadcastHandlers.forEach(BroadcastHandler::cancel); - broadcastHandlers.clear(); + + if (timer != null) { + timer.stop(); + } } @@ -59,12 +76,26 @@ public void broadcast(BroadcastMessage message, broadcast(message, sender, null); } + public void broadcast(BroadcastMessage message, @Nullable NodeAddress sender, @Nullable BroadcastHandler.Listener listener) { - BroadcastHandler broadcastHandler = new BroadcastHandler(networkNode, peerManager); - broadcastHandler.broadcast(message, sender, this, listener); - broadcastHandlers.add(broadcastHandler); + broadcastRequests.add(new BroadcastRequest(message, sender, listener)); + + if (timer == null) { + timer = UserThread.runAfter(this::maybeBroadcastBundle, BROADCAST_INTERVAL_MS, TimeUnit.MILLISECONDS); + } + } + + private void maybeBroadcastBundle() { + if (!broadcastRequests.isEmpty()) { + BroadcastHandler broadcastHandler = new BroadcastHandler(networkNode, peerManager, this); + broadcastHandler.broadcast(new ArrayList<>(broadcastRequests)); + broadcastHandlers.add(broadcastHandler); + broadcastRequests.clear(); + + timer = null; + } } @@ -77,8 +108,25 @@ public void onCompleted(BroadcastHandler broadcastHandler) { broadcastHandlers.remove(broadcastHandler); } - @Override - public void onFault(BroadcastHandler broadcastHandler) { - broadcastHandlers.remove(broadcastHandler); + + /////////////////////////////////////////////////////////////////////////////////////////// + // BroadcastRequest class + /////////////////////////////////////////////////////////////////////////////////////////// + + @Value + public static class BroadcastRequest { + private BroadcastMessage message; + @Nullable + private NodeAddress sender; + @Nullable + private BroadcastHandler.Listener listener; + + private BroadcastRequest(BroadcastMessage message, + @Nullable NodeAddress sender, + @Nullable BroadcastHandler.Listener listener) { + this.message = message; + this.sender = sender; + this.listener = listener; + } } } From 06f407fda3a1e6e4808df521a4ae8e339d778903 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Tue, 25 Aug 2020 21:31:06 -0500 Subject: [PATCH 04/20] Add todo and curly brackets --- .../main/java/bisq/network/p2p/network/Connection.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/network/Connection.java b/p2p/src/main/java/bisq/network/p2p/network/Connection.java index 3f315b77cbc..050978d9691 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/Connection.java +++ b/p2p/src/main/java/bisq/network/p2p/network/Connection.java @@ -225,7 +225,7 @@ public Capabilities getCapabilities() { // Called from various threads public void sendMessage(NetworkEnvelope networkEnvelope) { - log.debug(">> Send networkEnvelope of type: " + networkEnvelope.getClass().getSimpleName()); + log.debug(">> Send networkEnvelope of type: {}", networkEnvelope.getClass().getSimpleName()); if (!stopped) { if (noCapabilityRequiredOrCapabilityIsSupported(networkEnvelope)) { @@ -319,6 +319,8 @@ public void sendMessage(NetworkEnvelope networkEnvelope) { } } + // TODO: If msg is BundleOfEnvelopes we should check each individual message for capability and filter out those + // which fail. public boolean noCapabilityRequiredOrCapabilityIsSupported(Proto msg) { boolean result; if (msg instanceof AddDataMessage) { @@ -408,12 +410,13 @@ private boolean violatesThrottleLimit(long now, int seconds, int messageCountLim public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { checkArgument(connection.equals(this)); - if (networkEnvelope instanceof BundleOfEnvelopes) + if (networkEnvelope instanceof BundleOfEnvelopes) { for (NetworkEnvelope current : ((BundleOfEnvelopes) networkEnvelope).getEnvelopes()) { UserThread.execute(() -> messageListeners.forEach(e -> e.onMessage(current, connection))); } - else + } else { UserThread.execute(() -> messageListeners.forEach(e -> e.onMessage(networkEnvelope, connection))); + } } From b8152d68aeaf417783c8da6f8678435c2dc352b3 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Tue, 25 Aug 2020 22:42:48 -0500 Subject: [PATCH 05/20] Change BROADCAST_INTERVAL_MS to 2 sec. --- p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java index 6640c5f9900..5e2851a8422 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java @@ -39,7 +39,7 @@ @Slf4j public class Broadcaster implements BroadcastHandler.ResultHandler { - private static final long BROADCAST_INTERVAL_MS = 1000; + private static final long BROADCAST_INTERVAL_MS = 2000; private final NetworkNode networkNode; private final PeerManager peerManager; From 04b6c2abea0c738a96d58a35cd10a84fdc445b0e Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Tue, 25 Aug 2020 22:43:24 -0500 Subject: [PATCH 06/20] Don't wrap into BundleOfEnvelopes is only 1 message is used --- .../network/p2p/peers/BroadcastHandler.java | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java index 530cdbaaa64..d15a869e382 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java @@ -24,6 +24,7 @@ import bisq.common.Timer; import bisq.common.UserThread; +import bisq.common.proto.network.NetworkEnvelope; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -220,15 +221,9 @@ private List getBroadcastRequestsForConnection(Con .collect(Collectors.toList()); } - private BundleOfEnvelopes getBundle(List broadcastRequests) { - return new BundleOfEnvelopes(broadcastRequests.stream() - .map(Broadcaster.BroadcastRequest::getMessage) - .collect(Collectors.toList())); - } - private void sendToPeer(Connection connection, List broadcastRequestsForConnection) { - BundleOfEnvelopes bundleForConnection = getBundle(broadcastRequestsForConnection); - SettableFuture future = networkNode.sendMessage(connection, bundleForConnection); + NetworkEnvelope networkEnvelope = getNetworkEnvelope(broadcastRequestsForConnection); + SettableFuture future = networkNode.sendMessage(connection, networkEnvelope); Futures.addCallback(future, new FutureCallback<>() { @Override @@ -262,6 +257,17 @@ public void onFailure(@NotNull Throwable throwable) { }); } + private NetworkEnvelope getNetworkEnvelope(List broadcastRequests) { + if (broadcastRequests.size() == 1) { + // If we only have 1 message we avoid the overhead of the BundleOfEnvelopes and send the message directly + return broadcastRequests.get(0).getMessage(); + } else { + return new BundleOfEnvelopes(broadcastRequests.stream() + .map(Broadcaster.BroadcastRequest::getMessage) + .collect(Collectors.toList())); + } + } + private void maybeNotifyListeners(List broadcastRequests) { // We use equal checks to avoid duplicated listener calls as it would be the case with >= checks. if (numOfCompletedBroadcasts == 3) { From f67a46791b03b138a15af939dc0d261bfbbc71e9 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Tue, 25 Aug 2020 22:49:12 -0500 Subject: [PATCH 07/20] LAST_ACTIVITY_AGE_MS should be millisec not sec --- .../java/bisq/network/p2p/peers/keepalive/KeepAliveManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/src/main/java/bisq/network/p2p/peers/keepalive/KeepAliveManager.java b/p2p/src/main/java/bisq/network/p2p/peers/keepalive/KeepAliveManager.java index 6115df52e20..6d8ad425402 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/keepalive/KeepAliveManager.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/keepalive/KeepAliveManager.java @@ -50,7 +50,7 @@ public class KeepAliveManager implements MessageListener, ConnectionListener, Pe private static final Logger log = LoggerFactory.getLogger(KeepAliveManager.class); private static final int INTERVAL_SEC = new Random().nextInt(5) + 30; - private static final long LAST_ACTIVITY_AGE_MS = INTERVAL_SEC / 2; + private static final long LAST_ACTIVITY_AGE_MS = INTERVAL_SEC * 1000 / 2; private final NetworkNode networkNode; private final PeerManager peerManager; From c6c56b35f9aca10bc327c870645bdc6c225452eb Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Tue, 25 Aug 2020 22:50:36 -0500 Subject: [PATCH 08/20] Increase INTERVAL_SEC to 30-60 sec from 5-35 sec. --- .../java/bisq/network/p2p/peers/keepalive/KeepAliveManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/src/main/java/bisq/network/p2p/peers/keepalive/KeepAliveManager.java b/p2p/src/main/java/bisq/network/p2p/peers/keepalive/KeepAliveManager.java index 6d8ad425402..6ace6d87e96 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/keepalive/KeepAliveManager.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/keepalive/KeepAliveManager.java @@ -49,7 +49,7 @@ public class KeepAliveManager implements MessageListener, ConnectionListener, PeerManager.Listener { private static final Logger log = LoggerFactory.getLogger(KeepAliveManager.class); - private static final int INTERVAL_SEC = new Random().nextInt(5) + 30; + private static final int INTERVAL_SEC = new Random().nextInt(30) + 30; private static final long LAST_ACTIVITY_AGE_MS = INTERVAL_SEC * 1000 / 2; private final NetworkNode networkNode; From d59a3adbc30e9e9461d72095d6f356dbeae7bf82 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Tue, 25 Aug 2020 23:30:52 -0500 Subject: [PATCH 09/20] Add logs --- p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java index 5e2851a8422..a0ef75257a2 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java @@ -81,6 +81,8 @@ public void broadcast(BroadcastMessage message, @Nullable NodeAddress sender, @Nullable BroadcastHandler.Listener listener) { broadcastRequests.add(new BroadcastRequest(message, sender, listener)); + log.info("Broadcast requested for {}. We queue it up for next bundled broadcast.", + message.getClass().getSimpleName()); if (timer == null) { timer = UserThread.runAfter(this::maybeBroadcastBundle, BROADCAST_INTERVAL_MS, TimeUnit.MILLISECONDS); @@ -89,9 +91,10 @@ public void broadcast(BroadcastMessage message, private void maybeBroadcastBundle() { if (!broadcastRequests.isEmpty()) { + log.info("Broadcast bundled requests of {} messages", broadcastRequests.size()); BroadcastHandler broadcastHandler = new BroadcastHandler(networkNode, peerManager, this); - broadcastHandler.broadcast(new ArrayList<>(broadcastRequests)); broadcastHandlers.add(broadcastHandler); + broadcastHandler.broadcast(new ArrayList<>(broadcastRequests)); broadcastRequests.clear(); timer = null; From d8da20acee4fb258cae88df6fae295c8ff25a045 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Wed, 26 Aug 2020 00:20:47 -0500 Subject: [PATCH 10/20] Remove TODO --- p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java index d15a869e382..7ffe569360e 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java @@ -109,7 +109,8 @@ public void broadcast(List broadcastRequests) { delay = 100; } - setupTimeoutHandler(broadcastRequests, delay); // todo missing 1 delay if we start with 1 + setupTimeoutHandler(broadcastRequests, delay); + int iterations = numPeers; for (int i = 0; i < iterations; i++) { long minDelay = (i + 1) * delay; From b1702f7a6d5fa6a9dcf4bf8dfed89adb59726a1d Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Thu, 27 Aug 2020 15:31:41 -0500 Subject: [PATCH 11/20] - Add shutdown handling to broadCaster. It is important that we flush our queued requests at shutdown and wait until broadcast is completed as a maker need to remove his offers at shutdown. - Add handling for the case that there are very few connections (as in dev setup). - Make BundleOfEnvelopes extend BroadcastMessage - Add complete handler for broadCaster to shutdown in P2PService and wait with shutdown of other services until broadcaster is completed. - Remove case for repeated shutdown call on P2PService as it cannot happen. --- .../bisq/network/p2p/BundleOfEnvelopes.java | 3 +- .../java/bisq/network/p2p/P2PService.java | 70 +++++++------- .../network/p2p/peers/BroadcastHandler.java | 92 +++++++++++-------- .../bisq/network/p2p/peers/Broadcaster.java | 30 +++++- 4 files changed, 118 insertions(+), 77 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/BundleOfEnvelopes.java b/p2p/src/main/java/bisq/network/p2p/BundleOfEnvelopes.java index c57e0d530eb..61756c732de 100644 --- a/p2p/src/main/java/bisq/network/p2p/BundleOfEnvelopes.java +++ b/p2p/src/main/java/bisq/network/p2p/BundleOfEnvelopes.java @@ -17,6 +17,7 @@ package bisq.network.p2p; +import bisq.network.p2p.storage.messages.BroadcastMessage; import bisq.network.p2p.storage.payload.CapabilityRequiringPayload; import bisq.common.app.Capabilities; @@ -36,7 +37,7 @@ @EqualsAndHashCode(callSuper = true) @Value -public final class BundleOfEnvelopes extends NetworkEnvelope implements ExtendedDataSizePermission, CapabilityRequiringPayload { +public final class BundleOfEnvelopes extends BroadcastMessage implements ExtendedDataSizePermission, CapabilityRequiringPayload { private final List envelopes; diff --git a/p2p/src/main/java/bisq/network/p2p/P2PService.java b/p2p/src/main/java/bisq/network/p2p/P2PService.java index 563c3905b79..fd0fa2618d2 100644 --- a/p2p/src/main/java/bisq/network/p2p/P2PService.java +++ b/p2p/src/main/java/bisq/network/p2p/P2PService.java @@ -121,7 +121,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis private final BooleanProperty preliminaryDataReceived = new SimpleBooleanProperty(); private final IntegerProperty numConnectedPeers = new SimpleIntegerProperty(0); - private volatile boolean shutDownInProgress; private boolean shutDownComplete; private final Subscription networkReadySubscription; private boolean isBootstrapped; @@ -210,48 +209,51 @@ public void onAllServicesInitialized() { } public void shutDown(Runnable shutDownCompleteHandler) { - if (!shutDownInProgress) { - shutDownInProgress = true; + shutDownResultHandlers.add(shutDownCompleteHandler); - shutDownResultHandlers.add(shutDownCompleteHandler); - - if (p2PDataStorage != null) - p2PDataStorage.shutDown(); + // We need to make sure queued up messages are flushed out before we continue shut down other network + // services + if (broadcaster != null) { + broadcaster.shutDown(this::doShutDown); + } else { + doShutDown(); + } + } - if (peerManager != null) - peerManager.shutDown(); + private void doShutDown() { + log.error("doShutDown"); + if (p2PDataStorage != null) { + p2PDataStorage.shutDown(); + } - if (broadcaster != null) - broadcaster.shutDown(); + if (peerManager != null) { + peerManager.shutDown(); + } - if (requestDataManager != null) - requestDataManager.shutDown(); + if (requestDataManager != null) { + requestDataManager.shutDown(); + } - if (peerExchangeManager != null) - peerExchangeManager.shutDown(); + if (peerExchangeManager != null) { + peerExchangeManager.shutDown(); + } - if (keepAliveManager != null) - keepAliveManager.shutDown(); + if (keepAliveManager != null) { + keepAliveManager.shutDown(); + } - if (networkReadySubscription != null) - networkReadySubscription.unsubscribe(); + if (networkReadySubscription != null) { + networkReadySubscription.unsubscribe(); + } - if (networkNode != null) { - networkNode.shutDown(() -> { - shutDownResultHandlers.stream().forEach(Runnable::run); - shutDownComplete = true; - }); - } else { - shutDownResultHandlers.stream().forEach(Runnable::run); + if (networkNode != null) { + networkNode.shutDown(() -> { + shutDownResultHandlers.forEach(Runnable::run); shutDownComplete = true; - } + }); } else { - log.debug("shutDown already in progress"); - if (shutDownComplete) { - shutDownCompleteHandler.run(); - } else { - shutDownResultHandlers.add(shutDownCompleteHandler); - } + shutDownResultHandlers.forEach(Runnable::run); + shutDownComplete = true; } } @@ -670,6 +672,7 @@ private void addMailboxData(MailboxStoragePayload expirableMailboxStoragePayload BroadcastHandler.Listener listener = new BroadcastHandler.Listener() { @Override public void onSufficientlyBroadcast(List broadcastRequests) { + log.error("onSufficientlyBroadcast"); broadcastRequests.stream() .filter(broadcastRequest -> broadcastRequest.getMessage() instanceof AddDataMessage) .filter(broadcastRequest -> { @@ -681,6 +684,7 @@ public void onSufficientlyBroadcast(List broadcast @Override public void onNotSufficientlyBroadcast(int numOfCompletedBroadcasts, int numOfFailedBroadcast) { + log.error("onNotSufficientlyBroadcast"); sendMailboxMessageListener.onFault("Message was not sufficiently broadcast.\n" + "numOfCompletedBroadcasts: " + numOfCompletedBroadcasts + ".\n" + "numOfFailedBroadcast=" + numOfFailedBroadcast); diff --git a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java index 7ffe569360e..cc5b45bc52f 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java @@ -21,10 +21,10 @@ import bisq.network.p2p.NodeAddress; import bisq.network.p2p.network.Connection; import bisq.network.p2p.network.NetworkNode; +import bisq.network.p2p.storage.messages.BroadcastMessage; import bisq.common.Timer; import bisq.common.UserThread; -import bisq.common.proto.network.NetworkEnvelope; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -71,7 +71,7 @@ public interface Listener { private final String uid; private boolean stopped, timeoutTriggered; - private int numOfCompletedBroadcasts, numOfFailedBroadcasts, numPeers; + private int numOfCompletedBroadcasts, numOfFailedBroadcasts, numPeersForBroadcast; private Timer timeoutTimer; @@ -93,25 +93,31 @@ public interface Listener { // API /////////////////////////////////////////////////////////////////////////////////////////// - public void broadcast(List broadcastRequests) { + public void broadcast(List broadcastRequests, boolean shutDownRequested) { List confirmedConnections = new ArrayList<>(networkNode.getConfirmedConnections()); Collections.shuffle(confirmedConnections); int delay; - if (requestsContainOwnMessage(broadcastRequests)) { - // The broadcastRequests contains at least 1 message we have originated, so we send to all peers and - // with shorter delay - numPeers = confirmedConnections.size(); - delay = 50; + if (shutDownRequested) { + delay = 1; + // We sent to all peers as in case we had offers we want that it gets removed with higher reliability + numPeersForBroadcast = confirmedConnections.size(); } else { - // Relay nodes only send to max 7 peers and with longer delay - numPeers = Math.min(7, confirmedConnections.size()); - delay = 100; + if (requestsContainOwnMessage(broadcastRequests)) { + // The broadcastRequests contains at least 1 message we have originated, so we send to all peers and + // with shorter delay + numPeersForBroadcast = confirmedConnections.size(); + delay = 50; + } else { + // Relay nodes only send to max 7 peers and with longer delay + numPeersForBroadcast = Math.min(7, confirmedConnections.size()); + delay = 100; + } } - setupTimeoutHandler(broadcastRequests, delay); + setupTimeoutHandler(broadcastRequests, delay, shutDownRequested); - int iterations = numPeers; + int iterations = numPeersForBroadcast; for (int i = 0; i < iterations; i++) { long minDelay = (i + 1) * delay; long maxDelay = (i + 2) * delay; @@ -129,8 +135,8 @@ public void broadcast(List broadcastRequests) { // Could be empty list... if (broadcastRequestsForConnection.isEmpty()) { // We decrease numPeers in that case for making completion checks correct. - if (numPeers > 0) { - numPeers--; + if (numPeersForBroadcast > 0) { + numPeersForBroadcast--; } checkForCompletion(); return; @@ -139,8 +145,8 @@ public void broadcast(List broadcastRequests) { if (connection.isStopped()) { // Connection has died in the meantime. We skip it. // We decrease numPeers in that case for making completion checks correct. - if (numPeers > 0) { - numPeers--; + if (numPeersForBroadcast > 0) { + numPeersForBroadcast--; } checkForCompletion(); return; @@ -188,8 +194,12 @@ private boolean requestsContainOwnMessage(List bro return broadcastRequests.stream().anyMatch(e -> myAddress.equals(e.getSender())); } - private void setupTimeoutHandler(List broadcastRequests, int delay) { - long timeoutDelay = BASE_TIMEOUT_MS + delay * (numPeers + 1); // We added 1 in the loop + private void setupTimeoutHandler(List broadcastRequests, + int delay, + boolean shutDownRequested) { + // In case of shutdown we try to complete fast and set a short 1 second timeout + long baseTimeoutMs = shutDownRequested ? TimeUnit.SECONDS.toMillis(1) : BASE_TIMEOUT_MS; + long timeoutDelay = baseTimeoutMs + delay * (numPeersForBroadcast + 1); // We added 1 in the loop timeoutTimer = UserThread.runAfter(() -> { if (stopped) { return; @@ -197,18 +207,20 @@ private void setupTimeoutHandler(List broadcastReq timeoutTriggered = true; - String errorMessage = "Timeout: Broadcast did not complete after " + timeoutDelay + " sec." + "\n" + - "numOfPeers=" + numPeers + "\n" + - "numOfCompletedBroadcasts=" + numOfCompletedBroadcasts + "\n" + - "numOfFailedBroadcasts=" + numOfFailedBroadcasts; - - log.warn(errorMessage); + log.warn("Broadcast did not complete after {} sec.\n" + + "numPeersForBroadcast={}\n" + + "numOfCompletedBroadcasts={}\n" + + "numOfFailedBroadcasts={}", + timeoutDelay / 1000d, + numPeersForBroadcast, + numOfCompletedBroadcasts, + numOfFailedBroadcasts); maybeNotifyListeners(broadcastRequests); cleanup(); - }, timeoutDelay); + }, timeoutDelay, TimeUnit.MILLISECONDS); } // We exclude the requests containing a message we received from that connection @@ -223,8 +235,9 @@ private List getBroadcastRequestsForConnection(Con } private void sendToPeer(Connection connection, List broadcastRequestsForConnection) { - NetworkEnvelope networkEnvelope = getNetworkEnvelope(broadcastRequestsForConnection); - SettableFuture future = networkNode.sendMessage(connection, networkEnvelope); + // Can be BundleOfEnvelopes or a single BroadcastMessage + BroadcastMessage broadcastMessage = getMessage(broadcastRequestsForConnection); + SettableFuture future = networkNode.sendMessage(connection, broadcastMessage); Futures.addCallback(future, new FutureCallback<>() { @Override @@ -236,7 +249,6 @@ public void onSuccess(Connection connection) { } maybeNotifyListeners(broadcastRequestsForConnection); - checkForCompletion(); } @@ -244,7 +256,6 @@ public void onSuccess(Connection connection) { public void onFailure(@NotNull Throwable throwable) { log.warn("Broadcast to {} failed. ErrorMessage={}", connection.getPeersNodeAddressOptional(), throwable.getMessage()); - numOfFailedBroadcasts++; if (stopped) { @@ -252,13 +263,12 @@ public void onFailure(@NotNull Throwable throwable) { } maybeNotifyListeners(broadcastRequestsForConnection); - checkForCompletion(); } }); } - private NetworkEnvelope getNetworkEnvelope(List broadcastRequests) { + private BroadcastMessage getMessage(List broadcastRequests) { if (broadcastRequests.size() == 1) { // If we only have 1 message we avoid the overhead of the BundleOfEnvelopes and send the message directly return broadcastRequests.get(0).getMessage(); @@ -270,22 +280,26 @@ private NetworkEnvelope getNetworkEnvelope(List br } private void maybeNotifyListeners(List broadcastRequests) { + int numOfCompletedBroadcastsTarget = Math.max(1, Math.min(numPeersForBroadcast, 3)); // We use equal checks to avoid duplicated listener calls as it would be the case with >= checks. - if (numOfCompletedBroadcasts == 3) { - // We have heard back from 3 peers so we consider the message was sufficiently broadcast. + if (numOfCompletedBroadcasts == numOfCompletedBroadcastsTarget) { + // We have heard back from 3 peers (or all peers if numPeers is lower) so we consider the message was sufficiently broadcast. broadcastRequests.stream() .filter(broadcastRequest -> broadcastRequest.getListener() != null) .map(Broadcaster.BroadcastRequest::getListener) .forEach(listener -> listener.onSufficientlyBroadcast(broadcastRequests)); } else { - int maxPossibleSuccessCases = numPeers - numOfFailedBroadcasts; - if (maxPossibleSuccessCases == 2) { - // We never can reach required resilience as too many numOfFailedBroadcasts occurred. + // Number of open requests to peers is less than we need to reach numOfCompletedBroadcastsTarget. + // Thus we never can reach required resilience as too many numOfFailedBroadcasts occurred. + int openRequests = numPeersForBroadcast - numOfCompletedBroadcasts - numOfFailedBroadcasts; + int maxPossibleSuccessCases = openRequests + numOfCompletedBroadcasts; + // We subtract 1 as we want to have it called only once, with a < comparision we would trigger repeatedly. + if (maxPossibleSuccessCases == numOfCompletedBroadcastsTarget - 1) { broadcastRequests.stream() .filter(broadcastRequest -> broadcastRequest.getListener() != null) .map(Broadcaster.BroadcastRequest::getListener) .forEach(listener -> listener.onNotSufficientlyBroadcast(numOfCompletedBroadcasts, numOfFailedBroadcasts)); - } else if (timeoutTriggered && numOfCompletedBroadcasts < 3) { + } else if (timeoutTriggered && numOfCompletedBroadcasts < numOfCompletedBroadcastsTarget) { // We did not reach resilience level and timeout prevents to reach it later broadcastRequests.stream() .filter(broadcastRequest -> broadcastRequest.getListener() != null) @@ -296,7 +310,7 @@ private void maybeNotifyListeners(List broadcastRe } private void checkForCompletion() { - if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numPeers) { + if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numPeersForBroadcast) { cleanup(); } } diff --git a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java index a0ef75257a2..4abbba55cc6 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java @@ -31,6 +31,7 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import lombok.Value; import lombok.extern.slf4j.Slf4j; @@ -46,6 +47,8 @@ public class Broadcaster implements BroadcastHandler.ResultHandler { private final Set broadcastHandlers = new CopyOnWriteArraySet<>(); private final List broadcastRequests = new ArrayList<>(); private Timer timer; + private boolean shutDownRequested; + private Runnable shutDownResultHandler; /////////////////////////////////////////////////////////////////////////////////////////// @@ -58,12 +61,24 @@ public Broadcaster(NetworkNode networkNode, PeerManager peerManager) { this.peerManager = peerManager; } - public void shutDown() { - broadcastHandlers.forEach(BroadcastHandler::cancel); + public void shutDown(Runnable resultHandler) { + shutDownRequested = true; + shutDownResultHandler = resultHandler; + if (broadcastRequests.isEmpty()) { + doShutDown(); + } else { + // We set delay of broadcasts and timeout to very low values, + // so we can expect that we get onCompleted called very fast and trigger the doShutDown from there. + maybeBroadcastBundle(); + } + } + private void doShutDown() { + broadcastHandlers.forEach(BroadcastHandler::cancel); if (timer != null) { timer.stop(); } + shutDownResultHandler.run(); } @@ -81,6 +96,8 @@ public void broadcast(BroadcastMessage message, @Nullable NodeAddress sender, @Nullable BroadcastHandler.Listener listener) { broadcastRequests.add(new BroadcastRequest(message, sender, listener)); + // Keep that log on INFO for better debugging if the feature works as expected. Later it can + // be remove or set to DEBUG log.info("Broadcast requested for {}. We queue it up for next bundled broadcast.", message.getClass().getSimpleName()); @@ -91,10 +108,12 @@ public void broadcast(BroadcastMessage message, private void maybeBroadcastBundle() { if (!broadcastRequests.isEmpty()) { - log.info("Broadcast bundled requests of {} messages", broadcastRequests.size()); + log.info("Broadcast bundled requests of {} messages. Message types: {}", + broadcastRequests.size(), + broadcastRequests.stream().map(e -> e.getClass().getSimpleName()).collect(Collectors.toList())); BroadcastHandler broadcastHandler = new BroadcastHandler(networkNode, peerManager, this); broadcastHandlers.add(broadcastHandler); - broadcastHandler.broadcast(new ArrayList<>(broadcastRequests)); + broadcastHandler.broadcast(new ArrayList<>(broadcastRequests), shutDownRequested); broadcastRequests.clear(); timer = null; @@ -109,6 +128,9 @@ private void maybeBroadcastBundle() { @Override public void onCompleted(BroadcastHandler broadcastHandler) { broadcastHandlers.remove(broadcastHandler); + if (shutDownRequested) { + doShutDown(); + } } From 627052755b137fb246568d9e3db8c175ef9b1046 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Thu, 27 Aug 2020 15:47:51 -0500 Subject: [PATCH 12/20] Fix log --- p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java index 4abbba55cc6..d1288ffe947 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java @@ -110,7 +110,7 @@ private void maybeBroadcastBundle() { if (!broadcastRequests.isEmpty()) { log.info("Broadcast bundled requests of {} messages. Message types: {}", broadcastRequests.size(), - broadcastRequests.stream().map(e -> e.getClass().getSimpleName()).collect(Collectors.toList())); + broadcastRequests.stream().map(e -> e.getMessage().getClass().getSimpleName()).collect(Collectors.toList())); BroadcastHandler broadcastHandler = new BroadcastHandler(networkNode, peerManager, this); broadcastHandlers.add(broadcastHandler); broadcastHandler.broadcast(new ArrayList<>(broadcastRequests), shutDownRequested); From 21ff2df232f4b2ed229000e9eb93f72471f7710a Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Thu, 27 Aug 2020 16:42:55 -0500 Subject: [PATCH 13/20] Remove dev logs --- p2p/src/main/java/bisq/network/p2p/P2PService.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/P2PService.java b/p2p/src/main/java/bisq/network/p2p/P2PService.java index fd0fa2618d2..1db22f6f40d 100644 --- a/p2p/src/main/java/bisq/network/p2p/P2PService.java +++ b/p2p/src/main/java/bisq/network/p2p/P2PService.java @@ -672,7 +672,6 @@ private void addMailboxData(MailboxStoragePayload expirableMailboxStoragePayload BroadcastHandler.Listener listener = new BroadcastHandler.Listener() { @Override public void onSufficientlyBroadcast(List broadcastRequests) { - log.error("onSufficientlyBroadcast"); broadcastRequests.stream() .filter(broadcastRequest -> broadcastRequest.getMessage() instanceof AddDataMessage) .filter(broadcastRequest -> { @@ -684,7 +683,6 @@ public void onSufficientlyBroadcast(List broadcast @Override public void onNotSufficientlyBroadcast(int numOfCompletedBroadcasts, int numOfFailedBroadcast) { - log.error("onNotSufficientlyBroadcast"); sendMailboxMessageListener.onFault("Message was not sufficiently broadcast.\n" + "numOfCompletedBroadcasts: " + numOfCompletedBroadcasts + ".\n" + "numOfFailedBroadcast=" + numOfFailedBroadcast); From 0e8704e74f546eef66de5c26225cb382d4b2e5b6 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Thu, 27 Aug 2020 16:50:45 -0500 Subject: [PATCH 14/20] Remove dev logs, remove unread property --- p2p/src/main/java/bisq/network/p2p/P2PService.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/P2PService.java b/p2p/src/main/java/bisq/network/p2p/P2PService.java index 1db22f6f40d..23a90f8a3a7 100644 --- a/p2p/src/main/java/bisq/network/p2p/P2PService.java +++ b/p2p/src/main/java/bisq/network/p2p/P2PService.java @@ -121,7 +121,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis private final BooleanProperty preliminaryDataReceived = new SimpleBooleanProperty(); private final IntegerProperty numConnectedPeers = new SimpleIntegerProperty(0); - private boolean shutDownComplete; private final Subscription networkReadySubscription; private boolean isBootstrapped; private final KeepAliveManager keepAliveManager; @@ -221,7 +220,6 @@ public void shutDown(Runnable shutDownCompleteHandler) { } private void doShutDown() { - log.error("doShutDown"); if (p2PDataStorage != null) { p2PDataStorage.shutDown(); } @@ -249,11 +247,9 @@ private void doShutDown() { if (networkNode != null) { networkNode.shutDown(() -> { shutDownResultHandlers.forEach(Runnable::run); - shutDownComplete = true; }); } else { shutDownResultHandlers.forEach(Runnable::run); - shutDownComplete = true; } } From 28a665e02d8b86ca812611fcfe50034d0a014deb Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Thu, 27 Aug 2020 21:29:30 -0500 Subject: [PATCH 15/20] Satisfy annoying Codacy bot --- p2p/src/main/java/bisq/network/p2p/P2PService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/p2p/src/main/java/bisq/network/p2p/P2PService.java b/p2p/src/main/java/bisq/network/p2p/P2PService.java index 23a90f8a3a7..3b0c188e27a 100644 --- a/p2p/src/main/java/bisq/network/p2p/P2PService.java +++ b/p2p/src/main/java/bisq/network/p2p/P2PService.java @@ -444,6 +444,7 @@ public void onAdded(Collection protectedStorageEntries) { @Override public void onRemoved(Collection protectedStorageEntries) { + // not used } /////////////////////////////////////////////////////////////////////////////////////////// From b0eea78a031a6f5f2629e0d22f83f702510cf2dc Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Sat, 29 Aug 2020 13:14:18 -0500 Subject: [PATCH 16/20] Remove unused properties (was added again from a merge failure) --- p2p/src/main/java/bisq/network/p2p/P2PService.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/P2PService.java b/p2p/src/main/java/bisq/network/p2p/P2PService.java index 571ddd666e7..3b0c188e27a 100644 --- a/p2p/src/main/java/bisq/network/p2p/P2PService.java +++ b/p2p/src/main/java/bisq/network/p2p/P2PService.java @@ -121,9 +121,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis private final BooleanProperty preliminaryDataReceived = new SimpleBooleanProperty(); private final IntegerProperty numConnectedPeers = new SimpleIntegerProperty(0); - private volatile boolean shutDownInProgress; - @Getter - private boolean shutDownComplete; private final Subscription networkReadySubscription; private boolean isBootstrapped; private final KeepAliveManager keepAliveManager; From 8d13ff8856ef4544b52ee4762739f217db972283 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Sun, 30 Aug 2020 10:02:19 -0500 Subject: [PATCH 17/20] Add comment about size --- core/src/main/java/bisq/core/offer/OfferPayload.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/java/bisq/core/offer/OfferPayload.java b/core/src/main/java/bisq/core/offer/OfferPayload.java index 3f4411d87a5..7b82506690d 100644 --- a/core/src/main/java/bisq/core/offer/OfferPayload.java +++ b/core/src/main/java/bisq/core/offer/OfferPayload.java @@ -48,6 +48,9 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +// OfferPayload has about 1.4 kb. We should look into options to make it smaller but will be hard to do it in a +// backward compatible way. Maybe a candidate when segwit activation is done as hardfork? + @EqualsAndHashCode @Getter @Slf4j From 49d212e654c70394820d6dba04671c16b33b87d4 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Sun, 30 Aug 2020 10:37:46 -0500 Subject: [PATCH 18/20] Fix tests. I don't know why the tests failed as I just added an overloaded method and it should not have any impact. There is also one exception which makes it even more obscure. I guess its some test framework issue. See comment at the exceptional handling // If we remove the last argument (isNull()) tests fail. No idea why as the broadcast method has an / overloaded method with nullable listener. Seems a testframework issue as it should not matter if the // method with listener is called with null argument or the other method with no listener. We removed the // null value from all other calls but here we can't as it breaks the test. --- .../P2PDataStorageOnMessageHandlerTest.java | 8 ++--- .../bisq/network/p2p/storage/TestState.java | 29 ++++++++++--------- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStorageOnMessageHandlerTest.java b/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStorageOnMessageHandlerTest.java index b5fe473732e..29d6ada7dd6 100644 --- a/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStorageOnMessageHandlerTest.java +++ b/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStorageOnMessageHandlerTest.java @@ -33,8 +33,6 @@ import org.junit.Test; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -69,7 +67,7 @@ public void invalidBroadcastMessage() { this.testState.mockedStorage.onMessage(envelope, mockedConnection); verify(this.testState.appendOnlyDataStoreListener, never()).onAdded(any(PersistableNetworkPayload.class)); - verify(this.testState.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), any(NodeAddress.class), eq(null)); + verify(this.testState.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), any(NodeAddress.class)); } @Test @@ -82,7 +80,7 @@ public void unsupportedBroadcastMessage() { this.testState.mockedStorage.onMessage(envelope, mockedConnection); verify(this.testState.appendOnlyDataStoreListener, never()).onAdded(any(PersistableNetworkPayload.class)); - verify(this.testState.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), any(NodeAddress.class), eq(null)); + verify(this.testState.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), any(NodeAddress.class)); } @Test @@ -96,6 +94,6 @@ public void invalidConnectionObject() { this.testState.mockedStorage.onMessage(envelope, mockedConnection); verify(this.testState.appendOnlyDataStoreListener, never()).onAdded(any(PersistableNetworkPayload.class)); - verify(this.testState.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), any(NodeAddress.class), eq(null)); + verify(this.testState.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), any(NodeAddress.class)); } } diff --git a/p2p/src/test/java/bisq/network/p2p/storage/TestState.java b/p2p/src/test/java/bisq/network/p2p/storage/TestState.java index 0ede4a03c8e..e71246a0310 100644 --- a/p2p/src/test/java/bisq/network/p2p/storage/TestState.java +++ b/p2p/src/test/java/bisq/network/p2p/storage/TestState.java @@ -19,7 +19,6 @@ import bisq.network.p2p.NodeAddress; import bisq.network.p2p.network.NetworkNode; -import bisq.network.p2p.peers.BroadcastHandler; import bisq.network.p2p.peers.Broadcaster; import bisq.network.p2p.storage.messages.AddDataMessage; import bisq.network.p2p.storage.messages.AddPersistableNetworkPayloadMessage; @@ -51,10 +50,10 @@ import java.util.Set; import java.util.concurrent.TimeUnit; -import org.junit.Assert; - import org.mockito.ArgumentCaptor; +import org.junit.Assert; + import static org.mockito.Mockito.*; /** @@ -160,8 +159,7 @@ public static NodeAddress getTestNodeAddress() { /** * Common test helpers that verify the correct events were signaled based on the test expectation and before/after states. */ - private void verifySequenceNumberMapWriteContains(P2PDataStorage.ByteArray payloadHash, - int sequenceNumber) { + private void verifySequenceNumberMapWriteContains(P2PDataStorage.ByteArray payloadHash, int sequenceNumber) { final ArgumentCaptor captor = ArgumentCaptor.forClass(SequenceNumberMap.class); verify(this.mockSeqNrStorage).queueUpForSave(captor.capture(), anyLong()); @@ -187,10 +185,9 @@ void verifyPersistableAdd(SavedTestState beforeState, verify(this.appendOnlyDataStoreListener, never()).onAdded(persistableNetworkPayload); if (expectedBroadcast) - verify(this.mockBroadcaster).broadcast(any(AddPersistableNetworkPayloadMessage.class), - nullable(NodeAddress.class), isNull()); + verify(this.mockBroadcaster).broadcast(any(AddPersistableNetworkPayloadMessage.class), nullable(NodeAddress.class)); else - verify(this.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), nullable(NodeAddress.class), nullable(BroadcastHandler.Listener.class)); + verify(this.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), nullable(NodeAddress.class)); } void verifyProtectedStorageAdd(SavedTestState beforeState, @@ -219,13 +216,17 @@ void verifyProtectedStorageAdd(SavedTestState beforeState, if (expectedBroadcast) { final ArgumentCaptor captor = ArgumentCaptor.forClass(BroadcastMessage.class); + // If we remove the last argument (isNull()) tests fail. No idea why as the broadcast method has an + // overloaded method with nullable listener. Seems a testframework issue as it should not matter if the + // method with listener is called with null argument or the other method with no listener. We removed the + // null value from all other calls but here we can't as it breaks the test. verify(this.mockBroadcaster).broadcast(captor.capture(), nullable(NodeAddress.class), isNull()); BroadcastMessage broadcastMessage = captor.getValue(); Assert.assertTrue(broadcastMessage instanceof AddDataMessage); Assert.assertEquals(protectedStorageEntry, ((AddDataMessage) broadcastMessage).getProtectedStorageEntry()); } else { - verify(this.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), nullable(NodeAddress.class), nullable(BroadcastHandler.Listener.class)); + verify(this.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), nullable(NodeAddress.class)); } if (expectedSequenceNrMapWrite) { @@ -275,7 +276,7 @@ void verifyProtectedStorageRemove(SavedTestState beforeState, verify(this.mockSeqNrStorage, never()).queueUpForSave(any(SequenceNumberMap.class), anyLong()); if (!expectedBroadcast) - verify(this.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), nullable(NodeAddress.class), nullable(BroadcastHandler.Listener.class)); + verify(this.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), nullable(NodeAddress.class)); protectedStorageEntries.forEach(protectedStorageEntry -> { @@ -287,9 +288,9 @@ void verifyProtectedStorageRemove(SavedTestState beforeState, if (expectedBroadcast) { if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry) - verify(this.mockBroadcaster).broadcast(any(RemoveMailboxDataMessage.class), nullable(NodeAddress.class), isNull()); + verify(this.mockBroadcaster).broadcast(any(RemoveMailboxDataMessage.class), nullable(NodeAddress.class)); else - verify(this.mockBroadcaster).broadcast(any(RemoveDataMessage.class), nullable(NodeAddress.class), isNull()); + verify(this.mockBroadcaster).broadcast(any(RemoveDataMessage.class), nullable(NodeAddress.class)); } @@ -319,7 +320,7 @@ void verifyRefreshTTL(SavedTestState beforeState, Assert.assertTrue(entryAfterRefresh.getCreationTimeStamp() > beforeState.creationTimestampBeforeUpdate); final ArgumentCaptor captor = ArgumentCaptor.forClass(BroadcastMessage.class); - verify(this.mockBroadcaster).broadcast(captor.capture(), nullable(NodeAddress.class), isNull()); + verify(this.mockBroadcaster).broadcast(captor.capture(), nullable(NodeAddress.class)); BroadcastMessage broadcastMessage = captor.getValue(); Assert.assertTrue(broadcastMessage instanceof RefreshOfferMessage); @@ -336,7 +337,7 @@ void verifyRefreshTTL(SavedTestState beforeState, Assert.assertEquals(beforeState.creationTimestampBeforeUpdate, entryAfterRefresh.getCreationTimeStamp()); } - verify(this.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), nullable(NodeAddress.class), nullable(BroadcastHandler.Listener.class)); + verify(this.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), nullable(NodeAddress.class)); verify(this.mockSeqNrStorage, never()).queueUpForSave(any(SequenceNumberMap.class), anyLong()); } } From bc802c861d707a642ffbad25771651f9619d880c Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Mon, 31 Aug 2020 10:37:58 -0500 Subject: [PATCH 19/20] Change log level to avoid too verbose logs --- p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java index d1288ffe947..cdfdde5c467 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java @@ -98,7 +98,7 @@ public void broadcast(BroadcastMessage message, broadcastRequests.add(new BroadcastRequest(message, sender, listener)); // Keep that log on INFO for better debugging if the feature works as expected. Later it can // be remove or set to DEBUG - log.info("Broadcast requested for {}. We queue it up for next bundled broadcast.", + log.debug("Broadcast requested for {}. We queue it up for next bundled broadcast.", message.getClass().getSimpleName()); if (timer == null) { @@ -108,7 +108,7 @@ public void broadcast(BroadcastMessage message, private void maybeBroadcastBundle() { if (!broadcastRequests.isEmpty()) { - log.info("Broadcast bundled requests of {} messages. Message types: {}", + log.debug("Broadcast bundled requests of {} messages. Message types: {}", broadcastRequests.size(), broadcastRequests.stream().map(e -> e.getMessage().getClass().getSimpleName()).collect(Collectors.toList())); BroadcastHandler broadcastHandler = new BroadcastHandler(networkNode, peerManager, this); From f7951d5943d1f33ed1052eedb2a4f9f0234cc9d7 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Mon, 31 Aug 2020 10:38:46 -0500 Subject: [PATCH 20/20] Combine if/else branches. Improve comments and variables --- .../bisq/network/p2p/peers/BroadcastHandler.java | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java index cc5b45bc52f..d370be6bd31 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java @@ -289,18 +289,14 @@ private void maybeNotifyListeners(List broadcastRe .map(Broadcaster.BroadcastRequest::getListener) .forEach(listener -> listener.onSufficientlyBroadcast(broadcastRequests)); } else { - // Number of open requests to peers is less than we need to reach numOfCompletedBroadcastsTarget. + // We check if number of open requests to peers is less than we need to reach numOfCompletedBroadcastsTarget. // Thus we never can reach required resilience as too many numOfFailedBroadcasts occurred. - int openRequests = numPeersForBroadcast - numOfCompletedBroadcasts - numOfFailedBroadcasts; - int maxPossibleSuccessCases = openRequests + numOfCompletedBroadcasts; + int maxPossibleSuccessCases = numPeersForBroadcast - numOfFailedBroadcasts; // We subtract 1 as we want to have it called only once, with a < comparision we would trigger repeatedly. - if (maxPossibleSuccessCases == numOfCompletedBroadcastsTarget - 1) { - broadcastRequests.stream() - .filter(broadcastRequest -> broadcastRequest.getListener() != null) - .map(Broadcaster.BroadcastRequest::getListener) - .forEach(listener -> listener.onNotSufficientlyBroadcast(numOfCompletedBroadcasts, numOfFailedBroadcasts)); - } else if (timeoutTriggered && numOfCompletedBroadcasts < numOfCompletedBroadcastsTarget) { - // We did not reach resilience level and timeout prevents to reach it later + boolean notEnoughSucceededOrOpen = maxPossibleSuccessCases == numOfCompletedBroadcastsTarget - 1; + // We did not reach resilience level and timeout prevents to reach it later + boolean timeoutAndNotEnoughSucceeded = timeoutTriggered && numOfCompletedBroadcasts < numOfCompletedBroadcastsTarget; + if (notEnoughSucceededOrOpen || timeoutAndNotEnoughSucceeded) { broadcastRequests.stream() .filter(broadcastRequest -> broadcastRequest.getListener() != null) .map(Broadcaster.BroadcastRequest::getListener)