Skip to content
Permalink
Browse files
JIRA-1151
closes 42
  • Loading branch information
yukselakinci authored and aching committed Aug 22, 2017
1 parent ea7753f commit 6903248d4ed5a383bf0fa080992c038185221a59
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 5 deletions.
@@ -29,6 +29,7 @@
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.factories.MessageValueFactory;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
@@ -60,6 +61,8 @@
protected final int maxMessagesSizePerWorker;
/** NettyWorkerClientRequestProcessor for message sending */
protected final NettyWorkerClientRequestProcessor<I, ?, ?> clientProcessor;
/** Cached message value factory */
protected MessageValueFactory<M> messageValueFactory;
/**
* Constructor
*
@@ -76,12 +79,13 @@ public SendMessageCache(ImmutableClassesGiraphConfiguration conf,
ADDITIONAL_MSG_REQUEST_SIZE.get(conf));
maxMessagesSizePerWorker = maxMsgSize;
clientProcessor = processor;
messageValueFactory =
conf.createOutgoingMessageValueFactory();
}

@Override
public VertexIdMessages<I, M> createVertexIdData() {
return new ByteArrayVertexIdMessages<I, M>(
getConf().<M>createOutgoingMessageValueFactory());
return new ByteArrayVertexIdMessages<I, M>(messageValueFactory);
}

/**
@@ -145,7 +145,7 @@ private int addOneToManyMessage(
msgVidsCache[workerInfo.getTaskId()];
if (workerData == null) {
workerData = new ByteArrayOneMessageToManyIds<I, M>(
getConf().<M>createOutgoingMessageValueFactory());
messageValueFactory);
workerData.setConf(getConf());
workerData.initialize(getSendWorkerInitialBufferSize(
workerInfo.getTaskId()));
@@ -43,6 +43,7 @@
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.factories.MessageValueFactory;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.metrics.GiraphMetrics;
@@ -111,6 +112,8 @@
private final Counter localRequests;
/** Number of requests that were handled locally */
private final Counter remoteRequests;
/** Cached message value factory */
private final MessageValueFactory messageValueFactory;

/**
* Constructor.
@@ -158,6 +161,7 @@ public NettyWorkerClientRequestProcessor(
localRequests = smr.getCounter(MetricNames.LOCAL_REQUESTS);
remoteRequests = smr.getCounter(MetricNames.REMOTE_REQUESTS);
setupGauges(smr, localRequests, remoteRequests);
messageValueFactory = configuration.createOutgoingMessageValueFactory();
}

@Override
@@ -213,7 +217,7 @@ private void sendPartitionMessages(WorkerInfo workerInfo,
serverData.getCurrentMessageStore();
ByteArrayVertexIdMessages<I, Writable> vertexIdMessages =
new ByteArrayVertexIdMessages<I, Writable>(
configuration.createOutgoingMessageValueFactory());
messageValueFactory);
vertexIdMessages.setConf(configuration);
vertexIdMessages.initialize();
for (I vertexId :
@@ -231,7 +235,7 @@ private void sendPartitionMessages(WorkerInfo workerInfo,
doRequest(workerInfo, messagesRequest);
vertexIdMessages =
new ByteArrayVertexIdMessages<I, Writable>(
configuration.createOutgoingMessageValueFactory());
messageValueFactory);
vertexIdMessages.setConf(configuration);
vertexIdMessages.initialize();
}

0 comments on commit 6903248

Please sign in to comment.