Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
APEXMALHAR-2263 change offset in AbstractFileInputOperator from int to long
Browse files Browse the repository at this point in the history
  • Loading branch information
mattqzhang committed Oct 17, 2016
1 parent 2153cd6 commit 640c9da
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 38 deletions.
Expand Up @@ -107,6 +107,7 @@ public abstract class AbstractFileInputOperator<T>
protected DirectoryScanner scanner = new DirectoryScanner();
protected int scanIntervalMillis = 5000;
protected int offset;
protected long longOffset;
protected String currentFile;
protected Set<String> processedFiles = new HashSet<String>();
protected int emitBatchSize = 1000;
Expand All @@ -115,6 +116,7 @@ public abstract class AbstractFileInputOperator<T>
private int retryCount = 0;
private int maxRetryCount = 5;
protected transient int skipCount = 0;
protected transient long longSkipCount = 0;
private transient OperatorContext context;

private final BasicCounters<MutableLong> fileCounters = new BasicCounters<MutableLong>(MutableLong.class);
Expand Down Expand Up @@ -144,6 +146,7 @@ protected static class FailedFile
{
String path;
int offset;
long longOffset;
int retryCount;
long lastFailedTime;

Expand All @@ -155,13 +158,31 @@ protected FailedFile(String path, int offset)
{
this.path = path;
this.offset = offset;
this.longOffset = offset;
this.retryCount = 0;
}

/**
 * Creates a failed-file record for {@code path} at a long {@code offset} with a retry count of 0.
 *
 * @param path   value stored in the {@code path} field
 * @param offset value stored in {@code longOffset}; the int {@code offset} field receives the
 *               same value narrowed with an {@code (int)} cast
 */
protected FailedFile(String path, long offset)
{
  // The (String, long, int) constructor performs the same four assignments; 0 is the retry count.
  this(path, offset, 0);
}

/**
 * Creates a failed-file record from an int {@code offset} and an explicit retry count.
 * Both the int {@code offset} field and {@code longOffset} end up holding {@code offset}.
 *
 * @param path       value stored in the {@code path} field
 * @param offset     value stored in both offset fields
 * @param retryCount value stored in the {@code retryCount} field
 */
protected FailedFile(String path, int offset, int retryCount)
{
  // Widening selects the (String, long, int) overload; its narrowing cast back to int is lossless
  // for a value that started out as an int.
  this(path, (long)offset, retryCount);
}

/**
 * Creates a failed-file record from a long {@code offset} and an explicit retry count.
 *
 * @param path       value stored in the {@code path} field
 * @param offset     stored unchanged in {@code longOffset}; the int {@code offset} field gets the
 *                   {@code (int)}-narrowed value, which wraps for values outside the int range
 * @param retryCount value stored in the {@code retryCount} field
 */
protected FailedFile(String path, long offset, int retryCount)
{
  this.retryCount = retryCount;
  this.path = path;
  // Full-width value first, then the narrowed copy for the int field.
  this.longOffset = offset;
  this.offset = (int)offset;
}

Expand All @@ -170,7 +191,7 @@ public String toString()
{
return "FailedFile[" +
"path='" + path + '\'' +
", offset=" + offset +
", offset=" + longOffset +
", retryCount=" + retryCount +
", lastFailedTime=" + lastFailedTime +
']';
Expand Down Expand Up @@ -461,6 +482,7 @@ public void setup(OperatorContext context)
//reset current file and offset in case of replay
currentFile = null;
offset = 0;
longOffset = 0;
}
}

Expand Down Expand Up @@ -564,7 +586,7 @@ protected void replay(long windowId)
//The operator may have continued processing the same file in multiple windows.
//So the recovery states of subsequent windows will have an entry for that file however the offset changes.
//In this case we continue reading from previously opened stream.
if (currentFile == null || !(currentFile.equals(recoveryEntry.file) && offset == recoveryEntry.startOffset)) {
if (currentFile == null || !(currentFile.equals(recoveryEntry.file) && longOffset == recoveryEntry.longStartOffset)) {
if (inputStream != null) {
closeFile(inputStream);
}
Expand All @@ -573,7 +595,7 @@ protected void replay(long windowId)
Iterator<FailedFile> failedFileIterator = failedFiles.iterator();
while (failedFileIterator.hasNext()) {
FailedFile ff = failedFileIterator.next();
if (ff.path.equals(recoveryEntry.file) && ff.offset == recoveryEntry.startOffset) {
if (ff.path.equals(recoveryEntry.file) && ff.longOffset == recoveryEntry.longStartOffset) {
failedFileIterator.remove();
break;
}
Expand All @@ -582,24 +604,24 @@ protected void replay(long windowId)
Iterator<FailedFile> unfinishedFileIterator = unfinishedFiles.iterator();
while (unfinishedFileIterator.hasNext()) {
FailedFile ff = unfinishedFileIterator.next();
if (ff.path.equals(recoveryEntry.file) && ff.offset == recoveryEntry.startOffset) {
if (ff.path.equals(recoveryEntry.file) && ff.longOffset == recoveryEntry.longStartOffset) {
unfinishedFileIterator.remove();
break;
}
}
if (pendingFiles.contains(recoveryEntry.file)) {
pendingFiles.remove(recoveryEntry.file);
}
inputStream = retryFailedFile(new FailedFile(recoveryEntry.file, recoveryEntry.startOffset));
while (offset < recoveryEntry.endOffset) {
inputStream = retryFailedFile(new FailedFile(recoveryEntry.file, recoveryEntry.longStartOffset));
while (longOffset < recoveryEntry.longEndOffset) {
T line = readEntity();
offset++;
longOffset++;
emit(line);
}
} else {
while (offset < recoveryEntry.endOffset) {
while (longOffset < recoveryEntry.longEndOffset) {
T line = readEntity();
offset++;
longOffset++;
emit(line);
}
}
Expand All @@ -621,17 +643,17 @@ public void emitTuples()

if (inputStream == null) {
try {
if (currentFile != null && offset > 0) {
if (currentFile != null && longOffset > 0) {
//open file resets offset to 0 so this a way around it.
int tmpOffset = offset;
long tmpOffset = longOffset;
if (fs.exists(new Path(currentFile))) {
this.inputStream = openFile(new Path(currentFile));
offset = tmpOffset;
skipCount = tmpOffset;
longOffset = tmpOffset;
longSkipCount = tmpOffset;
} else {
currentFile = null;
offset = 0;
skipCount = 0;
longOffset = 0;
longSkipCount = 0;
}
} else if (!unfinishedFiles.isEmpty()) {
retryFailedFile(unfinishedFiles.poll());
Expand All @@ -651,15 +673,15 @@ public void emitTuples()
}
}
if (inputStream != null) {
int startOffset = offset;
long longStartOffset = longOffset;
String file = currentFile; //current file is reset to null when closed.

try {
int counterForTuple = 0;
while (counterForTuple++ < emitBatchSize) {
T line = readEntity();
if (line == null) {
LOG.info("done reading file ({} entries).", offset);
LOG.info("done reading file ({} entries).", longOffset);
closeFile(inputStream);
break;
}
Expand All @@ -668,19 +690,19 @@ public void emitTuples()
// used to prevent already emitted records from being emitted again during recovery.
// When failed file is open, skipCount is set to the last read offset for that file.
//
if (skipCount == 0) {
offset++;
if (longSkipCount == 0) {
longOffset++;
emit(line);
} else {
skipCount--;
longSkipCount--;
}
}
} catch (IOException e) {
failureHandling(e);
}
//Only when something was emitted from the file then we record it for entry.
if (offset > startOffset) {
currentWindowRecoveryState.add(new RecoveryEntry(file, startOffset, offset));
if (longOffset > longStartOffset) {
currentWindowRecoveryState.add(new RecoveryEntry(file, longStartOffset, longOffset));
}
}
}
Expand Down Expand Up @@ -720,7 +742,7 @@ private void failureHandling(Exception e)

protected void addToFailedList()
{
FailedFile ff = new FailedFile(currentFile, offset, retryCount);
FailedFile ff = new FailedFile(currentFile, longOffset, retryCount);

try {
// try to close file
Expand All @@ -734,7 +756,7 @@ protected void addToFailedList()

ff.retryCount++;
ff.lastFailedTime = System.currentTimeMillis();
ff.offset = this.offset;
ff.longOffset = this.longOffset;

// Clear current file state.
this.currentFile = null;
Expand All @@ -745,38 +767,38 @@ protected void addToFailedList()
}

localNumberOfRetries.increment();
LOG.info("adding to failed list path {} offset {} retry {}", ff.path, ff.offset, ff.retryCount);
LOG.info("adding to failed list path {} offset {} retry {}", ff.path, ff.longOffset, ff.retryCount);
failedFiles.add(ff);
}

/**
 * Reopens the file described by {@code ff} and restores the reader state recorded in it.
 *
 * After logging, the method returns {@code null} if {@code fs.exists} reports that the path is
 * missing. Otherwise it stores the stream returned by {@code openFile} in {@code inputStream} and
 * then copies the offset and retry count from {@code ff} into this operator; the skip counters
 * are set to the same offset values.
 *
 * @param ff the failed-file record supplying the path, offsets and retry count
 * @return the stream that was opened (also kept in {@code inputStream}), or {@code null} when the
 *         path does not exist
 * @throws IOException declared by this method; presumably raised by the file-system calls
 */
protected InputStream retryFailedFile(FailedFile ff) throws IOException
{
// NOTE(review): each int-field statement below is followed by its long-field counterpart. This
// looks like the before/after sides of a diff shown together -- confirm which set is intended.
LOG.info("retrying failed file {} offset {} retry {}", ff.path, ff.offset, ff.retryCount);
LOG.info("retrying failed file {} offset {} retry {}", ff.path, ff.longOffset, ff.retryCount);
String path = ff.path;
if (!fs.exists(new Path(path))) {
return null;
}
this.inputStream = openFile(new Path(path));
// The assignments below run after openFile, so they determine the final offset/retry/skip values.
this.offset = ff.offset;
this.longOffset = ff.longOffset;
this.retryCount = ff.retryCount;
this.skipCount = ff.offset;
this.longSkipCount = ff.longOffset;
return this.inputStream;
}

/**
 * Opens {@code path} through {@code fs} and resets the per-file reader state.
 *
 * Sets {@code currentFile} to {@code path.toString()} and zeroes the offset, retry-count and
 * skip-count fields before the file is opened.
 *
 * @param path the file to open
 * @return the stream returned by {@code fs.open(path)}
 * @throws IOException declared by this method; presumably raised by {@code fs.open}
 */
protected InputStream openFile(Path path) throws IOException
{
currentFile = path.toString();
// NOTE(review): int-field and long-field resets appear side by side here. This looks like the
// before/after sides of a diff shown together -- confirm which set is intended.
offset = 0;
longOffset = 0;
retryCount = 0;
skipCount = 0;
longSkipCount = 0;
LOG.info("opening file {}", path);
InputStream input = fs.open(path);
return input;
}

protected void closeFile(InputStream is) throws IOException
{
LOG.info("closing file {} offset {}", currentFile, offset);
LOG.info("closing file {} offset {}", currentFile, longOffset);

if (is != null) {
is.close();
Expand Down Expand Up @@ -823,7 +845,7 @@ public Collection<Partition<AbstractFileInputOperator<T>>> definePartitions(Coll
tempGlobalNumberOfRetries.add(oper.localNumberOfRetries);
tempGlobalNumberOfFailures.add(oper.localNumberOfFailures);
if (oper.currentFile != null) {
currentFiles.add(new FailedFile(oper.currentFile, oper.offset));
currentFiles.add(new FailedFile(oper.currentFile, oper.longOffset));
}
oldscanners.add(oper.getScanner());
deletedOperators.add(oper.operatorId);
Expand Down Expand Up @@ -857,7 +879,7 @@ public Collection<Partition<AbstractFileInputOperator<T>>> definePartitions(Coll
/* redistribute unfinished files properly */
oper.unfinishedFiles.clear();
oper.currentFile = null;
oper.offset = 0;
oper.longOffset = 0;
Iterator<FailedFile> unfinishedIter = currentFiles.iterator();
while (unfinishedIter.hasNext()) {
FailedFile unfinishedFile = unfinishedIter.next();
Expand Down Expand Up @@ -1131,21 +1153,36 @@ protected static class RecoveryEntry
{
final String file;
final int startOffset;
final long longStartOffset;
final int endOffset;
final long longEndOffset;

/**
 * No-argument constructor: leaves {@code file} null and sets all four offset fields to -1.
 */
@SuppressWarnings("unused")
private RecoveryEntry()
{
  this.file = null;
  // long-typed pair
  this.longStartOffset = -1;
  this.longEndOffset = -1;
  // int-typed pair
  this.startOffset = -1;
  this.endOffset = -1;
}

/**
 * Creates an entry for {@code file} from int offsets. The int and long offset fields end up
 * holding the same values.
 *
 * @param file        file name; a null value is rejected by {@code Preconditions.checkNotNull}
 * @param startOffset value stored in {@code startOffset} and {@code longStartOffset}
 * @param endOffset   value stored in {@code endOffset} and {@code longEndOffset}
 */
RecoveryEntry(String file, int startOffset, int endOffset)
{
  // Widening selects the (String, long, long) overload, which performs the null check and whose
  // narrowing casts back to int are lossless for values that started out as ints.
  this(file, (long)startOffset, (long)endOffset);
}

/**
 * Creates an entry for {@code file} from long offsets.
 *
 * @param file        file name; a null value is rejected by {@code Preconditions.checkNotNull}
 * @param startOffset stored unchanged in {@code longStartOffset}; {@code startOffset} receives the
 *                    {@code (int)}-narrowed value
 * @param endOffset   stored unchanged in {@code longEndOffset}; {@code endOffset} receives the
 *                    {@code (int)}-narrowed value
 */
RecoveryEntry(String file, long startOffset, long endOffset)
{
  // Validate first so a null file fails before any offset is recorded.
  this.file = Preconditions.checkNotNull(file, "file");
  // Full-width values.
  this.longStartOffset = startOffset;
  this.longEndOffset = endOffset;
  // Narrowed copies for the int fields; values outside the int range wrap.
  this.startOffset = (int)startOffset;
  this.endOffset = (int)endOffset;
}

@Override
Expand All @@ -1160,10 +1197,10 @@ public boolean equals(Object o)

RecoveryEntry that = (RecoveryEntry)o;

if (endOffset != that.endOffset) {
if (longEndOffset != that.longEndOffset) {
return false;
}
if (startOffset != that.startOffset) {
if (longStartOffset != that.longStartOffset) {
return false;
}
return file.equals(that.file);
Expand All @@ -1174,8 +1211,8 @@ public boolean equals(Object o)
/**
 * Computes a hash from {@code file}, {@code longStartOffset} and {@code longEndOffset}.
 *
 * Each long offset is folded to 32 bits by XOR-ing its upper and lower halves, so offsets that
 * differ only above bit 31 still contribute different values. The earlier form,
 * {@code (int)(x & 0xFFFFFFFF)}, masked nothing: {@code 0xFFFFFFFF} is the int literal -1, which
 * widens to an all-ones long, and the cast then simply discarded the upper half.
 *
 * @return the combined hash code
 */
public int hashCode()
{
  int result = file.hashCode();
  result = 31 * result + (int)(longStartOffset ^ (longStartOffset >>> 32));
  result = 31 * result + (int)(longEndOffset ^ (longEndOffset >>> 32));
  return result;
}
}
Expand Down
Expand Up @@ -579,7 +579,7 @@ public void testRecoveryWithCurrentFile() throws Exception
LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
oper.scanner = null;
oper.currentFile = testFile.getAbsolutePath();
oper.offset = 1;
oper.longOffset = 1;

CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
@SuppressWarnings({"unchecked", "rawtypes"})
Expand Down

0 comments on commit 640c9da

Please sign in to comment.