Skip to content

Commit

Permalink
YARN-3367. Replace starting a separate thread for post entity with ev…
Browse files Browse the repository at this point in the history
…ent loop in TimelineClient (Naganarasimha G R via sjlee)
  • Loading branch information
sjlee committed Jul 10, 2016
1 parent 960af7d commit 0d02ab8
Show file tree
Hide file tree
Showing 12 changed files with 619 additions and 217 deletions.
Expand Up @@ -28,10 +28,7 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -85,7 +82,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientHandlerException;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* The job history events get routed to this class. This class writes the Job
* history events to the DFS directly into a staging dir and then moved to a
Expand Down Expand Up @@ -141,10 +137,6 @@ public class JobHistoryEventHandler extends AbstractService

private boolean timelineServiceV2Enabled = false;

// For posting entities in new timeline service in a non-blocking way
// TODO YARN-3367 replace with event loop in TimelineClient.
private ExecutorService threadPool;

private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK";
private static final String MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE =
Expand Down Expand Up @@ -284,10 +276,6 @@ protected void serviceInit(Configuration conf) throws Exception {
YarnConfiguration.timelineServiceV2Enabled(conf);
LOG.info("Timeline service is enabled; version: " +
YarnConfiguration.getTimelineServiceVersion(conf));
if (timelineServiceV2Enabled) {
// initialize the thread pool for v.2 timeline service
threadPool = createThreadPool();
}
} else {
LOG.info("Timeline service is not enabled");
}
Expand Down Expand Up @@ -461,35 +449,9 @@ protected void serviceStop() throws Exception {
if (timelineClient != null) {
timelineClient.stop();
}
if (threadPool != null) {
shutdownAndAwaitTermination();
}
LOG.info("Stopped JobHistoryEventHandler. super.stop()");
super.serviceStop();
}

// TODO remove threadPool after adding non-blocking call in TimelineClient
private ExecutorService createThreadPool() {
return Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
.build());
}

private void shutdownAndAwaitTermination() {
threadPool.shutdown();
try {
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
threadPool.shutdownNow();
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
LOG.error("ThreadPool did not terminate");
}
}
} catch (InterruptedException ie) {
threadPool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}

protected EventWriter createEventWriter(Path historyFilePath)
throws IOException {
Expand Down Expand Up @@ -1097,41 +1059,6 @@ private void processEventForTimelineServer(HistoryEvent event, JobId jobId,
+ "Server", ex);
}
}

@Private
public JsonNode countersToJSON(Counters counters) {
ArrayNode nodes = FACTORY.arrayNode();
if (counters != null) {
for (CounterGroup counterGroup : counters) {
ObjectNode groupNode = nodes.addObject();
groupNode.put("NAME", counterGroup.getName());
groupNode.put("DISPLAY_NAME", counterGroup.getDisplayName());
ArrayNode countersNode = groupNode.putArray("COUNTERS");
for (Counter counter : counterGroup) {
ObjectNode counterNode = countersNode.addObject();
counterNode.put("NAME", counter.getName());
counterNode.put("DISPLAY_NAME", counter.getDisplayName());
counterNode.put("VALUE", counter.getValue());
}
}
}
return nodes;
}

private void putEntityWithoutBlocking(final TimelineClient client,
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity) {
Runnable publishWrapper = new Runnable() {
public void run() {
try {
client.putEntities(entity);
} catch (IOException|YarnException e) {
LOG.error("putEntityNonBlocking get failed: " + e);
throw new RuntimeException(e.toString());
}
}
};
threadPool.execute(publishWrapper);
}

// create JobEntity from HistoryEvent with adding other info, like:
// jobId, timestamp and entityType.
Expand Down Expand Up @@ -1293,7 +1220,13 @@ private void processEventForNewTimelineService(HistoryEvent event,
taskId, setCreatedTime);
}
}
putEntityWithoutBlocking(timelineClient, tEntity);
try {
timelineClient.putEntitiesAsync(tEntity);
} catch (IOException | YarnException e) {
LOG.error("Failed to process Event " + event.getEventType()
+ " for the job : " + jobId, e);
}

}

private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {
Expand Down
Expand Up @@ -21,8 +21,8 @@
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -176,7 +176,7 @@ protected void writeEntities(Configuration tlConf,

// create entities from job history and write them
long totalTime = 0;
Set<TimelineEntity> entitySet =
List<TimelineEntity> entitySet =
converter.createTimelineEntities(jobInfo, jobConf);
LOG.info("converted them into timeline entities for job " + jobIdStr);
// use the current user for this purpose
Expand Down Expand Up @@ -215,15 +215,15 @@ protected void writeEntities(Configuration tlConf,
}

private void writeAllEntities(AppLevelTimelineCollector collector,
Set<TimelineEntity> entitySet, UserGroupInformation ugi)
List<TimelineEntity> entitySet, UserGroupInformation ugi)
throws IOException {
TimelineEntities entities = new TimelineEntities();
entities.setEntities(entitySet);
collector.putEntities(entities, ugi);
}

private void writePerEntity(AppLevelTimelineCollector collector,
Set<TimelineEntity> entitySet, UserGroupInformation ugi)
List<TimelineEntity> entitySet, UserGroupInformation ugi)
throws IOException {
for (TimelineEntity entity : entitySet) {
TimelineEntities entities = new TimelineEntities();
Expand Down
Expand Up @@ -18,7 +18,9 @@

package org.apache.hadoop.mapred;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -57,16 +59,16 @@ class TimelineEntityConverter {
* Note that we also do not add info to the YARN application entity, which
* would be needed for aggregation.
*/
public Set<TimelineEntity> createTimelineEntities(JobInfo jobInfo,
public List<TimelineEntity> createTimelineEntities(JobInfo jobInfo,
Configuration conf) {
Set<TimelineEntity> entities = new HashSet<>();
List<TimelineEntity> entities = new ArrayList<>();

// create the job entity
TimelineEntity job = createJobEntity(jobInfo, conf);
entities.add(job);

// create the task and task attempt entities
Set<TimelineEntity> tasksAndAttempts =
List<TimelineEntity> tasksAndAttempts =
createTaskAndTaskAttemptEntities(jobInfo);
entities.addAll(tasksAndAttempts);

Expand Down Expand Up @@ -125,9 +127,9 @@ private void addMetrics(TimelineEntity entity, Counters counters) {
}
}

private Set<TimelineEntity> createTaskAndTaskAttemptEntities(
private List<TimelineEntity> createTaskAndTaskAttemptEntities(
JobInfo jobInfo) {
Set<TimelineEntity> entities = new HashSet<>();
List<TimelineEntity> entities = new ArrayList<>();
Map<TaskID,TaskInfo> taskInfoMap = jobInfo.getAllTasks();
LOG.info("job " + jobInfo.getJobId()+ " has " + taskInfoMap.size() +
" tasks");
Expand Down
Expand Up @@ -17,15 +17,16 @@
*/
package org.apache.hadoop.yarn.api.records.timelineservice;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.util.ArrayList;
import java.util.List;

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
* This class hosts a set of timeline entities.
Expand All @@ -36,22 +37,22 @@
@InterfaceStability.Unstable
public class TimelineEntities {

private Set<TimelineEntity> entities = new HashSet<>();
private List<TimelineEntity> entities = new ArrayList<>();

public TimelineEntities() {

}

@XmlElement(name = "entities")
public Set<TimelineEntity> getEntities() {
public List<TimelineEntity> getEntities() {
return entities;
}

public void setEntities(Set<TimelineEntity> timelineEntities) {
public void setEntities(List<TimelineEntity> timelineEntities) {
this.entities = timelineEntities;
}

public void addEntities(Set<TimelineEntity> timelineEntities) {
public void addEntities(List<TimelineEntity> timelineEntities) {
this.entities.addAll(timelineEntities);
}

Expand Down
Expand Up @@ -1987,6 +1987,12 @@ public static boolean isAclEnabled(Configuration conf) {

public static final int DEFAULT_ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS = 1000;

public static final String NUMBER_OF_ASYNC_ENTITIES_TO_MERGE =
TIMELINE_SERVICE_PREFIX
+ "timeline-client.number-of-async-entities-to-merge";

public static final int DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE = 10;

// mark app-history related configs @Private as application history is going
// to be integrated into the timeline service
@Private
Expand Down

0 comments on commit 0d02ab8

Please sign in to comment.