diff --git a/plumtree/src/main/java/net/consensys/cava/plumtree/MessageSender.java b/plumtree/src/main/java/net/consensys/cava/plumtree/MessageSender.java index bc583e7f..3d8442ad 100644 --- a/plumtree/src/main/java/net/consensys/cava/plumtree/MessageSender.java +++ b/plumtree/src/main/java/net/consensys/cava/plumtree/MessageSender.java @@ -33,7 +33,8 @@ enum Verb { * * @param verb the type of message * @param peer the target of the message + * @param hash the hash of the message * @param payload the bytes to send */ - void sendMessage(Verb verb, Peer peer, @Nullable Bytes payload); + void sendMessage(Verb verb, Peer peer, Bytes hash, @Nullable Bytes payload); } diff --git a/plumtree/src/main/java/net/consensys/cava/plumtree/State.java b/plumtree/src/main/java/net/consensys/cava/plumtree/State.java index 5c755d3b..85f47acd 100644 --- a/plumtree/src/main/java/net/consensys/cava/plumtree/State.java +++ b/plumtree/src/main/java/net/consensys/cava/plumtree/State.java @@ -80,7 +80,7 @@ void fullMessageReceived(@Nullable Peer sender, Bytes message) { if (sender == null || messageValidator.validate(message, sender)) { for (Peer peer : peerRepository.eagerPushPeers()) { if (sender == null || !sender.equals(peer)) { - messageSender.sendMessage(MessageSender.Verb.GOSSIP, peer, message); + messageSender.sendMessage(MessageSender.Verb.GOSSIP, peer, hash, message); } } lazyQueue.addAll( @@ -88,13 +88,13 @@ void fullMessageReceived(@Nullable Peer sender, Bytes message) { .lazyPushPeers() .stream() .filter(p -> !lazyPeers.contains(p)) - .map(peer -> (Runnable) (() -> messageSender.sendMessage(MessageSender.Verb.IHAVE, peer, hash))) + .map(peer -> (Runnable) (() -> messageSender.sendMessage(MessageSender.Verb.IHAVE, peer, hash, null))) .collect(Collectors.toList())); messageListener.accept(message); } } else { if (sender != null) { - messageSender.sendMessage(MessageSender.Verb.PRUNE, sender, null); + messageSender.sendMessage(MessageSender.Verb.PRUNE, sender, hash, null); peerRepository.moveToLazy(sender); } } @@ -108,7 +108,7 @@ public void run() { if (newPeerIndex == lazyPeers.size()) { newPeerIndex = 0; } - messageSender.sendMessage(MessageSender.Verb.GRAFT, lazyPeers.get(index), hash); + messageSender.sendMessage(MessageSender.Verb.GRAFT, lazyPeers.get(index), hash, null); scheduleGraftMessage(newPeerIndex++); } }; @@ -237,7 +237,7 @@ public void receivePruneMessage(Peer peer) { */ public void receiveGraftMessage(Peer peer, Bytes messageHash) { peerRepository.moveToEager(peer); - messageSender.sendMessage(MessageSender.Verb.GOSSIP, peer, messageHash); + messageSender.sendMessage(MessageSender.Verb.GOSSIP, peer, messageHash, null); } /** diff --git a/plumtree/src/main/java/net/consensys/cava/plumtree/vertx/VertxGossipServer.java b/plumtree/src/main/java/net/consensys/cava/plumtree/vertx/VertxGossipServer.java index 89a98de5..a2fada1b 100644 --- a/plumtree/src/main/java/net/consensys/cava/plumtree/vertx/VertxGossipServer.java +++ b/plumtree/src/main/java/net/consensys/cava/plumtree/vertx/VertxGossipServer.java @@ -47,6 +47,7 @@ public final class VertxGossipServer { private static final class Message { public MessageSender.Verb verb; + public String hash; public String payload; } private final class SocketHandler { @@ -135,9 +136,10 @@ public AsyncCompletion start() { if (res.failed()) { completion.completeExceptionally(res.cause()); } else { - state = new State(peerRepository, messageHashing, (verb, peer, payload) -> { + state = new State(peerRepository, messageHashing, (verb, peer, hash, payload) -> { Message message = new Message(); message.verb = verb; + message.hash = hash.toHexString(); message.payload = payload == null ? null : payload.toHexString(); try { ((SocketPeer) peer).socket().write(Buffer.buffer(mapper.writeValueAsBytes(message))); diff --git a/plumtree/src/test/java/net/consensys/cava/plumtree/StateTest.java b/plumtree/src/test/java/net/consensys/cava/plumtree/StateTest.java index 0088d2c3..06ca3b92 100644 --- a/plumtree/src/test/java/net/consensys/cava/plumtree/StateTest.java +++ b/plumtree/src/test/java/net/consensys/cava/plumtree/StateTest.java @@ -37,12 +37,14 @@ private static class MockMessageSender implements MessageSender { Verb verb; Peer peer; + Bytes hash; Bytes payload; @Override - public void sendMessage(Verb verb, Peer peer, Bytes payload) { + public void sendMessage(Verb verb, Peer peer, Bytes hash, Bytes payload) { this.verb = verb; this.peer = peer; + this.hash = hash; this.payload = payload; } @@ -139,7 +141,7 @@ void receiveFullMessageFromEagerPeerWithALazyPeer() { assertEquals(msg, messageSender.payload); assertEquals(otherPeer, messageSender.peer); state.processQueue(); - assertEquals(Hash.keccak256(msg), messageSender.payload); + assertEquals(Hash.keccak256(msg), messageSender.hash); assertEquals(lazyPeer, messageSender.peer); assertEquals(MessageSender.Verb.IHAVE, messageSender.verb); } @@ -174,7 +176,7 @@ void receivePartialMessageFromLazyPeerAndNoFullMessage() throws Exception { Bytes message = Bytes32.random(); state.receiveIHaveMessage(lazyPeer, message); Thread.sleep(200); - assertEquals(message, messageSender.payload); + assertEquals(message, messageSender.hash); assertEquals(lazyPeer, messageSender.peer); assertEquals(MessageSender.Verb.GRAFT, messageSender.verb); }