
Standalone Compaction Scheduling takes so much time  #8438

@SamarthRaval

Description

Running a standalone compaction job via the Spark datasource on a huge table.
Configuration:
spark-submit --deploy-mode cluster --class org.apache.hudi.utilities.HoodieCompactor --jars /usr/lib/hudi/hudi-utilities-bundle.jar,/usr/lib/hudi/hudi-spark-bundle.jar,custom.jar /usr/lib/hudi/hudi-utilities-bundle.jar --base-path s3://transactions.all_hudi --table-name transactions_all --parallelism 1000 --spark-memory 90g --mode schedule --hoodie-conf hoodie.embed.timeline.server=false --hoodie-conf hoodie.datasource.write.payload.class=customPayload --hoodie-conf hoodie.payload.ordering.field=lastmodifieddate --hoodie-conf hoodie.compaction.logfile.size.threshold=500000
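For completeness, the execution pass that follows is the same invocation with the mode flipped; a minimal sketch, assuming the instant produced by the schedule run is passed via --instant-time (check the HoodieCompactor flags available in your build), with the same --hoodie-conf options as above:

spark-submit --deploy-mode cluster --class org.apache.hudi.utilities.HoodieCompactor --jars /usr/lib/hudi/hudi-utilities-bundle.jar,/usr/lib/hudi/hudi-spark-bundle.jar,custom.jar /usr/lib/hudi/hudi-utilities-bundle.jar --base-path s3://transactions.all_hudi --table-name transactions_all --parallelism 1000 --spark-memory 90g --mode execute --instant-time <scheduled compaction instant>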

Error on the task nodes (just scheduling the compaction takes a surprisingly long time):
Compaction scheduling: 78 minutes (far too long, and weird)
Compaction execution: 7 minutes (super fast)

To Reproduce

Steps to reproduce the behavior:

  1. Ran a daily upsert on the transaction table (a huge table with 3 levels of partitioning).
  2. Ran a standalone compaction job on this table.
  3. The issue occurs while finishing up the scheduling step.
  4. Task nodes inside the EMR job emit the error shown in the stack trace below. It looks like an issue in the metadata table, but the metadata table is needed to improve read times at query time (see the sketch after this list for a way to isolate it).
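A quick way to test whether the metadata table reads are the bottleneck (an assumption on my part, not a confirmed fix) is to rerun only the schedule step with the metadata table disabled for this job, which makes plan generation fall back to direct S3 listing:

spark-submit --deploy-mode cluster --class org.apache.hudi.utilities.HoodieCompactor --jars /usr/lib/hudi/hudi-utilities-bundle.jar,/usr/lib/hudi/hudi-spark-bundle.jar,custom.jar /usr/lib/hudi/hudi-utilities-bundle.jar --base-path s3://transactions.all_hudi --table-name transactions_all --parallelism 1000 --spark-memory 90g --mode schedule --hoodie-conf hoodie.metadata.enable=false --hoodie-conf hoodie.embed.timeline.server=false --hoodie-conf hoodie.compaction.logfile.size.threshold=500000

If scheduling is fast with hoodie.metadata.enable=false, the slowness is in reading the metadata table rather than in compaction planning itself.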

Expected behavior

  • Compaction scheduling should be fast, just like compaction execution.
  • Scheduling is fast for other tables; only this particular table is problematic. [Note: the transaction table is huge and has 3 levels of partitioning.]

Environment Description

  • Hudi version : hudi 0.11.0

  • Spark version : spark 3.2.1

  • Hive version : 3.1.3

  • Hadoop version : 3.2.1

  • Storage (HDFS/S3/GCS..) : AWS S3

  • Running on Docker? (yes/no) : no (running on EMR 6.7.0)

Additional context

INFO S3FSInputStream: Encountered exception while reading 'hudi_table/.hoodie/metadata/files/.files-0000_20230303162559914001.log.6_0-91-247146', will retry by attempting to reopen stream.
javax.net.ssl.SSLException: Connection reset
	at sun.security.ssl.Alert.createSSLException(Alert.java:127)
	at sun.security.ssl.TransportContext.fatal(TransportContext.java:324)
	at sun.security.ssl.TransportContext.fatal(TransportContext.java:267)
	at sun.security.ssl.TransportContext.fatal(TransportContext.java:262)
	at sun.security.ssl.SSLTransport.decode(SSLTransport.java:138)
	at sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1400)
	at sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1368)
	at sun.security.ssl.SSLSocketImpl.access$300(SSLSocketImpl.java:73)
	at sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:962)
	at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
	at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.io.SessionInputBufferImpl.read(SessionInputBufferImpl.java:197)
	at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:176)
	at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:135)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:107)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.internal.S3AbortableInputStream.read(S3AbortableInputStream.java:125)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
	at com.amazon.ws.emr.hadoop.fs.s3.InputStreamWithInfo.read(InputStreamWithInfo.java:231)
	at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.read(S3FSInputStream.java:262)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
	at java.io.DataInputStream.read(DataInputStream.java:149)
	at java.io.DataInputStream.readFully(DataInputStream.java:195)
	at org.apache.hudi.common.table.log.block.HoodieLogBlock.inflate(HoodieLogBlock.java:273)
	at org.apache.hudi.common.table.log.block.HoodieDataBlock.readRecordsFromBlockPayload(HoodieDataBlock.java:185)
	at org.apache.hudi.common.table.log.block.HoodieDataBlock.getRecordIterator(HoodieDataBlock.java:147)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.getRecordsIterator(AbstractHoodieLogRecordReader.java:492)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processDataBlock(AbstractHoodieLogRecordReader.java:379)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:467)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:343)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:192)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:110)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:103)
	at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:63)
	at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:51)
	at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader$Builder.build(HoodieMetadataMergedLogRecordReader.java:230)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:499)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:461)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.openReaders(HoodieBackedTableMetadata.java:407)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getOrCreateReaders$10(HoodieBackedTableMetadata.java:393)
	at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.getOrCreateReaders(HoodieBackedTableMetadata.java:393)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$0(HoodieBackedTableMetadata.java:202)
	at java.util.HashMap.forEach(HashMap.java:1290)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:200)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordByKey(HoodieBackedTableMetadata.java:140)
	at org.apache.hudi.metadata.BaseTableMetadata.fetchAllFilesInPartition(BaseTableMetadata.java:312)
	at org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:135)
	at org.apache.hudi.metadata.HoodieMetadataFileSystemView.listPartition(HoodieMetadataFileSystemView.java:66)
	at org.apache.hudi.common.table.view.AbstractTableFileSystemView.lambda$ensurePartitionLoadedCorrectly$9(AbstractTableFileSystemView.java:304)
	at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
	at org.apache.hudi.common.table.view.AbstractTableFileSystemView.ensurePartitionLoadedCorrectly(AbstractTableFileSystemView.java:295)
	at org.apache.hudi.common.table.view.AbstractTableFileSystemView.getLatestFileSlices(AbstractTableFileSystemView.java:591)
	at org.apache.hudi.table.action.compact.HoodieCompactor.lambda$generateCompactionPlan$30498406$1(HoodieCompactor.java:294)
	at org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$flatMap$7d470b86$1(HoodieSparkEngineContext.java:137)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at scala.collection.AbstractIterator.to(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2255)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:133)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
	Suppressed: java.net.SocketException: Broken pipe (Write failed)
		at java.net.SocketOutputStream.socketWrite0(Native Method)
		at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
		at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
		at sun.security.ssl.SSLSocketOutputRecord.encodeAlert(SSLSocketOutputRecord.java:81)
		at sun.security.ssl.TransportContext.fatal(TransportContext.java:355)
		... 90 more
Caused by: java.net.SocketException: Connection reset
	at java.net.SocketInputStream.read(SocketInputStream.java:210)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:464)
	at sun.security.ssl.SSLSocketInputRecord.decodeInputRecord(SSLSocketInputRecord.java:237)
	at sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:190)
	at sun.security.ssl.SSLTransport.decode(SSLTransport.java:109)
	... 87 more
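
Since the retries happen while scanning the metadata table's files partition log (the .files-0000_...log.6 file above), it may be worth checking how much log backlog the metadata table has accumulated, e.g. whether its own compaction has been keeping up. A minimal sketch with hudi-cli, assuming the metadata commands shipped in 0.11.x:

hudi-cli
connect --path s3://transactions.all_hudi
metadata list-partitions
metadata stats

A files partition with many uncompacted log blocks would mean every partition listing during scheduling replays that log over S3, which matches the stack trace.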
