Skip to content
This repository has been archived by the owner on Oct 30, 2023. It is now read-only.

Commit

Permalink
unsafe byte readers/writers
Browse files Browse the repository at this point in the history
Summary: using unsafe readers/writers

Test Plan:
tested on PageRank app, and Fanout computation. In both cases, there is a ~20% speedup

JIRA: https://issues.apache.org/jira/browse/GIRAPH-1049

Reviewers: sergey.edunov, maja.kabiljo, dionysis.logothetis, ikabiljo

Reviewed By: ikabiljo

Differential Revision: https://reviews.facebook.net/D55509
  • Loading branch information
spupyrev authored and Igor Kabiljo committed Mar 15, 2016
1 parent fafecee commit 4170eeb
Showing 1 changed file with 29 additions and 17 deletions.
Expand Up @@ -34,8 +34,8 @@
import org.apache.giraph.factories.MessageValueFactory;
import org.apache.giraph.types.ops.TypeOps;
import org.apache.giraph.types.ops.TypeOpsUtils;
import org.apache.giraph.utils.ExtendedByteArrayDataInput;
import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
import org.apache.giraph.utils.ExtendedDataInput;
import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.Writable;
Expand Down Expand Up @@ -69,7 +69,7 @@ interface InternalMessageStore
abstract class InternalConcurrentMessageStore
<I extends WritableComparable, M extends Writable, R>
implements InternalMessageStore<I, M> {
protected final ConcurrentHashMap<I, R> received =
private final ConcurrentHashMap<I, R> received =
new ConcurrentHashMap<>();

private final Class<I> idClass;
Expand Down Expand Up @@ -102,6 +102,10 @@ R getReceiverFor(I id) {
return value;
}

R removeFor(I id) {
return received.remove(id);
}

abstract R createNewReceiver();

@Override
Expand All @@ -118,8 +122,9 @@ public void sendMessageToMultipleEdges(Iterator<I> idIter, M message) {

public static <I extends WritableComparable, M extends Writable>
InternalMessageStore<I, M> createMessageStore(
final ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
final MessageClasses<I, M> messageClasses) {
final ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
final MessageClasses<I, M> messageClasses
) {
MessageCombiner<? super I, M> combiner =
messageClasses.createMessageCombiner(conf);
if (combiner != null) {
Expand All @@ -132,8 +137,9 @@ InternalMessageStore<I, M> createMessageStore(
messageClasses.createMessageValueFactory(conf));
} else {
return new InternalByteMessageStore<>(
conf.getVertexIdClass(),
messageClasses.createMessageValueFactory(conf));
conf.getVertexIdClass(),
messageClasses.createMessageValueFactory(conf),
conf);
}
}

Expand Down Expand Up @@ -175,7 +181,7 @@ public InternalCombinerMessageStore(Class<I> idClass,

@Override
public Iterable<M> takeMessages(I id) {
M message = received.remove(id);
M message = removeFor(id);
if (message != null) {
return Collections.singleton(message);
} else {
Expand Down Expand Up @@ -206,27 +212,33 @@ M createNewReceiver() {
static class InternalByteMessageStore
<I extends WritableComparable, M extends Writable>
extends InternalConcurrentMessageStore<I, M,
ExtendedByteArrayDataOutput> {
ExtendedDataOutput> {
private final MessageValueFactory<M> messageFactory;
private final ImmutableClassesGiraphConfiguration<I, ?, ?> conf;

public InternalByteMessageStore(
Class<I> idClass, MessageValueFactory<M> messageFactory) {
Class<I> idClass, MessageValueFactory<M> messageFactory,
ImmutableClassesGiraphConfiguration<I, ?, ?> conf
) {
super(idClass);
this.messageFactory = messageFactory;
this.conf = conf;
}

@Override
public Iterable<M> takeMessages(I id) {
final ExtendedByteArrayDataOutput out = received.remove(id);
final ExtendedDataOutput out = removeFor(id);
if (out == null) {
return null;
}

return new Iterable<M>() {
@Override
public Iterator<M> iterator() {
final ExtendedByteArrayDataInput in = new ExtendedByteArrayDataInput(
out.getByteArray(), 0, out.getPos());
final ExtendedDataInput in = conf.createExtendedDataInput(
out.getByteArray(), 0, out.getPos()
);

final M message = messageFactory.newInstance();
return new AbstractIterator<M>() {
@Override
Expand All @@ -248,7 +260,7 @@ protected M computeNext() {

@Override
public void sendMessage(I id, M message) {
ExtendedByteArrayDataOutput out = getReceiverFor(id);
ExtendedDataOutput out = getReceiverFor(id);

synchronized (out) {
try {
Expand All @@ -260,8 +272,8 @@ public void sendMessage(I id, M message) {
}

@Override
ExtendedByteArrayDataOutput createNewReceiver() {
return new ExtendedByteArrayDataOutput();
ExtendedDataOutput createNewReceiver() {
return conf.createExtendedDataOutput();
}
}

Expand All @@ -285,7 +297,7 @@ public InternalSharedByteMessageStore(

@Override
public Iterable<M> takeMessages(I id) {
final List<byte[]> out = received.remove(id);
final List<byte[]> out = removeFor(id);
if (out == null) {
return null;
}
Expand Down

0 comments on commit 4170eeb

Please sign in to comment.