Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZEPPELIN-44 Interpreter for Apache Flink #75

Closed
wants to merge 8 commits into from

Conversation

Leemoonsoo
Copy link
Member

Interpreter for Apache Flink.

Flink people helped a lot to write the interpreter. Thanks so much! Some codes are copied from Flink's development branch. Once Flink releases 0.9, copied code and snapshot repository configuration will be removed.

Build

if there're no options, by default it is building against flink 0.9.0-milestone-1.
With combination of Zeppelin, it is good idea to use 0.9-SNAPSHOT, because of it support .collect() that helps really a lot to get results data and display it on Zeppelin.

So, you might want to build in this way,

mvn package -Dflink.version=0.9-SNAPSHOT -DskipTests

Screenshot

image

@jongyoul
Copy link
Member

@Leemoonsoo Wow, Does it work?

}
}

// jarr up
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'jar'?

@Leemoonsoo
Copy link
Member Author

@jongyoul Yes it works!

@rmetzger
Copy link

I'm very excited to see Flink support in Zeppelin 👍

@jongyoul
Copy link
Member

@Leemoonsoo Great!!

@Leemoonsoo
Copy link
Member Author

This PR has some test using FlinkMiniCluster.
Tests are fine when i run them on my IDE.
But if i run them using maven, it fails. (CI is failing, too)

I think everything is ready, except for this test failure. Any advice is very appreciated.
here's failure message

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running org.apache.zeppelin.flink.FlinkInterpreterTest
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/travis/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/travis/build/apache/incubator-zeppelin/zeppelin-interpreter/target/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 106.131 sec <<< FAILURE! - in org.apache.zeppelin.flink.FlinkInterpreterTest
org.apache.zeppelin.flink.FlinkInterpreterTest  Time elapsed: 106.131 sec  <<< ERROR!
java.util.concurrent.TimeoutException: Futures timed out after [100000 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at akka.remote.Remoting.start(Remoting.scala:180)
    at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
    at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:619)
    at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:616)
    at akka.actor.ActorSystemImpl._start(ActorSystem.scala:616)
    at akka.actor.ActorSystemImpl.start(ActorSystem.scala:633)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
    at akka.actor.ActorSystem$.create(ActorSystem.scala:67)
    at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:82)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster.startJobManagerActorSystem(FlinkMiniCluster.scala:105)
    at org.apache.flink.runtime.minicluster.FlinkMiniCluster.<init>(FlinkMiniCluster.scala:61)
    at org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster.<init>(LocalFlinkMiniCluster.scala:44)
    at org.apache.zeppelin.flink.FlinkInterpreter.startFlinkMiniCluster(FlinkInterpreter.java:297)
    at org.apache.zeppelin.flink.FlinkInterpreter.initializeFlinkEnv(FlinkInterpreter.java:152)
    at org.apache.zeppelin.flink.FlinkInterpreter.open(FlinkInterpreter.java:115)
    at org.apache.zeppelin.flink.FlinkInterpreterTest.setUp(FlinkInterpreterTest.java:40)


Results :

Tests in error: 
  FlinkInterpreterTest.setUp:40 » Timeout Futures timed out after [100000 millis...

@fhueske
Copy link

fhueske commented May 21, 2015

I forwarded the error to the Flink dev mailing list. Hopefully somebody from the Flink community can help with this.

@asfbot
Copy link

asfbot commented May 21, 2015

Fabian Hueske on dev@flink.apache.org replies:
Hi Flink folks,

the Flink interpreter PR for Apache Zeppelin is blocked by a failing test
case (see below).
Does anybody have an idea what is going on and can maybe help to resolve
the problem?

Thanks, Fabian

@asfbot
Copy link

asfbot commented May 21, 2015

Till Rohrmann on dev@flink.apache.org replies:
I'll try to reproduce this problem locally on my machine.
r
84
j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
er/target/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.c=
lass]
31
0
cala:53)
)
a:82)
rSystem(FlinkMiniCluster.scala:105)
ster.scala:61)
inkMiniCluster.scala:44)
erpreter.java:297)
reter.java:152)
)
.java:40)
t after
ur
e
se

@asfbot
Copy link

asfbot commented May 21, 2015

Stephan Ewen on dev@flink.apache.org replies:
The actor system fails to start. Here are the suspicious log lines:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/travis/.m2/

repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-
1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/travis/build/
apache/incubator-zeppelin/zeppelin-interpreter/target/
lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

Seems that SLF4J has more than one binding. I remember that Akka crashed
because of that before.

As a simple fix, can you try and exclude the SLF4J jar from your build
somehow? Or set it to "provided" in the Flink POM?
:
st
e
84
ce
j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
er/target/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.c=
lass]
]
31
cala:53)
4)
a:82)
rSystem(FlinkMiniCluster.scala:105)
ster.scala:61)
inkMiniCluster.scala:44)
erpreter.java:297)
reter.java:152)
)
.java:40)
out after

@asfbot
Copy link

asfbot commented May 21, 2015

Till Rohrmann on dev@flink.apache.org replies:
Hmm that is interesting. I always thought that in the case of multiple
SLF4J bindings, slf4j will simply pick one of them (first, last, random).
84
j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
er/target/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.c=
lass]
)
cala:53)
a:82)
rSystem(FlinkMiniCluster.scala:105)
ster.scala:61)
inkMiniCluster.scala:44)
erpreter.java:297)
reter.java:152)
)
.java:40)
d out after
e

@asfbot
Copy link

asfbot commented May 21, 2015

Stephan Ewen on dev@flink.apache.org replies:
True, may still have another issue.
]
d
g
84
j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
er/target/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.c=
lass]
r
t
9)
cala:53)
4)
a:82)
rSystem(FlinkMiniCluster.scala:105)
ster.scala:61)
inkMiniCluster.scala:44)
erpreter.java:297)
reter.java:152)
)
.java:40)
med out
,

@tillrohrmann
Copy link
Contributor

I figured out the problem: It is a wrong protobuf-java version. flink-runtime depends on akka-remote_2.10:2.3.7 which depends on protobuf-java:2.5.0. However, in the Zeppelin parent pom the protobuf-java version is manually set to 2.4.1. There is a comment which declares this as a work-around to make Zeppelin build with Hadoop/YARN 2.2.

As a quick work-around you can set the protobuf-java version to 2.5.0 in the zeppelin-flink pom.xml. I'll also open a JIRA to shade the protobuf and akka dependency away in the flink project. This should solve this issue permanently.

@Leemoonsoo
Copy link
Member Author

Really appreciate for investigating the problem.
I think this is wrong in Zeppelin side. That, Zeppelin used to have dependency configuration for spark in pom.xml not in spark/pom.xml. I think it should be moved into spark/pom.xml to not interfere the other modules.
I'll create issue for it in Zeppelin side.

@rmetzger
Copy link

Okay, so this pull request is not blocked by anything from the Flink side right now?

@Leemoonsoo
Copy link
Member Author

@rmetzger Not blocked by anything from Flink side.
I'm testing the branch with 9 node Flink cluster environment. Works well but looks like need special build command. I'll update build configuration.

@Leemoonsoo Leemoonsoo force-pushed the flink branch 2 times, most recently from 1b8af5d to 460cf46 Compare June 8, 2015 07:12
@Leemoonsoo
Copy link
Member Author

After resolving Zeppelin side issue by #88, Tests are passing.
Ready to merge.

@Leemoonsoo
Copy link
Member Author

Merging it if there is no more discussions :-)

@asfgit asfgit closed this in f0301cd Jun 9, 2015
@hsaputra
Copy link

W000t!

asfgit pushed a commit that referenced this pull request Dec 20, 2017
…ron jobs takes long time or gets stuck

### What is this PR for?
The cron scheduler is easy to get stuck when one of the cron jobs takes long time or gets stuck.

I sometimes come across the issue that the cron scheduler stops working suddenly. According to the thread dump of ZeppelinServer, all of the DefaultQuartzScheduler_Worker threads were waiting for the job's completion and there was no thread to launch a new job.

Here is the contents of the thread dump:

```
"DefaultQuartzScheduler_Worker-10" #76 prio=5 os_prio=0 tid=0x00007fb41d3b4000 nid=0x1b521 sleeping[0x00007fb3daef1000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at org.apache.zeppelin.notebook.Notebook$CronJob.execute(Notebook.java:889)
        at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
        at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
        - locked <0x00000000c0a7dbf0> (a java.lang.Object)

   Locked ownable synchronizers:
        - None

"DefaultQuartzScheduler_Worker-9" #75 prio=5 os_prio=0 tid=0x00007fb41d3b2000 nid=0x1b520 waiting on condition [0x00007fb3daff2000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at org.apache.zeppelin.notebook.Notebook$CronJob.execute(Notebook.java:889)
        at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
        at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
        - locked <0x00000000c0a7a470> (a java.lang.Object)

   Locked ownable synchronizers:
        - None

...

"DefaultQuartzScheduler_Worker-2" #68 prio=5 os_prio=0 tid=0x00007fb41d3c8800 nid=0x1b519 waiting on condition [0x00007fb3da473000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at org.apache.zeppelin.notebook.Notebook$CronJob.execute(Notebook.java:889)
        at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
        at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
        - locked <0x00000000c0a7a7b0> (a java.lang.Object)

   Locked ownable synchronizers:
        - None

"DefaultQuartzScheduler_Worker-1" #67 prio=5 os_prio=0 tid=0x00007fb41d3cc800 nid=0x1b518 waiting on condition [0x00007fb3da372000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at org.apache.zeppelin.notebook.Notebook$CronJob.execute(Notebook.java:889)
        at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
        at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
        - locked <0x00000000c0a7dd90> (a java.lang.Object)

   Locked ownable synchronizers:
        - None
```

The above thread dump says that all of the worker threads get stuck at https://github.com/apache/zeppelin/blob/v0.7.3/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java#L889.

One way to reproduce this kind of issue is creating a paragraph whose status is "READY" and "disable run". That makes the paragraph status "READY" permanently and `note.isTerminated()` never turns to `true`.

To fix this issue, the following two improvements has been made at this PR:

1. Remove the unnecessary `while (!note.isTerminated()) { ... }` block because the execution of all of the paragraphs is finished after `note.runAll()`.
2. Skip the cron execution if there is a running or pending paragraph. That prevents the Zeppelin cron scheduler from getting stuck by the long running paragraph whose execution duration is greater than the cron execution cycle.

### What type of PR is it?
[Bug]

### Todos

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-3077

### How should this be tested?
* Tested manually.
    1. The cron scheduler does not get stuck if there is a paragraph whose status is "READY" and "disable run".
    2. The following message is printed on the log file when the cron job is launched while the previous cron job still has been running.
        * `execution of the cron job is skipped because there is a running or pending paragraph (note id: XXXXXXXXX)`

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No.
* Is there breaking changes for older versions? No.
* Does this needs documentation? Yes. The behavior of the cron job was changed not to run if there is a running or pending paragraph by this PR. Thus, the documentation `docs/usage/other_features/cron_scheduler.md` was also added by this PR. Its layout is as follow:

<img width="711" alt="screen shot 2017-11-28 at 18 30 54" src="https://user-images.githubusercontent.com/31149688/33312407-20664e02-d46b-11e7-9715-9e2562d5e064.png">

Author: Keiji Yoshida <kjmrknsn@gmail.com>

Closes #2687 from kjmrknsn/ZEPPELIN-3077 and squashes the following commits:

81e7218 [Keiji Yoshida] [ZEPPELIN-3077] Cron scheduler is easy to get stuck when one of the cron jobs takes long time or gets stuck
jithinchandranj pushed a commit to jithinchandranj/zeppelin that referenced this pull request Dec 20, 2017
…ron jobs takes long time or gets stuck

### What is this PR for?
The cron scheduler is easy to get stuck when one of the cron jobs takes long time or gets stuck.

I sometimes come across the issue that the cron scheduler stops working suddenly. According to the thread dump of ZeppelinServer, all of the DefaultQuartzScheduler_Worker threads were waiting for the job's completion and there was no thread to launch a new job.

Here is the contents of the thread dump:

```
"DefaultQuartzScheduler_Worker-10" apache#76 prio=5 os_prio=0 tid=0x00007fb41d3b4000 nid=0x1b521 sleeping[0x00007fb3daef1000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at org.apache.zeppelin.notebook.Notebook$CronJob.execute(Notebook.java:889)
        at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
        at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
        - locked <0x00000000c0a7dbf0> (a java.lang.Object)

   Locked ownable synchronizers:
        - None

"DefaultQuartzScheduler_Worker-9" apache#75 prio=5 os_prio=0 tid=0x00007fb41d3b2000 nid=0x1b520 waiting on condition [0x00007fb3daff2000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at org.apache.zeppelin.notebook.Notebook$CronJob.execute(Notebook.java:889)
        at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
        at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
        - locked <0x00000000c0a7a470> (a java.lang.Object)

   Locked ownable synchronizers:
        - None

...

"DefaultQuartzScheduler_Worker-2" apache#68 prio=5 os_prio=0 tid=0x00007fb41d3c8800 nid=0x1b519 waiting on condition [0x00007fb3da473000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at org.apache.zeppelin.notebook.Notebook$CronJob.execute(Notebook.java:889)
        at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
        at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
        - locked <0x00000000c0a7a7b0> (a java.lang.Object)

   Locked ownable synchronizers:
        - None

"DefaultQuartzScheduler_Worker-1" apache#67 prio=5 os_prio=0 tid=0x00007fb41d3cc800 nid=0x1b518 waiting on condition [0x00007fb3da372000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at org.apache.zeppelin.notebook.Notebook$CronJob.execute(Notebook.java:889)
        at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
        at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
        - locked <0x00000000c0a7dd90> (a java.lang.Object)

   Locked ownable synchronizers:
        - None
```

The above thread dump says that all of the worker threads get stuck at https://github.com/apache/zeppelin/blob/v0.7.3/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java#L889.

One way to reproduce this kind of issue is creating a paragraph whose status is "READY" and "disable run". That makes the paragraph status "READY" permanently and `note.isTerminated()` never turns to `true`.

To fix this issue, the following two improvements has been made at this PR:

1. Remove the unnecessary `while (!note.isTerminated()) { ... }` block because the execution of all of the paragraphs is finished after `note.runAll()`.
2. Skip the cron execution if there is a running or pending paragraph. That prevents the Zeppelin cron scheduler from getting stuck by the long running paragraph whose execution duration is greater than the cron execution cycle.

### What type of PR is it?
[Bug]

### Todos

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-3077

### How should this be tested?
* Tested manually.
    1. The cron scheduler does not get stuck if there is a paragraph whose status is "READY" and "disable run".
    2. The following message is printed on the log file when the cron job is launched while the previous cron job still has been running.
        * `execution of the cron job is skipped because there is a running or pending paragraph (note id: XXXXXXXXX)`

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No.
* Is there breaking changes for older versions? No.
* Does this needs documentation? Yes. The behavior of the cron job was changed not to run if there is a running or pending paragraph by this PR. Thus, the documentation `docs/usage/other_features/cron_scheduler.md` was also added by this PR. Its layout is as follow:

<img width="711" alt="screen shot 2017-11-28 at 18 30 54" src="https://user-images.githubusercontent.com/31149688/33312407-20664e02-d46b-11e7-9715-9e2562d5e064.png">

Author: Keiji Yoshida <kjmrknsn@gmail.com>

Closes apache#2687 from kjmrknsn/ZEPPELIN-3077 and squashes the following commits:

81e7218 [Keiji Yoshida] [ZEPPELIN-3077] Cron scheduler is easy to get stuck when one of the cron jobs takes long time or gets stuck
egorklimov pushed a commit to Tinkoff/zeppelin that referenced this pull request Sep 18, 2019
…ack_notifications_fixed_branch to V_1.0.0

* commit '199cb1790f5ff03eee3be91fdeedd5ff08f7d4a5':
  [ZP-252] add slack integration
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
8 participants