From 2117d1dbba8f18b08da70e890c30111edb3aebe3 Mon Sep 17 00:00:00 2001 From: spupyrev Date: Tue, 30 Aug 2016 17:32:13 -0700 Subject: [PATCH] faster maps Summary: The idea is to replace HashMap to Long2ObjectOpenHashMap (and Map to Int2Object...) This will save space and speed up some applications. I changed the type of such a map in TestGraph.java, which gives up to 2x speed up on an example of page rank computation (see comment below) JIRA: https://issues.apache.org/jira/browse/GIRAPH-1049 Test Plan: TestBasicCollections.java contain some tests Reviewers: sergey.edunov, maja.kabiljo, dionysis.logothetis, heslami, ikabiljo Reviewed By: heslami Differential Revision: https://reviews.facebook.net/D55587 --- .../framework/api/local/InternalApi.java | 119 +++++- .../api/local/InternalMessageStore.java | 360 ++++-------------- .../framework/api/local/LocalBlockRunner.java | 8 +- .../test_setup/NumericTestGraph.java | 2 +- .../giraph/bsp/CentralizedServiceWorker.java | 14 +- .../flow_control/CreditBasedFlowControl.java | 37 +- .../messages/AbstractListPerVertexStore.java | 48 ++- .../ByteArrayMessagesPerVertexStore.java | 50 ++- .../messages/InMemoryMessageStoreFactory.java | 55 +-- .../giraph/comm/messages/MessageStore.java | 12 + .../comm/messages/MessageStoreFactory.java | 5 +- .../messages/OneMessagePerVertexStore.java | 51 ++- .../comm/messages/PartitionSplitInfo.java | 70 ++++ .../messages/PointerListPerVertexStore.java | 41 +- .../comm/messages/SimpleMessageStore.java | 15 +- .../primitives/IdByteArrayMessageStore.java | 52 ++- .../IdOneMessagePerVertexStore.java | 48 ++- .../primitives/IntByteArrayMessageStore.java | 257 ------------- .../primitives/IntFloatMessageStore.java | 42 +- .../primitives/LongDoubleMessageStore.java | 39 +- ...eStore.java => LongAbstractListStore.java} | 76 ++-- ...ssageStore.java => LongAbstractStore.java} | 29 +- .../long_id/LongByteArrayMessageStore.java | 177 --------- ...ava => LongPointerListPerVertexStore.java} | 53 ++- .../queue/AsyncMessageStoreWrapper.java | 19 +- .../ooc/data/DiskBackedMessageStore.java | 27 +- .../ops/collections/Basic2ObjectMap.java | 191 +++++++++- .../collections/BasicCollectionsUtils.java | 73 ++++ .../org/apache/giraph/utils/TestGraph.java | 32 +- .../utils/VerboseByteStructMessageWrite.java | 44 ++- .../giraph/worker/BspServiceWorker.java | 27 +- .../TestIntFloatPrimitiveMessageStores.java | 10 +- .../TestLongDoublePrimitiveMessageStores.java | 10 +- .../queue/AsyncMessageStoreWrapperTest.java | 21 +- .../giraph/master/TestSwitchClasses.java | 2 +- .../giraph/types/TestBasicCollections.java | 207 ++++++++++ .../org/apache/giraph/utils/MockUtils.java | 15 +- .../scc/SccComputationTestInMemory.java | 6 +- 38 files changed, 1305 insertions(+), 1039 deletions(-) create mode 100644 giraph-core/src/main/java/org/apache/giraph/comm/messages/PartitionSplitInfo.java delete mode 100644 giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java rename giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/{LongAbstractListMessageStore.java => LongAbstractListStore.java} (72%) rename giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/{LongAbstractMessageStore.java => LongAbstractStore.java} (85%) delete mode 100644 giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java rename giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/{LongPointerListMessageStore.java => LongPointerListPerVertexStore.java} (77%) create mode 100644 giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicCollectionsUtils.java create mode 100644 giraph-core/src/test/java/org/apache/giraph/types/TestBasicCollections.java diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java index a4703b459..a8d5ef792 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java @@ -17,12 +17,20 @@ */ package org.apache.giraph.block_app.framework.api.local; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.base.Preconditions; + +import it.unimi.dsi.fastutil.ints.IntArrayList; +import it.unimi.dsi.fastutil.ints.IntList; + import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; @@ -35,7 +43,8 @@ import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; import org.apache.giraph.block_app.framework.api.BlockWorkerValueAccessor; import org.apache.giraph.block_app.framework.api.Counter; -import org.apache.giraph.block_app.framework.api.local.InternalMessageStore.InternalConcurrentMessageStore; +import org.apache.giraph.block_app.framework.api.local.InternalMessageStore.InternalChecksMessageStore; +import org.apache.giraph.block_app.framework.api.local.InternalMessageStore.InternalWrappedMessageStore; import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic; import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces; import org.apache.giraph.block_app.framework.output.BlockOutputDesc; @@ -44,8 +53,10 @@ import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle; import org.apache.giraph.block_app.framework.piece.global_comm.internal.ReducersForPieceHandler.BroadcastHandleImpl; import org.apache.giraph.comm.SendMessageCache.TargetVertexIdIterator; +import org.apache.giraph.comm.messages.PartitionSplitInfo; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.MessageClasses; import org.apache.giraph.edge.Edge; import org.apache.giraph.edge.OutEdges; import org.apache.giraph.graph.Vertex; @@ -62,8 +73,6 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import com.google.common.base.Preconditions; - /** * Internal implementation of Block API interfaces - representing an in-memory * giraph instance. @@ -371,17 +380,28 @@ public void afterMasterBeforeWorker(BlockWorkerPieces computation) { previousMessages = nextMessages; previousWorkerMessages = nextWorkerMessages; - nextMessages = InternalConcurrentMessageStore.createMessageStore( - conf, computation, runAllChecks); + nextMessages = createMessageStore( + conf, + computation.getOutgoingMessageClasses(conf), + createPartitionInfo(), + runAllChecks + ); nextWorkerMessages = new ArrayList<>(); + // finalize previous messages + if (previousMessages != null) { + previousMessages.finalizeStore(); + } + // process mutations: - Set targets = previousMessages == null ? - Collections.EMPTY_SET : previousMessages.targetsSet(); - if (createVertexOnMsgs) { - for (I target : targets) { + if (createVertexOnMsgs && previousMessages != null) { + Iterator iter = previousMessages.targetVertexIds(); + while (iter.hasNext()) { + I target = iter.next(); if (getPartition(target).getVertex(target) == null) { - mutations.putIfAbsent(target, new VertexMutations()); + // need a copy as the key might be reusable + I copyId = WritableUtils.createCopy(target); + mutations.putIfAbsent(copyId, new VertexMutations()); } } } @@ -393,8 +413,11 @@ public void afterMasterBeforeWorker(BlockWorkerPieces computation) { getPartition(vertexIndex).getVertex(vertexIndex); VertexMutations curMutations = entry.getValue(); Vertex vertex = vertexResolver.resolve( - vertexIndex, originalVertex, curMutations, - targets.contains(vertexIndex)); + vertexIndex, + originalVertex, + curMutations, + previousMessages != null && previousMessages.hasMessage(vertexIndex) + ); if (vertex != null) { getPartition(vertex.getId()).putVertex(vertex); @@ -406,6 +429,76 @@ public void afterMasterBeforeWorker(BlockWorkerPieces computation) { mutations.clear(); } + private + InternalMessageStore createMessageStore( + ImmutableClassesGiraphConfiguration conf, + MessageClasses messageClasses, + PartitionSplitInfo partitionInfo, + boolean runAllChecks + ) { + InternalMessageStore messageStore = + InternalWrappedMessageStore.create(conf, messageClasses, partitionInfo); + if (runAllChecks) { + return new InternalChecksMessageStore( + messageStore, conf, messageClasses.createMessageValueFactory(conf)); + } else { + return messageStore; + } + } + + private PartitionSplitInfo createPartitionInfo() { + return new PartitionSplitInfo() { + /** Ids of partitions */ + private IntList partitionIds; + /** Queue of partitions to be precessed in a superstep */ + private Queue> partitionQueue; + + @Override + public int getPartitionId(I vertexId) { + return partitionerFactory.getPartition(vertexId, partitions.size(), 1); + } + + @Override + public Iterable getPartitionIds() { + if (partitionIds == null) { + partitionIds = new IntArrayList(partitions.size()); + for (int i = 0; i < partitions.size(); i++) { + partitionIds.add(i); + } + } + Preconditions.checkState(partitionIds.size() == partitions.size()); + return partitionIds; + } + + @Override + public long getPartitionVertexCount(Integer partitionId) { + return partitions.get(partitionId).getVertexCount(); + } + + @Override + public void startIteration() { + checkState(partitionQueue == null || partitionQueue.isEmpty(), + "startIteration: It seems that some of " + + "of the partitions from previous iteration over partition store are" + + " not yet processed."); + + partitionQueue = new LinkedList>(); + for (Partition partition : partitions) { + partitionQueue.add(partition); + } + } + + @Override + public Partition getNextPartition() { + return partitionQueue.poll(); + } + + @Override + public void putPartition(Partition partition) { + } + }; + } + public List> getPartitions() { return partitions; } diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java index 92d9821bb..d8ea68ac4 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java @@ -17,32 +17,24 @@ */ package org.apache.giraph.block_app.framework.api.local; +import com.google.common.collect.Iterators; + import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; -import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces; -import org.apache.giraph.combiner.MessageCombiner; -import org.apache.giraph.comm.messages.MessageEncodeAndStoreType; +import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory; +import org.apache.giraph.comm.messages.MessageStore; +import org.apache.giraph.comm.messages.PartitionSplitInfo; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.conf.MessageClasses; import org.apache.giraph.factories.MessageValueFactory; -import org.apache.giraph.types.ops.TypeOps; -import org.apache.giraph.types.ops.TypeOpsUtils; -import org.apache.giraph.utils.ExtendedDataInput; -import org.apache.giraph.utils.ExtendedDataOutput; -import org.apache.giraph.utils.UnsafeReusableByteArrayInput; import org.apache.giraph.utils.WritableUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import com.google.common.collect.AbstractIterator; - /** * Interface for internal message store, used by LocalBlockRunner * @@ -52,309 +44,93 @@ @SuppressWarnings("rawtypes") interface InternalMessageStore { - Set targetsSet(); + Iterator targetVertexIds(); + boolean hasMessage(I id); Iterable takeMessages(I id); void sendMessage(I id, M message); void sendMessageToMultipleEdges(Iterator idIter, M message); + void finalizeStore(); /** - * Abstract Internal message store implementation that uses - * ConcurrentHashMap to store objects received thus far. + * A wrapper that uses InMemoryMessageStoreFactory to + * create MessageStore * * @param Vertex id type * @param Message type - * @param Receiver object that particular implementation uses - * (message, array of messages, byte array, etc) */ - abstract class InternalConcurrentMessageStore - - implements InternalMessageStore { - private final ConcurrentHashMap received = - new ConcurrentHashMap<>(); - - private final Class idClass; - private final TypeOps idTypeOps; - - InternalConcurrentMessageStore(Class idClass) { - this.idClass = idClass; - idTypeOps = TypeOpsUtils.getTypeOpsOrNull(idClass); - } - - public I copyId(I id) { - if (idTypeOps != null) { - return idTypeOps.createCopy(id); - } else { - return WritableUtils.createCopy(id, idClass, null); - } - } - - R getReceiverFor(I id) { - R value = received.get(id); - - if (value == null) { - id = copyId(id); - value = createNewReceiver(); - R oldValue = received.putIfAbsent(id, value); - if (oldValue != null) { - value = oldValue; - } - } - return value; - } - - R removeFor(I id) { - return received.remove(id); - } - - abstract R createNewReceiver(); - - @Override - public Set targetsSet() { - return received.keySet(); - } - - @Override - public void sendMessageToMultipleEdges(Iterator idIter, M message) { - while (idIter.hasNext()) { - sendMessage(idIter.next(), message); - } - } - - public static - InternalMessageStore createMessageStore( - final ImmutableClassesGiraphConfiguration conf, - final MessageClasses messageClasses + class InternalWrappedMessageStore + + implements InternalMessageStore { + private final MessageStore messageStore; + private final PartitionSplitInfo partitionInfo; + + public InternalWrappedMessageStore( + ImmutableClassesGiraphConfiguration conf, + MessageStore messageStore, + PartitionSplitInfo partitionInfo ) { - MessageCombiner combiner = - messageClasses.createMessageCombiner(conf); - if (combiner != null) { - return new InternalCombinerMessageStore<>( - conf.getVertexIdClass(), combiner); - } else if (messageClasses.getMessageEncodeAndStoreType().equals( - MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) { - return new InternalSharedByteMessageStore<>( - conf.getVertexIdClass(), - messageClasses.createMessageValueFactory(conf)); - } else { - return new InternalByteMessageStore<>( - conf.getVertexIdClass(), - messageClasses.createMessageValueFactory(conf), - conf); - } + this.messageStore = messageStore; + this.partitionInfo = partitionInfo; } public static - InternalMessageStore createMessageStore( - final ImmutableClassesGiraphConfiguration conf, - final BlockWorkerPieces pieces, boolean runAllChecks) { - @SuppressWarnings("unchecked") - MessageClasses messageClasses = - pieces.getOutgoingMessageClasses(conf); - - InternalMessageStore messageStore = - createMessageStore(conf, messageClasses); - if (runAllChecks) { - return new InternalChecksMessageStore( - messageStore, conf, messageClasses.createMessageValueFactory(conf)); - } else { - return messageStore; - } - } - } - - /** - * InternalMessageStore that combines messages as they are received. - * - * @param Vertex id value type - * @param Message type - */ - static class InternalCombinerMessageStore - - extends InternalConcurrentMessageStore { - private final MessageCombiner messageCombiner; - - public InternalCombinerMessageStore(Class idClass, - MessageCombiner messageCombiner) { - super(idClass); - this.messageCombiner = messageCombiner; - } - - @Override - public Iterable takeMessages(I id) { - M message = removeFor(id); - if (message != null) { - return Collections.singleton(message); - } else { - return null; - } - } - - @Override - public void sendMessage(I id, M message) { - M mainMessage = getReceiverFor(id); - synchronized (mainMessage) { - messageCombiner.combine(id, mainMessage, message); - } - } - - @Override - M createNewReceiver() { - return messageCombiner.createInitialMessage(); - } - } - - /** - * InternalMessageStore that keeps messages for each vertex in byte array. - * - * @param Vertex id value type - * @param Message type - */ - static class InternalByteMessageStore - - extends InternalConcurrentMessageStore { - private final MessageValueFactory messageFactory; - private final ImmutableClassesGiraphConfiguration conf; - - public InternalByteMessageStore( - Class idClass, MessageValueFactory messageFactory, - ImmutableClassesGiraphConfiguration conf + InternalMessageStore create( + ImmutableClassesGiraphConfiguration conf, + MessageClasses messageClasses, + PartitionSplitInfo partitionInfo ) { - super(idClass); - this.messageFactory = messageFactory; - this.conf = conf; - } - - @Override - public Iterable takeMessages(I id) { - final ExtendedDataOutput out = removeFor(id); - if (out == null) { - return null; - } - - return new Iterable() { - @Override - public Iterator iterator() { - final ExtendedDataInput in = conf.createExtendedDataInput( - out.getByteArray(), 0, out.getPos() - ); - - final M message = messageFactory.newInstance(); - return new AbstractIterator() { - @Override - protected M computeNext() { - if (in.available() == 0) { - return endOfData(); - } - try { - message.readFields(in); - } catch (IOException e) { - throw new RuntimeException(e); - } - return message; - } - }; - } - }; + InMemoryMessageStoreFactory factory = + new InMemoryMessageStoreFactory<>(); + factory.initialize(partitionInfo, conf); + return new InternalWrappedMessageStore<>( + conf, + factory.newStore(messageClasses), + partitionInfo + ); } @Override public void sendMessage(I id, M message) { - ExtendedDataOutput out = getReceiverFor(id); - - synchronized (out) { - try { - message.write(out); - } catch (IOException e) { - throw new RuntimeException(e); - } + try { + messageStore.addMessage(id, message); + } catch (IOException e) { + throw new RuntimeException(e); } } @Override - ExtendedDataOutput createNewReceiver() { - return conf.createExtendedDataOutput(); - } - } - - /** - * InternalMessageStore that creates byte[] for each message, and - * all receivers share the same byte[]. - * - * @param Vertex id value type - * @param Message type - */ - static class InternalSharedByteMessageStore - - extends InternalConcurrentMessageStore> { - private final MessageValueFactory messageFactory; - - public InternalSharedByteMessageStore( - Class idClass, MessageValueFactory messageFactory) { - super(idClass); - this.messageFactory = messageFactory; + public void sendMessageToMultipleEdges(Iterator idIter, M message) { + while (idIter.hasNext()) { + sendMessage(idIter.next(), message); + } } @Override public Iterable takeMessages(I id) { - final List out = removeFor(id); - if (out == null) { - return null; - } - - return new Iterable() { - @Override - public Iterator iterator() { - final Iterator byteIter = out.iterator(); - final M message = messageFactory.newInstance(); - final UnsafeReusableByteArrayInput reusableInput = - new UnsafeReusableByteArrayInput(); - - return new Iterator() { - @Override - public boolean hasNext() { - return byteIter.hasNext(); - } - - @Override - public M next() { - WritableUtils.fromByteArrayUnsafe( - byteIter.next(), message, reusableInput); - return message; - } - - @Override - public void remove() { - byteIter.remove(); - } - }; - } - }; - } - - private void storeMessage(I id, byte[] messageData) { - List out = getReceiverFor(id); - synchronized (out) { - out.add(messageData); - } + Iterable result = messageStore.getVertexMessages(id); + messageStore.clearVertexMessages(id); + return result; } @Override - List createNewReceiver() { - return new ArrayList<>(); + public Iterator targetVertexIds() { + List> iterators = new ArrayList<>(); + for (int partition : partitionInfo.getPartitionIds()) { + Iterable vertices = + messageStore.getPartitionDestinationVertices(partition); + iterators.add(vertices.iterator()); + } + return Iterators.concat(iterators.iterator()); } @Override - public void sendMessage(I id, M message) { - storeMessage(id, WritableUtils.toByteArrayUnsafe(message)); + public boolean hasMessage(I id) { + return messageStore.hasMessagesForVertex(id); } @Override - public void sendMessageToMultipleEdges(Iterator idIter, M message) { - byte[] messageData = WritableUtils.toByteArrayUnsafe(message); - while (idIter.hasNext()) { - storeMessage(idIter.next(), messageData); - } + public void finalizeStore() { + messageStore.finalizeStore(); } } @@ -369,9 +145,11 @@ static class InternalChecksMessageStore private final ImmutableClassesGiraphConfiguration conf; private final MessageValueFactory messageFactory; - public InternalChecksMessageStore(InternalMessageStore messageStore, - ImmutableClassesGiraphConfiguration conf, - MessageValueFactory messageFactory) { + public InternalChecksMessageStore( + InternalMessageStore messageStore, + ImmutableClassesGiraphConfiguration conf, + MessageValueFactory messageFactory + ) { this.messageStore = messageStore; this.conf = conf; this.messageFactory = messageFactory; @@ -428,8 +206,18 @@ public Iterable takeMessages(I id) { } @Override - public Set targetsSet() { - return messageStore.targetsSet(); + public boolean hasMessage(I id) { + return messageStore.hasMessage(id); + } + + @Override + public Iterator targetVertexIds() { + return messageStore.targetVertexIds(); + } + + @Override + public void finalizeStore() { + messageStore.finalizeStore(); } } } diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java index 90aa8a298..33cd84b60 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java @@ -63,6 +63,9 @@ public class LocalBlockRunner { /** Number of threads to use */ public static final IntConfOption NUM_THREADS = new IntConfOption( "test.LocalBlockRunner.NUM_THREADS", 3, ""); + /** Number of vertex partitions */ + public static final IntConfOption NUM_PARTITIONS = new IntConfOption( + "test.LocalBlockRunner.NUM_PARTITIONS", 16, ""); /** * Whether to run all supported checks. Disable if you are running this * not within a unit test, and on a large graph, where performance matters. @@ -148,7 +151,8 @@ void runBlockWithVertexOutput( Preconditions.checkNotNull(block); Preconditions.checkNotNull(graph); ImmutableClassesGiraphConfiguration conf = graph.getConf(); - int numPartitions = NUM_THREADS.get(conf); + int numThreads = NUM_THREADS.get(conf); + int numPartitions = NUM_PARTITIONS.get(conf); boolean runAllChecks = RUN_ALL_CHECKS.get(conf); boolean serializeMaster = SERIALIZE_MASTER.get(conf); final boolean doOutputDuringComputation = conf.doOutputDuringComputation(); @@ -171,7 +175,7 @@ public void progress() { } })); - ExecutorService executor = Executors.newFixedThreadPool(numPartitions); + ExecutorService executor = Executors.newFixedThreadPool(numThreads); if (runAllChecks) { for (Vertex vertex : graph) { diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java index c5d2fb180..5b1508e14 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java @@ -101,7 +101,7 @@ public V getValue(Number vertexId) { * Get number of vertices in the graph */ public int getVertexCount() { - return testGraph.getVertices().size(); + return testGraph.getVertexCount(); } /** diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java index 5249829dd..e5e6b63ab 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java @@ -18,8 +18,13 @@ package org.apache.giraph.bsp; +import java.io.IOException; +import java.util.Collection; +import java.util.List; + import org.apache.giraph.comm.ServerData; import org.apache.giraph.comm.WorkerClient; +import org.apache.giraph.comm.messages.PartitionSplitInfo; import org.apache.giraph.graph.AddressesAndPartitionsWritable; import org.apache.giraph.graph.FinishedSuperstepStats; import org.apache.giraph.graph.GlobalStats; @@ -30,18 +35,14 @@ import org.apache.giraph.partition.PartitionOwner; import org.apache.giraph.partition.PartitionStats; import org.apache.giraph.partition.PartitionStore; -import org.apache.giraph.worker.WorkerInputSplitsHandler; import org.apache.giraph.worker.WorkerAggregatorHandler; import org.apache.giraph.worker.WorkerContext; import org.apache.giraph.worker.WorkerInfo; +import org.apache.giraph.worker.WorkerInputSplitsHandler; import org.apache.giraph.worker.WorkerObserver; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import java.io.IOException; -import java.util.Collection; -import java.util.List; - /** * All workers should have access to this centralized service to * execute the following methods. @@ -53,7 +54,7 @@ @SuppressWarnings("rawtypes") public interface CentralizedServiceWorker - extends CentralizedService { + extends CentralizedService, PartitionSplitInfo { /** * Setup (must be called prior to any other function) * @@ -146,6 +147,7 @@ FinishedSuperstepStats finishSuperstep( * @param vertexId Vertex id * @return Partition id */ + @Override int getPartitionId(I vertexId); /** diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java index 0e1d3d64f..9b15b9b8b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java @@ -18,22 +18,8 @@ package org.apache.giraph.comm.flow_control; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.commons.lang3.tuple.MutablePair; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.giraph.comm.netty.NettyClient; -import org.apache.giraph.comm.netty.handler.AckSignalFlag; -import org.apache.giraph.comm.requests.SendResumeRequest; -import org.apache.giraph.comm.requests.WritableRequest; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.conf.IntConfOption; -import org.apache.giraph.utils.AdjustableSemaphore; -import org.apache.giraph.utils.CallableFactory; -import org.apache.giraph.utils.LogStacktraceCallable; -import org.apache.giraph.utils.ThreadUtils; -import org.apache.log4j.Logger; +import static com.google.common.base.Preconditions.checkState; +import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS; import java.util.ArrayDeque; import java.util.ArrayList; @@ -52,8 +38,23 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import static com.google.common.base.Preconditions.checkState; -import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.giraph.comm.netty.NettyClient; +import org.apache.giraph.comm.netty.handler.AckSignalFlag; +import org.apache.giraph.comm.requests.SendResumeRequest; +import org.apache.giraph.comm.requests.WritableRequest; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.IntConfOption; +import org.apache.giraph.utils.AdjustableSemaphore; +import org.apache.giraph.utils.CallableFactory; +import org.apache.giraph.utils.LogStacktraceCallable; +import org.apache.giraph.utils.ThreadUtils; +import org.apache.log4j.Logger; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; /** * Representation of credit-based flow control policy. With this policy there diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java index c28dff539..d1cfc3ba8 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java @@ -18,18 +18,17 @@ package org.apache.giraph.comm.messages; -import org.apache.giraph.bsp.CentralizedServiceWorker; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentMap; + import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.factories.MessageValueFactory; -import org.apache.giraph.partition.PartitionOwner; import org.apache.giraph.utils.VertexIdIterator; +import org.apache.giraph.utils.WritableUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ConcurrentMap; - /** * Abstract Implementation of {@link SimpleMessageStore} where * multiple messages are stored per vertex as a list @@ -46,14 +45,14 @@ public abstract class AbstractListPerVertexStore messageValueFactory, - CentralizedServiceWorker service, + PartitionSplitInfo partitionInfo, ImmutableClassesGiraphConfiguration config) { - super(messageValueFactory, service, config); + super(messageValueFactory, partitionInfo, config); } /** @@ -71,15 +70,38 @@ public AbstractListPerVertexStore( * @return pointer list */ protected L getOrCreateList(VertexIdIterator iterator) { - PartitionOwner owner = - service.getVertexPartitionOwner(iterator.getCurrentVertexId()); - int partitionId = owner.getPartitionId(); + int partitionId = getPartitionId(iterator.getCurrentVertexId()); ConcurrentMap partitionMap = getOrCreatePartitionMap(partitionId); L list = partitionMap.get(iterator.getCurrentVertexId()); if (list == null) { L newList = createList(); list = partitionMap.putIfAbsent( - iterator.releaseCurrentVertexId(), newList); + iterator.releaseCurrentVertexId(), newList); + if (list == null) { + list = newList; + } + } + return list; + } + + /** + * Get the list of pointers for a vertex + * Each pointer has information of how to access an encoded message + * for this vertex + * This method will take ownership of the vertex id from the + * iterator if necessary (when used in the partition map entry) + * + * @param vertexId vertex id + * @return pointer list + */ + protected L getOrCreateList(I vertexId) { + int partitionId = getPartitionId(vertexId); + ConcurrentMap partitionMap = getOrCreatePartitionMap(partitionId); + L list = partitionMap.get(vertexId); + if (list == null) { + L newList = createList(); + I copyId = WritableUtils.createCopy(vertexId); + list = partitionMap.putIfAbsent(copyId, newList); if (list == null) { list = newList; } diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java index efbe11bf7..7bf52e455 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java @@ -18,6 +18,8 @@ package org.apache.giraph.comm.messages; +import com.google.common.collect.Iterators; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -33,12 +35,11 @@ import org.apache.giraph.utils.VertexIdMessageBytesIterator; import org.apache.giraph.utils.VertexIdMessageIterator; import org.apache.giraph.utils.VertexIdMessages; +import org.apache.giraph.utils.WritableUtils; import org.apache.giraph.utils.io.DataInputOutput; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import com.google.common.collect.Iterators; - /** * Implementation of {@link SimpleMessageStore} where multiple messages are * stored per vertex as byte backed datastructures. @@ -53,14 +54,14 @@ public class ByteArrayMessagesPerVertexStore messageValueFactory, - CentralizedServiceWorker service, + PartitionSplitInfo partitionInfo, ImmutableClassesGiraphConfiguration config) { - super(messageValueFactory, service, config); + super(messageValueFactory, partitionInfo, config); } @Override @@ -137,6 +138,26 @@ public void addPartitionMessages( } } + @Override + public void addMessage(I vertexId, M message) throws IOException { + ConcurrentMap partitionMap = + getOrCreatePartitionMap(getPartitionId(vertexId)); + DataInputOutput dataInputOutput = partitionMap.get(vertexId); + if (dataInputOutput == null) { + DataInputOutput newDataOutput = config.createMessagesInputOutput(); + I copyId = WritableUtils.createCopy(vertexId); + dataInputOutput = partitionMap.putIfAbsent(copyId, newDataOutput); + if (dataInputOutput == null) { + dataInputOutput = newDataOutput; + } + } + + synchronized (dataInputOutput) { + VerboseByteStructMessageWrite.verboseWriteCurrentMessage( + vertexId, message, dataInputOutput.getDataOutput()); + } + } + @Override protected Iterable getMessagesAsIterable( DataInputOutput dataInputOutput) { @@ -199,7 +220,7 @@ MessageStoreFactory> newFactory( public static class Factory implements MessageStoreFactory> { /** Service worker */ - private CentralizedServiceWorker service; + private PartitionSplitInfo partitionInfo; /** Hadoop configuration */ private ImmutableClassesGiraphConfiguration config; @@ -207,12 +228,14 @@ public static class Factory public Factory() { } /** - * @param service Worker service + * @param partitionInfo Partition split info * @param config Hadoop configuration */ - public Factory(CentralizedServiceWorker service, - ImmutableClassesGiraphConfiguration config) { - this.service = service; + public Factory( + PartitionSplitInfo partitionInfo, + ImmutableClassesGiraphConfiguration config + ) { + this.partitionInfo = partitionInfo; this.config = config; } @@ -221,14 +244,15 @@ public MessageStore newStore( MessageClasses messageClasses) { return new ByteArrayMessagesPerVertexStore( messageClasses.createMessageValueFactory(config), - service, config); + partitionInfo, config); } @Override - public void initialize(CentralizedServiceWorker service, + public void initialize(PartitionSplitInfo partitionInfo, ImmutableClassesGiraphConfiguration conf) { - this.service = service; + this.partitionInfo = partitionInfo; this.config = conf; } } + } diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java index 99a12c51f..aa6a70372 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java @@ -18,15 +18,12 @@ package org.apache.giraph.comm.messages; -import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.combiner.MessageCombiner; import org.apache.giraph.comm.messages.primitives.IdByteArrayMessageStore; import org.apache.giraph.comm.messages.primitives.IdOneMessagePerVertexStore; -import org.apache.giraph.comm.messages.primitives.IntByteArrayMessageStore; import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore; import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore; -import org.apache.giraph.comm.messages.primitives.long_id.LongByteArrayMessageStore; -import org.apache.giraph.comm.messages.primitives.long_id.LongPointerListMessageStore; +import org.apache.giraph.comm.messages.primitives.long_id.LongPointerListPerVertexStore; import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; @@ -59,8 +56,8 @@ public class InMemoryMessageStoreFactory service; + /** Partition info */ + protected PartitionSplitInfo partitionInfo; /** Hadoop configuration */ protected ImmutableClassesGiraphConfiguration conf; @@ -87,24 +84,22 @@ protected MessageStore newStoreWithCombiner( if (vertexIdClass.equals(IntWritable.class) && messageClass.equals(FloatWritable.class)) { messageStore = new IntFloatMessageStore( - (CentralizedServiceWorker) service, + (PartitionSplitInfo) partitionInfo, (MessageCombiner) messageCombiner); } else if (vertexIdClass.equals(LongWritable.class) && messageClass.equals(DoubleWritable.class)) { messageStore = new LongDoubleMessageStore( - (CentralizedServiceWorker) service, + (PartitionSplitInfo) partitionInfo, (MessageCombiner) messageCombiner); } else { PrimitiveIdTypeOps idTypeOps = TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(vertexIdClass); if (idTypeOps != null) { messageStore = new IdOneMessagePerVertexStore<>( - messageValueFactory, service, messageCombiner, - conf); + messageValueFactory, partitionInfo, messageCombiner, conf); } else { - messageStore = - new OneMessagePerVertexStore(messageValueFactory, service, - messageCombiner, conf); + messageStore = new OneMessagePerVertexStore( + messageValueFactory, partitionInfo, messageCombiner, conf); } } return messageStore; @@ -124,21 +119,11 @@ protected MessageStore newStoreWithoutCombiner( MessageEncodeAndStoreType encodeAndStore) { MessageStore messageStore = null; Class vertexIdClass = conf.getVertexIdClass(); - if (vertexIdClass.equals(IntWritable.class)) { // INT - messageStore = new IntByteArrayMessageStore(messageValueFactory, - service, conf); - } else if (vertexIdClass.equals(LongWritable.class)) { // LONG - if (encodeAndStore.equals( - MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION) || - encodeAndStore.equals( - MessageEncodeAndStoreType.EXTRACT_BYTEARRAY_PER_PARTITION)) { - messageStore = new LongByteArrayMessageStore(messageValueFactory, - service, conf); - } else if (encodeAndStore.equals( - MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) { - messageStore = new LongPointerListMessageStore(messageValueFactory, - service, conf); - } + // a special case for LongWritable with POINTER_LIST_PER_VERTEX + if (vertexIdClass.equals(LongWritable.class) && encodeAndStore.equals( + MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) { + messageStore = new LongPointerListPerVertexStore( + messageValueFactory, partitionInfo, conf); } else { // GENERAL if (encodeAndStore.equals( MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION) || @@ -148,15 +133,15 @@ protected MessageStore newStoreWithoutCombiner( TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(vertexIdClass); if (idTypeOps != null) { messageStore = new IdByteArrayMessageStore<>( - messageValueFactory, service, conf); + messageValueFactory, partitionInfo, conf); } else { messageStore = new ByteArrayMessagesPerVertexStore<>( - messageValueFactory, service, conf); + messageValueFactory, partitionInfo, conf); } } else if (encodeAndStore.equals( MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) { - messageStore = new PointerListPerVertexStore<>(messageValueFactory, - service, conf); + messageStore = new PointerListPerVertexStore<>( + messageValueFactory, partitionInfo, conf); } } return messageStore; @@ -193,7 +178,7 @@ public MessageStore newStore( if (asyncMessageStoreThreads > 0) { messageStore = new AsyncMessageStoreWrapper( messageStore, - service.getPartitionStore().getPartitionIds(), + partitionInfo.getPartitionIds(), asyncMessageStoreThreads); } @@ -201,9 +186,9 @@ public MessageStore newStore( } @Override - public void initialize(CentralizedServiceWorker service, + public void initialize(PartitionSplitInfo partitionInfo, ImmutableClassesGiraphConfiguration conf) { - this.service = service; + this.partitionInfo = partitionInfo; this.conf = conf; } } diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java index 9c56d85e4..27e04ca67 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java @@ -21,6 +21,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; + import org.apache.giraph.utils.VertexIdMessages; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -88,6 +89,17 @@ public interface MessageStore messages); + /** + * Adds a message for a particular vertex + * The method is used by InternalMessageStore to send local messages; for + * the general case, use a more efficient addPartitionMessages + * + * @param vertexId Id of target vertex + * @param message A message to send + * @throws IOException + */ + void addMessage(I vertexId, M message) throws IOException; + /** * Called before start of computation in bspworker * Since it is run from a single thread while the store is not being diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java index 6a18aa8cf..ee45f9af1 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java @@ -18,7 +18,6 @@ package org.apache.giraph.comm.messages; -import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.conf.MessageClasses; import org.apache.hadoop.io.Writable; @@ -45,9 +44,9 @@ public interface MessageStoreFactory service, + void initialize(PartitionSplitInfo partitionInfo, ImmutableClassesGiraphConfiguration conf); } diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java index 1d6701482..09534bc1b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java @@ -31,6 +31,7 @@ import org.apache.giraph.factories.MessageValueFactory; import org.apache.giraph.utils.VertexIdMessageIterator; import org.apache.giraph.utils.VertexIdMessages; +import org.apache.giraph.utils.WritableUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -49,16 +50,17 @@ public class OneMessagePerVertexStore messageValueFactory, - CentralizedServiceWorker service, - MessageCombiner messageCombiner, - ImmutableClassesGiraphConfiguration config) { - super(messageValueFactory, service, config); + MessageValueFactory messageValueFactory, + PartitionSplitInfo partitionInfo, + MessageCombiner messageCombiner, + ImmutableClassesGiraphConfiguration config + ) { + super(messageValueFactory, partitionInfo, config); this.messageCombiner = messageCombiner; } @@ -98,6 +100,25 @@ public void addPartitionMessages( } } + @Override + public void addMessage(I vertexId, M message) throws IOException { + ConcurrentMap partitionMap = + getOrCreatePartitionMap(getPartitionId(vertexId)); + M currentMessage = partitionMap.get(vertexId); + if (currentMessage == null) { + M newMessage = messageCombiner.createInitialMessage(); + // need a copy as vertexId might be reusable + I copyId = WritableUtils.createCopy(vertexId); + currentMessage = partitionMap.putIfAbsent(copyId, newMessage); + if (currentMessage == null) { + currentMessage = newMessage; + } + } + synchronized (currentMessage) { + messageCombiner.combine(vertexId, currentMessage, message); + } + } + @Override protected Iterable getMessagesAsIterable(M message) { return Collections.singleton(message); @@ -147,17 +168,19 @@ private static class Factory implements MessageStoreFactory> { /** Service worker */ - private CentralizedServiceWorker service; + private PartitionSplitInfo partitionInfo; /** Hadoop configuration */ private ImmutableClassesGiraphConfiguration config; /** - * @param service Worker service + * @param partitionInfo Partition split info * @param config Hadoop configuration */ - public Factory(CentralizedServiceWorker service, - ImmutableClassesGiraphConfiguration config) { - this.service = service; + public Factory( + PartitionSplitInfo partitionInfo, + ImmutableClassesGiraphConfiguration config + ) { + this.partitionInfo = partitionInfo; this.config = config; } @@ -165,14 +188,14 @@ public Factory(CentralizedServiceWorker service, public MessageStore newStore( MessageClasses messageClasses) { return new OneMessagePerVertexStore( - messageClasses.createMessageValueFactory(config), service, + messageClasses.createMessageValueFactory(config), partitionInfo, messageClasses.createMessageCombiner(config), config); } @Override - public void initialize(CentralizedServiceWorker service, + public void initialize(PartitionSplitInfo partitionInfo, ImmutableClassesGiraphConfiguration conf) { - this.service = service; + this.partitionInfo = partitionInfo; this.config = conf; } } diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/PartitionSplitInfo.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PartitionSplitInfo.java new file mode 100644 index 000000000..e6858253c --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PartitionSplitInfo.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.comm.messages; + +import org.apache.giraph.partition.Partition; +import org.apache.hadoop.io.WritableComparable; + +/** + * Interface providing partition split information. + * + * @param Vertex id type. + */ +public interface PartitionSplitInfo { + /** + * Get the partition id that a vertex id would belong to. + * + * @param vertexId Vertex id + * @return Partition id + */ + int getPartitionId(I vertexId); + + /** + * Get the ids of all the stored partitions (on current worker) as Iterable + * + * @return The partition ids + */ + Iterable getPartitionIds(); + + /** + * Return the number of vertices in a partition. + * + * @param partitionId Partition id + * @return The number of vertices in the specified partition + */ + long getPartitionVertexCount(Integer partitionId); + + /** + * {@link org.apache.giraph.partition.PartitionStore#startIteration()} + */ + void startIteration(); + + /** + * {@link org.apache.giraph.partition.PartitionStore#getNextPartition()} + * + * @return The next partition to process + */ + Partition getNextPartition(); + + /** + * {@link org.apache.giraph.partition.PartitionStore#putPartition(Partition)} + * + * @param partition Partition + */ + void putPartition(Partition partition); +} diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java index 4b32a17a6..429ff69e4 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java @@ -19,23 +19,22 @@ package org.apache.giraph.comm.messages; import it.unimi.dsi.fastutil.longs.LongArrayList; -import org.apache.giraph.bsp.CentralizedServiceWorker; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.concurrent.ConcurrentMap; + import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.factories.MessageValueFactory; import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer; +import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut; import org.apache.giraph.utils.ExtendedDataOutput; import org.apache.giraph.utils.VertexIdMessageIterator; import org.apache.giraph.utils.VertexIdMessages; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.concurrent.ConcurrentMap; - -import static org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut; - /** * Implementation of {@link SimpleMessageStore} where multiple messages are * stored as a list of long pointers to extended data output objects @@ -54,14 +53,15 @@ public class PointerListPerVertexStore messageValueFactory, - CentralizedServiceWorker service, - ImmutableClassesGiraphConfiguration config) { - super(messageValueFactory, service, config); + MessageValueFactory messageValueFactory, + PartitionSplitInfo partitionInfo, + ImmutableClassesGiraphConfiguration config + ) { + super(messageValueFactory, partitionInfo, config); bytesBuffer = new ExtendedByteArrayOutputBuffer(config); } @@ -105,6 +105,21 @@ public void addPartitionMessages( } } + @Override + public void addMessage(I vertexId, M message) throws IOException { + LongArrayList list = getOrCreateList(vertexId); + IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut(); + long pointer = indexAndDataOut.getIndex(); + pointer <<= 32; + ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput(); + pointer += dataOutput.getPos(); + message.write(dataOutput); + + synchronized (list) { + list.add(pointer); + } + } + /** * Get messages as an iterable from message storage * diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java index 9c3ef7f91..bb250674a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java @@ -20,14 +20,15 @@ import com.google.common.collect.MapMaker; import com.google.common.collect.Maps; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentMap; + import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.factories.MessageValueFactory; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -46,8 +47,8 @@ public abstract class SimpleMessageStore implements MessageStore { /** Message class */ protected final MessageValueFactory messageValueFactory; - /** Service worker */ - protected final CentralizedServiceWorker service; + /** Partition split info */ + protected final PartitionSplitInfo partitionInfo; /** Map from partition id to map from vertex id to messages for that vertex */ protected final ConcurrentMap> map; /** Giraph configuration */ @@ -57,15 +58,15 @@ public abstract class SimpleMessageStore messageValueFactory, - CentralizedServiceWorker service, + PartitionSplitInfo partitionInfo, ImmutableClassesGiraphConfiguration config) { this.messageValueFactory = messageValueFactory; - this.service = service; + this.partitionInfo = partitionInfo; this.config = config; map = new MapMaker().concurrencyLevel( config.getNettyServerExecutionConcurrency()).makeMap(); @@ -114,7 +115,7 @@ protected abstract void writeMessages(T messages, DataOutput out) throws * @return Id of partiton */ protected int getPartitionId(I vertexId) { - return service.getVertexPartitionOwner(vertexId).getPartitionId(); + return partitionInfo.getPartitionId(vertexId); } /** diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java index 57f3ff63b..7c281027b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java @@ -17,6 +17,8 @@ */ package org.apache.giraph.comm.messages.primitives; +import com.google.common.collect.Lists; + import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import java.io.DataInput; @@ -25,9 +27,9 @@ import java.util.Iterator; import java.util.List; -import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.messages.MessageStore; import org.apache.giraph.comm.messages.MessagesIterable; +import org.apache.giraph.comm.messages.PartitionSplitInfo; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.factories.MessageValueFactory; import org.apache.giraph.types.ops.PrimitiveIdTypeOps; @@ -43,8 +45,6 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import com.google.common.collect.Lists; - /** * Special message store to be used when IDs are primitive and no combiner is * used. @@ -60,8 +60,8 @@ public class IdByteArrayMessageStore messageValueFactory; /** Map from partition id to map from vertex id to message */ private final Int2ObjectOpenHashMap> map; - /** Service worker */ - private final CentralizedServiceWorker service; + /** Partition split info */ + private final PartitionSplitInfo partitionInfo; /** Giraph configuration */ private final ImmutableClassesGiraphConfiguration config; /** Vertex id TypeOps */ @@ -87,26 +87,27 @@ public void write(DataOutput out, DataInputOutput value) * Constructor * * @param messageValueFactory Factory for creating message values - * @param service Service worker + * @param partitionInfo Partition split info * @param config Hadoop configuration */ public IdByteArrayMessageStore(MessageValueFactory messageValueFactory, - CentralizedServiceWorker service, - ImmutableClassesGiraphConfiguration config) { + PartitionSplitInfo partitionInfo, + ImmutableClassesGiraphConfiguration config + ) { this.messageValueFactory = messageValueFactory; - this.service = service; + this.partitionInfo = partitionInfo; this.config = config; idTypeOps = TypeOpsUtils.getPrimitiveIdTypeOps(config.getVertexIdClass()); map = new Int2ObjectOpenHashMap>(); - for (int partitionId : service.getPartitionStore().getPartitionIds()) { + for (int partitionId : partitionInfo.getPartitionIds()) { + int capacity = Math.max(10, + (int) partitionInfo.getPartitionVertexCount(partitionId)); Basic2ObjectMap partitionMap = - idTypeOps.create2ObjectOpenHashMap( - Math.max(10, - (int) service.getPartitionStore() - .getPartitionVertexCount(partitionId)), - dataInputOutputWriter); + idTypeOps.create2ObjectOpenHashMap( + capacity, + dataInputOutputWriter); map.put(partitionId, partitionMap); } @@ -119,7 +120,7 @@ public IdByteArrayMessageStore(MessageValueFactory messageValueFactory, * @return Map which holds messages for partition which vertex belongs to. */ private Basic2ObjectMap getPartitionMap(I vertexId) { - return map.get(service.getPartitionId(vertexId)); + return map.get(partitionInfo.getPartitionId(vertexId)); } /** @@ -180,6 +181,25 @@ public void addPartitionMessages(int partitionId, } } + /** + * Adds a message for a particular vertex + * + * @param vertexId Id of target vertex + * @param message A message to send + * @throws IOException + */ + @Override + public void addMessage(I vertexId, M message) throws IOException { + Basic2ObjectMap partitionMap = + getPartitionMap(vertexId); + synchronized (partitionMap) { + DataInputOutput dataInputOutput = getDataInputOutput( + partitionMap, vertexId); + VerboseByteStructMessageWrite.verboseWriteCurrentMessage( + vertexId, message, dataInputOutput.getDataOutput()); + } + } + @Override public void clearPartition(int partitionId) { map.get(partitionId).clear(); diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java index 4463ddbec..415eafcf7 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java @@ -17,6 +17,8 @@ */ package org.apache.giraph.comm.messages.primitives; +import com.google.common.collect.Lists; + import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import java.io.DataInput; @@ -26,9 +28,9 @@ import java.util.Iterator; import java.util.List; -import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.combiner.MessageCombiner; import org.apache.giraph.comm.messages.MessageStore; +import org.apache.giraph.comm.messages.PartitionSplitInfo; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.factories.MessageValueFactory; import org.apache.giraph.types.ops.PrimitiveIdTypeOps; @@ -41,8 +43,6 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import com.google.common.collect.Lists; - /** * Special message store to be used when IDs are primitive and message doesn't * need to be, and message combiner is used. @@ -62,8 +62,8 @@ public class IdOneMessagePerVertexStore messageValueFactory; /** Message messageCombiner */ private final MessageCombiner messageCombiner; - /** Service worker */ - private final CentralizedServiceWorker service; + /** Partition split info */ + private final PartitionSplitInfo partitionInfo; /** Giraph configuration */ private final ImmutableClassesGiraphConfiguration config; /** Vertex id TypeOps */ @@ -87,16 +87,16 @@ public void write(DataOutput out, M value) throws IOException { * Constructor * * @param messageValueFactory Message value factory - * @param service Service worker + * @param partitionInfo Partition split info * @param messageCombiner Message messageCombiner * @param config Config */ public IdOneMessagePerVertexStore( MessageValueFactory messageValueFactory, - CentralizedServiceWorker service, + PartitionSplitInfo partitionInfo, MessageCombiner messageCombiner, ImmutableClassesGiraphConfiguration config) { - this.service = service; + this.partitionInfo = partitionInfo; this.config = config; this.messageValueFactory = messageValueFactory; this.messageCombiner = messageCombiner; @@ -104,10 +104,11 @@ public IdOneMessagePerVertexStore( idTypeOps = TypeOpsUtils.getPrimitiveIdTypeOps(config.getVertexIdClass()); map = new Int2ObjectOpenHashMap<>(); - for (int partitionId : service.getPartitionStore().getPartitionIds()) { + for (int partitionId : partitionInfo.getPartitionIds()) { Basic2ObjectMap partitionMap = idTypeOps.create2ObjectOpenHashMap( - Math.max(10, (int) service.getPartitionStore() - .getPartitionVertexCount(partitionId)), messageWriter); + Math.max(10, (int) partitionInfo.getPartitionVertexCount(partitionId)), + messageWriter + ); map.put(partitionId, partitionMap); } } @@ -119,7 +120,7 @@ public IdOneMessagePerVertexStore( * @return Map which holds messages for partition which vertex belongs to. */ private Basic2ObjectMap getPartitionMap(I vertexId) { - return map.get(service.getPartitionId(vertexId)); + return map.get(partitionInfo.getPartitionId(vertexId)); } @Override @@ -151,6 +152,29 @@ public void addPartitionMessages( } } + /** + * Adds a message for a particular vertex + * + * @param vertexId Id of target vertex + * @param message A message to send + * @throws IOException + */ + @Override + public void addMessage(I vertexId, M message) throws IOException { + Basic2ObjectMap partitionMap = getPartitionMap(vertexId); + synchronized (partitionMap) { + M currentMessage = partitionMap.get(vertexId); + if (currentMessage == null) { + M newMessage = messageCombiner.createInitialMessage(); + currentMessage = partitionMap.put(vertexId, newMessage); + if (currentMessage == null) { + currentMessage = newMessage; + } + } + messageCombiner.combine(vertexId, currentMessage, message); + } + } + @Override public void clearPartition(int partitionId) { map.get(partitionId).clear(); diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java deleted file mode 100644 index 4ef9e76ba..000000000 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.comm.messages.primitives; - -import org.apache.giraph.bsp.CentralizedServiceWorker; -import org.apache.giraph.comm.messages.MessageStore; -import org.apache.giraph.comm.messages.MessagesIterable; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.factories.MessageValueFactory; -import org.apache.giraph.utils.VertexIdMessageBytesIterator; -import org.apache.giraph.utils.VertexIdMessageIterator; -import org.apache.giraph.utils.VertexIdMessages; -import org.apache.giraph.utils.EmptyIterable; -import org.apache.giraph.utils.VerboseByteStructMessageWrite; -import org.apache.giraph.utils.io.DataInputOutput; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Writable; - -import com.google.common.collect.Lists; - -import it.unimi.dsi.fastutil.ints.Int2ObjectMap; -import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; -import it.unimi.dsi.fastutil.ints.IntIterator; -import it.unimi.dsi.fastutil.objects.ObjectIterator; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.List; - -/** - * Special message store to be used when ids are IntWritable and no combiner - * is used. - * Uses fastutil primitive maps in order to decrease number of objects and - * get better performance. - * - * @param Message type - */ -public class IntByteArrayMessageStore - implements MessageStore { - /** Message value factory */ - protected final MessageValueFactory messageValueFactory; - /** Map from partition id to map from vertex id to message */ - private final - Int2ObjectOpenHashMap> map; - /** Service worker */ - private final CentralizedServiceWorker service; - /** Giraph configuration */ - private final ImmutableClassesGiraphConfiguration config; - - /** - * Constructor - * - * @param messageValueFactory Factory for creating message values - * @param service Service worker - * @param config Hadoop configuration - */ - public IntByteArrayMessageStore( - MessageValueFactory messageValueFactory, - CentralizedServiceWorker service, - ImmutableClassesGiraphConfiguration - config) { - this.messageValueFactory = messageValueFactory; - this.service = service; - this.config = config; - - map = - new Int2ObjectOpenHashMap>(); - for (int partitionId : service.getPartitionStore().getPartitionIds()) { - Int2ObjectOpenHashMap partitionMap = - new Int2ObjectOpenHashMap( - (int) service.getPartitionStore() - .getPartitionVertexCount(partitionId)); - map.put(partitionId, partitionMap); - } - } - - @Override - public boolean isPointerListEncoding() { - return false; - } - - /** - * Get map which holds messages for partition which vertex belongs to. - * - * @param vertexId Id of the vertex - * @return Map which holds messages for partition which vertex belongs to. - */ - private Int2ObjectOpenHashMap getPartitionMap( - IntWritable vertexId) { - return map.get(service.getPartitionId(vertexId)); - } - - /** - * Get the DataInputOutput for a vertex id, creating if necessary. - * - * @param partitionMap Partition map to look in - * @param vertexId Id of the vertex - * @return DataInputOutput for this vertex id (created if necessary) - */ - private DataInputOutput getDataInputOutput( - Int2ObjectOpenHashMap partitionMap, - int vertexId) { - DataInputOutput dataInputOutput = partitionMap.get(vertexId); - if (dataInputOutput == null) { - dataInputOutput = config.createMessagesInputOutput(); - partitionMap.put(vertexId, dataInputOutput); - } - return dataInputOutput; - } - - @Override - public void addPartitionMessages(int partitionId, - VertexIdMessages messages) { - Int2ObjectOpenHashMap partitionMap = - map.get(partitionId); - synchronized (partitionMap) { - VertexIdMessageBytesIterator - vertexIdMessageBytesIterator = - messages.getVertexIdMessageBytesIterator(); - // Try to copy the message buffer over rather than - // doing a deserialization of a message just to know its size. This - // should be more efficient for complex objects where serialization is - // expensive. If this type of iterator is not available, fall back to - // deserializing/serializing the messages - if (vertexIdMessageBytesIterator != null) { - while (vertexIdMessageBytesIterator.hasNext()) { - vertexIdMessageBytesIterator.next(); - DataInputOutput dataInputOutput = getDataInputOutput(partitionMap, - vertexIdMessageBytesIterator.getCurrentVertexId().get()); - vertexIdMessageBytesIterator.writeCurrentMessageBytes( - dataInputOutput.getDataOutput()); - } - } else { - try { - VertexIdMessageIterator - iterator = messages.getVertexIdMessageIterator(); - while (iterator.hasNext()) { - iterator.next(); - DataInputOutput dataInputOutput = getDataInputOutput(partitionMap, - iterator.getCurrentVertexId().get()); - VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator, - dataInputOutput.getDataOutput()); - } - } catch (IOException e) { - throw new RuntimeException("addPartitionMessages: IOException while" + - " adding messages for a partition: " + e); - } - } - } - } - - @Override - public void finalizeStore() { - } - - @Override - public void clearPartition(int partitionId) { - map.get(partitionId).clear(); - } - - @Override - public boolean hasMessagesForVertex(IntWritable vertexId) { - return getPartitionMap(vertexId).containsKey(vertexId.get()); - } - - @Override - public boolean hasMessagesForPartition(int partitionId) { - Int2ObjectOpenHashMap partitionMessages = - map.get(partitionId); - return partitionMessages != null && !partitionMessages.isEmpty(); - } - - @Override - public Iterable getVertexMessages( - IntWritable vertexId) { - DataInputOutput dataInputOutput = - getPartitionMap(vertexId).get(vertexId.get()); - if (dataInputOutput == null) { - return EmptyIterable.get(); - } else { - return new MessagesIterable(dataInputOutput, messageValueFactory); - } - } - - @Override - public void clearVertexMessages(IntWritable vertexId) { - getPartitionMap(vertexId).remove(vertexId.get()); - } - - @Override - public void clearAll() { - map.clear(); - } - - @Override - public Iterable getPartitionDestinationVertices( - int partitionId) { - Int2ObjectOpenHashMap partitionMap = - map.get(partitionId); - List vertices = - Lists.newArrayListWithCapacity(partitionMap.size()); - IntIterator iterator = partitionMap.keySet().iterator(); - while (iterator.hasNext()) { - vertices.add(new IntWritable(iterator.nextInt())); - } - return vertices; - } - - @Override - public void writePartition(DataOutput out, - int partitionId) throws IOException { - Int2ObjectOpenHashMap partitionMap = - map.get(partitionId); - out.writeInt(partitionMap.size()); - ObjectIterator> iterator = - partitionMap.int2ObjectEntrySet().fastIterator(); - while (iterator.hasNext()) { - Int2ObjectMap.Entry entry = iterator.next(); - out.writeInt(entry.getIntKey()); - entry.getValue().write(out); - } - } - - @Override - public void readFieldsForPartition(DataInput in, - int partitionId) throws IOException { - int size = in.readInt(); - Int2ObjectOpenHashMap partitionMap = - new Int2ObjectOpenHashMap(size); - while (size-- > 0) { - int vertexId = in.readInt(); - DataInputOutput dataInputOutput = config.createMessagesInputOutput(); - dataInputOutput.readFields(in); - partitionMap.put(vertexId, dataInputOutput); - } - synchronized (map) { - map.put(partitionId, partitionMap); - } - } -} diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java index 715bf456a..77b268916 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java @@ -18,6 +18,8 @@ package org.apache.giraph.comm.messages.primitives; +import com.google.common.collect.Lists; + import it.unimi.dsi.fastutil.ints.Int2FloatMap; import it.unimi.dsi.fastutil.ints.Int2FloatOpenHashMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; @@ -30,17 +32,14 @@ import java.util.Collections; import java.util.List; -import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.combiner.MessageCombiner; import org.apache.giraph.comm.messages.MessageStore; +import org.apache.giraph.comm.messages.PartitionSplitInfo; import org.apache.giraph.utils.EmptyIterable; import org.apache.giraph.utils.VertexIdMessageIterator; import org.apache.giraph.utils.VertexIdMessages; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Writable; - -import com.google.common.collect.Lists; /** * Special message store to be used when ids are IntWritable and messages @@ -55,26 +54,26 @@ public class IntFloatMessageStore /** Message messageCombiner */ private final MessageCombiner messageCombiner; - /** Service worker */ - private final CentralizedServiceWorker service; + /** Partition split info */ + private final PartitionSplitInfo partitionInfo; /** * Constructor * - * @param service Service worker + * @param partitionInfo Partition split info * @param messageCombiner Message messageCombiner */ public IntFloatMessageStore( - CentralizedServiceWorker service, - MessageCombiner messageCombiner) { - this.service = service; + PartitionSplitInfo partitionInfo, + MessageCombiner messageCombiner + ) { + this.partitionInfo = partitionInfo; this.messageCombiner = messageCombiner; map = new Int2ObjectOpenHashMap(); - for (int partitionId : service.getPartitionStore().getPartitionIds()) { + for (int partitionId : partitionInfo.getPartitionIds()) { Int2FloatOpenHashMap partitionMap = new Int2FloatOpenHashMap( - (int) service.getPartitionStore() - .getPartitionVertexCount(partitionId)); + (int) partitionInfo.getPartitionVertexCount(partitionId)); map.put(partitionId, partitionMap); } } @@ -91,7 +90,7 @@ public boolean isPointerListEncoding() { * @return Map which holds messages for partition which vertex belongs to. */ private Int2FloatOpenHashMap getPartitionMap(IntWritable vertexId) { - return map.get(service.getPartitionId(vertexId)); + return map.get(partitionInfo.getPartitionId(vertexId)); } @Override @@ -117,11 +116,26 @@ public void addPartitionMessages(int partitionId, reusableMessage); message = reusableCurrentMessage.get(); } + // FIXME: messageCombiner should create an initial message instead partitionMap.put(vertexId, message); } } } + @Override + public void addMessage( + IntWritable vertexId, + FloatWritable message + ) throws IOException { + Int2FloatOpenHashMap partitionMap = getPartitionMap(vertexId); + synchronized (partitionMap) { + float originalValue = partitionMap.get(vertexId.get()); + FloatWritable originalMessage = new FloatWritable(originalValue); + messageCombiner.combine(vertexId, originalMessage, message); + partitionMap.put(vertexId.get(), originalMessage.get()); + } + } + @Override public void finalizeStore() { } diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java index 4fc4843e8..19aede4f1 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java @@ -30,15 +30,14 @@ import java.util.Collections; import java.util.List; -import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.combiner.MessageCombiner; import org.apache.giraph.comm.messages.MessageStore; +import org.apache.giraph.comm.messages.PartitionSplitInfo; import org.apache.giraph.utils.EmptyIterable; import org.apache.giraph.utils.VertexIdMessageIterator; import org.apache.giraph.utils.VertexIdMessages; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Writable; import com.google.common.collect.Lists; @@ -56,26 +55,25 @@ public class LongDoubleMessageStore private final MessageCombiner messageCombiner; /** Service worker */ - private final CentralizedServiceWorker service; + private final PartitionSplitInfo partitionInfo; /** * Constructor * - * @param service Service worker + * @param partitionInfo Partition split info * @param messageCombiner Message messageCombiner */ public LongDoubleMessageStore( - CentralizedServiceWorker service, - MessageCombiner messageCombiner) { - this.service = service; - this.messageCombiner = - messageCombiner; + PartitionSplitInfo partitionInfo, + MessageCombiner messageCombiner + ) { + this.partitionInfo = partitionInfo; + this.messageCombiner = messageCombiner; map = new Int2ObjectOpenHashMap(); - for (int partitionId : service.getPartitionStore().getPartitionIds()) { + for (int partitionId : partitionInfo.getPartitionIds()) { Long2DoubleOpenHashMap partitionMap = new Long2DoubleOpenHashMap( - (int) service.getPartitionStore() - .getPartitionVertexCount(partitionId)); + (int) partitionInfo.getPartitionVertexCount(partitionId)); map.put(partitionId, partitionMap); } } @@ -92,7 +90,7 @@ public boolean isPointerListEncoding() { * @return Map which holds messages for partition which vertex belongs to. */ private Long2DoubleOpenHashMap getPartitionMap(LongWritable vertexId) { - return map.get(service.getPartitionId(vertexId)); + return map.get(partitionInfo.getPartitionId(vertexId)); } @Override @@ -118,11 +116,26 @@ public void addPartitionMessages(int partitionId, reusableMessage); message = reusableCurrentMessage.get(); } + // FIXME: messageCombiner should create an initial message instead partitionMap.put(vertexId, message); } } } + @Override + public void addMessage( + LongWritable vertexId, + DoubleWritable message + ) throws IOException { + Long2DoubleOpenHashMap partitionMap = getPartitionMap(vertexId); + synchronized (partitionMap) { + double originalValue = partitionMap.get(vertexId.get()); + DoubleWritable originalMessage = new DoubleWritable(originalValue); + messageCombiner.combine(vertexId, originalMessage, message); + partitionMap.put(vertexId.get(), originalMessage.get()); + } + } + @Override public void finalizeStore() { } diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListStore.java similarity index 72% rename from giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java rename to giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListStore.java index d1c33be60..aee6a61db 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListStore.java @@ -20,18 +20,17 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; -import org.apache.giraph.bsp.CentralizedServiceWorker; + +import java.util.List; + +import org.apache.giraph.comm.messages.PartitionSplitInfo; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.factories.MessageValueFactory; import org.apache.giraph.graph.Vertex; import org.apache.giraph.partition.Partition; -import org.apache.giraph.partition.PartitionOwner; -import org.apache.giraph.utils.VertexIdIterator; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; -import java.util.List; - /** * Special message store to be used when ids are LongWritable and no combiner * is used. @@ -41,8 +40,8 @@ * @param message type * @param list type */ -public abstract class LongAbstractListMessageStore extends LongAbstractMessageStore { +public abstract class LongAbstractListStore extends LongAbstractStore { /** * Map used to store messages for nascent vertices i.e., ones * that did not exist at the start of current superstep but will get @@ -55,20 +54,20 @@ public abstract class LongAbstractListMessageStore messageValueFactory, - CentralizedServiceWorker service, + PartitionSplitInfo partitionInfo, ImmutableClassesGiraphConfiguration config) { - super(messageValueFactory, service, config); + super(messageValueFactory, partitionInfo, config); populateMap(); // create map for vertex ids (i.e., nascent vertices) not known yet nascentMap = new Int2ObjectOpenHashMap<>(); - for (int partitionId : service.getPartitionStore().getPartitionIds()) { + for (int partitionId : partitionInfo.getPartitionIds()) { nascentMap.put(partitionId, new Long2ObjectOpenHashMap()); } } @@ -78,9 +77,9 @@ public LongAbstractListMessageStore( */ private void populateMap() { // TODO - can parallelize? // populate with vertex ids already known - service.getPartitionStore().startIteration(); + partitionInfo.startIteration(); while (true) { - Partition partition = service.getPartitionStore().getNextPartition(); + Partition partition = partitionInfo.getNextPartition(); if (partition == null) { break; } @@ -90,7 +89,7 @@ private void populateMap() { // TODO - can parallelize? LongWritable vertexId = (LongWritable) vertex.getId(); partitionMap.put(vertexId.get(), createList()); } - service.getPartitionStore().putPartition(partition); + partitionInfo.putPartition(partition); } } @@ -103,29 +102,29 @@ private void populateMap() { // TODO - can parallelize? /** * Get list for the current vertexId * - * @param iterator vertexId iterator + * @param vertexId vertex id * @return list for current vertexId */ - protected L getList( - VertexIdIterator iterator) { - PartitionOwner owner = - service.getVertexPartitionOwner(iterator.getCurrentVertexId()); - long vertexId = iterator.getCurrentVertexId().get(); - int partitionId = owner.getPartitionId(); + protected L getList(LongWritable vertexId) { + long id = vertexId.get(); + int partitionId = partitionInfo.getPartitionId(vertexId); Long2ObjectOpenHashMap partitionMap = map.get(partitionId); - if (!partitionMap.containsKey(vertexId)) { - synchronized (nascentMap) { - // assumption: not many nascent vertices are created - // so overall synchronization is negligible - Long2ObjectOpenHashMap nascentPartitionMap = - nascentMap.get(partitionId); - if (nascentPartitionMap.get(vertexId) == null) { - nascentPartitionMap.put(vertexId, createList()); + L list = partitionMap.get(id); + if (list == null) { + Long2ObjectOpenHashMap nascentPartitionMap = + nascentMap.get(partitionId); + // assumption: not many nascent vertices are created + // so overall synchronization is negligible + synchronized (nascentPartitionMap) { + list = nascentPartitionMap.get(id); + if (list == null) { + list = createList(); + nascentPartitionMap.put(id, list); } - return nascentPartitionMap.get(vertexId); + return list; } } - return partitionMap.get(vertexId); + return list; } @Override @@ -137,6 +136,19 @@ public void finalizeStore() { nascentMap.clear(); } + @Override + public boolean hasMessagesForVertex(LongWritable vertexId) { + int partitionId = partitionInfo.getPartitionId(vertexId); + Long2ObjectOpenHashMap partitionMap = map.get(partitionId); + L list = partitionMap.get(vertexId.get()); + if (list != null && !list.isEmpty()) { + return true; + } + Long2ObjectOpenHashMap nascentMessages = nascentMap.get(partitionId); + return nascentMessages != null && + nascentMessages.containsKey(vertexId.get()); + } + // TODO - discussion /* some approaches for ensuring correctness with parallel inserts diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractStore.java similarity index 85% rename from giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java rename to giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractStore.java index b3ed4b2b8..385388d57 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractStore.java @@ -19,18 +19,20 @@ package org.apache.giraph.comm.messages.primitives.long_id; import com.google.common.collect.Lists; + import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; import it.unimi.dsi.fastutil.longs.LongIterator; -import org.apache.giraph.bsp.CentralizedServiceWorker; + +import java.util.List; + import org.apache.giraph.comm.messages.MessageStore; +import org.apache.giraph.comm.messages.PartitionSplitInfo; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.factories.MessageValueFactory; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; -import java.util.List; - /** * Special message store to be used when ids are LongWritable and no combiner * is used. @@ -40,7 +42,7 @@ * @param message type * @param datastructure used to hold messages */ -public abstract class LongAbstractMessageStore +public abstract class LongAbstractStore implements MessageStore { /** Message value factory */ protected final MessageValueFactory messageValueFactory; @@ -48,7 +50,7 @@ public abstract class LongAbstractMessageStore protected final Int2ObjectOpenHashMap> map; /** Service worker */ - protected final CentralizedServiceWorker service; + protected final PartitionSplitInfo partitionInfo; /** Giraph configuration */ protected final ImmutableClassesGiraphConfiguration config; @@ -57,23 +59,22 @@ public abstract class LongAbstractMessageStore * Constructor * * @param messageValueFactory Factory for creating message values - * @param service Service worker - * @param config Hadoop configuration + * @param partitionInfo Partition split info + * @param config Hadoop configuration */ - public LongAbstractMessageStore( + public LongAbstractStore( MessageValueFactory messageValueFactory, - CentralizedServiceWorker service, + PartitionSplitInfo partitionInfo, ImmutableClassesGiraphConfiguration config) { this.messageValueFactory = messageValueFactory; - this.service = service; + this.partitionInfo = partitionInfo; this.config = config; map = new Int2ObjectOpenHashMap<>(); - for (int partitionId : service.getPartitionStore().getPartitionIds()) { + for (int partitionId : partitionInfo.getPartitionIds()) { Long2ObjectOpenHashMap partitionMap = new Long2ObjectOpenHashMap( - (int) service.getPartitionStore() - .getPartitionVertexCount(partitionId)); + (int) partitionInfo.getPartitionVertexCount(partitionId)); map.put(partitionId, partitionMap); } } @@ -86,7 +87,7 @@ public LongAbstractMessageStore( */ protected Long2ObjectOpenHashMap getPartitionMap( LongWritable vertexId) { - return map.get(service.getPartitionId(vertexId)); + return map.get(partitionInfo.getPartitionId(vertexId)); } @Override diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java deleted file mode 100644 index bcdab98dd..000000000 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.comm.messages.primitives.long_id; - -import org.apache.giraph.bsp.CentralizedServiceWorker; -import org.apache.giraph.comm.messages.MessagesIterable; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.factories.MessageValueFactory; -import org.apache.giraph.utils.VertexIdMessageBytesIterator; -import org.apache.giraph.utils.VertexIdMessageIterator; -import org.apache.giraph.utils.VertexIdMessages; -import org.apache.giraph.utils.VerboseByteStructMessageWrite; -import org.apache.giraph.utils.EmptyIterable; -import org.apache.giraph.utils.io.DataInputOutput; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Writable; - -import it.unimi.dsi.fastutil.longs.Long2ObjectMap; -import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; -import it.unimi.dsi.fastutil.objects.ObjectIterator; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -/** - * Special message store to be used when ids are LongWritable and no combiner - * is used. - * Uses fastutil primitive maps in order to decrease number of objects and - * get better performance. - * - * @param Message type - */ -public class LongByteArrayMessageStore - extends LongAbstractMessageStore { - - /** - * Constructor - * - * @param messageValueFactory Factory for creating message values - * @param service Service worker - * @param config Hadoop configuration - */ - public LongByteArrayMessageStore( - MessageValueFactory messageValueFactory, - CentralizedServiceWorker service, - ImmutableClassesGiraphConfiguration config) { - super(messageValueFactory, service, config); - } - - @Override - public boolean isPointerListEncoding() { - return false; - } - - /** - * Get the DataInputOutput for a vertex id, creating if necessary. - * - * @param partitionMap Partition map to look in - * @param vertexId Id of the vertex - * @return DataInputOutput for this vertex id (created if necessary) - */ - private DataInputOutput getDataInputOutput( - Long2ObjectOpenHashMap partitionMap, long vertexId) { - DataInputOutput dataInputOutput = partitionMap.get(vertexId); - if (dataInputOutput == null) { - dataInputOutput = config.createMessagesInputOutput(); - partitionMap.put(vertexId, dataInputOutput); - } - return dataInputOutput; - } - - @Override - public void addPartitionMessages(int partitionId, - VertexIdMessages messages) { - Long2ObjectOpenHashMap partitionMap = map.get(partitionId); - synchronized (partitionMap) { - VertexIdMessageBytesIterator - vertexIdMessageBytesIterator = - messages.getVertexIdMessageBytesIterator(); - // Try to copy the message buffer over rather than - // doing a deserialization of a message just to know its size. This - // should be more efficient for complex objects where serialization is - // expensive. If this type of iterator is not available, fall back to - // deserializing/serializing the messages - if (vertexIdMessageBytesIterator != null) { - while (vertexIdMessageBytesIterator.hasNext()) { - vertexIdMessageBytesIterator.next(); - DataInputOutput dataInputOutput = getDataInputOutput(partitionMap, - vertexIdMessageBytesIterator.getCurrentVertexId().get()); - vertexIdMessageBytesIterator.writeCurrentMessageBytes( - dataInputOutput.getDataOutput()); - } - } else { - try { - VertexIdMessageIterator - iterator = messages.getVertexIdMessageIterator(); - while (iterator.hasNext()) { - iterator.next(); - DataInputOutput dataInputOutput = getDataInputOutput(partitionMap, - iterator.getCurrentVertexId().get()); - VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator, - dataInputOutput.getDataOutput()); - } - } catch (IOException e) { - throw new RuntimeException("addPartitionMessages: IOException while" + - " adding messages for a partition: " + e); - } - } - } - } - - @Override - public void finalizeStore() { - } - - @Override - public Iterable getVertexMessages( - LongWritable vertexId) { - DataInputOutput dataInputOutput = - getPartitionMap(vertexId).get(vertexId.get()); - if (dataInputOutput == null) { - return EmptyIterable.get(); - } else { - return new MessagesIterable(dataInputOutput, messageValueFactory); - } - } - - @Override - public void writePartition(DataOutput out, int partitionId) - throws IOException { - Long2ObjectOpenHashMap partitionMap = - map.get(partitionId); - out.writeInt(partitionMap.size()); - ObjectIterator> iterator = - partitionMap.long2ObjectEntrySet().fastIterator(); - while (iterator.hasNext()) { - Long2ObjectMap.Entry entry = iterator.next(); - out.writeLong(entry.getLongKey()); - entry.getValue().write(out); - } - } - - @Override - public void readFieldsForPartition(DataInput in, - int partitionId) throws IOException { - int size = in.readInt(); - Long2ObjectOpenHashMap partitionMap = - new Long2ObjectOpenHashMap(size); - while (size-- > 0) { - long vertexId = in.readLong(); - DataInputOutput dataInputOutput = config.createMessagesInputOutput(); - dataInputOutput.readFields(in); - partitionMap.put(vertexId, dataInputOutput); - } - synchronized (map) { - map.put(partitionId, partitionMap); - } - } -} diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListPerVertexStore.java similarity index 77% rename from giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java rename to giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListPerVertexStore.java index eef75ba4d..525225cb6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListPerVertexStore.java @@ -19,25 +19,25 @@ package org.apache.giraph.comm.messages.primitives.long_id; import it.unimi.dsi.fastutil.longs.LongArrayList; -import org.apache.giraph.bsp.CentralizedServiceWorker; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + import org.apache.giraph.comm.messages.MessageStore; +import org.apache.giraph.comm.messages.PartitionSplitInfo; import org.apache.giraph.comm.messages.PointerListMessagesIterable; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.factories.MessageValueFactory; import org.apache.giraph.utils.EmptyIterable; import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer; +import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut; import org.apache.giraph.utils.ExtendedDataOutput; import org.apache.giraph.utils.VertexIdMessageIterator; import org.apache.giraph.utils.VertexIdMessages; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import static org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut; - /** * This stores messages in * {@link org.apache.giraph.utils.ExtendedByteArrayOutputBuffer} @@ -45,8 +45,8 @@ * * @param message type */ -public class LongPointerListMessageStore - extends LongAbstractListMessageStore +public class LongPointerListPerVertexStore + extends LongAbstractListStore implements MessageStore { /** Buffers of byte array outputs used to store messages - thread safe */ @@ -56,15 +56,15 @@ public class LongPointerListMessageStore * Constructor * * @param messageValueFactory Factory for creating message values - * @param service Service worker + * @param partitionInfo Partition split info * @param config Hadoop configuration */ - public LongPointerListMessageStore( + public LongPointerListPerVertexStore( MessageValueFactory messageValueFactory, - CentralizedServiceWorker service, + PartitionSplitInfo partitionInfo, ImmutableClassesGiraphConfiguration config) { - super(messageValueFactory, service, config); + super(messageValueFactory, partitionInfo, config); bytesBuffer = new ExtendedByteArrayOutputBuffer(config); } @@ -79,8 +79,10 @@ protected LongArrayList createList() { } @Override - public void addPartitionMessages(int partitionId, - VertexIdMessages messages) { + public void addPartitionMessages( + int partitionId, + VertexIdMessages messages + ) { try { VertexIdMessageIterator iterator = messages.getVertexIdMessageIterator(); @@ -89,7 +91,8 @@ public void addPartitionMessages(int partitionId, while (iterator.hasNext()) { iterator.next(); M msg = iterator.getCurrentMessage(); - list = getList(iterator); + list = getList(iterator.getCurrentVertexId()); + if (iterator.isNewMessage()) { IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut(); pointer = indexAndDataOut.getIndex(); @@ -109,8 +112,22 @@ public void addPartitionMessages(int partitionId, } @Override - public Iterable getVertexMessages( - LongWritable vertexId) { + public void addMessage(LongWritable vertexId, M message) throws IOException { + LongArrayList list = getList(vertexId); + IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut(); + long pointer = indexAndDataOut.getIndex(); + pointer <<= 32; + ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput(); + pointer += dataOutput.getPos(); + message.write(dataOutput); + + synchronized (list) { + list.add(pointer); + } + } + + @Override + public Iterable getVertexMessages(LongWritable vertexId) { LongArrayList list = getPartitionMap(vertexId).get( vertexId.get()); if (list == null) { diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java index 62736946d..e8f00ec3e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java @@ -19,12 +19,6 @@ import it.unimi.dsi.fastutil.ints.Int2IntArrayMap; import it.unimi.dsi.fastutil.ints.Int2IntMap; -import org.apache.giraph.comm.messages.MessageStore; -import org.apache.giraph.utils.ThreadUtils; -import org.apache.giraph.utils.VertexIdMessages; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; @@ -35,6 +29,13 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; +import org.apache.giraph.comm.messages.MessageStore; +import org.apache.giraph.utils.ThreadUtils; +import org.apache.giraph.utils.VertexIdMessages; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.log4j.Logger; + /** * This class decouples message receiving and processing * into separate threads thus reducing contention. @@ -156,6 +157,12 @@ public void addPartitionMessages( } } + @Override + public void addMessage(I vertexId, M message) throws IOException { + // TODO: implement if LocalBlockRunner needs async message store + throw new UnsupportedOperationException(); + } + @Override public void finalizeStore() { store.finalizeStore(); diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java index c8d0f794e..de67af4b9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java @@ -18,6 +18,10 @@ package org.apache.giraph.ooc.data; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + import org.apache.giraph.comm.messages.MessageStore; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.factories.MessageValueFactory; @@ -32,9 +36,6 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.log4j.Logger; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; /** * Implementation of a message store used for out-of-core mechanism. @@ -137,6 +138,26 @@ public void addPartitionMessages( } } + @Override + public void addMessage(I vertexId, M message) throws IOException { + if (useMessageCombiner) { + messageStore.addMessage(vertexId, message); + } else { + // TODO: implement if LocalBlockRunner needs this message store + throw new UnsupportedOperationException(); + } + } + + /** + * Gets the path that should be used specifically for message data. + * + * @param basePath path prefix to build the actual path from + * @param superstep superstep for which message data should be stored + * @return path to files specific for message data + */ + private static String getPath(String basePath, long superstep) { + return basePath + "_messages-S" + superstep; + } @Override public long loadPartitionData(int partitionId) diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java index 299dced24..8f0029396 100644 --- a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java @@ -23,23 +23,29 @@ import it.unimi.dsi.fastutil.longs.Long2ObjectMap; import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; import it.unimi.dsi.fastutil.longs.LongIterator; +import it.unimi.dsi.fastutil.objects.Object2ObjectMap; +import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap; import it.unimi.dsi.fastutil.objects.ObjectIterator; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.Collection; import java.util.Iterator; import org.apache.giraph.types.ops.IntTypeOps; import org.apache.giraph.types.ops.LongTypeOps; import org.apache.giraph.types.ops.PrimitiveIdTypeOps; +import org.apache.giraph.utils.WritableUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; +import com.google.common.base.Preconditions; + /** * Basic2ObjectMap with only basic set of operations. - * All operations that return object T are returning reusable object, + * All operations that return object K are returning reusable object, * which is modified after calling any other function. * * @param Key type @@ -81,13 +87,11 @@ public abstract class Basic2ObjectMap implements Writable { * @return the old value, or null if no value was present for the given key. */ public abstract V remove(K key); - /** * TypeOps for type of keys this object holds * @return TypeOps */ public abstract PrimitiveIdTypeOps getKeyTypeOps(); - /** * Fast iterator over keys within this map, which doesn't allocate new * element for each returned element. @@ -99,6 +103,20 @@ public abstract class Basic2ObjectMap implements Writable { */ public abstract Iterator fastKeyIterator(); + /** + * Iterator over map values. + * + * @return Iterator + */ + public abstract Iterator valueIterator(); + + /** + * A collection of all values. + * + * @return Iterator + */ + public abstract Collection values(); + /** * Iterator that reuses key object. * @@ -211,6 +229,16 @@ public void reset() { }; } + @Override + public Iterator valueIterator() { + return map.values().iterator(); + } + + @Override + public Collection values() { + return map.values(); + } + @Override public void write(DataOutput out) throws IOException { out.writeInt(map.size()); @@ -318,8 +346,23 @@ public void reset() { }; } + @Override + public Iterator valueIterator() { + return map.values().iterator(); + } + + @Override + public Collection values() { + return map.values(); + } + @Override public void write(DataOutput out) throws IOException { + Preconditions.checkState( + valueWriter != null, + "valueWriter is not provided" + ); + out.writeInt(map.size()); ObjectIterator> iterator = map.long2ObjectEntrySet().fastIterator(); @@ -332,6 +375,11 @@ public void write(DataOutput out) throws IOException { @Override public void readFields(DataInput in) throws IOException { + Preconditions.checkState( + valueWriter != null, + "valueWriter is not provided" + ); + int size = in.readInt(); map.clear(); map.trim(size); @@ -342,4 +390,141 @@ public void readFields(DataInput in) throws IOException { } } } + + /** Writable implementation of Basic2ObjectMap */ + public static final class BasicObject2ObjectOpenHashMap + extends Basic2ObjectMap { + /** Map */ + private final Object2ObjectOpenHashMap map; + /** Key writer */ + private final WritableWriter keyWriter; + /** Value writer */ + private final WritableWriter valueWriter; + + /** + * Constructor + * + * @param keyWriter Writer of keys + * @param valueWriter Writer of values + */ + public BasicObject2ObjectOpenHashMap( + WritableWriter keyWriter, + WritableWriter valueWriter + ) { + this.map = new Object2ObjectOpenHashMap<>(); + this.keyWriter = keyWriter; + this.valueWriter = valueWriter; + } + + /** + * Constructor + * + * @param capacity Map capacity + * @param keyWriter Writer of keys + * @param valueWriter Writer of values + */ + public BasicObject2ObjectOpenHashMap( + int capacity, + WritableWriter keyWriter, + WritableWriter valueWriter + ) { + this.map = new Object2ObjectOpenHashMap<>(capacity); + this.keyWriter = keyWriter; + this.valueWriter = valueWriter; + } + + @Override + public void clear() { + map.clear(); + } + + @Override + public int size() { + return map.size(); + } + + @Override + public boolean containsKey(K key) { + return map.containsKey(key); + } + + @Override + public V put(K key, V value) { + // we need a copy since the key object is mutable + K copyKey = WritableUtils.createCopy(key); + return map.put(copyKey, value); + } + + @Override + public V get(K key) { + return map.get(key); + } + + @Override + public V remove(K key) { + return map.remove(key); + } + + @Override + public PrimitiveIdTypeOps getKeyTypeOps() { + throw new UnsupportedOperationException(); + } + + @Override + public Iterator fastKeyIterator() { + return map.keySet().iterator(); + } + + @Override + public Iterator valueIterator() { + return map.values().iterator(); + } + + @Override + public Collection values() { + return map.values(); + } + + @Override + public void write(DataOutput out) throws IOException { + Preconditions.checkState( + keyWriter != null, + "keyWriter is not provided" + ); + Preconditions.checkState( + valueWriter != null, + "valueWriter is not provided" + ); + + out.writeInt(map.size()); + ObjectIterator> iterator = + map.object2ObjectEntrySet().fastIterator(); + while (iterator.hasNext()) { + Object2ObjectMap.Entry entry = iterator.next(); + keyWriter.write(out, entry.getKey()); + valueWriter.write(out, entry.getValue()); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + Preconditions.checkState( + keyWriter != null, + "keyWriter is not provided" + ); + Preconditions.checkState( + valueWriter != null, + "valueWriter is not provided" + ); + + int size = in.readInt(); + map.clear(); + map.trim(size); + while (size-- > 0) { + K key = keyWriter.readFields(in); + V value = valueWriter.readFields(in); + map.put(key, value); + } + } + } } diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicCollectionsUtils.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicCollectionsUtils.java new file mode 100644 index 000000000..23df7d337 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicCollectionsUtils.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.types.ops.collections; + +import org.apache.giraph.types.ops.PrimitiveIdTypeOps; +import org.apache.giraph.types.ops.TypeOpsUtils; +import org.apache.giraph.types.ops.collections.Basic2ObjectMap.BasicObject2ObjectOpenHashMap; +import org.apache.hadoop.io.Writable; + +/** + * Utility functions for constructing basic collections + */ +public class BasicCollectionsUtils { + /** No instances */ + private BasicCollectionsUtils() { } + + /** + * Construct OpenHashMap with primitive keys. + * + * @param Vertex id type + * @param Value type + * @param idClass Class type + * @return map + */ + public static + Basic2ObjectMap create2ObjectMap(Class idClass) { + return create2ObjectMap(idClass, null, null); + } + + /** + * Construct OpenHashMap with primitive keys. + * + * If keyWriter/valueWriter are not provided, + * readFields/write will throw an Exception, if called. + * + * @param Vertex id type + * @param Value type + * @param idClass Class type + * @param keyWriter writer for keys + * @param valueWriter writer for values + * @return map + */ + public static + Basic2ObjectMap create2ObjectMap( + Class idClass, + WritableWriter keyWriter, + WritableWriter valueWriter + ) { + PrimitiveIdTypeOps idTypeOps = TypeOpsUtils.getPrimitiveIdTypeOpsOrNull( + idClass + ); + if (idTypeOps != null) { + return idTypeOps.create2ObjectOpenHashMap(valueWriter); + } else { + return new BasicObject2ObjectOpenHashMap<>(keyWriter, valueWriter); + } + } +} diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java b/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java index 2865a534c..46f7b4850 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java @@ -18,7 +18,7 @@ package org.apache.giraph.utils; -import java.util.HashMap; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map.Entry; @@ -29,12 +29,13 @@ import org.apache.giraph.edge.EdgeFactory; import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexValueCombiner; +import org.apache.giraph.types.ops.collections.Basic2ObjectMap; +import org.apache.giraph.types.ops.collections.BasicCollectionsUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import com.google.common.base.Objects; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; /** * TestGraph class for in-memory testing. @@ -50,7 +51,7 @@ public class TestGraph vertexValueCombiner; /** The vertex values */ - protected HashMap> vertices = Maps.newHashMap(); + protected Basic2ObjectMap> vertices; /** The configuration */ protected ImmutableClassesGiraphConfiguration conf; @@ -60,12 +61,19 @@ public class TestGraph(conf); vertexValueCombiner = this.conf.createVertexValueCombiner(); + vertices = BasicCollectionsUtils.create2ObjectMap( + this.conf.getVertexIdClass() + ); } - public HashMap> getVertices() { - return vertices; + public Collection> getVertices() { + return vertices.values(); + } + + public int getVertexCount() { + return vertices.size(); } public ImmutableClassesGiraphConfiguration getConf() { @@ -108,7 +116,7 @@ public TestGraph addVertex(Vertex vertex) { * @return this */ public TestGraph addVertex(I id, V value, - Entry... edges) { + Entry... edges) { addVertex(makeVertex(id, value, edges)); return this; } @@ -174,14 +182,6 @@ public TestGraph addEdge(I vertexId, I toVertex, E edgeValue) { .addEdge(EdgeFactory.create(toVertex, edgeValue)); return this; } - /** - * An iterator over the ids - * - * @return the iterator - */ - public Iterator idIterator() { - return vertices.keySet().iterator(); - } /** * An iterator over the vertices @@ -190,7 +190,7 @@ public Iterator idIterator() { */ @Override public Iterator> iterator() { - return vertices.values().iterator(); + return vertices.valueIterator(); } /** diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java index aa2549011..c3c43fbc8 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java @@ -18,12 +18,12 @@ package org.apache.giraph.utils; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - import java.io.DataOutput; import java.io.IOException; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + /** Verbose Error mesage for ByteArray based messages */ public class VerboseByteStructMessageWrite { /** @@ -43,13 +43,37 @@ private VerboseByteStructMessageWrite() { * @throws IOException * @throws RuntimeException */ - public static void - verboseWriteCurrentMessage(VertexIdMessageIterator iterator, - DataOutput out) throws IOException { + public static + void verboseWriteCurrentMessage( + VertexIdMessageIterator iterator, + DataOutput out + ) throws IOException { + verboseWriteCurrentMessage( + iterator.getCurrentVertexId(), iterator.getCurrentMessage(), out); + } + + /** + * verboseWriteCurrentMessage + * de-serialize, then write messages + * + * @param vertexId vertexId + * @param message message + * @param out DataOutput + * @param vertexId + * @param message + * @throws IOException + * @throws RuntimeException + */ + public static + void verboseWriteCurrentMessage( + I vertexId, + M message, + DataOutput out + ) throws IOException { try { - iterator.getCurrentMessage().write(out); + message.write(out); } catch (NegativeArraySizeException e) { - handleNegativeArraySize(iterator.getCurrentVertexId()); + handleNegativeArraySize(vertexId); } } @@ -59,8 +83,8 @@ private VerboseByteStructMessageWrite() { * @param vertexId vertexId * @param vertexId type */ - public static void handleNegativeArraySize( - I vertexId) { + public static + void handleNegativeArraySize(I vertexId) { throw new RuntimeException("The numbers of bytes sent to vertex " + vertexId + " exceeded the max capacity of " + "its ExtendedDataOutput. Please consider setting " + diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index cdb9b7efd..c51521d5e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -183,7 +183,7 @@ public class BspServiceWorker getPartitionIds() { + return getPartitionStore().getPartitionIds(); + } + + @Override + public long getPartitionVertexCount(Integer partitionId) { + return getPartitionStore().getPartitionVertexCount(partitionId); + } + + @Override + public void startIteration() { + getPartitionStore().startIteration(); + } + + @Override + public Partition getNextPartition() { + return getPartitionStore().getNextPartition(); + } + + @Override + public void putPartition(Partition partition) { + getPartitionStore().putPartition(partition); + } + @Override public ServerData getServerData() { return workerServer.getServerData(); diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java index e3b2db0d9..2c5f2dbaa 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java @@ -25,7 +25,7 @@ import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.combiner.FloatSumMessageCombiner; -import org.apache.giraph.comm.messages.primitives.IntByteArrayMessageStore; +import org.apache.giraph.comm.messages.primitives.IdByteArrayMessageStore; import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; @@ -71,8 +71,10 @@ public Integer answer(InvocationOnMock invocation) { ); PartitionStore partitionStore = Mockito.mock(PartitionStore.class); Mockito.when(service.getPartitionStore()).thenReturn(partitionStore); + Mockito.when(service.getPartitionIds()).thenReturn( + Lists.newArrayList(0, 1)); Mockito.when(partitionStore.getPartitionIds()).thenReturn( - Lists.newArrayList(0, 1)); + Lists.newArrayList(0, 1)); Partition partition = Mockito.mock(Partition.class); Mockito.when(partition.getVertexCount()).thenReturn(Long.valueOf(1)); Mockito.when(partitionStore.getNextPartition()).thenReturn(partition); @@ -144,8 +146,8 @@ public void testIntFloatMessageStore() { @Test public void testIntByteArrayMessageStore() { - IntByteArrayMessageStore messageStore = - new IntByteArrayMessageStore(new + IdByteArrayMessageStore messageStore = + new IdByteArrayMessageStore<>(new TestMessageValueFactory(FloatWritable.class), service, conf); insertIntFloatMessages(messageStore); diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java index dc9850bca..5508b2cb2 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java @@ -25,7 +25,7 @@ import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.combiner.DoubleSumMessageCombiner; -import org.apache.giraph.comm.messages.primitives.long_id.LongByteArrayMessageStore; +import org.apache.giraph.comm.messages.primitives.IdByteArrayMessageStore; import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; @@ -68,8 +68,10 @@ public Integer answer(InvocationOnMock invocation) { ); PartitionStore partitionStore = Mockito.mock(PartitionStore.class); Mockito.when(service.getPartitionStore()).thenReturn(partitionStore); + Mockito.when(service.getPartitionIds()).thenReturn( + Lists.newArrayList(0, 1)); Mockito.when(partitionStore.getPartitionIds()).thenReturn( - Lists.newArrayList(0, 1)); + Lists.newArrayList(0, 1)); Partition partition = Mockito.mock(Partition.class); Mockito.when(partition.getVertexCount()).thenReturn(Long.valueOf(1)); Mockito.when(partitionStore.getNextPartition()).thenReturn(partition); @@ -145,8 +147,8 @@ public void testLongDoubleMessageStore() { @Test public void testLongByteArrayMessageStore() { - LongByteArrayMessageStore messageStore = - new LongByteArrayMessageStore( + IdByteArrayMessageStore messageStore = + new IdByteArrayMessageStore<>( new TestMessageValueFactory(DoubleWritable.class), service, createLongDoubleConf()); insertLongDoubleMessages(messageStore); diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java index ffc128896..1294c8865 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java @@ -18,6 +18,14 @@ package org.apache.giraph.comm.messages.queue; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Arrays; + import org.apache.giraph.comm.messages.MessageStore; import org.apache.giraph.factories.TestMessageValueFactory; import org.apache.giraph.utils.ByteArrayVertexIdMessages; @@ -26,14 +34,6 @@ import org.apache.hadoop.io.LongWritable; import org.junit.Test; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.Arrays; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertNotNull; - /** * Test case for AsyncMessageStoreWrapper */ @@ -70,6 +70,10 @@ public void addPartitionMessages(int partition, VertexIdMessages messages) { counters[partition]++; } + @Override + public void addMessage(LongWritable vertexId, IntWritable message) throws IOException { + } + @Override public boolean isPointerListEncoding() { return false; @@ -124,5 +128,6 @@ public void writePartition(DataOutput out, int partitionId) throws IOException { public void readFieldsForPartition(DataInput in, int partitionId) throws IOException { } + } } diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java index d56c0fba7..954e4205e 100644 --- a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java +++ b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java @@ -56,7 +56,7 @@ public void testSwitchingClasses() throws Exception { graph.addVertex(id2, new StatusValue()); graph = InternalVertexRunner.runWithInMemoryOutput(conf, graph); - Assert.assertEquals(2, graph.getVertices().size()); + Assert.assertEquals(2, graph.getVertexCount()); } private static void checkVerticesOnFinalSuperstep( diff --git a/giraph-core/src/test/java/org/apache/giraph/types/TestBasicCollections.java b/giraph-core/src/test/java/org/apache/giraph/types/TestBasicCollections.java new file mode 100644 index 000000000..53c4f9bc3 --- /dev/null +++ b/giraph-core/src/test/java/org/apache/giraph/types/TestBasicCollections.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.types; + +import io.netty.util.internal.ThreadLocalRandom; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.giraph.types.ops.collections.Basic2ObjectMap; +import org.apache.giraph.types.ops.collections.BasicCollectionsUtils; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test BasicSets and Basic2ObjectMaps + */ +public class TestBasicCollections { + private void testLongWritable2Object(Map input) { + Basic2ObjectMap map = BasicCollectionsUtils.create2ObjectMap(LongWritable.class); + + LongWritable longW = new LongWritable(); + // adding + long keySum = 0; + for (Long key : input.keySet()) { + longW.set(key.longValue()); + Assert.assertNull(map.put(longW, input.get(key))); + keySum += key.longValue(); + } + Assert.assertEquals(input.size(), map.size()); + // iterator + long sum = 0; + Iterator iterator = map.fastKeyIterator(); + while (iterator.hasNext()) { + sum += iterator.next().get(); + } + Assert.assertEquals(keySum, sum); + // removal + for (Long key : input.keySet()) { + longW.set(key.longValue()); + Assert.assertEquals(input.get(key), map.get(longW)); + map.remove(longW); + } + Assert.assertEquals(0, map.size()); + } + + private void testFloatWritable2Object(Map input) { + Basic2ObjectMap map = BasicCollectionsUtils.create2ObjectMap(FloatWritable.class); + + FloatWritable floatW = new FloatWritable(); + // adding + float keySum = 0; + for (Float key : input.keySet()) { + floatW.set(key.longValue()); + Assert.assertNull(map.put(floatW, input.get(key))); + keySum += key.longValue(); + } + Assert.assertEquals(input.size(), map.size()); + // iterator + float sum = 0; + Iterator iterator = map.fastKeyIterator(); + while (iterator.hasNext()) { + sum += iterator.next().get(); + } + Assert.assertEquals(keySum, sum, 1e-6); + // removal + for (Float key : input.keySet()) { + floatW.set(key.longValue()); + Assert.assertEquals(input.get(key), map.get(floatW)); + map.remove(floatW); + } + Assert.assertEquals(0, map.size()); + } + + @Test + public void testLongWritable2Object() { + Map input = new HashMap<>(); + input.put(-1l, "a"); + input.put(0l, "b"); + input.put(100l, "c"); + input.put(26256l, "d"); + input.put(-1367367l, "a"); + input.put(-35635l, "e"); + input.put(1234567l, "f"); + testLongWritable2Object(input); + } + + @Test + public void testFloatWritable2Object() { + Map input = new HashMap<>(); + input.put(-1f, "a"); + input.put(0f, "b"); + input.put(1.23f, "c"); + input.put(-12.34f, "d"); + input.put(-1367367.45f, "a"); + input.put(-3.456f, "e"); + input.put(12.78f, "f"); + testFloatWritable2Object(input); + } + + private V getConcurrently(Basic2ObjectMap map, K key, V defaultValue) { + synchronized (map) { + V value = map.get(key); + + if (value == null) { + value = defaultValue; + map.put(key, value); + } + return value; + } + } + + private void removeConcurrently(Basic2ObjectMap map, K key) { + synchronized (map) { + map.remove(key); + } + } + + @Test + public void testLongWritable2ObjectConcurrent() throws InterruptedException { + final int numThreads = 10; + final int numValues = 100000; + + final Map map = new ConcurrentHashMap<>(); + for (int i = 0; i < numValues; i++) { + double value = ThreadLocalRandom.current().nextDouble(); + map.put(i, value); + } + + final int PARTS = 8; + final Basic2ObjectMap[] basicMaps = new Basic2ObjectMap[PARTS]; + for (int i = 0; i < PARTS; i++) { + basicMaps[i] = BasicCollectionsUtils.create2ObjectMap(IntWritable.class); + } + + long startTime = System.currentTimeMillis(); + + // adding in several threads + Thread[] threads = new Thread[numThreads]; + for (int t = 0; t < threads.length; t++) { + threads[t] = new Thread(new Runnable() { + @Override + public void run() { + IntWritable intW = new IntWritable(); + for (int i = 0; i < numValues; i++) { + intW.set(i); + double value = getConcurrently(basicMaps[(i * 123 + 17) % PARTS], intW, map.get(i)); + Assert.assertEquals(map.get(i).doubleValue(), value, 1e-6); + } + } + }); + threads[t].start(); + } + for (Thread t : threads) { + t.join(); + } + int totalSize = 0; + for (int i = 0; i < PARTS; i++) { + totalSize += basicMaps[i].size(); + } + Assert.assertEquals(numValues, totalSize); + + long endTime = System.currentTimeMillis(); + System.out.println("Add Time: " + (endTime - startTime) / 1000.0); + + // removing all objects + for (int t = 0; t < threads.length; t++) { + threads[t] = new Thread(new Runnable() { + @Override + public void run() { + IntWritable intW = new IntWritable(); + for (int i = 0; i < numValues; i++) { + intW.set(i); + removeConcurrently(basicMaps[(i * 123 + 17) % PARTS], intW); + } + } + }); + threads[t].start(); + } + for (Thread t : threads) { + t.join(); + } + for (int i = 0; i < PARTS; i++) { + Assert.assertEquals(0, basicMaps[i].size()); + } + } +} diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java index 63403abbc..b56998f95 100644 --- a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java +++ b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java @@ -166,7 +166,7 @@ IntWritable> mockServiceGetVertexPartitionOwner(final int numOfPartitions) { CentralizedServiceWorker service = Mockito.mock(CentralizedServiceWorker.class); - Answer answer = new Answer() { + Answer answerOwner = new Answer() { @Override public PartitionOwner answer(InvocationOnMock invocation) throws Throwable { @@ -175,7 +175,18 @@ public PartitionOwner answer(InvocationOnMock invocation) throws } }; Mockito.when(service.getVertexPartitionOwner( - Mockito.any(IntWritable.class))).thenAnswer(answer); + Mockito.any(IntWritable.class))).thenAnswer(answerOwner); + + Answer answerId = new Answer() { + @Override + public Integer answer(InvocationOnMock invocation) throws + Throwable { + IntWritable vertexId = (IntWritable) invocation.getArguments()[0]; + return vertexId.get() % numOfPartitions; + } + }; + Mockito.when(service.getPartitionId( + Mockito.any(IntWritable.class))).thenAnswer(answerId); return service; } diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/scc/SccComputationTestInMemory.java b/giraph-examples/src/test/java/org/apache/giraph/examples/scc/SccComputationTestInMemory.java index 833c43e43..28d5f5c8b 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/examples/scc/SccComputationTestInMemory.java +++ b/giraph-examples/src/test/java/org/apache/giraph/examples/scc/SccComputationTestInMemory.java @@ -54,7 +54,7 @@ public static Entry[] makeEdges(long... args) { /** * Connects the {@outgoingVertices} to the given vertex id * with null-valued edges. - * + * * @param graph * @param id * @param outgoingVertices @@ -110,9 +110,7 @@ public void testToyData() throws Exception { private Map> parse( TestGraph g) { Map> scc = new HashMap>(); - for (LongWritable v : g.getVertices().keySet()) { - Vertex vertex = g - .getVertex(v); + for (Vertex vertex : g) { long sccId = vertex.getValue().get(); List verticesIds = scc.get(sccId); if (verticesIds == null) {// New SCC