
hadoopcompatibility: Implementations of basic programming interfaces and... #37

Closed
wants to merge 4 commits

Conversation

atsikiridis

... a basic driver.

  • wrappers for Mapper, Reducer and Combiner (as a local Reducer) on the new Java API
  • wrapper for OutputCollector
  • wrappers for Partitioner and the values comparator.
  • a driver making it possible to run unmonitored (so far) Hadoop jobs on Flink.
  • tests and variations of the WordCount driver exclusively in Hadoop.

You can find more about the Google Summer of Code project here: https://issues.apache.org/jira/browse/FLINK-838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14031511
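For readers skimming this thread, here is a rough, hedged sketch of what the driver is meant to enable: an unmodified Hadoop mapred WordCount driver submitted through the wrapper client instead of Hadoop's JobClient. The client class name (StratosphereHadoopJobClient, later renamed FlinkHadoopJobClient) and its static runJob method are taken from later comments in this thread and are assumptions, not the committed API.

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.LongSumReducer;
import org.apache.hadoop.mapred.lib.TokenCountMapper;

public class WordCountOnFlink {
    public static void main(String[] args) throws Exception {
        // A completely standard Hadoop mapred driver: stock mapper, combiner and reducer.
        JobConf conf = new JobConf();
        conf.setJobName("wordcount");
        conf.setMapperClass(TokenCountMapper.class);   // emits (word, 1) pairs
        conf.setCombinerClass(LongSumReducer.class);   // combiner wrapped as a local reducer
        conf.setReducerClass(LongSumReducer.class);    // sums the counts per word
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(LongWritable.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        // Hypothetical call: submit through the wrapper instead of JobClient.runJob(conf).
        StratosphereHadoopJobClient.runJob(conf);
    }
}
```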

@rmetzger
Contributor

Thank you for the pull request. I know it was a lot of work.
It fails to build with hadoop 2.2.0 dependencies: https://travis-ci.org/apache/incubator-flink/builds/28311858

@atsikiridis
Author

Hey @rmetzger, I think I made a fix for the hadoop 2.2.0 dependencies, but I'm not sure I understand why it is still failing. Can you take a look, please? :)

@rmetzger
Contributor

Hey, everything is okay now, according to this page: https://travis-ci.org/apache/incubator-flink/builds/28399204 (you can ignore the one failed build, it's not your fault).
So Travis gave its okay to your pull request.
Now we have to wait until somebody finds time to test the pull request.
@twalthr: do you have time for that? I think we have to write a little map-reduce example and run it on our cluster. The test should verify that everything is intuitive, well documented ... and that it runs on the cluster. We could also try to run a Mahout (map-reduce) algorithm from Stratosphere.

@twalthr
Contributor

twalthr commented Jun 25, 2014

I don't think that I can review and test it this week. But next week would be possible.

@rmetzger
Contributor

rmetzger commented Jul 8, 2014

I'm testing the PR now.

@rmetzger
Contributor

rmetzger commented Jul 8, 2014

Okay, I've taken this job: https://github.com/apache/hadoop-common/blob/branch-1.2/src/examples/org/apache/hadoop/examples/Join.java and replaced the JobClient with the StratosphereHadoopJobClient.
The first thing I got is the following:

Exception in thread "main" java.lang.NullPointerException
    at org.apache.hadoop.mapred.JobClient.getClusterStatus(JobClient.java:1298)
    at org.apache.hadoop.mapred.JobClient.getClusterStatus(JobClient.java:1285)
    at eu.stratosphere.test.hadoop.Join.run(Join.java:87)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
    at eu.stratosphere.test.hadoop.Join.main(Join.java:172)

I think you should either throw an "UnsupportedOperationException" or implement support for this. (The same applies to all the other methods inherited from JobClient that are not working.)
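For illustration, a minimal hedged sketch of such a guard (the choice of method and the message wording are assumptions), so unsupported JobClient methods fail fast instead of hitting a NullPointerException:

```java
@Override
public ClusterStatus getClusterStatus() throws IOException {
    // There is no Hadoop JobTracker behind this client when running on Stratosphere/Flink.
    throw new UnsupportedOperationException(
        "getClusterStatus() is not supported by the Hadoop-compatibility JobClient wrapper yet.");
}
```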

*/
@Override
@SuppressWarnings("unchecked")
public RunningJob submitJob(JobConf hadoopJobConf) throws IOException{
Contributor

Why do you expect the JobConf again here? I think the user already passed it when creating the StratosphereHadoopJobClient. (JobConf inherits from Configuration).

Author

Actually, here I am just following the JobClient interface. It has this static method runJob, which is just a submitJob and waitForCompletion in one line. It is perhaps redundant in that case to pass the JobConf both to the constructor and to the method, but it will become useful once we have more methods on the JobClient.

What is really missing is a constructor with no arguments, which I should add.

Sorry if I completely misunderstood this :P
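For context, the Hadoop pattern being mirrored looks roughly like this (a hedged paraphrase of JobClient's behavior, not the exact Hadoop source), which is why the JobConf shows up both in the constructor and in submitJob:

```java
// Sketch of Hadoop's static convenience method: submit, then block until completion.
public static RunningJob runJob(JobConf job) throws IOException {
    JobClient client = new JobClient(job);        // conf configures the client
    RunningJob running = client.submitJob(job);   // conf also defines the job itself
    running.waitForCompletion();
    return running;
}
```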

Contributor

No, you got the question right. I'm still a bit unhappy with the configuration-object situation because the configuration that the user is passing into the constructor is never used. But let's keep it this way.

@rmetzger
Contributor

rmetzger commented Jul 8, 2014

I experienced another error:

Exception in thread "main" java.io.IOException: An error has occurred.
    at eu.stratosphere.hadoopcompatibility.mapred.StratosphereHadoopJobClient$DummyStratosphereRunningJob.waitForCompletion(StratosphereHadoopJobClient.java:238)
    at eu.stratosphere.test.hadoop.Join.run(Join.java:162)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
    at eu.stratosphere.test.hadoop.Join.main(Join.java:172)
Caused by: eu.stratosphere.nephele.client.JobExecutionException: java.io.IOException: Type mismatch in key: expected class org.apache.hadoop.io.BytesWritable, received org.apache.hadoop.io.LongWritable
    at eu.stratosphere.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector.validateExpectedTypes(HadoopOutputCollector.java:78)
    at eu.stratosphere.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector.collect(HadoopOutputCollector.java:48)
    at eu.stratosphere.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector.collect(HadoopOutputCollector.java:1)
    at org.apache.hadoop.mapred.lib.IdentityMapper.map(IdentityMapper.java:38)
    at eu.stratosphere.hadoopcompatibility.mapred.HadoopMapFunction.flatMap(HadoopMapFunction.java:80)
    at eu.stratosphere.hadoopcompatibility.mapred.HadoopMapFunction.flatMap(HadoopMapFunction.java:1)
    at eu.stratosphere.pact.runtime.task.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:71)
    at eu.stratosphere.pact.runtime.task.DataSourceTask.invoke(DataSourceTask.java:215)
    at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:260)
    at java.lang.Thread.run(Thread.java:745)

    at eu.stratosphere.nephele.client.JobClient.submitJobAndWait(JobClient.java:354)
    at eu.stratosphere.client.LocalExecutor.executePlan(LocalExecutor.java:231)
    at eu.stratosphere.api.java.LocalEnvironment.execute(LocalEnvironment.java:55)
    at eu.stratosphere.hadoopcompatibility.mapred.StratosphereHadoopJobClient$DummyStrat

I'm not sure what I'm doing wrong.

You can find the code that I'm using for testing here: https://github.com/rmetzger/testjob/tree/artem

@atsikiridis
Author

Thank you for testing it.

I believe it is not a bug. I actually tried to simulate this behavior with my last commit. If you run this on Hadoop, the same message is shown.

Can you mention how you run the driver, please?

I get the same when I set TextInputFormat and TextOutputFormat, and all defaults.

So it really depends on the InputFormat and OutputFormat you choose.

This happens because we have mismatches between the types built into the input/output formats and the input/output format key types specified in the driver.

If you describe your case exactly (if it is a sequence file, how is it generated?), then maybe we can check (or it is really a bug).
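A hedged illustration of the mismatch being described: with TextInputFormat the mapper receives (LongWritable, Text) records, so an IdentityMapper forwards a LongWritable key, while the Join driver declared BytesWritable as the map output key. Keeping the declared key/value classes consistent with the chosen formats avoids the error (the exact settings below are assumptions, not the Join example's code):

```java
// Assumed JobConf settings; exact values depend on the chosen input format and data.
conf.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class);
// TextInputFormat produces (LongWritable offset, Text line) records, so the map
// output types must match what the (identity) mapper actually forwards:
conf.setMapOutputKeyClass(org.apache.hadoop.io.LongWritable.class);
conf.setMapOutputValueClass(org.apache.hadoop.io.Text.class);
// With a SequenceFileInputFormat, the key/value classes come from the sequence file
// itself and must match the types declared in the driver (BytesWritable in that case).
```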

@rmetzger
Contributor

rmetzger commented Jul 9, 2014

I was just executing the main method of this class: https://github.com/rmetzger/testjob/blob/artem/src/main/java/eu/stratosphere/test/hadoop/Join.java, with these arguments -inFormat org.apache.hadoop.mapred.TextInputFormat -joinOp outer /tmp/jointest/input /tmp/jointest/input1 /tmp/jointest/output

I thought this was independent of the InputFormat, since the error happens when the output collector checks the type of the elements it receives?
I'll check it again.

@rmetzger
Contributor

rmetzger commented Jul 9, 2014

I was following these instructions: http://www.mail-archive.com/core-user@hadoop.apache.org/msg04066.html and it worked! So the problem was indeed my input format!

I think the pull request is ready to merge once the NullPointerException (inherited methods which are not working) issue has been resolved.

@rmetzger
Contributor

rmetzger commented Jul 9, 2014

One more question: Why do you force the application to use a DOP of one?

@atsikiridis
Author

Definitely, this should not stay 1. I just left it here for simplicity.

In my new branch it is the maximum of mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum, which are the "slots" of tasks per node. What do you think?

The problem here is that in Flink the DOP applies to all operators in an environment (if I'm not mistaken), so if, for example, DOP = 3, there will be 3 mappers and 3 reducers. What happens if I want 3 mappers in parallel and 1 reducer? Is there actually a way to set parallelism per operator with the current APIs?

@atsikiridis
Author

It's great that you think it is OK. I will check which operations have not been implemented on the branch and guard them as you said. By the way, I wanted to ask: do you think the WordCountVariations file is OK where it is? Maybe I should move it to test? These are not really examples, they are just used for testing, so I think I may have it in the wrong place...

@rmetzger
Contributor

rmetzger commented Jul 9, 2014

Yes, move them to a test package.

Regarding the DOP: the ./bin/stratosphere client allows passing a -p parameter that sets the DOP for the environment. This way, users can change the DOP very easily.
So by default, I would not set any DOP so that you can set it when running.
If the user has set the respective Hadoop property (mapred.tasktracker.map.tasks.maximum), we should indeed respect that. Flink allows setting the DOP individually per operator; the functionality is just not exposed via the Java API.
Can you open a JIRA for that, discuss how to implement it and then contribute the change via a PR? (I will merge this pull request independently of that change, so it won't be a blocker.)

@atsikiridis
Author

Ok, I'll do that. Thank you :)

@fhueske
Contributor

fhueske commented Jul 9, 2014

In fact, setting the DOP of individual operators is supported with Operator.setParallelism(int dop).
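So, roughly, the parallelism mapping discussed above could look like the following hedged sketch (the reduce-side wrapper name HadoopReduceFunction and the grouping key are assumptions; only HadoopMapFunction and setParallelism appear elsewhere in this PR):

```java
// Derive per-operator defaults from the Hadoop "slot" settings of the job.
int mapParallelism    = hadoopJobConf.getInt("mapred.tasktracker.map.tasks.maximum", 1);
int reduceParallelism = hadoopJobConf.getInt("mapred.tasktracker.reduce.tasks.maximum", 1);

// e.g. 3 parallel mapper instances ...
DataSet mapped = input
    .flatMap(new HadoopMapFunction(hadoopJobConf))
    .setParallelism(mapParallelism);

// ... and a single reducer instance, independently of the mappers.
DataSet reduced = mapped
    .groupBy(0)
    .reduceGroup(new HadoopReduceFunction(hadoopJobConf))
    .setParallelism(reduceParallelism);
```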

@atsikiridis
Author

Oh yes, @fhueske. I completely missed it for some reason. So then it becomes simpler. I will add correct parallelism to this PR and see if the tests pass.

@fhueske
Contributor

fhueske commented Jul 9, 2014

Well, it's an undocumented feature and easy to miss... ;-)
Will open a JIRA to fix that.

@rmetzger
Contributor

Let me know when I can have a look at the PR again.

@atsikiridis
Author

Hi @rmetzger, I believe I have addressed your observations along with some other minor stuff. Do you think I should rebase and change the name now, or in a separate PR? Moreover, the sorting and the distributed cache support are ready; do you think I should include them? Then we can be sure we have all the programming interfaces in this PR. I can do that tomorrow. What do you think?

@atsikiridis
Author

Moreover, if I rebase I can handle the InputFormats more elegantly for the client after #52 (comment)

Of course I can do separate smaller PRs in the coming week.

@atsikiridis
Author

You might also be interested in my weekly report:

https://issues.apache.org/jira/browse/FLINK-838

@rmetzger
Contributor

It would be much easier for us if you could
a) update the license headers to the new ASF headers
b) rename the code from stratosphere to flink (also the packages and files and directories)
c) rebase the code to the latest master

It would also be cool if you could improve the InputFormats.

@atsikiridis
Author

This PR is now ASF-compliant. The parallelism of the tasks is not fully mapped, though. Hopefully I can solve this properly soon... (@fhueske has given me some input in the tracker issue https://issues.apache.org/jira/browse/FLINK-838)

//setting up the inputFormat for the job
final DataSet input = environment.createInput(getFlinkInputFormat(hadoopJobConf));

final Mapper mapper = InstantiationUtil.instantiate(hadoopJobConf.getMapperClass());
Contributor

The code contains many rawtype warnings.
Please try to get rid of them (e.g., DataSet<?> instead of DataSet) and add a @SuppressWarnings("rawtypes") to the method if that's not possible.

input.setParallelism(mapParallelism);

final FlatMapOperator mapped = input.flatMap(new HadoopMapFunction(hadoopJobConf));
mapped.setParallelism(getMapParallelism(hadoopJobConf));
Contributor

Set it to mapParallelism to avoid the recomputation of the input splits.
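Taken together with the rawtypes comment above, the hunk might end up looking like this hedged sketch (variable names are from the surrounding diff; the wildcard generics are assumptions):

```java
// Reuse the already computed mapParallelism so the input splits are not recomputed,
// and use a wildcard instead of the raw DataSet type where the element type is unknown.
final DataSet<?> input = environment.createInput(getFlinkInputFormat(hadoopJobConf));
input.setParallelism(mapParallelism);

final FlatMapOperator<?, ?> mapped = input.flatMap(new HadoopMapFunction(hadoopJobConf));
mapped.setParallelism(mapParallelism);
```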

@fhueske
Contributor

fhueske commented Jul 29, 2014

Hi @atsikiridis,

please add new commits to this PR and do not squash and force-push the branch. Otherwise, all previous code comments are gone and no longer available.

The FlinkHadoopJobClient.submitJob() method looks much better and I checked that Configuration values are correctly passed.

There are a few issues that I commented inline.
Also please add JavaDocs to all public methods of user-facing classes and check if you can get rid of some of the rawtype warnings (use @SuppressWarnings("rawtypes") if not possible).

@atsikiridis
Author

Ahh @fhueske squashing was a complete brain stop sorry :(

*/
public final class FlinkHadoopJobClient extends JobClient {

private final static int TASK_SLOTS = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, -1);
Contributor

Yeah, let's go for now with your suggestion, i.e., use the max DOP as the number of slots if the parameter has not been initialized before. Please also add the warning.
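A hedged sketch of that fallback (the helper name, the LOG field, and the exact warning text are assumptions; only the TASK_SLOTS constant comes from the diff above):

```java
private static int getDefaultParallelism(final JobConf hadoopJobConf) {
    if (TASK_SLOTS > 0) {
        return TASK_SLOTS;   // task slots configured on the Flink/Stratosphere side
    }
    LOG.warn("Number of task slots is not configured; "
            + "falling back to the Hadoop task-slot settings of the job.");
    return Math.max(
        hadoopJobConf.getInt("mapred.tasktracker.map.tasks.maximum", 1),
        hadoopJobConf.getInt("mapred.tasktracker.reduce.tasks.maximum", 1));
}
```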

@atsikiridis
Author

@rmetzger @fhueske what do you think now, guys ?

* @see http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html
*/
private void writeObject(final ObjectOutputStream out) throws IOException {
HadoopConfiguration.writeHadoopJobConf(jobConf,out);
Author

This ensures that each task gets a separate JobConf object. This is related to https://mail-archives.apache.org/mod_mbox/incubator-flink-dev/201407.mbox/browser
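For completeness, a hedged sketch of the deserialization counterpart (the field name jobConf comes from the hunk above; any read helper mirroring HadoopConfiguration.writeHadoopJobConf is hypothetical, and the snippet assumes the write side ultimately uses JobConf's Writable format):

```java
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
    // Rebuild a fresh JobConf per task; ObjectInputStream implements DataInput,
    // so the Writable-style readFields(...) can consume what the write side produced.
    jobConf = new JobConf();
    jobConf.readFields(in);
}
```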

@fhueske
Contributor

fhueske commented Aug 7, 2014

Looks good to me.
Note, I only browsed the code and did not run it.

I'll add a comment to JIRA as well. I have a working prototype for full support of custom partitioner, sorter, and groupers (combine is not supported yet, though). However, the solution goes through the full program compilation stack and I'd like to get some feedback before I continue...

@nirvanesque

Hello Artem and mentors,

First of all nice greetings from INRIA, France.
Hope you had an enjoyable experience in GSOC!
Thanks to Robert (rmetzger) for forwarding me here ...

At INRIA, we are starting to adopt Stratosphere / Flink.
The top-level goal is to enhance the performance of User Defined Functions (UDFs) in long workflows that use multiple M-R stages, by using the larger set of Second Order Functions (SOFs) in Stratosphere / Flink.
We will demonstrate this improvement by implementing some Use Cases for business purposes.
For this purpose, we have chosen some customer analysis Use Cases using weblogs and related data, for 2 companies (who appeared interested in trying Stratosphere / Flink):

  • a mobile phone app developer: http://www.tribeflame.com
  • an anti-virus & Internet security software company: www.f-secure.com

I will be happy to share these Use Cases with you, if you are interested. Just ask me here.

At present, we are typically in the profiles of Alice-Bob-Sam, as described in Artem's GSoC proposal.
Hadoop seems to be the starting square for our Stratosphere / Flink journey.
Same is the situation with developers in the above 2 companies :-)

Briefly,
We have installed and run some example programmes from Flink / Stratosphere (versions 0.5.2 and 0.6). We use a cluster (Grid'5000) for our Hadoop & Stratosphere installations.
We have some good understanding of Hadoop and its use in Streaming and Pipes in conjunction with scripting languages (Python & R specifically)
In the first phase, we would like to run some "Hadoop-like" jobs (mainly multiple M-R workflows) on Stratosphere, preferably without extensive Java or Scala programming.
I refer to your GSoC project map which seems very interesting.
If we could have a Hadoop abstraction as you have mentioned, that would be ideal for our first phase.
In later phases, when we implement complex join and group operations, we would dive deeper into the Stratosphere / Flink Java or Scala APIs.

Hence, I would like to know, what is the current status in this direction?
What has been implemented already? In which version onwards? How to try them?
What is yet to be implemented? When - which versions?

You may also like to see my discussion with Robert on this page.
I am still mining into different discussions - here as well as on JIRA.
Please do refer me to the relevant links, JIRA tickets, etc if that saves your time in re-typing large replies.
It will help us to catch up fast with the train of collective thinking in the Stratosphere / Flink roadmap, and eventually contribute to the project.

Thanks in advance,
Anirvan
PS: Apologies for mixing old and new names (e.g. Flink / Stratosphere), as I am not sure which name to use currently.

@fhueske
Contributor

fhueske commented Aug 27, 2014

Hi Anirvan,
thanks a lot for getting in touch.
Robert forwarded your mail to the flink-dev mailing list, and Artem and I replied there (see the archive).

I would suggest continuing the discussion on the mailing list.
You can sign up here.

Best, Fabian

@fhueske
Contributor

fhueske commented Sep 6, 2014

@atsikiridis I think we can close this PR for now.

Support for running complete Hadoop jobs requires a bit more work; at least the combiner should work.
I am waiting for comments on PR #108 which is required to add custom combiners to the Hadoop Job operation.

Parts of this PR (wrappers for iterators and collectors, dummy reporters, etc.) can be added in a new PR which addresses FLINK-1076.

@atsikiridis
Author

@fhueske OK, I will try to have the PR for FLINK-1076 ready soon.

@uce
Contributor

uce commented Nov 19, 2014

@fhueske What's the state of this PR? Is it superseded by other changes that happened in the meantime?

@fhueske
Contributor

fhueske commented Nov 19, 2014

I backed up the code.
Most of it already went in as part of the Hadoop function compatibility. I will continue with the full Hadoop job integration sometime later if nobody else picks it up.
I'm OK with closing this PR.

@uce
Contributor

uce commented Nov 20, 2014

OK. +1 to close this. I've pinged @atsikiridis.

@atsikiridis
Author

@fhueske @uce OK guys. I'd love to continue working on it, hopefully very soon. So I'll open a new issue on JIRA or discuss it with @fhueske. :)

@fhueske
Contributor

fhueske commented Nov 20, 2014

That would be really cool! Let me know if you need any help! :-)

uce pushed a commit to uce/flink that referenced this pull request Aug 26, 2020
tzulitai added a commit to tzulitai/flink that referenced this pull request Jan 15, 2021
tzulitai added a commit to tzulitai/flink that referenced this pull request Jan 15, 2021
RyanSkraba pushed a commit to RyanSkraba/flink that referenced this pull request Sep 25, 2023