
[FLINK-6489] [FLINK-8696] [shell scripts] Remove JobManager local mode from the Shell Scripts #5528

Closed
wants to merge 6 commits

Conversation

StephanEwen
Contributor

@StephanEwen StephanEwen commented Feb 19, 2018

What is the purpose of the change

The JobManager local mode is problematic:

  • The JobManager does not start all of its components properly, causing some operations to fail or time out unexpectedly
  • It adds code complexity, because the JobManager entry point must also spawn an embedded TaskManager

The JobManager local mode is also unnecessary. The start-cluster.sh script sets up a proper local cluster without additional configuration.

This pull request removes the local mode from all UNIX scripts and changes the Windows start-local.bat to a start-cluster.bat that also starts two separate processes for JobManager and TaskManager.

Brief change log

  • Changes the start-local.bat to a start-cluster.bat which spawns two processes (using start command). The two processes show up as separate windows and can be stopped separately.
  • Removes the start-local.sh and stop-local.sh scripts.
  • Removes the passing of the local parameter between the shell scripts and always passes cluster as the execution mode parameter to the JobManager.

The next step would be to remove the execution mode from the JobManager's command-line argument parsing and setup logic.
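The simplified startup flow can be sketched as follows. This is a minimal illustration under assumed paths and flags (jobmanager.sh, taskmanager.sh, and the launch helper are simplified stand-ins, not the actual Flink scripts):

```shell
#!/usr/bin/env bash
# Sketch of the post-change startup flow: with local mode removed,
# there is only one branch, and the execution mode is always "cluster".
# Paths and flags are illustrative, not the real Flink scripts.

FLINK_BIN="${FLINK_BIN:-./bin}"
DRY_RUN="${DRY_RUN:-true}"   # default to dry-run so the sketch is side-effect free

launch() {
  if [ "$DRY_RUN" = "true" ]; then
    echo "would run: $*"
  else
    "$@" &
  fi
}

start_cluster() {
  # The JobManager no longer receives a "local" mode and no longer
  # spawns an embedded TaskManager.
  launch "$FLINK_BIN/jobmanager.sh" start cluster
  # TaskManagers are always separate processes, even for a local setup.
  launch "$FLINK_BIN/taskmanager.sh" start
}

start_cluster
```

With DRY_RUN=false and a real Flink distribution on the path, the same flow would actually spawn the two processes; the dry-run default just prints the commands.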

Verifying this change

  • Building Flink, starting a cluster via the shell scripts (Linux, macOS, Windows), and checking in the WebUI that the cluster is properly up. No actual job execution is needed.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

Docs are updated to not refer to start-local.xy any more.

@zentol
Contributor

zentol commented Feb 21, 2018

There's still a reference to start-local.sh in common.sh:

function start_cluster {
  if [[ "$CLUSTER_MODE" == "local" ]]; then
    $FLINK_DIR/bin/start-local.sh
  elif [[ "$CLUSTER_MODE" == "cluster" ]]; then
    $FLINK_DIR/bin/start-cluster.sh
  else
    echo "Unrecognized cluster mode: $CLUSTER_MODE"
    exit 1
  fi
}

@zentol
Contributor

zentol commented Feb 21, 2018

I've tried using the start-local.bat scripts and found 3 issues:
1)
The following warning is printed in the JobManager and TaskManager windows: (Note that this is the only thing ever written into them)

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.hadoop.security.authentication.util.KerberosUtil (file:/C:/Users/Zento/Documents/GitHub/flink/flink-dist/target/flink-1.5-SNAPSHOT-bin/flink-1.5-SNAPSHOT/lib/flink-shaded-hadoop2-uber-1.5-SNAPSHOT.jar) to method sun.security.krb5.Config.getInstance()
WARNING: Please consider reporting this to the maintainers of org.apache.hadoop.security.authentication.util.KerberosUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release

2)
You cannot use them out of the box, as they start the old JobManager/TaskManager components while the client works against FLIP-6 by default.
3)
After explicitly configuring mode: "old" in flink-conf.yaml, the job submission times out. In the JM/TM logs I found this funky exception:

2018-02-21 10:44:09,363 ERROR akka.remote.Remoting                                          - [B cannot be cast to [C
java.lang.ClassCastException: [B cannot be cast to [C
        at akka.remote.artery.FastHash$.ofString(LruBoundedCache.scala:18)
        at akka.remote.serialization.ActorRefResolveCache.hash(ActorRefResolveCache.scala:61)
        at akka.remote.serialization.ActorRefResolveCache.hash(ActorRefResolveCache.scala:55)
        at akka.remote.artery.LruBoundedCache.getOrCompute(LruBoundedCache.scala:110)
        at akka.remote.RemoteActorRefProvider.resolveActorRef(RemoteActorRefProvider.scala:403)
        at akka.actor.SerializedActorRef.readResolve(ActorRef.scala:433)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at java.base/java.io.ObjectStreamClass.invokeReadResolve(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
        at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
        at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
        at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:328)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:328)
        at akka.serialization.Serialization.akka$serialization$Serialization$$deserializeByteArray(Serialization.scala:156)
        at akka.serialization.Serialization$$anonfun$deserialize$2.apply(Serialization.scala:142)
        at scala.util.Try$.apply(Try.scala:192)
        at akka.serialization.Serialization.deserialize(Serialization.scala:136)
        at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:30)
        at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:64)
        at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:64)
        at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:82)
        at akka.remote.EndpointReader$$anonfun$akka$remote$EndpointReader$$deliverAndAck$1.apply(Endpoint.scala:1047)
        at akka.remote.EndpointReader$$anonfun$akka$remote$EndpointReader$$deliverAndAck$1.apply(Endpoint.scala:1046)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at akka.remote.EndpointReader.akka$remote$EndpointReader$$deliverAndAck(Endpoint.scala:1046)
        at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:980)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:446)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

@zentol
Contributor

zentol commented Feb 21, 2018

The JM/TM can connect to each other, though, and job submission through the UI also works. The metrics in the UI appear to be broken, however.

@StephanEwen
Contributor Author

Hmm, weird, I tried it out under Windows (7) two days ago and it worked quite well.

Let me check into what you found there...

@StephanEwen
Contributor Author

Okay, after a quick check with @zentol:

  • Some issues were caused by Java 9. Apparently Flink on Windows is incompatible with Java 9. (The "[B cannot be cast to [C" error above fits that: Java 9's compact strings back a String with a byte[] rather than a char[], which breaks code that reflectively assumes the old layout.)
  • We need to change the JobManager and TaskManager to use the FLIP-6 entry points.
  • The common.sh in flink-end-to-end-tests needs to be updated.

@zentol
Copy link
Contributor

zentol commented Feb 21, 2018

I tried it again with Java 8. The job submission through the client now also works for me, and the warning is no longer printed.

…al.bat

Instead, this uses 'start' to start JobManager and TaskManager background processes.
…' instead of 'start-local.sh'

(and likewise for 'start-cluster.bat' vs. 'start-local.bat')
@StephanEwen
Contributor Author

Tested it locally again, works (with current flip-6 Web UI at 9065)

Adjusted the test-infra scripts to remove local mode. Those are only executed on Travis, so waiting until the CI build passes...


echo You can terminate the processes via CTRL-C in the spawned shell windows.

echo Web interface by default on http://localhost:8081/.
Contributor:
This should be updated to 9065.

Contributor Author:

I would like to keep it at 8081, because the flip-6 endpoint will switch to that in the next few days anyway, and they will probably not be aware that this would need to be updated again ;-)

Contributor:

ok 👍

The host names are added in the UNIX scripts to support Flink setups where all nodes put
their log files in a shared NFS folder; hostnames are needed to avoid name collisions.

The Windows setup scripts do not need that kind of functionality.
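The hostname-qualified log naming on the UNIX side looks roughly like this. A sketch only, with assumed variable and helper names (FLINK_LOG_DIR, FLINK_IDENT_STRING, log_name are illustrative, not the exact Flink code):

```shell
#!/usr/bin/env bash
# Sketch of hostname-qualified log file naming. On a shared NFS log
# directory, including the hostname keeps each node's log files distinct.
# Variable and helper names here are illustrative.

FLINK_LOG_DIR="${FLINK_LOG_DIR:-/tmp/flink-logs}"
FLINK_IDENT_STRING="${FLINK_IDENT_STRING:-$(id -un)}"
HOSTNAME_PART="$(hostname)"

log_name() {  # usage: log_name <daemon>, e.g. log_name jobmanager
  echo "$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-$1-$HOSTNAME_PART.log"
}

log_name jobmanager
```

Two JobManagers on different hosts writing into the same NFS directory thus produce distinct file names, which is exactly the collision the scheme avoids.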
@StephanEwen
Contributor Author

Merged in 74c5570
