[FLINK-24565][avro] Port avro file format factory to BulkReaderFormatFactory
tsreaper committed Dec 14, 2021
1 parent 481f305 commit d2f5ec7
Showing 9 changed files with 865 additions and 201 deletions.
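Before this change, AvroFileFormatFactory implemented only BulkWriterFormatFactory; with BulkReaderFormatFactory added, the SQL filesystem connector obtains its Avro reader from this factory as well. A minimal sketch of the resulting usage (the table name, columns, and path below are invented for illustration, not taken from this commit):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AvroFilesystemReadSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // Hypothetical table backed by a directory of Avro files; reading it now
        // goes through the BulkFormat produced by createDecodingFormat below.
        tEnv.executeSql(
                "CREATE TABLE users (name STRING, age INT) WITH ("
                        + " 'connector' = 'filesystem',"
                        + " 'path' = '/tmp/users-avro',"
                        + " 'format' = 'avro')");
        tEnv.executeSql("SELECT name, age FROM users").print();
    }
}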
8 changes: 8 additions & 0 deletions flink-formats/flink-avro/pom.xml
@@ -47,6 +47,7 @@ under the License.

<!-- Table ecosystem and filesystem connector -->

<!-- Data stream projects depending on this project might not depend on flink-table-*. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
@@ -84,6 +85,13 @@ under the License.

<!-- Tests -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-testing</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
207 changes: 207 additions & 0 deletions flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AbstractAvroBulkFormat.java
@@ -0,0 +1,207 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.avro;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.util.CheckpointedPosition;
import org.apache.flink.connector.file.src.util.IteratorResultIterator;
import org.apache.flink.connector.file.src.util.Pool;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.utils.FSDataInputStreamWrapper;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.DatumReader;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Iterator;

/** Provides a {@link BulkFormat} for Avro records. */
@Internal
public abstract class AbstractAvroBulkFormat<A, T, SplitT extends FileSourceSplit>
implements BulkFormat<T, SplitT> {

private static final long serialVersionUID = 1L;

protected final Schema readerSchema;

protected AbstractAvroBulkFormat(Schema readerSchema) {
this.readerSchema = readerSchema;
}

@Override
public AvroReader createReader(Configuration config, SplitT split) throws IOException {
open(split);
return createReader(split);
}

@Override
public AvroReader restoreReader(Configuration config, SplitT split) throws IOException {
open(split);
return createReader(split);
}

@Override
public boolean isSplittable() {
return true;
}

private AvroReader createReader(SplitT split) throws IOException {
long end = split.offset() + split.length();
if (split.getReaderPosition().isPresent()) {
CheckpointedPosition position = split.getReaderPosition().get();
return new AvroReader(
split.path(),
split.offset(),
end,
position.getOffset(),
position.getRecordsAfterOffset());
} else {
return new AvroReader(split.path(), split.offset(), end, -1, 0);
}
}

protected void open(SplitT split) {}

protected abstract T convert(A record);

protected abstract A createReusedAvroRecord();

private class AvroReader implements BulkFormat.Reader<T> {

private final DataFileReader<A> reader;

private final long end;
private final Pool<A> pool;

private long currentBlockStart;
private long currentRecordsToSkip;

private AvroReader(Path path, long offset, long end, long blockStart, long recordsToSkip)
throws IOException {
A reuse = createReusedAvroRecord();

this.reader = createReaderFromPath(path);
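            // When restoring from a checkpointed position, seek straight to the
            // recorded block start; otherwise sync to the first block boundary at
            // or after the split offset. Records already emitted before the
            // checkpoint are then skipped (recordsToSkip is 0 for a fresh split).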
if (blockStart >= 0) {
reader.seek(blockStart);
} else {
reader.sync(offset);
}
for (int i = 0; i < recordsToSkip; i++) {
reader.next(reuse);
}

this.end = end;
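            // A pool of size one ensures at most one batch is in flight, so the
            // single reused Avro record cannot be overwritten while the previous
            // batch is still being consumed downstream.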
this.pool = new Pool<>(1);
this.pool.add(reuse);

this.currentBlockStart = reader.previousSync();
this.currentRecordsToSkip = recordsToSkip;
}

private DataFileReader<A> createReaderFromPath(Path path) throws IOException {
FileSystem fileSystem = path.getFileSystem();
DatumReader<A> datumReader = new GenericDatumReader<>(null, readerSchema);
SeekableInput in =
new FSDataInputStreamWrapper(
fileSystem.open(path), fileSystem.getFileStatus(path).getLen());
return (DataFileReader<A>) DataFileReader.openReader(in, datumReader);
}

@Nullable
@Override
public RecordIterator<T> readBatch() throws IOException {
A reuse;
try {
reuse = pool.pollEntry();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(
"Interrupted while waiting for the previous batch to be consumed", e);
}

if (!readNextBlock()) {
pool.recycler().recycle(reuse);
return null;
}

currentBlockStart = reader.previousSync();
Iterator<T> iterator =
new AvroBlockIterator(
reader.getBlockCount() - currentRecordsToSkip, reader, reuse);
long recordsToSkip = currentRecordsToSkip;
currentRecordsToSkip = 0;
return new IteratorResultIterator<>(
iterator,
currentBlockStart,
recordsToSkip,
() -> pool.recycler().recycle(reuse));
}

private boolean readNextBlock() throws IOException {
            // Returns true if the reader can advance to another block that still
            // belongs to this split, and false once it passes the split's end offset.
return reader.hasNext() && !reader.pastSync(end);
}

@Override
public void close() throws IOException {
reader.close();
}
}

private class AvroBlockIterator implements Iterator<T> {

private long numRecordsRemaining;
private final DataFileReader<A> reader;
private final A reuse;

private AvroBlockIterator(long numRecordsRemaining, DataFileReader<A> reader, A reuse) {
this.numRecordsRemaining = numRecordsRemaining;
this.reader = reader;
this.reuse = reuse;
}

@Override
public boolean hasNext() {
return numRecordsRemaining > 0;
}

@Override
public T next() {
try {
numRecordsRemaining--;
                // reader.next merely deserializes bytes already held in memory into
                // Java objects; it does not trigger further file reads
return convert(reader.next(reuse));
} catch (IOException e) {
throw new RuntimeException(
"Encountered exception when reading from avro format file", e);
}
}
}
}
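To make the abstract contract above concrete, here is a hypothetical minimal subclass (every name in it is illustrative, not part of this commit) that emits the Avro GenericRecords unchanged; the production subclass, AvroGenericRecordBulkFormat in AvroFileFormatFactory below, instead converts them to RowData.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class GenericRecordBulkFormatSketch
        extends AbstractAvroBulkFormat<GenericRecord, GenericRecord, FileSourceSplit> {

    private static final long serialVersionUID = 1L;

    public GenericRecordBulkFormatSketch(Schema readerSchema) {
        super(readerSchema);
    }

    @Override
    protected GenericRecord convert(GenericRecord record) {
        // Identity conversion; real implementations map the Avro record to the
        // framework's target type (e.g. RowData).
        return record;
    }

    @Override
    protected GenericRecord createReusedAvroRecord() {
        // One mutable record instance is reused across the whole split.
        return new GenericData.Record(readerSchema);
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return new GenericRecordAvroTypeInfo(readerSchema);
    }
}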
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java
@@ -20,14 +20,21 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory;
import org.apache.flink.connector.file.table.factories.BulkWriterFormatFactory;
import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.DataType;
@@ -36,6 +43,7 @@
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
@@ -49,10 +57,28 @@

/** Avro format factory for file system. */
@Internal
-public class AvroFileFormatFactory implements BulkWriterFormatFactory {
+public class AvroFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {

public static final String IDENTIFIER = "avro";

@Override
public BulkDecodingFormat<RowData> createDecodingFormat(
DynamicTableFactory.Context context, ReadableConfig formatOptions) {
return new BulkDecodingFormat<RowData>() {
@Override
public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
DynamicTableSource.Context sourceContext, DataType producedDataType) {
return new AvroGenericRecordBulkFormat(
sourceContext, (RowType) producedDataType.getLogicalType().copy(false));
}

@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly();
}
};
}

@Override
public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(
DynamicTableFactory.Context context, ReadableConfig formatOptions) {
@@ -90,6 +116,46 @@ public Set<ConfigOption<?>> optionalOptions() {
return options;
}

private static class AvroGenericRecordBulkFormat
extends AbstractAvroBulkFormat<GenericRecord, RowData, FileSourceSplit> {

private static final long serialVersionUID = 1L;

private final RowType producedRowType;
private final TypeInformation<RowData> producedTypeInfo;

private transient AvroToRowDataConverters.AvroToRowDataConverter converter;
private transient GenericRecord reusedAvroRecord;

public AvroGenericRecordBulkFormat(
DynamicTableSource.Context context, RowType producedRowType) {
super(AvroSchemaConverter.convertToSchema(producedRowType));
this.producedRowType = producedRowType;
this.producedTypeInfo = context.createTypeInformation(producedRowType);
}

@Override
protected void open(FileSourceSplit split) {
converter = AvroToRowDataConverters.createRowConverter(producedRowType);
reusedAvroRecord = new GenericData.Record(readerSchema);
}

@Override
protected RowData convert(GenericRecord record) {
return record == null ? null : (GenericRowData) converter.convert(record);
}

@Override
protected GenericRecord createReusedAvroRecord() {
return reusedAvroRecord;
}

@Override
public TypeInformation<RowData> getProducedType() {
return producedTypeInfo;
}
}

/**
* A {@link BulkWriter.Factory} to convert {@link RowData} to {@link GenericRecord} and wrap
* {@link AvroWriterFactory}.
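The AbstractAvroBulkFormat machinery is also independent of SQL: any subclass can be handed to the DataStream file source directly. A sketch under the assumption that the hypothetical GenericRecordBulkFormatSketch from above exists (the schema and path are likewise invented):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

public class AvroFileSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Reader schema matching the files on disk (invented for this sketch).
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                        + "{\"name\":\"name\",\"type\":\"string\"},"
                        + "{\"name\":\"age\",\"type\":\"int\"}]}");
        FileSource<GenericRecord> source =
                FileSource.forBulkFileFormat(
                                new GenericRecordBulkFormatSketch(schema),
                                new Path("/tmp/users-avro"))
                        .build();
        DataStream<GenericRecord> records =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "avro-files");
        records.print();
        env.execute("avro-file-source-sketch");
    }
}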
