Use ThreadUtils.parmap for optimize #1315

Closed

Conversation

Kimahriman (Contributor)

Description

Resolves #1220

Uses ThreadUtils.parmap to parallelize the compaction instead of parallel collections. This should improve the "tail" of the compaction execution. It seems that the parallel collections method buckets the jobs into maxThreads groups and then executes each group with one of the threads in the pool; parmap is more of a proper queue-based approach, so any remaining tasks can be picked up by any free thread.
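
For illustration, here is a rough Scala sketch of the before/after shape (not the exact Delta diff; jobs, runBin, and maxThreads stand in for the compaction bins, the per-bin compaction job, and the spark.databricks.delta.optimize.maxThreads setting):

import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport
import org.apache.spark.util.ThreadUtils

// Before: parallel collections on a bounded ForkJoinPool. Per the description
// above, the jobs effectively get bucketed into maxThreads groups up front.
val parJobs = jobs.par
parJobs.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(maxThreads))
val updatesBefore = parJobs.map(runBin).seq

// After: ThreadUtils.parmap submits one Future per job to the pool, so any
// free thread can pick up whatever jobs remain.
val updatesAfter = ThreadUtils.parmap(jobs, "OptimizeJob", maxThreads)(runBin)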

How was this patch tested?

Existing UTs.

Does this PR introduce any user-facing changes?

Just a tail performance gain for OPTIMIZE commands.

@Kimahriman (Contributor, Author)

Note: I haven't actually run this at scale to know for sure that it improves the tail of the execution as described; it's more that the change logically makes sense to me.

Also, I might make a follow-up to allow failures by default instead of the fail-fast behavior I was going for before. Having something run for hours and write dozens of TB of data, only for none of it to be committed at the end because of a bad partition, is painful 😅
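
Purely as a hypothetical sketch of what that follow-up could look like (nothing in this PR does this; jobs, runBin, and maxThreads are the same placeholders as above): each bin job could be wrapped in Try so one bad partition doesn't abort the whole run.

import scala.util.{Failure, Success, Try}
import org.apache.spark.util.ThreadUtils

// Hypothetical "allow failures" shape: run every bin, commit the successes,
// and surface the failures instead of failing the whole command.
val results = ThreadUtils.parmap(jobs, "OptimizeJob", maxThreads)(job => Try(runBin(job)))
val updates = results.collect { case Success(actions) => actions }.flatten
val errors  = results.collect { case Failure(e) => e }
// commit `updates` as usual and report `errors` in the command's output/metrics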

@Kimahriman (Contributor, Author)

Threw this together to try to induce some skew.

from datetime import datetime
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = SparkSession.builder.config('spark.databricks.delta.optimize.maxThreads', '2').getOrCreate()

path = 'optimize-test'  # table location; adjust as needed
columns = 100

# Large partitions: 1M rows across parts 0-4, 10 files per partition
df = (spark.range(1_000_000)
    .withColumn('part', F.col('id') % 5)
    .withColumns({f'c{i}': F.rand() for i in range(columns)})
    .repartition('part')
)
for _ in range(10):
    df.write.format('delta').partitionBy('part').mode('append').save(path)

# Medium partitions: 100K rows across parts 5-9, 10 files per partition
df = (spark.range(100_000)
    .withColumn('part', (F.col('id') % 5) + 5)
    .withColumns({f'c{i}': F.rand() for i in range(columns)})
    .repartition('part')
)
for _ in range(10):
    df.write.format('delta').partitionBy('part').mode('append').save(path)

# Small partitions: 10K rows across parts 10-14, 10 files per partition
df = (spark.range(10_000)
    .withColumn('part', (F.col('id') % 5) + 10)
    .withColumns({f'c{i}': F.rand() for i in range(columns)})
    .repartition('part')
)
for _ in range(10):
    df.write.format('delta').partitionBy('part').mode('append').save(path)

table = DeltaTable.forPath(spark, path)
start = datetime.now()
table.optimize().executeCompaction()
duration = datetime.now() - start
print(duration)

Ran it three times on each branch:
Master: 142-160 seconds
This branch: 119-129 seconds

@zsxwing (Member) left a comment

LGTM. Thanks for the benchmark result!

@tdas closed this in de7ba23 on Aug 11, 2022
@allisonport-db added this to the 2.1.0 milestone on Aug 28, 2022
@talecsander

Could you please also add this to version 2.0?

Development

Successfully merging this pull request may close these issues.

OPTIMIZE jobs aren't fully parallelized until the end of the execution