
PARQUET-2374: Add metrics support for parquet file reader #1187

Merged
merged 6 commits into apache:master on Jan 12, 2024

Conversation

parthchandra
Contributor

Make sure you have checked all steps below.

Jira

This PR addresses the following: PARQUET-2374
Add metrics support for parquet file reader

Tests

No new tests. Adds a new public interface for reporting metrics from the parquet file reader.
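
For orientation, here is a minimal sketch of what such a callback interface could look like. The method names below are illustrative assumptions for this description, not necessarily the exact signatures merged in this PR:

package org.apache.parquet.hadoop;

/**
 * Illustrative sketch only: the reader reports named values, and an
 * engine-side implementation forwards them to its own metrics system.
 * Default no-op bodies keep existing implementations source-compatible
 * when new metrics are added later.
 */
public interface ParquetMetricsCallback {
  // report a counter value by name (illustrative signature)
  default void setValueLong(String name, long value) {}

  // report an elapsed time in nanoseconds (illustrative signature)
  default void setDuration(String name, long nanos) {}
}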

@parthchandra
Contributor Author

Note that this can be used to compare read performance of the existing file reader with the hadoop vector io reader #1139

@parthchandra
Contributor Author

@shangxinli could you take a look at this PR?

Member

@wgtmac wgtmac left a comment

Thanks for adding this! I have left a few comments.

@@ -199,7 +206,7 @@ public DataPage visit(DataPageV1 dataPageV1) {

   @Override
   public DataPage visit(DataPageV2 dataPageV2) {
-    if (!dataPageV2.isCompressed() && offsetIndex == null && null == blockDecryptor) {
+    if (!dataPageV2.isCompressed() && offsetIndex == null && null == blockDecryptor) {
Member

Could you please revert this unnecessary style change here and below?

Contributor Author

Done

@@ -237,21 +246,22 @@ public DataPage visit(DataPageV2 dataPageV2) {
         }
       } else {
         if (null != blockDecryptor) {
-          pageBytes = BytesInput.from(
-              blockDecryptor.decrypt(pageBytes.toByteArray(), dataPageAAD));
+          pageBytes = BytesInput.from(
Member

ditto

Contributor Author

Done

package org.apache.parquet.hadoop;

/**
* a simple interface to pass bask metric values by name to any implementation. Typically an
Member

Suggested change
- * a simple interface to pass bask metric values by name to any implementation. Typically an
+ * a simple interface to pass basic metric values by name to any implementation. Typically an

Contributor Author

Thanks. My spellink could be better. :)

Contributor Author

@parthchandra parthchandra left a comment

Thank you for reviewing @wgtmac !
Addressed your comments. There was some weird reformatting that occurred; I've fixed it in the places you pointed out as well as in some others.


Member

@wgtmac wgtmac left a comment

Thanks @parthchandra! LGTM

@parthchandra
Contributor Author

Thank you @wgtmac !

/**
* set a callback to send back metrics info
*/
public synchronized void initMetrics(ParquetMetricsCallback callback) {
Contributor

why is the synchronized needed?

Contributor Author

My mistake. I initially implemented the metrics callback as a singleton. The method no longer needs to be synchronized.

@@ -125,10 +125,20 @@ public class ParquetFileReader implements Closeable {

public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";

public ParquetMetricsCallback metricsCallback;
Contributor

do we need this field as public?

Contributor Author

No we don't. Made it private.

@@ -80,10 +80,12 @@ static final class ColumnChunkPageReader implements PageReader {
private final byte[] dataPageAAD;
private final byte[] dictionaryPageAAD;

ParquetMetricsCallback metricsCallback;
Contributor

It should be private final

Contributor Author

Changed

Member

Did you forget to push your local changes? @parthchandra

Contributor Author

Yes, I just pushed them now. 🤦🏾

Contributor

It would be better to mark it as final. I suggest getting the callback from ParquetReadOptions; then we don't need to update the constructor, which is more reasonable.

@steveloughran
Contributor

it'd be really nice if somehow there was a way to push hadoop stream IOStats here, especially the counters, min, max and mean maps: https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/filesystem/iostatistics.html

and it's really interesting for s3, azure and gcs clients, where we collect stream-specific stuff, including things like: bytes discarded in seek, time for GET, whether we did a HEAD first, and more. These are collected at a thread level, but also include stats from helper threads such as those in async stream draining, vector IO...

It'd take a move to hadoop 3.3.1+ to embrace the API, but if there was a way for something to publish stats to your metric collector, then maybe something could be done
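
For reference, a hedged sketch of what pulling those stats off a stream looks like on hadoop 3.3.1+, using the public IOStatisticsSupport and IOStatisticsLogging helpers (the stream variable here is an assumption, standing in for the reader's underlying input stream):

import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;

// ask the (possibly wrapped) stream for its statistics; returns null
// if the stream does not publish any
IOStatistics stats = IOStatisticsSupport.retrieveIOStatistics(stream);
if (stats != null) {
  // counters/gauges/min/max/mean maps, rendered like the dump below
  System.out.println(IOStatisticsLogging.ioStatisticsToPrettyString(stats));
}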

Tip: you can enable a dump of a filesystem's aggregate stats in process shutdown for azure and s3a

fs.iostatistics.logging.level=info
2023-11-17 18:30:28,634 [shutdown-hook-0] INFO  statistics.IOStatisticsLogging (IOStatisticsLogging.java:logIOStatisticsAtLevel(269)) - IOStatistics: counters=((action_http_head_request=3)
(audit_request_execution=15)
(audit_span_creation=12)
(object_list_request=12)
(object_metadata_request=3)
(op_get_file_status=1)
(op_glob_status=1)
(op_list_status=9)
(store_io_request=15));

gauges=();

minimums=((action_http_head_request.min=22)
(object_list_request.min=25)
(op_get_file_status.min=1)
(op_glob_status.min=9)
(op_list_status.min=25));

maximums=((action_http_head_request.max=41)
(object_list_request.max=398)
(op_get_file_status.max=1)
(op_glob_status.max=9)
(op_list_status.max=408));

means=((action_http_head_request.mean=(samples=3, sum=87, mean=29.0000))
(object_list_request.mean=(samples=12, sum=708, mean=59.0000))
(op_get_file_status.mean=(samples=1, sum=1, mean=1.0000))
(op_glob_status.mean=(samples=1, sum=9, mean=9.0000))
(op_list_status.mean=(samples=9, sum=814, mean=90.4444)));

Contributor

@steveloughran steveloughran left a comment

For the object stores, things to measure are

  • time to open() and close() a file
  • time for a read after a backwards seek
  • time for a read after a forwards seek.
  • how many reads actually took place
  • for vector IO, whatever gets picked up there
  • were errors reported and retried, or throttling events
  • number of underlying GET requests

as well as publishing this through the IOStatisticsSource API, we also now collect it at the thread level (IOStatisticsContext) with the goal of having the execution engine collect all stats for a task and then aggregate across the job. you can set the s3a committers up to do this.

If you were targeting a recent hadoop release only I'd say "publish the stream stats to the thread context for aggregation", but that's not viable here. What would be good is if these stats were set up to

  • take maps of key-value rather than a fixed enum
  • collect those min/mean/max as well as counts.
  • somehow provided a plugin point where we could add something to add any of the parquet reader/writer stats to the thread context; trying to collect stats from inside wrapped-many-times-over streams and iterators is way too complex. I know, I have a branch of parquet where I tried that...

@@ -1841,8 +1851,12 @@ public void addChunk(ChunkDescriptor descriptor) {
* @throws IOException if there is an error while reading from the stream
*/
public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOException {
long seekStart = System.nanoTime();
Contributor

s3a and abfs do lazy seek; no IO happens until the first read/readFully. Best to measure that.

Contributor Author

Right. Generally, seeks (especially backwards) cause the file system to stop their read-ahead and turn off sequential read optimizations. The seek call itself doesn't take much time.
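
To make that concrete, a sketch of timing the first read after a seek rather than the seek call itself (f is the SeekableInputStream from the hunk above; offset and buffer stand in for the chunk being fetched, and the callback method name is an illustrative assumption):

// seek is lazy on s3a/abfs: it only records the new position
f.seek(offset);

// the first read triggers the actual IO, so measure around it
long start = System.nanoTime();
f.readFully(buffer, 0, buffer.length);
long elapsed = Math.max(0L, System.nanoTime() - start); // guard against non-monotonic clocks
// metricsCallback.setDuration("parquet.stream.read.time", elapsed); // name assumed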

@parthchandra
Contributor Author

@steveloughran I did look into leveraging Hadoop io stats but my first attempt did not work too well and I thought a simpler initial implementation would be more useful. Once we move to hadoop vector io, I'll take another stab at it.

> What would be good is if these stats were set up to
>
> take maps of key-value rather than a fixed enum

The fixed enum here is simply the Parquet file reader providing information that these are the values it knows about. This implementation is not really collecting and aggregating anything, it is simply recording the time and counts and passing them on.

> collect those min/mean/max as well as counts.

The implementation of the parquet metrics callback will do that. So if the execution engine is Spark, it can simply get the values and add them to its own metrics collection subsystem, which then computes the min/max/mean (see the sketch below).

> somehow provided a plugin point where we could add something to add any of the parquet reader/writer stats to the thread context; trying to collect stats from inside wrapped-many-times-over streams and iterators is way too complex. I know, I have a branch of parquet where I tried that...

Hmm, that will take some work. I wanted to measure streaming decompression time (where the decompress call simply returns a stream which is decompressed as it is read), but found it required too many breaking changes to implement. But a standard system like IOStatistics, where such a stream is an IOStatisticsSource, would be perfect.
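
To make the engine-side half concrete, a hedged sketch of such a callback implementation, reusing the illustrative method names from the interface sketch earlier; a real Spark integration would feed its SQL metrics instead of the map used here:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// illustrative only: accumulate the raw values the reader passes on;
// min/max/mean would be computed by the engine's own metrics subsystem
public class AccumulatingMetricsCallback implements ParquetMetricsCallback {
  private final Map<String, LongAdder> totals = new ConcurrentHashMap<>();

  @Override
  public void setValueLong(String name, long value) {
    totals.computeIfAbsent(name, k -> new LongAdder()).add(value);
  }

  @Override
  public void setDuration(String name, long nanos) {
    totals.computeIfAbsent(name, k -> new LongAdder()).add(nanos);
  }

  // read back an aggregated value, e.g. to fold into engine metrics
  public long total(String name) {
    LongAdder adder = totals.get(name);
    return adder == null ? 0L : adder.sum();
  }
}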

@wgtmac
Member

wgtmac commented Nov 21, 2023

> For the object stores, things to measure are
>
>   • time to open() and close() a file
>   • time for a read after a backwards seek
>   • time for a read after a forwards seek
>   • how many reads actually took place
>   • for vector IO, whatever gets picked up there
>   • were errors reported and retried, or throttling events
>   • number of underlying GET requests

CMIW, it seems that these stats can be collected solely at the input stream level.

@parthchandra
Contributor Author

> For the object stores, things to measure are
>
>   • time to open() and close() a file
>   • time for a read after a backwards seek
>   • time for a read after a forwards seek
>   • how many reads actually took place
>   • for vector IO, whatever gets picked up there
>   • were errors reported and retried, or throttling events
>   • number of underlying GET requests
>
> CMIW, it seems that these stats can be collected solely at the input stream level.

Yes, they are best collected by the file system client API. However it would be nice to be able to hook up all these metrics together. Then we could, for instance, show a single Spark scan operator that displays stats for the operator, parquet reader, and the input stream in one place.

@wgtmac
Member

wgtmac commented Nov 23, 2023

Thanks @parthchandra! Do you have any TODO work item on this (or after vectored I/O is merged)?

@parthchandra
Contributor Author

No I don't have any todo for this. If we are going to merge vector IO soon, I can add the metrics for that as well as part of this PR.

@wgtmac
Member

wgtmac commented Nov 24, 2023

> No I don't have any todo for this. If we are going to merge vector IO soon, I can add the metrics for that as well as part of this PR.

Not necessary. We can add them progressively.

@steveloughran @shangxinli @gszadovszky Do you have any concerns about merging this?

/**
* set a callback to send back metrics info
*/
public void initMetrics(ParquetMetricsCallback callback) {
Contributor

Why not set the callback in ParquetReadOptions?

Contributor Author

That's a good idea, but it may require changes to some of the public constructors which are in use by others. For instance Spark has this https://github.com/apache/spark/blob/15e99cf676d9de02c54ca5ebe9a2bc6a3ce014e5/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java#L104
In this case, the constructor is deprecated and it looks like Spark could benefit from a constructor that takes a ParquetReadOptions as a parameter. I can introduce a new constructor but wasn't sure that we would want to do so. Having an additional API to set the metrics callback seemed like the simpler option.
WDYT?
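
For illustration, the ParquetReadOptions route being discussed would look roughly like this; the builder method name is an assumption, inferred from the options.getMetricsCallback accessor mentioned later in this review:

// hypothetical wiring: carry the callback inside the read options so
// no public constructor signatures have to change
ParquetReadOptions options = ParquetReadOptions.builder()
    .withMetricsCallback(new AccumulatingMetricsCallback()) // method name assumed
    .build();
ParquetFileReader reader = new ParquetFileReader(inputFile, options);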

Member

@ConeyLiu Gentle ping, do you have any comment?

Contributor

Sorry for missing this. https://github.com/apache/parquet-mr/blob/afd39dde8fd762bf696fea3dab16d45eae1093c3/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L669
The open method passing a footer is deprecated. I think we should support it with ParquetReadOptions. Then Spark could benefit from this as well.

I am OK with the current changes to unblock this PR.

@shangxinli
Contributor

shangxinli commented Dec 15, 2023

LGTM

Thanks @parthchandra for working on this PR. And thanks @wgtmac, @steveloughran and @ConeyLiu for reviewing the changes.

@ConeyLiu
Contributor

@parthchandra one more question: why were the changes from #1187 (comment) reverted?

@parthchandra
Contributor Author

> @parthchandra one more question: why were the changes from #1187 (comment) reverted?

Hmm. I'm sure I didn't revert that. Let me address this.

Also, I think I will change this to use ParquetReadOptions, and add an additional open API for use by Spark. So let's hold off on merging this.

Contributor

@steveloughran steveloughran left a comment

just realised I had a comment but didn't submit it... here you go. Sorry

* implementation of this interface will serve as a bridge to pass metric values on
* to the metrics system of a distributed engine (hadoop, spark, etc).
*/
public interface ParquetMetricsCallback {
Contributor

Can you tag as unstable but promise that all new callbacks will have a default implementation for any new metric which will be a no-op, eg

default addMaximum(key, value) { }

I'd also propose a specific "addDuration(key, duration)" call. For iostats we do min/mean/max and sum values here as they are all relevant

Note also that the nanotimer is a tricky one. It is low cost and required to go monotonically forward on a single core, but that doesn't hold across sockets. Older Intel parts don't even do this on cores on the same die. Not saying don't use it, but be prepared for the diff to be negative and if so set it to zero.
For this reason, and because exceptions may be in a very different duration category from successes, consider a DurationTracker class or interface/class which hides more of the start and stop timestamp collection and can even be told of a failure and probed for success vs failure. Then use this as the addDuration() value.
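
A hedged sketch of that DurationTracker idea (the names are illustrative; this is not an existing parquet or hadoop class), clamping negative diffs and reporting failures under a separate key:

// illustrative sketch: hides timestamp collection behind try-with-resources
public class DurationTracker implements AutoCloseable {
  private final ParquetMetricsCallback callback;
  private final String name;
  private final long start = System.nanoTime();
  private boolean failed;

  public DurationTracker(ParquetMetricsCallback callback, String name) {
    this.callback = callback;
    this.name = name;
  }

  public void failed() {
    this.failed = true;
  }

  @Override
  public void close() {
    // nanoTime is not guaranteed monotonic across sockets, so clamp to zero
    long elapsed = Math.max(0L, System.nanoTime() - start);
    // failure durations are a different category from successes
    callback.setDuration(failed ? name + ".failures" : name, elapsed);
  }
}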

Contributor Author

> Can you tag as unstable but promise that all new callbacks will have a default implementation for any new metric

Done.

Re: nanotimer - Learnt something new today. I remember when nanosecond timers were introduced things were unstable, but hopefully things are more reliable nowadays. I usually look at Guava's Stopwatch as the baseline implementation and the nanotimer implementation doesn't have a check for getting a negative diff either. Anyhow, I added a check for that so we get at least zero.
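
The check presumably amounts to something like:

// never report a negative duration, even if nanoTime ran backwards
long elapsed = Math.max(0L, endNanos - startNanos);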

Contributor

+1
I should revisit the state of the art there, and make sure that hadoop duration timers are good at -ve clock changes. Past commentary on 2015 CPUs.

maybe something like some soft-referenced "source of time" per thread which holds both (systime, nanotime); when querying it, if the current nanotime is "close enough" to the previous nanotime, just apply the nanotime diff to systime and update the systime fields. So it'd be "approximate per cpu core/socket", enough for metrics where things happen in millis.

Contributor Author

@parthchandra parthchandra left a comment

@ConeyLiu, @wgtmac, changed the implementation to use ParquetReadOptions and added a new constructor that takes a footer and read options as parameters (for use by Spark).
I checked Iceberg as well and it looks like it doesn't use any deprecated APIs.
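
For illustration, the Spark-facing entry point described here might be shaped like this; the exact signature is an assumption, so check the merged code for the real form:

// hypothetical usage: an engine that has already read the footer hands it
// over together with the options (which now carry the metrics callback),
// so the footer does not get read twice
ParquetFileReader reader = new ParquetFileReader(inputFile, footer, options);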


@@ -88,7 +89,8 @@ static final class ColumnChunkPageReader implements PageReader {
       byte[] fileAAD,
       int rowGroupOrdinal,
       int columnOrdinal,
-      ParquetReadOptions options) {
+      ParquetReadOptions options,
+      ParquetMetricsCallback callback) {
Contributor

Why not get the callback via options.getMetricsCallback()?

Contributor Author

Of course. My oversight.

* @param file Path to a parquet file
* @param footer a {@link ParquetMetadata} footer already read from the file
* @throws IOException if the file can not be opened
* @deprecated will be removed in 2.0.0.
Contributor

Will this eventually be replaced by another constructor or just not recommended?

Contributor Author

Oops. Copied the old implementation and did not update the javadoc. Fixed.

@parthchandra
Contributor Author

Hi @ConeyLiu, @wgtmac, any additional concerns? If none, should we merge this?

Contributor

@ConeyLiu ConeyLiu left a comment

+1, thanks @parthchandra LGTM

@wgtmac wgtmac merged commit 2e0cd19 into apache:master Jan 12, 2024
9 checks passed
@wgtmac
Member

wgtmac commented Jan 12, 2024

Sorry for the delay. I just merged this. Thanks @parthchandra!

@parthchandra
Contributor Author

Thank you!
