
[FLINK-1514][Gelly] Add a Gather-Sum-Apply iteration method #408

Closed — wants to merge 4 commits

Conversation

balidani
Contributor

Hi!

I implemented the Gather-Sum-Apply iteration method, it is much nicer now than the previous (flink-graph) version. I also added a SSSP test case to the already existing graph coloring one.

public Tuple2<Long, Double> gather(Tuple3<Vertex<Long, Double>,
        Edge<Long, Long>, Vertex<Long, Double>> triplet) {

    if (triplet.f2.getValue() == 0.0) {
Contributor Author

Normally this is not okay, but we never change the initial 0.0 value.
Should we still make an epsilon check here, even though it would result in (a bit) less readable code?
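
For context, an exact `==` on a double is only safe when the value is never the result of arithmetic. A minimal plain-Java sketch of the epsilon alternative discussed here (the threshold `1e-9` is an assumed value, not from this PR):

```java
// Illustrative sketch, not code from this PR: comparing a double against zero
// with a tolerance instead of exact ==. EPSILON is an assumed threshold.
public class EpsilonCompare {
    static final double EPSILON = 1e-9;

    static boolean isZero(double value) {
        return Math.abs(value) < EPSILON;
    }

    public static void main(String[] args) {
        double d = 0.1 + 0.2 - 0.3;       // not exactly 0.0 due to rounding
        System.out.println(d == 0.0);     // false: exact comparison fails
        System.out.println(isZero(d));    // true: within tolerance
    }
}
```

The exact `==` in the PR works only because the 0.0 sentinel is assigned, never computed.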

@vasia
Contributor

vasia commented Feb 20, 2015

Hi @balidani! Thanks a lot for this PR! Gather-Sum-Apply will be an awesome addition to Gelly ^^
Here come my comments:

  • There's no need for Gather, Sum and Apply functions to implement MapFunction, FlatJoinFunction, etc., since they are wrapped inside those in GatherSumApplyIteration class. Actually, I would use the Rich* versions instead, so that we can have access to open() and close() methods. You can look at how VertexCentricIteration wraps the VertexUpdateFunction inside a RichCoGroupFunction.
  • With this small change above, we could also allow access to aggregators and broadcast sets. This should be straightforward to add (again, look at VertexCentricIteration for hints). We should also add getName(), setName(), getParallelism() and setParallelism() methods to GatherSumApplyIteration.
  • Finally, it'd be great if you could add the tests you have as examples, i.e. one for Greedy Graph Coloring and one for GSAShortestPaths.

Let me know if you have any doubts!

Thanks again ☀️

@balidani
Contributor Author

Hi @vasia!

Thanks for the review! I started making the changes, but I got stuck. I get an exception about GatherUdf not being Serializable. I tried copying from VertexCentricIteration, and I'm not sure what difference causes the exception.

Here is the full stack trace: https://gist.github.com/balidani/e6f716d3562d2aa131fb
Here is GatherSumApplyIteration.java: https://gist.github.com/balidani/1139eddcb1b4cbf60404

The GatherUdf is at the end. I didn't bother fully converting the Sum and Apply UDFs, because I already get an exception for gather.

What exactly is ResultTypeQueryable? Does it have anything to do with Serializability? Should we implement this with all UDFs, just like in VertexCentricIteration?

Cheers!

@vasia
Contributor

vasia commented Feb 23, 2015

I had an offline discussion with @balidani and we figured that the exception was actually coming from the test, not the GatherSumApplyIteration. So, no issue here.

Regarding the ResultTypeQueryable interface and types, I will point you to this mailing list thread and this doc. I hope these clear things up a bit :-)

@balidani
Contributor Author

Hi @vasia, all!

So I implemented the changes that @vasia suggested. I also added a commit that adds the Hadoop profiles to the flink-gelly project in pom.xml, so tests using files won't fail.

Are there any more things I should change?

Cheers!

/**
* This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
*/
public class GSASingleSourceShortestPathsExample implements ProgramDescription, GraphAlgorithm<Long, Double, Double> {
Contributor

Since it's not a library method, there's no need to implement GraphAlgorithm or have a run() method. You can move the body to main().

Contributor Author

Makes sense, I was blindly copying this part and didn't realize it's not really needed :)

@vasia
Contributor

vasia commented Feb 24, 2015

This looks great already! We just need a few more changes:

  • you can add the Triplet type inside the gsa package. This should just be a simple wrapper around Tuple3<VV, EV, VV>. (I think we don't even need the Vertex IDs here, just the values).
  • I see you have exposed the vertex keys in the UDFs. These should be hidden from the user-facing methods, e.g. in the SSSP example gather() should return Double, not Tuple2<Long, Double>, sum() should get two Doubles and return a Double (the minimum) and apply() should get a Double (the current value) and an M (the accumulator result) and return a Double.
    More specifically, the UDFs should be defined like this:
  • gather: Triplet -> M
  • sum: <M, M> -> M
  • apply: <VV, M> -> VV

Of course the wrappers inside GatherSumApplyIteration (`GatherUDF`, `SumUDF`, `ApplyUDF`) will have to maintain the keys. The point is to hide them from the user :)
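
The three user-facing signatures above can be sketched in plain Java (hypothetical stand-in types for illustration, not the actual Gelly API), with SSSP as the concrete instance:

```java
import java.util.List;

// Illustrative sketch: the user-facing GSA functions with the shapes described
// above. VV = vertex value type, EV = edge value type, M = accumulator type.
// All names here are hypothetical stand-ins, not Flink/Gelly classes.
public class GsaShapes {
    interface Gather<VV, EV, M> { M gather(VV srcValue, EV edgeValue, VV trgValue); }
    interface Sum<M>            { M sum(M left, M right); }
    interface Apply<VV, M>      { VV apply(VV currentValue, M accumulated); }

    // SSSP instances: gather yields a candidate distance through one edge,
    // sum keeps the minimum candidate, apply keeps the smaller of old and new.
    static final Gather<Double, Double, Double> GATHER = (src, edge, trg) -> src + edge;
    static final Sum<Double> SUM = Math::min;
    static final Apply<Double, Double> APPLY = Math::min;

    // One sequential GSA step for a single vertex: each in-edge is a pair
    // {source distance, edge weight}; fold the gather results with sum.
    static double newDistance(double current, List<double[]> inEdges) {
        Double acc = null;
        for (double[] e : inEdges) {
            double g = GATHER.gather(e[0], e[1], current);
            acc = (acc == null) ? g : SUM.sum(acc, g);
        }
        return (acc == null) ? current : APPLY.apply(current, acc);
    }
}
```

For a vertex at distance 10 with in-edges from vertices at distances 0 and 5 (weights 3 and 2), the step yields min(10, min(0+3, 5+2)) = 3.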

Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);

// The path from src to trg through edge e costs src + e
// If the target's distance is 0 (the target is the actual source), return 0 instead
Contributor

this isn't the case anymore, you can delete this comment :)

@vasia
Contributor

vasia commented Feb 25, 2015

Thanks for the revision @balidani! I added some minor inline comments.
I'd like someone else to also review this before we merge.
In the meantime, I'll try to find some time and test this on a cluster :)

}
});
} else {
return ExampleUtils.getLongDoubleVertexData(env);
Contributor

This is just a cosmetic bug, but I think it could be useful to mention it...
Could you, instead of ExampleUtils, take the data from a separate class? For SSSP, for example, you have SingleSourceShortestPathsData in example/utils. This would be more legible (i.e., it would look nicer than getLongLongVertexData :)).
Also @vasia, I see nobody else is using that method in ExampleUtils, so maybe after this we can get rid of it :D

@andralungu
Contributor

Not sure why the last Travis check failed because the pom.xml seems to be the correct one.

But apart from that, this looks very nice and compliant with the vertex centric approach.
However, there is a bug there, in runVertexCentricIteration: the superstep number does not change when you run tests in Collection mode and I have a hunch that it would also be reproducible here.

Apart from that. Good to merge from my side.

And as future work, once I finish the vertexCentricIteration extensions which will allow the user to choose the messaging direction, we should extend GSA to do the same :D

@StephanEwen
Contributor

This adds a Guava dependency and does some Hadoop profile magic. We have to be very careful when merging this to be consistent with changes in #454 by @rmetzger

I think that #454 should significantly simplify the Hadoop profile and Guava business.

@StephanEwen
Contributor

In the core API, we have started to define all functions (map, join, ...; here it would be SumFunction, ApplyFunction) as single-abstract-method interfaces. That allows the use of Java 8 lambdas (Scala may pick this up as well). For each function, there is a RichFunction variant which has the open() and close() methods and the getRuntimeContext() (here, the preSuperstep() and postSuperstep() methods).
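
A minimal plain-Java sketch of the pattern being described (simplified stand-in types, not Flink's actual classes): the single-abstract-method interface accepts a lambda, while the Rich* variant adds lifecycle hooks and therefore cannot be one.

```java
// Illustrative sketch: a SAM interface usable as a Java 8 lambda, plus a
// "Rich" abstract variant with lifecycle hooks. Hypothetical names, not Flink's.
public class SamSketch {
    @FunctionalInterface
    interface SumFunction<M> { M sum(M a, M b); }

    // Rich variant: same logic plus hooks (analogous to open()/close(),
    // or preSuperstep()/postSuperstep() in an iteration); needs a class body.
    static abstract class RichSumFunction<M> implements SumFunction<M> {
        public void open() {}
        public void close() {}
    }

    // A tiny driver that folds a list with whatever SumFunction it is given.
    static <M> M reduce(java.util.List<M> values, SumFunction<M> f) {
        M acc = values.get(0);
        for (int i = 1; i < values.size(); i++) acc = f.sum(acc, values.get(i));
        return acc;
    }
}
```

Usage: `SamSketch.reduce(List.of(1, 2, 3), (a, b) -> a + b)` — the lambda satisfies the SAM interface directly.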

</dependency>
</dependencies>
</profile>
<profile>
Contributor

I don't see why we need the hadoop2 profile here. It doesn't do anything.

Contributor

Regarding the pom, we've actually merged this change as part of #429. We added the guava dependency after this discussion in the dev mailing list. If there's a better way to deal with this, please let me know! The hadoop2 profile seems to be there by mistake, I'll fix that :)

Contributor

Thank you.
I've rebased #454 onto your changes in master and adapted it to our new approach of handling dependencies in Flink. There, I've added guava again as a regular dependency.

Contributor

Perfect, thanks @rmetzger!

@StephanEwen
Contributor

I am just dropping some more thoughts here ;-)

The interface seems a bit confusing. Right now, one has to first create the iteration from the graph and then tell the graph to run it:

GatherSumApplyIteration iteration = graph.createGatherSumApplyIteration(gather, sum, apply);
Graph result = graph.runGatherSumApplyIteration(iteration);

What was the reason to not go simply for

Graph result = graph.runGatherSumApplyIteration(gather, sum, apply);

If the reason was additional parameterization of the iteration, would it be nicer to go for the following?

GatherSumApplyIteration iteration = graph.createGatherSumApplyIteration(gather, sum, apply);
iteration.configureInSomeWay(...);
Graph result = iteration.result();
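
The configure-then-result pattern proposed here can be sketched in isolation (hypothetical names and a string stand-in for the resulting graph, for illustration only):

```java
// Illustrative sketch of a configure-then-result API: setters return `this`
// so configuration chains, and result() triggers execution at the end.
// All names are hypothetical; the real Gelly API may differ.
public class FluentIteration {
    private String name = "unnamed";
    private int parallelism = 1;

    FluentIteration setName(String n) { this.name = n; return this; }
    FluentIteration setParallelism(int p) { this.parallelism = p; return this; }

    // Stand-in for running the iteration and returning the result graph.
    String result() { return name + "@" + parallelism; }
}
```

The design point is that nothing runs until `result()`, so any number of configuration calls can be chained in between.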

@vasia
Contributor

vasia commented Mar 8, 2015

Regarding the rest of the comments:

@StephanEwen
Contributor

The vertex-centric iterations use semantic annotations a lot to save sorting/partitioning where possible. This results in a plan for the vertex-centric iterations that partitions once and sorts once, despite the fact that it uses either two coGroups or two joins plus a reduce. I think this would help big time here as well. In general, plan optimization may be something that Gelly could take a look at.

@StephanEwen
Contributor

One more thought: the triplet re-building in every iteration seems quite expensive. Does it make sense to have the triplets (or quadruplets, with the vertex value) as the data in the solution set?

@vasia
Contributor

vasia commented Mar 8, 2015

  • Regarding annotations, I actually tried as many as I could find :)) This is the commit. I haven't looked at the execution plans, but my annotations didn't make a big difference in the runtime of my experiments.
  • As for having triplets in the solution set, I also thought about this. The problem is that then, you will still have to re-build them, in the solution set delta, at the end of every iteration (in order to update the solution set). Unless you have something in mind that avoids this?

@balidani
Contributor Author

Hi!

I pushed some changes. @vasia told me that she and @StephanEwen decided that iterating on triplets is infeasible, so we should operate on <srcVertex, edge> pairs instead. Only a few changes were required, since neither of the examples requires the target vertex value anyway.

Regarding the other suggestions, @vasia and I think it would be a good idea to push them in a separate PR, because they affect both GSA and Vertex centric iterations.

Regarding the cosmetic changes that @andralungu suggested: I changed SSSP to use SingleSourceShortestPathData, and for greedy graph coloring I just generated a graph, like in LabelPropagationExample.

Cheers!

@vasia
Contributor

vasia commented Mar 17, 2015

Hi @balidani, thanks a lot for the changes! I'll try to run some tests on a cluster soon and test the new version.
I have a question about the graph coloring example. As far as I understand, in this algorithm, colors are represented by numbers (vertex values) and the goal is to color the graph with the minimum number of colors, so that no two neighboring vertices have the same color. So, in each superstep, each vertex gathers the colors of its neighbors in a set and then assigns itself the minimum color that isn't in this set. However, it seems to me that in your implementation, you're only propagating the minimum neighbor value, similar to what you would do in connected components. Is there anything I'm missing here?
Thanks!
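
The coloring rule described in this comment (take the smallest non-negative color absent from the gathered neighbor set) can be sketched in plain Java; this is an illustration of the rule itself, not code from the PR:

```java
import java.util.Set;

// Illustrative sketch: the per-vertex apply step of greedy graph coloring as
// described above — given the set of neighbor colors gathered in a superstep,
// pick the smallest non-negative color not in that set.
public class ColoringStep {
    static int minFreeColor(Set<Integer> neighborColors) {
        int color = 0;
        while (neighborColors.contains(color)) {
            color++;
        }
        return color;
    }
}
```

For neighbor colors {0, 1, 3} this yields 2 — unlike the connected-components-style rule, which would propagate the minimum neighbor value 0.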

@balidani
Contributor Author

@vasia was completely right about the GGC algorithm; I misunderstood what it was supposed to do. I implemented the correct version, but it turns out, after reading the GAS paper, that this algorithm has problems when executed synchronously (e.g., values can oscillate between 0 and 1). We had a discussion and decided to add a Connected Components example instead, as well as a PageRank example, which I will implement shortly.

@andralungu
Contributor

Hi @balidani :)

This PR has not been updated in a while now. Are you facing any issues? In that case, I would like to help. Otherwise, if you're busy, maybe it makes sense to pass the remainder of the implementation to someone else?

@balidani
Contributor Author

Hi @andralungu!

I just talked to Vasia, and I'll do a rebase today.
Cheers!

@balidani
Contributor Author

Okay, I updated the PR and Travis passed.

@vasia
Contributor

vasia commented Apr 17, 2015

Thanks @balidani! I'll try to review asap :-)

@@ -57,4 +57,35 @@ under the License.
<version>${guava.version}</version>
</dependency>
</dependencies>

Contributor

Hey @balidani. I think there's no need for these profiles anymore.

@vasia
Contributor

vasia commented Apr 18, 2015

Thanks @balidani for the update! I left some inline comments which should be fairly easy to address.
I would like to merge this soon and then we should open separate issues for documentation and configuration.

@balidani
Contributor Author

Hi @vasia!

I fixed the problems, sorry about that. I had to resolve some merge conflicts and it appears that I did not notice some things. Next time I'll check the diff before I commit :)

@vasia
Contributor

vasia commented Apr 21, 2015

Hi @balidani!

I pulled your changes and made some improvements, mainly to the examples, so that they are consistent with the rest of Gelly's examples. You can see my changes in this branch.

I've been running some experiments on a small cluster and so far everything runs smoothly. GSA SSSP actually seems to be faster than vertex-centric SSSP for both input datasets I'm using 😄

I will run a few more experiments in the next days and if I find no problems and there are no objections, I will merge by the end of the week.

@andralungu
Contributor

I noticed that there is no GSAPageRank example in the latest version of the code.
I'd like to take care of that once this PR gets merged, to get to understand this model on a practical level, and @vasia can write the documentation. If no objections, I will open a JIRA for this after I see the code in "production" :D

@vasia
Contributor

vasia commented Apr 24, 2015

Hey Andra,

there are several issues we'll need to fix/add after this is merged. A Pagerank example is one of them :) I have gathered and will open all related JIRAs once this is merged. Thanks for volunteering to take care of one already :))

@asfgit asfgit closed this in 6e24879 Apr 26, 2015
bhatsachin pushed a commit to bhatsachin/flink that referenced this pull request May 5, 2015
marthavk pushed a commit to marthavk/flink that referenced this pull request Jun 9, 2015