
[FLINK-24565][avro] Port avro file format factory to BulkReaderFormatFactory #17520

Merged
merged 1 commit into apache:master on Dec 14, 2021

Conversation

@tsreaper (Contributor) commented Oct 19, 2021

What is the purpose of the change

This PR ports the avro file format to the new source API by implementing the BulkFormat and BulkReaderFormatFactory for avro.

Brief change log

  • Port avro file format factory to BulkReaderFormatFactory

Verifying this change

This change added tests and can be verified by running the added tests.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): yes
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes (FileSystemConnectorOptions moved from flink-table-runtime to flink-table-api-java. This is a compatible change because flink-table-api-java is a dependency of flink-table-runtime)
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@flinkbot (Collaborator) commented Oct 19, 2021

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure — re-run the last Azure build

@flinkbot (Collaborator) commented Oct 19, 2021

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit d2f5ec7 (Tue Dec 14 09:06:49 UTC 2021)

Warnings:

  • 1 pom.xml files were touched: Check for build and licensing issues.
  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@slinkydeveloper (Contributor) left a comment

Is there any particular reason why you have chosen to implement BulkFormat rather than StreamFormat? BulkFormat is way more complex and I see no reason to use it for this particular format; StreamFormat already adapts pretty well to the DataFileReader from avro.

Because you're implementing BulkFormat, you have a lot of additional complexity (e.g. the BlockingQueue), both in runtime code and in test code. Right now table api doesn't have an interface to discover DecodingFormatFactory<StreamFormat<RowData>>, but you can easily add it in common:

public interface StreamFormatFactory extends DecodingFormatFactory<StreamFormat<RowData>> {}

And then add the required code to use it in the FileSystemTableSource.

* SerializationSchema} and {@link DeserializationSchema}.
*/
@Internal
public class AvroFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {
Contributor

Why remove this? We still need it for Kafka and other connectors that require record-by-record avro deserialization.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
Contributor

Why this dependency change?

Contributor Author

For FileSystemConnectorOptions.PARTITION_DEFAULT_NAME in AvroFileFormatFactory.

Contributor

Avro could be Scala-free.
Can we move FileSystemConnectorOptions to common?

Contributor Author

I would say flink-table-api-java. This FileSystemConnectorOptions is a @PublicEvolving API.

Comment on lines 76 to 81
RowType physicalRowype =
(RowType)
context.getCatalogTable()
.getResolvedSchema()
.toPhysicalRowDataType()
.getLogicalType();
Contributor

You should be able to just use producedDataType here.

Contributor Author

I don't quite get it. producedDataType is the data type produced by this source operator after projection push down. It might be different from what is stored in the avro file. For example, if this source is partitioned, the partition keys are stored in the file path, not in the avro file. Also, the order of fields in producedDataType might differ from physicalRowType, and I need to map the fields in AvroGenericRecordBulkFormat.

Contributor

From FileSystemTableSource, producedDataType includes:

  • projected fields
  • partition fields

For the partition fields, as you said, we should extract them from the FileSourceSplit, but then we should not extract and convert the non-projected fields; we should simply ignore them. Extracting all the fields, and then returning only the projected ones, requires you to do a row copy https://github.com/apache/flink/pull/17520/files#diff-0f3083989c5687db08869ff72bcba9c9746c2441e40fd1f0cd75e1eab84b16c8R258 which we should avoid.

Contributor Author

Do you mean we should only read the fields present in producedDataType from avro files and should not read the other fields? If yes, I shall take a look to determine whether this is possible. If not, keeping fields that are not in producedDataType will cause the result of this source to be incorrect. The resulting row must fit producedDataType.

Contributor

The resulting row must fit producedDataType.

I think so. If you want to try it out, just write a test with a table and a select, and make sure the select doesn't pick all the fields of the input table.

Contributor Author

I've checked the avro code once again. The avro reader always provides the complete record from avro files, so what we can do is selectively convert avro objects into Flink objects in AvroToRowDataConverters#createRowConverter. This is a good point, as it not only provides some optimization but also hides the projection logic in a common method. However, this change is somewhat separate from this PR and I would like to do it in a new PR.

Contributor

@tsreaper I think we can try again. The Avro API seems to provide the capability of projection:
https://icircuit.net/avro-schema-projection/1446
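
A minimal sketch of what that could look like with the plain Avro API (file name, record name and fields are made up; the reader schema's record name must match, or alias, the writer schema's record name):

import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class AvroProjectionSketch {
    public static void main(String[] args) throws Exception {
        // Reader schema that lists only the projected fields.
        Schema readerSchema =
                SchemaBuilder.record("MyRecord")
                        .fields()
                        .requiredLong("id")
                        .requiredString("name")
                        .endRecord();

        GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
        datumReader.setExpected(readerSchema);

        // DataFileReader resolves the reader schema against the writer schema stored in the
        // file header, so the non-projected fields are skipped while decoding each record.
        try (DataFileReader<GenericRecord> reader =
                new DataFileReader<>(new File("data.avro"), datumReader)) {
            while (reader.hasNext()) {
                GenericRecord projected = reader.next();
                System.out.println(projected);
            }
        }
    }
}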

.getResolvedSchema()
.toPhysicalRowDataType()
.getLogicalType();
List<String> physicalFieldNames = physicalRowype.getFieldNames();
Contributor

You should use the DataType.getFieldNames() static method.

@tsreaper (Contributor Author) commented Oct 20, 2021

@slinkydeveloper There are four reasons why I did not choose StreamFormat.

  1. The biggest concern is that StreamFormatAdapter.Reader#readBatch stores all results in a batch in heap memory. This is bad because avro is a format which supports compression. You'll never know how much data will be stuffed into heap memory after inflation.
  2. StreamFormat, conceptually, is for a stream of bytes where each record is shipped independently. Avro is a file format which organizes its records in its own blocks, so the two do not match conceptually. I would say the csv format is more suitable for StreamFormat.
  3. StreamFormatAdapter cuts batches by counting the number of bytes read from the file stream. If the sync size of avro is 2MB it will read 2MB from the file in one go and produce a batch containing no records. However, this only happens at the beginning of reading a file, so this might be OK.
  4. Both the orc and parquet formats implement BulkFormat instead of StreamFormat; if StreamFormat were the better fit, why wasn't it used for them?

@JingsongLi (Contributor)

Hi @slinkydeveloper @tsreaper , I think Avro and StreamFormat are in conflict conceptually.
In the comments of StreamFormat:

Compared to the {@link BulkFormat}, the stream format handles a few things out-of-the-box, like deciding how to batch records or dealing with compression.

But Avro has its own batch and compression logic. This conflict does lead to poor behavior in some places.

@JingsongLi (Contributor) left a comment

Thanks @tsreaper , left some comments.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
Contributor

Avro could be Scala-free.
Can we move FileSystemConnectorOptions to common?


protected void open(SplitT split) {}

abstract T convert(A record);
Contributor

protected


abstract T convert(A record);

abstract A getReusedAvroObject();
Contributor

protected


abstract T convert(A record);

abstract A getReusedAvroObject();
Contributor

createAvroRecord?

Contributor Author

This name is sort of misleading as it seems that we can get data from this method. I would prefer createReusedAvroRecord.

private final A reuse;

private final long end;
private final BlockingQueue<Boolean> blockingQueue;
Contributor

Use the flink-connector-files Pool instead? The pooled record can be the reused avro record.
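
A rough sketch of that suggestion (class and field names are illustrative, not the PR's actual code); the Pool calls are the same ones that appear later in this thread:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.connector.file.src.util.Pool;

public class PoolUsageSketch {
    public static void main(String[] args) throws Exception {
        Schema schema =
                SchemaBuilder.record("MyRecord").fields().requiredLong("id").endRecord();

        // Pool of size 1 seeded with the single reusable avro record.
        Pool<GenericRecord> pool = new Pool<>(1);
        pool.add(new GenericData.Record(schema));

        // readBatch() would block here until the previous batch has been released.
        GenericRecord reuse = pool.pollEntry();

        // ... fill `reuse` from the avro reader and hand it to the consumer ...

        // RecordIterator#releaseBatch() would return the record to the pool.
        pool.recycler().recycle(reuse);
    }
}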

@Nullable
@Override
public RecordIterator<T> readBatch() throws IOException {
if (reachEnd()) {
Contributor

Pool.pollEntry first.
I don't know whether there is a thread safety bug in Avro's reachEnd.

try {
blockingQueue.put(true);
} catch (InterruptedException e) {
throw new RuntimeException(
Contributor

For the InterruptedException:
if you catch it, you should call Thread.currentThread().interrupt();
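
A small sketch of the suggested handling (it reuses the blockingQueue field from the snippet above; the exception message is illustrative):

try {
    blockingQueue.put(true);
} catch (InterruptedException e) {
    // Restore the interrupt flag so callers further up the stack can still observe it.
    Thread.currentThread().interrupt();
    throw new RuntimeException("Interrupted while enqueueing the next avro block", e);
}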

DynamicTableSource.Context context,
RowType physicalRowType,
String[] physicalFieldNames,
DataType[] physicalFieldTypes,
Contributor

Obtain physicalFieldTypes from physicalRowType?

@slinkydeveloper (Contributor)

The biggest concern is that StreamFormatAdapter.Reader#readBatch stores all results in a batch in heap memory. This is bad because avro is a format which supports compression. You'll never know how much data will be stuffed into heap memory after inflation.
But Avro has its own batch and compression logic. This conflict does lead to poor behavior in some places.

Can you please elaborate? I don't understand where in these code changes we exploit the avro batch and compression logic. From the code in this PR, I still see that we invoke the data file iterator record by record https://github.com/apache/flink/pull/17520/files#diff-07c21eca7aca500ba4675ecd3ace539cf31d69797fd254b7b914d22198a789baR157.

}

try {
blockingQueue.put(true);
Contributor

My understanding is that FileSource reads splits using a single thread (check the javadoc of SingleThreadMultiplexSourceReaderBase), so why do you need a synchronization primitive here?

Contributor Author

SingleThreadMultiplexSourceReaderBase indeed reads splits with only one thread. But BulkFormats are used in fetching threads, not in reading threads. The big picture of a FLIP-27 source is that split fetching and the actual reading are done in separate threads. They form a producer-consumer model and communicate through a FutureCompletingBlockingQueue. See this code.

Note that in this PR, the results of an avro block are lazily produced. So when a block is enqueued, the contents of that block haven't been read yet. In this code you can see that the fetch task repeatedly adds blocks to the queue until the avro reader states that there are no more blocks. At this point the fetch task will close the avro reader without consuming the records. Thus when the file connector reader comes and tries to read from the avro blocks, an exception stating that the avro reader has been closed will be thrown.

Contributor

Then why doesn't StreamFormatAdapter have any sync primitive in its readBatch method? And note that each readBatch reuses the same instance of FSDataInputStream, which is definitely not suitable for multithreaded usage. The same discussion applies to DeserializationSchemaAdapter.

Also, FLIP-27 states that reading is done sequentially for files: https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-BaseImplementationBaseimplementationandhigh-levelreaders — but this information might be old...

They form a producer-consumer module and communicate with a FutureCompletingBlockingQueue

I see this, but then shouldn't we make sure that there is one and only one consumer thread? At the end of the day, your code just serializes the reads of these several consumer threads.

@tsreaper (Contributor Author) Oct 21, 2021

why StreamFormatAdapter doesn't have any sync primitive in its readBatch method

This depends on the implementation of each specific format.

StreamFormatAdapter has extracted all records in a batch into memory once the batch is produced, so it is ok for the reader to close before the batch is consumed because the records are already there and there is no need for a reader anymore.

This PR, on the other hand, does not extract all records into memory when a batch is produced. It only stores the iterator. To iterate through the batch when consuming, the reader is still needed. ParquetColumnarRowInputFormat also takes this approach, see here.

each readBatch reuses the same instance of FSDataInputStream, which is definitely not suitable for multithreaded usage

I think the "reader" here is sort of misleading. I'll now refer to the fetcher in FLIP-27 as "producer", the reader (NOT avro reader) in FLIP-27 as "consumer". Avro reader only lives in producers and not in consumers.

That FSDataInputStream is only used for that file split, while each file split is only handled by a single producer, so there is no multi-threaded problem here. But producers and consumers are in different threads. The same FSDataInputStream is handled by the same producer, and the results are given to some consumers. So it is true that reading is done sequentially for files.

FLIP-27 separates producers and consumers to prevent checkpoint blockage if it takes too long to read from an external system.


I think the best way to understand this problem is to remove this blocking queue. Add some outputs at the beginning of AbstractAvroBulkFormat.AvroReader#readBatch, AbstractAvroBulkFormat.AvroReader#close and the hasNext or next method of the iterator in AbstractAvroBulkFormat.AvroReader#readBatch and run all test cases in avro module to see what will fail.

You'll see that avro reader is closed before all records are consumed and an AsyncClosingException will be thrown when trying to read records from the batch.

@tsreaper (Contributor Author)

@slinkydeveloper

The biggest concern is that StreamFormatAdapter.Reader#readBatch stores all results in a batch in heap memory.

See this code. StreamFormatAdapter.Reader#readBatch stores all results in the current batch in an ArrayList.

On the other hand, this PR implements a BulkFormat.Reader by only storing the iterator instead of the actual results.

This is bad because avro is a format which supports compression.
But Avro has its own batch and compression logic.

See this code. This PR uses the avro reader as if all records were read from the stream one by one, but that is not really what happens, because avro wraps the decompression logic inside its own reader (it reads a whole block at a time and decompresses it in memory). We're reading records one by one from the reader, not from the stream. StreamFormatAdapter.Reader does not know anything about the avro reader; its only concern is the number of bytes read from the raw stream.

One might argue that the avro reader is also storing decompression results in memory and is not doing anything fancy. Yes. But by deserializing the bytes and converting them into Java objects we're doubling the memory cost. It might be even worse because of Java object overhead.
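
To make the contrast concrete, here is an illustrative sketch (not the PR's actual class; names are made up) of a batch that only stores an iterator, so each record is deserialized when the consumer pulls it:

import java.util.Iterator;

import javax.annotation.Nullable;

import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.util.RecordAndPosition;

class LazyAvroBatch<T> implements BulkFormat.RecordIterator<T> {

    private final Iterator<T> records;   // backed by the still-open avro reader
    private final Runnable onRelease;    // e.g. recycles the reused avro record
    private final long blockStart;       // offset of the block, for checkpointed positions
    private long recordsEmitted;

    LazyAvroBatch(Iterator<T> records, long blockStart, long recordsToSkip, Runnable onRelease) {
        this.records = records;
        this.blockStart = blockStart;
        this.recordsEmitted = recordsToSkip;
        this.onRelease = onRelease;
    }

    @Nullable
    @Override
    public RecordAndPosition<T> next() {
        // Deserialization happens inside records.next(), i.e. only when the consumer asks.
        if (!records.hasNext()) {
            return null;
        }
        recordsEmitted++;
        return new RecordAndPosition<>(records.next(), blockStart, recordsEmitted);
    }

    @Override
    public void releaseBatch() {
        onRelease.run();
    }
}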

@JingGe (Contributor) commented Oct 20, 2021

@slinkydeveloper There are four reasons why I did not choose StreamFormat.

  1. The biggest concern is that StreamFormatAdapter.Reader#readBatch stores all results in a batch in heap memory. This is bad because avro is a format which supports compression. You'll never know how much data will be stuffed into heap memory after inflation.
  2. StreamFormat, conceptually, is for a stream of bytes where each record is shipped independently. Avro is a file format which organizes its records in its own blocks, so the two do not match conceptually. I would say the csv format is more suitable for StreamFormat.
  3. StreamFormatAdapter cuts batches by counting the number of bytes read from the file stream. If the sync size of avro is 2MB it will read 2MB from the file in one go and produce a batch containing no records. However, this only happens at the beginning of reading a file, so this might be OK.
  4. Both the orc and parquet formats implement BulkFormat instead of StreamFormat; if StreamFormat were the better fit, why wasn't it used for them?

The consideration behind your solution was great! Thanks for your contribution. I will try to share what I understood with you. Let's discuss and understand the design together. Correct me if I am wrong.

For point 1, the uncompressed data size should be controlled by StreamFormat.FETCH_IO_SIZE. It might not be very precise to control the heap size, since the last read might overfulfil the quota a little bit, but it is acceptable. Speaking of compression, StreamFormatAdapter has built-in compressors support. Does this PR implementation have the same support too?

For point 2, StreamFormat defines a way to read each record. The granularity of each record could be controlled by the generic type StreamFormat.Reader<T>. There is plenty of room to play with if a single avro record is too small in this case.

For point 4, it is a good question; we should take a deep dive into the code. Might it make sense to refactor the orc and parquet formats to StreamFormat too?

@tsreaper (Contributor Author) commented Oct 21, 2021

@JingGe

For point 1, the uncompressed data size should be controlled by StreamFormat.FETCH_IO_SIZE. It might not be very precise to control the heap size, since the last read might overfulfil the quota a little bit, but it is acceptable.

This is not the case. For example, xz compression comes with a compression ratio of ~15% (google "xz compression ratio" if you want to confirm). Note that avro can be represented both in json and in compact binary form, so you may expect a ~6x inflation after uncompressing the data (1 / 0.15 ≈ 6.7). It will become worse as Java objects always come with extra overhead, and this is not "overfulfil the quota a little bit".

StreamFormatAdapter has built-in compressors support. Does this PR implementation have the same support too?

If you take a look at the implementation of StreamFormatAdapter you'll find that it supports decompression by calling StandardDeCompression#getDecompressorForFileName, which determines the decompressor by the file extension. Avro files often end with .avro, so there will be no match.

Also avro files are compressed by blocks. Avro files contain their own magic numbers, specific headers and block splitters which cannot be understood by the standard xz or bzip2 decompressor. You have to use the avro reader to interpret the file and the avro reader will deal with all the work like decompression or such.

For point 2, StreamFormat defines a way to read each record.

The problem is that you just cannot read one record at a time from an avro file stream. Avro readers read one block at a time from the file stream and store the inflated raw bytes in memory. For detailed code see my reply to @slinkydeveloper.

@JingGe (Contributor) commented Oct 21, 2021

@tsreaper

For point 1, the uncompressed data size should be controlled by StreamFormat.FETCH_IO_SIZE. It might not be very precise to control the heap size, since the last read might overfulfil the quota a little bit, but it is acceptable.

This is not the case. For example, xz compression comes with a compression ratio of ~15% (google "xz compression ratio" if you want to confirm). Note that avro can be represented both in json and in compact binary form, so you may expect a ~6x inflation after uncompressing the data (1 / 0.15 ≈ 6.7). It will become worse as Java objects always come with extra overhead, and this is not "overfulfil the quota a little bit".

Don't get me wrong. I am talking about the architecture design. "overfulfil the quota a little bit" has the context of "last read". This has nothing to do with the inflation. Speaking of inflation, the concrete implementation should use StreamFormat.FETCH_IO_SIZE to control the heap memory usage for the uncompressed data; as an example, AbstractLZ77CompressorInputStream#getSize() returns the uncompressed size of the stream. This is not the reason to choose BulkFormat over StreamFormat.

StreamFormatAdapter has built-in compressors support. Does this PR implementation have the same support too?

If you take a look at the implementation of StreamFormatAdapter you'll find that it supports decompression by calling StandardDeCompression#getDecompressorForFileName, which determines the decompressor by the file extension. Avro files often end with .avro, so there will be no match.

Also avro files are compressed by blocks. Avro files contain their own magic numbers, specific headers and block splitters which cannot be understood by the standard xz or bzip2 decompressor. You have to use the avro reader to interpret the file and the avro reader will deal with all the work like decompression or such.

That's right. That is exactly a good reason to extend the decompression logic in the StreamFormatAdapter to fulfil the avro requirement. Software becomes robust this way.

For point 2, StreamFormat defines a way to read each record.

The problem is that you just cannot read one record at a time from an avro file stream. Avro readers read one block at a time from the file stream and store the inflated raw bytes in memory. For detailed code see my reply to @slinkydeveloper.

Again, I am talking about the architecture design. "Record" is an abstract concept; it does not mean the record in avro. You can control the granularity of the abstract "record". This is the beauty of OOD.

@tsreaper (Contributor Author) commented Oct 21, 2021

@JingGe

"record" is the abstract concept, it does not mean the record in avro.

Are you suggesting an avro StreamFormat which produces an avro block at a time, instead of a Flink row data? If so, we'll need another operator after the source to break the block into several row data. Why not leave all this work inside the source?

"overfulfil the quota a little bit" has the context of "last read". This has nothing to do with the inflation.

I guess by "overfulfil the quota a little bit" you mean the number of bytes read from the stream. This is true but what I'm considering is that StreamFormatAdapter.Reader is storing all the results in a batch in memory at the same time (see this code and also my reply to @slinkydeveloper). This might cause OOM for a highly compressed file.

One way to work around this is to create a StreamFormatAdapter.Reader which uses iterators, but I guess this is another topic.

That is exactly a good reason to extend the decompression logic in the StreamFormatAdapter to fulfil the avro requirement. Software becomes robust this way.

I guess you've mistaken avro for a compression algorithm.

Avro is not a compression algorithm or anything like that. It is a row-oriented file format and you can see it as a normal file type like .xls or .json. We wouldn't call an xls file a "compression file", because xls is not for compression but for recording data, although it uses some compression to help shrink the file size.

This is why avro exists as its own module in flink-formats, instead of living with the compressors in StandardDeCompression (you don't want to put an xls resolver in that file, do you?). If you really did this you would essentially be moving the whole avro module into StandardDeCompression.

@slinkydeveloper (Contributor) commented Oct 21, 2021

@tsreaper I'm starting to get your point about laziness, but now I wonder: why is Avro different from, for example, parquet? In the parquet format, I see that when you invoke readBatch you load the actual data into memory and then return an iterator which iterates over it. So the I/O is performed within the readBatch invocation. The same goes for all the other formats I've looked at.

Your lazy reading is changing the threading model of the source, because with this PR the "consumer" thread (SourceReaderBase::pollNext in particular) will be the thread which will perform the actual I/O processing. Why is Avro special in this sense? Why don't we do the same for Parquet? And are we sure this is the way the source architecture is intended to work? Couldn't this cause issues because the original design might not have considered performing I/O within the consumer thread?

Even with laziness, I still think we don't need concurrency primitives to serialize nextBatch invocations, because each SourceOperator has its own instance of SourceReader (look at the method SourceOperator::initReader) and from SourceOperator the method SourceReaderBase::pollNext is invoked, which triggers the actual reading of avro records from your lazy operator.

@JingGe (Contributor) commented Oct 21, 2021

@tsreaper

Are you suggesting an avro StreamFormat which produces an avro block at a time, instead of a Flink row data? If so, we'll need another operator after the source to break the block into several row data. Why not leave all this work inside the source?

It is an option. Even with avro records, it should be fine to use StreamFormat. You can use the avro reader in your own StreamFormat implementation, according to the code you referred to. I don't see any conflict between StreamFormat reading records one by one and the avro reader reading blocks. Maybe I didn't get your point.

I guess by "overfulfil the quota a little bit" you mean the number of bytes read from the stream. This is true but what I'm considering is that StreamFormatAdapter.Reader is storing all the results in a batch in memory at the same time (see this code and also my reply to @slinkydeveloper). This might cause OOM for a highly compressed file.

If you can use StreamFormat.FETCH_IO_SIZE to control the uncompressed data size, how could OOM happen?

Avro is not a compression algorithm or anything like that. It is a row-oriented file format and you can see it as a normal file type like .xls or .json. We wouldn't call an xls file a "compression file", because xls is not for compression but for recording data, although it uses some compression to help shrink the file size.

This is why avro exists as its own module in flink-formats, instead of living with the compressors in StandardDeCompression (you don't want to put an xls resolver in that file, do you?). If you really did this you would essentially be moving the whole avro module into StandardDeCompression.

It is right to have avro as a submodule in flink-formats. What is 'StandardDeCompression'? Do you mean "StandardDeCompressors"? It is a class, not a module. I know what avro is, and it gets even more complicated with parquet, where each column may have its own compression codec. But that is another story. Since the avro reader handles the decompression logic, i.e. block.decompressUsing(codec), we don't even need to touch the compression logic built into the StreamFormatAdapter. We can delegate the decompression work to the avro reader and focus on working with the uncompressed data. In the worst case (which does not exist), let's say there were no avro reader handling decompression, StandardDeCompressors would be the place to handle it on our own. After all, StandardDeCompressors and all related factories are the abstraction designed for it. They are not limited to the Apache Commons compressors and are extensible.

@tsreaper (Contributor Author)

@slinkydeveloper

Your lazy reading is changing the threading model of the source, because with this PR the "consumer" thread (SourceReaderBase::pollNext in particular) will be the thread which will perform the actual I/O processing.

Nice catch! I missed this part before. The avro reader will read from the file when its hasNext method is called and there are no more records in the current batch. If there are still records to consume, it will only deserialize them from memory, just like what VectorizedColumnBatch actually does.

To avoid this we should never call the hasNext method if there are no more records. Fortunately, avro's DataFileStream#getBlockCount can supply us with the number of records in the current block.
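
A minimal sketch of that idea (dataFileReader and reuse stand in for the corresponding fields in the PR; it assumes the fetcher thread has already read and decompressed the current block):

// Number of records remaining in the block that is already inflated in memory.
long recordsInBlock = dataFileReader.getBlockCount();
for (long i = 0; i < recordsInBlock; i++) {
    // next(reuse) only deserializes from the in-memory block here; hasNext() is never
    // called after the block is drained, so no file I/O happens on this thread.
    GenericRecord record = dataFileReader.next(reuse);
    // ... convert to RowData and emit ...
}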

@JingsongLi (Contributor)

@JingGe @slinkydeveloper Good suggestion, we should put the IO operation on the fetcher side.
@tsreaper +1 to "we should never call the hasNext method if there is no more record (by DataFileStream#getBlockCount)."

@@ -47,13 +47,11 @@ under the License.

<!-- Table ecosystem -->

<!-- Projects depending on this project won't depend on flink-table-*. -->
Contributor

Why remove optional?

Contributor Author

Oops, my fault. There are also data stream users using this module and they don't need to see this table dependency.


protected final RowType readerRowType;

protected AbstractAvroBulkFormat(RowType readerRowType) {
Contributor

Use Schema instead of RowType. I think other implementations do not want to see the table RowType.

import java.util.Iterator;

/** Provides a {@link BulkFormat} for Avro records. */
public abstract class AbstractAvroBulkFormat<A, T, SplitT extends FileSourceSplit>

() -> pool.recycler().recycle(reuse));
}

private boolean reachEnd() throws IOException {
Contributor

invert this method, readNextBlock?

public RecordIterator<T> readBatch() throws IOException {
A reuse;
try {
reuse = pool.pollEntry();
Contributor

Now that you read on the producer thread, you don't need the sync primitives anymore I guess.

Contributor

Also, have you considered that you now pay the thread-to-thread overhead for each record?

* <p>Internally in the file source, the readers pass batches of records from the reading threads

Contributor Author

This is not the case. There are two steps for the avro reader to perform before producing a record.

  1. The avro reader reads a block from the file, decompresses it and stores the inflated bytes in memory.
  2. The avro reader deserializes the inflated bytes one by one to produce each record.

What I have done is to move step 1 into the fetcher thread, while step 2 stays in the reader thread. This is OK because the actual heavy disk IO is performed only in step 1. ParquetVectorizedInputFormat does the same thing: it returns a VectorizedColumnBatch for each batch and deserializes each column when it is read by the readers.

Now that you read on the producer thread, you don't need the sync primitives anymore

We still need the reader to be open because inflated bytes are stored there, and only the avro readers know how to deserialize them.

now you pay the thread to thread overhead for each record

I'm still passing batches between threads instead of records. Records are deserialized from batches in reader threads in step 2.

@tsreaper (Contributor Author)

@flinkbot run azure

}

this.end = end;
this.pool = new Pool<>(1);

Can you explain the pool size a bit? If I understand it correctly, the fetcher thread is blocked as long as the record has not been consumed. In contrast to ParquetVectorizedInputFormat, we cannot configure it here to control the throughput.

I am slightly worried that for small Avro objects the synchronization is more expensive than the actual savings in object allocations.

Am I missing something?


It seems there was already a draft implementation for avro: 11c6060#diff-edfd2d187d920f781382054f22fb4e6e5b5d9361b95a87ebeda68ba3a49d5a55R51. Did you know?

@tsreaper (Contributor Author) Oct 29, 2021

About the pool size: ParquetVectorizedInputFormat is an abstract class whose only subclass is ParquetColumnarRowInputFormat. In ParquetColumnarRowInputFormat the pool size is always set to 1.

The comments in ParquetColumnarRowInputFormat#numBatchesToCirculate state that

In a VectorizedColumnBatch, the dictionary will be lazied deserialized. If there are multiple batches at the same time, there may be thread safety problems, because the deserialization of the dictionary depends on some internal structures. We need set numBatchesToCirculate to 1.

This PR faces the same problem as ParquetColumnarRowInputFormat, as deserializing avro records also needs some internal avro structures.


About the StreamFormat draft by Stephan a year ago — I didn't know about that.

Contributor

I am slightly worried that for small Avro objects the synchronization is more expensive than the actual savings in object allocations.

An avro block is 64KB by default, which is larger than a MemorySegment in Flink. I think the synchronization is OK.


Thanks for all the explanation. I took a deeper look into the pool sizing and the initial ticket https://issues.apache.org/jira/browse/FLINK-21397; unfortunately, from reading the ticket alone I could not find the real root cause.

Coming back to this PR: it adds a lot of complexity to get the best reading performance, which is really great, although it still seems that by limiting the pool size to 1 we basically serialize the execution of the IO thread and the task read, and might lose a lot of the performance we initially tried to gain.

Did you do some benchmarking comparing a simple StreamFormat for Avro, similar to @StephanEwen's prototype, against the current solution where we enable object reuse but are limited to serialized execution?

For parquet, I would also like to try (in the near future) to further investigate whether we can increase the pool size again, because I think we might see bigger performance improvements.

Contributor

+1 to doing some benchmarks.

If we can increase the pool size of parquet, that is good.
But I don't think increasing the pool size means increasing performance.
When we look at performance, we really look at efficiency, not pure throughput. Flink is a distributed computing system, which means that we can and should increase parallelism to obtain greater throughput, rather than tangling with single-parallelism throughput.

iterator,
currentBlockStart,
recordsToSkip,
() -> pool.recycler().recycle(reuse));
@JingGe (Contributor) Nov 1, 2021

The design of the new FileSource is to separate the fetch thread and the task thread: all blocking I/O reading is done in the fetch thread, so that the task threads are blocking-free while reading the data. It is recommended to make batch objects ready in the fetch thread. Afaik, first of all, this implementation builds an iterator which will lazily read data and therefore runs blocking I/O reads within the task thread. IMHO, this is against the design of the new FileSource. Second, because the pool size is set to 1 and recycle is called when releaseBatch() is called, an iterator with only one element is built for each readBatch() call. All the Flink built-in logic to optimise the reading is therefore bypassed. It might be good to reconsider this. A benchmark would be one option to check the result, but it might not be very precise, because it depends heavily on the environment and the test data. It would be great if we could do it based on a real use case and real big data.

@tsreaper (Contributor Author) Nov 3, 2021

It is recommended to make batch objects ready in the fetch thread.

See my comment for detailed explanation.

@tsreaper (Contributor Author) commented Nov 3, 2021

@slinkydeveloper @fapaul @JingGe

I've done some benchmarking on a testing yarn cluster.

  • Test data: The Kaggle flight delay data, a ~500MB csv file. I've changed it into an avro file with xz compression and 64kb or 2mb block size.
  • Number of task slots: 8
  • Number of task manager: 1
  • Configuration
# common JVM configurations used in a lot of our production job, also for producing the TPCDS benchmark result in Flink 1.12
env.java.opts.jobmanager: -XX:+UseParNewGC -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -verbose:gc -XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSMaxAbortablePrecleanTime=1000 -XX:+CMSClassUnloadingEnabled -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:SurvivorRatio=5 -XX:ParallelGCThreads=4
env.java.opts.taskmanager: -XX:+UseParNewGC -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -verbose:gc -XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSMaxAbortablePrecleanTime=1000 -XX:+CMSClassUnloadingEnabled -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:SurvivorRatio=5 -XX:ParallelGCThreads=4

# default memory configuration of Flink
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m

I've tested three implementations.

  • Bulk Format + Lazy Reading: This is the implementation of this PR.
  • Bulk Format + ArrayList: This implementation reads and deserializes all records of the whole block into an array list and sends it to the reader thread. This implementation does not have a blocking pool, as @JingGe suggested. See here for code.
  • Stream Format: This is the implementation based on Stephan's draft. See here for code. I didn't implement projection pushdown for this but it should be fine because there is no projection pushdown in the benchmark.

Here are the test results. I've run a very simple job which reads from the avro file and writes directly into a blackhole sink. The time in the table is the execution time of the only task, excluding the scheduling time.

                           | xz compression, 64kb block size | xz compression, 2mb block size
bulk format + lazy reading | 14s                             | 10s
bulk format + array list   | 14s                             | 30s, due to GC, sometimes out of memory
stream format              | 2m24s, due to heavy GC          | 51s, due to GC, sometimes out of memory

It is obvious that any implementation which loads all records of a block into memory at once will suffer from GC more or less. Also, for smaller block sizes, the blocking pool has almost no impact on performance. So I would say the implementation in this PR is the best-suited implementation so far.

@JingGe (Contributor) commented Nov 4, 2021

@tsreaper many thanks for your effort and for sharing the benchmark data.

The option of using BulkFormat + ArrayList is almost the same as using StreamFormat + StreamFormatAdapter, except for the memory size control. Have you tried controlling the number of records each batchRead() will fetch instead of fetching all records of the current block in one shot? For a code reference, please see here at lines 145-148.

For the option of Stream Format based on Stephan's draft, may I know how you controlled the StreamFormat.FETCH_IO_SIZE? Thanks.

@tsreaper (Contributor Author) commented Nov 5, 2021

@JingGe

Have you tried to control the number of records each batchRead() will fetch instead of fetch all records of the current block in one shot?

No, I haven't. But I can come up with one problem with this: some records may be large, for example json strings containing tens of thousands of characters (this is not rare in the production jobs I've seen so far). If we only control the number of records there is still a risk of overwhelming the memory. The other way is to control the actual size of each record, which requires a method to estimate the number of bytes in each record.

how you controlled the StreamFormat.FETCH_IO_SIZE?

The number of bytes read from the file is controlled by StreamFormatAdapter.TrackingFsDataInputStream. It is controlled by source.file.stream.io-fetch-size, whose default value is 1MB. However, there is no use in tuning this value because the avro reader (I mean the reader from the avro library) will read the whole block from the file. If the block size is 2MB it will consume 2MB of bytes and, according to the current logic of StreamFormatAdapter, deserialize all records from that block at once. I've tried changing that config option in the benchmark and it proved me right.
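
For reference, a hedged sketch of how that option would be set programmatically (the 4mb value is arbitrary); as explained above, tuning it does not change how the avro reader pulls whole blocks:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.src.reader.StreamFormat;

public class FetchSizeConfigSketch {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // ConfigOption key: source.file.stream.io-fetch-size (default 1mb).
        config.set(StreamFormat.FETCH_IO_SIZE, MemorySize.parse("4mb"));
        System.out.println(config.get(StreamFormat.FETCH_IO_SIZE));
    }
}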

@JingGe (Contributor) commented Nov 8, 2021

@tsreaper

No, I haven't. But I can come up with one problem with this: some records may be large, for example json strings containing tens of thousands of characters (this is not rare in the production jobs I've seen so far). If we only control the number of records there is still a risk of overwhelming the memory. The other way is to control the actual size of each record, which requires a method to estimate the number of bytes in each record.

To make the discussion easier, we are talking about the benchmark data, whose records have almost the same size. For real cases, we can control the number of records dynamically by controlling the bytes read from the input stream, e.g. in each batchRead, read 5 records for large records and 50 records for small records.

how you controlled the StreamFormat.FETCH_IO_SIZE?

The number of bytes read from the file is controlled by StreamFormatAdapter.TrackingFsDataInputStream. It is controlled by source.file.stream.io-fetch-size, whose default value is 1MB. However, there is no use in tuning this value because the avro reader (I mean the reader from the avro library) will read the whole block from the file. If the block size is 2MB it will consume 2MB of bytes and, according to the current logic of StreamFormatAdapter, deserialize all records from that block at once. I've tried changing that config option in the benchmark and it proved me right.

If you take a close look at the implementation of TrackingFsDataInputStream, you will see how it uses StreamFormat.FETCH_IO_SIZE to control how many records will be read/deserialized from the avro block in each batchRead().

Anyway, the benchmark result tells us the truth. Thanks again for sharing it. We will do a deeper dive later to figure out why using StreamFormat has these memory issues.

public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
DynamicTableSource.Context sourceContext, DataType producedDataType) {
DataType physicalDataType =
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
Contributor

Can you please rebase on master? I introduced a change to relieve format implementations from taking care of partition keys: 1ea2102. You can clean up all the code supporting partition keys, and you can also revert the move of FileSystemConnectorOptions.

@tsreaper (Contributor Author) Nov 12, 2021

Love this change. It simplifies code a lot. Are there more jira tickets to simplify existing code like parquet and orc?

Comment on lines 182 to 186
List<String> physicalFieldNames = DataType.getFieldNames(physicalDataType);
int[] selectFieldIndices =
DataType.getFieldNames(producedDataType).stream()
.mapToInt(physicalFieldNames::indexOf)
.toArray();
Contributor

Same as above, no need to project fields etc. Just use the producedDataType provided to you in createRuntimeDecoder.

@JingsongLi (Contributor)

@slinkydeveloper @JingGe @fapaul Do you have other comments?

@fapaul commented Nov 16, 2021

@tsreaper @JingsongLi thanks for your efforts and for providing the benchmark results. I have no further objections :)

@slinkydeveloper (Contributor)

Just did another pass; from the table point of view this is good to go for me.

@StephanEwen (Contributor) commented Nov 16, 2021

@tsreaper Thanks for doing the benchmark.

I am curious to understand what the difference is between "bulk format + array list" and "stream format", because the "stream format" also puts deserialized records into an ArrayList. But something must be different, else there would not be such a big performance difference.

Can we try and identify that, and maybe update the StreamFormatAdapter to be better?

I would also be curious to understand where the performance difference with different block sizes comes from in the StreamFormat. The stream format counts the batch size bytes after decompression, and it should be independent of Avro's blocks and sync markers, so I am puzzled why it has an impact.

@JingsongLi (Contributor)

Hi all, I'll give my understanding. (Correct me if I am wrong)

Object ArrayList vs Lazy deserialization

As long as the objects inside the ArrayList do not fall into the GC old area, the performance difference is not significant.

If we use an ArrayList, there is a trade-off:

  • Larger capacity: With the complexity of downstream processing, it may cause elements to fall into the GC old area.
  • Smaller capacity: The extreme case is 1, which is too costly for BlockArrayQueue and seriously affects throughput.

Since this trade-off is difficult to control, we try not to use a collection of objects. If we must bundle data, we use a structure similar to BytesMap (binary only, no objects).

Lazy deserialization in StreamFormat

The key problem is that StreamFormat has no way to know the real demarcation point of the implementation, which may cause the implementation to hit an EOF exception.
Would it be possible for StreamFormat to expose a block-like interface that allows implementations to define the demarcation of a block, or where each compressed block defines the demarcation point?

@JingsongLi (Contributor)

Hi @StephanEwen, do you have any other comments? We can merge this if everything is right.

@JingsongLi (Contributor)

@tsreaper can you rebase onto the latest master?

@slinkydeveloper (Contributor) left a comment

Just a small comment as things have changed in the meantime 😄 Can you please convert all the assertions used to AssertJ? Also, for AvroBulkFormatTest can you use JUnit 5?

@JingsongLi (Contributor) left a comment

Looks good to me!

@JingsongLi merged commit 388dce6 into apache:master on Dec 14, 2021
niklassemmler pushed a commit to niklassemmler/flink that referenced this pull request Feb 3, 2022