From ca36f1d499a70b1aee85ac637de53f153a9f4c92 Mon Sep 17 00:00:00 2001 From: Igor Kabiljo Date: Tue, 29 Dec 2015 15:14:32 -0800 Subject: [PATCH] Use Partitions in LocalBlockRunner Summary: Speed up LocalBlockRunner by operating on vertices stored in partitions instead of on a TestGraph. Along with it, deprecate the old non-SimplePartitionerFactory way of specifying partitioning (SimplePartitionerFactory is renamed to the old name GraphPartitionerFactory, and the former GraphPartitionerFactory is changed to GraphPartitionerFactoryInterface). Test Plan: Run unit-test for speed: testEmptyIterationsSmallGraph 6.5 -> 6.3 testEmptyIterationsSyntheticGraphLowDegree() 42.0 -> 13.8 testEmptyIterationsSyntheticGraphHighDegree() 3.6 -> 2.0 testPageRankSyntheticGraphLowDegree() 51.0 -> 47.2 testPageRankSyntheticGraphHighDegree() 20.3 -> 17.4 Reviewers: maja.kabiljo, sergey.edunov, dionysis.logothetis Reviewed By: dionysis.logothetis Differential Revision: https://reviews.facebook.net/D52425 --- .../api/local/TestLocalBlockRunnerSpeed.java | 144 ++++++++++++++++++ .../block_app/framework/BlockUtils.java | 7 + .../framework/api/local/InternalApi.java | 67 ++++++-- .../framework/api/local/LocalBlockRunner.java | 71 +++++---- .../framework/api/local/VertexSaver.java | 34 ----- .../framework/internal/BlockMasterLogic.java | 20 ++- .../internal/BlockWorkerContextLogic.java | 6 +- .../test_setup/graphs/SyntheticGraphInit.java | 7 - .../MultipleSimultanousMutationsTest.java | 89 +++++++++++ .../apache/giraph/conf/GiraphConstants.java | 2 +- .../SuperstepHashPartitionerFactory.java | 125 --------------- .../giraph/integration/package-info.java | 21 --- .../partition/GraphPartitionerFactory.java | 96 ++++++++++-- ... 
=> GraphPartitionerFactoryInterface.java} | 44 +++--- .../partition/HashMasterPartitioner.java | 117 -------------- .../partition/HashPartitionerFactory.java | 23 +-- .../HashRangePartitionerFactory.java | 25 +-- .../partition/HashWorkerPartitioner.java | 77 ---------- .../LongMappingStorePartitionerFactory.java | 6 +- ...r.java => MasterGraphPartitionerImpl.java} | 13 +- .../SimpleIntRangePartitionerFactory.java | 6 +- .../SimpleLongRangePartitionerFactory.java | 6 +- .../partition/SimplePartitionerFactory.java | 121 --------------- ...r.java => WorkerGraphPartitionerImpl.java} | 4 +- .../giraph/writable/kryo/HadoopKryo.java | 4 + .../apache/giraph/TestGraphPartitioner.java | 50 +----- 26 files changed, 509 insertions(+), 676 deletions(-) create mode 100644 giraph-block-app-8/src/test/java/org/apache/giraph/block_app/framework/api/local/TestLocalBlockRunnerSpeed.java delete mode 100644 giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/VertexSaver.java create mode 100644 giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/MultipleSimultanousMutationsTest.java delete mode 100644 giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java delete mode 100644 giraph-core/src/main/java/org/apache/giraph/integration/package-info.java rename giraph-core/src/main/java/org/apache/giraph/partition/{HashRangeWorkerPartitioner.java => GraphPartitionerFactoryInterface.java} (50%) delete mode 100644 giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java delete mode 100644 giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java rename giraph-core/src/main/java/org/apache/giraph/partition/{SimpleMasterPartitioner.java => MasterGraphPartitionerImpl.java} (91%) delete mode 100644 giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java rename giraph-core/src/main/java/org/apache/giraph/partition/{SimpleWorkerPartitioner.java => 
WorkerGraphPartitionerImpl.java} (96%) diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/framework/api/local/TestLocalBlockRunnerSpeed.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/framework/api/local/TestLocalBlockRunnerSpeed.java new file mode 100644 index 000000000..44145e35b --- /dev/null +++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/framework/api/local/TestLocalBlockRunnerSpeed.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.framework.api.local; + +import org.apache.giraph.block_app.examples.pagerank.AbstractPageRankExampleBlockFactory; +import org.apache.giraph.block_app.examples.pagerank.PageRankExampleBlockFactory; +import org.apache.giraph.block_app.framework.BlockUtils; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.RepeatBlock; +import org.apache.giraph.block_app.framework.piece.Piece; +import org.apache.giraph.block_app.test_setup.TestGraphUtils; +import org.apache.giraph.block_app.test_setup.graphs.EachVertexInit; +import org.apache.giraph.block_app.test_setup.graphs.Small1GraphInit; +import org.apache.giraph.block_app.test_setup.graphs.SyntheticGraphInit; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +public class TestLocalBlockRunnerSpeed { + public static class EmptyPiecesBlockFactory extends AbstractPageRankExampleBlockFactory { + @Override + public Block createBlock(GiraphConfiguration conf) { + return new RepeatBlock(NUM_ITERATIONS.get(conf), new Piece<>()); + } + } + + @BeforeClass + public static void warmup() throws Exception { + TestGraphUtils.runTest( + new Small1GraphInit(), + null, + (GiraphConfiguration conf) -> { + LocalBlockRunner.RUN_ALL_CHECKS.set(conf, false); + BlockUtils.setBlockFactoryClass(conf, EmptyPiecesBlockFactory.class); + BlockUtils.LOG_EXECUTION_STATUS.set(conf, false); + AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 1000); + }); + } + + @Test + @Ignore("use for benchmarking") + public void testEmptyIterationsSmallGraph() throws Exception { + TestGraphUtils.runTest( + new Small1GraphInit(), + null, + (GiraphConfiguration conf) -> { + LocalBlockRunner.RUN_ALL_CHECKS.set(conf, false); + 
BlockUtils.setBlockFactoryClass(conf, EmptyPiecesBlockFactory.class); + BlockUtils.LOG_EXECUTION_STATUS.set(conf, false); + AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 10000); + }); + } + + @Test + @Ignore("use for benchmarking") + public void testEmptyIterationsSyntheticGraphLowDegree() throws Exception { + TestGraphUtils.runTest( + new SyntheticGraphInit(), + null, + (GiraphConfiguration conf) -> { + LocalBlockRunner.RUN_ALL_CHECKS.set(conf, false); + BlockUtils.setBlockFactoryClass(conf, EmptyPiecesBlockFactory.class); + AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 1000); + + SyntheticGraphInit.NUM_VERTICES.set(conf, 500000); + SyntheticGraphInit.NUM_EDGES_PER_VERTEX.set(conf, 10); + SyntheticGraphInit.NUM_COMMUNITIES.set(conf, 1000); + }); + } + + @Test + @Ignore("use for benchmarking") + public void testEmptyIterationsSyntheticGraphHighDegree() throws Exception { + TestGraphUtils.runTest( + new SyntheticGraphInit(), + null, + (GiraphConfiguration conf) -> { + LocalBlockRunner.RUN_ALL_CHECKS.set(conf, false); + BlockUtils.setBlockFactoryClass(conf, EmptyPiecesBlockFactory.class); + AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 1000); + + SyntheticGraphInit.NUM_VERTICES.set(conf, 50000); + SyntheticGraphInit.NUM_EDGES_PER_VERTEX.set(conf, 100); + SyntheticGraphInit.NUM_COMMUNITIES.set(conf, 1000); + }); + } + + @Test + @Ignore("use for benchmarking") + public void testPageRankSyntheticGraphLowDegree() throws Exception { + TestGraphUtils.runTest( + TestGraphUtils.chainModifiers( + new SyntheticGraphInit(), + new EachVertexInit<>((vertex) -> vertex.getValue().set(1.0))), + null, + (GiraphConfiguration conf) -> { + LocalBlockRunner.RUN_ALL_CHECKS.set(conf, false); + BlockUtils.setBlockFactoryClass(conf, PageRankExampleBlockFactory.class); + AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 100); + + SyntheticGraphInit.NUM_VERTICES.set(conf, 500000); + SyntheticGraphInit.NUM_EDGES_PER_VERTEX.set(conf, 10); + 
SyntheticGraphInit.NUM_COMMUNITIES.set(conf, 1000); + }); + } + + @Test + @Ignore("use for benchmarking") + public void testPageRankSyntheticGraphHighDegree() throws Exception { + TestGraphUtils.runTest( + TestGraphUtils.chainModifiers( + new SyntheticGraphInit(), + new EachVertexInit<>((vertex) -> vertex.getValue().set(1.0))), + null, + (GiraphConfiguration conf) -> { + LocalBlockRunner.RUN_ALL_CHECKS.set(conf, false); + BlockUtils.setBlockFactoryClass(conf, PageRankExampleBlockFactory.class); + AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 100); + + SyntheticGraphInit.NUM_VERTICES.set(conf, 50000); + SyntheticGraphInit.NUM_EDGES_PER_VERTEX.set(conf, 100); + SyntheticGraphInit.NUM_COMMUNITIES.set(conf, 1000); + }); + } +} diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java index e00909c2b..6bf6d92f6 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java @@ -28,6 +28,7 @@ import org.apache.giraph.block_app.framework.block.Block; import org.apache.giraph.block_app.framework.piece.AbstractPiece; import org.apache.giraph.block_app.framework.piece.Piece; +import org.apache.giraph.conf.BooleanConfOption; import org.apache.giraph.conf.ClassConfOption; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; @@ -58,6 +59,12 @@ public class BlockUtils { Object.class, Object.class, "block worker context value class"); + /** Property describing whether to log execution status as application runs */ + public static final + BooleanConfOption LOG_EXECUTION_STATUS = new BooleanConfOption( + "giraph.block_utils.log_execution_status", true, + "Log execution status (of which pieces are being executed, etc)"); + private static final Logger LOG = 
Logger.getLogger(BlockUtils.class); /** Dissallow constructor */ diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java index 3ca8b1c23..a4703b459 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java @@ -18,7 +18,6 @@ package org.apache.giraph.block_app.framework.api.local; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -53,6 +52,8 @@ import org.apache.giraph.graph.VertexMutations; import org.apache.giraph.graph.VertexResolver; import org.apache.giraph.master.AggregatorToGlobalCommTranslation; +import org.apache.giraph.partition.GraphPartitionerFactory; +import org.apache.giraph.partition.Partition; import org.apache.giraph.reducers.ReduceOperation; import org.apache.giraph.utils.TestGraph; import org.apache.giraph.utils.WritableUtils; @@ -74,7 +75,10 @@ @SuppressWarnings({ "rawtypes", "unchecked" }) class InternalApi implements BlockMasterApi, BlockOutputHandleAccessor { - private final TestGraph graph; + private final TestGraph inputGraph; + private final List> partitions; + private final GraphPartitionerFactory partitionerFactory; + private final ImmutableClassesGiraphConfiguration conf; private final boolean runAllChecks; private final InternalAggregators globalComm; @@ -94,8 +98,22 @@ class InternalApi graph, ImmutableClassesGiraphConfiguration conf, + int numPartitions, boolean runAllChecks) { - this.graph = graph; + this.inputGraph = graph; + this.partitions = new ArrayList<>(numPartitions); + for (int i = 0; i < numPartitions; i++) { + this.partitions.add(conf.createPartition(i, null)); + } + this.partitionerFactory = conf.createGraphPartitioner(); + 
Preconditions.checkNotNull(this.partitionerFactory); + Preconditions.checkState(this.partitions.size() == numPartitions); + + for (Vertex vertex : graph) { + getPartition(vertex.getId()).putVertex(vertex); + } + graph.clear(); + this.conf = conf; this.runAllChecks = runAllChecks; this.globalComm = new InternalAggregators(runAllChecks); @@ -362,8 +380,8 @@ public void afterMasterBeforeWorker(BlockWorkerPieces computation) { Collections.EMPTY_SET : previousMessages.targetsSet(); if (createVertexOnMsgs) { for (I target : targets) { - if (!graph.getVertices().containsKey(target)) { - mutations.put(target, new VertexMutations()); + if (getPartition(target).getVertex(target) == null) { + mutations.putIfAbsent(target, new VertexMutations()); } } } @@ -371,23 +389,25 @@ public void afterMasterBeforeWorker(BlockWorkerPieces computation) { VertexResolver vertexResolver = conf.createVertexResolver(); for (Map.Entry> entry : mutations.entrySet()) { I vertexIndex = entry.getKey(); - Vertex originalVertex = graph.getVertex(vertexIndex); + Vertex originalVertex = + getPartition(vertexIndex).getVertex(vertexIndex); VertexMutations curMutations = entry.getValue(); Vertex vertex = vertexResolver.resolve( vertexIndex, originalVertex, curMutations, targets.contains(vertexIndex)); if (vertex != null) { - graph.addVertex(vertex); + getPartition(vertex.getId()).putVertex(vertex); } else if (originalVertex != null) { - graph.getVertices().remove(originalVertex.getId()); + getPartition(originalVertex.getId()).removeVertex( + originalVertex.getId()); } } mutations.clear(); } - public Collection> getAllVertices() { - return graph.getVertices().values(); + public List> getPartitions() { + return partitions; } public InternalWorkerApi getWorkerApi() { @@ -397,15 +417,19 @@ public InternalWorkerApi getWorkerApi() { @Override public long getTotalNumEdges() { int numEdges = 0; - for (Vertex vertex : graph.getVertices().values()) { - numEdges += vertex.getNumEdges(); + for (Partition partition : 
partitions) { + numEdges += partition.getEdgeCount(); } return numEdges; } @Override public long getTotalNumVertices() { - return graph.getVertices().size(); + int numVertices = 0; + for (Partition partition : partitions) { + numVertices += partition.getVertexCount(); + } + return numVertices; } @Override @@ -438,4 +462,21 @@ public BlockWorkerContextLogic getWorkerContextLogic() { public int getWorkerCount() { return 1; } + + private int getPartitionId(I id) { + Preconditions.checkNotNull(id); + return partitionerFactory.getPartition(id, partitions.size(), 1); + } + + private Partition getPartition(I id) { + return partitions.get(getPartitionId(id)); + } + + public void postApplication() { + for (Partition partition : partitions) { + for (Vertex vertex : partition) { + inputGraph.setVertex(vertex); + } + } + } } diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java index d582cb2a4..90aa8a298 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java @@ -17,10 +17,8 @@ */ package org.apache.giraph.block_app.framework.api.local; -import java.util.ArrayList; -import java.util.Collection; +import java.io.IOException; import java.util.List; -import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -40,8 +38,11 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.conf.IntConfOption; import org.apache.giraph.graph.Vertex; +import org.apache.giraph.io.SimpleVertexWriter; +import org.apache.giraph.partition.Partition; import org.apache.giraph.utils.InternalVertexRunner; import org.apache.giraph.utils.TestGraph; 
+import org.apache.giraph.utils.Trimmable; import org.apache.giraph.utils.WritableUtils; import org.apache.giraph.writable.kryo.KryoWritableWrapper; import org.apache.hadoop.io.Writable; @@ -101,7 +102,7 @@ TestGraph runApp( public static void runApp(TestGraph graph) { - VertexSaver noOpVertexSaver = noOpVertexSaver(); + SimpleVertexWriter noOpVertexSaver = noOpVertexSaver(); runAppWithVertexOutput(graph, noOpVertexSaver); } @@ -113,7 +114,7 @@ void runApp(TestGraph graph) { void runBlock( TestGraph graph, Block block, Object executionStage) { - VertexSaver noOpVertexSaver = noOpVertexSaver(); + SimpleVertexWriter noOpVertexSaver = noOpVertexSaver(); runBlockWithVertexOutput( block, executionStage, graph, noOpVertexSaver); } @@ -126,7 +127,7 @@ void runBlock( public static void runAppWithVertexOutput( - TestGraph graph, final VertexSaver vertexSaver) { + TestGraph graph, final SimpleVertexWriter vertexSaver) { BlockFactory factory = BlockUtils.createBlockFactory(graph.getConf()); runBlockWithVertexOutput( factory.createBlock(graph.getConf()), @@ -142,18 +143,18 @@ void runAppWithVertexOutput( void runBlockWithVertexOutput( Block block, Object executionStage, TestGraph graph, - final VertexSaver vertexSaver + final SimpleVertexWriter vertexSaver ) { Preconditions.checkNotNull(block); Preconditions.checkNotNull(graph); ImmutableClassesGiraphConfiguration conf = graph.getConf(); - int numWorkers = NUM_THREADS.get(conf); + int numPartitions = NUM_THREADS.get(conf); boolean runAllChecks = RUN_ALL_CHECKS.get(conf); boolean serializeMaster = SERIALIZE_MASTER.get(conf); final boolean doOutputDuringComputation = conf.doOutputDuringComputation(); final InternalApi internalApi = - new InternalApi(graph, conf, runAllChecks); + new InternalApi(graph, conf, numPartitions, runAllChecks); final InternalWorkerApi internalWorkerApi = internalApi.getWorkerApi(); BlockUtils.checkBlockTypes(block, executionStage, conf); @@ -170,8 +171,7 @@ public void progress() { } })); - 
ExecutorService executor = Executors.newFixedThreadPool(numWorkers); - Random rand = new Random(); + ExecutorService executor = Executors.newFixedThreadPool(numPartitions); if (runAllChecks) { for (Vertex vertex : graph) { @@ -204,9 +204,15 @@ public void progress() { blockMasterLogic.computeNext(superstep); if (workerPieces == null) { if (!conf.doOutputDuringComputation()) { - Collection> vertices = internalApi.getAllVertices(); - for (Vertex vertex : vertices) { - vertexSaver.saveVertex(vertex); + List> partitions = internalApi.getPartitions(); + for (Partition partition : partitions) { + for (Vertex vertex : partition) { + try { + vertexSaver.writeVertex(vertex); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } } } int left = executor.shutdownNow().size(); @@ -214,14 +220,7 @@ public void progress() { break; } else { internalApi.afterMasterBeforeWorker(workerPieces); - List>> verticesPerWorker = new ArrayList<>(); - for (int i = 0; i < numWorkers; i++) { - verticesPerWorker.add(new ArrayList>()); - } - Collection> allVertices = internalApi.getAllVertices(); - for (Vertex vertex : allVertices) { - verticesPerWorker.get(rand.nextInt(numWorkers)).add(vertex); - } + List> partitions = internalApi.getPartitions(); workerContextLogic.preSuperstep( internalWorkerApi, @@ -229,10 +228,10 @@ public void progress() { KryoWritableWrapper.wrapAndCopy(workerPieces), superstep, internalApi.takeWorkerMessages()); - final CountDownLatch latch = new CountDownLatch(numWorkers); + final CountDownLatch latch = new CountDownLatch(numPartitions); final AtomicReference exception = new AtomicReference<>(); anyVertexAlive.set(false); - for (final List> curVertices : verticesPerWorker) { + for (final Partition partition : partitions) { executor.execute(new Runnable() { @Override public void run() { @@ -244,16 +243,28 @@ public void run() { BlockWorkerLogic localLogic = new BlockWorkerLogic(localPieces); localLogic.preSuperstep(internalWorkerApi, 
internalWorkerApi); - for (Vertex vertex : curVertices) { + for (Vertex vertex : partition) { Iterable messages = internalApi.takeMessages(vertex.getId()); if (vertex.isHalted() && !Iterables.isEmpty(messages)) { vertex.wakeUp(); } + // Equivalent of ComputeCallable.computePartition if (!vertex.isHalted()) { localLogic.compute(vertex, messages); + + // Need to unwrap the mutated edges (possibly) + vertex.unwrapMutableEdges(); + //Compact edges representation if possible + if (vertex instanceof Trimmable) { + ((Trimmable) vertex).trim(); + } + // Write vertex to superstep output + // (no-op if it is not used) if (doOutputDuringComputation) { - vertexSaver.saveVertex(vertex); + vertexSaver.writeVertex(vertex); } + // Need to save the vertex changes (possibly) + partition.saveVertex(vertex); } if (!vertex.isHalted()) { @@ -295,14 +306,16 @@ public void run() { } workerContextLogic.postApplication(); + internalApi.postApplication(); } private static - VertexSaver noOpVertexSaver() { - return new VertexSaver() { + SimpleVertexWriter noOpVertexSaver() { + return new SimpleVertexWriter() { @Override - public void saveVertex(Vertex vertex) { + public void writeVertex(Vertex vertex) + throws IOException, InterruptedException { // No-op } }; diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/VertexSaver.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/VertexSaver.java deleted file mode 100644 index 0053644de..000000000 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/VertexSaver.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.block_app.framework.api.local; - -import org.apache.giraph.graph.Vertex; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -/** - * Interface to use for saving vertices - * - * @param Vertex id - * @param Vertex value - * @param Edge value - */ -public interface VertexSaver { - void saveVertex(Vertex vertex); -} diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java index 4892a33f3..a52bb7714 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java @@ -126,8 +126,12 @@ public BlockWorkerPieces computeNext(long superstep) { postApplication(); return null; } else { - LOG.info( - "Master executing " + previousPiece + ", in superstep " + superstep); + boolean logExecutionStatus = + BlockUtils.LOG_EXECUTION_STATUS.get(masterApi.getConf()); + if (logExecutionStatus) { + LOG.info("Master executing " + previousPiece + + ", in superstep " + superstep); + } previousPiece.masterCompute(masterApi); ((BlockOutputHandleAccessor) masterApi).getBlockOutputHandle(). 
returnAllWriters(); @@ -149,8 +153,10 @@ public BlockWorkerPieces computeNext(long superstep) { BlockCounters.setStageCounters( "Master finished stage: ", previousPiece.getExecutionStage(), masterApi); - LOG.info( - "Master passing next " + nextPiece + ", in superstep " + superstep); + if (logExecutionStatus) { + LOG.info( + "Master passing next " + nextPiece + ", in superstep " + superstep); + } // if there is nothing more to compute, no need for additional superstep // this can only happen if application uses no pieces. @@ -160,8 +166,10 @@ public BlockWorkerPieces computeNext(long superstep) { result = null; } else { result = new BlockWorkerPieces<>(previousPiece, nextPiece); - LOG.info("Master in " + superstep + " superstep passing " + - result + " to be executed"); + if (logExecutionStatus) { + LOG.info("Master in " + superstep + " superstep passing " + + result + " to be executed"); + } } previousPiece = nextPiece; diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java index 8b8e1748e..ca2bb5ab0 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java @@ -65,8 +65,10 @@ public void preSuperstep( BlockWorkerContextSendApi sendApi, BlockWorkerPieces workerPieces, long superstep, List messages) { - LOG.info("Worker executing " + workerPieces + " in " + superstep + - " superstep"); + if (BlockUtils.LOG_EXECUTION_STATUS.get(receiveApi.getConf())) { + LOG.info("Worker executing " + workerPieces + " in " + superstep + + " superstep"); + } this.sendApi = sendApi; this.workerPieces = workerPieces; if (workerPieces.getReceiver() != null) { diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SyntheticGraphInit.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SyntheticGraphInit.java index 3de158a91..cd485b402 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SyntheticGraphInit.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SyntheticGraphInit.java @@ -58,7 +58,6 @@ public SyntheticGraphInit() { this.edgeSupplier = null; } - @Override public void modifyGraph(NumericTestGraph graph) { GiraphConfiguration conf = graph.getConf(); @@ -84,11 +83,5 @@ public void modifyGraph(NumericTestGraph graph) { i, j, edgeSupplier != null ? edgeSupplier.get() : null); } } - -// if (vertexModifier != null) { -// for (int i = 0; i < numVertices; i++) { -// vertexModifier.modifyVertexValue(i, graph.getVertex(i).getValue()); -// } -// } } } diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/MultipleSimultanousMutationsTest.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/MultipleSimultanousMutationsTest.java new file mode 100644 index 000000000..e2c316e48 --- /dev/null +++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/MultipleSimultanousMutationsTest.java @@ -0,0 +1,89 @@ +package org.apache.giraph.block_app.framework; + +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.piece.Piece; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender; +import org.apache.giraph.block_app.test_setup.NumericTestGraph; +import org.apache.giraph.block_app.test_setup.TestGraphChecker; +import org.apache.giraph.block_app.test_setup.TestGraphModifier; +import org.apache.giraph.block_app.test_setup.TestGraphUtils; +import org.apache.giraph.conf.GiraphConfiguration; +import 
org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.ReusableEdge; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test when vertex gets multiple simultaneous mutations + * (i.e. to non-existent vertex, send a message and do add edge request) + * and confirm all mutations are correctly processed + */ +public class MultipleSimultanousMutationsTest { + @Test + public void createVertexOnMsgsTest() throws Exception { + TestGraphUtils.runTest( + new TestGraphModifier() { + @Override + public void modifyGraph(NumericTestGraph graph) { + graph.addEdge(1, 2, 2); + } + }, + new TestGraphChecker() { + @Override + public void checkOutput(NumericTestGraph graph) { + Assert.assertEquals(1, graph.getVertex(1).getNumEdges()); + Assert.assertNull(graph.getVertex(1).getEdgeValue(new LongWritable(-1))); + Assert.assertEquals(2, graph.getVertex(1).getEdgeValue(new LongWritable(2)).get()); + + Assert.assertEquals(1, graph.getVertex(2).getNumEdges()); + Assert.assertEquals(-1, graph.getVertex(2).getEdgeValue(new LongWritable(-1)).get()); + } + }, + new BulkConfigurator() { + @Override + public void configure(GiraphConfiguration conf) { + BlockUtils.setBlockFactoryClass(conf, SendingAndAddEdgeBlockFactory.class); + } + }); + } + + public static class SendingAndAddEdgeBlockFactory extends TestLongNullNullBlockFactory { + @Override + protected Class getEdgeValueClass(GiraphConfiguration conf) { + return LongWritable.class; + } + + @Override + public Block createBlock(GiraphConfiguration conf) { + return new Piece() { + @Override + protected Class getMessageClass() { + return NullWritable.class; + } + + @Override + public VertexSender getVertexSender( + final BlockWorkerSendApi workerApi, + Object executionStage) { + final ReusableEdge reusableEdge = workerApi.getConf().createReusableEdge(); + 
reusableEdge.setTargetVertexId(new LongWritable(-1)); + reusableEdge.setValue(new LongWritable(-1)); + return new VertexSender() { + @Override + public void vertexSend(Vertex vertex) { + for (Edge edge : vertex.getEdges()) { + workerApi.addEdgeRequest(edge.getTargetVertexId(), reusableEdge); + workerApi.sendMessage(edge.getTargetVertexId(), NullWritable.get()); + } + } + }; + } + }; + } + } +} diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index e74703efb..8ad3767fc 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -873,7 +873,7 @@ public interface GiraphConstants { "Overrides default partition count calculation if not -1"); /** Vertex key space size for - * {@link org.apache.giraph.partition.SimpleWorkerPartitioner} + * {@link org.apache.giraph.partition.WorkerGraphPartitionerImpl} */ String PARTITION_VERTEX_KEY_SPACE_SIZE = "giraph.vertexKeySpaceSize"; diff --git a/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java deleted file mode 100644 index ebc62f6d0..000000000 --- a/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.integration; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.worker.WorkerInfo; -import org.apache.giraph.partition.BasicPartitionOwner; -import org.apache.giraph.partition.HashMasterPartitioner; -import org.apache.giraph.partition.HashPartitionerFactory; -import org.apache.giraph.partition.MasterGraphPartitioner; -import org.apache.giraph.partition.PartitionOwner; -import org.apache.giraph.partition.PartitionStats; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.log4j.Logger; - -/** - * Example graph partitioner that builds on {@link HashMasterPartitioner} to - * send the partitions to the worker that matches the superstep. It is for - * testing only and should never be used in practice. - * - * @param Vertex id - * @param Vertex value - * @param Edge value - */ -@SuppressWarnings("rawtypes") -public class SuperstepHashPartitionerFactory - extends HashPartitionerFactory { - /** - * Changes the {@link HashMasterPartitioner} to make ownership of the - * partitions based on a superstep. For testing only as it is totally - * unbalanced. 
- * - * @param vertex id - * @param vertex data - * @param edge data - */ - private static class SuperstepMasterPartition - extends HashMasterPartitioner { - /** Class logger */ - private static Logger LOG = - Logger.getLogger(SuperstepMasterPartition.class); - - /** - * Construction with configuration. - * - * @param conf Configuration to be stored. - */ - public SuperstepMasterPartition(ImmutableClassesGiraphConfiguration conf) { - super(conf); - } - - @Override - public Collection generateChangedPartitionOwners( - Collection allPartitionStatsList, - Collection availableWorkerInfos, - int maxWorkers, - long superstep) { - // Assign all the partitions to - // superstep mod availableWorkerInfos - // Guaranteed to be different if the workers (and their order) - // do not change - long workerIndex = superstep % availableWorkerInfos.size(); - int i = 0; - WorkerInfo chosenWorkerInfo = null; - for (WorkerInfo workerInfo : availableWorkerInfos) { - if (workerIndex == i) { - chosenWorkerInfo = workerInfo; - } - ++i; - } - if (LOG.isInfoEnabled()) { - LOG.info("generateChangedPartitionOwners: Chosen worker " + - "for superstep " + superstep + " is " + - chosenWorkerInfo); - } - - List partitionOwnerList = new ArrayList(); - for (PartitionOwner partitionOwner : - getCurrentPartitionOwners()) { - WorkerInfo prevWorkerinfo = - partitionOwner.getWorkerInfo().equals(chosenWorkerInfo) ? 
- null : partitionOwner.getWorkerInfo(); - PartitionOwner tmpPartitionOwner = - new BasicPartitionOwner(partitionOwner.getPartitionId(), - chosenWorkerInfo, - prevWorkerinfo, - null); - partitionOwnerList.add(tmpPartitionOwner); - LOG.info("partition owner was " + partitionOwner + - ", new " + tmpPartitionOwner); - } - setPartitionOwnerList(partitionOwnerList); - return partitionOwnerList; - } - } - - @Override - public MasterGraphPartitioner - createMasterGraphPartitioner() { - return new SuperstepMasterPartition(getConf()); - } -} diff --git a/giraph-core/src/main/java/org/apache/giraph/integration/package-info.java b/giraph-core/src/main/java/org/apache/giraph/integration/package-info.java deleted file mode 100644 index 4c6ae30a1..000000000 --- a/giraph-core/src/main/java/org/apache/giraph/integration/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Package of all helper integration test objects. 
- */ -package org.apache.giraph.integration; diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java index c5e2f3ea6..5726d2534 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java @@ -18,7 +18,7 @@ package org.apache.giraph.partition; -import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; +import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; import org.apache.giraph.worker.LocalData; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -26,34 +26,98 @@ /** * Defines the partitioning framework for this application. * - * @param Vertex index value + * Abstracts and implements all GraphPartitionerFactoryInterface logic + * on top of two functions which define partitioning scheme: + * - which partition vertex should be in, and + * - which partition should belong to which worker + * + * @param Vertex id value * @param Vertex value * @param Edge value */ @SuppressWarnings("rawtypes") -public interface GraphPartitionerFactory extends - ImmutableClassesGiraphConfigurable { +public abstract class GraphPartitionerFactory + extends DefaultImmutableClassesGiraphConfigurable + implements GraphPartitionerFactoryInterface { + @Override + public void initialize(LocalData localData) { + } + + @Override + public final MasterGraphPartitioner createMasterGraphPartitioner() { + return new MasterGraphPartitionerImpl(getConf()) { + @Override + protected int getWorkerIndex(int partition, int partitionCount, + int workerCount) { + return GraphPartitionerFactory.this.getWorker( + partition, partitionCount, workerCount); + } + }; + } + + @Override + public final WorkerGraphPartitioner createWorkerGraphPartitioner() { + return new WorkerGraphPartitionerImpl() { + @Override 
+ protected int getPartitionIndex(I id, int partitionCount, + int workerCount) { + return GraphPartitionerFactory.this.getPartition(id, + partitionCount, workerCount); + } + }; + } + + /** + * Calculates which partition the given vertex belongs to, + * from the interval [0, partitionCount). + * + * @param id Vertex id + * @param partitionCount Number of partitions + * @param workerCount Number of workers + * @return partition + */ + public abstract int getPartition(I id, int partitionCount, + int workerCount); /** - * Use some local data present in the worker + * Calculates the worker that should be responsible for the passed partition. * - * @param localData localData present in the worker + * @param partition Current partition + * @param partitionCount Number of partitions + * @param workerCount Number of workers + * @return index of worker responsible for current partition */ - void initialize(LocalData localData); + public abstract int getWorker( + int partition, int partitionCount, int workerCount); + /** - * Create the {@link MasterGraphPartitioner} used by the master. - * Instantiated once by the master and reused. + * Utility function for calculating which partition a value + * from the interval [0, max) belongs to. * - * @return Instantiated master graph partitioner + * @param value Value for which partition is requested + * @param max Exclusive upper bound of the value interval + * @param partitions Number of partitions, equally sized. + * @return Index of the partition the value belongs to. */ - MasterGraphPartitioner createMasterGraphPartitioner(); + public static int getPartitionInRange(int value, int max, int partitions) { + double keyRange = ((double) max) / partitions; + int part = (int) ((value % max) / keyRange); + return Math.max(0, Math.min(partitions - 1, part)); + } /** - * Create the {@link WorkerGraphPartitioner} used by the worker. - * Instantiated once by every worker and reused.
+ * Utility function for calculating which partition a value + * from the interval [0, max) belongs to. * - * @return Instantiated worker graph partitioner + * @param value Value for which partition is requested + * @param max Exclusive upper bound of the value interval + * @param partitions Number of partitions, equally sized. + * @return Index of the partition the value belongs to. */ - WorkerGraphPartitioner createWorkerGraphPartitioner(); + public static int getPartitionInRange(long value, long max, int partitions) { + double keyRange = ((double) max) / partitions; + int part = (int) ((value % max) / keyRange); + return Math.max(0, Math.min(partitions - 1, part)); + } } diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashRangeWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactoryInterface.java similarity index 50% rename from giraph-core/src/main/java/org/apache/giraph/partition/HashRangeWorkerPartitioner.java rename to giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactoryInterface.java index 81c3d7d57..5551100f0 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/HashRangeWorkerPartitioner.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactoryInterface.java @@ -18,32 +18,42 @@ package org.apache.giraph.partition; +import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; +import org.apache.giraph.worker.LocalData; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import com.google.common.primitives.UnsignedInts; - /** - * Implements range-based partitioning from the id hash code. + * Defines the partitioning framework for this application. * * @param <I> Vertex index value * @param <V> Vertex value * @param <E> Edge value */ @SuppressWarnings("rawtypes") -public class HashRangeWorkerPartitioner - extends HashWorkerPartitioner { - /** A transformed hashCode() must be strictly smaller than this.
*/ - private static final long HASH_LIMIT = 2L * Integer.MAX_VALUE + 2L; +public interface GraphPartitionerFactoryInterface extends + ImmutableClassesGiraphConfigurable { + + /** + * Use some local data present in the worker + * + * @param localData localData present in the worker + */ + void initialize(LocalData localData); + /** + * Create the {@link MasterGraphPartitioner} used by the master. + * Instantiated once by the master and reused. + * + * @return Instantiated master graph partitioner + */ + MasterGraphPartitioner createMasterGraphPartitioner(); - @Override - public PartitionOwner getPartitionOwner(I vertexId) { - long unsignedHashCode = UnsignedInts.toLong(vertexId.hashCode()); - // The reader can verify that unsignedHashCode of HASH_LIMIT - 1 yields - // index of size - 1, and unsignedHashCode of 0 yields index of 0. - int index = (int) - ((unsignedHashCode * getPartitionOwners().size()) / HASH_LIMIT); - return partitionOwnerList.get(index); - } + /** + * Create the {@link WorkerGraphPartitioner} used by the worker. + * Instantiated once by every worker and reused. + * + * @return Instantiated worker graph partitioner + */ + WorkerGraphPartitioner createWorkerGraphPartitioner(); } diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java deleted file mode 100644 index 607347d5d..000000000 --- a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.partition; - -import com.google.common.collect.Lists; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.worker.WorkerInfo; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.log4j.Logger; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; - -/** - * Master will execute a hash based partitioning. - * - * @param Vertex index value - * @param Vertex value - * @param Edge value - */ -@SuppressWarnings("rawtypes") -public class HashMasterPartitioner implements - MasterGraphPartitioner { - /** Class logger */ - private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class); - /** Provided configuration */ - private ImmutableClassesGiraphConfiguration conf; - /** Save the last generated partition owner list */ - private List partitionOwnerList; - - /** - * Constructor. - * - *@param conf Configuration used. 
- */ - public HashMasterPartitioner(ImmutableClassesGiraphConfiguration conf) { - this.conf = conf; - } - - @Override - public Collection createInitialPartitionOwners( - Collection availableWorkerInfos, int maxWorkers) { - int partitionCount = PartitionUtils.computePartitionCount( - availableWorkerInfos.size(), conf); - List ownerList = new ArrayList(); - Iterator workerIt = availableWorkerInfos.iterator(); - for (int i = 0; i < partitionCount; ++i) { - PartitionOwner owner = new BasicPartitionOwner(i, workerIt.next()); - if (!workerIt.hasNext()) { - workerIt = availableWorkerInfos.iterator(); - } - ownerList.add(owner); - } - this.partitionOwnerList = ownerList; - return ownerList; - } - - @Override - public void setPartitionOwners(Collection partitionOwners) { - this.partitionOwnerList = Lists.newArrayList(partitionOwners); - } - - @Override - public Collection getCurrentPartitionOwners() { - return partitionOwnerList; - } - - /** - * Subclasses can set the partition owner list. - * - * @param partitionOwnerList New partition owner list. 
- */ - protected void setPartitionOwnerList(List - partitionOwnerList) { - this.partitionOwnerList = partitionOwnerList; - } - - @Override - public Collection generateChangedPartitionOwners( - Collection allPartitionStatsList, - Collection availableWorkerInfos, - int maxWorkers, - long superstep) { - return PartitionBalancer.balancePartitionsAcrossWorkers( - conf, - partitionOwnerList, - allPartitionStatsList, - availableWorkerInfos); - } - - @Override - public PartitionStats createPartitionStats() { - return new PartitionStats(); - } - - -} diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java index 221e50da3..17aec51e3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java @@ -15,11 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.giraph.partition; -import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; -import org.apache.giraph.worker.LocalData; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -27,27 +24,21 @@ * Divides the vertices into partitions by their hash code using a simple * round-robin hash for great balancing if given a random hash code. 
* - * @param Vertex index value + * @param Vertex id value * @param Vertex value * @param Edge value */ -@SuppressWarnings("rawtypes") public class HashPartitionerFactory - extends DefaultImmutableClassesGiraphConfigurable - implements GraphPartitionerFactory { - - @Override - public void initialize(LocalData localData) { - } + V extends Writable, E extends Writable> + extends GraphPartitionerFactory { @Override - public MasterGraphPartitioner createMasterGraphPartitioner() { - return new HashMasterPartitioner(getConf()); + public int getPartition(I id, int partitionCount, int workerCount) { + return Math.abs(id.hashCode() % partitionCount); } @Override - public WorkerGraphPartitioner createWorkerGraphPartitioner() { - return new HashWorkerPartitioner(); + public int getWorker(int partition, int partitionCount, int workerCount) { + return partition % workerCount; } } diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java index 5f7ee409a..ef65800fe 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java @@ -18,11 +18,11 @@ package org.apache.giraph.partition; -import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; -import org.apache.giraph.worker.LocalData; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import com.google.common.primitives.UnsignedInts; + /** * Divides the vertices into partitions by their hash code using ranges of the * hash space. 
@@ -33,21 +33,22 @@ */ @SuppressWarnings("rawtypes") public class HashRangePartitionerFactory - extends DefaultImmutableClassesGiraphConfigurable - implements GraphPartitionerFactory { + V extends Writable, E extends Writable> + extends GraphPartitionerFactory { - @Override - public void initialize(LocalData localData) { - } + /** A transformed hashCode() must be strictly smaller than this. */ + private static final long HASH_LIMIT = 2L * Integer.MAX_VALUE + 2L; @Override - public MasterGraphPartitioner createMasterGraphPartitioner() { - return new HashMasterPartitioner(getConf()); + public int getPartition(I id, int partitionCount, int workerCount) { + long unsignedHashCode = UnsignedInts.toLong(id.hashCode()); + // The reader can verify that unsignedHashCode of HASH_LIMIT - 1 yields + // index of size - 1, and unsignedHashCode of 0 yields index of 0. + return (int) ((unsignedHashCode * partitionCount) / HASH_LIMIT); } @Override - public WorkerGraphPartitioner createWorkerGraphPartitioner() { - return new HashRangeWorkerPartitioner(); + public int getWorker(int partition, int partitionCount, int workerCount) { + return partition % workerCount; } } diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java deleted file mode 100644 index 12aa41751..000000000 --- a/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.partition; - -import com.google.common.collect.Lists; -import org.apache.giraph.worker.WorkerInfo; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -import java.util.Collection; -import java.util.List; - -/** - * Implements hash-based partitioning from the id hash code. - * - * @param Vertex index value - * @param Vertex value - * @param Edge value - */ -@SuppressWarnings("rawtypes") -public class HashWorkerPartitioner - implements WorkerGraphPartitioner { - /** - * Mapping of the vertex ids to {@link PartitionOwner}. 
- */ - protected List partitionOwnerList = - Lists.newArrayList(); - - @Override - public PartitionOwner createPartitionOwner() { - return new BasicPartitionOwner(); - } - - @Override - public PartitionOwner getPartitionOwner(I vertexId) { - return partitionOwnerList.get( - Math.abs(vertexId.hashCode() % partitionOwnerList.size())); - } - - @Override - public Collection finalizePartitionStats( - Collection workerPartitionStats, - PartitionStore partitionStore) { - // No modification necessary - return workerPartitionStats; - } - - @Override - public PartitionExchange updatePartitionOwners( - WorkerInfo myWorkerInfo, - Collection masterSetPartitionOwners) { - return PartitionBalancer.updatePartitionOwners(partitionOwnerList, - myWorkerInfo, masterSetPartitionOwners); - } - - @Override - public Collection getPartitionOwners() { - return partitionOwnerList; - } -} diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java index e129050c0..98d1285dc 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java @@ -31,7 +31,7 @@ */ @SuppressWarnings("unchecked") public class LongMappingStorePartitionerFactory extends SimplePartitionerFactory { + E extends Writable> extends GraphPartitionerFactory { /** Logger Instance */ private static final Logger LOG = Logger.getLogger( LongMappingStorePartitionerFactory.class); @@ -46,14 +46,14 @@ public void initialize(LocalData Vertex value type * @param Edge value type */ -public abstract class SimpleMasterPartitioner implements MasterGraphPartitioner { - /** Class logger */ - private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class); /** Provided configuration */ - private ImmutableClassesGiraphConfiguration conf; + private final 
ImmutableClassesGiraphConfiguration conf; /** Save the last generated partition owner list */ private List partitionOwnerList; @@ -53,7 +51,8 @@ public abstract class SimpleMasterPartitioner conf) { this.conf = conf; } diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java index 5dd580b1a..6d1dcb13d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java @@ -32,13 +32,13 @@ * @param Edge value type */ public class SimpleIntRangePartitionerFactory extends SimplePartitionerFactory { + E extends Writable> extends GraphPartitionerFactory { /** Vertex key space size. */ private int keySpaceSize; @Override - protected int getPartition(IntWritable id, int partitionCount, + public int getPartition(IntWritable id, int partitionCount, int workerCount) { return getPartition(id, partitionCount); } @@ -56,7 +56,7 @@ protected int getPartition(IntWritable id, int partitionCount) { } @Override - protected int getWorker(int partition, int partitionCount, int workerCount) { + public int getWorker(int partition, int partitionCount, int workerCount) { return getPartitionInRange(partition, partitionCount, workerCount); } diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java index e637e1665..9dee3d16d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java @@ -32,13 +32,13 @@ * @param Edge value type */ public class SimpleLongRangePartitionerFactory extends SimplePartitionerFactory { + E extends Writable> 
extends GraphPartitionerFactory { /** Vertex key space size. */ private long keySpaceSize; @Override - protected int getPartition(LongWritable id, int partitionCount, + public int getPartition(LongWritable id, int partitionCount, int workerCount) { return getPartition(id, partitionCount); } @@ -56,7 +56,7 @@ protected int getPartition(LongWritable id, int partitionCount) { } @Override - protected int getWorker(int partition, int partitionCount, int workerCount) { + public int getWorker(int partition, int partitionCount, int workerCount) { return getPartitionInRange(partition, partitionCount, workerCount); } diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java deleted file mode 100644 index 1e2984609..000000000 --- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.giraph.partition; - -import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; -import org.apache.giraph.worker.LocalData; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -/** - * Abstracts and implements all GraphPartitionerFactory logic on top of two - * functions which define partitioning scheme: - * - which partition user should be in, and - * - which partition should belong to which worker - * - * @param Vertex id - * @param Vertex value - * @param Edge value - */ -public abstract class SimplePartitionerFactory - extends DefaultImmutableClassesGiraphConfigurable - implements GraphPartitionerFactory { - - @Override - public void initialize(LocalData localData) { - } - - @Override - public final MasterGraphPartitioner createMasterGraphPartitioner() { - return new SimpleMasterPartitioner(getConf()) { - @Override - protected int getWorkerIndex(int partition, int partitionCount, - int workerCount) { - return SimplePartitionerFactory.this.getWorker( - partition, partitionCount, workerCount); - } - }; - } - - @Override - public final WorkerGraphPartitioner createWorkerGraphPartitioner() { - return new SimpleWorkerPartitioner() { - @Override - protected int getPartitionIndex(I id, int partitionCount, - int workerCount) { - return SimplePartitionerFactory.this.getPartition(id, - partitionCount, workerCount); - } - }; - } - - /** - * Calculates in which partition current vertex belongs to, - * from interval [0, partitionCount). - * - * @param id Vertex id - * @param partitionCount Number of partitions - * @param workerCount Number of workers - * @return partition - */ - protected abstract int getPartition(I id, int partitionCount, - int workerCount); - - /** - * Calculates worker that should be responsible for passed partition. 
- * - * @param partition Current partition - * @param partitionCount Number of partitions - * @param workerCount Number of workers - * @return index of worker responsible for current partition - */ - protected abstract int getWorker( - int partition, int partitionCount, int workerCount); - - /** - * Utility function for calculating in which partition value - * from interval [0, max) should belong to. - * - * @param value Value for which partition is requested - * @param max Maximum possible value - * @param partitions Number of partitions, equally sized. - * @return Index of partition where value belongs to. - */ - public static int getPartitionInRange(int value, int max, int partitions) { - double keyRange = ((double) max) / partitions; - int part = (int) ((value % max) / keyRange); - return Math.max(0, Math.min(partitions - 1, part)); - } - - /** - * Utility function for calculating in which partition value - * from interval [0, max) should belong to. - * - * @param value Value for which partition is requested - * @param max Maximum possible value - * @param partitions Number of partitions, equally sized. - * @return Index of partition where value belongs to. 
- */ - public static int getPartitionInRange(long value, long max, int partitions) { - double keyRange = ((double) max) / partitions; - int part = (int) ((value % max) / keyRange); - return Math.max(0, Math.min(partitions - 1, part)); - } -} diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitionerImpl.java similarity index 96% rename from giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java rename to giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitionerImpl.java index 0ee8d920f..208718147 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitionerImpl.java @@ -38,12 +38,12 @@ * @param Vertex value type * @param Edge value type */ -public abstract class SimpleWorkerPartitioner implements WorkerGraphPartitioner { /** Logger instance */ private static final Logger LOG = Logger.getLogger( - SimpleWorkerPartitioner.class); + WorkerGraphPartitionerImpl.class); /** List of {@link PartitionOwner}s for this worker. 
*/ private List partitionOwnerList = Lists.newArrayList(); /** List of available workers */ diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java index 478a33bd8..49602a166 100644 --- a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java +++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java @@ -41,6 +41,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; +import org.apache.log4j.Logger; import org.objenesis.strategy.StdInstantiatorStrategy; import com.esotericsoftware.kryo.Kryo; @@ -106,6 +107,9 @@ public Kryo create() { Random.class, "it should be rarely serialized, since it would create same stream " + "of numbers everywhere, use TransientRandom instead"); + NON_SERIALIZABLE.put( + Logger.class, + "Logger must be a static field"); } // Use chunked streams, so within same stream we can use both kryo and diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java b/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java index bf8749179..a3c95bbed 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java +++ b/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java @@ -18,13 +18,17 @@ package org.apache.giraph; +import static org.apache.giraph.examples.GeneratedVertexReader.READER_VERTICES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; -import org.apache.giraph.examples.GeneratedVertexReader; import org.apache.giraph.examples.SimpleCheckpoint; import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat; import 
org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat; -import org.apache.giraph.integration.SuperstepHashPartitionerFactory; import org.apache.giraph.job.GiraphJob; import org.apache.giraph.partition.HashRangePartitionerFactory; import org.apache.giraph.partition.PartitionBalancer; @@ -34,12 +38,6 @@ import org.apache.hadoop.fs.Path; import org.junit.Test; -import java.io.IOException; - -import static org.apache.giraph.examples.GeneratedVertexReader.READER_VERTICES; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - /** * Unit test for manual checkpoint restarting */ @@ -108,24 +106,6 @@ public void testPartitioners() assertTrue(job.run(true)); verifyOutput(hdfs, outputPath); - outputPath = getTempPath("testSuperstepHashPartitioner"); - conf = new GiraphConfiguration(); - conf.setComputationClass( - SimpleCheckpoint.SimpleCheckpointComputation.class); - conf.setWorkerContextClass( - SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class); - conf.setMasterComputeClass( - SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class); - conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class); - conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class); - job = prepareJob("testSuperstepHashPartitioner", conf, outputPath); - - job.getConfiguration().setGraphPartitionerFactoryClass( - SuperstepHashPartitionerFactory.class); - - assertTrue(job.run(true)); - verifyOutput(hdfs, outputPath); - job = new GiraphJob("testHashRangePartitioner"); setupConfiguration(job); job.getConfiguration().setComputationClass( @@ -145,24 +125,6 @@ public void testPartitioners() assertTrue(job.run(true)); verifyOutput(hdfs, outputPath); - outputPath = getTempPath("testReverseIdSuperstepHashPartitioner"); - conf = new GiraphConfiguration(); - conf.setComputationClass( - SimpleCheckpoint.SimpleCheckpointComputation.class); - conf.setWorkerContextClass( - 
SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class); - conf.setMasterComputeClass( - SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class); - conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class); - conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class); - job = prepareJob("testReverseIdSuperstepHashPartitioner", conf, - outputPath); - job.getConfiguration().setGraphPartitionerFactoryClass( - SuperstepHashPartitionerFactory.class); - GeneratedVertexReader.REVERSE_ID_ORDER.set(job.getConfiguration(), true); - assertTrue(job.run(true)); - verifyOutput(hdfs, outputPath); - job = new GiraphJob("testSimpleRangePartitioner"); setupConfiguration(job); job.getConfiguration().setComputationClass(