Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

(TWILL-254) Update to use ContainerId.fromString #65

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
51 changes: 47 additions & 4 deletions pom.xml
Expand Up @@ -171,7 +171,6 @@
<junit.version>4.11</junit.version>
<jopt-simple.version>3.2</jopt-simple.version>
<commons-compress.version>1.5</commons-compress.version>
<hadoop.version>[2.0.2-alpha,2.3.0]</hadoop.version>
<hadoop20.output.dir>target/hadoop20-classes</hadoop20.output.dir>
</properties>

Expand Down Expand Up @@ -538,9 +537,6 @@
<properties>
<hadoop.version>2.3.0</hadoop.version>
</properties>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
Expand Down Expand Up @@ -722,6 +718,53 @@
</plugins>
</build>
</profile>
<profile>
<!-- Active by default (replaces hadoop-2.3 as the default profile): builds against
     Hadoop 2.6.5 and layers every version-specific source tree (2.1/2.2/2.3/2.6)
     into the compile, with the hadoop20 sources added late at prepare-package. -->
<id>hadoop-2.6</id>
<properties>
<hadoop.version>2.6.5</hadoop.version>
</properties>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.8</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/hadoop21</source>
<source>src/main/hadoop22</source>
<source>src/main/hadoop23</source>
<source>src/main/hadoop26</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-source-2.0</id>
<phase>prepare-package</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/hadoop20</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>java8-test</id>
<modules>
Expand Down
Expand Up @@ -26,11 +26,13 @@
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
import org.apache.twill.internal.yarn.ports.AMRMClient;
import org.apache.twill.internal.yarn.ports.AMRMClientImpl;
Expand Down Expand Up @@ -71,6 +73,11 @@ public Hadoop20YarnAMClient(Configuration conf) {
this.nmClient = new Hadoop20YarnNMClient(YarnRPC.create(conf), conf);
}

/**
 * Converts the given container ID string into a {@link ContainerId} using the
 * {@link ConverterUtils} API, which is the mechanism available on Hadoop 2.0.
 */
@Override
protected ContainerId containerIdLookup(String containerIdStr) {
  return ConverterUtils.toContainerId(containerIdStr);
}

@Override
protected void startUp() throws Exception {
Preconditions.checkNotNull(trackerAddr, "Tracker address not set.");
Expand Down
Expand Up @@ -27,11 +27,13 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -57,6 +59,11 @@ public YarnContainerStatus apply(ContainerStatus status) {
};
}

/**
 * Converts the given container ID string into a {@link ContainerId} via
 * {@link ConverterUtils}, the conversion API available on Hadoop 2.1.
 */
@Override
protected ContainerId containerIdLookup(String containerIdStr) {
  return ConverterUtils.toContainerId(containerIdStr);
}

protected final AMRMClient<AMRMClient.ContainerRequest> amrmClient;
protected final Hadoop21YarnNMClient nmClient;
protected Resource maxCapability;
Expand Down
Expand Up @@ -26,7 +26,7 @@
/**
* Wrapper class for AMRMClient for Hadoop version 2.2 or greater.
*/
public final class Hadoop22YarnAMClient extends Hadoop21YarnAMClient {
public class Hadoop22YarnAMClient extends Hadoop21YarnAMClient {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove final?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is marked final, I cannot extend it for Hadoop26YarnAMClient.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, okay then. sorry, I missed that.

private static final Logger LOG = LoggerFactory.getLogger(Hadoop22YarnAMClient.class);

Expand Down
@@ -0,0 +1,42 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.twill.internal.yarn;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Wrapper class for AMRMClient for Hadoop version 2.6 or greater.
 */
public final class Hadoop26YarnAMClient extends Hadoop22YarnAMClient {

  private static final Logger LOG = LoggerFactory.getLogger(Hadoop26YarnAMClient.class);

  public Hadoop26YarnAMClient(Configuration conf) {
    super(conf);
  }

  /**
   * Converts the given container ID string into a {@link ContainerId} using
   * {@link ContainerId#fromString(String)}, which is available on Hadoop 2.6 and later.
   *
   * @param containerIdStr the container ID string to convert
   * @return a {@link ContainerId} instance representing the given string
   */
  @Override
  protected ContainerId containerIdLookup(String containerIdStr) {
    return ContainerId.fromString(containerIdStr);
  }
}
Expand Up @@ -27,7 +27,6 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.twill.internal.ProcessLauncher;
import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
import org.slf4j.Logger;
Expand Down Expand Up @@ -80,7 +79,7 @@ protected AbstractYarnAMClient(String containerIdEnvName) {
String masterContainerId = System.getenv().get(containerIdEnvName);
Preconditions.checkArgument(masterContainerId != null,
"Missing %s from environment", containerIdEnvName);
this.containerId = ConverterUtils.toContainerId(masterContainerId);
this.containerId = containerIdLookup(masterContainerId);
this.inflightRequests = ArrayListMultimap.create();
this.pendingRequests = ArrayListMultimap.create();
this.pendingRemoves = Lists.newLinkedList();
Expand All @@ -89,7 +88,6 @@ protected AbstractYarnAMClient(String containerIdEnvName) {
this.blacklistedResources = Lists.newArrayList();
}


@Override
public final ContainerId getContainerId() {
return containerId;
Expand Down Expand Up @@ -226,6 +224,14 @@ protected boolean recordUnsupportedFeature(String unsupportedFeature) {
return true;
}

/**
 * Converts a container ID string into a {@link ContainerId}. Subclasses implement this
 * using whichever conversion API their targeted Hadoop version provides.
 *
 * @param containerIdStr the container ID string to look up
 * @return a {@link ContainerId} instance representing the given string
 */
protected abstract ContainerId containerIdLookup(String containerIdStr);

/**
* Adjusts the given resource capability to fit in the cluster limit.
*
Expand Down
Expand Up @@ -48,11 +48,17 @@ public YarnAMClient create() {
clzName = getClass().getPackage().getName() + ".Hadoop21YarnAMClient";
clz = (Class<YarnAMClient>) Class.forName(clzName);
break;
default:
// Uses hadoop-2.2 or above class
case HADOOP_22:
case HADOOP_23:
// Uses hadoop-2.2 class
clzName = getClass().getPackage().getName() + ".Hadoop22YarnAMClient";
clz = (Class<YarnAMClient>) Class.forName(clzName);
break;
default:
// Uses hadoop-2.6 or above class
clzName = getClass().getPackage().getName() + ".Hadoop26YarnAMClient";
clz = (Class<YarnAMClient>) Class.forName(clzName);
break;
}

return clz.getConstructor(Configuration.class).newInstance(conf);
Expand Down
Expand Up @@ -68,7 +68,8 @@ public enum HadoopVersions {
HADOOP_20,
HADOOP_21,
HADOOP_22,
HADOOP_23
HADOOP_23,
HADOOP_26
}

private static final Logger LOG = LoggerFactory.getLogger(YarnUtils.class);
Expand Down Expand Up @@ -263,7 +264,15 @@ public static HadoopVersions getHadoopVersion() {
Class.forName("org.apache.hadoop.yarn.client.cli.LogsCLI");
try {
Class.forName("org.apache.hadoop.yarn.conf.HAUtil");
HADOOP_VERSION.set(HadoopVersions.HADOOP_23);
try {
Class[] args = new Class[1];
args[0] = String.class;
// see if we have a org.apache.hadoop.yarn.api.records.ContainerId.fromString() method
Class.forName("org.apache.hadoop.yarn.api.records.ContainerId").getMethod("fromString", args);
HADOOP_VERSION.set(HadoopVersions.HADOOP_26);
} catch (NoSuchMethodException e) {
HADOOP_VERSION.set(HadoopVersions.HADOOP_23);
}
} catch (ClassNotFoundException e) {
HADOOP_VERSION.set(HadoopVersions.HADOOP_22);
}
Expand Down