Skip to content
Permalink
Browse files
GIRAPH-1167
closes #56
  • Loading branch information
dlogothetis authored and Maja Kabiljo committed Jan 19, 2018
1 parent 8e2df4f commit f944f5cc3dd7874dac049e0c8c4d7212df781452
Show file tree
Hide file tree
Showing 2 changed files with 381 additions and 0 deletions.
@@ -0,0 +1,217 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.giraph.edge;

import it.unimi.dsi.fastutil.longs.Long2ByteMap;
import it.unimi.dsi.fastutil.longs.Long2ByteOpenHashMap;
import it.unimi.dsi.fastutil.objects.ObjectIterator;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;

import org.apache.giraph.utils.EdgeIterables;
import org.apache.giraph.utils.Trimmable;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.LongWritable;

import com.google.common.collect.UnmodifiableIterator;

/**
* {@link OutEdges} implementation with long ids and byte edge values,
* backed by a {@link Long2ByteOpenHashMap}.
* Parallel edges are not allowed.
* Note: this implementation is optimized for fast random access and mutations,
* and uses less space than a generic {@link HashMapEdges}.
*/
public class LongByteHashMapEdges
    implements StrictRandomAccessOutEdges<LongWritable, ByteWritable>,
    ReuseObjectsOutEdges<LongWritable, ByteWritable>,
    MutableOutEdges<LongWritable, ByteWritable>, Trimmable {
  /** Hash map from target vertex id to edge value. */
  private Long2ByteOpenHashMap edgeMap;
  /**
   * Representative edge value object, used by getEdgeValue().
   * Created lazily on the first successful lookup and overwritten by every
   * later one.
   */
  private ByteWritable representativeEdgeValue;

  /**
   * Initialize the edges from an iterable, delegating to
   * {@code EdgeIterables.initialize(this, edges)}.
   *
   * @param edges Edges to store
   */
  @Override
  public void initialize(Iterable<Edge<LongWritable, ByteWritable>> edges) {
    EdgeIterables.initialize(this, edges);
  }

  /**
   * Replace the backing map with an empty one sized for the expected number
   * of edges.
   *
   * @param capacity Expected number of edges
   */
  @Override
  public void initialize(int capacity) {
    edgeMap = new Long2ByteOpenHashMap(capacity);
  }

  /** Replace the backing map with an empty one of default size. */
  @Override
  public void initialize() {
    edgeMap = new Long2ByteOpenHashMap();
  }

  /**
   * Store the edge. The map is keyed by target vertex id, so adding an edge
   * to a target that is already present overwrites the previous value.
   * Only the primitive id and value are copied; the edge object itself is
   * not retained.
   *
   * @param edge Edge to add
   */
  @Override
  public void add(Edge<LongWritable, ByteWritable> edge) {
    edgeMap.put(edge.getTargetVertexId().get(), edge.getValue().get());
  }

  /**
   * Remove the edge pointing to the given target vertex, if there is one.
   *
   * @param targetVertexId Target vertex id
   */
  @Override
  public void remove(LongWritable targetVertexId) {
    edgeMap.remove(targetVertexId.get());
  }

  /**
   * Look up the value of the edge pointing to the given target vertex.
   * The returned object is shared: it is overwritten by the next successful
   * call, so callers that need to keep the value must copy it.
   *
   * @param targetVertexId Target vertex id
   * @return The (reused) edge value, or null if there is no such edge
   */
  @Override
  public ByteWritable getEdgeValue(LongWritable targetVertexId) {
    long targetId = targetVertexId.get();
    // An explicit containsKey() is needed: get() alone cannot distinguish a
    // missing key from a stored value equal to the map's default value.
    if (!edgeMap.containsKey(targetId)) {
      return null;
    }
    if (representativeEdgeValue == null) {
      representativeEdgeValue = new ByteWritable();
    }
    representativeEdgeValue.set(edgeMap.get(targetId));
    return representativeEdgeValue;
  }

  /**
   * Overwrite the value of an existing edge. Does nothing when there is no
   * edge to the given target vertex (no edge is created).
   *
   * @param targetVertexId Target vertex id
   * @param edgeValue New edge value
   */
  @Override
  public void setEdgeValue(LongWritable targetVertexId,
                           ByteWritable edgeValue) {
    long targetId = targetVertexId.get();
    if (edgeMap.containsKey(targetId)) {
      edgeMap.put(targetId, edgeValue.get());
    }
  }

  /**
   * @return Number of edges currently stored
   */
  @Override
  public int size() {
    return edgeMap.size();
  }

  /**
   * Read-only iterator over the edges. A single edge object is reused for
   * every element, so the edge returned by next() is only valid until the
   * following call to next().
   *
   * @return Iterator that reuses its edge object
   */
  @Override
  public Iterator<Edge<LongWritable, ByteWritable>> iterator() {
    // Returns an iterator that reuses objects.
    return new UnmodifiableIterator<Edge<LongWritable, ByteWritable>>() {
      /** Wrapped map iterator. */
      private final ObjectIterator<Long2ByteMap.Entry> mapIterator =
          edgeMap.long2ByteEntrySet().fastIterator();
      /** Representative edge object. */
      private final ReusableEdge<LongWritable, ByteWritable>
      representativeEdge =
          EdgeFactory.createReusable(new LongWritable(), new ByteWritable());

      @Override
      public boolean hasNext() {
        return mapIterator.hasNext();
      }

      @Override
      public Edge<LongWritable, ByteWritable> next() {
        Long2ByteMap.Entry nextEntry = mapIterator.next();
        // Copy the primitives into the shared edge instead of allocating.
        representativeEdge.getTargetVertexId().set(nextEntry.getLongKey());
        representativeEdge.getValue().set(nextEntry.getByteValue());
        return representativeEdge;
      }
    };
  }

  /** Ask the backing map to shrink its storage to fit its contents. */
  @Override
  public void trim() {
    edgeMap.trim();
  }

  /** Helper class for a mutable edge that modifies the backing map entry. */
  private static class LongByteHashMapMutableEdge
      extends DefaultEdge<LongWritable, ByteWritable> {
    /** Backing entry for the edge in the map. */
    private Long2ByteMap.Entry entry;

    /** Constructor. */
    public LongByteHashMapMutableEdge() {
      super(new LongWritable(), new ByteWritable());
    }

    /**
     * Make the edge point to the given entry in the backing map.
     *
     * @param entry Backing entry
     */
    public void setEntry(Long2ByteMap.Entry entry) {
      // Update the id and value objects from the superclass.
      getTargetVertexId().set(entry.getLongKey());
      getValue().set(entry.getByteValue());
      // Update the entry.
      this.entry = entry;
    }

    /**
     * Set the edge value both on this edge object and in the backing map
     * entry it currently points to.
     *
     * @param value New edge value
     */
    @Override
    public void setValue(ByteWritable value) {
      // Update the value object from the superclass.
      getValue().set(value.get());
      // Update the value stored in the backing map.
      entry.setValue(value.get());
    }
  }

  /**
   * Iterator whose edges can be modified (setValue()) or removed (remove());
   * both operations are applied to the backing map. A single edge object is
   * reused for every element.
   *
   * @return Mutable iterator that reuses its edge object
   */
  @Override
  public Iterator<MutableEdge<LongWritable, ByteWritable>> mutableIterator() {
    return new Iterator<MutableEdge<LongWritable, ByteWritable>>() {
      /**
       * Wrapped map iterator.
       * Note: we cannot use the fast iterator in this case,
       * because we need to call setValue() on an entry.
       */
      private final ObjectIterator<Long2ByteMap.Entry> mapIterator =
          edgeMap.long2ByteEntrySet().iterator();
      /** Representative edge object. */
      private final LongByteHashMapMutableEdge representativeEdge =
          new LongByteHashMapMutableEdge();

      @Override
      public boolean hasNext() {
        return mapIterator.hasNext();
      }

      @Override
      public MutableEdge<LongWritable, ByteWritable> next() {
        representativeEdge.setEntry(mapIterator.next());
        return representativeEdge;
      }

      @Override
      public void remove() {
        mapIterator.remove();
      }
    };
  }

  /**
   * Serialize the edges as an int edge count followed, for each edge, by
   * the target id (long) and the edge value (byte).
   *
   * @param out Output to write to
   * @throws IOException If writing to the output fails
   */
  @Override
  public void write(DataOutput out) throws IOException {
    out.writeInt(edgeMap.size());
    // Entries are only read and never kept past the next step, so the fast
    // iterator can be used here to avoid creating an entry per edge.
    ObjectIterator<Long2ByteMap.Entry> entries =
        edgeMap.long2ByteEntrySet().fastIterator();
    while (entries.hasNext()) {
      Long2ByteMap.Entry entry = entries.next();
      out.writeLong(entry.getLongKey());
      out.writeByte(entry.getByteValue());
    }
  }

  /**
   * Replace the current contents with edges read in the format produced by
   * {@link #write(DataOutput)}.
   *
   * @param in Input to read from
   * @throws IOException If reading from the input fails
   */
  @Override
  public void readFields(DataInput in) throws IOException {
    int numEdges = in.readInt();
    // Size the new map up front for the announced number of edges.
    initialize(numEdges);
    for (int i = 0; i < numEdges; ++i) {
      edgeMap.put(in.readLong(), in.readByte());
    }
  }
}
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.edge;

import com.google.common.collect.Lists;
import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.UnsafeByteArrayInputStream;
import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.LongWritable;
import org.junit.Assert;
import org.junit.Test;

import java.io.DataInput;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;


/**
 * Unit tests for {@link LongByteHashMapEdges}: initialization, add/remove,
 * value updates, mutation through the mutable iterator, serialization round
 * trip and the handling of parallel edges.
 */
public class LongByteHashMapEdgesTest {
  /**
   * Create an edge with the given target id and value.
   *
   * @param id Target vertex id
   * @param value Edge value
   * @return New edge
   */
  private static Edge<LongWritable, ByteWritable> createEdge(long id, byte value) {
    return EdgeFactory.create(new LongWritable(id), new ByteWritable(value));
  }

  /**
   * Assert that the edges contain exactly the expected number of entries and
   * that every expected id maps to the value at the same array position.
   *
   * @param edges Edges under test
   * @param expectedIds Expected target ids
   * @param expectedValues Expected values, parallel to expectedIds
   */
  private static void assertEdges(LongByteHashMapEdges edges, long[] expectedIds,
      byte[] expectedValues) {
    Assert.assertEquals(expectedIds.length, edges.size());
    for (int i = 0; i < expectedIds.length; i++) {
      ByteWritable value = edges.getEdgeValue(new LongWritable(expectedIds[i]));
      assertNotNull(value);
      assertEquals(expectedValues[i], value.get());
    }
  }

  /** Initialization from an iterable, followed by add and remove. */
  @Test
  public void testEdges() {
    LongByteHashMapEdges edges = new LongByteHashMapEdges();

    List<Edge<LongWritable, ByteWritable>> initialEdges = Lists.newArrayList(
        createEdge(1, (byte) 99), createEdge(2, (byte) 77), createEdge(4, (byte) 66));

    edges.initialize(initialEdges);
    assertEdges(edges, new long[]{1, 2, 4}, new byte[]{99, 77, 66});

    edges.add(EdgeFactory.createReusable(new LongWritable(3), new ByteWritable((byte) 55)));
    assertEdges(edges, new long[]{1, 2, 3, 4}, new byte[]{99, 77, 55, 66});

    edges.remove(new LongWritable(2));
    assertEdges(edges, new long[]{1, 3, 4}, new byte[]{99, 55, 66});
    // The removed edge must really be gone, not just excluded from the count.
    Assert.assertNull(edges.getEdgeValue(new LongWritable(2)));
  }

  /** setEdgeValue() updates existing edges and ignores missing ones. */
  @Test
  public void testSetEdgeValue() {
    LongByteHashMapEdges edges = new LongByteHashMapEdges();

    edges.initialize();
    edges.add(createEdge(1, (byte) 10));

    edges.setEdgeValue(new LongWritable(1), new ByteWritable((byte) 20));
    assertEdges(edges, new long[]{1}, new byte[]{20});

    // Setting the value of a non-existent edge must not create it.
    edges.setEdgeValue(new LongWritable(2), new ByteWritable((byte) 30));
    assertEdges(edges, new long[]{1}, new byte[]{20});
    Assert.assertNull(edges.getEdgeValue(new LongWritable(2)));
  }

  /** Removal through the mutable iterator. */
  @Test
  public void testMutateEdges() {
    LongByteHashMapEdges edges = new LongByteHashMapEdges();

    edges.initialize();

    // Add 10 edges with id and value set to i, for i = 0..9
    for (int i = 0; i < 10; ++i) {
      edges.add(createEdge(i, (byte) i));
    }

    // Use the mutable iterator to remove edges with even id
    Iterator<MutableEdge<LongWritable, ByteWritable>> edgeIt =
        edges.mutableIterator();
    while (edgeIt.hasNext()) {
      if (edgeIt.next().getTargetVertexId().get() % 2 == 0) {
        edgeIt.remove();
      }
    }

    // We should now have 5 edges
    assertEquals(5, edges.size());
    // The edge ids should be all odd
    for (Edge<LongWritable, ByteWritable> edge : edges) {
      assertEquals(1, edge.getTargetVertexId().get() % 2);
      assertEquals(1, edge.getValue().get() % 2);
    }
  }

  /** Values set through the mutable iterator are written to the edges. */
  @Test
  public void testMutateEdgeValues() {
    LongByteHashMapEdges edges = new LongByteHashMapEdges();

    edges.initialize();

    // Add 10 edges with id and value set to i, for i = 0..9
    for (int i = 0; i < 10; ++i) {
      edges.add(createEdge(i, (byte) i));
    }

    // Use the mutable iterator to set every value to id + 10
    Iterator<MutableEdge<LongWritable, ByteWritable>> edgeIt =
        edges.mutableIterator();
    while (edgeIt.hasNext()) {
      MutableEdge<LongWritable, ByteWritable> edge = edgeIt.next();
      long id = edge.getTargetVertexId().get();
      edge.setValue(new ByteWritable((byte) (id + 10)));
      // The change is visible on the edge object itself...
      assertEquals(id + 10, edge.getValue().get());
    }

    // ...and in the backing storage.
    assertEdges(edges, new long[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
        new byte[]{10, 11, 12, 13, 14, 15, 16, 17, 18, 19});
  }

  /** write() followed by readFields() restores exactly the same edges. */
  @Test
  public void testSerialization() throws IOException {
    LongByteHashMapEdges edges = new LongByteHashMapEdges();

    edges.initialize();

    // Add 10 edges with id and value set to i, for i = 0..9
    for (int i = 0; i < 10; ++i) {
      edges.add(createEdge(i, (byte) i));
    }

    edges.trim();

    // Use the mutable iterator to remove edges with even id
    Iterator<MutableEdge<LongWritable, ByteWritable>> edgeIt =
        edges.mutableIterator();
    while (edgeIt.hasNext()) {
      if (edgeIt.next().getTargetVertexId().get() % 2 == 0) {
        edgeIt.remove();
      }
    }

    // We should now have 5 edges
    assertEdges(edges, new long[]{1, 3, 5, 7, 9}, new byte[]{1, 3, 5, 7, 9});

    ExtendedDataOutput tempBuffer = new UnsafeByteArrayOutputStream();

    edges.write(tempBuffer);

    DataInput input = new UnsafeByteArrayInputStream(
        tempBuffer.getByteArray(), 0, tempBuffer.getPos());

    edges = new LongByteHashMapEdges();
    edges.readFields(input);

    // Check the exact id -> value mapping, not only size and parity, so that
    // a round trip that mixes up values is detected.
    assertEdges(edges, new long[]{1, 3, 5, 7, 9}, new byte[]{1, 3, 5, 7, 9});

    // Also exercise the read-only iterator on the deserialized edges.
    for (Edge<LongWritable, ByteWritable> edge : edges) {
      assertEquals(1, edge.getTargetVertexId().get() % 2);
      assertEquals(1, edge.getValue().get() % 2);
      assertEquals(edge.getTargetVertexId().get(), edge.getValue().get());
    }
  }

  /**
   * This implementation does not allow parallel edges.
   */
  @Test
  public void testParallelEdges() {
    LongByteHashMapEdges edges = new LongByteHashMapEdges();

    List<Edge<LongWritable, ByteWritable>> initialEdges = Lists.newArrayList(
        createEdge(2, (byte) 1), createEdge(2, (byte) 2), createEdge(2, (byte) 3));

    edges.initialize(initialEdges);
    assertEquals(1, edges.size());

    edges.remove(new LongWritable(2));
    assertEquals(0, edges.size());

    edges.add(EdgeFactory.create(new LongWritable(2), new ByteWritable((byte) 4)));
    assertEquals(1, edges.size());

    edges.trim();
    assertEquals(1, edges.size());
  }
}

0 comments on commit f944f5c

Please sign in to comment.