Skip to content

Commit

Permalink
Fixing broken test
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed Jun 10, 2016
1 parent 61d2a1a commit 4cd9239
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.avro.FSDataInputStreamWrapper;
Expand Down Expand Up @@ -67,7 +68,7 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType

private transient long recordsReadSinceLastSync;

private transient long lastSync;
private transient long lastSync = -1l;

public AvroInputFormat(Path filePath, Class<E> type) {
super(filePath);
Expand Down Expand Up @@ -177,21 +178,16 @@ public E nextRecord(E reuseValue) throws IOException {

/**
 * Returns the current state of this format as a {@code (lastSync, recordsReadSinceLastSync)} pair.
 *
 * <p>NOTE(review): this listing is a diff view without +/- markers; the guard on
 * {@code dataFileReader}/{@code reachedEnd()} may belong to the pre-change side of the hunk —
 * confirm against the checked-in file.
 *
 * @return {@code (0, 0)} when {@code dataFileReader} is {@code null} or {@code reachedEnd()}
 *         returns {@code true}; otherwise {@code (lastSync, recordsReadSinceLastSync)}
 * @throws IOException declared by the signature; which call raises it is not visible here
 */
@Override
public Tuple2<Long, Long> getCurrentState() throws IOException {
// No reader opened yet, or input exhausted: report the (0, 0) placeholder state.
if (this.dataFileReader == null || this.reachedEnd()) {
return new Tuple2<>(0L, 0L);
}
return new Tuple2<>(this.lastSync, this.recordsReadSinceLastSync);
}

@Override
public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
if (split == null) {
throw new RuntimeException("Called reopen() on a null split.");
}
Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");

this.open(split);
if (state != null && state.f0 != 0l) {
dataFileReader = initReader(split);
if (state.f0 != -1) {

// go to the block where we stopped
lastSync = state.f0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;

import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -374,9 +375,10 @@ public int read(byte[] b, int off, int len) throws IOException {

@Override
public Tuple2<Long, Long> getCurrentState() throws IOException {
if (this.blockInfo == null || this.reachedEnd()) {
return new Tuple2<>(0L, 0L);
if (this.blockBasedInput == null) {
throw new RuntimeException("You must have forgotten to call open() on your input format.");
}

return new Tuple2<>(
this.blockBasedInput.getCurrBlockPos(), // the last read index in the block
this.readRecords // the number of records read
Expand All @@ -385,20 +387,17 @@ public Tuple2<Long, Long> getCurrentState() throws IOException {

@Override
public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
if (split == null) {
throw new RuntimeException("Called reopen() on a null split.");
}
Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");

this.open(split);
if (state != null && state.f1 != 0) {
this.blockInfo = this.createAndReadBlockInfo();
this.blockInfo = this.createAndReadBlockInfo();

long blockPos = state.f0;
this.readRecords = state.f1;
long blockPos = state.f0;
this.readRecords = state.f1;

this.stream.seek(this.splitStart + blockPos);
this.blockBasedInput = new BlockBasedInput(this.stream, (int) blockPos, this.splitLength);
this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput);
}
this.stream.seek(this.splitStart + blockPos);
this.blockBasedInput = new BlockBasedInput(this.stream, (int) blockPos, this.splitLength);
this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.api.common.io;

import org.apache.flink.annotation.Public;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
Expand Down Expand Up @@ -143,7 +144,7 @@ protected static void loadGlobalConfigParams() {

private transient boolean end;

private transient long offset;
private transient long offset = -1;

// --------------------------------------------------------------------------------------------
// The configuration parameters. Configured on the instance and serialized to be shipped.
Expand Down Expand Up @@ -630,25 +631,21 @@ private boolean fillBuffer() throws IOException {

/**
 * Returns the current state of this format, expressed as the {@code offset} field.
 *
 * <p>NOTE(review): this listing is a diff view without +/- markers; the {@code reachedEnd()}
 * guard may belong to the pre-change side of the hunk — confirm against the checked-in file.
 *
 * @return {@code 0} when {@code reachedEnd()} returns {@code true}; otherwise the value of
 *         {@code offset}
 * @throws IOException declared by the signature; which call raises it is not visible here
 */
@Override
public Long getCurrentState() throws IOException {
// Input exhausted: report 0 instead of the last tracked offset.
if (reachedEnd()) {
return 0l;
}
return this.offset;
}

@Override
public void reopen(FileInputSplit split, Long state) throws IOException {
if (split == null) {
throw new RuntimeException("Called reopen() on a null split.");
}
Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");

this.open(split);
if (state != null && state != split.getStart()) {
this.offset = state;
if (state > this.splitStart + split.getLength()) {
this.end = true;
} else if (state > split.getStart()) {
initBuffers();

// this is the case where we restart from a specific offset within a split (e.g. after a node failure)
this.offset = state;

this.stream.seek(this.offset);
if (split.getLength() == -1) {
// this is the case for unsplittable files
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,38 +137,6 @@ public void checkRead() throws Exception {

for (FileInputSplit inputSplit : inputSplits) {
input.open(inputSplit);

T record = createInstance();

while (!input.reachedEnd()) {
if (input.nextRecord(record) != null) {
this.checkEquals(this.getRecord(readCount), record);

if (!input.reachedEnd()) {
Tuple2<Long, Long> state = input.getCurrentState();

input = this.createInputFormat();
input.reopen(inputSplit, state);
}
readCount++;
}
}
}
Assert.assertEquals(this.numberOfTuples, readCount);
}

/**
* Tests if the expected sequence and amount of data can be read
*/
@Test
public void checkReadWithFailureAtStart() throws Exception {
BinaryInputFormat<T> input = this.createInputFormat();
FileInputSplit[] inputSplits = input.createInputSplits(0);
Arrays.sort(inputSplits, new InputSplitSorter());

int readCount = 0;

for (FileInputSplit inputSplit : inputSplits) {
input.reopen(inputSplit, input.getCurrentState());

T record = createInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,6 @@ public void testSplitCsvInputStreamInSmallBuffer() throws Exception {
testSplitCsvInputStream(2, false);
}

/**
 * Runs {@code testSplitCsvInputStream} with a buffer size of {@code 1024 * 1024} and
 * {@code failAtStart} set to {@code true}.
 */
@Test
public void testSplitCsvInputStreamInLargeBufferFailingAtStart() throws Exception {
testSplitCsvInputStream(1024 * 1024, true);
}

/**
 * Runs {@code testSplitCsvInputStream} with a buffer size of {@code 2} and
 * {@code failAtStart} set to {@code true}.
 */
@Test
public void testSplitCsvInputStreamInSmallBufferFailingAtStart() throws Exception {
testSplitCsvInputStream(2, true);
}

private void testSplitCsvInputStream(int bufferSize, boolean failAtStart) throws Exception {
final String fileContent =
"this is|1|2.0|\n"+
Expand Down Expand Up @@ -115,11 +105,8 @@ private void testSplitCsvInputStream(int bufferSize, boolean failAtStart) throws
assertEquals(inputSplit.getStart() + inputSplit.getLength(), offsetAtEndOfSplit[splitCounter]);
splitCounter++;

if (failAtStart) {
format.reopen(inputSplit, inputSplit.getStart());
} else {
format.open(inputSplit);
}
format.open(inputSplit);
format.reopen(inputSplit, format.getCurrentState());

while (!format.reachedEnd()) {
if ((result = format.nextRecord(result)) != null) {
Expand Down Expand Up @@ -229,7 +216,7 @@ private void ignoreInvalidLines(int bufferSize) {

result = format.nextRecord(result);
assertNull(result);
assertEquals(0, (long) format.getCurrentState());
assertEquals(fileContent.length(), (long) format.getCurrentState());
}
catch (Exception ex) {
ex.printStackTrace();
Expand Down

0 comments on commit 4cd9239

Please sign in to comment.