Skip to content
Permalink
Browse files
GIRAPH-1182
closes #66
  • Loading branch information
Maja Kabiljo committed Mar 22, 2018
1 parent 251b167 commit d16b6b8ce843e6e931cb8dbc92823f82ab10ce0d
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
@@ -18,6 +18,7 @@

package org.apache.giraph.comm.netty;

import java.net.InetSocketAddress;
import java.util.List;
import com.google.common.collect.Lists;

@@ -36,14 +37,18 @@ public class ChannelRotater {
private final List<Channel> channelList = Lists.newArrayList();
/** Task id of this channel */
private final Integer taskId;
/** Address these channels are associated with */
private final InetSocketAddress address;

/**
* Constructor
*
* @param taskId Id of the task these channels as associated with
* @param address Address these channels are associated with
*/
public ChannelRotater(Integer taskId) {
public ChannelRotater(Integer taskId, InetSocketAddress address) {
this.taskId = taskId;
this.address = address;
}

public Integer getTaskId() {
@@ -68,7 +73,9 @@ public synchronized void addChannel(Channel channel) {
*/
public synchronized Channel nextChannel() {
if (channelList.isEmpty()) {
throw new IllegalArgumentException("nextChannel: No channels exist!");
throw new IllegalArgumentException(
"nextChannel: No channels exist for hostname " +
address.getHostName());
}

++index;
@@ -533,7 +533,8 @@ public void connectAllAddresses(Collection<? extends TaskInfo> tasks) {
addressChannelMap.get(waitingConnection.address);
if (rotater == null) {
ChannelRotater newRotater =
new ChannelRotater(waitingConnection.taskId);
new ChannelRotater(waitingConnection.taskId,
waitingConnection.address);
rotater = addressChannelMap.putIfAbsent(
waitingConnection.address, newRotater);
if (rotater == null) {

0 comments on commit d16b6b8

Please sign in to comment.