[FLINK-2314] Make Streaming File Sources Persistent #2020

Closed · wants to merge 8 commits
org/apache/flink/api/java/io/AvroInputFormat.java
@@ -22,13 +22,15 @@
import java.io.IOException;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.avro.FSDataInputStreamWrapper;
@@ -42,15 +44,16 @@

/**
* Provides a {@link FileInputFormat} for Avro records.
*
* @param <E>
* the type of the result Avro record. If you specify
* {@link GenericRecord} then the result will be returned as a
* {@link GenericRecord}, so you do not have to know the schema ahead
* of time.
*/
public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E>,
CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {

private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class);
@@ -59,16 +62,19 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType

private boolean reuseAvroValue = true;

private transient DataFileReader<E> dataFileReader;

private transient long end;

private transient long recordsReadSinceLastSync;

private transient long lastSync = -1L;

public AvroInputFormat(Path filePath, Class<E> type) {
super(filePath);
this.avroValueType = type;
}

/**
* Sets the flag whether to reuse the Avro value instance for all records.
* By default, the input format reuses the Avro value.
@@ -102,42 +108,59 @@ public TypeInformation<E> getProducedType() {
@Override
public void open(FileInputSplit split) throws IOException {
super.open(split);
dataFileReader = initReader(split);
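// sync() forwards the reader to the first Avro sync marker at or after the
// start of this split; previousSync() then reports the byte offset of that
// marker, i.e. the beginning of the block we are about to read from.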
dataFileReader.sync(split.getStart());
lastSync = dataFileReader.previousSync();
}

private DataFileReader<E> initReader(FileInputSplit split) throws IOException {
DatumReader<E> datumReader;

if (org.apache.avro.generic.GenericRecord.class == avroValueType) {
datumReader = new GenericDatumReader<E>();
} else {
datumReader = org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)
? new SpecificDatumReader<E>(avroValueType) : new ReflectDatumReader<E>(avroValueType);
}

if (LOG.isInfoEnabled()) {
LOG.info("Opening split {}", split);
}

SeekableInput in = new FSDataInputStreamWrapper(stream, split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
DataFileReader<E> dataFileReader = (DataFileReader) DataFileReader.openReader(in, datumReader);

if (LOG.isDebugEnabled()) {
LOG.debug("Loaded SCHEMA: {}", dataFileReader.getSchema());
}

end = split.getStart() + split.getLength();
recordsReadSinceLastSync = 0;
return dataFileReader;
}

@Override
public boolean reachedEnd() throws IOException {
return !dataFileReader.hasNext() || dataFileReader.pastSync(end);
}

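// used by the recovery tests below to decide when to take a checkpoint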
public long getRecordsReadFromBlock() {
return this.recordsReadSinceLastSync;
}

@Override
public E nextRecord(E reuseValue) throws IOException {
if (reachedEnd()) {
return null;
}

// if we are at the start of a new block, register the event and
// restart the counter.
if (dataFileReader.previousSync() != lastSync) {
lastSync = dataFileReader.previousSync();
recordsReadSinceLastSync = 0;
}
recordsReadSinceLastSync++;

if (reuseAvroValue) {
return dataFileReader.next(reuseValue);
} else {
@@ -148,4 +171,34 @@ public E nextRecord(E reuseValue) throws IOException {
}
}
}

// --------------------------------------------------------------------------------------------
// Checkpointing
// --------------------------------------------------------------------------------------------

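// The checkpointed state is a Tuple2 of:
//   f0 = byte offset of the sync marker of the block currently being read
//        (-1 if no block has been read from yet), and
//   f1 = how many records of that block have already been emitted;
// on restore, reopen() seeks back to f0 and re-reads/discards f1 records.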
@Override
public Tuple2<Long, Long> getCurrentState() throws IOException {
return new Tuple2<>(this.lastSync, this.recordsReadSinceLastSync);
}

@Override
public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");

this.open(split);
if (state.f0 != -1) {

// go to the block where we stopped
lastSync = state.f0;
dataFileReader.seek(lastSync);

// read and discard records until we reach the one we were at before the checkpoint
long recordsToDiscard = state.f1;
for (int i = 0; i < recordsToDiscard; i++) {
dataFileReader.next(null);
recordsReadSinceLastSync++;
}
}
}
}
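For reference, a minimal sketch of the checkpoint/restore cycle this change enables, as seen from a caller (it mirrors the tests below; the avroFile path, the single split, and the point at which the snapshot is taken are illustrative assumptions, not part of this PR):

    AvroInputFormat<User> format = new AvroInputFormat<>(avroFile, User.class);
    format.configure(new Configuration());
    FileInputSplit split = format.createInputSplits(1)[0];

    format.open(split);
    User first = format.nextRecord(null);

    // snapshot: (block start offset, records already emitted from that block)
    Tuple2<Long, Long> state = format.getCurrentState();

    // ... the task fails; a fresh instance is restored from the snapshot ...
    AvroInputFormat<User> restored = new AvroInputFormat<>(avroFile, User.class);
    restored.configure(new Configuration());
    restored.reopen(split, state);

    // reading continues with the record right after the checkpointed position
    User second = restored.nextRecord(null);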
(second file: Avro input format recovery tests)
@@ -26,6 +26,7 @@
import org.apache.flink.api.io.avro.generated.Fixed16;
import org.apache.flink.api.io.avro.generated.User;
import org.apache.flink.api.java.io.AvroInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
@@ -166,7 +167,7 @@ public void testSplittedIF() throws IOException {
Configuration parameters = new Configuration();

AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);

format.configure(parameters);
FileInputSplit[] splits = format.createInputSplits(4);
assertEquals(4, splits.length);
@@ -191,6 +192,98 @@ public void testSplittedIF() throws IOException {
format.close();
}

@Test
public void testAvroRecoveryWithFailureAtStart() throws Exception {
final int recordsUntilCheckpoint = 132;

Configuration parameters = new Configuration();

AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
format.configure(parameters);

FileInputSplit[] splits = format.createInputSplits(4);
assertEquals(4, splits.length);

int elements = 0;
int[] elementsPerSplit = new int[4];
for (int i = 0; i < splits.length; i++) {
format.reopen(splits[i], format.getCurrentState());
while (!format.reachedEnd()) {
User u = format.nextRecord(null);
Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
elements++;

if (format.getRecordsReadFromBlock() == recordsUntilCheckpoint) {

// do the whole checkpoint-restore procedure and see if we pick up from where we left off.
Tuple2<Long, Long> state = format.getCurrentState();

// make sure that nothing is carried over from the previous format instance
// (as would be the case in an actual restore)
format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);

format.reopen(splits[i], state);
assertEquals(recordsUntilCheckpoint, format.getRecordsReadFromBlock());
}
elementsPerSplit[i]++;
}
format.close();
}

Assert.assertEquals(1539, elementsPerSplit[0]);
Assert.assertEquals(1026, elementsPerSplit[1]);
Assert.assertEquals(1539, elementsPerSplit[2]);
Assert.assertEquals(896, elementsPerSplit[3]);
Assert.assertEquals(NUM_RECORDS, elements);
format.close();
}

@Test
public void testAvroRecovery() throws Exception {
final int recordsUntilCheckpoint = 132;

Configuration parameters = new Configuration();

AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
format.configure(parameters);

FileInputSplit[] splits = format.createInputSplits(4);
assertEquals(4, splits.length);

int elements = 0;
int[] elementsPerSplit = new int[4];
for (int i = 0; i < splits.length; i++) {
format.open(splits[i]);
while (!format.reachedEnd()) {
User u = format.nextRecord(null);
Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
elements++;

if (format.getRecordsReadFromBlock() == recordsUntilCheckpoint) {

// do the whole checkpoint-restore procedure and see if we pick up from where we left off.
Tuple2<Long, Long> state = format.getCurrentState();

// make sure that nothing is carried over from the previous format instance
// (as would be the case in an actual restore)
format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);

format.reopen(splits[i], state);
assertEquals(recordsUntilCheckpoint, format.getRecordsReadFromBlock());
}
elementsPerSplit[i]++;
}
format.close();
}

Assert.assertEquals(1539, elementsPerSplit[0]);
Assert.assertEquals(1026, elementsPerSplit[1]);
Assert.assertEquals(1539, elementsPerSplit[2]);
Assert.assertEquals(896, elementsPerSplit[3]);
Assert.assertEquals(NUM_RECORDS, elements);
format.close();
}

/*
This test gave the reference values for the test of Flink's IF.
