[FLINK-2254] Add BipartiteGraph class #2564

Closed
wants to merge 4 commits into from

mushketyk
Contributor

This PR implements the BipartiteGraph class with supporting classes and lays the foundation for future work on bipartite graph support. I didn't add documentation because I would like to make sure that this approach is in line with what the community has in mind regarding bipartite graph support. If this PR is good, I'll continue with documentation and other related tasks.

  • General
    • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
    • The pull request addresses only one issue
    • Each commit in the PR has a meaningful commit message (including the JIRA id)
  • Documentation
    • Documentation has been added for new functionality
    • Old documentation affected by the pull request has been updated
    • JavaDoc for public methods has been added
  • Tests & Build
    • Functionality added by the pull request is covered by tests
    • mvn clean verify has been executed successfully locally or a Travis build has passed

Contributor

@vasia vasia left a comment

Hi @mushketyk,
thank you for the PR!
The class, types, and dataset look OK, but I think we should look into the projection methods more carefully. A projection transformation is an expensive operation and might increase the number of graph edges by an order of magnitude. In its naive form, a top-(bottom-)projection can be generated if every bottom-(top-)vertex creates an edge for each pair of neighbors. That's an operation quadratic in the vertex degree. However, we might be able to re-use some of the optimizations that @greghogan has implemented in the Jaccard coefficient algorithm since the main computation is the same: finding common neighbors. @greghogan what do you think?
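To make the cost concrete, here is a minimal, Flink-free sketch of the naive top projection described above: every bottom vertex emits one edge per pair of its top neighbors, so a bottom vertex of degree d contributes d(d-1)/2 edges. The names `NaiveTopProjection` and `project` are hypothetical and are not part of Gelly or of this PR; the sketch also emits each unordered pair once, which is an assumption about the desired output.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not Gelly code.
final class NaiveTopProjection {

    // Each bipartite edge is given as {topId, bottomId}.
    // Returns projected edges as {topId1, topId2}, one per pair of
    // top vertices that share a bottom neighbor.
    static List<String[]> project(List<String[]> bipartiteEdges) {
        // Group top vertices by the bottom vertex they connect to.
        Map<String, List<String>> topsByBottom = new LinkedHashMap<>();
        for (String[] edge : bipartiteEdges) {
            topsByBottom.computeIfAbsent(edge[1], k -> new ArrayList<>()).add(edge[0]);
        }

        List<String[]> projected = new ArrayList<>();
        for (List<String> tops : topsByBottom.values()) {
            // This nested loop is quadratic in the degree of the bottom vertex.
            for (int i = 0; i < tops.size(); i++) {
                for (int j = i + 1; j < tops.size(); j++) {
                    projected.add(new String[] {tops.get(i), tops.get(j)});
                }
            }
        }
        return projected;
    }

    public static void main(String[] args) {
        List<String[]> edges = new ArrayList<>();
        // Three researchers (top) share one publication (bottom).
        edges.add(new String[] {"alice", "paper1"});
        edges.add(new String[] {"bob", "paper1"});
        edges.add(new String[] {"carol", "paper1"});
        for (String[] e : project(edges)) {
            System.out.println(e[0] + " -- " + e[1]);
        }
    }
}
```

Two top vertices that share several bottom neighbors appear several times in the output, which is one reason the projected graph can be much larger than the bipartite one.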


/**
*
* Bipartite graph is a graph that contains two sets of vertices: top vertices and bottom vertices. Edges can only exist
Contributor

I would rephrase that to "... a graph whose vertices can be divided into two disjoint sets"

* between a pair of vertices from different vertices sets. E.g. there can be no vertices between a pair
* of top vertices.
*
* <p>Bipartite graph is useful to represent graphs with two sets of objects, like researchers and their publications,
Contributor

Bipartite graphs are useful...


/**
* Convert a bipartite into a graph that contains only top vertices. An edge between two vertices in the new
* graph will exist only if the original bipartite graph contains a bottom vertex they both connected to.
Contributor

they *are both connected to

* Convert a bipartite into a graph that contains only top vertices. An edge between two vertices in the new
* graph will exist only if the original bipartite graph contains a bottom vertex they both connected to.
*
* <p>Caller should provide a function that will create an edge between two top vertices. This function will receive
Contributor

I'm not sure whether this is a good idea. Why leave this to the user?

Contributor Author

I thought that different datasets would require different algorithms to consolidate a number of connections into a single edge value. Hence the callback function. But I think Greg's idea is better.

this.f0 = src;
this.f1 = trg;
this.f2 = val;
public Edge(K source, K target, V value) {
Contributor

Why did you change these?

Contributor Author

To make them consistent with naming style in other classes.
Do you suggest reverting this?

Contributor

@greghogan greghogan left a comment

Yes, near the top of the list is FLINK-1267 to add a GroupedCross operator. It is nice to have this as an additional use case.

What if instead of passing in a user function we simply return a flattened tuple to the user? The user can then apply a (chainable) MapFunction to interpret the data as desired.

We could have multiple project methods. The full set is a Tuple8 (three labels, three vertex values, and two edge values). There could also be a variant without joining on the vertex sets that would return a Tuple5 (three labels, two edge values) and perhaps another variant to return a Graph with null edge values (NullValue).

I'd also look to add a middle variant as Tuple7 which only joins on one vertex set.

@@ -480,7 +480,8 @@ protected static File asFile(String path) {
}
}

assertEquals("Wrong number of elements result", expectedStrings.length, resultStrings.length);
assertEquals(String.format("Wrong number of elements result. Expected: %s. Result: %s.", Arrays.toString(expectedStrings), Arrays.toString(resultStrings)),
Contributor

Doesn't IntelliJ offer to view the different results?

Contributor Author

The issue here is that it compares the lengths of the arrays, so JUnit only prints the compared numbers (say 2 and 0) and not the contents of the arrays.

Contributor

The array contents are compared in the assertions that follow the test for length.

Contributor

What if we moved String.format to its own line, included in the string both the array lengths and contents, and added a comment to describe why we are also printing the full arrays?

Also, should the arrays be printed on new lines such that they would line up until the diverging element? We'll need to move the sorting of the arrays before the length check.

Edge<KT, NEV>> edgeFactory) {

DataSet<Edge<KT, NEV>> newEdges = edges.join(edges)
.where(new TopProjectionKeySelector<KT, KB, EV>())
Contributor

@greghogan greghogan Sep 29, 2016

Using the field index should be faster than a key selector, and allows the optimizer to reuse sorted fields.

@mushketyk
Contributor Author

Hi @greghogan,

I like your ideas about providing different API for projections. This should be better than my approach.
@vasia What do you think about this?

@vasia
Contributor

vasia commented Sep 30, 2016

Providing a flattened tuple is certainly better than having the user implement the reduce, but I think we should provide separate methods for the default and custom operations. A projection is a very well-defined operation: create a graph where there is an edge between any pair of vertices with a common neighbor in the bipartite graph. If the user wants to apply mappers or other transformations on the vertices and edges, they can do so afterwards, using the graph methods. The problem is that with a projection, some information is lost, e.g. the edge values. For these cases, we can provide a custom projection method where we give the labels in a flattened tuple as @greghogan suggested, but I'm afraid the API will look ugly with a Tuple8 there. Another, maybe friendlier solution, would be attaching the labels on the projection graph edges. What do you think?

@mushketyk
Contributor Author

Tuple8 does not seem too friendly to me either. What do you mean by "attaching the labels"? Is it something similar to what we do with the Edge/Vertex classes right now, inheriting from Tuple and providing getters and setters to access the values in it? Or is there some other way to attach labels to tuples?

@vasia
Contributor

vasia commented Sep 30, 2016

What I meant is simply creating an edge with a Tuple2 label containing the labels of the two edges in the bipartite graph. Makes sense?
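As a rough illustration of this idea, the Flink-free sketch below builds a top projection in which each projected edge carries the values of the two bipartite edges that connect its endpoints through a shared bottom vertex. `LabeledProjectionSketch`, `BipartiteLink`, and `ProjectedLink` are hypothetical names chosen for the example; they are not Gelly classes, and the quadratic self-join is a simplification rather than the approach taken in the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; these are not Gelly classes.
final class LabeledProjectionSketch {

    // A bipartite edge: top vertex ID, bottom vertex ID, and an edge value.
    static final class BipartiteLink {
        final String topId;
        final String bottomId;
        final double value;

        BipartiteLink(String topId, String bottomId, double value) {
            this.topId = topId;
            this.bottomId = bottomId;
            this.value = value;
        }
    }

    // A projected edge between two top vertices. Its label keeps the
    // values of both original bipartite edges (the "Tuple2 label").
    static final class ProjectedLink {
        final String sourceId;
        final String targetId;
        final double sourceEdgeValue;
        final double targetEdgeValue;

        ProjectedLink(String sourceId, String targetId,
                      double sourceEdgeValue, double targetEdgeValue) {
            this.sourceId = sourceId;
            this.targetId = targetId;
            this.sourceEdgeValue = sourceEdgeValue;
            this.targetEdgeValue = targetEdgeValue;
        }
    }

    // Self-join of the edge list on the bottom vertex ID.
    static List<ProjectedLink> projectTop(List<BipartiteLink> links) {
        List<ProjectedLink> result = new ArrayList<>();
        for (int i = 0; i < links.size(); i++) {
            for (int j = i + 1; j < links.size(); j++) {
                BipartiteLink a = links.get(i);
                BipartiteLink b = links.get(j);
                if (a.bottomId.equals(b.bottomId) && !a.topId.equals(b.topId)) {
                    result.add(new ProjectedLink(a.topId, b.topId, a.value, b.value));
                }
            }
        }
        return result;
    }
}
```

Keeping both edge values on the projected edge means that information which a plain projection would drop stays available to later transformations.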

@greghogan
Contributor

Agreed, I would amend my earlier suggestion to say we only need to start with two projection methods (for each of top and bottom), something like
public Graph<TK, TVV, Tuple2<EV, EV>> topProjectionSimple() {
and
public Graph<TK, TVV, TopProjection<TVV, BK, BVV, EV>> topProjection() {

TopProjection (we can find better names) would be a Tuple6 with POJO accessors as with Vertex, Edge, etc.

@mushketyk
Contributor Author

Makes sense to me. I'll implement this over the weekend.

@mushketyk
Contributor Author

@greghogan @vasia I've updated the code according to your suggestions.
The only thing that I did differently: the more complete version of the bottom/top projection returns a Tuple4 that contains the vertex key, the vertex value, and the values of two vertices. I assumed that getting the values of the two other vertices would require two additional joins, which would make the method much slower, while a user can perform those joins on the result of the method if needed.

* @param <BV> the bottom vertices value type
* @param <EV> the edge value type
*/
public class BipartiteGraph<TK, BK, TV, BV, EV> {
Contributor

Would the generic parameters be easier to read as public class BipartiteGraph<KT, KB, VVT, VVB, EV> {?

Contributor Author

Sure, I'll rename them.

Contributor

Let's check with @vasia first. She may prefer the current type parameter names or have a better suggestion.

@greghogan
Contributor

The advantage to joining on vertex values before the grouped cross is that the number of projected vertices is quadratic in the vertex degree. The projected graphs will usually be much larger than the bipartite graph.

@mushketyk
Contributor Author

Hi @greghogan. Thank you for the clarification. I'll update the code accordingly. Do you have any other comments regarding the PR?

Contributor

@greghogan greghogan left a comment

I added a few more comments. Let's discuss with @vasia before reworking too much code.


private static final long serialVersionUID = 1L;

public BipartiteEdge(){}
Contributor

Extra space for () {}.

Contributor Author

Good catch.

.map(new MapFunction<Tuple2<BipartiteEdge<TK, BK, EV>, BipartiteEdge<TK, BK, EV>>, Edge<TK, Tuple2<EV, EV>>>() {
@Override
public Edge<TK, Tuple2<EV, EV>> map(Tuple2<BipartiteEdge<TK, BK, EV>, BipartiteEdge<TK, BK, EV>> value) throws Exception {
return new Edge<>(
Contributor

The Edge and nested Tuple2 can be reused.

Contributor Author

I don't think I understand what you mean. Could you elaborate please?

Contributor

We don't need to create new objects for each call to map. The Edge and Tuple2 can be fields on the class. For examples look in DegreeAnnotationFunctions.java.
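A minimal sketch of this reuse pattern, written without Flink so that it stands alone: the output object is a field of the function and is overwritten on every call instead of being allocated anew. `Mapper`, `MutablePair`, and `ReusingMapper` are hypothetical stand-ins; they are not Flink's `MapFunction` or `Tuple2`, nor code from `DegreeAnnotationFunctions.java`.

```java
// Hypothetical illustration only; not Flink code.
final class ObjectReuseSketch {

    // Stand-in for a single-method map function interface.
    interface Mapper<I, O> {
        O map(I input);
    }

    // Stand-in for a mutable two-field tuple.
    static final class MutablePair {
        int first;
        int second;
    }

    static final class ReusingMapper implements Mapper<int[], MutablePair> {
        // Allocated once per mapper instance, then overwritten on every call.
        private final MutablePair output = new MutablePair();

        @Override
        public MutablePair map(int[] input) {
            output.first = input[0];
            output.second = input[1];
            return output;
        }
    }

    public static void main(String[] args) {
        ReusingMapper mapper = new ReusingMapper();
        MutablePair a = mapper.map(new int[] {1, 2});
        MutablePair b = mapper.map(new int[] {3, 4});
        // Both references point to the same object, which now holds the latest values.
        System.out.println((a == b) + " " + a.first + " " + a.second);
    }
}
```

The trade-off is that a caller must consume or copy each result before the next call, because earlier results are overwritten.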

Contributor Author

Thank you. Got it.

return this.f1;
}

public EV getEdgeValue1() {
Contributor

Can we now call this the "source" value (and Value2 the "target" value)?

Contributor Author

Sure. Good point.

* @return top projection of the bipartite graph where every edge contains a tuple with values of two edges that
* connect top vertices in the original graph
*/
public Graph<TK, TV, Tuple2<EV, EV>> simpleTopProjection() {
Contributor

Would it be preferable for IDE command completion to call this method projectTopSimple (and then have projectTopFull / projectBottomSimple / projectBottomFull)?

Contributor Author

Good point. Will update.

* @return top projection of the bipartite graph where every edge contains a tuple with values of two edges that
* connect top vertices in the original graph
*/
public Graph<TK, TV, Tuple2<EV, EV>> simpleTopProjection() {
Contributor

Also, it was discussed that we only have a Tuple2 of edge values, but I am double-checking that we don't also want to include the (here: bottom) vertex ID in the new edge value.

Contributor Author

Sorry, I didn't get your point. Could you please elaborate on this?

@vasia
Contributor

vasia commented Oct 6, 2016

Thanks for the update @mushketyk and for the review @greghogan. I agree with your suggestions. For the type parameters I would go for <KT, KB, VVT, VVB, EV>. Let me know if there's any other issue you'd like my opinion on.

@mushketyk
Contributor Author

@vasia @greghogan I've updated the PR. Could you please give it another look?

@mushketyk
Contributor Author

New gelly tests failed with errors like:

Caused by: java.io.IOException: Insufficient number of network buffers: required 32, but only 3 available. The total number of network buffers is currently set to 2048. You can increase this number by setting the configuration key 'taskmanager.network.numberOfBuffers'.

Do you know what is causing this error? Should I update the code somehow?

@greghogan
Contributor

Try switching to ExecutionEnvironment.createCollectionsEnvironment().

@mushketyk
Contributor Author

@greghogan
Thank you for the suggestion.
The build is passing now.

@vasia
Contributor

vasia commented Nov 21, 2016

Thanks @mushketyk. @greghogan are you shepherding this PR or shall I?

@mushketyk
Contributor Author

@vasia I don't think anybody is shepherding this PR :)

Contributor

@greghogan greghogan left a comment

@mushketyk, thanks for your contribution! In addition to some comments, if you could rebase the PR we may avoid later difficulties. I don't expect any conflicts since you are mostly adding new code.

import org.apache.flink.api.java.tuple.Tuple3;

/**
*
Contributor

Empty line.


/**
*
* A BipartiteEdge represents a link between a top and bottom vertices
Contributor

"between a top" -> "between top", or similar.

/**
*
* A BipartiteEdge represents a link between a top and bottom vertices
* in a {@link BipartiteGraph}. It is similar to the {@link Edge} class
Contributor

"It is a generalized form of {@link Edge} where the source and target vertex IDs can be of different types.", or similar?

return this.f0;
}

public void setTopId(KT i) {
Contributor

Parameter name "i" -> "topId"? Also, below for "i" -> "bottomId" and "newValue" -> "value"?

import org.apache.flink.api.java.tuple.Tuple2;

/**
*
Contributor

Empty line.

}
})
// Join with top vertices to preserve top vertices values
.join(topVertices)
Contributor

Is it not more efficient to do the Cartesian join last if we assume that the |vertices| << |bipartite edges| << |simple edges|? The code can be reused between the full projection functions: first join the bipartite edge with top vertex, then join the result with the bottom vertex (perhaps using the Projection class below with NullValue where appropriate).

* @param <VV> the value type of vertices of an opposite set
* @param <EV> the edge value type
*/
public class Projection<VK, VV, EV, VVC> extends Tuple6<VK, VV, EV, EV, VVC, VVC> {
Contributor

Missing comment for documenting VVC. Should EV be placed before VVC? And before VK and VV?

@Test
public void testSetBottomId() {
edge.setBottomId(100);
assertEquals(Integer.valueOf(100), edge.getBottomId());
Contributor

Does auto-boxing not work here?

Contributor Author

It works, but the compiler can't decide between assertEquals(Object, Object) and assertEquals(long, long).
Anyway I replaced it with:

assertEquals(100, (long) edge.getBottomId());
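The ambiguity can be reproduced without JUnit. In the sketch below, the two `check` overloads are hypothetical stand-ins that mirror the parameter types of `assertEquals(Object, Object)` and `assertEquals(long, long)`. A call with an `int` literal and an `Integer` needs boxing for one overload and unboxing for the other, so neither is more specific and the call is rejected; making both arguments boxed or both primitive resolves it.

```java
// Hypothetical illustration only; the overloads mimic JUnit's parameter types.
final class OverloadSketch {

    static String check(Object expected, Object actual) {
        return "Object,Object";
    }

    static String check(long expected, long actual) {
        return "long,long";
    }

    public static void main(String[] args) {
        Integer boxed = Integer.valueOf(100);

        // check(100, boxed);  // does not compile: reference to check is ambiguous

        // Both arguments are references: only check(Object, Object) applies
        // without unboxing, so it is selected.
        System.out.println(check(Integer.valueOf(100), boxed));

        // Both arguments are primitives: only check(long, long) applies
        // without boxing, so it is selected.
        System.out.println(check(100, (long) boxed));
    }
}
```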


import static org.junit.Assert.assertEquals;

public class ProjectionTest {
Contributor

Is this test class necessary?

Contributor Author

I know that testing getters/setters is considered a waste of CPU cycles, but since the getters/setters are not auto-generated here and need to access the proper tuple fields, I decided to add them.

@mushketyk
Contributor Author

Hi @greghogan , thank you for your review.
I'll try to fix them in the next couple of days.

Best regards,
Ivan.

@mushketyk
Contributor Author

Hi @greghogan,

I've updated the PR according to your review and rebased it on top of the master branch.

The only thing that I didn't change is the message in the assertEquals you pointed to, since it is not very helpful to receive an error message like: "Wrong number of elements result. Expected 4, actual 3." I think it is much more helpful for debugging purposes to see the contents of the arrays to figure out why their lengths are different.

@greghogan
Contributor

@mushketyk @vasia, thoughts on package naming? Should we create a new org.apache.flink.bigraph package? Another option would be org.apache.flink.graph.bidirectional which would suggest future package names like org.apache.flink.graph.multi and org.apache.flink.graph.temporal.

@vasia
Contributor

vasia commented Dec 8, 2016

I would go for org.apache.flink.graph.bipartite. I think that bidirectional simply suggests that each edge exists in both directions.

@greghogan
Contributor

Yes, you are right, bipartite.

@mushketyk
Contributor Author

@vasia, @greghogan I've created a new package, moved the new classes there, and updated the PR according to your latest comments.

Best regards,
Ivan.


assertEquals("Wrong number of elements result", expectedStrings.length, resultStrings.length);

//
Contributor

Looks like you were intending to add a comment here. The if (sort) ... block will need to go before msg is created. Should we add newlines to the message string, something like "Expected %d elements but received %d.\n expected array: %s\n received array: %s"? That should line up.
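The suggestion can be sketched as a small helper that sorts first and then builds the multi-line message, using the format string proposed in the comment above. `ComparisonMessageSketch` and `buildMessage` are hypothetical names, not the test utility touched by this PR, and sorting copies instead of sorting in place is a choice made only for this example.

```java
import java.util.Arrays;

// Hypothetical illustration only; not the Flink test utility.
final class ComparisonMessageSketch {

    static String buildMessage(String[] expected, String[] result, boolean sort) {
        // Work on copies so the caller's arrays are left untouched.
        String[] expectedCopy = expected.clone();
        String[] resultCopy = result.clone();

        // Sort before formatting so the printed arrays line up
        // until the first diverging element.
        if (sort) {
            Arrays.sort(expectedCopy);
            Arrays.sort(resultCopy);
        }

        // Include both the lengths and the full contents, because a bare
        // length mismatch does not show which elements are missing.
        return String.format(
                "Expected %d elements but received %d.\n expected array: %s\n received array: %s",
                expectedCopy.length,
                resultCopy.length,
                Arrays.toString(expectedCopy),
                Arrays.toString(resultCopy));
    }

    public static void main(String[] args) {
        System.out.println(buildMessage(
                new String[] {"b", "a", "c"}, new String[] {"c", "a"}, true));
    }
}
```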


if (sort) {
Arrays.sort(expectedStrings);
Arrays.sort(resultStrings);
}

for (int i = 0; i < expectedStrings.length; i++) {
assertEquals(expectedStrings[i], resultStrings[i]);
assertEquals(msg, expectedStrings[i], resultStrings[i]);
Contributor

Do we need to include msg here?

Contributor Author

I think it will give more context just as in the comparing lengths case.

@mushketyk
Contributor Author

Hi @greghogan , I've fixed the PR according to your review.

@asfgit asfgit closed this in 365cd98 Dec 9, 2016
@vasia
Contributor

vasia commented Dec 9, 2016

Thank you both for your work @mushketyk and @greghogan!
Please, keep in mind that we should always add documentation for every new feature; especially a big one such as supporting a new graph type. We've added the checklist template for each new PR so that we don't forget about it :)
Can you please open a JIRA to track that docs for bipartite graphs are missing? Thank you!

@greghogan
Contributor

Thanks for the reminder @vasia. The separate JIRA sub-task does allow for a discussion of how best to document the full set of proposed bipartite functionality.

@mushketyk
Contributor Author

Hi @vasia , thank you for merging my PR.
Thank you for the reminder about the documentation. I've created the JIRA for it: https://issues.apache.org/jira/browse/FLINK-5311

static-max pushed a commit to static-max/flink that referenced this pull request Dec 13, 2016
joseprupi pushed a commit to joseprupi/flink that referenced this pull request Feb 12, 2017