Skip to content
This repository has been archived by the owner on Oct 30, 2023. It is now read-only.

Commit

Permalink
faster maps
Browse files Browse the repository at this point in the history
Summary:
The idea is to replace HashMap<LongWritable, V> with Long2ObjectOpenHashMap<V> (and Map<Int...> with Int2Object...).
This will save space and speed up some applications.

I changed the type of such a map in TestGraph.java, which gives up to a 2x speedup on an
example PageRank computation (see comment below).

JIRA: https://issues.apache.org/jira/browse/GIRAPH-1049

Test Plan: TestBasicCollections.java contains some tests

Reviewers: sergey.edunov, maja.kabiljo, dionysis.logothetis, heslami, ikabiljo

Reviewed By: heslami

Differential Revision: https://reviews.facebook.net/D55587
  • Loading branch information
spupyrev authored and Igor Kabiljo committed Aug 31, 2016
1 parent 2ae95bd commit 2117d1d
Show file tree
Hide file tree
Showing 38 changed files with 1,305 additions and 1,039 deletions.
Expand Up @@ -17,12 +17,20 @@
*/
package org.apache.giraph.block_app.framework.api.local;

import static com.google.common.base.Preconditions.checkState;

import com.google.common.base.Preconditions;

import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

Expand All @@ -35,7 +43,8 @@
import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
import org.apache.giraph.block_app.framework.api.BlockWorkerValueAccessor;
import org.apache.giraph.block_app.framework.api.Counter;
import org.apache.giraph.block_app.framework.api.local.InternalMessageStore.InternalConcurrentMessageStore;
import org.apache.giraph.block_app.framework.api.local.InternalMessageStore.InternalChecksMessageStore;
import org.apache.giraph.block_app.framework.api.local.InternalMessageStore.InternalWrappedMessageStore;
import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic;
import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces;
import org.apache.giraph.block_app.framework.output.BlockOutputDesc;
Expand All @@ -44,8 +53,10 @@
import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
import org.apache.giraph.block_app.framework.piece.global_comm.internal.ReducersForPieceHandler.BroadcastHandleImpl;
import org.apache.giraph.comm.SendMessageCache.TargetVertexIdIterator;
import org.apache.giraph.comm.messages.PartitionSplitInfo;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.conf.MessageClasses;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.graph.Vertex;
Expand All @@ -62,8 +73,6 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import com.google.common.base.Preconditions;

/**
* Internal implementation of Block API interfaces - representing an in-memory
* giraph instance.
Expand Down Expand Up @@ -371,17 +380,28 @@ public void afterMasterBeforeWorker(BlockWorkerPieces computation) {
previousMessages = nextMessages;
previousWorkerMessages = nextWorkerMessages;

nextMessages = InternalConcurrentMessageStore.createMessageStore(
conf, computation, runAllChecks);
nextMessages = createMessageStore(
conf,
computation.getOutgoingMessageClasses(conf),
createPartitionInfo(),
runAllChecks
);
nextWorkerMessages = new ArrayList<>();

// finalize previous messages
if (previousMessages != null) {
previousMessages.finalizeStore();
}

// process mutations:
Set<I> targets = previousMessages == null ?
Collections.EMPTY_SET : previousMessages.targetsSet();
if (createVertexOnMsgs) {
for (I target : targets) {
if (createVertexOnMsgs && previousMessages != null) {
Iterator<I> iter = previousMessages.targetVertexIds();
while (iter.hasNext()) {
I target = iter.next();
if (getPartition(target).getVertex(target) == null) {
mutations.putIfAbsent(target, new VertexMutations<I, V, E>());
// need a copy as the key might be reusable
I copyId = WritableUtils.createCopy(target);
mutations.putIfAbsent(copyId, new VertexMutations<I, V, E>());
}
}
}
Expand All @@ -393,8 +413,11 @@ public void afterMasterBeforeWorker(BlockWorkerPieces computation) {
getPartition(vertexIndex).getVertex(vertexIndex);
VertexMutations<I, V, E> curMutations = entry.getValue();
Vertex<I, V, E> vertex = vertexResolver.resolve(
vertexIndex, originalVertex, curMutations,
targets.contains(vertexIndex));
vertexIndex,
originalVertex,
curMutations,
previousMessages != null && previousMessages.hasMessage(vertexIndex)
);

if (vertex != null) {
getPartition(vertex.getId()).putVertex(vertex);
Expand All @@ -406,6 +429,76 @@ public void afterMasterBeforeWorker(BlockWorkerPieces computation) {
mutations.clear();
}

/**
 * Create the message store used for the next superstep.
 *
 * The store is always built through
 * {@link InternalWrappedMessageStore#create}; when checks are enabled it is
 * additionally wrapped into an {@link InternalChecksMessageStore}.
 *
 * @param conf Configuration
 * @param messageClasses Description of the outgoing message classes
 * @param partitionInfo Partition split information handed to the store
 * @param runAllChecks Whether the store should be wrapped with checks
 * @param <M> Message type
 * @return The wrapped store, decorated with checks if requested
 */
private <M extends Writable>
InternalMessageStore<I, M> createMessageStore(
    ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
    MessageClasses<I, M> messageClasses,
    PartitionSplitInfo<I> partitionInfo,
    boolean runAllChecks
) {
  InternalMessageStore<I, M> wrappedStore =
      InternalWrappedMessageStore.create(conf, messageClasses, partitionInfo);
  // Fast path: no extra validation layer requested
  if (!runAllChecks) {
    return wrappedStore;
  }
  return new InternalChecksMessageStore<I, M>(
      wrappedStore, conf, messageClasses.createMessageValueFactory(conf));
}

/**
 * Create a {@link PartitionSplitInfo} backed by the in-memory
 * {@code partitions} list of this instance.
 *
 * Partition ids are the indices into {@code partitions}, i.e.
 * {@code 0 .. partitions.size() - 1}.
 *
 * @return Partition split info over the local partitions
 */
private PartitionSplitInfo<I> createPartitionInfo() {
  return new PartitionSplitInfo<I>() {
    /** Ids of partitions, created lazily on first request */
    private IntList partitionIds;
    /** Queue of partitions to be processed in a superstep */
    private Queue<Partition<I, V, E>> partitionQueue;

    @Override
    public int getPartitionId(I vertexId) {
      return partitionerFactory.getPartition(vertexId, partitions.size(), 1);
    }

    @Override
    public Iterable<Integer> getPartitionIds() {
      if (partitionIds == null) {
        partitionIds = new IntArrayList(partitions.size());
        for (int i = 0; i < partitions.size(); i++) {
          partitionIds.add(i);
        }
      }
      // The cached ids are only valid while the number of partitions
      // stays the same as when they were first computed
      Preconditions.checkState(partitionIds.size() == partitions.size());
      return partitionIds;
    }

    @Override
    public long getPartitionVertexCount(Integer partitionId) {
      return partitions.get(partitionId).getVertexCount();
    }

    @Override
    public void startIteration() {
      checkState(partitionQueue == null || partitionQueue.isEmpty(),
          "startIteration: It seems that some of " +
          "the partitions from previous iteration over partition store are" +
          " not yet processed.");

      // Fresh queue holding all partitions, in list order
      partitionQueue = new LinkedList<Partition<I, V, E>>(partitions);
    }

    /**
     * Take the next partition of the current iteration.
     * {@link #startIteration()} has to be called first, since the queue
     * is only created there.
     *
     * @return Next partition, or null if the queue is empty
     */
    @Override
    public Partition getNextPartition() {
      return partitionQueue.poll();
    }

    @Override
    public void putPartition(Partition partition) {
      // Nothing to do when a partition is handed back
    }
  };
}

/**
 * Get the list of partitions held by this instance.
 *
 * @return The partitions list itself (not a copy)
 */
public List<Partition<I, V, E>> getPartitions() {
  return this.partitions;
}
Expand Down

0 comments on commit 2117d1d

Please sign in to comment.