Parquet list/open/read/commit performance can be improved by reducing the amount of storage IO performed and, for the IO which does take place, doing it more efficiently.
PARQUET-2171 is the first "cloud-first" performance enhancement for parquet, but there are many more available.
Use Hadoop 3.3+ filesystem APIs when available.
All recent Hadoop FS APIs have been cloud-friendly; for example, the openFile() call lets the caller pass in the file status/length (saving a HEAD request) and force random IO as the read policy.
Use openFile() where supported, passing in the file status, length and read policy; this saves a HEAD request on S3 and Azure (see the sketch below).
Use ByteBufferPositionedReadable where supported; it lets the connector know the full range to read. This benefits HDFS more than anything else.
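A minimal sketch of what that open path could look like, assuming the standard openFile() option keys of Hadoop 3.3.5+; the class and helper names here are illustrative, not parquet's actual code:

```java
import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;

// Illustrative helper: open a parquet file for random IO on Hadoop 3.3+.
// The option keys are the standard openFile() ones; stores which don't
// understand an .opt() setting simply ignore it.
public final class OpenForRandomIO {
  public static FSDataInputStream open(FileSystem fs, FileStatus status) throws Exception {
    CompletableFuture<FSDataInputStream> future = fs.openFile(status.getPath())
        .withFileStatus(status)                               // no HEAD/getFileStatus needed
        .opt("fs.option.openfile.read.policy", "random")      // columnar, seek-heavy reads
        .opt("fs.option.openfile.length", Long.toString(status.getLen()))
        .build();
    return future.join();                                     // wait for the async open
  }
}
```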
Parquet is hampered by the need to support Hadoop 2.x, but even if it were brought up to the latest release, there will always be changes at the Hadoop IO layer it won't be able to keep up with. Here PARQUET-2171 shows the solution: embrace reflection. But the homework is not entirely on parquet's side.
HADOOP-19131 exports Hadoop 3.3.0+ APIs for opening files faster with specified seek policies, for collecting, reporting and serializing statistics, and more. It builds on HADOOP-18679, whose bulk delete API is intended for easy use by Iceberg.
If parquet switches to these and other APIs, it will save IO overhead when reading data.
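The reflection approach can be sketched roughly like this: probe once for the newer API and fall back cleanly when running against an older Hadoop release. This is only the shape of the pattern, not parquet's actual DynMethods-based binding:

```java
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Probe once for openFile(), use it when present, otherwise fall back to the
// classic open(). A real binding would also set the read policy and file status.
final class ReflectiveOpen {
  private static final Method OPEN_FILE = probe();

  private static Method probe() {
    try {
      return FileSystem.class.getMethod("openFile", Path.class);
    } catch (NoSuchMethodException e) {
      return null;                       // Hadoop 2.x: API absent
    }
  }

  static FSDataInputStream open(FileSystem fs, Path path) throws Exception {
    if (OPEN_FILE == null) {
      return fs.open(path);              // old API: extra HEAD, default read policy
    }
    Object builder = OPEN_FILE.invoke(fs, path);
    Object future = Class.forName("org.apache.hadoop.fs.FutureDataInputStreamBuilder")
        .getMethod("build").invoke(builder);
    return (FSDataInputStream) ((CompletableFuture<?>) future).join();
  }
}
```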
Export a public Vector IO API
As discussed in PARQUET-2171, the vector API should be pulled up and made public, for application code.
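As an illustration of what application code could do with such an API, here is a sketch directly against the underlying Hadoop 3.3.5+ readVectored()/FileRange calls; the offsets and lengths are made up, and the public parquet-side entry point is the thing still to be designed:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;

// Illustration only: ask the stream for several column-chunk ranges in one call
// and let the store coalesce/parallelise the reads.
final class VectorReadSketch {
  static void readColumnChunks(FSDataInputStream in) throws Exception {
    List<FileRange> ranges = Arrays.asList(
        FileRange.createFileRange(4L, 65_536),          // offsets would come from the footer
        FileRange.createFileRange(1_048_576L, 131_072));
    in.readVectored(ranges, ByteBuffer::allocate);      // issues the reads asynchronously
    for (FileRange range : ranges) {
      ByteBuffer data = range.getData().join();         // each range completes on its own
      // ... hand 'data' to the column decoder
    }
  }
}
```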
Optimise Footer fetch for higher latency stores
Fetch a larger block from the end of the file when reading it, then seek within that buffer to read the footer. This will save one GET; the 8-byte tail read is very expensive. One open question: what is a good size to fetch?
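One possible shape for such a speculative tail read, assuming the standard parquet trailer layout (4-byte little-endian footer length followed by the "PAR1" magic); the 1 MiB guess and the helper name are placeholders:

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import org.apache.hadoop.fs.FSDataInputStream;

// Speculative tail read: pull one larger block from the end of the file, then
// locate the footer inside it rather than issuing a separate 8-byte trailer read.
// FOOTER_GUESS is the open question; 1 MiB here is just a placeholder.
final class FooterFetchSketch {
  private static final int FOOTER_GUESS = 1 << 20;
  private static final byte[] MAGIC = {'P', 'A', 'R', '1'};

  static ByteBuffer readFooter(FSDataInputStream in, long fileLength) throws IOException {
    if (fileLength < 8) {
      throw new EOFException("File too short to be a parquet file");
    }
    int tailLen = (int) Math.min(FOOTER_GUESS, fileLength);
    byte[] tail = new byte[tailLen];
    in.readFully(fileLength - tailLen, tail);                // one ranged GET

    // trailer = 4-byte little-endian footer length + "PAR1"
    ByteBuffer trailer = ByteBuffer.wrap(tail, tailLen - 8, 8).order(ByteOrder.LITTLE_ENDIAN);
    int footerLen = trailer.getInt();
    for (byte b : MAGIC) {
      if (trailer.get() != b) {
        throw new IOException("Not a parquet file: bad magic");
      }
    }
    if (footerLen + 8 <= tailLen) {
      return ByteBuffer.wrap(tail, tailLen - 8 - footerLen, footerLen);  // no second GET
    }
    // guess was too small: one more ranged read for the full footer
    byte[] footer = new byte[footerLen];
    in.readFully(fileLength - 8 - footerLen, footer);
    return ByteBuffer.wrap(footer);
  }
}
```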
Simplify binding to PathOutputCommitters.
Spark has to jump through hoops to get parquet to accept a filesystem-specific PathOutputCommitter, as parquet requires all committers to be of type ParquetOutputCommitter. That is only needed when saving schemas to a separate summary file, which isn't normally done in cloud storage.
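For context, this is roughly the configuration Spark needs today to bind parquet to the S3A committers through its spark-hadoop-cloud module; the exact property values depend on the Spark release in use and should be treated as an assumption here:

```properties
# assumed values from the Spark cloud-integration docs; verify against the release in use
spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a=org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
spark.hadoop.fs.s3a.committer.name=magic
spark.sql.sources.commitProtocolClass=org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class=org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
```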
Hadoop Vector API to collect/report metrics
Vector IO metric collection (see PARQUET-2374)
The abfs/s3a/gcs connectors all collect Hadoop IOStatistics, which (ignoring gauges) consist of counters and min/max/mean (key -> value) maps. Duration tracking updates all of these and splits failure from success timings, so failure-triggered timeouts are kept separate from the success path.
The key names are strings and not a fixed enum; extra ones are added as we do new things.
The stats are collected from each IOStatisticsSource, which includes filesystems, input and output streams, etc. There is also a per-thread IOStatisticsContext which is updated by some of the IO streams in close(). That exists to support per-worker-thread IOStats collection without having to propagate a context around: a worker thread can reset the stats when it starts its work and upload them at task commit (the s3a/manifest committers also save them in their manifests and aggregate them into _SUCCESS files).
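A sketch of that collection model using the Hadoop 3.3.5+ statistics classes; the flow shown is illustrative, not existing parquet code:

```java
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;

// Illustrative flow: per-stream stats via IOStatisticsSource, plus the
// per-thread IOStatisticsContext which streams update in close().
final class IOStatsSketch {
  static void workUnit(FSDataInputStream in) throws Exception {
    IOStatisticsContext ctx = IOStatisticsContext.getCurrentIOStatisticsContext();
    ctx.reset();                                   // start of the worker thread's task

    // ... perform the reads ...

    // string-keyed counters/min/max/mean straight from the stream
    IOStatistics streamStats = IOStatisticsSupport.retrieveIOStatistics(in);
    System.out.println(IOStatisticsLogging.ioStatisticsToPrettyString(streamStats));

    in.close();                                    // s3a/abfs merge stream stats into ctx here

    // aggregate view of everything this thread did; committers can save/serialize this
    System.out.println(IOStatisticsLogging.ioStatisticsToPrettyString(ctx.getIOStatistics()));
  }
}
```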
Ideally, aligning these with parquet metrics would just mean having parquet support the classes itself: aggregating them, serializing them, etc.
Failing that, having something equivalent would be wonderful. In particular, counters and mean durations are important.
These must be indexed by string, not enum, so that the layers underneath can collect and report more statistics, which can then be aggregated.
Add a minimal benchmark to test file open and footer load performance against cloud storage
This is to assess the benefits of this work and v3 footer development.
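A hypothetical shape for such a benchmark, timing the open plus the classic 8-byte trailer read against a real object store path; not JMH-grade, just enough to compare before/after numbers:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Minimal micro-benchmark sketch: time open + tail read of an object in cloud storage.
public final class FooterOpenBench {
  public static void main(String[] args) throws Exception {
    Path path = new Path(args[0]);                     // e.g. s3a://bucket/table/part-0000.parquet
    int iterations = args.length > 1 ? Integer.parseInt(args[1]) : 10;

    Configuration conf = new Configuration();
    FileSystem fs = path.getFileSystem(conf);
    FileStatus status = fs.getFileStatus(path);

    for (int i = 0; i < iterations; i++) {
      long start = System.nanoTime();
      try (FSDataInputStream in = fs.open(path)) {
        byte[] tail = new byte[8];
        in.readFully(status.getLen() - 8, tail);       // the classic 8-byte trailer read
      }
      long elapsedMs = (System.nanoTime() - start) / 1_000_000;
      System.out.println("iteration " + i + ": " + elapsedMs + " ms");
    }
  }
}
```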
Reporter: Steve Loughran / @steveloughran
Assignee: Steve Loughran / @steveloughran
Related issues:
Note: This issue was originally created as PARQUET-2486. Please see the migration documentation for further details.