
[FLINK-7973] fix shading and relocating Hadoop for the S3 filesystems #4961

Closed
wants to merge 14 commits into from

Conversation

NicoK
Contributor

@NicoK NicoK commented Nov 6, 2017

What is the purpose of the change

The current shading of the flink-s3-fs-hadoop and flink-s3-fs-presto projects also relocates Flink core classes and even some classes from the JDK itself. Additionally, relocating Hadoop does not work as expected: Hadoop loads classes by name from its core-default.xml, and since those names are plain strings they are not rewritten by shading and still point to the original namespace.
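As a stand-alone illustration of the reflection problem (plain JDK, not Flink/Hadoop code): shading tools rewrite compile-time references, but a class name kept as a string (as in Hadoop's core-default.xml) is data and stays in the original namespace. A minimal sketch:

```java
// Why relocation misses reflectively loaded classes: a relocation tool rewrites
// compile-time symbols, but not names stored as strings.
public class RelocationDemo {
    public static void main(String[] args) throws Exception {
        // Direct reference: a relocation tool would rewrite this symbol.
        java.util.ArrayList<String> direct = new java.util.ArrayList<>();

        // Reflective reference: "java.util.ArrayList" stands in for a class
        // name read from core-default.xml; relocation leaves it untouched.
        String nameFromConfig = "java.util.ArrayList";
        Object reflective = Class.forName(nameFromConfig)
                .getDeclaredConstructor().newInstance();

        // Prints "true" here; after relocating java.util, the reflective
        // lookup would instead throw ClassNotFoundException.
        System.out.println(direct.getClass() == reflective.getClass());
    }
}
```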

Brief change log

  • adapt the pom.xml of both flink-s3-fs-hadoop and flink-s3-fs-presto:
    • do not shade everything and instead define include patterns explicitly
    • only shade and relocate Flink classes imported from flink-hadoop-fs
  • hack around Hadoop loading (unshaded/non-relocated) classes based on the names in core-default.xml by overriding the Configuration class (we may need to extend this to mapred-default.xml and hdfs-default.xml and their respective configuration classes in the future):
    • provide a core-default-shaded.xml file with shaded class names and
    • copy and adapt the Configuration class of the respective Hadoop version to load this file instead of core-default.xml

(Thanks @zentol and @StephanEwen for helping to find and fix this.)
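For illustration, such a shaded defaults file would contain the same keys as core-default.xml but with values rewritten into the relocated namespace. A hypothetical excerpt (the property key is a real Hadoop key; the shaded prefix follows this PR's naming):

```xml
<configuration>
  <property>
    <name>fs.s3a.impl</name>
    <!-- value rewritten into the relocated namespace so that Hadoop's
         reflective lookup resolves to the shaded class -->
    <value>org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
</configuration>
```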

Verifying this change

This change can be (and was) manually tested as follows:

  • verify the shaded jar file does not contain non-relocated classes
  • verify the changed Configuration classes reside in the shaded namespace that the original Hadoop Configuration classes are relocated into, e.g. org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf (look for the core-default-shaded.xml string in the Configuration.class file)
  • verify the META-INF/services files are still correct (name + content)

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (yes)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

- do not shade everything, especially not JDK classes!
-> instead define include patterns explicitly
- do not shade core Flink classes (only those imported from flink-hadoop-fs)
- hack around Hadoop loading (unshaded/non-relocated) classes based on names in
  the core-default.xml by overwriting the Configuration class (we may need to
  extend this for the mapred-default.xml and hdfs-default.xml):
-> provide a core-default-shaded.xml file with shaded class names and copy and
  adapt the Configuration class of the respective Hadoop version to load this
  file instead of core-default.xml.
@NicoK NicoK changed the title [FLINK-7973] fix shading and relocating Hhadoop for the S3 filesystems [FLINK-7973] fix shading and relocating Hadoop for the S3 filesystems Nov 6, 2017
Contributor

@zentol zentol left a comment

I suggest adding a README.md to both modules explaining the reasoning behind the Configuration class and how to properly update the Hadoop dependency (including the Configuration shading), along with a checklist of what to verify when modifying anything.

@@ -33,6 +33,7 @@ under the License.
<packaging>jar</packaging>

<properties>
<!-- Do not change this without updating the copied Configuration class! -->
Contributor

should we add a similar comment to the presto hadoop dependency?

Contributor Author

done (also the other suggestions)

@@ -277,6 +336,7 @@ under the License.
<exclude>META-INF/maven/org.apache.commons/**</exclude>
<exclude>META-INF/maven/org.apache.flink/flink-hadoop-fs/**</exclude>
<exclude>META-INF/maven/org.apache.flink/force-shading/**</exclude>
<exclude>core-default.xml</exclude>
Contributor

add comment that we're using our own shaded core-default.xml
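A sketch of what that comment could look like in the pom (the exclude line is from the diff above; the comment wording is only a suggestion):

```xml
<!-- exclude Hadoop's original defaults; this module ships its own
     core-default-shaded.xml with relocated class names instead -->
<exclude>core-default.xml</exclude>
```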

@@ -284,6 +322,7 @@ under the License.
<exclude>META-INF/maven/org.apache.h*/**</exclude>
<exclude>META-INF/maven/org.apache.flink/flink-hadoop-fs/**</exclude>
<exclude>META-INF/maven/org.apache.flink/force-shading/**</exclude>
<exclude>core-default.xml</exclude>
Contributor

same as in the other pom

- also fix the (integration) tests not working because they tried to load the
relocated classes which are apparently not available there
Contributor

@zentol zentol left a comment

+1

- `src/test/resources/core-site.xml` (as is)
3. verify the shaded jar:
- does not contain any unshaded classes except for `org.apache.flink.fs.s3presto.S3FileSystemFactory`
- every other classes should be under `org.apache.flink.fs.s3presto.shaded`
Contributor

"every other class" or "all other classes"

- `src/test/resources/core-site.xml` (as is)
3. verify the shaded jar:
- does not contain any unshaded classes except for `org.apache.flink.fs.s3hadoop.S3FileSystemFactory`
- every other classes should be under `org.apache.flink.fs.s3hadoop.shaded`
Contributor

"every other class" or "all other classes"

setup. For this to work, however, we needed to adapt Hadoop's `Configuration`
class to load a (shaded) `core-default-shaded.xml` configuration with the
relocated class names of classes loaded via reflection
(in the fute, we may need to extend this to `mapred-default.xml` and `hdfs-defaults.xml` and their respective configuration classes).
Contributor

nit: "fute"

- `src/test/resources/core-site.xml` (as is)
3. verify the shaded jar:
- does not contain any unshaded classes except for `org.apache.flink.fs.s3hadoop.S3FileSystemFactory`
- every other classes should be under `org.apache.flink.fs.s3hadoop.shaded`
Contributor

nit: "classes"

setup. For this to work, however, we needed to adapt Hadoop's `Configuration`
class to load a (shaded) `core-default-shaded.xml` configuration with the
relocated class names of classes loaded via reflection
(in the fute, we may need to extend this to `mapred-default.xml` and `hdfs-defaults.xml` and their respective configuration classes).
Contributor

nit: "fute"

- `src/test/resources/core-site.xml` (as is)
3. verify the shaded jar:
- does not contain any unshaded classes except for `org.apache.flink.fs.s3presto.S3FileSystemFactory`
- every other classes should be under `org.apache.flink.fs.s3presto.shaded`
Contributor

nit: "classes"

@aljoscha
Contributor

aljoscha commented Nov 8, 2017

I think this looks good now that the comments are addressed. @zentol what do you think?

@NicoK
Contributor Author

NicoK commented Nov 8, 2017

looks like I created a new word: fute ;) - fixed the typos both of you found

@aljoscha
Contributor

aljoscha commented Nov 8, 2017

I added two end-to-end tests and it seems for presto this currently fails with:

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job ae34d825b54b7afe2e973627d396e722 (WordCount Example)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
	at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:444)
	at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
	at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:86)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
	at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
	at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
	at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job ae34d825b54b7afe2e973627d396e722 (WordCount Example)
	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1325)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:447)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: null
	at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:262)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:801)
	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:180)
	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1277)
	... 19 more
Caused by: java.lang.ExceptionInInitializerError
	at org.apache.flink.fs.s3presto.shaded.com.amazonaws.ClientConfiguration.<clinit>(ClientConfiguration.java:65)
	at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.initialize(PrestoS3FileSystem.java:203)
	at org.apache.flink.fs.s3presto.S3FileSystemFactory.create(S3FileSystemFactory.java:133)
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:377)
	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:307)
	at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
	at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:472)
	at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:62)
	at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:248)
	... 22 more
Caused by: org.apache.flink.fs.s3presto.shaded.org.apache.commons.logging.LogConfigurationException: java.lang.ClassNotFoundException: org.apache.flink.fs.s3presto.shaded.org.apache.commons.logging.impl.LogFactoryImpl (Caused by java.lang.ClassNotFoundException: org.apache.flink.fs.s3presto.shaded.org.apache.commons.logging.impl.LogFactoryImpl)
	at org.apache.flink.fs.s3presto.shaded.org.apache.commons.logging.LogFactory.createFactory(LogFactory.java:1201)
	at org.apache.flink.fs.s3presto.shaded.org.apache.commons.logging.LogFactory$2.run(LogFactory.java:1003)
	at java.security.AccessController.doPrivileged(Native Method)
	at org.apache.flink.fs.s3presto.shaded.org.apache.commons.logging.LogFactory.newFactory(LogFactory.java:1000)
	at org.apache.flink.fs.s3presto.shaded.org.apache.commons.logging.LogFactory.getFactory(LogFactory.java:626)
	at org.apache.flink.fs.s3presto.shaded.org.apache.commons.logging.LogFactory.getLog(LogFactory.java:657)
	at org.apache.flink.fs.s3presto.shaded.com.amazonaws.util.VersionInfoUtils.<clinit>(VersionInfoUtils.java:47)
	... 31 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.fs.s3presto.shaded.org.apache.commons.logging.impl.LogFactoryImpl
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at org.apache.flink.fs.s3presto.shaded.org.apache.commons.logging.LogFactory.createFactory(LogFactory.java:1063)
	... 37 more

This is my branch with the end-to-end tests: https://github.com/aljoscha/flink/tree/finish-pr-4961-s3-shading-fixing

This was causing
"java.lang.ClassNotFoundException: org.apache.flink.fs.s3presto.shaded.org.apache.commons.logging.impl.LogFactoryImpl"
since these classes are not statically imported and thus removed when minimizing.
@NicoK
Contributor Author

NicoK commented Nov 8, 2017

unfortunately, it seems we cannot minimize the shaded jar, or it will not contain dynamically loaded (i.e. not statically referenced) classes like this one
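In Maven shade plugin terms, this amounts to disabling jar minimization; `minimizeJar` is a real shade-plugin option, though the surrounding plugin configuration is elided here. A sketch:

```xml
<configuration>
  <!-- minimizeJar would strip classes that are only loaded reflectively,
       e.g. commons-logging's LogFactoryImpl, so it must stay disabled -->
  <minimizeJar>false</minimizeJar>
</configuration>
```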

@NicoK
Contributor Author

NicoK commented Nov 10, 2017

I included your end-to-end tests (with some fixes), plus fixes for the errors they found. Should be fine now; let's see what Travis says...

Also improve some search patterns in general.
@aljoscha
Contributor

This looks even better now! Waiting for Travis and then I'll merge.

Thanks! 😃

@aljoscha
Contributor

Could you please close the PR if it doesn't auto-close?

@NicoK
Contributor Author

NicoK commented Nov 13, 2017

Fixed on master in
0e5fb0b
e9e7c33

Fixed on release-1.4 in
25a28ab
9f68212

@NicoK NicoK closed this Nov 13, 2017