[FLINK-3771] [gelly] Methods for translating Graphs #1900

Closed
87 changes: 87 additions & 0 deletions docs/apis/batch/libs/gelly.md
@@ -433,6 +433,42 @@ val updatedGraph = graph.mapVertices(v => v.getValue + 1)
</div>
</div>

* <strong>Translate</strong>: Gelly provides specialized methods for translating the value and/or type of vertex and edge IDs (`translateGraphIds`), vertex values (`translateVertexValues`), or edge values (`translateEdgeValues`). Translation is performed by a user-defined map function; several ready-made translators are provided in the `org.apache.flink.graph.asm.translate` package. The same `MapFunction` can be used with all three translate methods.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);

// translate each vertex and edge ID to a String
Graph<String, Long, Long> updatedGraph = graph.translateGraphIds(
    new MapFunction<Long, String>() {
        public String map(Long id) {
            return id.toString();
        }
    });

// translate vertex IDs, edge IDs, vertex values, and edge values to LongValue
// (separate variable name: the first example already declares updatedGraph)
Graph<LongValue, LongValue, LongValue> longValueGraph = graph
    .translateGraphIds(new LongToLongValue())
    .translateVertexValues(new LongToLongValue())
    .translateEdgeValues(new LongToLongValue());
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
val env = ExecutionEnvironment.getExecutionEnvironment
val graph = Graph.fromDataSet(vertices, edges, env)

// translate each vertex and edge ID to a String
val updatedGraph = graph.translateGraphIds(id => id.toString)
{% endhighlight %}
</div>
</div>
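
What "translating IDs" versus "translating values" means for an edge can be sketched in plain Java without Flink. This is only an illustration under stated assumptions: `SimpleEdge`, `translateIds`, and `translateValues` are hypothetical names, not Gelly classes or methods, and `java.util.function.Function` stands in for Flink's `MapFunction`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Plain-Java sketch; none of these types are part of Gelly. */
final class TranslateIdsSketch {

    /** Hypothetical stand-in for an edge: source ID, target ID, edge value. */
    static final class SimpleEdge<K, EV> {
        final K source;
        final K target;
        final EV value;

        SimpleEdge(K source, K target, EV value) {
            this.source = source;
            this.target = target;
            this.value = value;
        }
    }

    /** Applies the translator to both endpoint IDs; the edge value is left untouched. */
    static <K, NEW, EV> List<SimpleEdge<NEW, EV>> translateIds(
            List<SimpleEdge<K, EV>> edges, Function<K, NEW> translator) {
        List<SimpleEdge<NEW, EV>> result = new ArrayList<SimpleEdge<NEW, EV>>();
        for (SimpleEdge<K, EV> edge : edges) {
            result.add(new SimpleEdge<NEW, EV>(
                    translator.apply(edge.source),
                    translator.apply(edge.target),
                    edge.value));
        }
        return result;
    }

    /** Applies the translator to the edge value; both endpoint IDs are left untouched. */
    static <K, EV, NEW> List<SimpleEdge<K, NEW>> translateValues(
            List<SimpleEdge<K, EV>> edges, Function<EV, NEW> translator) {
        List<SimpleEdge<K, NEW>> result = new ArrayList<SimpleEdge<K, NEW>>();
        for (SimpleEdge<K, EV> edge : edges) {
            result.add(new SimpleEdge<K, NEW>(
                    edge.source,
                    edge.target,
                    translator.apply(edge.value)));
        }
        return result;
    }
}
```

Because the translator only sees a single `Long` at a time, one `Function<Long, String>` instance can be passed to either helper, which mirrors the point that the same map function is usable for IDs and for values when their types match.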


* <strong>Filter</strong>: A filter transformation applies a user-defined filter function on the vertices or edges of the `Graph`. `filterOnEdges` will create a sub-graph of the original graph, keeping only the edges that satisfy the provided predicate. Note that the vertex dataset will not be modified. Respectively, `filterOnVertices` applies a filter on the vertices of the graph. Edges whose source and/or target do not satisfy the vertex predicate are removed from the resulting edge dataset. The `subgraph` method can be used to apply a filter function to the vertices and the edges at the same time.

<div class="codetabs" markdown="1">
@@ -2013,6 +2049,57 @@ vertex and edge in the output graph stores the common group value and the number

{% top %}

Graph Algorithms
-----------

The logic blocks with which the `Graph` API and top-level algorithms are assembled are accessible in Gelly as graph
algorithms in the `org.apache.flink.graph.asm` package. These algorithms provide optimization and tuning through
configuration parameters and may provide implicit runtime reuse when processing the same input with a similar
configuration.

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Algorithm</th>
<th class="text-center">Description</th>
</tr>
</thead>

<tbody>
<tr>
<td><strong>TranslateGraphIds</strong></td>
<td>
<p>Translate vertex and edge IDs using the given <code>MapFunction</code>.</p>
{% highlight java %}
graph.run(new TranslateGraphIds(new LongValueToStringValue()));
{% endhighlight %}
</td>
</tr>

<tr>
<td><strong>TranslateVertexValues</strong></td>
<td>
<p>Translate vertex values using the given <code>MapFunction</code>.</p>
{% highlight java %}
graph.run(new TranslateVertexValues(new LongValueAddOffset(vertexCount)));
{% endhighlight %}
</td>
</tr>

<tr>
<td><strong>TranslateEdgeValues</strong></td>
<td>
<p>Translate edge values using the given <code>MapFunction</code>.</p>
{% highlight java %}
graph.run(new TranslateEdgeValues(new Nullify()));
{% endhighlight %}
</td>
</tr>
</tbody>
</table>
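
The `graph.run(...)` calls in the table hand the graph to an algorithm object, and the `translate*` convenience methods on `Graph` in this pull request simply wrap the translator in such an object and call `run`. The delegation can be sketched in plain Java. Everything here is a simplified assumption: `TinyGraph`, `Algorithm`, and `TranslateValues` are hypothetical stand-ins for illustration, they hold a single vertex value instead of a distributed data set, and they omit the `throws Exception` clauses of the real methods.

```java
import java.util.function.Function;

/** Plain-Java sketch of the run(algorithm) delegation; not Gelly code. */
final class RunDelegationSketch {

    /** Hypothetical stand-in for a graph algorithm: consumes an input, produces a result. */
    interface Algorithm<IN, OUT> {
        OUT run(IN input);
    }

    /** Hypothetical stand-in for a graph that stores only one vertex value. */
    static final class TinyGraph<VV> {
        final VV vertexValue;

        TinyGraph(VV vertexValue) {
            this.vertexValue = vertexValue;
        }

        /** Hands this graph to the algorithm and returns whatever it produces. */
        <T> T run(Algorithm<TinyGraph<VV>, T> algorithm) {
            return algorithm.run(this);
        }

        /** Convenience method: wraps the translator in an algorithm and runs it. */
        <NEW> TinyGraph<NEW> translateVertexValues(Function<VV, NEW> translator) {
            return run(new TranslateValues<VV, NEW>(translator));
        }
    }

    /** Algorithm object that applies a translator to the stored vertex value. */
    static final class TranslateValues<OLD, NEW>
            implements Algorithm<TinyGraph<OLD>, TinyGraph<NEW>> {

        private final Function<OLD, NEW> translator;

        TranslateValues(Function<OLD, NEW> translator) {
            this.translator = translator;
        }

        @Override
        public TinyGraph<NEW> run(TinyGraph<OLD> input) {
            return new TinyGraph<NEW>(translator.apply(input.vertexValue));
        }
    }
}
```

In this sketch, calling `translateVertexValues(f)` and calling `run(new TranslateValues<>(f))` produce the same result; the convenience method only saves the caller from naming the algorithm class.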

{% top %}

Graph Generators
-----------

@@ -406,6 +406,90 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
new Graph[K, VV, NV](jgraph.mapEdges[NV](mapper, createTypeInformation[Edge[K, NV]]))
}

/**
* Translate vertex and edge IDs using the given MapFunction.
*
* @param translator implements conversion from K to NEW
* @return graph with translated vertex and edge IDs
*/
def translateGraphIds[NEW: TypeInformation : ClassTag](translator: MapFunction[K, NEW]):
Graph[NEW, VV, EV] = {
new Graph[NEW, VV, EV](jgraph.translateGraphIds(translator))
}

/**
* Translate vertex and edge IDs using the given function.
*
* @param fun implements conversion from K to NEW
* @return graph with translated vertex and edge IDs
*/
def translateGraphIds[NEW: TypeInformation : ClassTag](fun: K => NEW):
Graph[NEW, VV, EV] = {
val mapper: MapFunction[K, NEW] = new MapFunction[K, NEW] {
val cleanFun = clean(fun)

def map(in: K): NEW = cleanFun(in)
}

new Graph[NEW, VV, EV](jgraph.translateGraphIds(mapper))
}

/**
* Translate vertex values using the given MapFunction.
*
* @param translator implements conversion from VV to NEW
* @return graph with translated vertex values
*/
def translateVertexValues[NEW: TypeInformation : ClassTag](translator: MapFunction[VV, NEW]):
Graph[K, NEW, EV] = {
new Graph[K, NEW, EV](jgraph.translateVertexValues(translator))
}

/**
* Translate vertex values using the given function.
*
* @param fun implements conversion from VV to NEW
* @return graph with translated vertex values
*/
def translateVertexValues[NEW: TypeInformation : ClassTag](fun: VV => NEW):
Graph[K, NEW, EV] = {
val mapper: MapFunction[VV, NEW] = new MapFunction[VV, NEW] {
val cleanFun = clean(fun)

def map(in: VV): NEW = cleanFun(in)
}

new Graph[K, NEW, EV](jgraph.translateVertexValues(mapper))
}

/**
* Translate edge values using the given MapFunction.
*
* @param translator implements conversion from EV to NEW
* @return graph with translated edge values
*/
def translateEdgeValues[NEW: TypeInformation : ClassTag](translator: MapFunction[EV, NEW]):
Graph[K, VV, NEW] = {
new Graph[K, VV, NEW](jgraph.translateEdgeValues(translator))
}

/**
* Translate edge values using the given function.
*
* @param fun implements conversion from EV to NEW
* @return graph with translated edge values
*/
def translateEdgeValues[NEW: TypeInformation : ClassTag](fun: EV => NEW):
Graph[K, VV, NEW] = {
val mapper: MapFunction[EV, NEW] = new MapFunction[EV, NEW] {
val cleanFun = clean(fun)

def map(in: EV): NEW = cleanFun(in)
}

new Graph[K, VV, NEW](jgraph.translateEdgeValues(mapper))
}

/**
* Joins the vertex DataSet of this graph with an input Tuple2 DataSet and applies
* a user-defined transformation on the values of the matched records.
@@ -46,6 +46,9 @@
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.graph.asm.translate.TranslateEdgeValues;
import org.apache.flink.graph.asm.translate.TranslateGraphIds;
import org.apache.flink.graph.asm.translate.TranslateVertexValues;
import org.apache.flink.graph.gsa.ApplyFunction;
import org.apache.flink.graph.gsa.GSAConfiguration;
import org.apache.flink.graph.gsa.GatherFunction;
@@ -546,6 +549,42 @@ public <NV> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper)
return mapEdges(mapper, returnType);
}

/**
* Translate {@link Vertex} and {@link Edge} IDs using the given {@link MapFunction}.
*
* @param translator implements conversion from {@code K} to {@code NEW}
* @param <NEW> new ID type
* @return graph with translated vertex and edge IDs
* @throws Exception
*/
public <NEW> Graph<NEW, VV, EV> translateGraphIds(MapFunction<K, NEW> translator) throws Exception {
return run(new TranslateGraphIds<K, NEW, VV, EV>(translator));
}

/**
* Translate {@link Vertex} values using the given {@link MapFunction}.
*
* @param translator implements conversion from {@code VV} to {@code NEW}
* @param <NEW> new vertex value type
* @return graph with translated vertex values
* @throws Exception
*/
public <NEW> Graph<K, NEW, EV> translateVertexValues(MapFunction<VV, NEW> translator) throws Exception {
return run(new TranslateVertexValues<K, VV, NEW, EV>(translator));
}

/**
* Translate {@link Edge} values using the given {@link MapFunction}.
*
* @param translator implements conversion from {@code EV} to {@code NEW}
* @param <NEW> new edge value type
* @return graph with translated edge values
* @throws Exception
*/
public <NEW> Graph<K, VV, NEW> translateEdgeValues(MapFunction<EV, NEW> translator) throws Exception {
return run(new TranslateEdgeValues<K, VV, EV, NEW>(translator));
}

/**
* Apply a function to the attribute of each edge in the graph.
*
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.graph.asm.translate;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.types.LongValue;

/**
* Translate {@link LongValue} by adding a constant offset value.
*/
public class LongValueAddOffset
implements MapFunction<LongValue, LongValue> {
Contributor (review comment): this can go on the line above.


private final long offset;

private LongValue output = new LongValue();

/**
* Translate {@link LongValue} by adding a constant offset value.
*
* @param offset value to be added to each element
*/
public LongValueAddOffset(long offset) {
this.offset = offset;
}

@Override
public LongValue map(LongValue value)
throws Exception {
Contributor (review comment): this can be moved to the line above.

output.setValue(offset + value.getValue());
return output;
}
}
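
`LongValueAddOffset` writes every result into a single `output` field and returns that same object from each `map` call. A consequence worth seeing in isolation: a caller that keeps the returned reference will observe it change on the next call. The sketch below is plain Java without Flink; `MutableLong` is a hypothetical stand-in for `LongValue`, and `copyOf` is an illustrative helper, not part of this pull request.

```java
/** Plain-Java sketch of the reused-output pattern; not Flink code. */
final class ReusedOutputSketch {

    /** Hypothetical stand-in for a mutable value type such as LongValue. */
    static final class MutableLong {
        private long value;

        long getValue() {
            return value;
        }

        void setValue(long value) {
            this.value = value;
        }
    }

    /** Mirrors the add-offset translator: one output object, overwritten on every call. */
    static final class AddOffset {
        private final long offset;
        private final MutableLong output = new MutableLong();

        AddOffset(long offset) {
            this.offset = offset;
        }

        MutableLong map(MutableLong value) {
            output.setValue(offset + value.getValue());
            return output;
        }
    }

    /** Copies the current contents so the result survives the next map() call. */
    static MutableLong copyOf(MutableLong source) {
        MutableLong copy = new MutableLong();
        copy.setValue(source.getValue());
        return copy;
    }
}
```

Reusing one output object avoids an allocation per element, at the cost that results must be consumed or copied before the next call. Whether a given caller needs the copy depends on how the runtime handles the returned object, which this sketch does not model.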
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.graph.asm.translate;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;

/**
* Translate {@link LongValue} to {@link IntValue}.
*
* Throws {@link RuntimeException} for integer overflow.
*/
public class LongValueToIntValue
implements MapFunction<LongValue, IntValue> {
Contributor (review comment): this can be moved to the line above.


private IntValue output = new IntValue();

@Override
public IntValue map(LongValue value)
throws Exception {
Contributor (review comment): this can be moved to the line above.

long val = value.getValue();

if (val > Integer.MAX_VALUE || val < Integer.MIN_VALUE) {
throw new RuntimeException("LongValue input overflows IntValue output");
}

output.setValue((int) val);
return output;
}
}
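
The range guard behind a `long`-to-`int` translation can be exercised on its own. A narrowing cast silently wraps for values outside the `int` range, so a guard has to consider both ends of that range. The following is a minimal sketch in plain Java; `checkedDownCast` and `isRejected` are hypothetical helper names, not methods from this pull request.

```java
/** Plain-Java sketch of a checked long-to-int narrowing; not Flink code. */
final class CheckedDownCastSketch {

    /** Narrows a long to an int, rejecting values outside [Integer.MIN_VALUE, Integer.MAX_VALUE]. */
    static int checkedDownCast(long value) {
        if (value > Integer.MAX_VALUE || value < Integer.MIN_VALUE) {
            throw new RuntimeException("long input overflows int output: " + value);
        }
        return (int) value;
    }

    /** Returns true when checkedDownCast rejects the given value. */
    static boolean isRejected(long value) {
        try {
            checkedDownCast(value);
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }
}
```

Since Java 8 the standard library offers `Math.toIntExact(long)`, which performs the same range check but throws `ArithmeticException`; the hand-written version is shown here only because the translator in this file documents a `RuntimeException`.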
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.graph.asm.translate;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.StringValue;

/**
* Translate {@link LongValue} to {@link StringValue}.
*/
public class LongValueToStringValue
implements MapFunction<LongValue, StringValue> {
Contributor (review comment): same :)


private StringValue output = new StringValue();

@Override
public StringValue map(LongValue value)
throws Exception {
output.setValue(Long.toString(value.getValue()));
return output;
}
}