Skip to content

Commit

Permalink
Fixing broken test
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed Jun 9, 2016
1 parent 61d2a1a commit a96f4de
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.avro.FSDataInputStreamWrapper;
Expand Down Expand Up @@ -67,7 +68,7 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType

private transient long recordsReadSinceLastSync;

private transient long lastSync;
private transient long lastSync = -1l;

public AvroInputFormat(Path filePath, Class<E> type) {
super(filePath);
Expand Down Expand Up @@ -177,21 +178,16 @@ public E nextRecord(E reuseValue) throws IOException {

@Override
public Tuple2<Long, Long> getCurrentState() throws IOException {
if (this.dataFileReader == null || this.reachedEnd()) {
return new Tuple2<>(0L, 0L);
}
return new Tuple2<>(this.lastSync, this.recordsReadSinceLastSync);
}

@Override
public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
if (split == null) {
throw new RuntimeException("Called reopen() on a null split.");
}
Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");

this.open(split);
if (state != null && state.f0 != 0l) {
dataFileReader = initReader(split);
if (state.f0 != -1) {

// go to the block we stopped
lastSync = state.f0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;

import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -374,9 +375,10 @@ public int read(byte[] b, int off, int len) throws IOException {

@Override
public Tuple2<Long, Long> getCurrentState() throws IOException {
if (this.blockInfo == null || this.reachedEnd()) {
return new Tuple2<>(0L, 0L);
if (this.blockBasedInput == null) {
throw new RuntimeException("You must have forgotten to call open() on your input format.");
}

return new Tuple2<>(
this.blockBasedInput.getCurrBlockPos(), // the last read index in the block
this.readRecords // the number of records read
Expand All @@ -385,20 +387,17 @@ public Tuple2<Long, Long> getCurrentState() throws IOException {

@Override
public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
if (split == null) {
throw new RuntimeException("Called reopen() on a null split.");
}
Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");

this.open(split);
if (state != null && state.f1 != 0) {
this.blockInfo = this.createAndReadBlockInfo();
this.blockInfo = this.createAndReadBlockInfo();

long blockPos = state.f0;
this.readRecords = state.f1;
long blockPos = state.f0;
this.readRecords = state.f1;

this.stream.seek(this.splitStart + blockPos);
this.blockBasedInput = new BlockBasedInput(this.stream, (int) blockPos, this.splitLength);
this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput);
}
this.stream.seek(this.splitStart + blockPos);
this.blockBasedInput = new BlockBasedInput(this.stream, (int) blockPos, this.splitLength);
this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.api.common.io;

import org.apache.flink.annotation.Public;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
Expand Down Expand Up @@ -143,7 +144,7 @@ protected static void loadGlobalConfigParams() {

private transient boolean end;

private transient long offset;
private transient long offset = -1;

// --------------------------------------------------------------------------------------------
// The configuration parameters. Configured on the instance and serialized to be shipped.
Expand Down Expand Up @@ -630,20 +631,20 @@ private boolean fillBuffer() throws IOException {

@Override
public Long getCurrentState() throws IOException {
if (reachedEnd()) {
return 0l;
}
return this.offset;
}

@Override
public void reopen(FileInputSplit split, Long state) throws IOException {
if (split == null) {
throw new RuntimeException("Called reopen() on a null split.");
}
Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");

this.open(split);
if (state != null && state != split.getStart()) {
if (state > this.splitStart + this.splitLength) {
this.end = true;
}

if (state > split.getStart()) {
initBuffers();

// this is the case where we restart from a specific offset within a split (e.g. after a node failure)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,38 +137,6 @@ public void checkRead() throws Exception {

for (FileInputSplit inputSplit : inputSplits) {
input.open(inputSplit);

T record = createInstance();

while (!input.reachedEnd()) {
if (input.nextRecord(record) != null) {
this.checkEquals(this.getRecord(readCount), record);

if (!input.reachedEnd()) {
Tuple2<Long, Long> state = input.getCurrentState();

input = this.createInputFormat();
input.reopen(inputSplit, state);
}
readCount++;
}
}
}
Assert.assertEquals(this.numberOfTuples, readCount);
}

/**
* Tests if the expected sequence and amount of data can be read
*/
@Test
public void checkReadWithFailureAtStart() throws Exception {
BinaryInputFormat<T> input = this.createInputFormat();
FileInputSplit[] inputSplits = input.createInputSplits(0);
Arrays.sort(inputSplits, new InputSplitSorter());

int readCount = 0;

for (FileInputSplit inputSplit : inputSplits) {
input.reopen(inputSplit, input.getCurrentState());

T record = createInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ private void ignoreInvalidLines(int bufferSize) {

result = format.nextRecord(result);
assertNull(result);
assertEquals(0, (long) format.getCurrentState());
assertEquals(fileContent.length(), (long) format.getCurrentState());
}
catch (Exception ex) {
ex.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ public void open() throws Exception {

this.reader = new SplitReader<>(format, serializer, collector, checkpointLock, readerState);
this.reader.start();
this.readerState = null;
}

@Override
Expand Down Expand Up @@ -182,7 +181,7 @@ private class SplitReader<S extends Serializable, OT> extends Thread {
private final Object checkpointLock;
private final TimestampedCollector<OT> collector;

private final Object lock = new Object();
// private final Object lock = new Object();

private final Queue<FileInputSplit> pendingSplits;

Expand Down Expand Up @@ -210,18 +209,21 @@ private class SplitReader<S extends Serializable, OT> extends Thread {
FileInputSplit current = restoredState.f1;
S formatState = restoredState.f2;

for (FileInputSplit split : pending) {
pendingSplits.add(split);
}
synchronized (checkpointLock) {
for (FileInputSplit split : pending) {
pendingSplits.add(split);
}

this.currentSplit = current;
this.restoredFormatState = formatState;
this.currentSplit = current;
this.restoredFormatState = formatState;
}
}
ContinuousFileReaderOperator.this.readerState = null;
}

void addSplit(FileInputSplit split) {
Preconditions.checkNotNull(split);
synchronized (lock) {
synchronized (checkpointLock) {
this.pendingSplits.add(split);
}
}
Expand All @@ -235,49 +237,40 @@ public void run() {
try {
while (this.isRunning) {

if (this.currentSplit != null) {

if (currentSplit.equals(EOS)) {
isRunning = false;
break;
}

if (this.format instanceof CheckpointableInputFormat && restoredFormatState != null) {
((CheckpointableInputFormat) format).reopen(currentSplit, restoredFormatState);
} else {
// this is the case of a non-checkpointable input format that will reprocess the last split.
LOG.info("Format " + this.format.getClass().getName() + " used is not checkpointable.");
format.open(currentSplit);
}

// reset the restored state to null for the next iteration
this.restoredFormatState = null;

} else {

// get the next split to read.
// locking is needed because checkpointing will
// ask for a consistent snapshot of the list.
synchronized (lock) {
currentSplit = this.pendingSplits.peek();
}

if (currentSplit == null) {
Thread.sleep(50);
continue;
}

if (currentSplit.equals(EOS)) {
isRunning = false;
break;
}

synchronized (checkpointLock) {
synchronized (lock) {
synchronized (checkpointLock) {

if (this.currentSplit != null) {

if (currentSplit.equals(EOS)) {
isRunning = false;
break;
}

if (this.format instanceof CheckpointableInputFormat && restoredFormatState != null) {
((CheckpointableInputFormat) format).reopen(currentSplit, restoredFormatState);
} else {
// this is the case of a non-checkpointable input format that will reprocess the last split.
LOG.info("Format " + this.format.getClass().getName() + " used is not checkpointable.");
format.open(currentSplit);
}
// reset the restored state to null for the next iteration
this.restoredFormatState = null;
} else {

// get the next split to read.
currentSplit = this.pendingSplits.poll();

if (currentSplit == null) {
checkpointLock.wait(50);
continue;
}

if (currentSplit.equals(EOS)) {
isRunning = false;
break;
}
this.format.open(currentSplit);
}
this.format.open(currentSplit);
}
}

LOG.info("Reading split: " + currentSplit);
Expand Down Expand Up @@ -320,20 +313,25 @@ Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOExcept

List<FileInputSplit> snapshot;
S formatState = null;
synchronized (lock) {
// synchronized (lock) {
snapshot = new ArrayList<>(this.pendingSplits.size());
for (FileInputSplit split: this.pendingSplits) {
snapshot.add(split);
}

if (this.format instanceof CheckpointableInputFormat) {
formatState = (S) ((CheckpointableInputFormat) format).getCurrentState();
return new Tuple3<>(snapshot, currentSplit, formatState);

// remove the current split from the list if inside.
if (this.currentSplit != null && this.currentSplit.equals(pendingSplits.peek())) {
this.pendingSplits.poll();
}
return new Tuple3<>(snapshot, currentSplit, currentSplit == null ? null : formatState);
} else {
LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
return new Tuple3<>(snapshot, currentSplit, null);
}
}
// }
}

public void cancel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public int compare(String o1, String o2) {
}

collected.clear();
finalCollectedContent.clear();
fc.clean();
}

Expand Down Expand Up @@ -240,6 +241,10 @@ private static class TestingSinkFunction extends RichSinkFunction<String>

private Map<Integer, List<String>> collectedContent = new HashMap<>();

TestingSinkFunction() {
hasFailed = false;
}

@Override
public void open(Configuration parameters) throws Exception {
// this sink can only work with DOP 1
Expand Down Expand Up @@ -285,7 +290,7 @@ public void invoke(String value) throws Exception {
count++;
if (!hasFailed) {
Thread.sleep(2);
if (numSuccessfulCheckpoints >= 2 && count >= elementsToFailure) {
if (numSuccessfulCheckpoints >= 1 && count >= elementsToFailure) {
hasFailed = true;
throw new Exception("Task Failure");
}
Expand Down

0 comments on commit a96f4de

Please sign in to comment.