Skip to content
This repository has been archived by the owner on Oct 30, 2023. It is now read-only.

Commit

Permalink
Use Partitions in LocalBlockRunner
Browse files Browse the repository at this point in the history
Summary:
Speed up LocalBlockRunner, by not operating on a TestGraph, but on vertices stored in partitions.
With it, deprecate the old non-SimplePartitionerFactory way of specifying partitioning
(and, along with that, rename SimplePartitionerFactory to the old name GraphPartitionerFactory,
 and change the previous GraphPartitionerFactory to GraphPartitionerFactoryInterface).

Test Plan:
Run unit-test for speed:

  testEmptyIterationsSmallGraph
    6.5 -> 6.3
  testEmptyIterationsSyntheticGraphLowDegree()
    42.0 -> 13.8
  testEmptyIterationsSyntheticGraphHighDegree()
    3.6 -> 2.0
  testPageRankSyntheticGraphLowDegree()
    51.0 -> 47.2
  testPageRankSyntheticGraphHighDegree()
    20.3 -> 17.4

Reviewers: maja.kabiljo, sergey.edunov, dionysis.logothetis

Reviewed By: dionysis.logothetis

Differential Revision: https://reviews.facebook.net/D52425
  • Loading branch information
Igor Kabiljo committed Jan 27, 2016
1 parent 1e802da commit ca36f1d
Show file tree
Hide file tree
Showing 26 changed files with 509 additions and 676 deletions.
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.block_app.framework.api.local;

import org.apache.giraph.block_app.examples.pagerank.AbstractPageRankExampleBlockFactory;
import org.apache.giraph.block_app.examples.pagerank.PageRankExampleBlockFactory;
import org.apache.giraph.block_app.framework.BlockUtils;
import org.apache.giraph.block_app.framework.block.Block;
import org.apache.giraph.block_app.framework.block.RepeatBlock;
import org.apache.giraph.block_app.framework.piece.Piece;
import org.apache.giraph.block_app.test_setup.TestGraphUtils;
import org.apache.giraph.block_app.test_setup.graphs.EachVertexInit;
import org.apache.giraph.block_app.test_setup.graphs.Small1GraphInit;
import org.apache.giraph.block_app.test_setup.graphs.SyntheticGraphInit;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

public class TestLocalBlockRunnerSpeed {
public static class EmptyPiecesBlockFactory extends AbstractPageRankExampleBlockFactory {
@Override
public Block createBlock(GiraphConfiguration conf) {
return new RepeatBlock(NUM_ITERATIONS.get(conf), new Piece<>());
}
}

@BeforeClass
public static void warmup() throws Exception {
TestGraphUtils.runTest(
new Small1GraphInit<LongWritable, DoubleWritable, NullWritable>(),
null,
(GiraphConfiguration conf) -> {
LocalBlockRunner.RUN_ALL_CHECKS.set(conf, false);
BlockUtils.setBlockFactoryClass(conf, EmptyPiecesBlockFactory.class);
BlockUtils.LOG_EXECUTION_STATUS.set(conf, false);
AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 1000);
});
}

@Test
@Ignore("use for benchmarking")
public void testEmptyIterationsSmallGraph() throws Exception {
TestGraphUtils.runTest(
new Small1GraphInit<LongWritable, DoubleWritable, NullWritable>(),
null,
(GiraphConfiguration conf) -> {
LocalBlockRunner.RUN_ALL_CHECKS.set(conf, false);
BlockUtils.setBlockFactoryClass(conf, EmptyPiecesBlockFactory.class);
BlockUtils.LOG_EXECUTION_STATUS.set(conf, false);
AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 10000);
});
}

@Test
@Ignore("use for benchmarking")
public void testEmptyIterationsSyntheticGraphLowDegree() throws Exception {
TestGraphUtils.runTest(
new SyntheticGraphInit<LongWritable, DoubleWritable, NullWritable>(),
null,
(GiraphConfiguration conf) -> {
LocalBlockRunner.RUN_ALL_CHECKS.set(conf, false);
BlockUtils.setBlockFactoryClass(conf, EmptyPiecesBlockFactory.class);
AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 1000);

SyntheticGraphInit.NUM_VERTICES.set(conf, 500000);
SyntheticGraphInit.NUM_EDGES_PER_VERTEX.set(conf, 10);
SyntheticGraphInit.NUM_COMMUNITIES.set(conf, 1000);
});
}

@Test
@Ignore("use for benchmarking")
public void testEmptyIterationsSyntheticGraphHighDegree() throws Exception {
TestGraphUtils.runTest(
new SyntheticGraphInit<LongWritable, DoubleWritable, NullWritable>(),
null,
(GiraphConfiguration conf) -> {
LocalBlockRunner.RUN_ALL_CHECKS.set(conf, false);
BlockUtils.setBlockFactoryClass(conf, EmptyPiecesBlockFactory.class);
AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 1000);

SyntheticGraphInit.NUM_VERTICES.set(conf, 50000);
SyntheticGraphInit.NUM_EDGES_PER_VERTEX.set(conf, 100);
SyntheticGraphInit.NUM_COMMUNITIES.set(conf, 1000);
});
}

@Test
@Ignore("use for benchmarking")
public void testPageRankSyntheticGraphLowDegree() throws Exception {
TestGraphUtils.runTest(
TestGraphUtils.chainModifiers(
new SyntheticGraphInit<LongWritable, DoubleWritable, NullWritable>(),
new EachVertexInit<>((vertex) -> vertex.getValue().set(1.0))),
null,
(GiraphConfiguration conf) -> {
LocalBlockRunner.RUN_ALL_CHECKS.set(conf, false);
BlockUtils.setBlockFactoryClass(conf, PageRankExampleBlockFactory.class);
AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 100);

SyntheticGraphInit.NUM_VERTICES.set(conf, 500000);
SyntheticGraphInit.NUM_EDGES_PER_VERTEX.set(conf, 10);
SyntheticGraphInit.NUM_COMMUNITIES.set(conf, 1000);
});
}

@Test
@Ignore("use for benchmarking")
public void testPageRankSyntheticGraphHighDegree() throws Exception {
TestGraphUtils.runTest(
TestGraphUtils.chainModifiers(
new SyntheticGraphInit<LongWritable, DoubleWritable, NullWritable>(),
new EachVertexInit<>((vertex) -> vertex.getValue().set(1.0))),
null,
(GiraphConfiguration conf) -> {
LocalBlockRunner.RUN_ALL_CHECKS.set(conf, false);
BlockUtils.setBlockFactoryClass(conf, PageRankExampleBlockFactory.class);
AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 100);

SyntheticGraphInit.NUM_VERTICES.set(conf, 50000);
SyntheticGraphInit.NUM_EDGES_PER_VERTEX.set(conf, 100);
SyntheticGraphInit.NUM_COMMUNITIES.set(conf, 1000);
});
}
}
Expand Up @@ -28,6 +28,7 @@
import org.apache.giraph.block_app.framework.block.Block;
import org.apache.giraph.block_app.framework.piece.AbstractPiece;
import org.apache.giraph.block_app.framework.piece.Piece;
import org.apache.giraph.conf.BooleanConfOption;
import org.apache.giraph.conf.ClassConfOption;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
Expand Down Expand Up @@ -58,6 +59,12 @@ public class BlockUtils {
Object.class, Object.class,
"block worker context value class");

/** Property describing whether to log execution status as application runs */
public static final
BooleanConfOption LOG_EXECUTION_STATUS = new BooleanConfOption(
"giraph.block_utils.log_execution_status", true,
"Log execution status (of which pieces are being executed, etc)");

private static final Logger LOG = Logger.getLogger(BlockUtils.class);

/** Disallow constructor */
Expand Down
Expand Up @@ -18,7 +18,6 @@
package org.apache.giraph.block_app.framework.api.local;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -53,6 +52,8 @@
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.master.AggregatorToGlobalCommTranslation;
import org.apache.giraph.partition.GraphPartitionerFactory;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.reducers.ReduceOperation;
import org.apache.giraph.utils.TestGraph;
import org.apache.giraph.utils.WritableUtils;
Expand All @@ -74,7 +75,10 @@
@SuppressWarnings({ "rawtypes", "unchecked" })
class InternalApi<I extends WritableComparable, V extends Writable,
E extends Writable> implements BlockMasterApi, BlockOutputHandleAccessor {
private final TestGraph<I, V, E> graph;
private final TestGraph<I, V, E> inputGraph;
private final List<Partition<I, V, E>> partitions;
private final GraphPartitionerFactory<I, V, E> partitionerFactory;

private final ImmutableClassesGiraphConfiguration conf;
private final boolean runAllChecks;
private final InternalAggregators globalComm;
Expand All @@ -94,8 +98,22 @@ class InternalApi<I extends WritableComparable, V extends Writable,
public InternalApi(
TestGraph<I, V, E> graph,
ImmutableClassesGiraphConfiguration conf,
int numPartitions,
boolean runAllChecks) {
this.graph = graph;
this.inputGraph = graph;
this.partitions = new ArrayList<>(numPartitions);
for (int i = 0; i < numPartitions; i++) {
this.partitions.add(conf.createPartition(i, null));
}
this.partitionerFactory = conf.createGraphPartitioner();
Preconditions.checkNotNull(this.partitionerFactory);
Preconditions.checkState(this.partitions.size() == numPartitions);

for (Vertex<I, V, E> vertex : graph) {
getPartition(vertex.getId()).putVertex(vertex);
}
graph.clear();

this.conf = conf;
this.runAllChecks = runAllChecks;
this.globalComm = new InternalAggregators(runAllChecks);
Expand Down Expand Up @@ -362,32 +380,34 @@ public void afterMasterBeforeWorker(BlockWorkerPieces computation) {
Collections.EMPTY_SET : previousMessages.targetsSet();
if (createVertexOnMsgs) {
for (I target : targets) {
if (!graph.getVertices().containsKey(target)) {
mutations.put(target, new VertexMutations<I, V, E>());
if (getPartition(target).getVertex(target) == null) {
mutations.putIfAbsent(target, new VertexMutations<I, V, E>());
}
}
}

VertexResolver<I, V, E> vertexResolver = conf.createVertexResolver();
for (Map.Entry<I, VertexMutations<I, V, E>> entry : mutations.entrySet()) {
I vertexIndex = entry.getKey();
Vertex<I, V, E> originalVertex = graph.getVertex(vertexIndex);
Vertex<I, V, E> originalVertex =
getPartition(vertexIndex).getVertex(vertexIndex);
VertexMutations<I, V, E> curMutations = entry.getValue();
Vertex<I, V, E> vertex = vertexResolver.resolve(
vertexIndex, originalVertex, curMutations,
targets.contains(vertexIndex));

if (vertex != null) {
graph.addVertex(vertex);
getPartition(vertex.getId()).putVertex(vertex);
} else if (originalVertex != null) {
graph.getVertices().remove(originalVertex.getId());
getPartition(originalVertex.getId()).removeVertex(
originalVertex.getId());
}
}
mutations.clear();
}

public Collection<Vertex<I, V, E>> getAllVertices() {
return graph.getVertices().values();
public List<Partition<I, V, E>> getPartitions() {
return partitions;
}

public InternalWorkerApi getWorkerApi() {
Expand All @@ -397,15 +417,19 @@ public InternalWorkerApi getWorkerApi() {
@Override
public long getTotalNumEdges() {
int numEdges = 0;
for (Vertex<I, V, E> vertex : graph.getVertices().values()) {
numEdges += vertex.getNumEdges();
for (Partition<I, V, E> partition : partitions) {
numEdges += partition.getEdgeCount();
}
return numEdges;
}

/**
 * Sums the vertex counts reported by every partition.
 *
 * @return Total number of vertices across all partitions
 */
@Override
public long getTotalNumVertices() {
  // Accumulate in a long to match the declared return type: with an int
  // accumulator, += silently narrows each addition and the total can overflow.
  long numVertices = 0;
  for (Partition<I, V, E> partition : partitions) {
    numVertices += partition.getVertexCount();
  }
  return numVertices;
}

@Override
Expand Down Expand Up @@ -438,4 +462,21 @@ public BlockWorkerContextLogic getWorkerContextLogic() {
public int getWorkerCount() {
// Always reports exactly one worker.
return 1;
}

/**
 * Asks the partitioner factory which partition a vertex id belongs to.
 *
 * @param id Vertex id; must not be null
 * @return Partition index computed by the partitioner factory for {@code id}
 */
private int getPartitionId(I id) {
  Preconditions.checkNotNull(id);
  int partitionCount = partitions.size();
  // NOTE(review): the trailing 1 is presumably the worker count - confirm
  // against the partitioner factory's getPartition signature.
  return partitionerFactory.getPartition(id, partitionCount, 1);
}

/**
 * Returns the partition that the given vertex id maps to.
 *
 * @param id Vertex id to look up
 * @return Entry of {@code partitions} at the index from getPartitionId
 */
private Partition<I, V, E> getPartition(I id) {
  int partitionIndex = getPartitionId(id);
  return partitions.get(partitionIndex);
}

/**
 * Walks all partitions and hands each of their vertices to
 * {@code inputGraph.setVertex}.
 */
public void postApplication() {
  for (Partition<I, V, E> current : partitions) {
    Iterator<Vertex<I, V, E>> vertices = current.iterator();
    while (vertices.hasNext()) {
      inputGraph.setVertex(vertices.next());
    }
  }
}
}

0 comments on commit ca36f1d

Please sign in to comment.