Skip to content

Commit

Permalink
[#1114] fixed bug in random vertex neighborhood sampling (#1115)
Browse files Browse the repository at this point in the history
fixes #1114
  • Loading branch information
Kevin Gómez authored and ChrizZz110 committed Dec 6, 2018
1 parent 8de9b6d commit 961a451
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 95 deletions.
Expand Up @@ -16,21 +16,18 @@
package org.gradoop.flink.model.impl.operators.sampling; package org.gradoop.flink.model.impl.operators.sampling;


import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.Edge; import org.gradoop.common.model.impl.pojo.Edge;
import org.gradoop.common.model.impl.pojo.Vertex; import org.gradoop.common.model.impl.pojo.Vertex;
import org.gradoop.flink.model.impl.epgm.LogicalGraph; import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.epgm.SourceId;
import org.gradoop.flink.model.impl.functions.tuple.Value0Of3; import org.gradoop.flink.model.impl.functions.tuple.Value0Of3;
import org.gradoop.flink.model.impl.operators.sampling.functions.EdgeWithSourceTarget;
import org.gradoop.flink.model.impl.operators.sampling.functions.Neighborhood;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexRandomMarkedMap;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexWithId;
import org.gradoop.flink.model.impl.operators.sampling.functions.EdgeSourceVertexJoin; import org.gradoop.flink.model.impl.operators.sampling.functions.EdgeSourceVertexJoin;
import org.gradoop.flink.model.impl.operators.sampling.functions.EdgeTargetVertexJoin; import org.gradoop.flink.model.impl.operators.sampling.functions.EdgeTargetVertexJoin;
import org.gradoop.flink.model.impl.operators.sampling.functions.EdgesWithSampledVerticesFilter; import org.gradoop.flink.model.impl.operators.sampling.functions.EdgesWithSampledVerticesFilter;
import org.gradoop.flink.model.impl.operators.sampling.functions.FilterVerticesWithDegreeOtherThanGiven; import org.gradoop.flink.model.impl.operators.sampling.functions.FilterVerticesWithDegreeOtherThanGiven;
import org.gradoop.flink.model.impl.operators.sampling.functions.Neighborhood;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexRandomMarkedMap;


/** /**
* Computes a vertex sampling of the graph. Retains randomly chosen vertices of a given relative * Computes a vertex sampling of the graph. Retains randomly chosen vertices of a given relative
Expand Down Expand Up @@ -108,24 +105,22 @@ public RandomVertexNeighborhoodSampling(float sampleSize,
*/ */
@Override @Override
public LogicalGraph sample(LogicalGraph graph) { public LogicalGraph sample(LogicalGraph graph) {
DataSet<Tuple2<Vertex, GradoopId>> sampledVerticesWithId = graph.getVertices()
.map(new VertexRandomMarkedMap<>(sampleSize, randomSeed, PROPERTY_KEY_SAMPLED)) DataSet<Vertex> sampledVertices = graph.getVertices()
.map(new VertexWithId()); .map(new VertexRandomMarkedMap(sampleSize, randomSeed, PROPERTY_KEY_SAMPLED));


DataSet<Tuple3<Edge, GradoopId, GradoopId>> edgeSourceIdTargetId = graph.getEdges() DataSet<Edge> newEdges = graph.getEdges()
.map(new EdgeWithSourceTarget()); .join(sampledVertices)

.where(new SourceId<>()).equalTo(new Id<>())
DataSet<Edge> newEdges = edgeSourceIdTargetId .with(new EdgeSourceVertexJoin(PROPERTY_KEY_SAMPLED))
.join(sampledVerticesWithId) .join(sampledVertices)
.where(1).equalTo(1) .where(1).equalTo(new Id<>())
.with(new EdgeSourceVertexJoin()) .with(new EdgeTargetVertexJoin(PROPERTY_KEY_SAMPLED))
.join(sampledVerticesWithId) .filter(new EdgesWithSampledVerticesFilter(neighborType))
.where(2).equalTo(1)
.with(new EdgeTargetVertexJoin())
.filter(new EdgesWithSampledVerticesFilter(PROPERTY_KEY_SAMPLED, neighborType))
.map(new Value0Of3<>()); .map(new Value0Of3<>());


graph = graph.getConfig().getLogicalGraphFactory().fromDataSets(graph.getVertices(), newEdges); graph = graph.getFactory().fromDataSets(graph.getVertices(), newEdges);

graph = new FilterVerticesWithDegreeOtherThanGiven(0L).execute(graph); graph = new FilterVerticesWithDegreeOtherThanGiven(0L).execute(graph);


return graph; return graph;
Expand Down
Expand Up @@ -16,39 +16,48 @@
package org.gradoop.flink.model.impl.operators.sampling.functions; package org.gradoop.flink.model.impl.operators.sampling.functions;


import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.common.model.impl.id.GradoopId; import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.Edge; import org.gradoop.common.model.impl.pojo.Edge;
import org.gradoop.common.model.impl.pojo.Vertex; import org.gradoop.common.model.impl.pojo.Vertex;


/** /**
* Joins to get the edge source * Joins to get the edge source:
* (edge),(vertex) -> (edge,edge.targetId,(bool)vertex[propertyKey])
*/ */
public class EdgeSourceVertexJoin implements JoinFunction<Tuple3<Edge, GradoopId, GradoopId>, @FunctionAnnotation.ForwardedFieldsFirst({"*->f0", "id->f1"})
Tuple2<Vertex, GradoopId>, Tuple3<Edge, Vertex, GradoopId>> { @FunctionAnnotation.ReadFieldsSecond("properties")
public class EdgeSourceVertexJoin
implements JoinFunction<Edge, Vertex, Tuple3<Edge, GradoopId, Boolean>> {
/** /**
* Reduce object instantiations * Reduce object instantiations
*/ */
private Tuple3<Edge, Vertex, GradoopId> reuse; private Tuple3<Edge, GradoopId, Boolean> reuse;


/** /**
* Constructor * Property key of marked value
*/ */
public EdgeSourceVertexJoin() { private String propertyKey;
reuse = new Tuple3<>();
/**
* Creates an instance of this join function
*
* @param propertyKey vertex property key
*/
public EdgeSourceVertexJoin(String propertyKey) {
this.reuse = new Tuple3<>();
this.propertyKey = propertyKey;
} }


/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
public Tuple3<Edge, Vertex, GradoopId> join( public Tuple3<Edge, GradoopId, Boolean> join(Edge edge, Vertex vertex) throws Exception {
Tuple3<Edge, GradoopId, GradoopId> edgeWithItsVerticesIds, reuse.f0 = edge;
Tuple2<Vertex, GradoopId> vertexWithItsId) { reuse.f1 = edge.getTargetId();
reuse.f0 = edgeWithItsVerticesIds.f0; reuse.f2 = vertex.getPropertyValue(propertyKey).getBoolean();
reuse.f1 = vertexWithItsId.f0;
reuse.f2 = edgeWithItsVerticesIds.f2;
return reuse; return reuse;
} }
} }
Expand Up @@ -16,38 +16,50 @@
package org.gradoop.flink.model.impl.operators.sampling.functions; package org.gradoop.flink.model.impl.operators.sampling.functions;


import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.common.model.impl.id.GradoopId; import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.Edge; import org.gradoop.common.model.impl.pojo.Edge;
import org.gradoop.common.model.impl.pojo.Vertex; import org.gradoop.common.model.impl.pojo.Vertex;


/** /**
* Joins to get the edge target * Joins to get the edge target:
* (edge,edge.targetId,bool-source),(target) -> (edge,bool-source,(bool)target[propertyKey])
*/ */
public class EdgeTargetVertexJoin implements JoinFunction<Tuple3<Edge, Vertex, GradoopId>, @FunctionAnnotation.ForwardedFieldsFirst({"f0->f0", "f2->f1"})
Tuple2<Vertex, GradoopId>, Tuple3<Edge, Vertex, Vertex>> { @FunctionAnnotation.ReadFieldsSecond("properties")
public class EdgeTargetVertexJoin implements
JoinFunction<Tuple3<Edge, GradoopId, Boolean>, Vertex, Tuple3<Edge, Boolean, Boolean>> {

/** /**
* Reduce object instantiations * Reduce object instantiations
*/ */
private Tuple3<Edge, Vertex, Vertex> reuse; private Tuple3<Edge, Boolean, Boolean> reuse;

/**
* Property key of vertex value
*/
private final String propertyKey;


/** /**
* Constructor * Creates an instance of this join function
*
* @param propertyKey property key of marked vertex value
*/ */
public EdgeTargetVertexJoin() { public EdgeTargetVertexJoin(String propertyKey) {
reuse = new Tuple3<>(); this.reuse = new Tuple3<>();
this.propertyKey = propertyKey;
} }


/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
public Tuple3<Edge, Vertex, Vertex> join(Tuple3<Edge, Vertex, GradoopId> edgeWithItsVerticesIds, public Tuple3<Edge, Boolean, Boolean> join(Tuple3<Edge, GradoopId, Boolean> interim,
Tuple2<Vertex, GradoopId> vertexWithItsId) { Vertex vertex) {
reuse.f0 = edgeWithItsVerticesIds.f0; reuse.f0 = interim.f0;
reuse.f1 = edgeWithItsVerticesIds.f1; reuse.f1 = interim.f2;
reuse.f2 = vertexWithItsId.f0; reuse.f2 = vertex.getPropertyValue(propertyKey).getBoolean();
return reuse; return reuse;
} }
} }
Expand Up @@ -17,58 +17,51 @@


import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.common.exceptions.UnsupportedTypeException;
import org.gradoop.common.model.impl.pojo.Edge; import org.gradoop.common.model.impl.pojo.Edge;
import org.gradoop.common.model.impl.pojo.Vertex;


/** /**
* Filters the edges with sampled vertices. If any vertices of the edge does not have any related * Filters the edges with sampled vertices. If any vertices of the edge does not have any related
* property for sampling, we consider that vertex as not sampled. * property for sampling, we consider that vertex as not sampled.
*/ */
public class EdgesWithSampledVerticesFilter public class EdgesWithSampledVerticesFilter
implements FilterFunction<Tuple3<Edge, Vertex, Vertex>> { implements FilterFunction<Tuple3<Edge, Boolean, Boolean>> {
/**
* Property name which shows if a vertex is sampled
*/
private String propertyNameForSampled;
/** /**
* type of neighborhood * Type of neighborhood
*/ */
private Neighborhood neighborType; private Neighborhood neighborType;


/** /**
* Constructor * Constructor
* *
* @param propertyNameForSampled property name which shows if a vertex is sampled
* @param neighborType type of neighborhood * @param neighborType type of neighborhood
*/ */
public EdgesWithSampledVerticesFilter(String propertyNameForSampled, Neighborhood neighborType) { public EdgesWithSampledVerticesFilter(Neighborhood neighborType) {
this.propertyNameForSampled = propertyNameForSampled;
this.neighborType = neighborType; this.neighborType = neighborType;
} }


/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
public boolean filter(Tuple3<Edge, Vertex, Vertex> t3) { public boolean filter(Tuple3<Edge, Boolean, Boolean> tuple) {
boolean isSourceVertexMarked = false;
boolean isTargetVertexMarked = false; boolean isSourceVertexMarked = tuple.f1;
if (t3.f1.hasProperty(propertyNameForSampled)) { boolean isTargetVertexMarked = tuple.f2;
isSourceVertexMarked = Boolean.getBoolean(
t3.f1.getPropertyValue(propertyNameForSampled).toString()); boolean filter;
}
if (t3.f2.hasProperty(propertyNameForSampled)) { switch (neighborType) {
isTargetVertexMarked = Boolean.getBoolean( case BOTH: filter = isSourceVertexMarked || isTargetVertexMarked;
t3.f2.getPropertyValue(propertyNameForSampled).toString()); break;
} case IN: filter = isTargetVertexMarked;
boolean ret = false; break;
if (neighborType.equals(Neighborhood.BOTH)) { case OUT: filter = isSourceVertexMarked;
ret = isSourceVertexMarked || isTargetVertexMarked; break;
} else if (neighborType.equals(Neighborhood.IN)) { default: throw new UnsupportedTypeException("NeighborType needs to be BOTH, IN or OUT");
ret = isTargetVertexMarked;
} else if (neighborType.equals(Neighborhood.OUT)) {
ret = isSourceVertexMarked;
} }
return ret;
return filter;
} }
} }
Expand Up @@ -47,6 +47,7 @@ public FilterVerticesWithDegreeOtherThanGiven(long degree) {
*/ */
@Override @Override
public LogicalGraph execute(LogicalGraph graph) { public LogicalGraph execute(LogicalGraph graph) {

DistinctVertexDegrees distinctVertexDegrees = new DistinctVertexDegrees( DistinctVertexDegrees distinctVertexDegrees = new DistinctVertexDegrees(
SamplingAlgorithm.DEGREE_PROPERTY_KEY, SamplingAlgorithm.DEGREE_PROPERTY_KEY,
SamplingAlgorithm.IN_DEGREE_PROPERTY_KEY, SamplingAlgorithm.IN_DEGREE_PROPERTY_KEY,
Expand Down
Expand Up @@ -23,10 +23,8 @@
/** /**
* Creates a random value for each vertex and marks those that are below a * Creates a random value for each vertex and marks those that are below a
* given threshold. * given threshold.
*
* @param <V> EPGM vertex type
*/ */
public class VertexRandomMarkedMap<V extends Vertex> implements MapFunction<V, V> { public class VertexRandomMarkedMap implements MapFunction<Vertex, Vertex> {
/** /**
* Threshold to decide if a vertex needs to be filtered. * Threshold to decide if a vertex needs to be filtered.
*/ */
Expand Down Expand Up @@ -57,7 +55,7 @@ public VertexRandomMarkedMap(float sampleSize, long randomSeed, String mark) {
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
public V map(V vertex) throws Exception { public Vertex map(Vertex vertex) throws Exception {
if (randomGenerator.nextFloat() <= sampleSize) { if (randomGenerator.nextFloat() <= sampleSize) {
vertex.setProperty(mark, true); vertex.setProperty(mark, true);
} else { } else {
Expand Down
Expand Up @@ -15,15 +15,12 @@
*/ */
package org.gradoop.flink.model.impl.operators.sampling; package org.gradoop.flink.model.impl.operators.sampling;


import org.gradoop.common.model.impl.pojo.Edge;
import org.gradoop.flink.model.impl.epgm.LogicalGraph; import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.sampling.functions.Neighborhood; import org.gradoop.flink.model.impl.operators.sampling.functions.Neighborhood;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;


import java.util.Arrays; import java.util.Arrays;


import static org.junit.Assert.assertFalse;

public class RandomVertexNeighborhoodSamplingTest extends ParameterizedTestForGraphSampling { public class RandomVertexNeighborhoodSamplingTest extends ParameterizedTestForGraphSampling {


/** /**
Expand All @@ -35,7 +32,7 @@ public class RandomVertexNeighborhoodSamplingTest extends ParameterizedTestForGr
* @param neighborType The vertex neighborhood type, e.g. Neighborhood.BOTH * @param neighborType The vertex neighborhood type, e.g. Neighborhood.BOTH
*/ */
public RandomVertexNeighborhoodSamplingTest(String testName, String seed, String sampleSize, public RandomVertexNeighborhoodSamplingTest(String testName, String seed, String sampleSize,
String neighborType) { String neighborType) {
super(testName, Long.parseLong(seed), Float.parseFloat(sampleSize), super(testName, Long.parseLong(seed), Float.parseFloat(sampleSize),
Neighborhood.valueOf(neighborType)); Neighborhood.valueOf(neighborType));
} }
Expand All @@ -53,15 +50,9 @@ public SamplingAlgorithm getSamplingOperator() {
*/ */
@Override @Override
public void validateSpecific(LogicalGraph input, LogicalGraph output) { public void validateSpecific(LogicalGraph input, LogicalGraph output) {

dbEdges.removeAll(newEdges);
for (Edge edge : dbEdges) {
assertFalse("edge from original graph was not sampled but source and target were",
newVertexIDs.contains(edge.getSourceId()) &&
newVertexIDs.contains(edge.getTargetId()));
}
} }



/** /**
* Parameters called when running the test * Parameters called when running the test
* *
Expand Down

0 comments on commit 961a451

Please sign in to comment.