[SUPPORT] MOR trigger compaction from Hudi CLI #1823

Closed
RajasekarSribalan opened this issue Jul 11, 2020 · 12 comments
RajasekarSribalan commented Jul 11, 2020

Describe the problem you faced

We are writing to a Hudi MOR table via Spark Streaming: we read data from Kafka and write to the Hudi MOR table. We get a huge volume of inserts/upserts and want good write performance, which is why we chose MOR tables. We have disabled inline compaction to avoid blocking ingestion, and we want compaction to run asynchronously via the Hudi CLI. The issue is that we cannot see any COMPACTION instant on DFS, so we get an error saying "No pending compaction". We do see a lot of delta logs being created/appended, but compaction is never requested.

We want to understand when a compaction request is triggered when inline compaction is switched OFF, so that I can run compaction via hudi-cli. Please assist, @vinothchandar @bhasudha. There is not much information on async compaction in the Hudi documentation.

upsertDf.write
.format("hudi")
.options(getQuickstartWriteConfigs)
.option(OPERATION_OPT_KEY, "upsert")
.option(PRECOMBINE_FIELD_OPT_KEY, hudi_precombine_key)
.option(RECORDKEY_FIELD_OPT_KEY, hudi_key)
.option(PARTITIONPATH_FIELD_OPT_KEY, "")
.option(KEYGENERATOR_CLASS_OPT_KEY, classOf[NonpartitionedKeyGenerator].getName)
.option(TABLE_NAME, tablename)
// MOR table type (both the older STORAGE_TYPE key and its TABLE_TYPE replacement are set)
.option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
.option(HIVE_SYNC_ENABLED_OPT_KEY, "true")
.option(HIVE_URL_OPT_KEY, "XXXXXXX")
.option(HIVE_DATABASE_OPT_KEY, hudi_db)
.option(HIVE_TABLE_OPT_KEY, tablename)
.option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[NonPartitionedExtractor].getName)
.option(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC, "snappy")
// inline compaction disabled; expecting compaction to be run asynchronously via the CLI
.option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false")
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, "24")
.mode(Append)
.save("/user/xyz/hudi/" + tablename)

Environment Description

  • Hudi version : 0.5.2

  • Spark version : 2.2.0

  • Hive version : 1.0

  • Hadoop version : 2.7

  • Storage (HDFS/S3/GCS..) :

  • Running on Docker? (yes/no) :

Stacktrace

hudi:user_emails->compactions show all
╔═════════════════════════╤═══════╤═══════════════════════════════╗
║ Compaction Instant Time │ State │ Total FileIds to be Compacted ║
╠═════════════════════════╧═══════╧═══════════════════════════════╣
║ (empty) ║
╚═════════════════════════════════════════════════════════════════╝

@RajasekarSribalan
Author

Another issue: I am getting the below error during inline compaction. Please help.

com.esotericsoftware.kryo.KryoException: Unable to find class: org.apache.hudi.common.model.OverwriteWithLatestAvroPayload$$Lambda$46/188376151
Serialization trace:
orderingVal (org.apache.hudi.common.model.OverwriteWithLatestAvroPayload)
data (org.apache.hudi.common.model.HoodieRecord)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:156)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:551)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:708)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:551)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:107)
at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:81)
at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:217)
at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:211)
at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:207)
at org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:168)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.processNextRecord(HoodieMergedLogRecordScanner.java:114)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processAvroDataBlock(AbstractHoodieLogRecordScanner.java:277)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processQueuedBlocksForInstant(AbstractHoodieLogRecordScanner.java:305)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:152)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.&lt;init&gt;(HoodieMergedLogRecordScanner.java:81)
at org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.compact(HoodieMergeOnReadTableCompactor.java:126)
at org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.lambda$compact$644ebad7$1(HoodieMergeOnReadTableCompactor.java:98)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:371)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1055)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.hudi.common.model.OverwriteWithLatestAvroPayload$$Lambda$46/188376151
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:154)
... 42 more
20/07/12 09:59:44 WARN storage.BlockManager: Putting block rdd_2_694 failed due to an exception

@bhasudha
Contributor

@RajasekarSribalan For your first question: unfortunately, Spark Streaming writes currently support only inline compaction, so you have to enable that config. The good news is that this PR is working on enabling async compaction for Spark Streaming, and it is a priority.
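For reference, a minimal sketch of what that implies for the writer in the snippet above (same upsertDf and tablename; the flipped flag is the only substantive change, and the options elided with "..." stay as they were):

upsertDf.write
.format("hudi")
// ... all other options as in the original snippet ...
.option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true") // was "false"
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, "24") // compact every 24 delta commits
.mode(Append)
.save("/user/xyz/hudi/" + tablename)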

For the second question, a couple of clarifications:

  1. Make sure you set the conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'.
  2. In addition to hudi-spark-bundle, you need to pass org.apache.spark:spark-avro_2.11:2.4.4. Note that the spark-avro version must match your Spark version (2.4.4 in this example). This applies if you are using spark-shell, as it does not ship with spark-avro explicitly.
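For example, a spark-shell launch with both settings might look like this (a sketch; the bundle jar path and name are placeholders for whichever Hudi bundle you use):

spark-shell \
  --packages org.apache.spark:spark-avro_2.11:2.4.4 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --jars /path/to/hudi-spark-bundle.jar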

@RajasekarSribalan
Author

RajasekarSribalan commented Jul 13, 2020 via email

@bvaradar
Contributor

@RajasekarSribalan : Compactions should get scheduled and executed from the CLI if delta files are present and the table is configured correctly. Can you post the logs from "compaction schedule" after you disabled inline compaction and delta files were added? The only constraint is that ingestion must not be running while compactions are being scheduled.
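For reference, the CLI flow being described looks roughly like this (a sketch; the table path and instant time are placeholders, and exact flags can vary by Hudi version, so check help compaction in the CLI):

connect --path /user/xyz/hudi/&lt;tablename&gt;
hudi:&lt;tablename&gt;->compaction schedule
hudi:&lt;tablename&gt;->compactions show all
hudi:&lt;tablename&gt;->compaction run --compactionInstant &lt;instant time&gt;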

If you are running a Structured Streaming job, please try out the PR suggested by @bhasudha, which will execute compaction asynchronously. In batch mode, you can also enable inline compaction and set a proper value for hoodie.compact.inline.max.delta.commits, depending on how frequently you want compactions to run.

Regarding (2): compaction is only relevant for MOR tables. Compaction is the process of taking the delta log files and creating columnar base files. Copy-on-write tables are implicitly compacted.

Regarding the KryoException error: are you still seeing it?

@RajasekarSribalan
Author

Thanks @bvaradar @bhasudha. One more thing I would like to understand: how should compaction and the cleaner be configured? Should both have the same values? What if I configure clean commits as 3, so that I reclaim more space, and compaction to happen after 24 commits? Since I am running the cleaner frequently, will the delta commits be cleaned/deleted before compaction? Please shed some light on this, because I can see tons of files in HDFS for a single table.

For example, in my case, when I ran a bulk insert for a table to store it in Hudi, 7000+ parquet files were created, which was fine. After running the streaming pipeline doing upserts on the same table for 2 days, I could see 90,000+ files in HDFS. I haven't changed the default cleaner configuration, so I believe cleaning happens after 24 commits? That would explain why I have this many files. Please correct me if I am wrong.
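(For reference, the two knobs being weighed against each other here are roughly the following; a sketch with Hudi 0.5.x property names, where the values mirror the numbers in the question and are illustrative, not a recommendation:)

// Sketch only: cleaner retention vs. compaction frequency (Hudi 0.5.x property names).
// The cleaner removes older, superseded file versions; it does not delete
// delta logs that a pending compaction still needs.
.option("hoodie.cleaner.commits.retained", "3") // reclaim space, keeping file versions from the last 3 commits
.option("hoodie.compact.inline.max.delta.commits", "24") // request compaction every 24 delta commits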

@RajasekarSribalan
Author

@bvaradar @bhasudha

Please find the CLI output for a MOR table's clean info:

20/07/18 16:05:31 INFO timeline.HoodieActiveTimeline: Loaded instants [[20200716082419__clean__COMPLETED], [20200716102509__clean__COMPLETED], [20200716103921__clean__COMPLETED], [20200716134933__clean__COMPLETED], [20200716135749__clean__COMPLETED], [20200716163408__clean__COMPLETED], [20200716164519__clean__COMPLETED], [20200716192304__clean__COMPLETED], [20200716192304__deltacommit__COMPLETED], [20200716193103__deltacommit__COMPLETED], [20200717034005__commit__COMPLETED], [20200717080741__clean__COMPLETED], [20200717080741__deltacommit__COMPLETED], [20200717100758__clean__COMPLETED], [20200717100758__deltacommit__COMPLETED], [20200717101709__clean__COMPLETED], [20200717101709__deltacommit__COMPLETED], [20200717120702__clean__COMPLETED], [20200717120702__deltacommit__COMPLETED], [20200717121648__clean__COMPLETED], [20200717121648__deltacommit__COMPLETED], [20200717141621__clean__COMPLETED], [20200717141621__deltacommit__COMPLETED], [20200717142837__clean__COMPLETED], [20200717142837__deltacommit__COMPLETED], [20200717161843__clean__COMPLETED], [20200717161843__deltacommit__COMPLETED], [20200717162524__clean__COMPLETED], [20200717162524__deltacommit__COMPLETED], [20200717180202__clean__COMPLETED], [20200717180202__deltacommit__COMPLETED], [20200717182211__deltacommit__COMPLETED], [20200717203440__deltacommit__COMPLETED], [20200718040640__clean__COMPLETED], [20200718040640__deltacommit__COMPLETED], [20200718055600__commit__COMPLETED], [20200718062014__clean__COMPLETED], [20200718062014__deltacommit__COMPLETED], [20200718062721__clean__COMPLETED], [20200718062721__deltacommit__COMPLETED], [20200718082117__clean__COMPLETED], [20200718082117__deltacommit__COMPLETED], [20200718082800__clean__COMPLETED], [20200718082800__deltacommit__COMPLETED], [20200718102800__clean__COMPLETED], [20200718102800__deltacommit__COMPLETED], [20200718104348__deltacommit__COMPLETED]]
Clean null not found in metadata org.apache.hudi.common.table.timeline.HoodieDefaultTimeline: [20200716082419__clean__COMPLETED],[20200716102509__clean__COMPLETED],[20200716103921__clean__COMPLETED],[20200716134933__clean__COMPLETED],[20200716135749__clean__COMPLETED],[20200716163408__clean__COMPLETED],[20200716164519__clean__COMPLETED],[20200716192304__clean__COMPLETED],[20200717080741__clean__COMPLETED],[20200717100758__clean__COMPLETED],[20200717101709__clean__COMPLETED],[20200717120702__clean__COMPLETED],[20200717121648__clean__COMPLETED],[20200717141621__clean__COMPLETED],[20200717142837__clean__COMPLETED],[20200717161843__clean__COMPLETED],[20200717162524__clean__COMPLETED],[20200717180202__clean__COMPLETED],[20200718040640__clean__COMPLETED],[20200718062014__clean__COMPLETED],[20200718062721__clean__COMPLETED],[20200718082117__clean__COMPLETED],[20200718082800__clean__COMPLETED],[20200718102800__clean__COMPLETED]
hudi:XXXXXX->cleans show
╔════════════════╤═════════════════════════╤═════════════════════╤══════════════════╗
║ CleanTime │ EarliestCommandRetained │ Total Files Deleted │ Total Time Taken ║
╠════════════════╪═════════════════════════╪═════════════════════╪══════════════════╣
║ 20200718102800 │ │ 0 │ -1 ║
╟────────────────┼─────────────────────────┼─────────────────────┼──────────────────╢
║ 20200718082800 │ │ 0 │ -1 ║
╟────────────────┼─────────────────────────┼─────────────────────┼──────────────────╢
║ 20200718082117 │ │ 0 │ -1 ║
╟────────────────┼─────────────────────────┼─────────────────────┼──────────────────╢
║ 20200718062721 │ │ 0 │ -1 ║
╟────────────────┼─────────────────────────┼─────────────────────┼──────────────────╢
║ 20200718062014 │ │ 0 │ -1 ║
╟────────────────┼─────────────────────────┼─────────────────────┼──────────────────╢
║ 20200718040640 │ │ 0 │ -1 ║
╟────────────────┼─────────────────────────┼─────────────────────┼──────────────────╢
║ 20200717180202 │ │ 0 │ -1 ║
╟────────────────┼─────────────────────────┼─────────────────────┼──────────────────╢
║ 20200717162524 │ │ 0 │ -1 ║
╟────────────────┼─────────────────────────┼─────────────────────┼──────────────────╢
║ 20200717161843 │ │ 0 │ -1 ║
╟────────────────┼─────────────────────────┼─────────────────────┼──────────────────╢
║ 20200717142837 │ │ 0 │ -1 ║
╟────────────────┼─────────────────────────┼─────────────────────┼──────────────────╢
║ 20200717141621 │ │ 0 │ -1 ║
╟────────────────┼─────────────────────────┼─────────────────────┼──────────────────╢
║ 20200717121648 │ │ 0 │ -1 ║
╟────────────────┼─────────────────────────┼─────────────────────┼──────────────────╢
║ 20200717120702 │ │ 0 │ -1 ║
╟────────────────┼─────────────────────────┼─────────────────────┼──────────────────╢
║ 20200717101709 │ │ 0 │ -1 ║
╟────────────────┼─────────────────────────┼─────────────────────┼──────────────────╢
║ 20200717100758 │ │ 0 │ -1 ║
╟────────────────┼─────────────────────────┼─────────────────────┼──────────────────╢
║ 20200717080741 │ │ 0 │ -1 ║
╟────────────────┼─────────────────────────┼─────────────────────┼──────────────────╢
║ 20200716192304 │ │ 0 │ -1 ║
╟────────────────┼─────────────────────────┼─────────────────────┼──────────────────╢
║ 20200716164519 │ │ 0 │ -1 ║
╟────────────────┼─────────────────────────┼─────────────────────┼──────────────────╢
║ 20200716163408 │ │ 0 │ -1 ║
╟────────────────┼─────────────────────────┼─────────────────────┼──────────────────╢
║ 20200716135749 │ │ 0 │ -1 ║
╟────────────────┼─────────────────────────┼─────────────────────┼──────────────────╢
║ 20200716134933 │ │ 0 │ -1 ║
╟────────────────┼─────────────────────────┼─────────────────────┼──────────────────╢
║ 20200716103921 │ │ 0 │ -1 ║
╟────────────────┼─────────────────────────┼─────────────────────┼──────────────────╢
║ 20200716102509 │ │ 0 │ -1 ║
╟────────────────┼─────────────────────────┼─────────────────────┼──────────────────╢
║ 20200716082419 │ │ 0 │ -1 ║
╚════════════════╧═════════════════════════╧═════════════════════╧══════════════════╝

@garyli1019
Member

Hi @RajasekarSribalan, I am seeing the same issue as you did.

com.esotericsoftware.kryo.KryoException: Unable to find class: org.apache.hudi.common.model.OverwriteWithLatestAvroPayload$$Lambda$46/188376151
Serialization trace:
orderingVal (org.apache.hudi.common.model.OverwriteWithLatestAvroPayload)
data (org.apache.hudi.common.model.HoodieRecord)

Have you solved this problem?

@bvaradar
Contributor

@garyli1019 : Trying to catch up on this. Is this still an issue?

@garyli1019
Member

@garyli1019 : Trying to catch up on this. Is this still an issue?

@bvaradar I think this issue is reproducible when the log file group is larger than 2GB.

@garyli1019
Member

garyli1019 commented Aug 20, 2020

@bvaradar I created a ticket to track this. I think we can close this issue and #1890
https://issues.apache.org/jira/browse/HUDI-1205

@bvaradar
Contributor

Thanks Gary.

@LiLiu88

LiLiu88 commented Jul 6, 2021

(Quoting @bhasudha's earlier reply above about inline compaction and the Kryo serializer / spark-avro configuration.)

Can we disable compaction in the sink and run compaction manually using hudi-cli, with both inline and async compaction disabled?
