From 5f2b396e7a14e3dde7e4ddbb2a7f7536fd533b0e Mon Sep 17 00:00:00 2001
From: Dionysios Logothetis
Date: Fri, 12 Jan 2018 21:00:55 -0800
Subject: [PATCH] Add LongByteHashMapEdges implementation

---
 .../giraph/edge/LongByteHashMapEdges.java     | 217 ++++++++++++++++++
 .../giraph/edge/LongByteHashMapEdgesTest.java | 164 +++++++++++++
 2 files changed, 381 insertions(+)
 create mode 100644 giraph-core/src/main/java/org/apache/giraph/edge/LongByteHashMapEdges.java
 create mode 100644 giraph-core/src/test/java/org/apache/giraph/edge/LongByteHashMapEdgesTest.java

diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongByteHashMapEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongByteHashMapEdges.java
new file mode 100644
index 000000000..fd5f2783e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongByteHashMapEdges.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import it.unimi.dsi.fastutil.longs.Long2ByteMap;
+import it.unimi.dsi.fastutil.longs.Long2ByteOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.giraph.utils.EdgeIterables;
+import org.apache.giraph.utils.Trimmable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+/**
+ * {@link OutEdges} implementation with long ids and byte edge values,
+ * backed by a {@link Long2ByteOpenHashMap}.
+ * Parallel edges are not allowed.
+ * Note: this implementation is optimized for fast random access and mutations,
+ * and uses less space than a generic {@link HashMapEdges}.
+ */
+public class LongByteHashMapEdges
+    implements StrictRandomAccessOutEdges<LongWritable, ByteWritable>,
+    ReuseObjectsOutEdges<LongWritable, ByteWritable>,
+    MutableOutEdges<LongWritable, ByteWritable>, Trimmable {
+  /** Hash map from target vertex id to edge value. */
+  private Long2ByteOpenHashMap edgeMap;
+  /** Representative edge value object, used by getEdgeValue(). */
+  private ByteWritable representativeEdgeValue;
+
+  @Override
+  public void initialize(Iterable<Edge<LongWritable, ByteWritable>> edges) {
+    EdgeIterables.initialize(this, edges);
+  }
+
+  @Override
+  public void initialize(int capacity) {
+    edgeMap = new Long2ByteOpenHashMap(capacity);
+  }
+
+  @Override
+  public void initialize() {
+    edgeMap = new Long2ByteOpenHashMap();
+  }
+
+  @Override
+  public void add(Edge<LongWritable, ByteWritable> edge) {
+    edgeMap.put(edge.getTargetVertexId().get(), edge.getValue().get());
+  }
+
+  @Override
+  public void remove(LongWritable targetVertexId) {
+    edgeMap.remove(targetVertexId.get());
+  }
+
+  @Override
+  public ByteWritable getEdgeValue(LongWritable targetVertexId) {
+    if (!edgeMap.containsKey(targetVertexId.get())) {
+      return null;
+    }
+    if (representativeEdgeValue == null) {
+      representativeEdgeValue = new ByteWritable();
+    }
+    representativeEdgeValue.set(edgeMap.get(targetVertexId.get()));
+    return representativeEdgeValue;
+  }
+
+  @Override
+  public void setEdgeValue(LongWritable targetVertexId,
+                           ByteWritable edgeValue) {
+    if (edgeMap.containsKey(targetVertexId.get())) {
+      edgeMap.put(targetVertexId.get(), edgeValue.get());
+    }
+  }
+
+  @Override
+  public int size() {
+    return edgeMap.size();
+  }
+
+  @Override
+  public Iterator<Edge<LongWritable, ByteWritable>> iterator() {
+    // Returns an iterator that reuses objects.
+    return new UnmodifiableIterator<Edge<LongWritable, ByteWritable>>() {
+      /** Wrapped map iterator. */
+      private final ObjectIterator<Long2ByteMap.Entry> mapIterator =
+          edgeMap.long2ByteEntrySet().fastIterator();
+      /** Representative edge object. */
+      private final ReusableEdge<LongWritable, ByteWritable>
+      representativeEdge =
+          EdgeFactory.createReusable(new LongWritable(), new ByteWritable());
+
+      @Override
+      public boolean hasNext() {
+        return mapIterator.hasNext();
+      }
+
+      @Override
+      public Edge<LongWritable, ByteWritable> next() {
+        Long2ByteMap.Entry nextEntry = mapIterator.next();
+        representativeEdge.getTargetVertexId().set(nextEntry.getLongKey());
+        representativeEdge.getValue().set(nextEntry.getByteValue());
+        return representativeEdge;
+      }
+    };
+  }
+
+  @Override
+  public void trim() {
+    edgeMap.trim();
+  }
+
+  /** Helper class for a mutable edge that modifies the backing map entry. */
+  private static class LongByteHashMapMutableEdge
+      extends DefaultEdge<LongWritable, ByteWritable> {
+    /** Backing entry for the edge in the map. */
+    private Long2ByteMap.Entry entry;
+
+    /** Constructor. */
+    public LongByteHashMapMutableEdge() {
+      super(new LongWritable(), new ByteWritable());
+    }
+
+    /**
+     * Make the edge point to the given entry in the backing map.
+     *
+     * @param entry Backing entry
+     */
+    public void setEntry(Long2ByteMap.Entry entry) {
+      // Update the id and value objects from the superclass.
+      getTargetVertexId().set(entry.getLongKey());
+      getValue().set(entry.getByteValue());
+      // Update the entry.
+      this.entry = entry;
+    }
+
+    @Override
+    public void setValue(ByteWritable value) {
+      // Update the value object from the superclass.
+      getValue().set(value.get());
+      // Update the value stored in the backing map.
+      entry.setValue(value.get());
+    }
+  }
+
+  @Override
+  public Iterator<MutableEdge<LongWritable, ByteWritable>> mutableIterator() {
+    return new Iterator<MutableEdge<LongWritable, ByteWritable>>() {
+      /**
+       * Wrapped map iterator.
+       * Note: we cannot use the fast iterator in this case,
+       * because we need to call setValue() on an entry.
+       */
+      private final ObjectIterator<Long2ByteMap.Entry> mapIterator =
+          edgeMap.long2ByteEntrySet().iterator();
+      /** Representative edge object. */
+      private final LongByteHashMapMutableEdge representativeEdge =
+          new LongByteHashMapMutableEdge();
+
+      @Override
+      public boolean hasNext() {
+        return mapIterator.hasNext();
+      }
+
+      @Override
+      public MutableEdge<LongWritable, ByteWritable> next() {
+        representativeEdge.setEntry(mapIterator.next());
+        return representativeEdge;
+      }
+
+      @Override
+      public void remove() {
+        mapIterator.remove();
+      }
+    };
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(edgeMap.size());
+    for (Long2ByteMap.Entry entry : edgeMap.long2ByteEntrySet()) {
+      out.writeLong(entry.getLongKey());
+      out.writeByte(entry.getByteValue());
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numEdges = in.readInt();
+    initialize(numEdges);
+    for (int i = 0; i < numEdges; ++i) {
+      edgeMap.put(in.readLong(), in.readByte());
+    }
+  }
+}
diff --git a/giraph-core/src/test/java/org/apache/giraph/edge/LongByteHashMapEdgesTest.java b/giraph-core/src/test/java/org/apache/giraph/edge/LongByteHashMapEdgesTest.java
new file mode 100644
index 000000000..d4de07d75
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/edge/LongByteHashMapEdgesTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.edge;
+
+import com.google.common.collect.Lists;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
+import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+
+public class LongByteHashMapEdgesTest {
+  private static Edge<LongWritable, ByteWritable> createEdge(long id, byte value) {
+    return EdgeFactory.create(new LongWritable(id), new ByteWritable(value));
+  }
+
+  private static void assertEdges(LongByteHashMapEdges edges, long[] expectedIds,
+                                  byte[] expectedValues) {
+    Assert.assertEquals(expectedIds.length, edges.size());
+    for (int i = 0; i< expectedIds.length; i++) {
+      ByteWritable value = edges.getEdgeValue(new LongWritable(expectedIds[i]));
+      assertNotNull(value);
+      assertEquals(expectedValues[i], value.get());
+    }
+  }
+
+  @Test
+  public void testEdges() {
+    LongByteHashMapEdges edges = new LongByteHashMapEdges();
+
+    List<Edge<LongWritable, ByteWritable>> initialEdges = Lists.newArrayList(
+        createEdge(1, (byte) 99), createEdge(2, (byte) 77), createEdge(4, (byte) 66));
+
+    edges.initialize(initialEdges);
+    assertEdges(edges, new long[]{1, 2, 4}, new byte[]{99, 77, 66});
+
+    edges.add(EdgeFactory.createReusable(new LongWritable(3), new ByteWritable((byte) 55)));
+    assertEdges(edges, new long[]{1, 2, 3, 4}, new byte[]{99, 77, 55, 66});
+
+    edges.remove(new LongWritable(2));
+    assertEdges(edges, new long[]{1, 3, 4}, new byte[]{99, 55, 66});
+  }
+
+  @Test
+  public void testMutateEdges() {
+    LongByteHashMapEdges edges = new LongByteHashMapEdges();
+
+    edges.initialize();
+
+    // Add 10 edges with id and value set to i, for i = 0..9
+    for (int i = 0; i < 10; ++i) {
+      edges.add(createEdge(i, (byte) i));
+    }
+
+    // Use the mutable iterator to remove edges with even id
+    Iterator<MutableEdge<LongWritable, ByteWritable>> edgeIt =
+        edges.mutableIterator();
+    while (edgeIt.hasNext()) {
+      if (edgeIt.next().getTargetVertexId().get() % 2 == 0) {
+        edgeIt.remove();
+      }
+    }
+
+    // We should now have 5 edges
+    assertEquals(5, edges.size());
+    // The edge ids should be all odd
+    for (Edge<LongWritable, ByteWritable> edge : edges) {
+      assertEquals(1, edge.getTargetVertexId().get() % 2);
+      assertEquals(1, edge.getValue().get() % 2);
+    }
+  }
+
+  @Test
+  public void testSerialization() throws IOException {
+    LongByteHashMapEdges edges = new LongByteHashMapEdges();
+
+    edges.initialize();
+
+    // Add 10 edges with id and value set to i, for i = 0..9
+    for (int i = 0; i < 10; ++i) {
+      edges.add(createEdge(i, (byte) i));
+    }
+
+    edges.trim();
+
+    // Use the mutable iterator to remove edges with even id
+    Iterator<MutableEdge<LongWritable, ByteWritable>> edgeIt =
+        edges.mutableIterator();
+    while (edgeIt.hasNext()) {
+      if (edgeIt.next().getTargetVertexId().get() % 2 == 0) {
+        edgeIt.remove();
+      }
+    }
+
+    // We should now have 5 edges
+    assertEdges(edges, new long[]{1, 3, 5, 7, 9}, new byte[]{1, 3, 5, 7, 9});
+
+    ExtendedDataOutput tempBuffer = new UnsafeByteArrayOutputStream();
+
+    edges.write(tempBuffer);
+
+    DataInput input = new UnsafeByteArrayInputStream(
+        tempBuffer.getByteArray(), 0, tempBuffer.getPos());
+
+    edges = new LongByteHashMapEdges();
+    edges.readFields(input);
+
+    assertEquals(5, edges.size());
+
+    for (Edge<LongWritable, ByteWritable> edge : edges) {
+      assertEquals(1, edge.getTargetVertexId().get() % 2);
+      assertEquals(1, edge.getValue().get() % 2);
+    }
+  }
+
+  /**
+   * This implementation does not allow parallel edges.
+   */
+  @Test
+  public void testParallelEdges() {
+    LongByteHashMapEdges edges = new LongByteHashMapEdges();
+
+    List<Edge<LongWritable, ByteWritable>> initialEdges = Lists.newArrayList(
+        createEdge(2, (byte) 1), createEdge(2, (byte) 2), createEdge(2, (byte) 3));
+
+    edges.initialize(initialEdges);
+    assertEquals(1, edges.size());
+
+    edges.remove(new LongWritable(2));
+    assertEquals(0, edges.size());
+
+    edges.add(EdgeFactory.create(new LongWritable(2), new ByteWritable((byte) 4)));
+    assertEquals(1, edges.size());
+
+    edges.trim();
+    assertEquals(1, edges.size());
+  }
+}