Conversation

@stevenzwu
Contributor

This is useful when we want to use a smaller row group size. In the past, we also found that tuning these configurations to smaller values is important to avoid OOM problems when there are large records (like MBs). E.g., we have set the min/max check count defaults to 10/100 for the Flink streaming ingestion path, and the performance impact was insignificant. For streams with large records, we have even set the min/max to much smaller values like 1/5.
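For context, a minimal sketch (not part of this PR's diff) of how these check counts could be tuned on a table with large records, assuming Iceberg's Table/UpdateProperties API and the property names added in this PR; the 5/50 values are purely illustrative:

```java
import org.apache.iceberg.Table;

class RowGroupCheckTuning {
  // Illustrative sketch only: lower the check counts so the writer measures
  // the buffered row group size more often when individual records are MB-sized.
  static void tuneForLargeRecords(Table table) {
    table.updateProperties()
        .set("write.parquet.row-group-check-min-record-count", "5")
        .set("write.parquet.row-group-check-max-record-count", "50")
        .commit();
  }
}
```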

class ParquetWriter<T> implements FileAppender<T>, Closeable {

-  private static DynConstructors.Ctor<PageWriteStore> pageStoreCtorParquet = DynConstructors
+  private static final DynConstructors.Ctor<PageWriteStore> pageStoreCtorParquet = DynConstructors
Contributor Author

In this PR, I also fixed these compiler warnings and the RuntimeIOException deprecation (replacing it with UncheckedIOException). Please let me know if it is preferred to split those out into a separate PR.

Contributor

Thanks for working on that! I think we are accumulating more and more of these warnings, so it's better to fix them as much as possible along the way.

@stevenzwu
Contributor Author

@rdblue can you help take a look?

Contributor

@jackye1995 left a comment

Overall looks good to me.

The only concern I have is that this seems to be a configuration specific to the use case on the engine side. For example, in streaming we might want to turn this down to a smaller number, but for normal Spark ingestion the defaults might work better.

But as of today we don't have a way to add runtime session options to the appender; it just takes table properties. I don't have enough historical context to say whether that's intentional, maybe @rdblue can comment more about this. But overall I think it is good that at least we have a config rather than hard-coded values.


"write.parquet.row-group-check-max-record-count";
public static final String DELETE_PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT =
"write.delete.parquet.row-group-check-max-record-count";
public static final String PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT = "10000";
Contributor

This can be an integer.
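A sketch of what that suggestion would look like (assuming the default value stays 10000; only the type changes from String to int):

```java
// Suggested shape: declare the default as an int instead of a String.
public static final int PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT = 10000;
```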

"write.parquet.row-group-check-min-record-count";
public static final String DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT =
"write.delete.parquet.row-group-check-min-record-count";
public static final String PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT_DEFAULT = "100";
Contributor

This can be an integer.

String compressionLevel = config.getOrDefault(PARQUET_COMPRESSION_LEVEL, PARQUET_COMPRESSION_LEVEL_DEFAULT);

return new Context(rowGroupSize, pageSize, dictionaryPageSize, codec, compressionLevel);
int rowGroupCheckMinRecordCount = Integer.parseInt(config.getOrDefault(
Contributor

can use PropertyUtil.propertyAsInt
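A sketch of the suggested form, assuming the default constants are also changed to int (PropertyUtil.propertyAsInt takes a Map of string properties, a key, and an int default):

```java
int rowGroupCheckMinRecordCount = PropertyUtil.propertyAsInt(config,
    PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT, PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT_DEFAULT);
int rowGroupCheckMaxRecordCount = PropertyUtil.propertyAsInt(config,
    PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT, PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT);
```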

Contributor Author

Originally, I was trying to stay with the existing code/style in this class. In the new commit, I have updated Integer.parseInt to PropertyUtil.propertyAsInt for all integer configs (old and new).

PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT, PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT_DEFAULT));
int rowGroupCheckMaxRecordCount = Integer.parseInt(config.getOrDefault(
PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT, PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT));

Contributor

I think we need to add some basic validations, such as that the config values are positive and that max is greater than min.
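Something along these lines (a rough sketch of the suggested checks, not necessarily the exact code that was merged):

```java
Preconditions.checkArgument(rowGroupCheckMinRecordCount > 0,
    "Row group check minimal record count must be > 0");
Preconditions.checkArgument(rowGroupCheckMaxRecordCount > 0,
    "Row group check maximum record count must be > 0");
Preconditions.checkArgument(rowGroupCheckMaxRecordCount >= rowGroupCheckMinRecordCount,
    "Row group check maximum record count must be >= minimal record count");
```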

Contributor Author

Thanks for pointing this out. Added validations.

github-actions bot added the spark label Sep 30, 2021
@stevenzwu
Contributor Author

stevenzwu commented Sep 30, 2021

@jackye1995 Please take another look.

Regarding your comment that the new configs are more specific to the use case on the engine side, I think this is not engine specific. Sure, it probably matters a little more for streaming ingestion (Flink or Spark streaming), but it can matter for batch writes too.

E.g., we may want a smaller row group size (like 16 MB) to be able to split files into more splits for higher parallelism. If the average row size is big (like MBs), then we need to tune these configs down to get more accurate control over the target row group size (and memory consumption), regardless of whether the write is streaming or batch.
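As a rough illustration with assumed numbers (not from this PR): with ~2 MB records and the default min check count of 100, roughly 200 MB can be buffered before the writer first checks the row group size, far past a 16 MB target; with min/max check counts of 5/10, the first check happens after about 10 MB.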

stevenzwu force-pushed the parquetWriter branch 2 times, most recently from e02dac1 to 817f426 on November 9, 2021 01:41
stevenzwu closed this Nov 9, 2021
stevenzwu reopened this Nov 9, 2021
@stevenzwu
Contributor Author

@rdblue can you help take a look?

int rowGroupSize = PropertyUtil.propertyAsInt(config,
PARQUET_ROW_GROUP_SIZE_BYTES, PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT);
Preconditions.checkArgument(rowGroupSize > 0,
"Row group size must be > 0");
Contributor

Does this need to be on a separate line?

Contributor Author

fixed

// Even though row group size is 16 bytes, we still have to write 101 records
// as default PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT is 100.
File parquetFile = generateFileWithTwoRowGroups(null, 101, props)
.first();
Contributor

Does this need to be on a separate line?

Contributor Author

fixed

}

-  private Pair<File, Long> generateFileWithTwoRowGroups(Function<MessageType, ParquetValueWriter<?>> createWriterFunc)
+  private Pair<File, Long> generateFileWithTwoRowGroups(
Contributor

The modifications to these tests don't seem to fit. This pulls the writer configuration out of this method, but it still uses the name generateFileWithTwoRowGroups even though there's no longer a guarantee that two row groups will actually be written. I think it would be better to build the properties in this method and pass in the settings:

private ... generateFile(Function<...> createWriterFunc, Long rowGroupSizeBytes, Integer minCheckRecordCount, Integer maxCheckRecordCount)

That way, all this function does is create the file. I think it should also set its own desired record count based on the min row group count so you don't have to pass in 1 more than the row group size in records.
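A hypothetical sketch of how the helper could build its own properties from the passed-in settings (names assumed, not the final code in the PR; it assumes java.util imports and that the TableProperties constants are statically imported as in the existing test):

```java
private static Map<String, String> writerProps(
    Long rowGroupSizeBytes, Integer minCheckRecordCount, Integer maxCheckRecordCount) {
  Map<String, String> props = new HashMap<>();
  if (rowGroupSizeBytes != null) {
    props.put(PARQUET_ROW_GROUP_SIZE_BYTES, rowGroupSizeBytes.toString());
  }
  if (minCheckRecordCount != null) {
    props.put(PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT, minCheckRecordCount.toString());
  }
  if (maxCheckRecordCount != null) {
    props.put(PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT, maxCheckRecordCount.toString());
  }
  return props;
}
```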

Contributor Author

Updated the method to build the props inside this method.

desiredRecordCount is already part of the method signature, so there is no more hard-coding of "1 more than the row group size in records".

Contributor

@rdblue left a comment

The main changes look good, but I think this could make the test modifications more clear.

Contributor

@kbendick left a comment

LGTM

rdblue merged commit 9cc2ac8 into apache:master Feb 18, 2022