Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
CHUKWA-646. Fix file rotation correctly for 0 byte rotated files. (Iv…
…y Tang via Eric Yang)

git-svn-id: https://svn.apache.org/repos/asf/incubator/chukwa/trunk@1361902 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
macroadster committed Jul 16, 2012
1 parent 78b2892 commit 7d1290cc59b5631a2b75d3683cf6313a46595333
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 41 deletions.
@@ -14,6 +14,8 @@ Trunk (unreleased changes)

BUGS

CHUKWA-646. Fix file rotation correctly for 0 byte rotated files. (Ivy Tang via Eric Yang)

CHUKWA-643. Updated Jersey dependency. (Prakhar Srivastava via Eric Yang)

CHUKWA-641. Fix stack trace for dumpChunk command when invalid regular expression is specified. (Eric Spishak via Eric Yang)
@@ -20,6 +20,7 @@

import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.File;
import org.apache.hadoop.chukwa.datacollection.adaptor.*;
import org.apache.hadoop.chukwa.util.ExceptionUtil;

@@ -168,55 +169,59 @@ public synchronized boolean tailFile()

long len = 0L;
try {
RandomAccessFile newReader = new RandomAccessFile(toWatch, "r");
len = reader.length();
long newLength = newReader.length();
if (newLength < len && fileReadOffset >= len) {
if (reader != null) {
reader.close();
}

reader = newReader;
fileReadOffset = 0L;
log.debug("Adaptor|"+ adaptorID + "| File size mismatched, rotating: "
+ toWatch.getAbsolutePath());
} else {
try {
if (newReader != null) {
newReader.close();
}
newReader =null;
} catch (Throwable e) {
log.debug(ExceptionUtil.getStackTrace(e));
}
if (lastSlurpTime == 0) {
lastSlurpTime = System.currentTimeMillis();
}
} catch (IOException e) {
log.debug(ExceptionUtil.getStackTrace(e));
}
if (len >= fileReadOffset) {
if (offsetOfFirstByte > fileReadOffset) {
// If the file rotated, the recorded offsetOfFirstByte is greater than
// file size,
// reset the first byte position to beginning of the file.
// file size,reset the first byte position to beginning of the file.
fileReadOffset = 0;
offsetOfFirstByte = 0L;
log.warn("offsetOfFirstByte>fileReadOffset, resetting offset to 0");
}
hasMoreData = slurp(len, reader);

} else {
// file has rotated and no detection
if (reader != null) {
reader.close();
if (len == fileReadOffset) {
File fixedNameFile = new File(toWatch.getAbsolutePath());
long fixedNameLastModified = fixedNameFile.lastModified();
if (fixedNameLastModified > lastSlurpTime) {
// If len == fileReadOffset,the file stops rolling log or the file
// has rotated.
// But fixedNameLastModified > lastSlurpTime , this means after the
// last slurping,the file has been written ,
// so the file has been rotated.
boolean hasLeftData = true;
while (hasLeftData) {// read the possiblly generated log
hasLeftData = slurp(len, reader);
}
RandomAccessFile newReader = new RandomAccessFile(toWatch, "r");
if (reader != null) {
reader.close();
}
reader = newReader;
fileReadOffset = 0L;
len = reader.length();
log.debug("Adaptor|" + adaptorID
+ "| File size mismatched, rotating: "
+ toWatch.getAbsolutePath());
}
hasMoreData = slurp(len, reader);
} else if (len < fileReadOffset) {
// file has rotated and no detection
if (reader != null) {
reader.close();
}
reader = null;
fileReadOffset = 0L;
offsetOfFirstByte = 0L;
hasMoreData = true;
log.warn("Adaptor|" + adaptorID + "| file: " + toWatch.getPath()
+ ", has rotated and no detection - reset counters to 0L");
} else {
hasMoreData = slurp(len, reader);
}

reader = null;
fileReadOffset = 0L;
offsetOfFirstByte = 0L;
hasMoreData = true;
log.warn("Adaptor|" + adaptorID + "| file: " + toWatch.getPath()
+ ", has rotated and no detection - reset counters to 0L");
}
} catch (IOException e) {
// do nothing, if file doesn't exist.
}
} catch (IOException e) {
log.warn("failure reading " + toWatch, e);
}
@@ -72,7 +72,11 @@ public class LWFTAdaptor extends AbstractAdaptor {
*/
protected long offsetOfFirstByte = 0;
protected Configuration conf = null;

/**
* The timestamp of last slurping.
*/
protected long lastSlurpTime = 0l;

File toWatch;

@Override
@@ -188,6 +188,7 @@ public List<CommitListEntry> send(List<Chunk> toSend)
// store a CLE for this chunk which we will use to ack this chunk to the
// caller of send()
// (e.g. the agent will use the list of CLE's for checkpointing)
log.info("chunk seqID:"+c.getSeqID());
commitResults.add(new CommitListEntry(c.getInitiator(), c.getSeqID(),
c.getSeqID() - c.getData().length));
}

0 comments on commit 7d1290c

Please sign in to comment.