Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
APEXMALHAR-2263 change offset in AbstractFileInputOperator from int t…
Browse files Browse the repository at this point in the history
…o long
  • Loading branch information
mattqzhang committed Oct 19, 2016
1 parent abb3900 commit 2f308aa
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 12 deletions.
Expand Up @@ -96,6 +96,7 @@
* @param <T> The type of the object that this input operator reads.
* @since 1.0.2
*/
@org.apache.hadoop.classification.InterfaceStability.Evolving
public abstract class AbstractFileInputOperator<T>
implements InputOperator, Partitioner<AbstractFileInputOperator<T>>, StatsListener, Operator.CheckpointListener
{
Expand All @@ -106,15 +107,15 @@ public abstract class AbstractFileInputOperator<T>
@NotNull
protected DirectoryScanner scanner = new DirectoryScanner();
protected int scanIntervalMillis = 5000;
protected int offset;
protected long offset;
protected String currentFile;
protected Set<String> processedFiles = new HashSet<String>();
protected int emitBatchSize = 1000;
protected int currentPartitions = 1;
protected int partitionCount = 1;
private int retryCount = 0;
private int maxRetryCount = 5;
protected transient int skipCount = 0;
protected transient long skipCount = 0;
private transient OperatorContext context;

private final BasicCounters<MutableLong> fileCounters = new BasicCounters<MutableLong>(MutableLong.class);
Expand Down Expand Up @@ -143,22 +144,22 @@ public abstract class AbstractFileInputOperator<T>
protected static class FailedFile
{
String path;
int offset;
long offset;
int retryCount;
long lastFailedTime;

/* For kryo serialization */
@SuppressWarnings("unused")
protected FailedFile() {}

protected FailedFile(String path, int offset)
protected FailedFile(String path, long offset)
{
this.path = path;
this.offset = offset;
this.retryCount = 0;
}

protected FailedFile(String path, int offset, int retryCount)
protected FailedFile(String path, long offset, int retryCount)
{
this.path = path;
this.offset = offset;
Expand Down Expand Up @@ -623,7 +624,7 @@ public void emitTuples()
try {
if (currentFile != null && offset > 0) {
//open file resets offset to 0 so this a way around it.
int tmpOffset = offset;
long tmpOffset = offset;
if (fs.exists(new Path(currentFile))) {
this.inputStream = openFile(new Path(currentFile));
offset = tmpOffset;
Expand Down Expand Up @@ -651,7 +652,7 @@ public void emitTuples()
}
}
if (inputStream != null) {
int startOffset = offset;
long startOffset = offset;
String file = currentFile; //current file is reset to null when closed.

try {
Expand Down Expand Up @@ -1130,8 +1131,8 @@ protected void setPartitionCount(int partitionCount)
protected static class RecoveryEntry
{
final String file;
final int startOffset;
final int endOffset;
final long startOffset;
final long endOffset;

@SuppressWarnings("unused")
private RecoveryEntry()
Expand All @@ -1141,7 +1142,7 @@ private RecoveryEntry()
endOffset = -1;
}

RecoveryEntry(String file, int startOffset, int endOffset)
RecoveryEntry(String file, long startOffset, long endOffset)
{
this.file = Preconditions.checkNotNull(file, "file");
this.startOffset = startOffset;
Expand Down Expand Up @@ -1174,8 +1175,8 @@ public boolean equals(Object o)
public int hashCode()
{
int result = file.hashCode();
result = 31 * result + startOffset;
result = 31 * result + endOffset;
result = 31 * result + (int)(startOffset & 0xFFFFFFFF);
result = 31 * result + (int)(endOffset & 0xFFFFFFFF);
return result;
}
}
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Expand Up @@ -135,6 +135,7 @@
<exclude>@org.apache.hadoop.classification.InterfaceStability$Evolving</exclude>
<exclude>@org.apache.hadoop.classification.InterfaceStability$Unstable</exclude>
<exclude>com.datatorrent.lib.io.fs.FSInputModule</exclude>
<exclude>com.datatorrent.lib.io.fs.AbstractFileInputOperator</exclude>
</excludes>
</parameter>
<skip>${semver.plugin.skip}</skip>
Expand Down

0 comments on commit 2f308aa

Please sign in to comment.