Final comments
kl0u committed May 31, 2016
1 parent f17b531 commit aef24e8
Showing 15 changed files with 149 additions and 149 deletions.

@@ -179,13 +179,11 @@ public E nextRecord(E reuseValue) throws IOException {
     // --------------------------------------------------------------------------------------------
 
     @Override
-    public Tuple2<FileInputSplit, Tuple2<Long, Long>> getCurrentState() throws IOException {
+    public Tuple2<Long, Long> getCurrentState() throws IOException {
         if (this.reachedEnd()) {
-            return new Tuple2<>(null, new Tuple2<>(0L, 0L));
+            return new Tuple2<>(0L, 0L);
         }
 
-        Tuple2<Long, Long> state = new Tuple2<>(this.lastSync, this.recordsReadSinceLastSync);
-        return new Tuple2<>(currSplit, state);
+        return new Tuple2<>(this.lastSync, this.recordsReadSinceLastSync);
     }
 
     @Override
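
The pattern repeated throughout this commit: getCurrentState() no longer bundles the current split with the reading position, because the caller already knows which split it handed to the format. A minimal caller-side sketch of the resulting checkpoint/restore cycle, mirroring the recovery test in the next file (path, split, and the User type stand in for that test's setup and are assumptions here):

    // Hedged sketch, not part of the commit: the caller-side cycle under the new API.
    void checkpointAndRestore(Path path, FileInputSplit split) throws IOException {
        AvroInputFormat<User> format = new AvroInputFormat<>(path, User.class);
        format.open(split);

        // ... read records until a checkpoint fires, then capture only the position:
        Tuple2<Long, Long> state = format.getCurrentState(); // (lastSync, recordsReadSinceLastSync)

        // On recovery, re-create the format and pass back both the split the
        // caller tracks itself and the saved position.
        format = new AvroInputFormat<>(path, User.class);
        format.reopen(split, state);
    }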

@@ -216,14 +216,13 @@ public void testAvroRecovery() throws Exception {
     if(format.getRecordsReadFromBlock() == recordsUntilCheckpoint) {
 
         // do the whole checkpoint-restore procedure and see if we pick up from where we left off.
-        Tuple2<FileInputSplit, Tuple2<Long, Long>> state = format.getCurrentState();
+        Tuple2<Long, Long> state = format.getCurrentState();
 
         // this is to make sure that nothing stays from the previous format
         // (as it is going to be in the normal case)
-        format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
-        Assert.assertEquals(splits[i], state.f0);
+        format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
 
-        format.reopen(state.f0, state.f1);
+        format.reopen(splits[i], state);
         assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint);
     }
     elementsPerSplit[i]++;

@@ -376,16 +376,14 @@ public int read(byte[] b, int off, int len) throws IOException {
     // --------------------------------------------------------------------------------------------
 
     @Override
-    public Tuple2<FileInputSplit, Tuple2<Long, Long>> getCurrentState() throws IOException {
+    public Tuple2<Long, Long> getCurrentState() throws IOException {
         if (this.reachedEnd()) {
-            return new Tuple2<>(null, new Tuple2<>(0L, 0L));
+            return new Tuple2<>(0L, 0L);
         }
 
-        Tuple2<Long, Long> state = new Tuple2<>(
+        return new Tuple2<>(
             this.blockBasedInput.getCurrBlockPos(), // the last read index in the block
             this.readRecords // the number of records read
         );
-        return new Tuple2<>(currSplit, state);
     }
 
     @Override

@@ -37,11 +37,11 @@ public interface CheckpointableInputFormat<S extends InputSplit, T extends Seria
      * This will be used to restore the state of the reading channel when recovering from a task failure.
      * In the case of a simple text file, the state can correspond to the last read offset in the split.
      *
-     * @return The current split alogn with the state of the channel.
+     * @return The state of the channel.
      *
      * @throws Exception Thrown if the creation of the state object failed.
      */
-    Tuple2<S, T> getCurrentState() throws IOException;
+    T getCurrentState() throws IOException;
 
     /**
     * Restores the state of a parallel instance reading from an {@link InputFormat}.
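
Applied to the interface as a whole, the contract now reads roughly as below. This is a reconstruction from the diff: getCurrentState() is as committed, while reopen's signature is inferred from the reopen(split, state) call sites in the tests of this commit.

    public interface CheckpointableInputFormat<S extends InputSplit, T extends Serializable> {

        /** Returns the state of the reading channel, e.g. the last read offset in the split. */
        T getCurrentState() throws IOException;

        /** Reopens the given split and fast-forwards the reader to the checkpointed state (inferred). */
        void reopen(S split, T state) throws IOException;
    }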

@@ -19,7 +19,6 @@
 package org.apache.flink.api.common.io;
 
 import org.apache.flink.annotation.Public;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
@@ -82,8 +81,6 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
      */
    private static int MAX_SAMPLE_LEN;
 
-    private boolean restoring = false;
-
     static { loadGlobalConfigParams(); }
 
     protected static void loadGlobalConfigParams() {
@@ -635,11 +632,11 @@ private boolean fillBuffer() throws IOException {
     // --------------------------------------------------------------------------------------------
 
     @Override
-    public Tuple2<FileInputSplit, Long> getCurrentState() throws IOException {
+    public Long getCurrentState() throws IOException {
         if (reachedEnd()) {
-            return new Tuple2<>(null, 0l);
+            return 0l;
         }
-        return new Tuple2<>(this.currSplit, this.offset);
+        return this.offset;
     }
 
     @Override
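
For DelimitedInputFormat the state type parameter collapses to a bare Long: a checkpoint is just the byte offset reached inside the split, and dropping the restoring flag suggests the restore path got simpler now that reopen receives the split explicitly. A hedged round trip, assuming format and split come from the caller as in the CSV test further down:

    Long offset = format.getCurrentState(); // byte offset within the current split
    // ... task fails, a fresh format instance is created on recovery ...
    format.reopen(split, offset);           // seeks the new reader back to that offset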

@@ -145,15 +145,10 @@ public void checkRead() throws Exception {
     this.checkEquals(this.getRecord(readCount), record);
 
     if (!input.reachedEnd()) {
-        Tuple2<FileInputSplit, Tuple2<Long, Long>> state = input.getCurrentState();
-        Assert.assertEquals(state.f0, inputSplit);
+        Tuple2<Long, Long> state = input.getCurrentState();
 
         input = this.createInputFormat();
-        input.reopen(state.f0, state.f1);
-    } else {
-        Tuple2<FileInputSplit, Tuple2<Long, Long>> state = input.getCurrentState();
-        Assert.assertEquals(state.f0, null);
-
+        input.reopen(inputSplit, state);
     }
     readCount++;

@@ -29,6 +29,7 @@
 import org.apache.flink.streaming.api.functions.source.FilePathFilter;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.ProcessingMode;
 import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileUtil;
@@ -124,7 +125,7 @@ protected void testProgram() throws Exception {
     ContinuousFileMonitoringFunction<String> monitoringFunction =
         new ContinuousFileMonitoringFunction<>(format, hdfsURI,
             FilePathFilter.DefaultFilter.getInstance(),
-            ContinuousFileMonitoringFunction.ProcessingMode.PROCESS_CONTINUOUSLY,
+            ProcessingMode.PROCESS_CONTINUOUSLY,
             env.getParallelism(), INTERVAL);
 
     TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
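
The new import shows that ProcessingMode has been promoted from a nested enum of ContinuousFileMonitoringFunction to a top-level type, which is why these call sites now reference it directly. A plausible reconstruction from the constants used in this diff; the comments are assumptions, not the actual javadoc:

    package org.apache.flink.streaming.api.functions.source;

    public enum ProcessingMode {
        PROCESS_ONCE,         // process the files present now, then terminate
        PROCESS_CONTINUOUSLY  // keep monitoring the path for new or modified files
    }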

@@ -29,6 +29,7 @@
 import org.apache.flink.streaming.api.functions.source.FilePathFilter;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.ProcessingMode;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -217,7 +218,7 @@ public void testFilePathFiltering() throws Exception {
     TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
     ContinuousFileMonitoringFunction<String> monitoringFunction =
         new ContinuousFileMonitoringFunction<>(format, hdfsURI, new PathFilter(),
-            ContinuousFileMonitoringFunction.ProcessingMode.PROCESS_ONCE, 1, INTERVAL);
+            ProcessingMode.PROCESS_ONCE, 1, INTERVAL);
 
     monitoringFunction.open(new Configuration());
     monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));
@@ -243,7 +244,7 @@ public void testFileSplitMonitoringReprocessWithAppended() throws Exception {
     TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
     ContinuousFileMonitoringFunction<String> monitoringFunction =
         new ContinuousFileMonitoringFunction<>(format, hdfsURI, FilePathFilter.DefaultFilter.getInstance(),
-            ContinuousFileMonitoringFunction.ProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+            ProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
 
     monitoringFunction.open(new Configuration());
     monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));
@@ -292,7 +293,7 @@ public void testFileSplitMonitoringProcessOnce() throws Exception {
     TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
     ContinuousFileMonitoringFunction<String> monitoringFunction =
         new ContinuousFileMonitoringFunction<>(format, hdfsURI, FilePathFilter.DefaultFilter.getInstance(),
-            ContinuousFileMonitoringFunction.ProcessingMode.PROCESS_ONCE, 1, INTERVAL);
+            ProcessingMode.PROCESS_ONCE, 1, INTERVAL);
 
     monitoringFunction.open(new Configuration());
     monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));

@@ -108,40 +108,38 @@ private void testSplitCsvInputStream(int bufferSize) throws Exception {
     format.open(inputSplit);
     while (!format.reachedEnd()) {
         if ((result = format.nextRecord(result)) != null) {
-            assertEquals((long) format.getCurrentState().f1, offsetsAfterRecord[recordCounter]);
+            assertEquals((long) format.getCurrentState(), offsetsAfterRecord[recordCounter]);
             recordCounter++;
 
             if (recordCounter == 1) {
                 assertNotNull(result);
                 assertEquals("this is", result.f0);
                 assertEquals(new Integer(1), result.f1);
                 assertEquals(new Double(2.0), result.f2);
-                assertEquals((long) format.getCurrentState().f1, 15);
+                assertEquals((long) format.getCurrentState(), 15);
             } else if (recordCounter == 2) {
                 assertNotNull(result);
                 assertEquals("a test", result.f0);
                 assertEquals(new Integer(3), result.f1);
                 assertEquals(new Double(4.0), result.f2);
-                assertEquals((long) format.getCurrentState().f1, 29);
+                assertEquals((long) format.getCurrentState(), 29);
             } else if (recordCounter == 3) {
                 assertNotNull(result);
                 assertEquals("#next", result.f0);
                 assertEquals(new Integer(5), result.f1);
                 assertEquals(new Double(6.0), result.f2);
-                assertEquals((long) format.getCurrentState().f1, 42);
+                assertEquals((long) format.getCurrentState(), 42);
             } else {
                 assertNotNull(result);
                 assertEquals("asdadas", result.f0);
                 assertEquals(new Integer(5), result.f1);
                 assertEquals(new Double(30.0), result.f2);
-                assertEquals((long) format.getCurrentState().f1, 58);
+                assertEquals((long) format.getCurrentState(), 58);
             }
 
             // simulate checkpoint
-            Tuple2<FileInputSplit, Long> state = format.getCurrentState();
-            FileInputSplit split = state.f0;
-            long offsetToRestore = state.f1;
-            Assert.assertEquals(split, inputSplit);
+            Long state = format.getCurrentState();
+            long offsetToRestore = state;
 
             // create a new format
             format = new TupleCsvInputFormat<>(new Path(tempFile.toURI()), "\n", "|", typeInfo);
@@ -150,7 +148,7 @@ private void testSplitCsvInputStream(int bufferSize) throws Exception {
             format.configure(config);
 
             // simulate the restore operation.
-            format.reopen(split, offsetToRestore);
+            format.reopen(inputSplit, offsetToRestore);
         } else {
             result = new Tuple3<>();
         }
@@ -198,26 +196,25 @@ private void ignoreInvalidLines(int bufferSize) {
     assertEquals("this is", result.f0);
     assertEquals(new Integer(1), result.f1);
     assertEquals(new Double(2.0), result.f2);
-    assertEquals((long) format.getCurrentState().f1, 65);
+    assertEquals((long) format.getCurrentState(), 65);
 
     result = format.nextRecord(result);
     assertNotNull(result);
     assertEquals("a test", result.f0);
     assertEquals(new Integer(3), result.f1);
     assertEquals(new Double(4.0), result.f2);
-    assertEquals((long) format.getCurrentState().f1, 91);
+    assertEquals((long) format.getCurrentState(), 91);
 
     result = format.nextRecord(result);
     assertNotNull(result);
     assertEquals("#next", result.f0);
     assertEquals(new Integer(5), result.f1);
     assertEquals(new Double(6.0), result.f2);
-    assertEquals((long) format.getCurrentState().f1, 104);
+    assertEquals((long) format.getCurrentState(), 104);
 
     result = format.nextRecord(result);
     assertNull(result);
-    assertEquals(null, format.getCurrentState().f0);
-    assertEquals(0, (long) format.getCurrentState().f1);
+    assertEquals(0, (long) format.getCurrentState());
 }
 catch (Exception ex) {
     ex.printStackTrace();
