Skip to content
This repository has been archived by the owner on Jun 7, 2021. It is now read-only.

Updated hadoop version from 2.6.0 to 3.1.0 #607

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,16 @@
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<version>6.1.2</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
* under the License.*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

undo this formatting change.

package com.datatorrent.stram.client;

import java.io.File;
Expand Down Expand Up @@ -606,16 +605,16 @@ public ApplicationId launchApp(AppFactory appConfig) throws Exception
conf.setEnum(StreamingApplication.ENVIRONMENT, StreamingApplication.Environment.CLUSTER);
LogicalPlan dag = appConfig.createApp(propertiesBuilder);
if (UserGroupInformation.isSecurityEnabled()) {
long hdfsTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_HDFS_TOKEN_MAX_LIFE_TIME, conf.getLong(StramClientUtils.HDFS_TOKEN_MAX_LIFE_TIME, StramClientUtils.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT));
long hdfsTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_HDFS_TOKEN_MAX_LIFE_TIME, conf.getLong(StramClientUtils.HDFS_TOKEN_MAX_LIFE_TIME, StramClientUtils.RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT));
dag.setAttribute(LogicalPlan.HDFS_TOKEN_LIFE_TIME, hdfsTokenMaxLifeTime);
LOG.debug("HDFS token life time {}", hdfsTokenMaxLifeTime);
long hdfsTokenRenewInterval = conf.getLong(StramClientUtils.DT_HDFS_TOKEN_RENEW_INTERVAL, conf.getLong(StramClientUtils.HDFS_TOKEN_RENEW_INTERVAL, StramClientUtils.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT));
long hdfsTokenRenewInterval = conf.getLong(StramClientUtils.DT_HDFS_TOKEN_RENEW_INTERVAL, conf.getLong(StramClientUtils.HDFS_TOKEN_RENEW_INTERVAL, StramClientUtils.RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT));
dag.setAttribute(LogicalPlan.HDFS_TOKEN_RENEWAL_INTERVAL, hdfsTokenRenewInterval);
LOG.debug("HDFS token renew interval {}", hdfsTokenRenewInterval);
long rmTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_RM_TOKEN_MAX_LIFE_TIME, conf.getLong(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_KEY, YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT));
long rmTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_RM_TOKEN_MAX_LIFE_TIME, conf.getLong(YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_KEY, YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT));
dag.setAttribute(LogicalPlan.RM_TOKEN_LIFE_TIME, rmTokenMaxLifeTime);
LOG.debug("RM token life time {}", rmTokenMaxLifeTime);
long rmTokenRenewInterval = conf.getLong(StramClientUtils.DT_RM_TOKEN_RENEW_INTERVAL, conf.getLong(YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_KEY, YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT));
long rmTokenRenewInterval = conf.getLong(StramClientUtils.DT_RM_TOKEN_RENEW_INTERVAL, conf.getLong(YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT));
dag.setAttribute(LogicalPlan.RM_TOKEN_RENEWAL_INTERVAL, rmTokenRenewInterval);
LOG.debug("RM token renew interval {}", rmTokenRenewInterval);
setTokenRefreshCredentials(dag, conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ public class StramClientUtils
@Deprecated
public static final String KEY_TAB_FILE = StramUserLogin.DT_AUTH_PREFIX + "store.keytab";
public static final String TOKEN_ANTICIPATORY_REFRESH_FACTOR = StramUserLogin.DT_AUTH_PREFIX + "token.refresh.factor";
public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = 7 * 24 * 60 * 60 * 1000;
public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT = 24 * 60 * 60 * 1000;
public static final long RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = 7 * 24 * 60 * 60 * 1000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest you submit this change in a separate PR as it is not related to the Hadoop 3.1 change.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I seem to remember Intellij complaining about those two lines after I made the changes to the constants used by Hadoop 3.1. Perhaps I misunderstood/misinterpreted the relationship?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not necessary to rename Apex constants.

public static final long RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT = 24 * 60 * 60 * 1000;
public static final String TOKEN_REFRESH_PRINCIPAL = StramUserLogin.DT_AUTH_PREFIX + "token.refresh.principal";
public static final String TOKEN_REFRESH_KEYTAB = StramUserLogin.DT_AUTH_PREFIX + "token.refresh.keytab";
/**
Expand Down Expand Up @@ -828,7 +828,12 @@ private static YarnConfiguration getYarnConfiguration(Configuration conf)

public static InetSocketAddress getRMWebAddress(Configuration conf, String rmId)
{
boolean sslEnabled = conf.getBoolean(CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY, CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_DEFAULT);
/*
* This is far from confirmed as the "correct" fix. The previous constants HADOOP_SSL_ENABLED_DEFAULT and HADOOP_SSL_ENABLED_KEY have
* been deprecated as far back as I can find Hadoop versions (2.4.1 at least), HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT and
* HADOOP_CALLER_CONTEXT_ENABLED_KEY are the only two constants in 3.1.0 that seem even remotely related...
* */
boolean sslEnabled = conf.getBoolean(CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY, CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My comments on this:

  • instead of having a tentative fix like this why can't we have a permanent fix? See below
  • Do we know what string constants these identifiers CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY and CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY stand for?
  • Same thing for CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_DEFAULT and CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT
  • if they are the same (which I doubt) we haven't introduced any incompatibility as far as external user impact is concerned.
  • if they are different we can just break the compatibility (not desirable) by using these new property names
  • or (desirable) honor the old property value by defining it here and checking for it before checking the new value.
  • the comment is also repeated elsewhere. Ideally it should only exist in one place.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You make a good point...will look into the constants more...the weird thing is that in the Javadocs for Hadoop, those two constants have no information about them as far back as I can see...other than being marked as deprecated...will do some more digging. From the Apex side of the house, what are we doing with this function other than the obvious...checking to see if SSL is enabled?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know how reliable this page is...but the string/boolean values for the constants seem to make sense...I will hunt down which constants, if any, replaced them...

public static final String HADOOP_SSL_ENABLED_KEY = "hadoop.ssl.enabled"
public static final boolean HADOOP_SSL_ENABLED_DEFAULT = false

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be worthwhile asking on the users@ list if anyone is using Apex in SSL mode. If no-one is using SSL we don't need to worry about breaking compatibility. But we should still confirm that HADOOP_CALLER_CONTEXT_ENABLED_KEY is the right property to check to verify SSL is enabled in the user's Hadoop cluster.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I think our comments crossed. And what's the value of HADOOP_CALLER_CONTEXT_ENABLED_KEY ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not sufficient to ask on "user@apex" who uses SSL. It is necessary to vote on "dev@apex" prior to dropping SSL support and I will be the one to -1 it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vrozov who is recommending dropping SSL support? I didn't suggest that if that's the impression you got. My question was in connection with creating incompatibility which might be okay if no-one is currently using SSL

return getRMWebAddress(conf, sslEnabled, rmId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ public class LogicalPlan implements Serializable, DAG
public static Attribute<Boolean> FAST_PUBLISHER_SUBSCRIBER = new Attribute<>(false);
public static Attribute<Long> HDFS_TOKEN_RENEWAL_INTERVAL = new Attribute<>(86400000L);
public static Attribute<Long> HDFS_TOKEN_LIFE_TIME = new Attribute<>(604800000L);
public static Attribute<Long> RM_TOKEN_RENEWAL_INTERVAL = new Attribute<>(YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
public static Attribute<Long> RM_TOKEN_LIFE_TIME = new Attribute<>(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
public static Attribute<Long> RM_TOKEN_RENEWAL_INTERVAL = new Attribute<>(YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
public static Attribute<Long> RM_TOKEN_LIFE_TIME = new Attribute<>(YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
public static Attribute<String> PRINCIPAL = new Attribute<>(null, StringCodec.String2String.getInstance());
public static Attribute<String> KEY_TAB_FILE = new Attribute<>((String)null, StringCodec.String2String.getInstance());
public static Attribute<Double> TOKEN_REFRESH_ANTICIPATORY_FACTOR = new Attribute<>(0.7);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,11 @@ public boolean isReadyTurnoverPartFile()
try {
return (syncRequested || (partOutStr.getPos() > bytesPerPartFile) ||
(currentPartFileTimeStamp + millisPerPartFile < System.currentTimeMillis())) && partOutStr.getPos() > 0;
} catch (IOException ex) {
/* Not really sure this is a legitimate fix...getting the following error when compiling:
* exception java.io.IOException is never thrown in body of corresponding try statement
* The remedy for this appears to be to use a more specific exception as the current try/catch block could
* not spit out IOException. For now, just using Exception to try and get this to build, will revisit later. */
} catch (Exception ex) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you tried removing the catch clause completely and let your IDE insert the catch clause(s) for the exact specific types? Exception is more general than IOException but includes RuntimeException, is that why the compiler likes it?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that is a good idea...should have thought of it....I'll give that a shot...as it relates to why Exception made the compiler happy...I suspect because Exception covers pretty much everything, it is a horrible choice, but not, from a compiler's standpoint, incorrect.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove try/catch as getPos() does not throw IOException now.

return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,21 @@ public InetSocketAddress getSocketAddr(String name, String defaultAddress, int d
};

// basic test
conf.setBoolean(CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY, false);

/*
* This is far from confirmed as the "correct" fix. The previous constants HADOOP_SSL_ENABLED_DEFAULT and HADOOP_SSL_ENABLED_KEY have
* been deprecated as far back as I can find Hadoop versions (2.4.1 at least), HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT and
* HADOOP_CALLER_CONTEXT_ENABLED_KEY are the only two constants in 3.1.0 that seem even remotely related...
* */
conf.setBoolean(CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY, false);
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "192.168.1.1:8032");
conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "192.168.1.2:8032");
Assert.assertEquals(getHostString("192.168.1.1") + ":8032", StramClientUtils.getSocketConnectString(StramClientUtils.getRMWebAddress(conf, null)));
List<InetSocketAddress> addresses = StramClientUtils.getRMAddresses(conf);
Assert.assertEquals(1, addresses.size());
Assert.assertEquals(getHostString("192.168.1.1") + ":8032", StramClientUtils.getSocketConnectString(addresses.get(0)));

conf.setBoolean(CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY, true);
conf.setBoolean(CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY, true);
Assert.assertEquals(getHostString("192.168.1.2") + ":8032", StramClientUtils.getSocketConnectString(StramClientUtils.getRMWebAddress(conf, null)));
addresses = StramClientUtils.getRMAddresses(conf);
Assert.assertEquals(1, addresses.size());
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.optimize>false</maven.compiler.optimize>
<maven.build.timestamp.format>yyyyMMdd</maven.build.timestamp.format>
<hadoop.version>2.6.0</hadoop.version>
<hadoop.version>3.1.0</hadoop.version>
<github.global.server>github</github.global.server>
<jackson.version>1.9.13</jackson.version>
<jersey.version>1.9</jersey.version>
Expand Down