From 407ff15941962add9380f36f5ea17a13cf7926df Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Wed, 20 Jun 2018 12:08:29 +0200 Subject: [PATCH 01/12] Add ConfirmationMessage and ConfirmationSourceType --- .../bisq/network/p2p/ConfirmationMessage.java | 103 ++++++++++++++++++ .../network/p2p/ConfirmationSourceType.java | 25 +++++ 2 files changed, 128 insertions(+) create mode 100644 src/main/java/bisq/network/p2p/ConfirmationMessage.java create mode 100644 src/main/java/bisq/network/p2p/ConfirmationSourceType.java diff --git a/src/main/java/bisq/network/p2p/ConfirmationMessage.java b/src/main/java/bisq/network/p2p/ConfirmationMessage.java new file mode 100644 index 0000000..0682506 --- /dev/null +++ b/src/main/java/bisq/network/p2p/ConfirmationMessage.java @@ -0,0 +1,103 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.network.p2p; + +import bisq.common.app.Version; +import bisq.common.proto.ProtoUtil; +import bisq.common.proto.network.NetworkEnvelope; + +import io.bisq.generated.protobuffer.PB; + +import java.util.Optional; + +import lombok.EqualsAndHashCode; +import lombok.Value; + +import javax.annotation.Nullable; + +@EqualsAndHashCode(callSuper = true) +@Value +public final class ConfirmationMessage extends NetworkEnvelope { + private final String uid; + private final ConfirmationSourceType sourceType; //e.g. TradeMessage, DisputeMessage,... + private final String sourceUid; // uid of source (TradeMessage) + private final String sourceId; // id of source (tradeId, disputeId) + private final boolean result; // true if source message was processed successfully + @Nullable + private final String errorMessage; // optional error message if source message processing failed + + public ConfirmationMessage(String uid, + ConfirmationSourceType sourceType, + String sourceUid, + String sourceId, + boolean result, + String errorMessage) { + this(uid, + sourceType, + sourceUid, + sourceId, + result, + errorMessage, + Version.getP2PMessageVersion()); + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // PROTO BUFFER + /////////////////////////////////////////////////////////////////////////////////////////// + + private ConfirmationMessage(String uid, + ConfirmationSourceType sourceType, + String sourceUid, + String sourceId, + boolean result, + @Nullable String errorMessage, + int messageVersion) { + super(messageVersion); + this.uid = uid; + this.sourceType = sourceType; + this.sourceUid = sourceUid; + this.sourceId = sourceId; + this.result = result; + this.errorMessage = errorMessage; + } + + + @Override + public PB.NetworkEnvelope toProtoNetworkEnvelope() { + PB.ConfirmationMessage.Builder builder = PB.ConfirmationMessage.newBuilder() + .setUid(uid) + .setSourceType(sourceType.name()) + .setSourceUid(sourceUid) + .setSourceId(sourceId) + .setResult(result); + Optional.ofNullable(errorMessage).ifPresent(builder::setErrorMessage); + return getNetworkEnvelopeBuilder().setConfirmationMessage(builder).build(); + } + + public static ConfirmationMessage fromProto(PB.ConfirmationMessage proto, int messageVersion) { + ConfirmationSourceType sourceType = ProtoUtil.enumFromProto(ConfirmationSourceType.class, proto.getSourceType()); + return new ConfirmationMessage(proto.getUid(), + sourceType, + proto.getSourceUid(), + proto.getSourceId(), + proto.getResult(), + proto.getErrorMessage().isEmpty() ? null : proto.getErrorMessage(), + messageVersion); + } +} diff --git a/src/main/java/bisq/network/p2p/ConfirmationSourceType.java b/src/main/java/bisq/network/p2p/ConfirmationSourceType.java new file mode 100644 index 0000000..3ba01f7 --- /dev/null +++ b/src/main/java/bisq/network/p2p/ConfirmationSourceType.java @@ -0,0 +1,25 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.network.p2p; + +public enum ConfirmationSourceType { + UNDEFINED, + OFFER_MESSAGE, + TRADE_MESSAGE, + DISPUTE_MESSAGE +} From 8895ef741d5ed6ead3f39bc3f2013be145b3963d Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Wed, 20 Jun 2018 12:17:30 +0200 Subject: [PATCH 02/12] Rename ConfirmationMessage to AckMessage --- ...nfirmationMessage.java => AckMessage.java} | 40 +++++++++---------- ...rceType.java => AckMessageSourceType.java} | 2 +- 2 files changed, 21 insertions(+), 21 deletions(-) rename src/main/java/bisq/network/p2p/{ConfirmationMessage.java => AckMessage.java} (67%) rename src/main/java/bisq/network/p2p/{ConfirmationSourceType.java => AckMessageSourceType.java} (95%) diff --git a/src/main/java/bisq/network/p2p/ConfirmationMessage.java b/src/main/java/bisq/network/p2p/AckMessage.java similarity index 67% rename from src/main/java/bisq/network/p2p/ConfirmationMessage.java rename to src/main/java/bisq/network/p2p/AckMessage.java index 0682506..6ab692e 100644 --- a/src/main/java/bisq/network/p2p/ConfirmationMessage.java +++ b/src/main/java/bisq/network/p2p/AckMessage.java @@ -32,21 +32,21 @@ @EqualsAndHashCode(callSuper = true) @Value -public final class ConfirmationMessage extends NetworkEnvelope { +public final class AckMessage extends NetworkEnvelope { private final String uid; - private final ConfirmationSourceType sourceType; //e.g. TradeMessage, DisputeMessage,... + private final AckMessageSourceType sourceType; //e.g. TradeMessage, DisputeMessage,... private final String sourceUid; // uid of source (TradeMessage) private final String sourceId; // id of source (tradeId, disputeId) private final boolean result; // true if source message was processed successfully @Nullable private final String errorMessage; // optional error message if source message processing failed - public ConfirmationMessage(String uid, - ConfirmationSourceType sourceType, - String sourceUid, - String sourceId, - boolean result, - String errorMessage) { + public AckMessage(String uid, + AckMessageSourceType sourceType, + String sourceUid, + String sourceId, + boolean result, + String errorMessage) { this(uid, sourceType, sourceUid, @@ -61,13 +61,13 @@ public ConfirmationMessage(String uid, // PROTO BUFFER /////////////////////////////////////////////////////////////////////////////////////////// - private ConfirmationMessage(String uid, - ConfirmationSourceType sourceType, - String sourceUid, - String sourceId, - boolean result, - @Nullable String errorMessage, - int messageVersion) { + private AckMessage(String uid, + AckMessageSourceType sourceType, + String sourceUid, + String sourceId, + boolean result, + @Nullable String errorMessage, + int messageVersion) { super(messageVersion); this.uid = uid; this.sourceType = sourceType; @@ -80,19 +80,19 @@ private ConfirmationMessage(String uid, @Override public PB.NetworkEnvelope toProtoNetworkEnvelope() { - PB.ConfirmationMessage.Builder builder = PB.ConfirmationMessage.newBuilder() + PB.AckMessage.Builder builder = PB.AckMessage.newBuilder() .setUid(uid) .setSourceType(sourceType.name()) .setSourceUid(sourceUid) .setSourceId(sourceId) .setResult(result); Optional.ofNullable(errorMessage).ifPresent(builder::setErrorMessage); - return getNetworkEnvelopeBuilder().setConfirmationMessage(builder).build(); + return getNetworkEnvelopeBuilder().setAckMessage(builder).build(); } - public static ConfirmationMessage fromProto(PB.ConfirmationMessage proto, int messageVersion) { - ConfirmationSourceType sourceType = ProtoUtil.enumFromProto(ConfirmationSourceType.class, proto.getSourceType()); - return new ConfirmationMessage(proto.getUid(), + public static AckMessage fromProto(PB.AckMessage proto, int messageVersion) { + AckMessageSourceType sourceType = ProtoUtil.enumFromProto(AckMessageSourceType.class, proto.getSourceType()); + return new AckMessage(proto.getUid(), sourceType, proto.getSourceUid(), proto.getSourceId(), diff --git a/src/main/java/bisq/network/p2p/ConfirmationSourceType.java b/src/main/java/bisq/network/p2p/AckMessageSourceType.java similarity index 95% rename from src/main/java/bisq/network/p2p/ConfirmationSourceType.java rename to src/main/java/bisq/network/p2p/AckMessageSourceType.java index 3ba01f7..adfddf6 100644 --- a/src/main/java/bisq/network/p2p/ConfirmationSourceType.java +++ b/src/main/java/bisq/network/p2p/AckMessageSourceType.java @@ -17,7 +17,7 @@ package bisq.network.p2p; -public enum ConfirmationSourceType { +public enum AckMessageSourceType { UNDEFINED, OFFER_MESSAGE, TRADE_MESSAGE, From 82374d44fbb2d1bdac4d2a199ca304dcc3e596ac Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Wed, 20 Jun 2018 15:44:16 +0200 Subject: [PATCH 03/12] Make AckMessage MailboxMessage - Add Capability support - Add senderNodeAddress and sourceMsgClassName to AckMessage - Send msg at handleTaskRunnerSuccess and handleTaskRunnerFault - Avoid that DAO methods get called in case of connection loss --- .../java/bisq/network/p2p/AckMessage.java | 55 ++++++++++++++++++- .../java/bisq/network/p2p/P2PService.java | 6 +- 2 files changed, 56 insertions(+), 5 deletions(-) diff --git a/src/main/java/bisq/network/p2p/AckMessage.java b/src/main/java/bisq/network/p2p/AckMessage.java index 6ab692e..913d6a8 100644 --- a/src/main/java/bisq/network/p2p/AckMessage.java +++ b/src/main/java/bisq/network/p2p/AckMessage.java @@ -17,13 +17,22 @@ package bisq.network.p2p; +import bisq.network.p2p.storage.payload.CapabilityRequiringPayload; +import bisq.network.p2p.storage.payload.ExpirablePayload; + +import bisq.common.app.Capabilities; import bisq.common.app.Version; import bisq.common.proto.ProtoUtil; import bisq.common.proto.network.NetworkEnvelope; import io.bisq.generated.protobuffer.PB; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.TimeUnit; import lombok.EqualsAndHashCode; import lombok.Value; @@ -32,23 +41,28 @@ @EqualsAndHashCode(callSuper = true) @Value -public final class AckMessage extends NetworkEnvelope { +public final class AckMessage extends NetworkEnvelope implements MailboxMessage, ExpirablePayload, CapabilityRequiringPayload { private final String uid; + private final NodeAddress senderNodeAddress; private final AckMessageSourceType sourceType; //e.g. TradeMessage, DisputeMessage,... + private final String sourceMsgClassName; private final String sourceUid; // uid of source (TradeMessage) private final String sourceId; // id of source (tradeId, disputeId) private final boolean result; // true if source message was processed successfully @Nullable private final String errorMessage; // optional error message if source message processing failed - public AckMessage(String uid, + public AckMessage(NodeAddress senderNodeAddress, AckMessageSourceType sourceType, + String sourceMsgClassName, String sourceUid, String sourceId, boolean result, String errorMessage) { - this(uid, + this(UUID.randomUUID().toString(), + senderNodeAddress, sourceType, + sourceMsgClassName, sourceUid, sourceId, result, @@ -62,7 +76,9 @@ public AckMessage(String uid, /////////////////////////////////////////////////////////////////////////////////////////// private AckMessage(String uid, + NodeAddress senderNodeAddress, AckMessageSourceType sourceType, + String sourceMsgClassName, String sourceUid, String sourceId, boolean result, @@ -70,7 +86,9 @@ private AckMessage(String uid, int messageVersion) { super(messageVersion); this.uid = uid; + this.senderNodeAddress = senderNodeAddress; this.sourceType = sourceType; + this.sourceMsgClassName = sourceMsgClassName; this.sourceUid = sourceUid; this.sourceId = sourceId; this.result = result; @@ -82,7 +100,9 @@ private AckMessage(String uid, public PB.NetworkEnvelope toProtoNetworkEnvelope() { PB.AckMessage.Builder builder = PB.AckMessage.newBuilder() .setUid(uid) + .setSenderNodeAddress(senderNodeAddress.toProtoMessage()) .setSourceType(sourceType.name()) + .setSourceMsgClassName(sourceMsgClassName) .setSourceUid(sourceUid) .setSourceId(sourceId) .setResult(result); @@ -93,11 +113,40 @@ public PB.NetworkEnvelope toProtoNetworkEnvelope() { public static AckMessage fromProto(PB.AckMessage proto, int messageVersion) { AckMessageSourceType sourceType = ProtoUtil.enumFromProto(AckMessageSourceType.class, proto.getSourceType()); return new AckMessage(proto.getUid(), + NodeAddress.fromProto(proto.getSenderNodeAddress()), sourceType, + proto.getSourceMsgClassName(), proto.getSourceUid(), proto.getSourceId(), proto.getResult(), proto.getErrorMessage().isEmpty() ? null : proto.getErrorMessage(), messageVersion); } + + @Override + public List getRequiredCapabilities() { + return new ArrayList<>(Collections.singletonList( + Capabilities.Capability.ACK_MSG.ordinal() + )); + } + + + @Override + public String toString() { + return "AckMessage{" + + "\n uid='" + uid + '\'' + + ",\n senderNodeAddress=" + senderNodeAddress + + ",\n sourceType=" + sourceType + + ",\n sourceMsgClassName='" + sourceMsgClassName + '\'' + + ",\n sourceUid='" + sourceUid + '\'' + + ",\n sourceId='" + sourceId + '\'' + + ",\n result=" + result + + ",\n errorMessage='" + errorMessage + '\'' + + "\n} " + super.toString(); + } + + @Override + public long getTTL() { + return TimeUnit.DAYS.toMillis(10); + } } diff --git a/src/main/java/bisq/network/p2p/P2PService.java b/src/main/java/bisq/network/p2p/P2PService.java index c278417..67ec3e4 100644 --- a/src/main/java/bisq/network/p2p/P2PService.java +++ b/src/main/java/bisq/network/p2p/P2PService.java @@ -458,7 +458,8 @@ public void onRemoved(ProtectedStorageEntry data) { public void sendEncryptedDirectMessage(NodeAddress peerNodeAddress, PubKeyRing pubKeyRing, NetworkEnvelope message, SendDirectMessageListener sendDirectMessageListener) { - Log.traceCall(); + //TODO for testing + log.error("sendEncryptedDirectMessage message " + message); checkNotNull(peerNodeAddress, "PeerAddress must not be null (sendEncryptedDirectMessage)"); if (isBootstrapped()) { doSendEncryptedDirectMessage(peerNodeAddress, pubKeyRing, message, sendDirectMessageListener); @@ -548,7 +549,8 @@ private void processProtectedMailboxStorageEntry(ProtectedMailboxStorageEntry pr public void sendEncryptedMailboxMessage(NodeAddress peersNodeAddress, PubKeyRing peersPubKeyRing, NetworkEnvelope message, SendMailboxMessageListener sendMailboxMessageListener) { - Log.traceCall("message " + message); + //TODO for testing + log.error("sendEncryptedMailboxMessage message " + message); checkNotNull(peersNodeAddress, "PeerAddress must not be null (sendEncryptedMailboxMessage)"); checkNotNull(networkNode.getNodeAddress(), From 11e7b4d99e3cdca3e5b7f4e58ca49283b685a2e9 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Thu, 21 Jun 2018 18:33:11 +0200 Subject: [PATCH 04/12] Persist MessageState in processModel --- .../java/bisq/network/p2p/AckMessage.java | 50 +++++++++++++------ .../java/bisq/network/p2p/P2PService.java | 1 + 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/src/main/java/bisq/network/p2p/AckMessage.java b/src/main/java/bisq/network/p2p/AckMessage.java index 913d6a8..b4b3e27 100644 --- a/src/main/java/bisq/network/p2p/AckMessage.java +++ b/src/main/java/bisq/network/p2p/AckMessage.java @@ -24,6 +24,7 @@ import bisq.common.app.Version; import bisq.common.proto.ProtoUtil; import bisq.common.proto.network.NetworkEnvelope; +import bisq.common.proto.persistable.PersistablePayload; import io.bisq.generated.protobuffer.PB; @@ -39,16 +40,19 @@ import javax.annotation.Nullable; -@EqualsAndHashCode(callSuper = true) +// We exclude uid from hashcode and equals to detect duplicate entries of the same AckMessage +@EqualsAndHashCode(callSuper = true, exclude = {"uid"}) @Value -public final class AckMessage extends NetworkEnvelope implements MailboxMessage, ExpirablePayload, CapabilityRequiringPayload { +public final class AckMessage extends NetworkEnvelope implements MailboxMessage, PersistablePayload, + ExpirablePayload, CapabilityRequiringPayload { + private final String uid; private final NodeAddress senderNodeAddress; private final AckMessageSourceType sourceType; //e.g. TradeMessage, DisputeMessage,... private final String sourceMsgClassName; private final String sourceUid; // uid of source (TradeMessage) private final String sourceId; // id of source (tradeId, disputeId) - private final boolean result; // true if source message was processed successfully + private final boolean success; // true if source message was processed successfully @Nullable private final String errorMessage; // optional error message if source message processing failed @@ -57,7 +61,7 @@ public AckMessage(NodeAddress senderNodeAddress, String sourceMsgClassName, String sourceUid, String sourceId, - boolean result, + boolean success, String errorMessage) { this(UUID.randomUUID().toString(), senderNodeAddress, @@ -65,7 +69,7 @@ public AckMessage(NodeAddress senderNodeAddress, sourceMsgClassName, sourceUid, sourceId, - result, + success, errorMessage, Version.getP2PMessageVersion()); } @@ -81,7 +85,7 @@ private AckMessage(String uid, String sourceMsgClassName, String sourceUid, String sourceId, - boolean result, + boolean success, @Nullable String errorMessage, int messageVersion) { super(messageVersion); @@ -91,10 +95,22 @@ private AckMessage(String uid, this.sourceMsgClassName = sourceMsgClassName; this.sourceUid = sourceUid; this.sourceId = sourceId; - this.result = result; + this.success = success; this.errorMessage = errorMessage; } + public PB.AckMessage toProtoMessage() { + PB.AckMessage.Builder builder = PB.AckMessage.newBuilder() + .setUid(uid) + .setSenderNodeAddress(senderNodeAddress.toProtoMessage()) + .setSourceType(sourceType.name()) + .setSourceMsgClassName(sourceMsgClassName) + .setSourceUid(sourceUid) + .setSourceId(sourceId) + .setSuccess(success); + Optional.ofNullable(errorMessage).ifPresent(builder::setErrorMessage); + return builder.build(); + } @Override public PB.NetworkEnvelope toProtoNetworkEnvelope() { @@ -105,7 +121,7 @@ public PB.NetworkEnvelope toProtoNetworkEnvelope() { .setSourceMsgClassName(sourceMsgClassName) .setSourceUid(sourceUid) .setSourceId(sourceId) - .setResult(result); + .setSuccess(success); Optional.ofNullable(errorMessage).ifPresent(builder::setErrorMessage); return getNetworkEnvelopeBuilder().setAckMessage(builder).build(); } @@ -118,11 +134,16 @@ public static AckMessage fromProto(PB.AckMessage proto, int messageVersion) { proto.getSourceMsgClassName(), proto.getSourceUid(), proto.getSourceId(), - proto.getResult(), + proto.getSuccess(), proto.getErrorMessage().isEmpty() ? null : proto.getErrorMessage(), messageVersion); } + + /////////////////////////////////////////////////////////////////////////////////////////// + // API + /////////////////////////////////////////////////////////////////////////////////////////// + @Override public List getRequiredCapabilities() { return new ArrayList<>(Collections.singletonList( @@ -130,6 +151,10 @@ public List getRequiredCapabilities() { )); } + @Override + public long getTTL() { + return TimeUnit.DAYS.toMillis(10); + } @Override public String toString() { @@ -140,13 +165,8 @@ public String toString() { ",\n sourceMsgClassName='" + sourceMsgClassName + '\'' + ",\n sourceUid='" + sourceUid + '\'' + ",\n sourceId='" + sourceId + '\'' + - ",\n result=" + result + + ",\n success=" + success + ",\n errorMessage='" + errorMessage + '\'' + "\n} " + super.toString(); } - - @Override - public long getTTL() { - return TimeUnit.DAYS.toMillis(10); - } } diff --git a/src/main/java/bisq/network/p2p/P2PService.java b/src/main/java/bisq/network/p2p/P2PService.java index 67ec3e4..ef5face 100644 --- a/src/main/java/bisq/network/p2p/P2PService.java +++ b/src/main/java/bisq/network/p2p/P2PService.java @@ -456,6 +456,7 @@ public void onRemoved(ProtectedStorageEntry data) { // DirectMessages /////////////////////////////////////////////////////////////////////////////////////////// + // TODO OfferAvailabilityResponse is called twice! public void sendEncryptedDirectMessage(NodeAddress peerNodeAddress, PubKeyRing pubKeyRing, NetworkEnvelope message, SendDirectMessageListener sendDirectMessageListener) { //TODO for testing From b76a854029a311c7d591a64f6f5a167d172e447b Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Fri, 22 Jun 2018 16:21:39 +0200 Subject: [PATCH 05/12] Add supportedCapabilities to Peer If the AckMessage is sent as a mailbox message we need to know if the peer supports the capability. If there is no connection already created or if the connection is new and the capabilities not set by the peer, we would not know his capabilities. One solution would be to send first a request fro his capabilities but that adds time delay and complexity. We use the persisted peer data to store the capability. In the use case of a trade we have been earlier in contact with the peer so we should have his capability received. The peer list gets cleaned up with old entries after a while but we expect that the trade messages are sent earlier before a peer gets removed from the peers list. And even if we would not have the peer there the worst thing is that the AckMessage will not be sent. We log a warning for such cases. We use a weakReference for the listener for notifying the peers if their capabilities property gets changed. Peers might get removed from the list anytime. With the weak reference we don't need to worry about removing the listeners to avoid memory leaks. --- .../java/bisq/network/p2p/P2PService.java | 126 ++++++++++++------ .../bisq/network/p2p/network/Connection.java | 55 +++++--- .../SupportedCapabilitiesListener.java | 24 ++++ .../network/p2p/peers/BroadcastHandler.java | 2 +- .../bisq/network/p2p/peers/PeerManager.java | 6 +- .../peers/getdata/GetDataRequestHandler.java | 3 +- .../network/p2p/peers/peerexchange/Peer.java | 40 ++++-- .../messages/GetPeersResponse.java | 14 +- 8 files changed, 191 insertions(+), 79 deletions(-) create mode 100644 src/main/java/bisq/network/p2p/network/SupportedCapabilitiesListener.java diff --git a/src/main/java/bisq/network/p2p/P2PService.java b/src/main/java/bisq/network/p2p/P2PService.java index ef5face..25df4e2 100644 --- a/src/main/java/bisq/network/p2p/P2PService.java +++ b/src/main/java/bisq/network/p2p/P2PService.java @@ -31,6 +31,7 @@ import bisq.network.p2p.peers.PeerManager; import bisq.network.p2p.peers.getdata.RequestDataManager; import bisq.network.p2p.peers.keepalive.KeepAliveManager; +import bisq.network.p2p.peers.peerexchange.Peer; import bisq.network.p2p.peers.peerexchange.PeerExchangeManager; import bisq.network.p2p.seed.SeedNodeRepository; import bisq.network.p2p.storage.HashMapChangedListener; @@ -38,6 +39,7 @@ import bisq.network.p2p.storage.messages.AddDataMessage; import bisq.network.p2p.storage.messages.BroadcastMessage; import bisq.network.p2p.storage.messages.RefreshOfferMessage; +import bisq.network.p2p.storage.payload.CapabilityRequiringPayload; import bisq.network.p2p.storage.payload.MailboxStoragePayload; import bisq.network.p2p.storage.payload.PersistableNetworkPayload; import bisq.network.p2p.storage.payload.ProtectedMailboxStorageEntry; @@ -459,8 +461,6 @@ public void onRemoved(ProtectedStorageEntry data) { // TODO OfferAvailabilityResponse is called twice! public void sendEncryptedDirectMessage(NodeAddress peerNodeAddress, PubKeyRing pubKeyRing, NetworkEnvelope message, SendDirectMessageListener sendDirectMessageListener) { - //TODO for testing - log.error("sendEncryptedDirectMessage message " + message); checkNotNull(peerNodeAddress, "PeerAddress must not be null (sendEncryptedDirectMessage)"); if (isBootstrapped()) { doSendEncryptedDirectMessage(peerNodeAddress, pubKeyRing, message, sendDirectMessageListener); @@ -550,8 +550,6 @@ private void processProtectedMailboxStorageEntry(ProtectedMailboxStorageEntry pr public void sendEncryptedMailboxMessage(NodeAddress peersNodeAddress, PubKeyRing peersPubKeyRing, NetworkEnvelope message, SendMailboxMessageListener sendMailboxMessageListener) { - //TODO for testing - log.error("sendEncryptedMailboxMessage message " + message); checkNotNull(peersNodeAddress, "PeerAddress must not be null (sendEncryptedMailboxMessage)"); checkNotNull(networkNode.getNodeAddress(), @@ -559,50 +557,92 @@ public void sendEncryptedMailboxMessage(NodeAddress peersNodeAddress, PubKeyRing checkArgument(!keyRing.getPubKeyRing().equals(peersPubKeyRing), "We got own keyring instead of that from peer"); - if (isBootstrapped()) { - if (!networkNode.getAllConnections().isEmpty()) { - try { - log.debug("\n\nEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE\n" + - "Encrypt message:\nmessage={}" - + "\nEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE\n", message); - PrefixedSealedAndSignedMessage prefixedSealedAndSignedMessage = new PrefixedSealedAndSignedMessage( - networkNode.getNodeAddress(), - encryptionService.encryptAndSign(peersPubKeyRing, message), - peersNodeAddress.getAddressPrefixHash(), - UUID.randomUUID().toString()); - SettableFuture future = networkNode.sendMessage(peersNodeAddress, prefixedSealedAndSignedMessage); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(@Nullable Connection connection) { - log.trace("SendEncryptedMailboxMessage onSuccess"); - sendMailboxMessageListener.onArrived(); - } + if (!isBootstrapped()) + throw new NetworkNotReadyException(); - @Override - public void onFailure(@NotNull Throwable throwable) { - log.info("We cannot send message to peer. Peer might be offline. We will store message in mailbox. peersNodeAddress=" + peersNodeAddress); - PublicKey receiverStoragePublicKey = peersPubKeyRing.getSignaturePubKey(); - addMailboxData(new MailboxStoragePayload(prefixedSealedAndSignedMessage, - keyRing.getSignatureKeyPair().getPublic(), - receiverStoragePublicKey), - receiverStoragePublicKey, - sendMailboxMessageListener); - } - }); - } catch (CryptoException e) { - log.error("sendEncryptedMessage failed"); - e.printStackTrace(); - sendMailboxMessageListener.onFault("sendEncryptedMailboxMessage failed " + e); + if (networkNode.getAllConnections().isEmpty()) { + sendMailboxMessageListener.onFault("There are no P2P network nodes connected. " + + "Please check your internet connection."); + return; + } + if (capabilityRequiredAndCapabilityNotSupported(peersNodeAddress, message)) { + sendMailboxMessageListener.onFault("We did not send the EncryptedMailboxMessage " + + "because the peer does not support the capability."); + return; + } + + try { + log.debug("\n\nEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE\n" + + "Encrypt message:\nmessage={}" + + "\nEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE\n", message); + + PrefixedSealedAndSignedMessage prefixedSealedAndSignedMessage = new PrefixedSealedAndSignedMessage( + networkNode.getNodeAddress(), + encryptionService.encryptAndSign(peersPubKeyRing, message), + peersNodeAddress.getAddressPrefixHash(), + UUID.randomUUID().toString()); + SettableFuture future = networkNode.sendMessage(peersNodeAddress, prefixedSealedAndSignedMessage); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Connection connection) { + log.trace("SendEncryptedMailboxMessage onSuccess"); + sendMailboxMessageListener.onArrived(); } - } else { - sendMailboxMessageListener.onFault("There are no P2P network nodes connected. " + - "Please check your internet connection."); - } - } else { - throw new NetworkNotReadyException(); + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("We cannot send message to peer. Peer might be offline. We will store message in mailbox. peersNodeAddress=" + peersNodeAddress); + PublicKey receiverStoragePublicKey = peersPubKeyRing.getSignaturePubKey(); + addMailboxData(new MailboxStoragePayload(prefixedSealedAndSignedMessage, + keyRing.getSignatureKeyPair().getPublic(), + receiverStoragePublicKey), + receiverStoragePublicKey, + sendMailboxMessageListener); + } + }); + } catch (CryptoException e) { + log.error("sendEncryptedMessage failed"); + e.printStackTrace(); + sendMailboxMessageListener.onFault("sendEncryptedMailboxMessage failed " + e); } } + private boolean capabilityRequiredAndCapabilityNotSupported(NodeAddress peersNodeAddress, NetworkEnvelope message) { + if (!(message instanceof CapabilityRequiringPayload)) + return false; + + // We only expect AckMessage so far + if (!(message instanceof AckMessage)) + log.warn("We got a CapabilityRequiringPayload for the mailbox message which is not a AckMessage. " + + "peersNodeAddress={}", peersNodeAddress); + + Set allPeers = peerManager.getPersistedPeers(); + allPeers.addAll(peerManager.getReportedPeers()); + allPeers.addAll(peerManager.getLivePeers(null)); + // We might have multiple entries of the same peer without the supportedCapabilities field set if we received + // it from old versions, so we filter those. + Optional optionalPeer = allPeers.stream() + .filter(peer -> peer.getNodeAddress().equals(peersNodeAddress)) + .filter(peer -> peer.getSupportedCapabilities() != null) + .filter(peer -> !peer.getSupportedCapabilities().isEmpty()) + .findAny(); + if (optionalPeer.isPresent()) { + Peer peer = optionalPeer.get(); + boolean result = Connection.isCapabilityRequired(message) && + !Connection.isCapabilitySupported((CapabilityRequiringPayload) message, peer.getSupportedCapabilities()); + + if (result) + log.warn("We don't send the message because the peer does not support the required capability. " + + "peersNodeAddress={}", peersNodeAddress); + + return result; + } + log.warn("We don't have the peer in our persisted peers so we don't know his capabilities. " + + "We decide to not sent the msg. peersNodeAddress={}", peersNodeAddress); + return true; + + } + private void addMailboxData(MailboxStoragePayload expirableMailboxStoragePayload, PublicKey receiversPublicKey, diff --git a/src/main/java/bisq/network/p2p/network/Connection.java b/src/main/java/bisq/network/p2p/network/Connection.java index 5f47e10..95b3290 100644 --- a/src/main/java/bisq/network/p2p/network/Connection.java +++ b/src/main/java/bisq/network/p2p/network/Connection.java @@ -77,6 +77,8 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; +import java.lang.ref.WeakReference; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -148,6 +150,7 @@ public static int getPermittedMessageSize() { private final List> messageTimeStamps = new ArrayList<>(); private final CopyOnWriteArraySet messageListeners = new CopyOnWriteArraySet<>(); private volatile long lastSendTimeStamp = 0; + private final CopyOnWriteArraySet> supportedCapabilitiesListeners = new CopyOnWriteArraySet<>(); /////////////////////////////////////////////////////////////////////////////////////////// @@ -217,7 +220,7 @@ public void sendMessage(NetworkEnvelope networkEnvelope) { log.debug(">> Send networkEnvelope of type: " + networkEnvelope.getClass().getSimpleName()); if (!stopped) { - if (!isCapabilityRequired(networkEnvelope) || isCapabilitySupported(networkEnvelope)) { + if (noCapabilityRequiredOrCapabilityIsSupported(networkEnvelope)) { try { Log.traceCall(); @@ -275,35 +278,44 @@ public void sendMessage(NetworkEnvelope networkEnvelope) { } } + public boolean noCapabilityRequiredOrCapabilityIsSupported(NetworkEnvelope networkEnvelope) { + return !isCapabilityRequired(networkEnvelope) || isCapabilitySupported(networkEnvelope); + } + + @SuppressWarnings("BooleanMethodIsAlwaysInverted") + public static boolean isCapabilityRequired(NetworkEnvelope networkEnvelop) { + if (networkEnvelop instanceof AddDataMessage) { + final ProtectedStoragePayload protectedStoragePayload = (((AddDataMessage) networkEnvelop).getProtectedStorageEntry()).getProtectedStoragePayload(); + return protectedStoragePayload instanceof CapabilityRequiringPayload; + } else if (networkEnvelop instanceof AddPersistableNetworkPayloadMessage) { + final PersistableNetworkPayload persistableNetworkPayload = ((AddPersistableNetworkPayloadMessage) networkEnvelop).getPersistableNetworkPayload(); + return persistableNetworkPayload instanceof CapabilityRequiringPayload; + } else { + return networkEnvelop instanceof CapabilityRequiringPayload; + } + } + public boolean isCapabilitySupported(NetworkEnvelope networkEnvelop) { if (networkEnvelop instanceof AddDataMessage) { final ProtectedStoragePayload protectedStoragePayload = (((AddDataMessage) networkEnvelop).getProtectedStorageEntry()).getProtectedStoragePayload(); - return !(protectedStoragePayload instanceof CapabilityRequiringPayload) || isCapabilitySupported((CapabilityRequiringPayload) protectedStoragePayload); + return isCapabilitySupported((CapabilityRequiringPayload) protectedStoragePayload); } else if (networkEnvelop instanceof AddPersistableNetworkPayloadMessage) { final PersistableNetworkPayload persistableNetworkPayload = ((AddPersistableNetworkPayloadMessage) networkEnvelop).getPersistableNetworkPayload(); - return !(persistableNetworkPayload instanceof CapabilityRequiringPayload) || isCapabilitySupported((CapabilityRequiringPayload) persistableNetworkPayload); + return isCapabilitySupported((CapabilityRequiringPayload) persistableNetworkPayload); } else { - return !(networkEnvelop instanceof CapabilityRequiringPayload) || isCapabilitySupported((CapabilityRequiringPayload) networkEnvelop); + return isCapabilitySupported((CapabilityRequiringPayload) networkEnvelop); } } - private boolean isCapabilitySupported(CapabilityRequiringPayload payload) { + public boolean isCapabilitySupported(CapabilityRequiringPayload payload) { + return isCapabilitySupported(payload, sharedModel.getSupportedCapabilities()); + } + + public static boolean isCapabilitySupported(CapabilityRequiringPayload payload, List supportedCapabilities) { final List requiredCapabilities = payload.getRequiredCapabilities(); - final List supportedCapabilities = sharedModel.getSupportedCapabilities(); return Capabilities.isCapabilitySupported(requiredCapabilities, supportedCapabilities); } - @SuppressWarnings("BooleanMethodIsAlwaysInverted") - public boolean isCapabilityRequired(NetworkEnvelope networkEnvelop) { - boolean isCapabilityRequiringAddDataMessage = networkEnvelop instanceof AddDataMessage && - (((AddDataMessage) networkEnvelop).getProtectedStorageEntry()).getProtectedStoragePayload() instanceof CapabilityRequiringPayload; - boolean isCapabilityRequiringAddPersistableNetworkPayloadMessage = networkEnvelop instanceof AddPersistableNetworkPayloadMessage && - (((AddPersistableNetworkPayloadMessage) networkEnvelop).getPersistableNetworkPayload() instanceof CapabilityRequiringPayload); - boolean isCapabilityRequiringNetworkEnvelope = networkEnvelop instanceof CapabilityRequiringPayload; - return isCapabilityRequiringAddDataMessage || - isCapabilityRequiringAddPersistableNetworkPayloadMessage || - isCapabilityRequiringNetworkEnvelope; - } public List getSupportedCapabilities() { return sharedModel.getSupportedCapabilities(); @@ -322,6 +334,10 @@ public void removeMessageListener(MessageListener messageListener) { "That might happen because of async behaviour of CopyOnWriteArraySet"); } + public void addSupportedCapabilitiesListenerAsWeakReference(SupportedCapabilitiesListener listener) { + supportedCapabilitiesListeners.add(new WeakReference<>(listener)); + } + @SuppressWarnings({"unused", "UnusedReturnValue"}) public boolean reportIllegalRequest(RuleViolation ruleViolation) { return sharedModel.reportInvalidRequest(ruleViolation); @@ -636,6 +652,11 @@ public List getSupportedCapabilities() { @SuppressWarnings("NullableProblems") public void setSupportedCapabilities(List supportedCapabilities) { this.supportedCapabilities = supportedCapabilities; + connection.supportedCapabilitiesListeners.forEach(l -> { + SupportedCapabilitiesListener supportedCapabilitiesListener = l.get(); + if (supportedCapabilitiesListener != null) + supportedCapabilitiesListener.onChanged(supportedCapabilities); + }); } public void handleConnectionException(Throwable e) { diff --git a/src/main/java/bisq/network/p2p/network/SupportedCapabilitiesListener.java b/src/main/java/bisq/network/p2p/network/SupportedCapabilitiesListener.java new file mode 100644 index 0000000..1209fd3 --- /dev/null +++ b/src/main/java/bisq/network/p2p/network/SupportedCapabilitiesListener.java @@ -0,0 +1,24 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.network.p2p.network; + +import java.util.List; + +public interface SupportedCapabilitiesListener { + void onChanged(List supportedCapabilities); +} diff --git a/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java b/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java index ec237aa..bcd3d67 100644 --- a/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java +++ b/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java @@ -168,7 +168,7 @@ private void sendToPeer(Connection connection, BroadcastMessage message) { "message = " + Utilities.toTruncatedString(message); if (!stopped) { if (!connection.isStopped()) { - if (!connection.isCapabilityRequired(message) || connection.isCapabilitySupported(message)) { + if (connection.noCapabilityRequiredOrCapabilityIsSupported(message)) { NodeAddress nodeAddress = connection.getPeersNodeAddressOptional().get(); log.trace("Broadcast message to " + nodeAddress + "."); SettableFuture future = networkNode.sendMessage(connection, message); diff --git a/src/main/java/bisq/network/p2p/peers/PeerManager.java b/src/main/java/bisq/network/p2p/peers/PeerManager.java index 2c90cfe..86bd1f9 100644 --- a/src/main/java/bisq/network/p2p/peers/PeerManager.java +++ b/src/main/java/bisq/network/p2p/peers/PeerManager.java @@ -660,7 +660,11 @@ private Set getConnectedReportedPeers() { // networkNode.getConfirmedConnections includes: // filter(connection -> connection.getPeersNodeAddressOptional().isPresent()) return networkNode.getConfirmedConnections().stream() - .map(c -> new Peer(c.getPeersNodeAddressOptional().get())) + .map(connection -> { + Peer peer = new Peer(connection.getPeersNodeAddressOptional().get(), connection.getSupportedCapabilities()); + connection.addSupportedCapabilitiesListenerAsWeakReference(peer); + return peer; + }) .collect(Collectors.toSet()); } diff --git a/src/main/java/bisq/network/p2p/peers/getdata/GetDataRequestHandler.java b/src/main/java/bisq/network/p2p/peers/getdata/GetDataRequestHandler.java index 742e15a..55d8580 100644 --- a/src/main/java/bisq/network/p2p/peers/getdata/GetDataRequestHandler.java +++ b/src/main/java/bisq/network/p2p/peers/getdata/GetDataRequestHandler.java @@ -143,8 +143,7 @@ private Set getFilteredPersistableNetworkPayload(GetD return dataStorage.getPersistableNetworkPayloadList().getMap().entrySet().stream() .filter(e -> !excludedKeysAsByteArray.contains(e.getKey())) .map(Map.Entry::getValue) - .filter(payload -> (!(payload instanceof CapabilityRequiringPayload) || - connection.isCapabilitySupported(getDataRequest))) + .filter(payload -> (connection.noCapabilityRequiredOrCapabilityIsSupported(getDataRequest))) .filter(payload -> tempLookupSet.add(new P2PDataStorage.ByteArray(payload.getHash()))) .collect(Collectors.toSet()); } diff --git a/src/main/java/bisq/network/p2p/peers/peerexchange/Peer.java b/src/main/java/bisq/network/p2p/peers/peerexchange/Peer.java index 34692cf..d492595 100644 --- a/src/main/java/bisq/network/p2p/peers/peerexchange/Peer.java +++ b/src/main/java/bisq/network/p2p/peers/peerexchange/Peer.java @@ -18,41 +18,54 @@ package bisq.network.p2p.peers.peerexchange; import bisq.network.p2p.NodeAddress; +import bisq.network.p2p.network.SupportedCapabilitiesListener; import bisq.common.proto.network.NetworkPayload; import bisq.common.proto.persistable.PersistablePayload; import io.bisq.generated.protobuffer.PB; +import java.util.ArrayList; import java.util.Date; +import java.util.List; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + +import javax.annotation.Nullable; @Getter -@EqualsAndHashCode(exclude = {"date"}) +@EqualsAndHashCode(exclude = {"date", "failedConnectionAttempts"}) @ToString -public final class Peer implements NetworkPayload, PersistablePayload { +@Slf4j +public final class Peer implements NetworkPayload, PersistablePayload, SupportedCapabilitiesListener { private static final int MAX_FAILED_CONNECTION_ATTEMPTS = 5; private final NodeAddress nodeAddress; private final long date; + // Added in v. 0.7.1 + @Setter + private List supportedCapabilities = new ArrayList<>(); + @Setter - private int failedConnectionAttempts = 0; + transient private int failedConnectionAttempts = 0; - public Peer(NodeAddress nodeAddress) { - this(nodeAddress, new Date().getTime()); + public Peer(NodeAddress nodeAddress, @Nullable List supportedCapabilities) { + this(nodeAddress, new Date().getTime(), + supportedCapabilities == null ? new ArrayList<>() : supportedCapabilities); } /////////////////////////////////////////////////////////////////////////////////////////// // PROTO BUFFER /////////////////////////////////////////////////////////////////////////////////////////// - private Peer(NodeAddress nodeAddress, long date) { + private Peer(NodeAddress nodeAddress, long date, List supportedCapabilities) { this.nodeAddress = nodeAddress; this.date = date; + this.supportedCapabilities = supportedCapabilities; } @Override @@ -60,12 +73,15 @@ public PB.Peer toProtoMessage() { return PB.Peer.newBuilder() .setNodeAddress(nodeAddress.toProtoMessage()) .setDate(date) + .addAllSupportedCapabilities(supportedCapabilities) .build(); } - public static Peer fromProto(PB.Peer peer) { - return new Peer(NodeAddress.fromProto(peer.getNodeAddress()), - peer.getDate()); + public static Peer fromProto(PB.Peer proto) { + return new Peer(NodeAddress.fromProto(proto.getNodeAddress()), + proto.getDate(), + proto.getSupportedCapabilitiesList().isEmpty() ? + new ArrayList<>() : new ArrayList<>(proto.getSupportedCapabilitiesList())); } @@ -84,4 +100,10 @@ public boolean tooManyFailedConnectionAttempts() { public Date getDate() { return new Date(date); } + + @Override + public void onChanged(List supportedCapabilities) { + if (supportedCapabilities != null && !supportedCapabilities.isEmpty()) + this.supportedCapabilities = supportedCapabilities; + } } diff --git a/src/main/java/bisq/network/p2p/peers/peerexchange/messages/GetPeersResponse.java b/src/main/java/bisq/network/p2p/peers/peerexchange/messages/GetPeersResponse.java index 5904a28..1f6788b 100644 --- a/src/main/java/bisq/network/p2p/peers/peerexchange/messages/GetPeersResponse.java +++ b/src/main/java/bisq/network/p2p/peers/peerexchange/messages/GetPeersResponse.java @@ -81,12 +81,14 @@ public PB.NetworkEnvelope toProtoNetworkEnvelope() { } public static GetPeersResponse fromProto(PB.GetPeersResponse proto, int messageVersion) { - HashSet reportedPeers = new HashSet<>( - proto.getReportedPeersList() - .stream() - .map(peer -> new Peer(new NodeAddress(peer.getNodeAddress().getHostName(), - peer.getNodeAddress().getPort()))) - .collect(Collectors.toList())); + HashSet reportedPeers = proto.getReportedPeersList() + .stream() + .map(peer -> { + NodeAddress nodeAddress = new NodeAddress(peer.getNodeAddress().getHostName(), + peer.getNodeAddress().getPort()); + return new Peer(nodeAddress, peer.getSupportedCapabilitiesList()); + }) + .collect(Collectors.toCollection(HashSet::new)); return new GetPeersResponse(proto.getRequestNonce(), reportedPeers, proto.getSupportedCapabilitiesList().isEmpty() ? null : proto.getSupportedCapabilitiesList(), From 1903358f8ac54b965bc08ef720646c73e62e0dec Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Sat, 23 Jun 2018 10:44:29 +0200 Subject: [PATCH 06/12] Add AckMessage to dispute messages --- src/main/java/bisq/network/p2p/P2PService.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/bisq/network/p2p/P2PService.java b/src/main/java/bisq/network/p2p/P2PService.java index 25df4e2..b5e8c3d 100644 --- a/src/main/java/bisq/network/p2p/P2PService.java +++ b/src/main/java/bisq/network/p2p/P2PService.java @@ -581,6 +581,8 @@ public void sendEncryptedMailboxMessage(NodeAddress peersNodeAddress, PubKeyRing encryptionService.encryptAndSign(peersPubKeyRing, message), peersNodeAddress.getAddressPrefixHash(), UUID.randomUUID().toString()); + + log.debug("sendEncryptedMailboxMessage msg={}, peersNodeAddress={}", message, peersNodeAddress); SettableFuture future = networkNode.sendMessage(peersNodeAddress, prefixedSealedAndSignedMessage); Futures.addCallback(future, new FutureCallback() { @Override From ea7f917a91b2233a8b1628863e5d7d0fe96752bd Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Sun, 24 Jun 2018 16:09:24 +0200 Subject: [PATCH 07/12] Improve dispute AckMsg handling - Show errors at send dispute msg and at ackMsg. - Add supportedCapabilities of persisted or reported peers to a peer from a new connection (which has empty supportedCapabilities). --- src/main/java/bisq/network/p2p/P2PService.java | 3 ++- .../bisq/network/p2p/network/Connection.java | 7 ++----- .../bisq/network/p2p/network/NetworkNode.java | 3 ++- .../bisq/network/p2p/peers/PeerManager.java | 18 ++++++++++++++++-- .../network/p2p/peers/peerexchange/Peer.java | 16 ++++++++++++++-- .../peerexchange/PeerExchangeHandler.java | 1 - .../peerexchange/PeerExchangeManager.java | 2 +- 7 files changed, 37 insertions(+), 13 deletions(-) diff --git a/src/main/java/bisq/network/p2p/P2PService.java b/src/main/java/bisq/network/p2p/P2PService.java index b5e8c3d..4af4130 100644 --- a/src/main/java/bisq/network/p2p/P2PService.java +++ b/src/main/java/bisq/network/p2p/P2PService.java @@ -639,8 +639,9 @@ private boolean capabilityRequiredAndCapabilityNotSupported(NodeAddress peersNod return result; } + log.warn("We don't have the peer in our persisted peers so we don't know his capabilities. " + - "We decide to not sent the msg. peersNodeAddress={}", peersNodeAddress); + "We decide to not sent the msg. peersNodeAddress={}, allPeers={}", peersNodeAddress, allPeers); return true; } diff --git a/src/main/java/bisq/network/p2p/network/Connection.java b/src/main/java/bisq/network/p2p/network/Connection.java index 95b3290..233b217 100644 --- a/src/main/java/bisq/network/p2p/network/Connection.java +++ b/src/main/java/bisq/network/p2p/network/Connection.java @@ -316,7 +316,7 @@ public static boolean isCapabilitySupported(CapabilityRequiringPayload payload, return Capabilities.isCapabilitySupported(requiredCapabilities, supportedCapabilities); } - + @Nullable public List getSupportedCapabilities() { return sharedModel.getSupportedCapabilities(); } @@ -612,10 +612,7 @@ public SharedModel(Connection connection, Socket socket) { public boolean reportInvalidRequest(RuleViolation ruleViolation) { log.warn("We got reported the ruleViolation {} at connection {}", ruleViolation, connection); int numRuleViolations; - if (ruleViolations.containsKey(ruleViolation)) - numRuleViolations = ruleViolations.get(ruleViolation); - else - numRuleViolations = 0; + numRuleViolations = ruleViolations.getOrDefault(ruleViolation, 0); numRuleViolations++; ruleViolations.put(ruleViolation, numRuleViolations); diff --git a/src/main/java/bisq/network/p2p/network/NetworkNode.java b/src/main/java/bisq/network/p2p/network/NetworkNode.java index d1804c3..85e5c0a 100644 --- a/src/main/java/bisq/network/p2p/network/NetworkNode.java +++ b/src/main/java/bisq/network/p2p/network/NetworkNode.java @@ -98,7 +98,7 @@ public abstract class NetworkNode implements MessageListener { abstract public void start(@Nullable SetupListener setupListener); public SettableFuture sendMessage(@NotNull NodeAddress peersNodeAddress, NetworkEnvelope networkEnvelop) { - Log.traceCall("peersNodeAddress=" + peersNodeAddress + "\n\tmessage=" + Utilities.toTruncatedString(networkEnvelop)); + log.debug("sendMessage: peersNodeAddress=" + peersNodeAddress + "\n\tmessage=" + Utilities.toTruncatedString(networkEnvelop)); checkNotNull(peersNodeAddress, "peerAddress must not be null"); Connection connection = getOutboundConnection(peersNodeAddress); @@ -206,6 +206,7 @@ public void onSuccess(Connection connection) { } public void onFailure(@NotNull Throwable throwable) { + log.error("onFailure at sendMessage: peersNodeAddress=" + peersNodeAddress + "\n\tmessage=" + Utilities.toTruncatedString(networkEnvelop)); UserThread.execute(() -> resultFuture.setException(throwable)); } }); diff --git a/src/main/java/bisq/network/p2p/peers/PeerManager.java b/src/main/java/bisq/network/p2p/peers/PeerManager.java index 86bd1f9..40adadd 100644 --- a/src/main/java/bisq/network/p2p/peers/PeerManager.java +++ b/src/main/java/bisq/network/p2p/peers/PeerManager.java @@ -660,8 +660,22 @@ private Set getConnectedReportedPeers() { // networkNode.getConfirmedConnections includes: // filter(connection -> connection.getPeersNodeAddressOptional().isPresent()) return networkNode.getConfirmedConnections().stream() - .map(connection -> { - Peer peer = new Peer(connection.getPeersNodeAddressOptional().get(), connection.getSupportedCapabilities()); + .map((Connection connection) -> { + List supportedCapabilities = connection.getSupportedCapabilities(); + // If we have a new connection the supportedCapabilities is empty. + // We lookup if we have already stored the supportedCapabilities at the persisted or reported peers + // and if so we use that. + if (supportedCapabilities == null || supportedCapabilities.isEmpty()) { + Set allPeers = new HashSet<>(getPersistedPeers()); + allPeers.addAll(getReportedPeers()); + supportedCapabilities = allPeers.stream().filter(peer -> peer.getNodeAddress().equals(connection.getPeersNodeAddressOptional().get())) + .filter(peer -> !peer.getSupportedCapabilities().isEmpty()) + .findAny() + .map(Peer::getSupportedCapabilities) + .filter(list -> !list.isEmpty()) + .orElse(new ArrayList<>()); + } + Peer peer = new Peer(connection.getPeersNodeAddressOptional().get(), supportedCapabilities); connection.addSupportedCapabilitiesListenerAsWeakReference(peer); return peer; }) diff --git a/src/main/java/bisq/network/p2p/peers/peerexchange/Peer.java b/src/main/java/bisq/network/p2p/peers/peerexchange/Peer.java index d492595..49e7b4f 100644 --- a/src/main/java/bisq/network/p2p/peers/peerexchange/Peer.java +++ b/src/main/java/bisq/network/p2p/peers/peerexchange/Peer.java @@ -32,14 +32,12 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; -import lombok.ToString; import lombok.extern.slf4j.Slf4j; import javax.annotation.Nullable; @Getter @EqualsAndHashCode(exclude = {"date", "failedConnectionAttempts"}) -@ToString @Slf4j public final class Peer implements NetworkPayload, PersistablePayload, SupportedCapabilitiesListener { private static final int MAX_FAILED_CONNECTION_ATTEMPTS = 5; @@ -66,6 +64,9 @@ private Peer(NodeAddress nodeAddress, long date, List supportedCapabili this.nodeAddress = nodeAddress; this.date = date; this.supportedCapabilities = supportedCapabilities; + + if (supportedCapabilities.isEmpty()) + log.warn("supportedCapabilities is empty"); } @Override @@ -106,4 +107,15 @@ public void onChanged(List supportedCapabilities) { if (supportedCapabilities != null && !supportedCapabilities.isEmpty()) this.supportedCapabilities = supportedCapabilities; } + + + @Override + public String toString() { + return "Peer{" + + "\n nodeAddress=" + nodeAddress + + ",\n supportedCapabilities=" + supportedCapabilities + + ",\n failedConnectionAttempts=" + failedConnectionAttempts + + ",\n date=" + date + + "\n}"; + } } diff --git a/src/main/java/bisq/network/p2p/peers/peerexchange/PeerExchangeHandler.java b/src/main/java/bisq/network/p2p/peers/peerexchange/PeerExchangeHandler.java index a4fc92e..f08ec1f 100644 --- a/src/main/java/bisq/network/p2p/peers/peerexchange/PeerExchangeHandler.java +++ b/src/main/java/bisq/network/p2p/peers/peerexchange/PeerExchangeHandler.java @@ -102,7 +102,6 @@ public void sendGetPeersRequestAfterRandomDelay(NodeAddress nodeAddress) { } private void sendGetPeersRequest(NodeAddress nodeAddress) { - Log.traceCall("nodeAddress=" + nodeAddress + " / this=" + this); log.debug("sendGetPeersRequest to nodeAddress={}", nodeAddress); if (!stopped) { if (networkNode.getNodeAddress() != null) { diff --git a/src/main/java/bisq/network/p2p/peers/peerexchange/PeerExchangeManager.java b/src/main/java/bisq/network/p2p/peers/peerexchange/PeerExchangeManager.java index f315b3d..41f4869 100644 --- a/src/main/java/bisq/network/p2p/peers/peerexchange/PeerExchangeManager.java +++ b/src/main/java/bisq/network/p2p/peers/peerexchange/PeerExchangeManager.java @@ -312,7 +312,7 @@ private void requestWithAvailablePeers() { log.debug("Number of peers in list for connectToMorePeers: {}", list.size()); log.trace("Filtered connectToMorePeers list: list=" + list); if (!list.isEmpty()) { - // Dont shuffle as we want the seed nodes at the last entries + // Don't shuffle as we want the seed nodes at the last entries NodeAddress nextCandidate = list.get(0); list.remove(nextCandidate); requestReportedPeers(nextCandidate, list); From e83e001e1563ced17157ac47469f251ce32cf917 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Mon, 25 Jun 2018 10:42:12 +0200 Subject: [PATCH 08/12] Add support for AckMessage at offer availability protocol --- src/main/java/bisq/network/p2p/P2PService.java | 6 +++--- .../java/bisq/network/p2p/SendDirectMessageListener.java | 2 +- src/main/java/bisq/network/p2p/network/NetworkNode.java | 2 +- src/main/java/bisq/network/p2p/peers/peerexchange/Peer.java | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/bisq/network/p2p/P2PService.java b/src/main/java/bisq/network/p2p/P2PService.java index 4af4130..4d39c0e 100644 --- a/src/main/java/bisq/network/p2p/P2PService.java +++ b/src/main/java/bisq/network/p2p/P2PService.java @@ -471,7 +471,7 @@ public void sendEncryptedDirectMessage(NodeAddress peerNodeAddress, PubKeyRing p private void doSendEncryptedDirectMessage(@NotNull NodeAddress peersNodeAddress, PubKeyRing pubKeyRing, NetworkEnvelope message, SendDirectMessageListener sendDirectMessageListener) { - Log.traceCall(); + log.debug("doSendEncryptedDirectMessage peersNodeAddress={}, message={}", peersNodeAddress, message.getClass().getSimpleName()); checkNotNull(peersNodeAddress, "Peer node address must not be null at doSendEncryptedDirectMessage"); checkNotNull(networkNode.getNodeAddress(), "My node address must not be null at doSendEncryptedDirectMessage"); try { @@ -494,14 +494,14 @@ public void onSuccess(@Nullable Connection connection) { public void onFailure(@NotNull Throwable throwable) { log.error(throwable.toString()); throwable.printStackTrace(); - sendDirectMessageListener.onFault(); + sendDirectMessageListener.onFault(throwable.toString()); } }); } catch (CryptoException e) { e.printStackTrace(); log.error(message.toString()); log.error(e.toString()); - sendDirectMessageListener.onFault(); + sendDirectMessageListener.onFault(e.toString()); } } diff --git a/src/main/java/bisq/network/p2p/SendDirectMessageListener.java b/src/main/java/bisq/network/p2p/SendDirectMessageListener.java index 9f272ea..c416440 100644 --- a/src/main/java/bisq/network/p2p/SendDirectMessageListener.java +++ b/src/main/java/bisq/network/p2p/SendDirectMessageListener.java @@ -20,5 +20,5 @@ public interface SendDirectMessageListener { void onArrived(); - void onFault(); + void onFault(String ErrorMessage); } diff --git a/src/main/java/bisq/network/p2p/network/NetworkNode.java b/src/main/java/bisq/network/p2p/network/NetworkNode.java index 85e5c0a..c97991f 100644 --- a/src/main/java/bisq/network/p2p/network/NetworkNode.java +++ b/src/main/java/bisq/network/p2p/network/NetworkNode.java @@ -206,7 +206,7 @@ public void onSuccess(Connection connection) { } public void onFailure(@NotNull Throwable throwable) { - log.error("onFailure at sendMessage: peersNodeAddress=" + peersNodeAddress + "\n\tmessage=" + Utilities.toTruncatedString(networkEnvelop)); + log.info("onFailure at sendMessage: peersNodeAddress=" + peersNodeAddress + "\n\tmessage=" + Utilities.toTruncatedString(networkEnvelop)); UserThread.execute(() -> resultFuture.setException(throwable)); } }); diff --git a/src/main/java/bisq/network/p2p/peers/peerexchange/Peer.java b/src/main/java/bisq/network/p2p/peers/peerexchange/Peer.java index 49e7b4f..df508b9 100644 --- a/src/main/java/bisq/network/p2p/peers/peerexchange/Peer.java +++ b/src/main/java/bisq/network/p2p/peers/peerexchange/Peer.java @@ -66,7 +66,7 @@ private Peer(NodeAddress nodeAddress, long date, List supportedCapabili this.supportedCapabilities = supportedCapabilities; if (supportedCapabilities.isEmpty()) - log.warn("supportedCapabilities is empty"); + log.warn("SupportedCapabilities is empty. nodeAddress={}", nodeAddress); } @Override From 40dea55c1f66eb066dea4884efcf9a322d9d9b44 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Mon, 25 Jun 2018 10:43:54 +0200 Subject: [PATCH 09/12] Add UidMessage --- .../java/bisq/network/p2p/MailboxMessage.java | 4 +--- .../java/bisq/network/p2p/UidMessage.java | 23 +++++++++++++++++++ 2 files changed, 24 insertions(+), 3 deletions(-) create mode 100644 src/main/java/bisq/network/p2p/UidMessage.java diff --git a/src/main/java/bisq/network/p2p/MailboxMessage.java b/src/main/java/bisq/network/p2p/MailboxMessage.java index 236245a..719c33f 100644 --- a/src/main/java/bisq/network/p2p/MailboxMessage.java +++ b/src/main/java/bisq/network/p2p/MailboxMessage.java @@ -18,8 +18,6 @@ package bisq.network.p2p; -public interface MailboxMessage extends DirectMessage { +public interface MailboxMessage extends DirectMessage, UidMessage { NodeAddress getSenderNodeAddress(); - - String getUid(); } diff --git a/src/main/java/bisq/network/p2p/UidMessage.java b/src/main/java/bisq/network/p2p/UidMessage.java new file mode 100644 index 0000000..3260d61 --- /dev/null +++ b/src/main/java/bisq/network/p2p/UidMessage.java @@ -0,0 +1,23 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.network.p2p; + + +public interface UidMessage { + String getUid(); +} From 67ddf29a99d741d45ea783cca2fe444182f76bca Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Mon, 25 Jun 2018 11:16:35 +0200 Subject: [PATCH 10/12] Improve logging --- src/main/java/bisq/network/p2p/P2PService.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/main/java/bisq/network/p2p/P2PService.java b/src/main/java/bisq/network/p2p/P2PService.java index 4d39c0e..c2b6f5e 100644 --- a/src/main/java/bisq/network/p2p/P2PService.java +++ b/src/main/java/bisq/network/p2p/P2PService.java @@ -471,7 +471,8 @@ public void sendEncryptedDirectMessage(NodeAddress peerNodeAddress, PubKeyRing p private void doSendEncryptedDirectMessage(@NotNull NodeAddress peersNodeAddress, PubKeyRing pubKeyRing, NetworkEnvelope message, SendDirectMessageListener sendDirectMessageListener) { - log.debug("doSendEncryptedDirectMessage peersNodeAddress={}, message={}", peersNodeAddress, message.getClass().getSimpleName()); + log.info("Send encrypted direct message {} to peer {}", + message.getClass().getSimpleName(), peersNodeAddress); checkNotNull(peersNodeAddress, "Peer node address must not be null at doSendEncryptedDirectMessage"); checkNotNull(networkNode.getNodeAddress(), "My node address must not be null at doSendEncryptedDirectMessage"); try { @@ -487,11 +488,15 @@ private void doSendEncryptedDirectMessage(@NotNull NodeAddress peersNodeAddress, Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(@Nullable Connection connection) { + log.info("Encrypted direct message arrived at peer: Message {}; peer {}", + message.getClass().getSimpleName(), peersNodeAddress); sendDirectMessageListener.onArrived(); } @Override public void onFailure(@NotNull Throwable throwable) { + log.warn("Sending encrypted direct message failed: Message {}; peer {}; error={}", + message.getClass().getSimpleName(), peersNodeAddress, throwable.toString()); log.error(throwable.toString()); throwable.printStackTrace(); sendDirectMessageListener.onFault(throwable.toString()); @@ -550,6 +555,9 @@ private void processProtectedMailboxStorageEntry(ProtectedMailboxStorageEntry pr public void sendEncryptedMailboxMessage(NodeAddress peersNodeAddress, PubKeyRing peersPubKeyRing, NetworkEnvelope message, SendMailboxMessageListener sendMailboxMessageListener) { + log.info("Send encrypted mailbox message {} to peer {}", + message.getClass().getSimpleName(), peersNodeAddress); + checkNotNull(peersNodeAddress, "PeerAddress must not be null (sendEncryptedMailboxMessage)"); checkNotNull(networkNode.getNodeAddress(), @@ -587,13 +595,15 @@ public void sendEncryptedMailboxMessage(NodeAddress peersNodeAddress, PubKeyRing Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(@Nullable Connection connection) { - log.trace("SendEncryptedMailboxMessage onSuccess"); + log.info("Encrypted mailbox message arrived at peer: Message {}; peer {}", + message.getClass().getSimpleName(), peersNodeAddress); sendMailboxMessageListener.onArrived(); } @Override public void onFailure(@NotNull Throwable throwable) { - log.info("We cannot send message to peer. Peer might be offline. We will store message in mailbox. peersNodeAddress=" + peersNodeAddress); + log.info("Encrypted mailbox message stored in mailbox: Message {}; peer {}", + message.getClass().getSimpleName(), peersNodeAddress); PublicKey receiverStoragePublicKey = peersPubKeyRing.getSignaturePubKey(); addMailboxData(new MailboxStoragePayload(prefixedSealedAndSignedMessage, keyRing.getSignatureKeyPair().getPublic(), From bb164969d69eaabbcbd2222011e561b3a0ae4406 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Mon, 25 Jun 2018 18:08:47 +0200 Subject: [PATCH 11/12] Improve logging --- .../java/bisq/network/p2p/AckMessage.java | 48 +++++++++++-------- .../java/bisq/network/p2p/P2PService.java | 23 +++++---- .../bisq/network/p2p/network/Connection.java | 10 ++-- 3 files changed, 44 insertions(+), 37 deletions(-) diff --git a/src/main/java/bisq/network/p2p/AckMessage.java b/src/main/java/bisq/network/p2p/AckMessage.java index b4b3e27..9e4f5db 100644 --- a/src/main/java/bisq/network/p2p/AckMessage.java +++ b/src/main/java/bisq/network/p2p/AckMessage.java @@ -37,25 +37,38 @@ import lombok.EqualsAndHashCode; import lombok.Value; +import lombok.extern.slf4j.Slf4j; import javax.annotation.Nullable; // We exclude uid from hashcode and equals to detect duplicate entries of the same AckMessage @EqualsAndHashCode(callSuper = true, exclude = {"uid"}) @Value +@Slf4j public final class AckMessage extends NetworkEnvelope implements MailboxMessage, PersistablePayload, ExpirablePayload, CapabilityRequiringPayload { private final String uid; private final NodeAddress senderNodeAddress; - private final AckMessageSourceType sourceType; //e.g. TradeMessage, DisputeMessage,... + private final AckMessageSourceType sourceType; private final String sourceMsgClassName; - private final String sourceUid; // uid of source (TradeMessage) - private final String sourceId; // id of source (tradeId, disputeId) - private final boolean success; // true if source message was processed successfully @Nullable - private final String errorMessage; // optional error message if source message processing failed - + private final String sourceUid; + private final String sourceId; + private final boolean success; + @Nullable + private final String errorMessage; + + /** + * + * @param senderNodeAddress Address of sender + * @param sourceType Type of source e.g. TradeMessage, DisputeMessage,... + * @param sourceMsgClassName Class name of source msg + * @param sourceUid Optional Uid of source (TradeMessage). Can be null if we receive trades/offers from old clients + * @param sourceId Id of source (tradeId, disputeId) + * @param success True if source message was processed successfully + * @param errorMessage Optional error message if source message processing failed + */ public AckMessage(NodeAddress senderNodeAddress, AckMessageSourceType sourceType, String sourceMsgClassName, @@ -83,7 +96,7 @@ private AckMessage(String uid, NodeAddress senderNodeAddress, AckMessageSourceType sourceType, String sourceMsgClassName, - String sourceUid, + @Nullable String sourceUid, String sourceId, boolean success, @Nullable String errorMessage, @@ -100,30 +113,25 @@ private AckMessage(String uid, } public PB.AckMessage toProtoMessage() { - PB.AckMessage.Builder builder = PB.AckMessage.newBuilder() - .setUid(uid) - .setSenderNodeAddress(senderNodeAddress.toProtoMessage()) - .setSourceType(sourceType.name()) - .setSourceMsgClassName(sourceMsgClassName) - .setSourceUid(sourceUid) - .setSourceId(sourceId) - .setSuccess(success); - Optional.ofNullable(errorMessage).ifPresent(builder::setErrorMessage); - return builder.build(); + return getBuilder().build(); } @Override public PB.NetworkEnvelope toProtoNetworkEnvelope() { + return getNetworkEnvelopeBuilder().setAckMessage(getBuilder()).build(); + } + + public PB.AckMessage.Builder getBuilder() { PB.AckMessage.Builder builder = PB.AckMessage.newBuilder() .setUid(uid) .setSenderNodeAddress(senderNodeAddress.toProtoMessage()) .setSourceType(sourceType.name()) .setSourceMsgClassName(sourceMsgClassName) - .setSourceUid(sourceUid) .setSourceId(sourceId) .setSuccess(success); + Optional.ofNullable(sourceUid).ifPresent(builder::setSourceUid); Optional.ofNullable(errorMessage).ifPresent(builder::setErrorMessage); - return getNetworkEnvelopeBuilder().setAckMessage(builder).build(); + return builder; } public static AckMessage fromProto(PB.AckMessage proto, int messageVersion) { @@ -132,7 +140,7 @@ public static AckMessage fromProto(PB.AckMessage proto, int messageVersion) { NodeAddress.fromProto(proto.getSenderNodeAddress()), sourceType, proto.getSourceMsgClassName(), - proto.getSourceUid(), + proto.getSourceUid().isEmpty() ? null : proto.getSourceUid(), proto.getSourceId(), proto.getSuccess(), proto.getErrorMessage().isEmpty() ? null : proto.getErrorMessage(), diff --git a/src/main/java/bisq/network/p2p/P2PService.java b/src/main/java/bisq/network/p2p/P2PService.java index c2b6f5e..a3c8470 100644 --- a/src/main/java/bisq/network/p2p/P2PService.java +++ b/src/main/java/bisq/network/p2p/P2PService.java @@ -471,10 +471,19 @@ public void sendEncryptedDirectMessage(NodeAddress peerNodeAddress, PubKeyRing p private void doSendEncryptedDirectMessage(@NotNull NodeAddress peersNodeAddress, PubKeyRing pubKeyRing, NetworkEnvelope message, SendDirectMessageListener sendDirectMessageListener) { - log.info("Send encrypted direct message {} to peer {}", + log.debug("Send encrypted direct message {} to peer {}", message.getClass().getSimpleName(), peersNodeAddress); + checkNotNull(peersNodeAddress, "Peer node address must not be null at doSendEncryptedDirectMessage"); + checkNotNull(networkNode.getNodeAddress(), "My node address must not be null at doSendEncryptedDirectMessage"); + + if (capabilityRequiredAndCapabilityNotSupported(peersNodeAddress, message)) { + sendDirectMessageListener.onFault("We did not send the EncryptedMessage " + + "because the peer does not support the capability."); + return; + } + try { log.debug("\n\nEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE\n" + "Encrypt message:\nmessage={}" @@ -488,15 +497,11 @@ private void doSendEncryptedDirectMessage(@NotNull NodeAddress peersNodeAddress, Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(@Nullable Connection connection) { - log.info("Encrypted direct message arrived at peer: Message {}; peer {}", - message.getClass().getSimpleName(), peersNodeAddress); sendDirectMessageListener.onArrived(); } @Override public void onFailure(@NotNull Throwable throwable) { - log.warn("Sending encrypted direct message failed: Message {}; peer {}; error={}", - message.getClass().getSimpleName(), peersNodeAddress, throwable.toString()); log.error(throwable.toString()); throwable.printStackTrace(); sendDirectMessageListener.onFault(throwable.toString()); @@ -555,9 +560,6 @@ private void processProtectedMailboxStorageEntry(ProtectedMailboxStorageEntry pr public void sendEncryptedMailboxMessage(NodeAddress peersNodeAddress, PubKeyRing peersPubKeyRing, NetworkEnvelope message, SendMailboxMessageListener sendMailboxMessageListener) { - log.info("Send encrypted mailbox message {} to peer {}", - message.getClass().getSimpleName(), peersNodeAddress); - checkNotNull(peersNodeAddress, "PeerAddress must not be null (sendEncryptedMailboxMessage)"); checkNotNull(networkNode.getNodeAddress(), @@ -573,6 +575,7 @@ public void sendEncryptedMailboxMessage(NodeAddress peersNodeAddress, PubKeyRing "Please check your internet connection."); return; } + if (capabilityRequiredAndCapabilityNotSupported(peersNodeAddress, message)) { sendMailboxMessageListener.onFault("We did not send the EncryptedMailboxMessage " + "because the peer does not support the capability."); @@ -595,15 +598,11 @@ public void sendEncryptedMailboxMessage(NodeAddress peersNodeAddress, PubKeyRing Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(@Nullable Connection connection) { - log.info("Encrypted mailbox message arrived at peer: Message {}; peer {}", - message.getClass().getSimpleName(), peersNodeAddress); sendMailboxMessageListener.onArrived(); } @Override public void onFailure(@NotNull Throwable throwable) { - log.info("Encrypted mailbox message stored in mailbox: Message {}; peer {}", - message.getClass().getSimpleName(), peersNodeAddress); PublicKey receiverStoragePublicKey = peersPubKeyRing.getSignaturePubKey(); addMailboxData(new MailboxStoragePayload(prefixedSealedAndSignedMessage, keyRing.getSignatureKeyPair().getPublic(), diff --git a/src/main/java/bisq/network/p2p/network/Connection.java b/src/main/java/bisq/network/p2p/network/Connection.java index 233b217..4b380d1 100644 --- a/src/main/java/bisq/network/p2p/network/Connection.java +++ b/src/main/java/bisq/network/p2p/network/Connection.java @@ -295,19 +295,19 @@ public static boolean isCapabilityRequired(NetworkEnvelope networkEnvelop) { } } - public boolean isCapabilitySupported(NetworkEnvelope networkEnvelop) { + private boolean isCapabilitySupported(NetworkEnvelope networkEnvelop) { if (networkEnvelop instanceof AddDataMessage) { final ProtectedStoragePayload protectedStoragePayload = (((AddDataMessage) networkEnvelop).getProtectedStorageEntry()).getProtectedStoragePayload(); - return isCapabilitySupported((CapabilityRequiringPayload) protectedStoragePayload); + return protectedStoragePayload instanceof CapabilityRequiringPayload && isCapabilitySupported((CapabilityRequiringPayload) protectedStoragePayload); } else if (networkEnvelop instanceof AddPersistableNetworkPayloadMessage) { final PersistableNetworkPayload persistableNetworkPayload = ((AddPersistableNetworkPayloadMessage) networkEnvelop).getPersistableNetworkPayload(); - return isCapabilitySupported((CapabilityRequiringPayload) persistableNetworkPayload); + return persistableNetworkPayload instanceof CapabilityRequiringPayload && isCapabilitySupported((CapabilityRequiringPayload) persistableNetworkPayload); } else { - return isCapabilitySupported((CapabilityRequiringPayload) networkEnvelop); + return networkEnvelop instanceof CapabilityRequiringPayload && isCapabilitySupported((CapabilityRequiringPayload) networkEnvelop); } } - public boolean isCapabilitySupported(CapabilityRequiringPayload payload) { + private boolean isCapabilitySupported(CapabilityRequiringPayload payload) { return isCapabilitySupported(payload, sharedModel.getSupportedCapabilities()); } From 4b0ff3698042db9e46484e0516eddccb7d265dc6 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Mon, 25 Jun 2018 18:09:22 +0200 Subject: [PATCH 12/12] Rename networkEnvelop to networkEnvelope --- .../network/crypto/EncryptionService.java | 8 ++--- .../java/bisq/network/p2p/P2PService.java | 10 +++--- .../bisq/network/p2p/network/Connection.java | 32 +++++++++---------- .../network/p2p/network/MessageListener.java | 2 +- .../bisq/network/p2p/network/NetworkNode.java | 24 +++++++------- .../bisq/network/p2p/network/Statistic.java | 8 ++--- .../p2p/peers/getdata/RequestDataHandler.java | 10 +++--- .../p2p/peers/getdata/RequestDataManager.java | 8 ++--- .../p2p/peers/keepalive/KeepAliveHandler.java | 8 ++--- .../p2p/peers/keepalive/KeepAliveManager.java | 8 ++--- .../peerexchange/PeerExchangeHandler.java | 8 ++--- .../peerexchange/PeerExchangeManager.java | 8 ++--- .../network/p2p/storage/P2PDataStorage.java | 26 +++++++-------- 13 files changed, 80 insertions(+), 80 deletions(-) diff --git a/src/main/java/bisq/network/crypto/EncryptionService.java b/src/main/java/bisq/network/crypto/EncryptionService.java index 1a0ab7d..092639c 100644 --- a/src/main/java/bisq/network/crypto/EncryptionService.java +++ b/src/main/java/bisq/network/crypto/EncryptionService.java @@ -56,8 +56,8 @@ public EncryptionService(KeyRing keyRing, NetworkProtoResolver networkProtoResol this.networkProtoResolver = networkProtoResolver; } - public SealedAndSigned encryptAndSign(PubKeyRing pubKeyRing, NetworkEnvelope networkEnvelop) throws CryptoException { - return encryptHybridWithSignature(networkEnvelop, keyRing.getSignatureKeyPair(), pubKeyRing.getEncryptionPubKey()); + public SealedAndSigned encryptAndSign(PubKeyRing pubKeyRing, NetworkEnvelope networkEnvelope) throws CryptoException { + return encryptHybridWithSignature(networkEnvelope, keyRing.getSignatureKeyPair(), pubKeyRing.getEncryptionPubKey()); } /** @@ -92,8 +92,8 @@ public DecryptedMessageWithPubKey decryptAndVerify(SealedAndSigned sealedAndSign decryptedDataTuple.getSigPublicKey()); } - private static byte[] encryptPayloadWithHmac(NetworkEnvelope networkEnvelop, SecretKey secretKey) throws CryptoException { - return Encryption.encryptPayloadWithHmac(networkEnvelop.toProtoNetworkEnvelope().toByteArray(), secretKey); + private static byte[] encryptPayloadWithHmac(NetworkEnvelope networkEnvelope, SecretKey secretKey) throws CryptoException { + return Encryption.encryptPayloadWithHmac(networkEnvelope.toProtoNetworkEnvelope().toByteArray(), secretKey); } /** diff --git a/src/main/java/bisq/network/p2p/P2PService.java b/src/main/java/bisq/network/p2p/P2PService.java index a3c8470..63c5bf6 100644 --- a/src/main/java/bisq/network/p2p/P2PService.java +++ b/src/main/java/bisq/network/p2p/P2PService.java @@ -403,12 +403,12 @@ public void onError(Throwable throwable) { /////////////////////////////////////////////////////////////////////////////////////////// @Override - public void onMessage(NetworkEnvelope networkEnvelop, Connection connection) { - if (networkEnvelop instanceof PrefixedSealedAndSignedMessage) { - Log.traceCall("\n\t" + networkEnvelop.toString() + "\n\tconnection=" + connection); + public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { + if (networkEnvelope instanceof PrefixedSealedAndSignedMessage) { + Log.traceCall("\n\t" + networkEnvelope.toString() + "\n\tconnection=" + connection); // Seed nodes don't have set the encryptionService try { - PrefixedSealedAndSignedMessage prefixedSealedAndSignedMessage = (PrefixedSealedAndSignedMessage) networkEnvelop; + PrefixedSealedAndSignedMessage prefixedSealedAndSignedMessage = (PrefixedSealedAndSignedMessage) networkEnvelope; if (verifyAddressPrefixHash(prefixedSealedAndSignedMessage)) { // We set connectionType to that connection to avoid that is get closed when // we get too many connection attempts. @@ -430,7 +430,7 @@ public void onMessage(NetworkEnvelope networkEnvelop, Connection connection) { log.debug("Wrong receiverAddressMaskHash. The message is not intended for us."); } } catch (CryptoException e) { - log.debug(networkEnvelop.toString()); + log.debug(networkEnvelope.toString()); log.debug(e.toString()); log.debug("Decryption of prefixedSealedAndSignedMessage.sealedAndSigned failed. " + "That is expected if the message is not intended for us."); diff --git a/src/main/java/bisq/network/p2p/network/Connection.java b/src/main/java/bisq/network/p2p/network/Connection.java index 4b380d1..f9c0b30 100644 --- a/src/main/java/bisq/network/p2p/network/Connection.java +++ b/src/main/java/bisq/network/p2p/network/Connection.java @@ -283,27 +283,27 @@ public boolean noCapabilityRequiredOrCapabilityIsSupported(NetworkEnvelope netwo } @SuppressWarnings("BooleanMethodIsAlwaysInverted") - public static boolean isCapabilityRequired(NetworkEnvelope networkEnvelop) { - if (networkEnvelop instanceof AddDataMessage) { - final ProtectedStoragePayload protectedStoragePayload = (((AddDataMessage) networkEnvelop).getProtectedStorageEntry()).getProtectedStoragePayload(); + public static boolean isCapabilityRequired(NetworkEnvelope networkEnvelope) { + if (networkEnvelope instanceof AddDataMessage) { + final ProtectedStoragePayload protectedStoragePayload = (((AddDataMessage) networkEnvelope).getProtectedStorageEntry()).getProtectedStoragePayload(); return protectedStoragePayload instanceof CapabilityRequiringPayload; - } else if (networkEnvelop instanceof AddPersistableNetworkPayloadMessage) { - final PersistableNetworkPayload persistableNetworkPayload = ((AddPersistableNetworkPayloadMessage) networkEnvelop).getPersistableNetworkPayload(); + } else if (networkEnvelope instanceof AddPersistableNetworkPayloadMessage) { + final PersistableNetworkPayload persistableNetworkPayload = ((AddPersistableNetworkPayloadMessage) networkEnvelope).getPersistableNetworkPayload(); return persistableNetworkPayload instanceof CapabilityRequiringPayload; } else { - return networkEnvelop instanceof CapabilityRequiringPayload; + return networkEnvelope instanceof CapabilityRequiringPayload; } } - private boolean isCapabilitySupported(NetworkEnvelope networkEnvelop) { - if (networkEnvelop instanceof AddDataMessage) { - final ProtectedStoragePayload protectedStoragePayload = (((AddDataMessage) networkEnvelop).getProtectedStorageEntry()).getProtectedStoragePayload(); + private boolean isCapabilitySupported(NetworkEnvelope networkEnvelope) { + if (networkEnvelope instanceof AddDataMessage) { + final ProtectedStoragePayload protectedStoragePayload = (((AddDataMessage) networkEnvelope).getProtectedStorageEntry()).getProtectedStoragePayload(); return protectedStoragePayload instanceof CapabilityRequiringPayload && isCapabilitySupported((CapabilityRequiringPayload) protectedStoragePayload); - } else if (networkEnvelop instanceof AddPersistableNetworkPayloadMessage) { - final PersistableNetworkPayload persistableNetworkPayload = ((AddPersistableNetworkPayloadMessage) networkEnvelop).getPersistableNetworkPayload(); + } else if (networkEnvelope instanceof AddPersistableNetworkPayloadMessage) { + final PersistableNetworkPayload persistableNetworkPayload = ((AddPersistableNetworkPayloadMessage) networkEnvelope).getPersistableNetworkPayload(); return persistableNetworkPayload instanceof CapabilityRequiringPayload && isCapabilitySupported((CapabilityRequiringPayload) persistableNetworkPayload); } else { - return networkEnvelop instanceof CapabilityRequiringPayload && isCapabilitySupported((CapabilityRequiringPayload) networkEnvelop); + return networkEnvelope instanceof CapabilityRequiringPayload && isCapabilitySupported((CapabilityRequiringPayload) networkEnvelope); } } @@ -344,7 +344,7 @@ public boolean reportIllegalRequest(RuleViolation ruleViolation) { } // TODO either use the argument or delete it - private boolean violatesThrottleLimit(NetworkEnvelope networkEnvelop) { + private boolean violatesThrottleLimit(NetworkEnvelope networkEnvelope) { long now = System.currentTimeMillis(); boolean violated = false; //TODO remove message storage after network is tested stable @@ -381,7 +381,7 @@ private boolean violatesThrottleLimit(NetworkEnvelope networkEnvelop) { messageTimeStamps.remove(0); } - messageTimeStamps.add(new Tuple2<>(now, networkEnvelop)); + messageTimeStamps.add(new Tuple2<>(now, networkEnvelope)); return violated; } @@ -391,9 +391,9 @@ private boolean violatesThrottleLimit(NetworkEnvelope networkEnvelop) { // Only receive non - CloseConnectionMessage network_messages @Override - public void onMessage(NetworkEnvelope networkEnvelop, Connection connection) { + public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { checkArgument(connection.equals(this)); - UserThread.execute(() -> messageListeners.stream().forEach(e -> e.onMessage(networkEnvelop, connection))); + UserThread.execute(() -> messageListeners.stream().forEach(e -> e.onMessage(networkEnvelope, connection))); } diff --git a/src/main/java/bisq/network/p2p/network/MessageListener.java b/src/main/java/bisq/network/p2p/network/MessageListener.java index 74eb0c2..aaf4ad7 100644 --- a/src/main/java/bisq/network/p2p/network/MessageListener.java +++ b/src/main/java/bisq/network/p2p/network/MessageListener.java @@ -20,5 +20,5 @@ import bisq.common.proto.network.NetworkEnvelope; public interface MessageListener { - void onMessage(NetworkEnvelope networkEnvelop, Connection connection); + void onMessage(NetworkEnvelope networkEnvelope, Connection connection); } diff --git a/src/main/java/bisq/network/p2p/network/NetworkNode.java b/src/main/java/bisq/network/p2p/network/NetworkNode.java index c97991f..ef1b97d 100644 --- a/src/main/java/bisq/network/p2p/network/NetworkNode.java +++ b/src/main/java/bisq/network/p2p/network/NetworkNode.java @@ -97,8 +97,8 @@ public abstract class NetworkNode implements MessageListener { // when the events happen. abstract public void start(@Nullable SetupListener setupListener); - public SettableFuture sendMessage(@NotNull NodeAddress peersNodeAddress, NetworkEnvelope networkEnvelop) { - log.debug("sendMessage: peersNodeAddress=" + peersNodeAddress + "\n\tmessage=" + Utilities.toTruncatedString(networkEnvelop)); + public SettableFuture sendMessage(@NotNull NodeAddress peersNodeAddress, NetworkEnvelope networkEnvelope) { + log.debug("sendMessage: peersNodeAddress=" + peersNodeAddress + "\n\tmessage=" + Utilities.toTruncatedString(networkEnvelope)); checkNotNull(peersNodeAddress, "peerAddress must not be null"); Connection connection = getOutboundConnection(peersNodeAddress); @@ -106,7 +106,7 @@ public SettableFuture sendMessage(@NotNull NodeAddress peersNodeAddr connection = getInboundConnection(peersNodeAddress); if (connection != null) { - return sendMessage(connection, networkEnvelop); + return sendMessage(connection, networkEnvelope); } else { log.debug("We have not found any connection for peerAddress {}.\n\t" + "We will create a new outbound connection.", peersNodeAddress); @@ -144,7 +144,7 @@ public SettableFuture sendMessage(@NotNull NodeAddress peersNodeAddr } catch (Throwable throwable) { log.error("Error at closing socket " + throwable); } - existingConnection.sendMessage(networkEnvelop); + existingConnection.sendMessage(networkEnvelope); return existingConnection; } else { final ConnectionListener connectionListener = new ConnectionListener() { @@ -183,11 +183,11 @@ public void onError(Throwable throwable) { + "\nmyNodeAddress=" + getNodeAddress() + "\npeersNodeAddress=" + peersNodeAddress + "\nuid=" + outboundConnection.getUid() - + "\nmessage=" + networkEnvelop + + "\nmessage=" + networkEnvelope + "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n"); // can take a while when using tor - outboundConnection.sendMessage(networkEnvelop); + outboundConnection.sendMessage(networkEnvelope); return outboundConnection; } } catch (Throwable throwable) { @@ -206,7 +206,7 @@ public void onSuccess(Connection connection) { } public void onFailure(@NotNull Throwable throwable) { - log.info("onFailure at sendMessage: peersNodeAddress=" + peersNodeAddress + "\n\tmessage=" + Utilities.toTruncatedString(networkEnvelop)); + log.info("onFailure at sendMessage: peersNodeAddress=" + peersNodeAddress + "\n\tmessage=" + Utilities.toTruncatedString(networkEnvelope)); UserThread.execute(() -> resultFuture.setException(throwable)); } }); @@ -257,12 +257,12 @@ public Socks5Proxy getSocksProxy() { } - public SettableFuture sendMessage(Connection connection, NetworkEnvelope networkEnvelop) { - Log.traceCall("\n\tmessage=" + Utilities.toTruncatedString(networkEnvelop) + "\n\tconnection=" + connection); + public SettableFuture sendMessage(Connection connection, NetworkEnvelope networkEnvelope) { + Log.traceCall("\n\tmessage=" + Utilities.toTruncatedString(networkEnvelope) + "\n\tconnection=" + connection); // connection.sendMessage might take a bit (compression, write to stream), so we use a thread to not block ListenableFuture future = executorService.submit(() -> { Thread.currentThread().setName("NetworkNode:SendMessage-to-" + connection.getUid()); - connection.sendMessage(networkEnvelop); + connection.sendMessage(networkEnvelope); return connection; }); final SettableFuture resultFuture = SettableFuture.create(); @@ -338,8 +338,8 @@ void addSetupListener(SetupListener setupListener) { /////////////////////////////////////////////////////////////////////////////////////////// @Override - public void onMessage(NetworkEnvelope networkEnvelop, Connection connection) { - messageListeners.stream().forEach(e -> e.onMessage(networkEnvelop, connection)); + public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { + messageListeners.stream().forEach(e -> e.onMessage(networkEnvelope, connection)); } diff --git a/src/main/java/bisq/network/p2p/network/Statistic.java b/src/main/java/bisq/network/p2p/network/Statistic.java index 1c4cd52..0809e49 100644 --- a/src/main/java/bisq/network/p2p/network/Statistic.java +++ b/src/main/java/bisq/network/p2p/network/Statistic.java @@ -99,8 +99,8 @@ public void addReceivedBytes(int value) { } // TODO would need msg inspection to get useful information... - public void addReceivedMessage(NetworkEnvelope networkEnvelop) { - String messageClassName = networkEnvelop.getClass().getSimpleName(); + public void addReceivedMessage(NetworkEnvelope networkEnvelope) { + String messageClassName = networkEnvelope.getClass().getSimpleName(); int counter = 1; if (receivedMessages.containsKey(messageClassName)) counter = receivedMessages.get(messageClassName) + 1; @@ -108,8 +108,8 @@ public void addReceivedMessage(NetworkEnvelope networkEnvelop) { receivedMessages.put(messageClassName, counter); } - public void addSentMessage(NetworkEnvelope networkEnvelop) { - String messageClassName = networkEnvelop.getClass().getSimpleName(); + public void addSentMessage(NetworkEnvelope networkEnvelope) { + String messageClassName = networkEnvelope.getClass().getSimpleName(); int counter = 1; if (sentMessages.containsKey(messageClassName)) counter = sentMessages.get(messageClassName) + 1; diff --git a/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java b/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java index f787229..45f90d6 100644 --- a/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java +++ b/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java @@ -194,18 +194,18 @@ public void onFailure(@NotNull Throwable throwable) { /////////////////////////////////////////////////////////////////////////////////////////// @Override - public void onMessage(NetworkEnvelope networkEnvelop, Connection connection) { - if (networkEnvelop instanceof GetDataResponse) { + public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { + if (networkEnvelope instanceof GetDataResponse) { if (connection.getPeersNodeAddressOptional().isPresent() && connection.getPeersNodeAddressOptional().get().equals(peersNodeAddress)) { - Log.traceCall(networkEnvelop.toString() + "\n\tconnection=" + connection); + Log.traceCall(networkEnvelope.toString() + "\n\tconnection=" + connection); if (!stopped) { - GetDataResponse getDataResponse = (GetDataResponse) networkEnvelop; + GetDataResponse getDataResponse = (GetDataResponse) networkEnvelope; Map> payloadByClassName = new HashMap<>(); final Set dataSet = getDataResponse.getDataSet(); dataSet.stream().forEach(e -> { final ProtectedStoragePayload protectedStoragePayload = e.getProtectedStoragePayload(); if (protectedStoragePayload == null) { - log.warn("StoragePayload was null: {}", networkEnvelop.toString()); + log.warn("StoragePayload was null: {}", networkEnvelope.toString()); return; } diff --git a/src/main/java/bisq/network/p2p/peers/getdata/RequestDataManager.java b/src/main/java/bisq/network/p2p/peers/getdata/RequestDataManager.java index 5304f1b..094228c 100644 --- a/src/main/java/bisq/network/p2p/peers/getdata/RequestDataManager.java +++ b/src/main/java/bisq/network/p2p/peers/getdata/RequestDataManager.java @@ -268,9 +268,9 @@ public void onAwakeFromStandby() { /////////////////////////////////////////////////////////////////////////////////////////// @Override - public void onMessage(NetworkEnvelope networkEnvelop, Connection connection) { - if (networkEnvelop instanceof GetDataRequest) { - Log.traceCall(networkEnvelop.toString() + "\n\tconnection=" + connection); + public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { + if (networkEnvelope instanceof GetDataRequest) { + Log.traceCall(networkEnvelope.toString() + "\n\tconnection=" + connection); if (!stopped) { if (peerManager.isSeedNode(connection)) connection.setPeerType(Connection.PeerType.SEED_NODE); @@ -298,7 +298,7 @@ public void onFault(String errorMessage, @Nullable Connection connection) { } }); getDataRequestHandlers.put(uid, getDataRequestHandler); - getDataRequestHandler.handle((GetDataRequest) networkEnvelop, connection); + getDataRequestHandler.handle((GetDataRequest) networkEnvelope, connection); } else { log.warn("We have already a GetDataRequestHandler for that connection started. " + "We start a cleanup timer if the handler has not closed by itself in between 2 minutes."); diff --git a/src/main/java/bisq/network/p2p/peers/keepalive/KeepAliveHandler.java b/src/main/java/bisq/network/p2p/peers/keepalive/KeepAliveHandler.java index d4f4d6e..fae54bb 100644 --- a/src/main/java/bisq/network/p2p/peers/keepalive/KeepAliveHandler.java +++ b/src/main/java/bisq/network/p2p/peers/keepalive/KeepAliveHandler.java @@ -144,11 +144,11 @@ public void onFailure(@NotNull Throwable throwable) { /////////////////////////////////////////////////////////////////////////////////////////// @Override - public void onMessage(NetworkEnvelope networkEnvelop, Connection connection) { - if (networkEnvelop instanceof Pong) { - Log.traceCall(networkEnvelop.toString() + "\n\tconnection=" + connection); + public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { + if (networkEnvelope instanceof Pong) { + Log.traceCall(networkEnvelope.toString() + "\n\tconnection=" + connection); if (!stopped) { - Pong pong = (Pong) networkEnvelop; + Pong pong = (Pong) networkEnvelope; if (pong.getRequestNonce() == nonce) { int roundTripTime = (int) (System.currentTimeMillis() - sendTs); log.trace("roundTripTime=" + roundTripTime + "\n\tconnection=" + connection); diff --git a/src/main/java/bisq/network/p2p/peers/keepalive/KeepAliveManager.java b/src/main/java/bisq/network/p2p/peers/keepalive/KeepAliveManager.java index 06833fd..b8a4e26 100644 --- a/src/main/java/bisq/network/p2p/peers/keepalive/KeepAliveManager.java +++ b/src/main/java/bisq/network/p2p/peers/keepalive/KeepAliveManager.java @@ -101,11 +101,11 @@ public void start() { /////////////////////////////////////////////////////////////////////////////////////////// @Override - public void onMessage(NetworkEnvelope networkEnvelop, Connection connection) { - if (networkEnvelop instanceof Ping) { - Log.traceCall(networkEnvelop.toString() + "\n\tconnection=" + connection); + public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { + if (networkEnvelope instanceof Ping) { + Log.traceCall(networkEnvelope.toString() + "\n\tconnection=" + connection); if (!stopped) { - Ping ping = (Ping) networkEnvelop; + Ping ping = (Ping) networkEnvelope; // We get from peer last measured rrt connection.getStatistic().setRoundTripTime(ping.getLastRoundTripTime()); diff --git a/src/main/java/bisq/network/p2p/peers/peerexchange/PeerExchangeHandler.java b/src/main/java/bisq/network/p2p/peers/peerexchange/PeerExchangeHandler.java index f08ec1f..779d8e7 100644 --- a/src/main/java/bisq/network/p2p/peers/peerexchange/PeerExchangeHandler.java +++ b/src/main/java/bisq/network/p2p/peers/peerexchange/PeerExchangeHandler.java @@ -161,11 +161,11 @@ public void onFailure(@NotNull Throwable throwable) { /////////////////////////////////////////////////////////////////////////////////////////// @Override - public void onMessage(NetworkEnvelope networkEnvelop, Connection connection) { - if (networkEnvelop instanceof GetPeersResponse) { + public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { + if (networkEnvelope instanceof GetPeersResponse) { if (!stopped) { - Log.traceCall(networkEnvelop.toString() + "\n\tconnection=" + connection); - GetPeersResponse getPeersResponse = (GetPeersResponse) networkEnvelop; + Log.traceCall(networkEnvelope.toString() + "\n\tconnection=" + connection); + GetPeersResponse getPeersResponse = (GetPeersResponse) networkEnvelope; if (peerManager.isSeedNode(connection)) connection.setPeerType(Connection.PeerType.SEED_NODE); diff --git a/src/main/java/bisq/network/p2p/peers/peerexchange/PeerExchangeManager.java b/src/main/java/bisq/network/p2p/peers/peerexchange/PeerExchangeManager.java index 41f4869..55ad5b5 100644 --- a/src/main/java/bisq/network/p2p/peers/peerexchange/PeerExchangeManager.java +++ b/src/main/java/bisq/network/p2p/peers/peerexchange/PeerExchangeManager.java @@ -196,9 +196,9 @@ public void onAwakeFromStandby() { /////////////////////////////////////////////////////////////////////////////////////////// @Override - public void onMessage(NetworkEnvelope networkEnvelop, Connection connection) { - if (networkEnvelop instanceof GetPeersRequest) { - Log.traceCall(networkEnvelop.toString() + "\n\tconnection=" + connection); + public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { + if (networkEnvelope instanceof GetPeersRequest) { + Log.traceCall(networkEnvelope.toString() + "\n\tconnection=" + connection); if (!stopped) { if (peerManager.isSeedNode(connection)) connection.setPeerType(Connection.PeerType.SEED_NODE); @@ -218,7 +218,7 @@ public void onFault(String errorMessage, Connection connection) { peerManager.handleConnectionFault(connection); } }); - getPeersRequestHandler.handle((GetPeersRequest) networkEnvelop, connection); + getPeersRequestHandler.handle((GetPeersRequest) networkEnvelope, connection); } else { log.warn("We have stopped already. We ignore that onMessage call."); } diff --git a/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java b/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java index 60fcffe..c4a3004 100644 --- a/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java +++ b/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java @@ -269,20 +269,20 @@ public void onBootstrapComplete() { /////////////////////////////////////////////////////////////////////////////////////////// @Override - public void onMessage(NetworkEnvelope networkEnvelop, Connection connection) { - if (networkEnvelop instanceof BroadcastMessage) { - Log.traceCall(Utilities.toTruncatedString(networkEnvelop) + "\n\tconnection=" + connection); + public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { + if (networkEnvelope instanceof BroadcastMessage) { + Log.traceCall(Utilities.toTruncatedString(networkEnvelope) + "\n\tconnection=" + connection); connection.getPeersNodeAddressOptional().ifPresent(peersNodeAddress -> { - if (networkEnvelop instanceof AddDataMessage) { - addProtectedStorageEntry(((AddDataMessage) networkEnvelop).getProtectedStorageEntry(), peersNodeAddress, null, false); - } else if (networkEnvelop instanceof RemoveDataMessage) { - remove(((RemoveDataMessage) networkEnvelop).getProtectedStorageEntry(), peersNodeAddress, false); - } else if (networkEnvelop instanceof RemoveMailboxDataMessage) { - removeMailboxData(((RemoveMailboxDataMessage) networkEnvelop).getProtectedMailboxStorageEntry(), peersNodeAddress, false); - } else if (networkEnvelop instanceof RefreshOfferMessage) { - refreshTTL((RefreshOfferMessage) networkEnvelop, peersNodeAddress, false); - } else if (networkEnvelop instanceof AddPersistableNetworkPayloadMessage) { - addPersistableNetworkPayload(((AddPersistableNetworkPayloadMessage) networkEnvelop).getPersistableNetworkPayload(), + if (networkEnvelope instanceof AddDataMessage) { + addProtectedStorageEntry(((AddDataMessage) networkEnvelope).getProtectedStorageEntry(), peersNodeAddress, null, false); + } else if (networkEnvelope instanceof RemoveDataMessage) { + remove(((RemoveDataMessage) networkEnvelope).getProtectedStorageEntry(), peersNodeAddress, false); + } else if (networkEnvelope instanceof RemoveMailboxDataMessage) { + removeMailboxData(((RemoveMailboxDataMessage) networkEnvelope).getProtectedMailboxStorageEntry(), peersNodeAddress, false); + } else if (networkEnvelope instanceof RefreshOfferMessage) { + refreshTTL((RefreshOfferMessage) networkEnvelope, peersNodeAddress, false); + } else if (networkEnvelope instanceof AddPersistableNetworkPayloadMessage) { + addPersistableNetworkPayload(((AddPersistableNetworkPayloadMessage) networkEnvelope).getPersistableNetworkPayload(), peersNodeAddress, false, true, false, true); } });