Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist and republish mailbox messages #5072

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
0656a52
Refactor: Move mailbox related classed to mailbox package. Make const…
chimp1984 Jan 10, 2021
73cfe0b
Make some methods default
chimp1984 Jan 10, 2021
a28a2f5
Move mailbox related code from P2PService to MailboxMessageService
chimp1984 Jan 10, 2021
d470d73
Remove getMailBoxMessages from P2PService and use MailboxMessageServi…
chimp1984 Jan 10, 2021
ee3f158
Remove addDecryptedMailboxListener from P2PService and use MailboxMes…
chimp1984 Jan 10, 2021
33703c2
Remove removePrivateNotification from P2PService and use MailboxMessa…
chimp1984 Jan 10, 2021
b8fc7f3
Remove sendEncryptedMailboxMessage from P2PService and use MailboxMes…
chimp1984 Jan 10, 2021
4fc35a7
Cleanups
chimp1984 Jan 10, 2021
4c7b560
Add new methods, make decryptedMessageWithPubKey nullable
chimp1984 Jan 10, 2021
504fb2e
Refactor: Rename
chimp1984 Jan 10, 2021
cf8c93d
Filter expired entries at startup
chimp1984 Jan 10, 2021
79363e7
Refactor: Rename
chimp1984 Jan 10, 2021
63a0117
Remove nullable return value from decryptProtectedMailboxStorageEntry
chimp1984 Jan 11, 2021
5245c20
Add filter for isMine at getMyMailBoxMessages
chimp1984 Jan 11, 2021
bfbc657
Remove old code
chimp1984 Jan 11, 2021
6146362
Add onRemoved method to remove mailbox entries once a remove message …
chimp1984 Jan 11, 2021
32f8874
Persist map for removed mailbox messages (AddOncePayload more generally)
chimp1984 Jan 11, 2021
75e547f
Refactor: move classes to persistence package
chimp1984 Jan 11, 2021
e69a3f6
Refactor: rename method
chimp1984 Jan 11, 2021
1886b4e
Set addressPrefixHash to empty bytes.
chimp1984 Jan 11, 2021
5ba7d78
Add RemovedPayloadsStorageService
chimp1984 Jan 11, 2021
41a92ea
Refactor: Rename class
chimp1984 Jan 11, 2021
9edad02
Use a hashset for listeners
chimp1984 Jan 11, 2021
de131c8
Use MailboxItem value at mailboxItemsByUid instead of a list:
chimp1984 Jan 11, 2021
a3b2aad
Refactor: move class to new utils package
chimp1984 Jan 11, 2021
026858d
Refactor: move duplicated method
chimp1984 Jan 11, 2021
873b557
Add logs
chimp1984 Jan 11, 2021
7a2501e
Set uid inside constructor
chimp1984 Jan 11, 2021
35ef3b8
Add support for TTL defined by payload message so we can use
chimp1984 Jan 11, 2021
659fe15
Add comment
chimp1984 Jan 11, 2021
320e5f1
Refactor: Rename method
chimp1984 Jan 11, 2021
a54813a
Increase timout for BroadcastHandler to 120 sec
chimp1984 Jan 12, 2021
8e0181e
Add republishExistingProtectedMailboxStorageEntry method
chimp1984 Jan 12, 2021
e933bfd
Add temp. ignore to a failing test.
chimp1984 Jan 12, 2021
8d866d5
Improve logs, rename methods, variables
chimp1984 Jan 12, 2021
a9802e6
Refactor: return early
chimp1984 Jan 12, 2021
d39a697
Fix wrong delay, increase timeout
chimp1984 Jan 12, 2021
b4ab8a4
Fix incorrect comparison at RemovedPayloadsService.readPersisted
chimp1984 Jan 12, 2021
337ea89
Add program argument flag republishMailboxEntries
chimp1984 Jan 12, 2021
32aeda7
Add null check
chimp1984 Jan 12, 2021
3875bd8
Add removeMailboxMsg with MailboxMessage as argument
chimp1984 Jan 12, 2021
0f08794
Remove unneeded wrapping of tradeMessage into DecryptedMessageWithPubKey
chimp1984 Jan 12, 2021
bffe2b8
Store PrivateNotificationMessage instead of DecryptedMessageWithPubKey
chimp1984 Jan 12, 2021
f536aba
Refactor handling of removeMailboxMsg
chimp1984 Jan 12, 2021
456ce70
Refactor: rename dispatchMessage to onSupportMessage (no functionalit…
chimp1984 Jan 12, 2021
2235c4f
Use removeMailboxMsg method with MailboxMessage as param type
chimp1984 Jan 12, 2021
575196c
Refactor handleDecryptedMessageWithPubKey to use MailboxMessage
chimp1984 Jan 12, 2021
5cdae86
Refactor: rename method to make it more explicit that its only used f…
chimp1984 Jan 12, 2021
d353140
Refactor: rename getMyDecryptedMessages method to make it more explicit
chimp1984 Jan 12, 2021
c229c3f
Refactor handling of DecryptedMessageWithPubKey
chimp1984 Jan 12, 2021
008fb3b
Inline isMyMessage methods
chimp1984 Jan 12, 2021
5c429bb
Add containsKey check
chimp1984 Jan 12, 2021
4c003cd
Clone reportedPeers to avoid ConcurrentModificationExceptions
chimp1984 Jan 12, 2021
46f6026
Remove outdated code
chimp1984 Jan 12, 2021
470230c
Add TradeMailboxMessage and use a TTL of 15 days
chimp1984 Jan 12, 2021
2b05ed5
Let MailboxMessage extend ExpirablePayload:
chimp1984 Jan 12, 2021
364a7f5
Add toString
chimp1984 Jan 12, 2021
a21d79a
Refactor: rename
chimp1984 Jan 12, 2021
3e69ede
Remove comment
chimp1984 Jan 12, 2021
3834785
Bugfix: Add processing of SendersNodeAddressMessage to
chimp1984 Jan 12, 2021
36657f5
Use TradeMailboxMessage as type for getMessage in SendMailboxMessageTask
chimp1984 Jan 12, 2021
c12e239
Refactor: rename method
chimp1984 Jan 12, 2021
2fb3bba
Use MailboxMessage as param type in sendEncryptedMailboxMessage
chimp1984 Jan 12, 2021
d434cb3
Sort persisted mail box messages by age and limit total number.
chimp1984 Jan 13, 2021
541f367
Dont log filter content if a filter got updated
chimp1984 Jan 14, 2021
7ba5ab5
Change prio for mailboxMessageList persitence
chimp1984 Jan 14, 2021
5d13fdc
Return early
chimp1984 Jan 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 13 additions & 6 deletions core/src/main/java/bisq/core/alert/PrivateNotificationManager.java
Expand Up @@ -50,6 +50,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import static org.bitcoinj.core.Utils.HEX;

public class PrivateNotificationManager {
Expand All @@ -64,7 +66,8 @@ public class PrivateNotificationManager {
private final String pubKeyAsHex;

private ECKey privateNotificationSigningKey;
private DecryptedMessageWithPubKey decryptedMessageWithPubKey;
@Nullable
private PrivateNotificationMessage privateNotificationMessage;


///////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -91,10 +94,9 @@ public PrivateNotificationManager(P2PService p2PService,
}

private void handleMessage(DecryptedMessageWithPubKey decryptedMessageWithPubKey, NodeAddress senderNodeAddress) {
this.decryptedMessageWithPubKey = decryptedMessageWithPubKey;
NetworkEnvelope networkEnvelope = decryptedMessageWithPubKey.getNetworkEnvelope();
if (networkEnvelope instanceof PrivateNotificationMessage) {
PrivateNotificationMessage privateNotificationMessage = (PrivateNotificationMessage) networkEnvelope;
privateNotificationMessage = (PrivateNotificationMessage) networkEnvelope;
log.info("Received PrivateNotificationMessage from {} with uid={}",
senderNodeAddress, privateNotificationMessage.getUid());
if (privateNotificationMessage.getSenderNodeAddress().equals(senderNodeAddress)) {
Expand All @@ -116,8 +118,11 @@ public ReadOnlyObjectProperty<PrivateNotificationPayload> privateNotificationPro
return privateNotificationMessageProperty;
}

public boolean sendPrivateNotificationMessageIfKeyIsValid(PrivateNotificationPayload privateNotification, PubKeyRing pubKeyRing, NodeAddress peersNodeAddress,
String privKeyString, SendMailboxMessageListener sendMailboxMessageListener) {
public boolean sendPrivateNotificationMessageIfKeyIsValid(PrivateNotificationPayload privateNotification,
PubKeyRing pubKeyRing,
NodeAddress peersNodeAddress,
String privKeyString,
SendMailboxMessageListener sendMailboxMessageListener) {
boolean isKeyValid = isKeyValid(privKeyString);
if (isKeyValid) {
signAndAddSignatureToPrivateNotificationMessage(privateNotification);
Expand All @@ -137,7 +142,9 @@ public boolean sendPrivateNotificationMessageIfKeyIsValid(PrivateNotificationPay
}

public void removePrivateNotification() {
mailboxMessageService.removeMailboxMsg(decryptedMessageWithPubKey);
if (privateNotificationMessage != null) {
mailboxMessageService.removeMailboxMsg(privateNotificationMessage);
}
}

private boolean isKeyValid(String privKeyString) {
Expand Down
22 changes: 10 additions & 12 deletions core/src/main/java/bisq/core/support/SupportManager.java
Expand Up @@ -84,7 +84,7 @@ public SupportManager(P2PService p2PService, WalletsSetup walletsSetup) {
// Abstract methods
///////////////////////////////////////////////////////////////////////////////////////////

protected abstract void dispatchMessage(SupportMessage networkEnvelope);
protected abstract void onSupportMessage(SupportMessage networkEnvelope);

public abstract NodeAddress getPeerNodeAddress(ChatMessage message);

Expand Down Expand Up @@ -159,8 +159,7 @@ protected void onChatMessage(ChatMessage chatMessage) {
sendAckMessage(chatMessage, receiverPubKeyRing, true, null);
}

private void onAckMessage(AckMessage ackMessage,
@Nullable DecryptedMessageWithPubKey decryptedMessageWithPubKey) {
private void onAckMessage(AckMessage ackMessage) {
if (ackMessage.getSourceType() == getAckMessageSourceType()) {
if (ackMessage.isSuccess()) {
log.info("Received AckMessage for {} with tradeId {} and uid {}",
Expand All @@ -179,10 +178,6 @@ private void onAckMessage(AckMessage ackMessage,
msg.setAckError(ackMessage.getErrorMessage());
});
requestPersistence();

if (decryptedMessageWithPubKey != null) {
mailboxMessageService.removeMailboxMsg(decryptedMessageWithPubKey);
}
}
}

Expand Down Expand Up @@ -307,9 +302,9 @@ private void applyMessages() {
decryptedDirectMessageWithPubKeys.forEach(decryptedMessageWithPubKey -> {
NetworkEnvelope networkEnvelope = decryptedMessageWithPubKey.getNetworkEnvelope();
if (networkEnvelope instanceof SupportMessage) {
dispatchMessage((SupportMessage) networkEnvelope);
onSupportMessage((SupportMessage) networkEnvelope);
} else if (networkEnvelope instanceof AckMessage) {
onAckMessage((AckMessage) networkEnvelope, null);
onAckMessage((AckMessage) networkEnvelope);
}
});
decryptedDirectMessageWithPubKeys.clear();
Expand All @@ -318,10 +313,13 @@ private void applyMessages() {
NetworkEnvelope networkEnvelope = decryptedMessageWithPubKey.getNetworkEnvelope();
log.debug("decryptedMessageWithPubKey.message " + networkEnvelope);
if (networkEnvelope instanceof SupportMessage) {
dispatchMessage((SupportMessage) networkEnvelope);
mailboxMessageService.removeMailboxMsg(decryptedMessageWithPubKey);
SupportMessage supportMessage = (SupportMessage) networkEnvelope;
onSupportMessage(supportMessage);
mailboxMessageService.removeMailboxMsg(supportMessage);
} else if (networkEnvelope instanceof AckMessage) {
onAckMessage((AckMessage) networkEnvelope, decryptedMessageWithPubKey);
AckMessage ackMessage = (AckMessage) networkEnvelope;
onAckMessage(ackMessage);
mailboxMessageService.removeMailboxMsg(ackMessage);
}
});
decryptedMailboxMessageWithPubKeys.clear();
Expand Down
Expand Up @@ -112,7 +112,7 @@ public SupportType getSupportType() {
}

@Override
public void dispatchMessage(SupportMessage message) {
public void onSupportMessage(SupportMessage message) {
if (canProcessMessage(message)) {
log.info("Received {} with tradeId {} and uid {}",
message.getClass().getSimpleName(), message.getTradeId(), message.getUid());
Expand Down
Expand Up @@ -102,7 +102,7 @@ public SupportType getSupportType() {
}

@Override
public void dispatchMessage(SupportMessage message) {
public void onSupportMessage(SupportMessage message) {
if (canProcessMessage(message)) {
log.info("Received {} with tradeId {} and uid {}",
message.getClass().getSimpleName(), message.getTradeId(), message.getUid());
Expand Down
Expand Up @@ -96,7 +96,7 @@ public SupportType getSupportType() {
}

@Override
public void dispatchMessage(SupportMessage message) {
public void onSupportMessage(SupportMessage message) {
if (canProcessMessage(message)) {
log.info("Received {} with tradeId {} and uid {}",
message.getClass().getSimpleName(), message.getTradeId(), message.getUid());
Expand Down
Expand Up @@ -139,7 +139,7 @@ protected AckMessageSourceType getAckMessageSourceType() {
// API
///////////////////////////////////////////////////////////////////////////////////////////

public void dispatchMessage(SupportMessage message) {
public void onSupportMessage(SupportMessage message) {
if (canProcessMessage(message)) {
log.info("Received {} with tradeId {} and uid {}",
message.getClass().getSimpleName(), message.getTradeId(), message.getUid());
Expand Down
Expand Up @@ -25,6 +25,7 @@
import bisq.network.p2p.BootstrapListener;
import bisq.network.p2p.DecryptedMessageWithPubKey;
import bisq.network.p2p.P2PService;
import bisq.network.p2p.mailbox.MailboxMessage;
import bisq.network.p2p.mailbox.MailboxMessageService;

import bisq.common.crypto.PubKeyRing;
Expand Down Expand Up @@ -88,7 +89,8 @@ private void handleDecryptedMessageWithPubKey(DecryptedMessageWithPubKey decrypt
trades.stream()
.filter(trade -> isMessageForTrade(decryptedMessageWithPubKey, trade))
.filter(trade -> isPubKeyValid(decryptedMessageWithPubKey, trade))
.forEach(trade -> removeEntryFromMailbox(decryptedMessageWithPubKey, trade));
.filter(trade -> decryptedMessageWithPubKey.getNetworkEnvelope() instanceof MailboxMessage)
.forEach(trade -> removeEntryFromMailbox((MailboxMessage) decryptedMessageWithPubKey.getNetworkEnvelope(), trade));
}

private boolean isMessageForTrade(DecryptedMessageWithPubKey decryptedMessageWithPubKey, Trade trade) {
Expand All @@ -102,10 +104,11 @@ private boolean isMessageForTrade(DecryptedMessageWithPubKey decryptedMessageWit
return false;
}

private void removeEntryFromMailbox(DecryptedMessageWithPubKey decryptedMessageWithPubKey, Trade trade) {
log.info("We found a pending mailbox message ({}) for trade {}. As the trade is closed we remove the mailbox message.",
decryptedMessageWithPubKey.getNetworkEnvelope().getClass().getSimpleName(), trade.getId());
mailboxMessageService.removeMailboxMsg(decryptedMessageWithPubKey);
private void removeEntryFromMailbox(MailboxMessage mailboxMessage, Trade trade) {
log.info("We found a pending mailbox message ({}) for trade {}. " +
"As the trade is closed we remove the mailbox message.",
mailboxMessage.getClass().getSimpleName(), trade.getId());
mailboxMessageService.removeMailboxMsg(mailboxMessage);
}

private boolean isMyMessage(TradeMessage message, Trade trade) {
Expand All @@ -117,15 +120,15 @@ private boolean isMyMessage(AckMessage ackMessage, Trade trade) {
ackMessage.getSourceId().equals(trade.getId());
}

private boolean isPubKeyValid(DecryptedMessageWithPubKey message, Trade trade) {
private boolean isPubKeyValid(DecryptedMessageWithPubKey decryptedMessageWithPubKey, Trade trade) {
// We can only validate the peers pubKey if we have it already. If we are the taker we get it from the offer
// Otherwise it depends on the state of the trade protocol if we have received the peers pubKeyRing already.
PubKeyRing peersPubKeyRing = trade.getProcessModel().getTradingPeer().getPubKeyRing();
boolean isValid = true;
if (peersPubKeyRing != null &&
!message.getSignaturePubKey().equals(peersPubKeyRing.getSignaturePubKey())) {
!decryptedMessageWithPubKey.getSignaturePubKey().equals(peersPubKeyRing.getSignaturePubKey())) {
isValid = false;
log.warn("SignaturePubKey in message does not match the SignaturePubKey we have set for our trading peer.");
log.warn("SignaturePubKey in decryptedMessageWithPubKey does not match the SignaturePubKey we have set for our trading peer.");
}
return isValid;
}
Expand Down
55 changes: 24 additions & 31 deletions core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java
Expand Up @@ -40,8 +40,6 @@
import bisq.common.proto.network.NetworkEnvelope;
import bisq.common.taskrunner.Task;

import java.security.PublicKey;

import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -140,42 +138,37 @@ private void handleDecryptedMessageWithPubKey(DecryptedMessageWithPubKey decrypt
protected void handleDecryptedMessageWithPubKey(DecryptedMessageWithPubKey decryptedMessageWithPubKey,
NodeAddress peer) {
NetworkEnvelope networkEnvelope = decryptedMessageWithPubKey.getNetworkEnvelope();
if (networkEnvelope instanceof TradeMessage &&
isMyMessage((TradeMessage) networkEnvelope) &&
isPubKeyValid(decryptedMessageWithPubKey)) {
if (networkEnvelope instanceof MailboxMessage && networkEnvelope instanceof TradeMessage) {
TradeMessage tradeMessage = (TradeMessage) networkEnvelope;

// We only remove here if we have already completed the trade.
// Otherwise removal is done after successfully applied the task runner.
if (trade.isWithdrawn()) {
processModel.getP2PService().getMailboxMessageService().removeMailboxMsg(decryptedMessageWithPubKey);
log.info("Remove {} from the P2P network.", tradeMessage.getClass().getSimpleName());
return;
if (isMyMessage(tradeMessage) && isPubKeyValid(decryptedMessageWithPubKey)) {
// We only remove here if we have already completed the trade.
// Otherwise removal is done after successfully applied the task runner.
if (trade.isWithdrawn()) {
MailboxMessage mailboxMessage = (MailboxMessage) tradeMessage;
processModel.getP2PService().getMailboxMessageService().removeMailboxMsg(mailboxMessage);
log.info("Remove {} from the P2P network as trade is already completed.",
tradeMessage.getClass().getSimpleName());
return;
}
onMailboxMessage(tradeMessage, peer);
}

onMailboxMessage(tradeMessage, peer);
} else if (networkEnvelope instanceof AckMessage &&
isMyMessage((AckMessage) networkEnvelope) &&
isPubKeyValid(decryptedMessageWithPubKey)) {
if (!trade.isWithdrawn()) {
// We only apply the msg if we have not already completed the trade
onAckMessage((AckMessage) networkEnvelope, peer);
} else if (networkEnvelope instanceof AckMessage) {
AckMessage ackMessage = (AckMessage) networkEnvelope;
if (isMyMessage(ackMessage) && isPubKeyValid(decryptedMessageWithPubKey)) {
if (!trade.isWithdrawn()) {
// We only apply the msg if we have not already completed the trade
onAckMessage(ackMessage, peer);
}
// In any case we remove the msg
processModel.getP2PService().getMailboxMessageService().removeMailboxMsg(ackMessage);
log.info("Remove {} from the P2P network.", ackMessage.getClass().getSimpleName());
}
// In any case we remove the msg
processModel.getP2PService().getMailboxMessageService().removeMailboxMsg(decryptedMessageWithPubKey);
log.info("Remove {} from the P2P network.", networkEnvelope.getClass().getSimpleName());
}
}

public void removeMailboxMessageAfterProcessing(TradeMessage tradeMessage) {
if (tradeMessage instanceof MailboxMessage &&
processModel.getTradingPeer() != null &&
processModel.getTradingPeer().getPubKeyRing() != null &&
processModel.getTradingPeer().getPubKeyRing().getSignaturePubKey() != null) {
PublicKey sigPubKey = processModel.getTradingPeer().getPubKeyRing().getSignaturePubKey();
// We reconstruct the DecryptedMessageWithPubKey from the message and the peers signature pubKey
DecryptedMessageWithPubKey decryptedMessageWithPubKey = new DecryptedMessageWithPubKey(tradeMessage, sigPubKey);
processModel.getP2PService().getMailboxMessageService().removeMailboxMsg(decryptedMessageWithPubKey);
if (tradeMessage instanceof MailboxMessage) {
processModel.getP2PService().getMailboxMessageService().removeMailboxMsg((MailboxMessage) tradeMessage);
log.info("Remove {} from the P2P network.", tradeMessage.getClass().getSimpleName());
}
}
Expand Down
Expand Up @@ -261,16 +261,15 @@ public void onFailure(@NotNull Throwable throwable) {
}

/**
* The DecryptedMessageWithPubKey has been applied and we remove it from our local storage and from the network.
* The mailboxMessage has been applied and we remove it from our local storage and from the network.
*
* @param decryptedMessageWithPubKey The DecryptedMessageWithPubKey to be removed
* @param mailboxMessage The MailboxMessage to be removed
*/
public void removeMailboxMsg(DecryptedMessageWithPubKey decryptedMessageWithPubKey) {
public void removeMailboxMsg(MailboxMessage mailboxMessage) {
if (isBootstrapped) {
// We need to delay a bit to not get a ConcurrentModificationException as we might iterate over
// mailboxMessageList while getting called.
UserThread.execute(() -> {
MailboxMessage mailboxMessage = (MailboxMessage) decryptedMessageWithPubKey.getNetworkEnvelope();
String uid = mailboxMessage.getUid();

// We called removeMailboxEntryFromNetwork at processMyMailboxItem,
Expand All @@ -287,7 +286,7 @@ public void removeMailboxMsg(DecryptedMessageWithPubKey decryptedMessageWithPubK
});
} else {
// In case the network was not ready yet we try again later
UserThread.runAfter(() -> removeMailboxMsg(decryptedMessageWithPubKey), 30);
UserThread.runAfter(() -> removeMailboxMsg(mailboxMessage), 30);
}
}

Expand Down