
[SUPPORT] #1852

Closed
ssomuah opened this issue Jul 20, 2020 · 17 comments
@ssomuah

ssomuah commented Jul 20, 2020

Describe the problem you faced

Write performance degrades over time

To Reproduce

Steps to reproduce the behavior:

1. Create an unpartitioned MOR table
2. Use it for a few days

Expected behavior

Write performance should not degrade over time
Environment Description

Additional context

The MOR table has a single partition.
It's a Spark streaming application with 5-minute batches.
Initially it runs and completes batches within the batch duration, but over time the time to complete each batch increases.
From the Spark UI we can see that most of the time is spent actually writing the files.

[screenshot: Spark UI, 2020-07-17 2:03 PM]

And looking at the thread dump of the executors they are almost always spending their time listing files.

I think the reason for this is that we have an extremely high number of files in the single partition folder.

An ls on the folder is showing about 45,000 files.

The other odd thing is that when we look at the write tasks in the Spark UI, there are several tasks that seem to have tiny numbers of records in them.

[screenshot: Spark UI write tasks, 2020-07-20 9:15 AM]

We can see compaction taking place, so it's not clear why we still have so many files.
[screenshot: 2020-07-17 2:04 PM]

The table config is:

  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, HoodieTableType.MERGE_ON_READ.name)
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, s"$META_COLUMN.version")
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, s"$META_COLUMN.key")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, s"$META_COLUMN.partition")
  .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY,
    "com.myCustompayloadClass")
  .option(HoodieCompactionConfig.PAYLOAD_CLASS_PROP,
    "com.myCustompayloadClass")
  .option(HoodieWriteConfig.UPSERT_PARALLELISM, 32)
  .option(HoodieWriteConfig.INSERT_PARALLELISM, 32)
  .option(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED_PROP, 3)
  .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, 12 )
  .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, true)
  .option(HoodieStorageConfig.PARQUET_FILE_MAX_BYTES, String.valueOf(256 * 1024 * 1024))
  .option(HoodieStorageConfig.PARQUET_BLOCK_SIZE_BYTES, String.valueOf(256 * 1024 * 1024))
  .option(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC, "snappy")

We're using our own payload class that decides which record to keep based on a timestamp in the message rather than the latest arrival.

Stacktrace

Stack trace of the list operation where we are spending a lot of time:

sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:352)
shaded.databricks.v20180920_b33d810.org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation.processResponse(AbfsHttpOperation.java:259)
shaded.databricks.v20180920_b33d810.org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:167)
shaded.databricks.v20180920_b33d810.org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:124)
shaded.databricks.v20180920_b33d810.org.apache.hadoop.fs.azurebfs.services.AbfsClient.listPath(AbfsClient.java:180)
shaded.databricks.v20180920_b33d810.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listFiles(AzureBlobFileSystemStore.java:549)
shaded.databricks.v20180920_b33d810.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listStatus(AzureBlobFileSystemStore.java:628)
shaded.databricks.v20180920_b33d810.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listStatus(AzureBlobFileSystemStore.java:532)
shaded.databricks.v20180920_b33d810.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.listStatus(AzureBlobFileSystem.java:344)
org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1517)
org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1557)
org.apache.hudi.common.fs.HoodieWrapperFileSystem.listStatus(HoodieWrapperFileSystem.java:487)
org.apache.hudi.common.fs.FSUtils.getAllLogFiles(FSUtils.java:409)
org.apache.hudi.common.fs.FSUtils.getLatestLogVersion(FSUtils.java:420)
org.apache.hudi.common.fs.FSUtils.computeNextLogVersion(FSUtils.java:434)
org.apache.hudi.common.model.HoodieLogFile.rollOver(HoodieLogFile.java:115)
org.apache.hudi.common.table.log.HoodieLogFormatWriter.<init>(HoodieLogFormatWriter.java:101)
org.apache.hudi.common.table.log.HoodieLogFormat$WriterBuilder.build(HoodieLogFormat.java:249)
org.apache.hudi.io.HoodieAppendHandle.createLogWriter(HoodieAppendHandle.java:291)
org.apache.hudi.io.HoodieAppendHandle.init(HoodieAppendHandle.java:141)
org.apache.hudi.io.HoodieAppendHandle.doAppend(HoodieAppendHandle.java:197)
org.apache.hudi.table.action.deltacommit.DeltaCommitActionExecutor.handleUpdate(DeltaCommitActionExecutor.java:77)
org.apache.hudi.table.action.commit.BaseCommitActionExecutor.handleUpsertPartition(BaseCommitActionExecutor.java:246)
org.apache.hudi.table.action.commit.BaseCommitActionExecutor.lambda$execute$caffe4c4$1(BaseCommitActionExecutor.java:102)
org.apache.hudi.table.action.commit.BaseCommitActionExecutor$$Lambda$192/1449069739.call(Unknown Source)
org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:105)

@bvaradar
Contributor

> And looking at the thread dump of the executors they are almost always spending their time listing files.

This looks surprising to me. File listing for finding the latest file versions (for index lookup and writing) happens in the driver, concurrently within the embedded timeline service. If the executors have trouble connecting to the driver, then the executors would fall back to listing files themselves. Do you see any non-fatal exceptions when writing? Can you also paste the timeline (a listing of the .hoodie folder)?
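
(For reference, a minimal sketch of making that explicit in the writer options: hoodie.embed.timeline.server is the standard Hudi write-config key behind the embedded timeline service and is on by default, so setting it only rules out it having been disabled elsewhere; verify the key against the Hudi version in use.)

  // Assumption: appended to the same option chain as the table config above.
  .option("hoodie.embed.timeline.server", "true") // serve file-system views from the driver instead of listing on executors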

@ssomuah
Author

ssomuah commented Jul 21, 2020

I don't see any exceptions in the driver logs or executor logs.

I see these two warnings in the driver logs:

20/07/21 13:12:28 WARN IncrementalTimelineSyncFileSystemView: Incremental Sync of timeline is turned off or deemed unsafe. Will revert to full syncing
20/07/21 13:12:29 WARN CleanPlanner: Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed since last cleaned at 20200721032203. New Instant to retain : Option{val=[20200721032203__commit__COMPLETED]}

These are the contents of the timeline:
dot_hoodie_folder.txt

The timeline only has files from the current day, but I see log files in the data folder from over a week ago. Do you have any idea what might be causing so many log files?

@bvaradar
Contributor

MacBook-Pro:hudi balaji.varadarajan$ grep -c '.clean.requested' ~/Downloads/dot_hoodie_folder.txt
16
MacBook-Pro:hudi balaji.varadarajan$ grep -c '.deltacommit.requested' ~/Downloads/dot_hoodie_folder.txt
266
MacBook-Pro:hudi balaji.varadarajan$ grep -c '.compaction.requested' ~/Downloads/dot_hoodie_folder.txt
44
MacBook-Pro:hudi balaji.varadarajan$ grep -c '.compaction.inflight' ~/Downloads/dot_hoodie_folder.txt
44
MacBook-Pro:hudi balaji.varadarajan$ grep -c '.commit' ~/Downloads/dot_hoodie_folder.txt
19

  1. I can see that there are many compactions that are in inflight status but have not completed. Can you add this patch, [HUDI-1029] In inline compaction mode, previously failed compactions … #1857, to retry failed compactions automatically?
  2. Those two warnings are fine, and one of them implies that the embedded timeline server is on. Can you provide the stack trace where you deduced that most of the time is spent on listing?

@ssomuah
Author

ssomuah commented Jul 22, 2020

  1. I'm trying this now.
  2. The stack trace is the one I provided above.

@bvaradar
Contributor

Sorry, I did not realize that. Let me check and get back.

@bvaradar
Contributor

We have a JIRA to improve/avoid listing: https://issues.apache.org/jira/browse/HUDI-1015. I have added this case to the JIRA.

@bvaradar
Contributor

Ended up creating a new JIRA, https://issues.apache.org/jira/browse/HUDI-1119, as this has a different cause.

@ssomuah
Author

ssomuah commented Jul 23, 2020

I updated to master @ 743ef32 and then applied the patch you linked above.

The first batch that ran had several "RunCompactionActionExecutor" entries.

I'm still consistently seeing long batches.

[screenshots: Spark UI, 2020-07-23 8:45-8:46 AM]

The contents of the timeline folder are now:

dot_hoodie_folder_v2.txt

I think the root of my issue is that I have tons of log files which don't seem to get compacted.

@ssomuah
Author

ssomuah commented Jul 23, 2020

Only 1 compaction.inflight now.

@bvaradar
Contributor

@ssomuah: Regarding the patch, it is meant to ensure all pending compactions are completed. Regarding the slowness, we are working on general and S3-specific performance improvements on the write side, which should be part of the next release, 0.6.0.

@ssomuah
Author

ssomuah commented Jul 23, 2020

@bvaradar I think the issue I'm facing is due to configuration, but I can't pinpoint what it is.

I'm ending up with an extremely large number of files for a single-partition merge-on-read table.

I have tens of thousands of log files which I would have thought would get compacted into parquet at some point.

What volume of updates works well for merge-on-read tables today?

@bvaradar
Contributor

@ssomuah:
Such a large number of log files indicates your compaction frequency (INLINE_COMPACT_NUM_DELTA_COMMITS_PROP) is conservative. Many of these log files could also belong to older file versions, which will be cleaned by the cleaner (https://cwiki.apache.org/confluence/display/HUDI/FAQ#FAQ-WhatdoestheHudicleanerdo).

In addition, note that inline compaction runs serially with ingestion. We have a working PR which lets compaction run concurrently with ingestion: #1752.
Now that pending compactions have finished, you can set up concurrent compaction with the above PR (config: hoodie.datasource.compaction.async.enable=true).
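
For reference, a sketch of wiring that flag into the same option chain as the original table config; this assumes #1752 is applied, and turning off inline compaction alongside it is an assumption to verify against that PR:

  // With #1752 applied, compaction can run concurrently with ingestion.
  .option("hoodie.datasource.compaction.async.enable", "true")
  .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, false) // presumably no longer needed once async compaction is on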

@ssomuah
Author

ssomuah commented Jul 23, 2020

What do you mean by "runs serially with ingestion"? My understanding was that inline compaction happened in the same flow as writing so an inline compaction would simply slow down ingestion.

Does INLINE_COMPACT_NUM_DELTA_COMMITS_PROP refer to the number of commits retained in general, or the number of commits for a record?

I see in the timeline I have several clean.requested and clean.inflight, how can I get these to actually complete?

What determines how many log files are created in each batch for a MOR table?

EDIT:
Is it possible to force a compaction of the existing log files?

@bvaradar
Contributor

> What do you mean by "runs serially with ingestion"? My understanding was that inline compaction happened in the same flow as writing so an inline compaction would simply slow down ingestion.

===> Yes, that is what I meant. Inline compaction would run after ingestion, but not in parallel with it. You can use #1752 to have it run concurrently.

> Does INLINE_COMPACT_NUM_DELTA_COMMITS_PROP refer to the number of commits retained in general, or the number of commits for a record?

===> INLINE_COMPACT_NUM_DELTA_COMMITS_PROP refers to the number of ingestion rounds (delta commits) between two compaction runs.

> I see in the timeline I have several clean.requested and clean.inflight, how can I get these to actually complete?

===> If they are stuck in the inflight state, there could be errors when Hudi tries to clean up. Please look for exceptions in the driver logs. The cleaner runs automatically by default, and any pending clean operations are automatically picked up in the next ingestion, so it must have been failing for some reason. You can turn on logs to see what is happening.

> Is it possible to force a compaction of the existing log files?

===> Yes, by configuring INLINE_COMPACT_NUM_DELTA_COMMITS_PROP. You can set it to 1 for aggressive compaction.
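
For reference, a sketch of that aggressive setting in the same option-chain style as the original table config; compacting after every delta commit trades ingestion latency for fewer log files:

  .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, true)
  .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, 1) // compact after every delta commit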

@ssomuah
Author

ssomuah commented Jul 24, 2020

Hi Balaji, I think I've narrowed down my issue somewhat for my MOR table.

I started again with a fresh table and the initial commits make sense, but after a time I notice it's consistently trying to write 300+ files.

[screenshots: Spark UI, 2020-07-24 1:15 PM]

The individual tasks don't take that long, so I think reducing the number of files it's trying to write would help.
[screenshot: Spark UI, 2020-07-24 1:16 PM]

I can also see from the CLI that, whether it's doing a compaction or a delta commit, I still seem to be writing the same number of files for a fraction of the data.
[screenshot: Hudi CLI output, 2020-07-24 1:21 PM]

Is there something I can tune to reduce the number of files it breaks the data into?

hoodie.logfile.max.size is 256MB
hoodie.parquet.max.file.size is 256MB
hoodie.parquet.compression.ratio is the default .35
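
(For reference, those values spelled out as explicit writer options in the same style as the table config above, so they can be cross-checked against what the job actually sets; the key strings are the standard Hudi storage-config names.)

  .option("hoodie.logfile.max.size", String.valueOf(256 * 1024 * 1024))
  .option("hoodie.parquet.max.file.size", String.valueOf(256 * 1024 * 1024))
  .option("hoodie.parquet.compression.ratio", "0.35")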

@bvaradar
Contributor

@ssomuah: Looking at the commit metadata, this is a case where your updates are spread across a large number of files. For example, in the latest commit, 334 files see updates whereas only one file is newly created due to inserts. It looks like this is the nature of your workload.

If your record key has some sort of ordering, then you can initially bootstrap using "bulk-insert", which would sort and write the data in record-key order. This can potentially help reduce the number of files getting updated if each batch of writes has similar ordering. You can also try recreating the dataset with a larger parquet file size, a higher small-file limit, and async compaction (run more frequently to keep the number of active log files in check).

However, you are basically trying to reduce the number of files getting appended to at the expense of more data getting appended to each file. This is a general upsert problem arising from the nature of your workload.
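
For reference, a minimal sketch of the suggested bulk-insert re-bootstrap, assuming the same DataSourceWriteOptions / HoodieStorageConfig constants as the original config; existingData, basePath, the hoodie.parquet.small.file.limit key, and the size values are illustrative placeholders rather than recommendations:

  // Hypothetical one-time re-bootstrap; bulk_insert sorts by record key before writing.
  // Other table options (table name, precombine field, payload class) omitted for brevity.
  existingData.write.format("org.apache.hudi")
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
    .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, HoodieTableType.MERGE_ON_READ.name)
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, s"$META_COLUMN.key")
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, s"$META_COLUMN.partition")
    .option(HoodieStorageConfig.PARQUET_FILE_MAX_BYTES, String.valueOf(512 * 1024 * 1024)) // larger base files
    .option("hoodie.parquet.small.file.limit", String.valueOf(128 * 1024 * 1024))          // assumed key: higher small-file limit
    .mode("overwrite")
    .save(basePath)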

@bvaradar
Contributor

bvaradar commented Aug 4, 2020

Closing this ticket as it was answered.

bvaradar closed this as completed Aug 4, 2020