-
Notifications
You must be signed in to change notification settings - Fork 114
Conversation
import java.util.List; | ||
import java.util.stream.Collectors; | ||
|
||
/** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No author comments.
@@ -0,0 +1,27 @@ | |||
package org.apache.gossip.model; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All files Requires apache licence header
messages.add(msg); | ||
} | ||
|
||
public List<PerNodeDataMessage> getMessages() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indent is two spaces. Use our style sheet.
@@ -35,8 +30,8 @@ public static MessageHandler defaultHandler() { | |||
return concurrentHandler( | |||
new TypedMessageHandler(Response.class, new ResponseHandler()), | |||
new TypedMessageHandler(ShutdownMessage.class, new ShutdownMessageHandler()), | |||
new TypedMessageHandler(PerNodeDataMessage.class, new PerNodeDataMessageHandler()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets not remove the old system. Eventually I think the bulk only way makes sense but there is no issue in supporting both in the code and choosing with a switch
message.setExpireAt(innerEntry.getValue().getExpireAt()); | ||
message.setKey(innerEntry.getValue().getKey()); | ||
message.setNodeId(innerEntry.getValue().getNodeId()); | ||
message.setTimestamp(innerEntry.getValue().getTimestamp()); | ||
message.setPayload(innerEntry.getValue().getPayload()); | ||
message.setReplicable(innerEntry.getValue().getReplicable()); | ||
gossipCore.sendOneWay(message, member.getUri()); | ||
udpMessage.addMessage(message); | ||
if (udpMessage.getMessages().size() == 100) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be a variable that can be controlled through settings
message.setExpireAt(innerEntry.getValue().getExpireAt()); | ||
message.setKey(innerEntry.getValue().getKey()); | ||
message.setNodeId(innerEntry.getValue().getNodeId()); | ||
message.setTimestamp(innerEntry.getValue().getTimestamp()); | ||
message.setPayload(innerEntry.getValue().getPayload()); | ||
message.setReplicable(innerEntry.getValue().getReplicable()); | ||
gossipCore.sendOneWay(message, member.getUri()); | ||
udpMessage.addMessage(message); | ||
if (udpMessage.getMessages().size() == 100) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use the same variable to control this.
1db0a19
to
38e3a12
Compare
I still have to add tests for bulk transfer. But maybe you can check it out and let me know if there are any other issues. |
38e3a12
to
8ba025c
Compare
Instead of new tests I have used parameterized tests to run existing tests with bulk transfer activated. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. I left a comment with minor changes.
return; | ||
} | ||
long startTime = System.currentTimeMillis(); | ||
if (gossipSettings.isBulkTransfer()) { | ||
sendSharedDataInBulkInternal(me, member); | ||
} else |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use { } even for one line if so your intention is clear.
public final void sendPerNodeData(LocalMember me, LocalMember member){ | ||
if (member == null){ | ||
return; | ||
} | ||
long startTime = System.currentTimeMillis(); | ||
if (gossipSettings.isBulkTransfer()) { | ||
sendPerNodeDataInBulkInternal(me, member); | ||
} else |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same deal use { } even for one line if
this.base = base; | ||
this.bulkTransfer = bulkTransfer; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Slick. I like this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While going through the tests, I noticed the same can be done in TenNodeThreeSeedTest
by parameterizing base.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets handle that in a separate ticket.
8ba025c
to
7c457eb
Compare
Done! |
This is the first version I had in mind. The simplest way is to create a bulk message which carries a list of messages. The handler simply unpacks the list of messages and the rest will stay the same. I have failing tests to go through yet. But maybe someone can let me know if this is the way to go or there is a better alternative.