Skip to content
This repository has been archived by the owner on Oct 30, 2023. It is now read-only.

Commit

Permalink
[GIRAPH-1117] Provide a flexible way to decide whether to create vert…
Browse files Browse the repository at this point in the history
…ex when it is not present in the input

Test Plan: run hello pagerank with this feature on and off

Reviewers: majakabiljo, maja.kabiljo, dionysis.logothetis

Reviewed By: dionysis.logothetis

Differential Revision: https://reviews.facebook.net/D64485
  • Loading branch information
Sergey Edunov committed Sep 29, 2016
1 parent 06de6c4 commit 8bf08f5
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 18 deletions.
Expand Up @@ -27,6 +27,8 @@
import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
import org.apache.giraph.comm.messages.MessageStoreFactory;
import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.edge.DefaultCreateSourceVertexCallback;
import org.apache.giraph.edge.CreateSourceVertexCallback;
import org.apache.giraph.edge.EdgeStoreFactory;
import org.apache.giraph.edge.InMemoryEdgeStoreFactory;
import org.apache.giraph.edge.OutEdges;
Expand Down Expand Up @@ -1117,6 +1119,18 @@ public interface GiraphConstants {
"Create a source vertex if present in edge input but not " +
"necessarily in vertex input");

/**
* Defines a call back that can be used to make decisions on
* whether the vertex should be created or not in the runtime.
*/
ClassConfOption<CreateSourceVertexCallback>
CREATE_EDGE_SOURCE_VERTICES_CALLBACK =
ClassConfOption.create("giraph.createEdgeSourceVerticesCallback",
DefaultCreateSourceVertexCallback.class,
CreateSourceVertexCallback.class,
"Decide whether we should create a source vertex when id is " +
"present in the edge input but not in vertex input");

/**
* This counter group will contain one counter whose name is the ZooKeeper
* server:port which this job is using
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.MapMaker;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.ooc.OutOfCoreEngine;
Expand Down Expand Up @@ -81,6 +82,8 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
protected boolean useInputOutEdges;
/** Whether we spilled edges on disk */
private boolean hasEdgesOnDisk = false;
/** Create source vertices */
private CreateSourceVertexCallback<I> createSourceVertexCallback;

/**
* Constructor.
Expand All @@ -100,6 +103,9 @@ public AbstractEdgeStore(
configuration.getNettyServerExecutionConcurrency()).makeMap();
reuseEdgeObjects = configuration.reuseEdgeObjects();
useInputOutEdges = configuration.useInputOutEdges();
createSourceVertexCallback =
GiraphConstants.CREATE_EDGE_SOURCE_VERTICES_CALLBACK
.newInstance(configuration);
}

/**
Expand Down Expand Up @@ -247,7 +253,6 @@ private OutEdges<I, E> convertInputToComputeEdges(

@Override
public void moveEdgesToVertices() {
final boolean createSourceVertex = configuration.getCreateSourceVertex();
if (transientEdges.isEmpty() && !hasEdgesOnDisk) {
if (LOG.isInfoEnabled()) {
LOG.info("moveEdgesToVertices: No edges to move");
Expand All @@ -256,7 +261,8 @@ public void moveEdgesToVertices() {
}

if (LOG.isInfoEnabled()) {
LOG.info("moveEdgesToVertices: Moving incoming edges to vertices.");
LOG.info("moveEdgesToVertices: Moving incoming edges to " +
"vertices. Using " + createSourceVertexCallback);
}

service.getPartitionStore().startIteration();
Expand Down Expand Up @@ -307,7 +313,8 @@ public Void call() throws Exception {
// If the source vertex doesn't exist, create it. Otherwise,
// just set the edges.
if (vertex == null) {
if (createSourceVertex) {
if (createSourceVertexCallback
.shouldCreateSourceVertex(vertexId)) {
// createVertex only if it is allowed by configuration
vertex = configuration.createVertex();
vertex.initialize(createVertexId(entry),
Expand Down
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.giraph.edge;

import org.apache.giraph.conf.GiraphConfigurationSettable;
import org.apache.hadoop.io.Writable;

/**
* Implementations of this interface can decide whether
* we should create a vertex when it is not present in vertex input
* but exists in edge input.
*
* @param <I> vertex id
*/
public interface CreateSourceVertexCallback<I extends Writable>
extends GiraphConfigurationSettable {

/**
* Should we create a vertex that doesn't exist in vertex input
* but only exists in edge input
* @param vertexId the id of vertex to be created
* @return true if we should create a vertex
*/
boolean shouldCreateSourceVertex(I vertexId);

}
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.giraph.edge;

import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.io.Writable;

/**
* Default implementation of vertex creation decision maker.
* By default you can either create all vertices or not create
* implicit vertices at all.
*
* @param <I> Vertex id
*/
public class DefaultCreateSourceVertexCallback<I extends Writable>
implements CreateSourceVertexCallback<I> {
/**
* True if giraph has to create even vertices that only exist
* in edge input
*/
private boolean shouldCreateVertices;

@Override
public boolean shouldCreateSourceVertex(I vertexId) {
return shouldCreateVertices;
}

@Override
public void setConf(ImmutableClassesGiraphConfiguration configuration) {
shouldCreateVertices =
GiraphConstants.CREATE_EDGE_SOURCE_VERTICES.get(configuration);
}
}
33 changes: 18 additions & 15 deletions giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
Expand Up @@ -24,7 +24,9 @@
import java.util.Map.Entry;

import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.CreateSourceVertexCallback;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.Vertex;
Expand Down Expand Up @@ -54,6 +56,8 @@ public class TestGraph<I extends WritableComparable,
protected Basic2ObjectMap<I, Vertex<I, V, E>> vertices;
/** The configuration */
protected ImmutableClassesGiraphConfiguration<I, V, E> conf;
/** Callback that makes a decision on whether vertex should be created */
private CreateSourceVertexCallback<I> createSourceVertexCallback;

/**
* Constructor requiring classes
Expand All @@ -62,6 +66,9 @@ public class TestGraph<I extends WritableComparable,
*/
public TestGraph(GiraphConfiguration conf) {
this.conf = new ImmutableClassesGiraphConfiguration<>(conf);
createSourceVertexCallback =
GiraphConstants.CREATE_EDGE_SOURCE_VERTICES_CALLBACK
.newInstance(this.conf);
vertexValueCombiner = this.conf.createVertexValueCombiner();
vertices = BasicCollectionsUtils.create2ObjectMap(
this.conf.getVertexIdClass()
Expand Down Expand Up @@ -147,21 +154,13 @@ public TestGraph<I, V, E> setVertex(I id, V value, Entry<I, E>... edges) {

/**
* Add an edge to an existing vertex
*
*`
* @param vertexId Edge origin
* @param edgePair The edge
* @return this
*/
public TestGraph<I, V, E> addEdge(I vertexId, Entry<I, E> edgePair) {
if (!vertices.containsKey(vertexId)) {
Vertex<I, V, E> v = conf.createVertex();
v.initialize(vertexId, conf.createVertexValue());
vertices.put(vertexId, v);
}
vertices.get(vertexId)
.addEdge(EdgeFactory.create(edgePair.getKey(),
edgePair.getValue()));
return this;
return addEdge(vertexId, edgePair.getKey(), edgePair.getValue());
}

/**
Expand All @@ -174,12 +173,16 @@ public TestGraph<I, V, E> addEdge(I vertexId, Entry<I, E> edgePair) {
*/
public TestGraph<I, V, E> addEdge(I vertexId, I toVertex, E edgeValue) {
if (!vertices.containsKey(vertexId)) {
Vertex<I, V, E> v = conf.createVertex();
v.initialize(vertexId, conf.createVertexValue());
vertices.put(vertexId, v);
if (createSourceVertexCallback.shouldCreateSourceVertex(vertexId)) {
Vertex<I, V, E> v = conf.createVertex();
v.initialize(vertexId, conf.createVertexValue());
vertices.put(vertexId, v);
}
}
Vertex<I, V, E> v = vertices.get(vertexId);
if (v != null) {
v.addEdge(EdgeFactory.create(toVertex, edgeValue));
}
vertices.get(vertexId)
.addEdge(EdgeFactory.create(toVertex, edgeValue));
return this;
}

Expand Down
Expand Up @@ -20,13 +20,16 @@

import com.google.common.collect.Maps;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.edge.DefaultCreateSourceVertexCallback;
import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
import org.apache.giraph.io.formats.IntIntNullTextVertexInputFormat;
import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat;
import org.apache.giraph.utils.ComputationCountEdges;
import org.apache.giraph.utils.IntIntNullNoOpComputation;
import org.apache.giraph.utils.InternalVertexRunner;
import org.apache.hadoop.io.IntWritable;
import org.junit.Test;

import java.util.Map;
Expand Down Expand Up @@ -133,6 +136,68 @@ public void testNegativeCreateSourceVertex() throws Exception {
assertEquals(1, (int) values.get(7));
}

@Test
public void testCustomCreateSourceVertex() throws Exception {
String [] vertices = new String[] {
"1 0",
"2 0",
"3 0",
"4 0",
};
String [] edges = new String[] {
"1 2",
"1 5",
"2 4",
"2 1",
"3 4",
"4 1",
"4 5",
"6 2",
"7 8",
"4 8",
};

GiraphConfiguration conf = getConf();
GiraphConstants.CREATE_EDGE_SOURCE_VERTICES_CALLBACK.set(conf,
CreateEvenSourceVerticesCallback.class);

Iterable<String> results = InternalVertexRunner.run(conf, vertices, edges);
Map<Integer, Integer> values = parseResults(results);

// Check that only vertices from vertex input are present in output graph
assertEquals(5, values.size());
// Check that the ids of vertices in output graph exactly match vertex input
assertTrue(values.containsKey(1));
assertTrue(values.containsKey(2));
assertTrue(values.containsKey(3));
assertTrue(values.containsKey(4));
assertTrue(values.containsKey(6));

conf.setComputationClass(ComputationCountEdges.class);
results = InternalVertexRunner.run(conf, vertices, edges);
values = parseResults(results);

// Check the number of edges of each vertex
assertEquals(2, (int) values.get(1));
assertEquals(2, (int) values.get(2));
assertEquals(1, (int) values.get(3));
assertEquals(3, (int) values.get(4));
assertEquals(1, (int) values.get(6));
}

/**
* Only allows to create vertices with even ids.
*/
public static class CreateEvenSourceVerticesCallback extends
DefaultCreateSourceVertexCallback<IntWritable> {

@Override
public boolean shouldCreateSourceVertex(IntWritable vertexId) {
return vertexId.get() % 2 == 0;
}
}


private GiraphConfiguration getConf() {
GiraphConfiguration conf = new GiraphConfiguration();
conf.setComputationClass(IntIntNullNoOpComputation.class);
Expand Down

0 comments on commit 8bf08f5

Please sign in to comment.