
Spark 3.4: Support distributed planning #8123

Merged

Conversation

Contributor

@aokolnychyi aokolnychyi commented Jul 21, 2023

This PR adds support for distributed planning to cover the following use cases on top of tables with tens of millions of files:

  • Selective filters against min/max stats but without partition predicates (e.g. grab a few files in each partition).
  • Selective filters on top of metadata that hasn't been properly clustered.
  • Full table scans (lesser priority and incremental improvement).

Iceberg planning uses manifest partition info to prune entire manifests, which allows the format to plan queries with partition filters blazingly fast even when the table contains 50+ million files (as long as the metadata is properly clustered, which can be achieved by calling the rewrite manifests action). At the same time, use cases mentioned above may benefit from distributed planning as the cluster parallelism can be much higher than the number of cores on the driver.

This logic has been tested on a table with 20 million files (400+ manifests). It enabled planning a scan with a min/max filter (no partition predicate) within 3 seconds, compared to 30 seconds (24 driver cores) or 1.5 minutes (4 driver cores) before this change. Full table scans also see an improvement (35-50%), but the cost of bringing the result to the driver becomes a bottleneck. Queries with partition predicates can be planned in around 1 second even with 20+ million files (thanks to manifest filtering). Such a number of files can easily cover 10 to 40 PB of data in a single table.

public static final String PLANNING_MODE_DEFAULT = PlanningMode.AUTO.modeName();

public static final String DELETE_PLANNING_MODE = "read.delete.planning-mode";
public static final String DELETE_PLANNING_MODE_DEFAULT = PlanningMode.LOCAL.modeName();
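For illustration, here is a minimal sketch of setting these planning-mode properties through the table API; the read.data.planning-mode key and the table identifier are assumptions (only read.delete.planning-mode appears in the excerpt above).

// Hedged sketch, not part of this PR's diff; identifiers are illustrative.
Table table = catalog.loadTable(TableIdentifier.of("db", "events"));
table
    .updateProperties()
    .set("read.data.planning-mode", "auto") // assumed data counterpart of the constants above
    .set("read.delete.planning-mode", "local") // DELETE_PLANNING_MODE from the excerpt
    .commit();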
Contributor Author

I am debating whether these configs make sense at the table level. I'm using LOCAL for deletes by default because delete files are less likely to be filtered using min/max stats and we will need to load stats, so the cost of bringing them to the driver will be higher.

Contributor

I think that it does make sense to have these as table config because this is dependent on the table structure. If a table has a ton of deletes, you'd probably want to set this to AUTO to override. I'm not sure whether we'd want this to also be a default in Spark though. Seems like we may want to override, but not actually get the default value from here.

}

@Override
protected List<DataFile> planDataRemotely(List<ManifestFile> manifests) {
Contributor Author

My initial thought was to query the data_files metadata table. However, that approach faced a number of issues:

  • Need to translate a predicate on the main table into a predicate on the metadata table.
  • Need to implement Iceberg to Spark filter conversion.
  • Hard to populate scan metrics.

It was also expensive: each DataFile is wrapped as a GenericInternalRow, converted to UnsafeRow, collected to the driver as bytes, deserialized on the driver, converted to a public Row, and then wrapped into a DataFile again. Not to mention the required complexity.

Right now, I am using an RDD, so there is only one round of serialization (either Java or Kryo) and no conversion. The current approach performs exceptionally well for selective queries. The cost of serializing the entire content of a 10 MB manifest is around 0.2-0.3 s and is not an issue. Full table scan performance depends on how quickly the driver can fetch the result from other nodes. The most critical part is the size of the serialized data.

Option 1: Java serialization of DataFile and DeleteFile.
Option 2: Kryo serialization of DataFile and DeleteFile.
Option 3: Converting DataFile and DeleteFile to UnsafeRow and using Java/Kryo serialization on top.

Option 2 (serializing files with Kryo) produced around 15% smaller chunks than Option 1 (serializing files with Java) and did not require any extra logic to convert to and from UnsafeRow. Option 3 does not require Kryo to be efficient but requires the conversion logic and gave only a 3-5% size reduction compared to Option 2. I was surprised how well Kryo worked on top of files, but I am still debating whether conversion to UnsafeRow makes sense in order to avoid depending on Kryo. That said, using UnsafeRow would yield only marginally smaller chunks. The cost of the serialization is not an issue, as I said earlier.

The most efficient approach was to implement a custom Kryo serializer for DataFile[] and apply dictionary encoding for partitions. That gave a 15% size reduction when a large number of files belonged to the same partition, but it is extremely complicated and requires the user to register a custom serializer.

All in all, if someone expects to query 20 PB in one job, I doubt 30 seconds to plan the job will be their biggest issue. That's why I opted for the simplest solution, which works really well for the first two use cases mentioned in the PR description (it is still faster for the third use case, but not drastically). If we want to support such use cases, we need to change how Spark plans jobs rather than collect data files back to the driver.
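To make the RDD-based approach above concrete, here is a rough sketch under stated assumptions; this is not the PR's exact planDataRemotely implementation, and it assumes a JavaSparkContext, a serializable copy of the table, and that closing the manifest readers is handled elsewhere.

// Illustrative sketch of distributed data planning over an RDD of manifests.
Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
List<DataFile> dataFiles =
    sparkContext
        .parallelize(manifests, manifests.size())
        .flatMap(manifest -> ManifestFiles.read(manifest, tableBroadcast.value().io()).iterator())
        .map(DataFile::copy) // copy so files can outlive the manifest reader
        .collect();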

Contributor

Another choice is that we could customize the DataFile/DeleteFile serialization/deserialization with readObject() and writeObject() if we don't want to depend on Kryo. The customized implementation should provide better performance as well.
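As a hedged illustration of that suggestion, custom Java serialization hooks would look like the following; the class name, fields, and encoding are hypothetical and not Iceberg's actual implementation.

// Hypothetical wrapper showing readObject()/writeObject() hooks (java.io imports omitted).
public class CompactFileHolder implements Serializable {
  private transient String path;
  private transient long recordCount;

  private void writeObject(ObjectOutputStream out) throws IOException {
    out.defaultWriteObject();
    out.writeUTF(path); // write a compact, hand-rolled representation
    out.writeLong(recordCount);
  }

  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    this.path = in.readUTF();
    this.recordCount = in.readLong();
  }
}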

Contributor Author

Correct, both Java and Kryo serialization can be improved further, but the gain won't be drastic.

Contributor

I'd rather not depend on Kryo, but good to know that it works with both Kryo and Java serialization.

Contributor Author

Then we have two options:

  • Look into optimizing Java serialization for DataFile/DeleteFile to bridge that 15% gap in size.
  • Develop a conversion utility to and from UnsafeRow, which could potentially be used in metadata scans.

For instance, UnsafeRow implements both Externalizable and KryoSerializable to customize both.

Any other options you would explore, @ConeyLiu @rdblue?

Contributor Author

Another idea is to have an extra data structure per group of data files so that we serialize each group's partition info only once instead of serializing the same partition tuple for many data files. That said, it is only applicable to full table scans and I am not sure it is worth the complexity.

Contributor

For instance, UnsafeRow implements both Externalizable and KryoSerializable to customize both.

Do we still need the customized serializer if we use UnsafeRow? From my understanding, converting the object to/from UnsafeRow should already cover serialization/deserialization. Or maybe I misunderstood something.

I am inclined to try out optimizing Java serialization for GenericDataFile first but I can be convinced otherwise.

This is indeed a good direction to try. Maybe it would benefit other engines as well?

Another idea is to have an extra data structure per group of data files so that we serialize each group's partition info only once instead of serializing the same partition tuple for many data files. That said, it is only applicable to full table scans and I am not sure it is worth the complexity.

Is the reason for this because the biggest overhead of serialization and deserialization comes from partition data?

Contributor Author

Do we still need the customized serializer if we use UnsafeRow?

No, I meant UnsafeRow should be efficiently handled by both Java and Kryo serialization as it provides custom logic for both.

Is the reason for this because the biggest overhead of serialization and deserialization comes from partition data?

It is a substantial part but does not dominate. Based on my tests, it was around 15%. It is the only repetitive part, though.

Contributor

Thanks for the explanation.

Contributor

Another option: the DataFile and DeleteFile instances that we use are actually Avro records with an Avro schema. I think you could directly use Avro serialization, which would save quite a bit of overhead and wouldn't require custom Java serialization. Check out IcebergEncoder and IcebergDecoder for efficient code to produce byte[].

@aokolnychyi aokolnychyi force-pushed the distributed-planning-squashed branch from 573adb7 to d55167d on July 21, 2023 01:33
@aokolnychyi
Contributor Author

@aokolnychyi aokolnychyi force-pushed the distributed-planning-squashed branch from d55167d to 3f63b78 on July 21, 2023 01:48
String modeName =
confParser
.stringConf()
.sessionConf(SparkSQLProperties.PLANNING_MODE)
Contributor

Shall we add an options parameter as well?

Contributor Author

Yeah, why not?

Contributor

Is there a use case for it, or is this speculative?

Contributor Author

Mostly speculative, I guess: a query might touch multiple tables, but we want to override the mode in only one of them?

Contributor

Yeah, that provides the ability for fine-grained control when there are multiple tables.
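For reference, a hedged sketch of what the builder might look like with an options parameter; the SparkReadOptions and TableProperties constant names are assumptions that mirror the excerpt above.

String modeName =
    confParser
        .stringConf()
        .option(SparkReadOptions.DATA_PLANNING_MODE) // assumed read-option key
        .sessionConf(SparkSQLProperties.PLANNING_MODE)
        .tableProperty(TableProperties.DATA_PLANNING_MODE) // assumed table property constant
        .defaultValue(TableProperties.PLANNING_MODE_DEFAULT)
        .parse();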

return CloseableIterable.transform(
dataFiles,
dataFile -> {
DeleteFile[] deleteFiles = deletes.forDataFile(dataFile);
Contributor

If this step could be done on executors, that would be amazing. We have noticed some v2 tables blocking here.

Contributor Author

Could you please provide details about this use case? Size and number of data/delete files, type of delete files, compaction, etc. Where does it block?

Contributor

Hi @aokolnychyi, here is an example. The data is ingested into Iceberg with a Flink CDC job, and the metrics below come from a full table rewrite job for that table.

ScanMetricsResult{
  totalPlanningDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT45M35.528362027S, count=1},
  resultDataFiles=CounterResult{unit=COUNT, value=4241121},
  resultDeleteFiles=CounterResult{unit=COUNT, value=21491991},
  totalDataManifests=CounterResult{unit=COUNT, value=138},
  totalDeleteManifests=CounterResult{unit=COUNT, value=57},
  scannedDataManifests=CounterResult{unit=COUNT, value=138},
  skippedDataManifests=CounterResult{unit=COUNT, value=0},
  totalFileSizeInBytes=CounterResult{unit=BYTES, value=70668583180805},
  totalDeleteFileSizeInBytes=CounterResult{unit=BYTES, value=361014841132536},
  skippedDataFiles=CounterResult{unit=COUNT, value=29},
  skippedDeleteFiles=CounterResult{unit=COUNT, value=0},
  scannedDeleteManifests=CounterResult{unit=COUNT, value=57},
  skippedDeleteManifests=CounterResult{unit=COUNT, value=0},
  indexedDeleteFiles=CounterResult{unit=COUNT, value=4181955},
  equalityDeleteFiles=CounterResult{unit=COUNT, value=4181955},
  positionalDeleteFiles=CounterResult{unit=COUNT, value=0},
  skippedMetaBlobs=null}

Contributor Author

That is a lot of equality deletes (4 million data files vs 21 million delete files, 350 TB of deletes to apply). Since we can't compact equality deletes across sequence numbers, would this use case benefit from a regular job that would convert equality deletes into position deletes? Is there a time interval where that would be possible on a regular basis to avoid rewriting the entire set of data files? @szehon-ho and I had a few discussions on how to build that. How quickly do you accumulate 21 million delete files?

Have you profiled it? Is most of the time spent on looking up the index? I wonder whether using an external system to hold the delete index would also make sense (which could be populated in a distributed manner).

Contributor Author

Does anyone have any other thoughts on this or faced similar issues?

cc @RussellSpitzer @stevenzwu @jackye1995 @amogh-jahagirdar @rdblue @szehon-ho

Contributor Author

Maybe we can do something for partition-scoped deletes, but it may not be that easy if we have global deletes or spec evolution. One alternative that I quickly discarded was to wait until deletes are planned and then broadcast the delete index. I feel that would be a bad decision: the size of that index may not be trivial, there would be more back-and-forth communication, and there would be no way to plan data and deletes concurrently.

I will explore this a bit more. I am not yet convinced using a thread pool on the driver to look up deletes would not be enough.

Contributor Author

@ConeyLiu, there is also a performance issue in DeleteFileIndex that I discovered while benchmarking. It is related to min/max value conversion. It can be really slow... I've created #8157 to fix it.

Contributor

I think this is something we should skip for now. It's extremely helpful to take this step forward without worrying about problems introduced by tables that use a write strategy like Flink's, where basically all work is deferred until later and then never done.

Contributor Author

I will measure the time it takes to perform this step on a table in a reasonable state. If it is not that bad, I'll probably skip it for now. If it is a substantial bottleneck, we can consider a distributed assignment of deletes.

Contributor Author

After all optimizations for local delete assignment that went in, it is fairly cheap to assign position deletes. It is more expensive for equality deletes. I'd probably go with local assignment for now until we figure out the final vision for assignment of deletes.

@zinking
Contributor

zinking commented Jul 21, 2023

When it comes to distributed planning, it always reminds me of the "when metadata is big data" idea; maybe that's taking it too far.

@ConeyLiu
Contributor

ConeyLiu commented Jul 21, 2023

Thanks @aokolnychyi for the awesome work. I think many people have been expecting this.

@aokolnychyi
Contributor Author

@zinking, I know that paper and there are a few ideas in it that may be applicable to us. At the same time, Iceberg metadata already forms a system table which we query in a distributed manner (data_files, etc.), so it is similar to having a separate table for metadata. If I remember correctly, one benefit of BigQuery is that it does not have to bring the results back while doing this distributed planning, but it is up to Spark to provide that functionality. In my view, it is unlikely that one would query 20 PB of data in a single job, and when that happens, spending 30 seconds planning the job is unlikely to be a problem.

At this point, we are not storing large blobs in the manifests, so we will come back to that paper when discussing how to support secondary indexes and how to integrate them into planning. That would be the point at which the metadata becomes too big. Right now, we are talking about 2-4 GB of metadata to cover 20-40 PB of data.
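For context, this is the kind of distributed metadata query referred to above, reading the data_files metadata table through Spark; the catalog and table names are illustrative.

Dataset<Row> dataFiles = spark.table("my_catalog.db.events.data_files");
dataFiles
    .select("file_path", "partition", "record_count", "file_size_in_bytes")
    .show();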

@zinking
Contributor

zinking commented Jul 22, 2023

one benefit of BigQuery is that it does not have to bring back the results while doing this distributed planning but that's on Spark side to provide that functionality

that is exactly what I am talking about.

I am OK with the status quo.

Schema snapshotSchema = SnapshotUtil.schemaFor(table(), scanSnapshotId);
return newRefinedScan(table(), snapshotSchema, context().useSnapshotId(scanSnapshotId));
protected boolean useSnapshotSchema() {
return true;
Member

Big difference from the previous implementation here? I assume I'll see later that you moved the validation out?

Contributor Author

The validation is in the parent so that I can reuse it in both scans now.

+ manifest.deletedFilesCount();
}

private ManifestEvaluator newManifestEvaluator(PartitionSpec spec) {
Contributor

Looks like these are primarily based on ManifestEvaluator. I wonder if a shared util method would be helpful?

Contributor Author

Could you elaborate a bit?

Contributor

@rdblue rdblue Sep 4, 2023

Just that this had quite a bit of copied code. I was wondering if we should share it between here and ManifestGroup. But my comment was confusing since I said ManifestEvaluator instead of ManifestGroup.

@aokolnychyi aokolnychyi added this to the Iceberg 1.4.0 milestone Aug 9, 2023
@github-actions github-actions bot removed the INFRA label Aug 23, 2023
@aokolnychyi
Contributor Author

I have spent all this time profiling our local planning and DeleteFileIndex. There are a few open changes but I've already rebased this PR if anyone wants to take another look. I will continue to polish it and will work on tests next.

@github-actions github-actions bot added the INFRA label Aug 29, 2023
@aokolnychyi aokolnychyi force-pushed the distributed-planning-squashed branch 3 times, most recently from 1f92020 to 86f62f2 on August 29, 2023 19:43
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;

/**
Contributor Author

This would eventually be part of the public docs.

Contributor

@ConeyLiu ConeyLiu left a comment

+1

*
* @return an iterable of file groups
*/
public Iterable<CloseableIterable<DataFile>> fileGroups() {
Contributor

Why is this called fileGroups? It looks like it produces the file from every entry, not a group.

Contributor Author

I called it fileGroups because, instead of combining entries from all manifests into a single iterable, it returns an iterable of iterables where each element represents the content of one manifest. Let me know if that makes sense.
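A small usage sketch of that shape follows; everything outside the method above is illustrative.

// Each inner iterable corresponds to one manifest; closing it releases that manifest's reader.
for (CloseableIterable<DataFile> group : fileGroups()) {
  try (CloseableIterable<DataFile> manifestContent = group) {
    for (DataFile file : manifestContent) {
      // process the data files of a single manifest
    }
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
}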

*
* <p>Note that this class is evolving and is subject to change even in minor releases.
*/
abstract class DistributedDataBatchScan
Contributor

Should this be called BaseDistributedDataScan? Here's my rationale:

  • Add Base because this must be extended
  • Remove Batch because this is new and we aren't adding any more non-batch scan classes

Contributor Author

Agreed, changed.

private boolean shouldCopyDataFiles(boolean planDataLocally, boolean loadColumnStats) {
return planDataLocally
|| shouldCopyRemotelyPlannedDataFiles()
|| (loadColumnStats && !shouldReturnColumnStats());
Contributor

I'm not sure I understand why the column stats matter in this last case. Can you explain the rationale?

Contributor Author

The assumption is that keeping stats in memory is expensive. When we load stats for equality deletes but don't need to return them, we can drop them upon assigning deletes so that they can be garbage collected. That way, we are less likely to hit OOM.

It is similar to what we do in local planning. Does it make sense?
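A simplified sketch of that idea (not the PR's exact code; it reuses the names from the earlier excerpt where possible):

// If stats were loaded only to assign deletes, return a copy without stats so they can be GC'd.
DeleteFile[] deleteFiles = deletes.forDataFile(dataFile);
DataFile result = shouldReturnColumnStats() ? dataFile.copy() : dataFile.copyWithoutStats();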

TestSparkDistributedDataBatchScan.spark =
SparkSession.builder()
.master("local[2]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
Contributor

No need to test with Java serialization?

Contributor Author

Added separate suites for both. It was harder to parameterize because the Spark session is static.
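For completeness, the Java-serialization counterpart would configure the session along these lines; the suite name is assumed.

TestSparkDistributedDataBatchScanJavaSerialization.spark =
    SparkSession.builder()
        .master("local[2]")
        .config("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
        .getOrCreate();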

Contributor

@rdblue rdblue left a comment

Looks good to me. Thanks for getting this ready!

@aokolnychyi aokolnychyi merged commit c4e35a5 into apache:master Sep 12, 2023
41 checks passed
@aokolnychyi
Contributor Author

Thanks a lot everyone for reviewing. I merged this change as it got pretty large with all the tests. I'll follow up with a few minor things like benchmarks.
