Skip to content

Commit

Permalink
YARN-3333. Rename TimelineAggregator etc. to TimelineCollector. Contr…
Browse files Browse the repository at this point in the history
…ibuted by Sangjin Lee
  • Loading branch information
JunpingDu authored and sjlee committed Jul 10, 2016
1 parent 9b56364 commit 2188a07
Show file tree
Hide file tree
Showing 65 changed files with 1,105 additions and 990 deletions.
4 changes: 4 additions & 0 deletions hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml
Expand Up @@ -47,6 +47,10 @@
<groupId>commons-logging</groupId> <groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId> <artifactId>commons-logging</artifactId>
</dependency> </dependency>
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</dependency>


<!-- 'mvn dependency:analyze' fails to detect use of this dependency --> <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency> <dependency>
Expand Down
Expand Up @@ -127,7 +127,7 @@ public static AllocateResponse newInstance(int responseId,
response.setAMRMToken(amRMToken); response.setAMRMToken(amRMToken);
return response; return response;
} }

@Public @Public
@Unstable @Unstable
public static AllocateResponse newInstance(int responseId, public static AllocateResponse newInstance(int responseId,
Expand All @@ -137,13 +137,13 @@ public static AllocateResponse newInstance(int responseId,
PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken, PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken,
List<Container> increasedContainers, List<Container> increasedContainers,
List<Container> decreasedContainers, List<Container> decreasedContainers,
String aggregatorAddr) { String collectorAddr) {
AllocateResponse response = AllocateResponse response =
newInstance(responseId, completedContainers, allocatedContainers, newInstance(responseId, completedContainers, allocatedContainers,
updatedNodes, availResources, command, numClusterNodes, preempt, updatedNodes, availResources, command, numClusterNodes, preempt,
nmTokens, increasedContainers, decreasedContainers); nmTokens, increasedContainers, decreasedContainers);
response.setAMRMToken(amRMToken); response.setAMRMToken(amRMToken);
response.setAggregatorAddr(aggregatorAddr); response.setCollectorAddr(collectorAddr);
return response; return response;
} }


Expand Down Expand Up @@ -349,16 +349,16 @@ public abstract void setDecreasedContainers(
public abstract void setApplicationPriority(Priority priority); public abstract void setApplicationPriority(Priority priority);


/** /**
* The address of aggregator that belong to this app * The address of collector that belong to this app
* *
* @return The address of aggregator that belong to this attempt * @return The address of collector that belong to this attempt
*/ */
@Public @Public
@Unstable @Unstable
public abstract String getAggregatorAddr(); public abstract String getCollectorAddr();

@Private @Private
@Unstable @Unstable
public abstract void setAggregatorAddr(String aggregatorAddr); public abstract void setCollectorAddr(String collectorAddr);


} }
Expand Up @@ -47,7 +47,7 @@ public TimelineWriteResponse() {


/** /**
* Get a list of {@link TimelineWriteError} instances * Get a list of {@link TimelineWriteError} instances
* *
* @return a list of {@link TimelineWriteError} instances * @return a list of {@link TimelineWriteError} instances
*/ */
@XmlElement(name = "errors") @XmlElement(name = "errors")
Expand All @@ -57,7 +57,7 @@ public List<TimelineWriteError> getErrors() {


/** /**
* Add a single {@link TimelineWriteError} instance into the existing list * Add a single {@link TimelineWriteError} instance into the existing list
* *
* @param error * @param error
* a single {@link TimelineWriteError} instance * a single {@link TimelineWriteError} instance
*/ */
Expand All @@ -67,7 +67,7 @@ public void addError(TimelineWriteError error) {


/** /**
* Add a list of {@link TimelineWriteError} instances into the existing list * Add a list of {@link TimelineWriteError} instances into the existing list
* *
* @param errors * @param errors
* a list of {@link TimelineWriteError} instances * a list of {@link TimelineWriteError} instances
*/ */
Expand All @@ -77,7 +77,7 @@ public void addErrors(List<TimelineWriteError> errors) {


/** /**
* Set the list to the given list of {@link TimelineWriteError} instances * Set the list to the given list of {@link TimelineWriteError} instances
* *
* @param errors * @param errors
* a list of {@link TimelineWriteError} instances * a list of {@link TimelineWriteError} instances
*/ */
Expand Down Expand Up @@ -107,7 +107,7 @@ public static class TimelineWriteError {


/** /**
* Get the entity Id * Get the entity Id
* *
* @return the entity Id * @return the entity Id
*/ */
@XmlElement(name = "entity") @XmlElement(name = "entity")
Expand All @@ -117,7 +117,7 @@ public String getEntityId() {


/** /**
* Set the entity Id * Set the entity Id
* *
* @param entityId * @param entityId
* the entity Id * the entity Id
*/ */
Expand All @@ -127,7 +127,7 @@ public void setEntityId(String entityId) {


/** /**
* Get the entity type * Get the entity type
* *
* @return the entity type * @return the entity type
*/ */
@XmlElement(name = "entitytype") @XmlElement(name = "entitytype")
Expand All @@ -137,7 +137,7 @@ public String getEntityType() {


/** /**
* Set the entity type * Set the entity type
* *
* @param entityType * @param entityType
* the entity type * the entity type
*/ */
Expand All @@ -147,7 +147,7 @@ public void setEntityType(String entityType) {


/** /**
* Get the error code * Get the error code
* *
* @return an error code * @return an error code
*/ */
@XmlElement(name = "errorcode") @XmlElement(name = "errorcode")
Expand All @@ -157,7 +157,7 @@ public int getErrorCode() {


/** /**
* Set the error code to the given error code * Set the error code to the given error code
* *
* @param errorCode * @param errorCode
* an error code * an error code
*/ */
Expand Down
Expand Up @@ -814,10 +814,10 @@ public static boolean isAclEnabled(Configuration conf) {
public static final int DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT = 20; public static final int DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT = 20;


/** Number of threads container manager uses.*/ /** Number of threads container manager uses.*/
public static final String NM_AGGREGATOR_SERVICE_THREAD_COUNT = public static final String NM_COLLECTOR_SERVICE_THREAD_COUNT =
NM_PREFIX + "aggregator-service.thread-count"; NM_PREFIX + "collector-service.thread-count";
public static final int DEFAULT_NM_AGGREGATOR_SERVICE_THREAD_COUNT = 5; public static final int DEFAULT_NM_COLLECTOR_SERVICE_THREAD_COUNT = 5;

/** Number of threads used in cleanup.*/ /** Number of threads used in cleanup.*/
public static final String NM_DELETE_THREAD_COUNT = public static final String NM_DELETE_THREAD_COUNT =
NM_PREFIX + "delete.thread-count"; NM_PREFIX + "delete.thread-count";
Expand Down Expand Up @@ -845,13 +845,13 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String DEFAULT_NM_LOCALIZER_ADDRESS = "0.0.0.0:" + public static final String DEFAULT_NM_LOCALIZER_ADDRESS = "0.0.0.0:" +
DEFAULT_NM_LOCALIZER_PORT; DEFAULT_NM_LOCALIZER_PORT;


/** Address where the aggregator service IPC is.*/ /** Address where the collector service IPC is.*/
public static final String NM_AGGREGATOR_SERVICE_ADDRESS = public static final String NM_COLLECTOR_SERVICE_ADDRESS =
NM_PREFIX + "aggregator-service.address"; NM_PREFIX + "collector-service.address";
public static final int DEFAULT_NM_AGGREGATOR_SERVICE_PORT = 8048; public static final int DEFAULT_NM_COLLECTOR_SERVICE_PORT = 8048;
public static final String DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS = public static final String DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS =
"0.0.0.0:" + DEFAULT_NM_LOCALIZER_PORT; "0.0.0.0:" + DEFAULT_NM_LOCALIZER_PORT;

/** Interval in between cache cleanups.*/ /** Interval in between cache cleanups.*/
public static final String NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS = public static final String NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS =
NM_PREFIX + "localizer.cache.cleanup.interval-ms"; NM_PREFIX + "localizer.cache.cleanup.interval-ms";
Expand Down
Expand Up @@ -89,7 +89,7 @@ message AllocateResponseProto {
repeated ContainerProto decreased_containers = 11; repeated ContainerProto decreased_containers = 11;
optional hadoop.common.TokenProto am_rm_token = 12; optional hadoop.common.TokenProto am_rm_token = 12;
optional PriorityProto application_priority = 13; optional PriorityProto application_priority = 13;
optional string aggregator_addr = 14; optional string collector_addr = 14;
} }


enum SchedulerResourceTypes { enum SchedulerResourceTypes {
Expand Down
Expand Up @@ -70,6 +70,15 @@
<groupId>commons-io</groupId> <groupId>commons-io</groupId>
<artifactId>commons-io</artifactId> <artifactId>commons-io</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice</artifactId>
<scope>test</scope>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency --> <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency> <dependency>
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
Expand Down
Expand Up @@ -40,10 +40,10 @@
import java.util.Vector; import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;


import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.GnuParser;
Expand Down Expand Up @@ -222,12 +222,12 @@ public static enum DSEntity {
private int appMasterRpcPort = -1; private int appMasterRpcPort = -1;
// Tracking url to which app master publishes info for clients to monitor // Tracking url to which app master publishes info for clients to monitor
private String appMasterTrackingUrl = ""; private String appMasterTrackingUrl = "";

private boolean newTimelineService = false; private boolean newTimelineService = false;

// For posting entities in new timeline service in a non-blocking way // For posting entities in new timeline service in a non-blocking way
// TODO replace with event loop in TimelineClient. // TODO replace with event loop in TimelineClient.
private static ExecutorService threadPool = private static ExecutorService threadPool =
Executors.newCachedThreadPool( Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
.build()); .build());
Expand Down Expand Up @@ -331,9 +331,9 @@ public static void main(String[] args) {
} }
appMaster.run(); appMaster.run();
result = appMaster.finish(); result = appMaster.finish();

threadPool.shutdown(); threadPool.shutdown();

while (!threadPool.isTerminated()) { // wait for all posting thread to finish while (!threadPool.isTerminated()) { // wait for all posting thread to finish
try { try {
if (!threadPool.awaitTermination(30, TimeUnit.SECONDS)) { if (!threadPool.awaitTermination(30, TimeUnit.SECONDS)) {
Expand Down Expand Up @@ -427,7 +427,7 @@ public boolean init(String[] args) throws ParseException, IOException {
opts.addOption("container_retry_interval", true, opts.addOption("container_retry_interval", true,
"Interval between each retry, unit is milliseconds"); "Interval between each retry, unit is milliseconds");
opts.addOption("debug", false, "Dump out debug information"); opts.addOption("debug", false, "Dump out debug information");
opts.addOption("timeline_service_version", true, opts.addOption("timeline_service_version", true,
"Version for timeline service"); "Version for timeline service");
opts.addOption("help", false, "Print usage"); opts.addOption("help", false, "Print usage");
CommandLine cliParser = new GnuParser().parse(opts, args); CommandLine cliParser = new GnuParser().parse(opts, args);
Expand Down Expand Up @@ -583,7 +583,7 @@ public boolean init(String[] args) throws ParseException, IOException {
if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
if (cliParser.hasOption("timeline_service_version")) { if (cliParser.hasOption("timeline_service_version")) {
String timelineServiceVersion = String timelineServiceVersion =
cliParser.getOptionValue("timeline_service_version", "v1"); cliParser.getOptionValue("timeline_service_version", "v1");
if (timelineServiceVersion.trim().equalsIgnoreCase("v1")) { if (timelineServiceVersion.trim().equalsIgnoreCase("v1")) {
newTimelineService = false; newTimelineService = false;
Expand Down Expand Up @@ -655,7 +655,7 @@ public void run() throws YarnException, IOException, InterruptedException {
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
amRMClient.init(conf); amRMClient.init(conf);
amRMClient.start(); amRMClient.start();

containerListener = createNMCallbackHandler(); containerListener = createNMCallbackHandler();
nmClientAsync = new NMClientAsyncImpl(containerListener); nmClientAsync = new NMClientAsyncImpl(containerListener);
nmClientAsync.init(conf); nmClientAsync.init(conf);
Expand Down Expand Up @@ -840,7 +840,7 @@ protected boolean finish() {
if(timelineClient != null) { if(timelineClient != null) {
timelineClient.stop(); timelineClient.stop();
} }

return success; return success;
} }


Expand Down Expand Up @@ -1392,11 +1392,11 @@ Thread createLaunchContainerThread(Container allocatedContainer,
} }


private static void publishContainerStartEventOnNewTimelineService( private static void publishContainerStartEventOnNewTimelineService(
final TimelineClient timelineClient, final Container container, final TimelineClient timelineClient, final Container container,
final String domainId, final UserGroupInformation ugi) { final String domainId, final UserGroupInformation ugi) {
Runnable publishWrapper = new Runnable() { Runnable publishWrapper = new Runnable() {
public void run() { public void run() {
publishContainerStartEventOnNewTimelineServiceBase(timelineClient, publishContainerStartEventOnNewTimelineServiceBase(timelineClient,
container, domainId, ugi); container, domainId, ugi);
} }
}; };
Expand All @@ -1406,14 +1406,14 @@ public void run() {
private static void publishContainerStartEventOnNewTimelineServiceBase( private static void publishContainerStartEventOnNewTimelineServiceBase(
final TimelineClient timelineClient, Container container, String domainId, final TimelineClient timelineClient, Container container, String domainId,
UserGroupInformation ugi) { UserGroupInformation ugi) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
entity.setId(container.getId().toString()); entity.setId(container.getId().toString());
entity.setType(DSEntity.DS_CONTAINER.toString()); entity.setType(DSEntity.DS_CONTAINER.toString());
//entity.setDomainId(domainId); //entity.setDomainId(domainId);
entity.addInfo("user", ugi.getShortUserName()); entity.addInfo("user", ugi.getShortUserName());

org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
event.setTimestamp(System.currentTimeMillis()); event.setTimestamp(System.currentTimeMillis());
event.setId(DSEvent.DS_CONTAINER_START.toString()); event.setId(DSEvent.DS_CONTAINER_START.toString());
Expand All @@ -1435,29 +1435,29 @@ public TimelinePutResponse run() throws Exception {
e instanceof UndeclaredThrowableException ? e.getCause() : e); e instanceof UndeclaredThrowableException ? e.getCause() : e);
} }
} }

private static void publishContainerEndEventOnNewTimelineService( private static void publishContainerEndEventOnNewTimelineService(
final TimelineClient timelineClient, final ContainerStatus container, final TimelineClient timelineClient, final ContainerStatus container,
final String domainId, final UserGroupInformation ugi) { final String domainId, final UserGroupInformation ugi) {
Runnable publishWrapper = new Runnable() { Runnable publishWrapper = new Runnable() {
public void run() { public void run() {
publishContainerEndEventOnNewTimelineServiceBase(timelineClient, publishContainerEndEventOnNewTimelineServiceBase(timelineClient,
container, domainId, ugi); container, domainId, ugi);
} }
}; };
threadPool.execute(publishWrapper); threadPool.execute(publishWrapper);
} }

private static void publishContainerEndEventOnNewTimelineServiceBase( private static void publishContainerEndEventOnNewTimelineServiceBase(
final TimelineClient timelineClient, final ContainerStatus container, final TimelineClient timelineClient, final ContainerStatus container,
final String domainId, final UserGroupInformation ugi) { final String domainId, final UserGroupInformation ugi) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
entity.setId(container.getContainerId().toString()); entity.setId(container.getContainerId().toString());
entity.setType(DSEntity.DS_CONTAINER.toString()); entity.setType(DSEntity.DS_CONTAINER.toString());
//entity.setDomainId(domainId); //entity.setDomainId(domainId);
entity.addInfo("user", ugi.getShortUserName()); entity.addInfo("user", ugi.getShortUserName());
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
event.setTimestamp(System.currentTimeMillis()); event.setTimestamp(System.currentTimeMillis());
event.setId(DSEvent.DS_CONTAINER_END.toString()); event.setId(DSEvent.DS_CONTAINER_END.toString());
Expand All @@ -1482,28 +1482,28 @@ public TimelinePutResponse run() throws Exception {


private static void publishApplicationAttemptEventOnNewTimelineService( private static void publishApplicationAttemptEventOnNewTimelineService(
final TimelineClient timelineClient, final String appAttemptId, final TimelineClient timelineClient, final String appAttemptId,
final DSEvent appEvent, final String domainId, final DSEvent appEvent, final String domainId,
final UserGroupInformation ugi) { final UserGroupInformation ugi) {

Runnable publishWrapper = new Runnable() { Runnable publishWrapper = new Runnable() {
public void run() { public void run() {
publishApplicationAttemptEventOnNewTimelineServiceBase(timelineClient, publishApplicationAttemptEventOnNewTimelineServiceBase(timelineClient,
appAttemptId, appEvent, domainId, ugi); appAttemptId, appEvent, domainId, ugi);
} }
}; };
threadPool.execute(publishWrapper); threadPool.execute(publishWrapper);
} }

private static void publishApplicationAttemptEventOnNewTimelineServiceBase( private static void publishApplicationAttemptEventOnNewTimelineServiceBase(
final TimelineClient timelineClient, String appAttemptId, final TimelineClient timelineClient, String appAttemptId,
DSEvent appEvent, String domainId, UserGroupInformation ugi) { DSEvent appEvent, String domainId, UserGroupInformation ugi) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
entity.setId(appAttemptId); entity.setId(appAttemptId);
entity.setType(DSEntity.DS_APP_ATTEMPT.toString()); entity.setType(DSEntity.DS_APP_ATTEMPT.toString());
//entity.setDomainId(domainId); //entity.setDomainId(domainId);
entity.addInfo("user", ugi.getShortUserName()); entity.addInfo("user", ugi.getShortUserName());
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
event.setId(appEvent.toString()); event.setId(appEvent.toString());
event.setTimestamp(System.currentTimeMillis()); event.setTimestamp(System.currentTimeMillis());
Expand Down

0 comments on commit 2188a07

Please sign in to comment.