Skip to content

Commit

Permalink
Fixing broken test
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed Jun 6, 2016
1 parent 61d2a1a commit 2ef1593
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.avro.FSDataInputStreamWrapper;
Expand Down Expand Up @@ -177,25 +178,20 @@ public E nextRecord(E reuseValue) throws IOException {

@Override
public Tuple2<Long, Long> getCurrentState() throws IOException {
if (this.dataFileReader == null || this.reachedEnd()) {
return new Tuple2<>(0L, 0L);
}
return new Tuple2<>(this.lastSync, this.recordsReadSinceLastSync);
}

@Override
public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
if (split == null) {
throw new RuntimeException("Called reopen() on a null split.");
}
Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");

this.open(split);
if (state != null && state.f0 != 0l) {
dataFileReader = initReader(split);
if (state.f0 != split.getStart()) {

// go to the block we stopped
lastSync = state.f0;
dataFileReader.seek(lastSync);
lastSync = state.f0;

// read until the record we were before the checkpoint and discard the values
long recordsToDiscard = state.f1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;

import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -374,9 +375,10 @@ public int read(byte[] b, int off, int len) throws IOException {

@Override
public Tuple2<Long, Long> getCurrentState() throws IOException {
if (this.blockInfo == null || this.reachedEnd()) {
return new Tuple2<>(0L, 0L);
if (this.blockBasedInput == null) {
throw new RuntimeException("You must have forgotten to call open() on your input format.");
}

return new Tuple2<>(
this.blockBasedInput.getCurrBlockPos(), // the last read index in the block
this.readRecords // the number of records read
Expand All @@ -385,20 +387,17 @@ public Tuple2<Long, Long> getCurrentState() throws IOException {

@Override
public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
if (split == null) {
throw new RuntimeException("Called reopen() on a null split.");
}
Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");

this.open(split);
if (state != null && state.f1 != 0) {
this.blockInfo = this.createAndReadBlockInfo();
this.blockInfo = this.createAndReadBlockInfo();

long blockPos = state.f0;
this.readRecords = state.f1;
long blockPos = state.f0;
this.readRecords = state.f1;

this.stream.seek(this.splitStart + blockPos);
this.blockBasedInput = new BlockBasedInput(this.stream, (int) blockPos, this.splitLength);
this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput);
}
this.stream.seek(this.splitStart + blockPos);
this.blockBasedInput = new BlockBasedInput(this.stream, (int) blockPos, this.splitLength);
this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.api.common.io;

import org.apache.flink.annotation.Public;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
Expand Down Expand Up @@ -630,20 +631,16 @@ private boolean fillBuffer() throws IOException {

@Override
public Long getCurrentState() throws IOException {
if (reachedEnd()) {
return 0l;
}
return this.offset;
}

@Override
public void reopen(FileInputSplit split, Long state) throws IOException {
if (split == null) {
throw new RuntimeException("Called reopen() on a null split.");
}
Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");

this.open(split);
if (state != null && state != split.getStart()) {
if (state != split.getStart()) {
initBuffers();

// this is the case where we restart from a specific offset within a split (e.g. after a node failure)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,38 +137,6 @@ public void checkRead() throws Exception {

for (FileInputSplit inputSplit : inputSplits) {
input.open(inputSplit);

T record = createInstance();

while (!input.reachedEnd()) {
if (input.nextRecord(record) != null) {
this.checkEquals(this.getRecord(readCount), record);

if (!input.reachedEnd()) {
Tuple2<Long, Long> state = input.getCurrentState();

input = this.createInputFormat();
input.reopen(inputSplit, state);
}
readCount++;
}
}
}
Assert.assertEquals(this.numberOfTuples, readCount);
}

/**
* Tests if the expected sequence and amount of data can be read
*/
@Test
public void checkReadWithFailureAtStart() throws Exception {
BinaryInputFormat<T> input = this.createInputFormat();
FileInputSplit[] inputSplits = input.createInputSplits(0);
Arrays.sort(inputSplits, new InputSplitSorter());

int readCount = 0;

for (FileInputSplit inputSplit : inputSplits) {
input.reopen(inputSplit, input.getCurrentState());

T record = createInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ private void ignoreInvalidLines(int bufferSize) {

result = format.nextRecord(result);
assertNull(result);
assertEquals(0, (long) format.getCurrentState());
assertEquals(fileContent.length(), (long) format.getCurrentState());
}
catch (Exception ex) {
ex.printStackTrace();
Expand Down

0 comments on commit 2ef1593

Please sign in to comment.