
Distributing Job Planning on Spark #1422

Closed
RussellSpitzer opened this issue Sep 3, 2020 · 15 comments

@RussellSpitzer
Member

Background:
One issue we’ve seen come up frequently with our larger table scans in Spark is the amount of time required for job planning. Spark reads have a long startup phase waiting for all read tasks to be planned before any progress can be made. This phase represents a large amount of time where resources are held but no progress is made on the job. In our workloads we see that the majority of that time is spent reading and processing ManifestFiles, so we are looking into ways of parallelizing this step and hopefully reducing the amount of time spent in planning.

One recent example of how this can yield dramatic speedups is the work on ExpireSnapshotsAction, which, prior to a fix made by rdblue, spent the majority of its time reading manifest files locally; the job was stalled during the planning phase while manifests were being read. There, the issue was fixed for that specific case by changing how the metadata DataFrame is constructed. We would like to apply this approach to Spark-based reads in general.

Potential Solutions:
Ideally we would like to be able to heuristically decide when a distributed planning solution would be useful. When those checks pass we would start a Spark job whose only mission is to produce the tasks for the actual scan.

We have prototyped two approaches that we think might be steps in the right direction, but we are hoping for some feedback from the community, especially if there is another route we haven’t considered yet.

Both of these implementations work at a basic level, although the second approach does not yet have deletes implemented.

  1. Addition to the TableScan API (WIP - Add an api for Parallelizing Manifest Reading in ManifestGroup #1420)
    This modification targets ManifestGroup with the goal of parallelizing just the manifest-reading phase on command. To do this, we allow TableScan users to specify a function (ManifestProcessor) for turning a list of manifests into a CloseableIterable of manifest entries; a first sketch of what that hook might look like follows this list. The actual reading function is still expected to be provided by the TableScan, but how that function is executed is determined by the user-provided ManifestProcessor.
    This requires adding a bit of code to the API module, so it presents some risk there, as well as requiring some things to be serializable that previously were not. On the plus side, it allows the distributed planning and normal planning pathways to share code.

  2. Refactor of PlanFiles for Spark (Add a Parallelized Spark Job Planning Path #1421)
    This modification aims to replicate the planning portion of DataTableScan as separate code (SparkPlanUtil). Here we use the DataFiles metadata table to get our initial listing of DataFiles; a second sketch of the general idea follows this list. Because the metadata table doesn’t support any kind of pushdown yet, we have to read all manifest files in the current implementation. We then apply our filters to the resulting DataFile rows and end up producing ScanTasks in Spark. These results are returned to the driver and used in the scan.
    This approach also required making a few more things serializable, although we have plans for reducing that. The biggest pain point was getting the PartitionSpecId info from the metadata table: currently that information is missing, since each row only contains the information located in the Avro files. A ghost column, populated with the specId, was added to the table to propagate the information, but this approach could definitely be improved.
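
To make the Option 1 hook a bit more concrete, here is a minimal sketch of what such a pluggable processor could look like. The interface names and signatures below are hypothetical, based only on the description above, and are not taken from the actual code in the PR.

```java
// Hypothetical sketch only: ManifestProcessor and ReadFunction follow the
// description in this issue, not the actual #1420 implementation.
import java.io.Serializable;
import java.util.List;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.io.CloseableIterable;

/**
 * The per-manifest read function that the TableScan would still provide; it must
 * be serializable so it can be shipped to executors when planning is distributed.
 */
@FunctionalInterface
interface ReadFunction<T> extends Serializable {
  CloseableIterable<T> read(ManifestFile manifest);
}

/**
 * The pluggable execution strategy: given the manifests to read and the scan's
 * read function, produce all matching manifest entries. A local implementation
 * would iterate on the driver; a Spark implementation would run the read
 * function on executors and collect the results.
 */
@FunctionalInterface
interface ManifestProcessor<T> extends Serializable {
  CloseableIterable<T> process(List<ManifestFile> manifests, ReadFunction<T> readFunc);
}
```

A TableScan could then accept such a processor from the caller and fall back to the existing local reading when none is supplied.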
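
And here is a rough illustration of the Option 2 idea: read the files metadata table with Spark so that manifest reading happens on executors, then bring only the matching entries back to the driver. The table name and filter below are placeholders; the actual prototype evaluates the scan's filter expressions against the DataFile rows rather than a SQL predicate.

```java
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DistributedPlanningSketch {
  public static List<Row> matchingDataFiles(SparkSession spark) {
    // Scanning the files metadata table is split into tasks, so manifests are
    // read on executors instead of serially on the driver.
    Dataset<Row> files = spark.read().format("iceberg").load("db.sample.files");

    // Placeholder predicate: the prototype instead applies the scan's filter
    // expressions (and eventually deletes) to each DataFile row.
    return files
        .filter("record_count > 0")
        .select("file_path", "file_format", "record_count")
        .collectAsList();
  }
}
```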

If you are interested in this or have feedback, I would love to hear it. Neither of the above PRs is production ready at the moment, but I wanted to get some feedback before we finish one (or neither) of them.

@yiguolei

yiguolei commented Sep 4, 2020

If it costs a lot of time to read manifest files, why not merge the data files or merge the manifests under a manifest list file?

@aokolnychyi
Contributor

To give a bit more background, job planning is fast even on huge tables if we have a partition predicate and end up processing 10-15 partitions in a single job. We have RewriteManifestsAction to rewrite metadata and align it with partitions on demand. That covers all common use cases.
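
(A hedged aside for readers following along: the action mentioned here can be invoked roughly as below using the SparkActions entry point from more recent Iceberg releases; the action API at the time of this thread was organized differently, so adjust for the version in use.)

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.actions.SparkActions;

public class RewriteManifestsExample {
  public static void alignManifestsWithPartitions(Table table) {
    // Rewrites manifests so their entries are clustered by partition, which keeps
    // manifest-level partition ranges tight and planning with partition predicates fast.
    SparkActions.get()
        .rewriteManifests(table)
        .execute();
  }
}
```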

At the same time, Iceberg supports file filtering within partitions and opens the opportunity for efficient full table scans. So there are two use cases we want to address:

  • job planning for full table scans (less important)
  • job planning for queries with predicates only on sort key in partitioned tables (becoming common with Iceberg tables)

The second use case is the primary one. It is a common pattern to partition tables by date, sort by a key, and have queries without partition predicates but with predicates on the sort key. Such use cases were not practical without Iceberg, since we could not filter files within partitions and ended up with full table scans. Right now, we can narrow down the number of matching files to 1-2 per partition, so we can scan PB-scale tables for a given key much faster. If we do so right now, we end up spending most of the time in job planning. That's why it would be better to parallelize that.
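
A concrete (hypothetical) illustration of that second use case: a table partitioned by day, with data files written sorted on account_id, queried by account_id alone.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SortKeyLookupExample {
  // db.events is a hypothetical table partitioned by days(ts) with data files
  // written sorted by account_id. There is no partition predicate, so planning
  // must read every manifest, but file-level lower/upper bounds on account_id
  // then prune the scan to a handful of files per partition.
  public static Dataset<Row> lookup(SparkSession spark, long accountId) {
    return spark.sql("SELECT * FROM db.events WHERE account_id = " + accountId);
  }
}
```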

@aokolnychyi
Contributor

In the future, I see the possibility to load and leverage secondary indexes using this approach too.

@RussellSpitzer
Member Author

@yiguolei

If it costs a lot of time to read manifest files, why not merge the data files or merge the manifests under a manifest list file?

The issue is that this approach also does not scale with very large amounts of metadata. We would really like a solution that allows us to do TableScan-style queries even when the underlying table has hundreds of megabytes, if not gigabytes, of metadata.

@aokolnychyi
Contributor

aokolnychyi commented Sep 4, 2020

I haven't formed an opinion on which approach is more promising right now. Here are my thoughts after a quick look:

Option 1 (extension to the TableScan API)

Pros:

  • Reuses the existing filtering logic.
  • API could be reused by other query engines.
  • Seems to require less code?

Cons:

  • Is a substantial change to the core planning logic that requires thorough testing (both performance and correctness).
  • Requires thinking about serialization, especially Kryo serialization, during planning (not needed before).

Option 2 (metadata tables)

Pros:

  • Reuses the existing logic to read manifests in a scalable way via metadata tables.
  • Reuses the existing logic for wrapping Spark Rows into Iceberg DataFiles.
  • Doesn't touch the core planning API and is more isolated.
  • Could maybe be exposed as an action (not sure whether that makes sense).

Cons:

  • Requires instantiating evaluators ourselves.
  • Seems to require a bit more code but I feel like it can be simplified.
  • Specific to Spark but could be implemented in other systems that support metadata tables.

Implementation aside, we need to consider when to apply this. Ideally, we would have some sort of threshold for the size of the manifests we need to scan after manifest filtering: if we narrow the scope down to a couple of manifests, plan locally; otherwise, plan using Spark. I am not sure it will be that easy, though. Instead, we could support a Spark read option that tells us which mode to use, with values local, distributed, and auto. In auto mode, the simplest option is to analyze the scan condition: if there is a reasonable partition predicate (e.g. equals or inSet), we could always plan locally; if not and distributed is enabled, leverage Spark.
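
A sketch of how such a read option might look from the user's side. The option name planning-mode and its values are hypothetical here, taken from the proposal in this comment rather than from any released Iceberg version.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PlanningModeOptionSketch {
  public static Dataset<Row> read(SparkSession spark) {
    return spark.read()
        .format("iceberg")
        .option("planning-mode", "auto")  // hypothetical values: local | distributed | auto
        .load("db.events");
  }
}
```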

@aokolnychyi
Contributor

I tend to think that Option 2 would require less substantial changes. We want predicate pushdown in metadata tables, a spec id fix in SparkDataFile, and spec id propagation in the files metadata table anyway. That being said, I would be interested to know whether any other query engines plan to leverage this too. I know planning in Presto is very different, though.

@shardulm94
Contributor

@aokolnychyi We have this use case too

job planning for queries with predicates only on sort key in partitioned tables

where distributed job planning can be helpful. I think one reason partition predicates are faster is that we have partition upper and lower bounds aggregated at the manifest file level. If we aggregate these metrics for other fields as well, do you think that helps your case here?

@aokolnychyi
Contributor

I think one reason partition predicates are faster is that we have partition upper and lower bounds aggregated at the manifest file level. If we aggregate these metrics for other fields as well, do you think that helps your case here?

I am afraid that won't help as a single manifest covers a subset of partitions and we tend to have values for the complete range of sort keys in every partition. We really need to read all manifests to get to the file stats.

@RussellSpitzer
Member Author

So it's been a while, and we've gotten a bit of feedback on the PRs. Thanks @kbendick, @aokolnychyi, and @rdblue!

Since the metadata-tables-based approach seems like the safer path at the moment, I'm going to start cleaning up and moving forward with that PR.

For me, the big benefits of the other approach were mainly code reuse and the possibility for other systems to use the same method for parallelization. Since we haven't really heard from anyone using Presto, Flink, or the like, I'm going to assume that benefit is probably not that great. We can always share a common set of ideas across these platforms while each has its own planning algorithm if need be.

@rdblue
Contributor

rdblue commented Sep 29, 2020

I think that I agree with the metadata table approach. Because Presto can run tasks and planning at the same time, this is less of an issue. And the work done for Spark in option 2 could translate into a parallel scan on a Presto metadata table as well (converting partition predicates to filters on metadata table columns). Flink is much more likely to consume tables incrementally, so I think it wouldn't be a big issue there for now (but would be nice to hear from them).

Risk is lower with option 2, and I think it sounds like the better option. It also pushes on the metadata tables in healthy ways: it would incentivize building pushdown in the files and entries metadata tables and might require adding a delete_files metadata table. Those are good side effects of implementing it this way.
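
To illustrate the kind of translation being described, a partition predicate on the data table would become a filter on the partition struct column of the files metadata table, roughly as below. Table and partition field names are hypothetical, and whether the filter is actually pushed below the metadata scan depends on pushdown support that, per the discussion above, did not exist yet at the time.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MetadataTableFilterSketch {
  public static Dataset<Row> filesForDay(SparkSession spark, String day) {
    // A day-level partition predicate on db.events maps to a filter on the
    // corresponding field of the partition struct in the files metadata table.
    return spark.read()
        .format("iceberg")
        .load("db.events.files")
        .filter("partition.ts_day = DATE '" + day + "'");
  }
}
```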

@aokolnychyi
Contributor

aokolnychyi commented Sep 29, 2020

Sounds like we have an initial plan then. Excited to see progress on this.

@MayankSharma-MS

Hello, I see this is a work in progress. Any estimate on when this will be merged to master?

@RussellSpitzer
Member Author

It is very out of date; we never went forward with finishing it, so my current estimate would be the far future :)


This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions bot added the stale label Feb 22, 2024

github-actions bot commented Mar 8, 2024

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'.

github-actions bot closed this as not planned Mar 8, 2024