@@ -3189,6 +3189,12 @@ public static boolean isAclEnabled(Configuration conf) {
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS_DEFAULT =
7 * 24 * 60 * 60;

public static final String
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RECOVERY_ENABLED =
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "recovery-enabled";
public static final boolean
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RECOVERY_ENABLED_DEFAULT = true;

// how old the most recent log of an UNKNOWN app needs to be in the active
// directory before we treat it as COMPLETED
public static final String
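The new switch above defaults to true and is read via conf.getBoolean() in the store initialization shown below. A minimal sketch of toggling it programmatically, using only the two constants added in this patch; the suggestion that the same property can be set in yarn-site.xml under the existing entity-group-fs-store prefix is an assumption about TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX, not something shown in this diff.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class RecoveryFlagSketch {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    // Recovery is on by default; disabling it forces the store to rescan
    // every summary log from offset 0 after a restart.
    conf.setBoolean(
        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RECOVERY_ENABLED,
        false);
    boolean enabled = conf.getBoolean(
        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RECOVERY_ENABLED,
        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RECOVERY_ENABLED_DEFAULT);
    System.out.println("checkpoint recovery enabled: " + enabled);
  }
}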
@@ -25,14 +25,19 @@
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.ipc.CallerContext;
@@ -59,6 +64,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.MalformedURLException;
@@ -70,6 +77,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
@@ -132,6 +140,11 @@ public class EntityGroupFSTimelineStore extends CompositeService
private long logRetainMillis;
private long unknownActiveMillis;
private int appCacheMaxSize = 0;
private boolean recoveryEnabled;
private Path checkpointFile;
private ConcurrentMap<String, Pair<Long, Long>> recoveredLogs =
new ConcurrentHashMap<String, Pair<Long, Long>>();

private List<TimelineEntityGroupPlugin> cacheIdPlugins;
private Map<TimelineEntityGroupId, EntityCacheItem> cachedLogs;
private boolean aclsEnabled;
@@ -205,6 +218,11 @@ protected boolean removeEldestEntry(
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT));
fs = activeRootPath.getFileSystem(conf);
checkpointFile = new Path(fs.getHomeDirectory(), "atscheckpoint");
recoveryEnabled = conf.getBoolean(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RECOVERY_ENABLED,
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RECOVERY_ENABLED_DEFAULT);

aclsEnabled = conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE,
YarnConfiguration.DEFAULT_YARN_ACL_ENABLE);
CallerContext.setCurrent(
@@ -293,6 +311,15 @@ protected void serviceStart() throws Exception {
fs.setPermission(doneRootPath, DONE_DIR_PERMISSION);
}

// Recover the lastProcessedTime and offset for logfiles
if (recoveryEnabled && fs.exists(checkpointFile)) {
try (FSDataInputStream in = fs.open(checkpointFile)) {
recoveredLogs.putAll(recoverLogFiles(in));
} catch (IOException e) {
LOG.warn("Failed to recover summarylog files from the checkpointfile", e);
}
}

objMapper = new ObjectMapper();
objMapper.setAnnotationIntrospector(
new JaxbAnnotationIntrospector(TypeFactory.defaultInstance()));
@@ -352,10 +379,62 @@ protected void serviceStop() throws Exception {
super.serviceStop();
}

/* Returns a map of summary log files; each value pair holds the
lastProcessedTime and the offset. */
HashMap<String, Pair<Long, Long>> recoverLogFiles(
DataInputStream in) throws IOException {
HashMap<String, Pair<Long, Long>> logFiles = new HashMap<>();
long totalEntries = in.readLong();
for (long i = 0; i < totalEntries; i++) {
Text attemptDirName = new Text();
attemptDirName.readFields(in);
Text fileName = new Text();
fileName.readFields(in);
LongWritable lastProcessedTime = new LongWritable();
lastProcessedTime.readFields(in);
LongWritable offset = new LongWritable();
offset.readFields(in);
Pair<Long, Long> pair = Pair.of(lastProcessedTime.get(), offset.get());
logFiles.put(attemptDirName + Path.SEPARATOR + fileName, pair);
}
LOG.info("Recovered {} summarylog files", totalEntries);
return logFiles;
}

// Stores the set of summary log files into the checkpoint stream
void storeLogFiles(Collection<AppLogs> appLogs,
DataOutputStream checkPointStream) throws IOException {
long totalEntries = 0L;
for (AppLogs appLog : appLogs) {
totalEntries += appLog.summaryLogs.size();
}
checkPointStream.writeLong(totalEntries);
for (AppLogs appLog : appLogs) {
for (LogInfo summaryLog : appLog.summaryLogs) {
new Text(summaryLog.getAttemptDirName()).write(checkPointStream);
new Text(summaryLog.getFilename()).write(checkPointStream);
new LongWritable(summaryLog.getLastProcessedTime()).write(checkPointStream);
new LongWritable(summaryLog.getOffset()).write(checkPointStream);
}
}
LOG.info("Stored {} summarylog files into checkPointFile", totalEntries);
}

@InterfaceAudience.Private
@VisibleForTesting
int scanActiveLogs() throws IOException {
long startTime = Time.monotonicNow();
// Store the last processed time and offset for each summary log file
if (recoveryEnabled && appIdLogMap.size() > 0) {
try (FSDataOutputStream checkPointStream = fs.create(checkpointFile, true)) {
storeLogFiles(appIdLogMap.values(), checkPointStream);
} catch (Exception e) {
LOG.warn("Failed to checkpoint the summarylog files", e);
}
}
int logsToScanCount = scanActiveLogs(activeRootPath);
metrics.addActiveLogDirScanTime(Time.monotonicNow() - startTime);
return logsToScanCount;
@@ -824,6 +903,15 @@ private void addSummaryLog(String attemptDirName,
log = new EntityLogInfo(attemptDirName, filename, owner);
summaryLogs.add(log);
}
// This is to avoid processing summary files again after a restart of the ATS
if (recoveryEnabled) {
Pair<Long, Long> pair = recoveredLogs.remove(log.getAttemptDirName()
+ Path.SEPARATOR + log.getFilename());
if (pair != null) {
log.setLastProcessedTime(pair.getKey());
log.setOffset(pair.getValue());
}
}
}

private synchronized void addDetailLog(String attemptDirName,
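The checkpoint written by storeLogFiles() and read back by recoverLogFiles() above is a flat stream of Hadoop Writables: a long entry count, then for each summary log the attempt directory name, the file name, the last processed time, and the byte offset, in that order. A self-contained round-trip sketch of that layout follows; the local file name and log names are made up purely for illustration.

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class CheckpointFormatSketch {
  public static void main(String[] args) throws IOException {
    // Write one entry in the same field order storeLogFiles() uses.
    try (DataOutputStream out =
        new DataOutputStream(new FileOutputStream("atscheckpoint.local"))) {
      out.writeLong(1L);                                // total entry count
      new Text("appattempt_1_0001_000001").write(out);  // attempt dir name (illustrative)
      new Text("summarylog-example").write(out);        // file name (illustrative)
      new LongWritable(1690000000000L).write(out);      // lastProcessedTime
      new LongWritable(4096L).write(out);               // offset
    }
    // Read it back in the same order, as recoverLogFiles() does.
    try (DataInputStream in =
        new DataInputStream(new FileInputStream("atscheckpoint.local"))) {
      long total = in.readLong();
      for (long i = 0; i < total; i++) {
        Text dir = new Text();
        dir.readFields(in);
        Text name = new Text();
        name.readFields(in);
        LongWritable time = new LongWritable();
        time.readFields(in);
        LongWritable offset = new LongWritable();
        offset.readFields(in);
        System.out.println(dir + "/" + name + " -> time=" + time + ", offset=" + offset);
      }
    }
  }
}

addSummaryLog() then pops the matching entry out of recoveredLogs and seeds the new log info with the recovered time and offset, which is what prevents already-parsed summary data from being posted again after a restart.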
@@ -58,7 +58,17 @@ public void setOffset(long newOffset) {
this.offset = newOffset;
}


public long getLastProcessedTime() {
return lastProcessedTime;
}

public void setLastProcessedTime(long lastProcessedTime) {
this.lastProcessedTime = lastProcessedTime;
}

private String attemptDirName;
private long lastProcessedTime = -1;
private String filename;
private String user;
private long offset = 0;
@@ -108,22 +118,31 @@ public long parseForStore(TimelineDataManager tdm, Path appDirPath,
FileStatus status = fs.getFileStatus(logPath);
long numParsed = 0;
if (status != null) {
long startTime = Time.monotonicNow();
try {
LOG.debug("Parsing {} at offset {}", logPath, offset);
long count = parsePath(tdm, logPath, appCompleted, jsonFactory,
objMapper, fs);
LOG.info("Parsed {} entities from {} in {} msec",
count, logPath, Time.monotonicNow() - startTime);
numParsed += count;
} catch (RuntimeException e) {
// If AppLogs cannot parse this log, it may be corrupted or just empty
if (e.getCause() instanceof JsonParseException &&
(status.getLen() > 0 || offset > 0)) {
// log on parse problems if the file has been read in the past or
// is visibly non-empty
LOG.info("Log {} appears to be corrupted. Skip. ", logPath);
long curModificationTime = status.getModificationTime();
if (curModificationTime > getLastProcessedTime()) {
long startTime = Time.monotonicNow();
try {
LOG.info("Parsing {} at offset {}", logPath, offset);
long count =
parsePath(tdm, logPath, appCompleted, jsonFactory, objMapper, fs);
setLastProcessedTime(curModificationTime);
LOG.info("Parsed {} entities from {} in {} msec", count, logPath,
Time.monotonicNow() - startTime);
numParsed += count;
} catch (RuntimeException e) {
// If AppLogs cannot parse this log, it may be corrupted or just empty
if (e.getCause() instanceof JsonParseException
&& (status.getLen() > 0 || offset > 0)) {
// log on parse problems if the file has been read in the past or
// is visibly non-empty
LOG.info("Log {} appears to be corrupted. Skip. ", logPath);
} else {
LOG.error("Failed to parse " + logPath + " from offset " + offset,
e);
}
}
} else {
LOG.info("Skip Parsing {} as there is no change", logPath);
}
} else {
LOG.warn("{} no longer exists. Skip for scanning. ", logPath);
@@ -182,21 +201,19 @@ protected long doParse(TimelineDataManager tdm, JsonParser parser,
long count = 0;
TimelineEntities entities = new TimelineEntities();
ArrayList<TimelineEntity> entityList = new ArrayList<TimelineEntity>(1);
long bytesParsed;
long bytesParsedLastBatch = 0;
boolean postError = false;
try {
MappingIterator<TimelineEntity> iter = objMapper.readValues(parser,
TimelineEntity.class);

long curPos;
while (iter.hasNext()) {
TimelineEntity entity = iter.next();
String etype = entity.getEntityType();
String eid = entity.getEntityId();
LOG.trace("Read entity {}", etype);
LOG.debug("Read entity {} of {}", eid, etype);
++count;
bytesParsed = parser.getCurrentLocation().getCharOffset() + 1;
LOG.trace("Parser now at offset {}", bytesParsed);
curPos = ((FSDataInputStream) parser.getInputSource()).getPos();
LOG.debug("Parser now at offset {}", curPos);

try {
LOG.debug("Adding {}({}) to store", eid, etype);
@@ -208,8 +225,7 @@ protected long doParse(TimelineDataManager tdm, JsonParser parser,
LOG.warn("Error putting entity: {} ({}): {}",
e.getEntityId(), e.getEntityType(), e.getErrorCode());
}
setOffset(getOffset() + bytesParsed - bytesParsedLastBatch);
bytesParsedLastBatch = bytesParsed;
setOffset(curPos);
entityList.clear();
} catch (YarnException e) {
postError = true;
@@ -247,8 +263,7 @@ protected long doParse(TimelineDataManager tdm, JsonParser parser,
ObjectMapper objMapper, UserGroupInformation ugi, boolean appCompleted)
throws IOException {
long count = 0;
long bytesParsed;
long bytesParsedLastBatch = 0;
long curPos;
boolean putError = false;
try {
MappingIterator<TimelineDomain> iter = objMapper.readValues(parser,
@@ -259,13 +274,12 @@ protected long doParse(TimelineDataManager tdm, JsonParser parser,
domain.setOwner(ugi.getShortUserName());
LOG.trace("Read domain {}", domain.getId());
++count;
bytesParsed = parser.getCurrentLocation().getCharOffset() + 1;
LOG.trace("Parser now at offset {}", bytesParsed);
curPos = ((FSDataInputStream) parser.getInputSource()).getPos();
LOG.debug("Parser now at offset {}", curPos);

try {
tdm.putDomain(domain, ugi);
setOffset(getOffset() + bytesParsed - bytesParsedLastBatch);
bytesParsedLastBatch = bytesParsed;
setOffset(curPos);
} catch (YarnException e) {
putError = true;
throw new IOException("Error posting domain", e);
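The doParse() changes in the last two hunks switch the stored offset from the JSON parser's character offset to the byte position of the underlying FSDataInputStream, so the value that ends up in the checkpoint can be applied directly to the file. A small sketch of consuming such a recovered byte offset; the path, offset, and clamping are illustrative, not taken from the patch.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ResumeFromOffsetSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path logPath = new Path("/tmp/summarylog-example");  // illustrative path
    long recoveredOffset = 4096L;                        // byte offset from the checkpoint

    FileSystem fs = logPath.getFileSystem(conf);
    fs.create(logPath, true).close();                    // ensure the demo file exists
    long len = fs.getFileStatus(logPath).getLen();
    try (FSDataInputStream in = fs.open(logPath)) {
      // seek()/getPos() operate on byte positions, which is why doParse() now
      // records ((FSDataInputStream) parser.getInputSource()).getPos()
      // rather than the parser's character offset. Clamp so the demo also
      // works on a short file.
      in.seek(Math.min(recoveredOffset, len));
      System.out.println("resumed at byte " + in.getPos());
    }
  }
}

Combined with the recovered offsets and the modification-time guard, a restarted ATS can resume summary-log parsing near where it stopped instead of re-posting every entity.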