[FLINK-2268] Allow Flink binary release without Hadoop #4636
Conversation
"org.apache.hadoop.util.VersionInfo", | ||
false, | ||
EnvironmentInformation.class.getClassLoader()); | ||
log.info(" Hadoop version: " + VersionInfo.getVersion()); |
Did you intend to directly call VersionInfo, or should we maybe do this with reflection instead?
Yes, that is intended because I didn't want to fiddle with the reflection API. Ideally, I would like to do this:
try {
log.info(" Hadoop version: " + VersionInfo.getVersion());
} catch (ClassNotFoundException e) {
// ignore
}
but Java won't let you do this, because ClassNotFoundException is a checked exception that a direct call can never throw, so the compiler rejects the catch block. With the explicit Class.forName() it lets me put the catch block there.
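For illustration, the resulting pattern looks roughly like this (a sketch, assuming the surrounding class is Flink's EnvironmentInformation with an SLF4J log field; not necessarily the exact code in the PR):

try {
    // Class.forName() is declared to throw ClassNotFoundException, so the compiler
    // accepts the catch block; a plain VersionInfo.getVersion() call alone would not.
    Class.forName(
            "org.apache.hadoop.util.VersionInfo",
            false,
            EnvironmentInformation.class.getClassLoader());
    // Safe to reference VersionInfo directly now: the probe above proved the class is loadable.
    log.info(" Hadoop version: " + org.apache.hadoop.util.VersionInfo.getVersion());
} catch (ClassNotFoundException e) {
    // Hadoop is not on the classpath, so there is simply no Hadoop version to report.
}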
@@ -320,14 +315,6 @@ private[flink] trait TypeInformationGen[C <: Context] {
  }
}

def mkWritableTypeInfo[T <: Writable : c.WeakTypeTag](
What exactly does this removal mean for supporting Writable? Does hadoop-compat take care of that?
Unfortunately, this means that users now have to manually specify a TypeInformation that they can get from TypeExtractor.createHadoopWritableTypeInfo(MyWritable.class).
I'm not sure how often people are using Hadoop Writables in their Scala code, but this is definitely something that will break.
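For illustration only, explicitly supplying that type information in a Java job could look roughly like this (MyWritable and myWritables are placeholders, not code from this PR; TypeExtractor here is org.apache.flink.api.java.typeutils.TypeExtractor):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Build the type information for the Writable type explicitly...
TypeInformation<MyWritable> writableTypeInfo =
        TypeExtractor.createHadoopWritableTypeInfo(MyWritable.class);

// ...and pass it where Flink cannot infer it on its own.
DataSet<MyWritable> input = env.fromCollection(myWritables, writableTypeInfo);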
Scratch that, this actually still works and I added a test for that in the Hadoop compat package.
flink-tests/pom.xml (outdated)
@@ -54,6 +54,12 @@ under the License.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop2</artifactId>
    <version>${project.version}</version>
</dependency>
add test scope?
I'll add test scope and see if everything still runs.
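For reference, adding test scope would presumably just mean extending the new dependency in flink-tests/pom.xml along these lines (a sketch):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop2</artifactId>
    <version>${project.version}</version>
    <scope>test</scope>
</dependency>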
@@ -52,6 +51,13 @@

private static final AppConfigurationEntry userKerberosAce;

/* Return the Kerberos login module name */
public static String getKrb5LoginModuleName() {
I assume this was copied from hadoop?
Yes, this had a dependency on KerberosUtil from Hadoop just for this method. Now we can have Kerberos independent of Hadoop.
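For reference, Hadoop's KerberosUtil helper essentially picks the login module class based on the JVM vendor, so the copied method presumably looks along these lines (a sketch, not necessarily the exact code in this PR):

/* Return the Kerberos login module name, depending on whether we run on an IBM or a Sun/Oracle JVM. */
public static String getKrb5LoginModuleName() {
    return System.getProperty("java.vendor").contains("IBM")
            ? "com.ibm.security.auth.module.Krb5LoginModule"
            : "com.sun.security.auth.module.Krb5LoginModule";
}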
} else {
    sc = new SecurityUtils.SecurityConfiguration(configuration);
revert
fixing
// Try to load HDFS configuration from Hadoop's own configuration files
// 1. approach: Flink configuration
final String hdfsDefaultPath = flinkConfiguration.getString(ConfigConstants
    .HDFS_DEFAULT_CONFIG, null);
this is a rather odd line-break
indeed, I'm fixing
if (hdfsDefaultPath != null) {
    retConf.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
} else {
    LOG.debug("Cannot find hdfs-default configuration file");
this should say that they could not be loaded from the flink configuration
You're right, I just copied these from another HadoopUtils. I'm fixing.
for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
    if (possibleHadoopConfPath != null) {
        if (new File(possibleHadoopConfPath).exists()) {
            if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
Let's track whether any of these succeeded and log something otherwise (mirroring the flink configuration approach).
will do
final String hdfsSitePath = flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
if (hdfsSitePath != null) {
    retConf.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
add debug statement to mirror environment variables approach
will do
        }
    }
}
return retConf;
We should log something (WARN maybe) if we couldn't find anything.
done
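Taken together, the comments above suggest a lookup that remembers whether any configuration file was actually picked up and warns otherwise. A rough sketch of that shape (LOG, ConfigConstants, and the checked environment variables HADOOP_CONF_DIR / HADOOP_HOME are assumptions about the surrounding class, not the final Flink code):

public static org.apache.hadoop.conf.Configuration getHadoopConfiguration(
        org.apache.flink.configuration.Configuration flinkConfiguration) {

    org.apache.hadoop.conf.Configuration retConf = new org.apache.hadoop.conf.Configuration();
    boolean foundHadoopConfiguration = false;

    // 1. approach: paths configured in the Flink configuration
    final String hdfsDefaultPath =
            flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
    if (hdfsDefaultPath != null) {
        retConf.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
        foundHadoopConfiguration = true;
    } else {
        LOG.debug("Cannot find hdfs-default configuration-file path in the Flink configuration.");
    }

    final String hdfsSitePath =
            flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
    if (hdfsSitePath != null) {
        retConf.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
        foundHadoopConfiguration = true;
    } else {
        LOG.debug("Cannot find hdfs-site configuration-file path in the Flink configuration.");
    }

    // 2. approach: environment variables pointing at a Hadoop conf directory
    String[] possibleHadoopConfPaths = new String[] {
            System.getenv("HADOOP_CONF_DIR"),
            System.getenv("HADOOP_HOME") != null ? System.getenv("HADOOP_HOME") + "/conf" : null
    };
    for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
        if (possibleHadoopConfPath != null
                && new java.io.File(possibleHadoopConfPath + "/core-site.xml").exists()) {
            retConf.addResource(
                    new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
            foundHadoopConfiguration = true;
        }
    }

    // Warn once if none of the approaches yielded a configuration file.
    if (!foundHadoopConfiguration) {
        LOG.warn("Could not find Hadoop configuration via any of the supported methods.");
    }
    return retConf;
}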
Overall this looks great. I looked for any signs of behavior changes, especially in the security code since there's some subtlety there, and didn't notice any. Please consider moving […] Are you sure that you can check for a class ([…]
In certain environments today, the Hadoop dependencies are actually duplicated on the classpath, i.e. in the Flink dist jar and also via the locally installed Hadoop (when something like […]
Thanks for reviewing @EronWright and @zentol. I pushed some more commits that address your comments. I did check the approach of first using […]
@EronWright Yes, your hunch is correct and I did check this on GCE (Dataproc) and AWS (EMR). This is actually quite nice because you can now build a Hadoop-free Flink and only use the Hadoop dependencies provided by your distro.
    }
}

LOG.debug("Could not find Hadoop configuration via any of the supported methods " +
Missing the check whether we actually didn't find anything.
Ok, that was stupid... Fixing.
Force-pushed from 6fbba4b to 564b645.
I pushed a rebased version of this.
Force-pushed from 914890b to 223e045.
This removes all Hadoop-related methods from ExecutionEnvironment (there are already equivalent methods in flink-hadoop-compatibility; see HadoopUtils, HadoopInputs, etc.). This also removes Hadoop-specific tests from flink-tests because these are duplicated by tests in flink-hadoop-compatibility. It also removes Hadoop-specific example code from flink-examples: the DistCp example and related code.
There are methods for this in flink-hadoop-compatibility.
commons-io is only available as a transitive dependency of the Hadoop dependencies. We can just use the Java ByteArrayOutputStream and get rid of that dependency.
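As a hypothetical sketch, reading a stream into a byte array with plain java.io instead of commons-io looks like this:

static byte[] readFully(java.io.InputStream in) throws java.io.IOException {
    // Plain-Java replacement for commons-io's IOUtils.toByteArray(in).
    java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream();
    byte[] buffer = new byte[4096];
    int read;
    while ((read = in.read(buffer)) != -1) {
        out.write(buffer, 0, read);
    }
    return out.toByteArray();
}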
This was in there because of legacy reasons but is not required by the test.
This was only used for the Enum for a specific HTTP response type. The jets3t dependency is only available as a transitive dependency of the Hadoop dependencies, which is why we remove it.
This removes the dependency on Hadoop and ensures that we only close if Hadoop is available.
This also makes them optional in flink-runtime, which is enabled by the previous changes to only use Hadoop dependencies if they are available. This also requires adding a few explicit dependencies in other modules because they were using transitive dependencies of the Hadoop deps. The most common dependency there is, ha!, commons-io.
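In Maven terms, making the Hadoop dependency optional in flink-runtime presumably amounts to something like this in its pom.xml (a sketch, not necessarily the exact change):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop2</artifactId>
    <version>${project.version}</version>
    <optional>true</optional>
</dependency>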
Force-pushed from 223e045 to 2eaf92b.
This is a series of PRs that allows running Flink without any Hadoop dependencies in the lib folder. Each PR stands on its own, but all of them are necessary for the last commit to work. The commits themselves clearly document what is changed.
R: @zentol