Skip to content
This repository has been archived by the owner on Apr 23, 2019. It is now read-only.

Commit

Permalink
Merge pull request #210 from atoulme/plumtree
Browse files Browse the repository at this point in the history
Expose hash of the payload separately from the payload
  • Loading branch information
atoulme committed Apr 7, 2019
2 parents 2c0c4a1 + a2f9775 commit 64478ad
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ enum Verb {
*
* @param verb the type of message
* @param peer the target of the message
* @param hash the hash of the message
* @param payload the bytes to send
*/
void sendMessage(Verb verb, Peer peer, @Nullable Bytes payload);
void sendMessage(Verb verb, Peer peer, Bytes hash, @Nullable Bytes payload);
}
10 changes: 5 additions & 5 deletions plumtree/src/main/java/net/consensys/cava/plumtree/State.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,21 @@ void fullMessageReceived(@Nullable Peer sender, Bytes message) {
if (sender == null || messageValidator.validate(message, sender)) {
for (Peer peer : peerRepository.eagerPushPeers()) {
if (sender == null || !sender.equals(peer)) {
messageSender.sendMessage(MessageSender.Verb.GOSSIP, peer, message);
messageSender.sendMessage(MessageSender.Verb.GOSSIP, peer, hash, message);
}
}
lazyQueue.addAll(
peerRepository
.lazyPushPeers()
.stream()
.filter(p -> !lazyPeers.contains(p))
.map(peer -> (Runnable) (() -> messageSender.sendMessage(MessageSender.Verb.IHAVE, peer, hash)))
.map(peer -> (Runnable) (() -> messageSender.sendMessage(MessageSender.Verb.IHAVE, peer, hash, null)))
.collect(Collectors.toList()));
messageListener.accept(message);
}
} else {
if (sender != null) {
messageSender.sendMessage(MessageSender.Verb.PRUNE, sender, null);
messageSender.sendMessage(MessageSender.Verb.PRUNE, sender, hash, null);
peerRepository.moveToLazy(sender);
}
}
Expand All @@ -108,7 +108,7 @@ public void run() {
if (newPeerIndex == lazyPeers.size()) {
newPeerIndex = 0;
}
messageSender.sendMessage(MessageSender.Verb.GRAFT, lazyPeers.get(index), hash);
messageSender.sendMessage(MessageSender.Verb.GRAFT, lazyPeers.get(index), hash, null);
scheduleGraftMessage(newPeerIndex++);
}
};
Expand Down Expand Up @@ -237,7 +237,7 @@ public void receivePruneMessage(Peer peer) {
*/
public void receiveGraftMessage(Peer peer, Bytes messageHash) {
peerRepository.moveToEager(peer);
messageSender.sendMessage(MessageSender.Verb.GOSSIP, peer, messageHash);
messageSender.sendMessage(MessageSender.Verb.GOSSIP, peer, messageHash, null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public final class VertxGossipServer {
private static final class Message {

public MessageSender.Verb verb;
public String hash;
public String payload;
}
private final class SocketHandler {
Expand Down Expand Up @@ -135,9 +136,10 @@ public AsyncCompletion start() {
if (res.failed()) {
completion.completeExceptionally(res.cause());
} else {
state = new State(peerRepository, messageHashing, (verb, peer, payload) -> {
state = new State(peerRepository, messageHashing, (verb, peer, hash, payload) -> {
Message message = new Message();
message.verb = verb;
message.hash = hash.toHexString();
message.payload = payload == null ? null : payload.toHexString();
try {
((SocketPeer) peer).socket().write(Buffer.buffer(mapper.writeValueAsBytes(message)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@ private static class MockMessageSender implements MessageSender {

Verb verb;
Peer peer;
Bytes hash;
Bytes payload;

@Override
public void sendMessage(Verb verb, Peer peer, Bytes payload) {
public void sendMessage(Verb verb, Peer peer, Bytes hash, Bytes payload) {
this.verb = verb;
this.peer = peer;
this.hash = hash;
this.payload = payload;

}
Expand Down Expand Up @@ -139,7 +141,7 @@ void receiveFullMessageFromEagerPeerWithALazyPeer() {
assertEquals(msg, messageSender.payload);
assertEquals(otherPeer, messageSender.peer);
state.processQueue();
assertEquals(Hash.keccak256(msg), messageSender.payload);
assertEquals(Hash.keccak256(msg), messageSender.hash);
assertEquals(lazyPeer, messageSender.peer);
assertEquals(MessageSender.Verb.IHAVE, messageSender.verb);
}
Expand Down Expand Up @@ -174,7 +176,7 @@ void receivePartialMessageFromLazyPeerAndNoFullMessage() throws Exception {
Bytes message = Bytes32.random();
state.receiveIHaveMessage(lazyPeer, message);
Thread.sleep(200);
assertEquals(message, messageSender.payload);
assertEquals(message, messageSender.hash);
assertEquals(lazyPeer, messageSender.peer);
assertEquals(MessageSender.Verb.GRAFT, messageSender.verb);
}
Expand Down

0 comments on commit 64478ad

Please sign in to comment.