
[SUPPORT] Hudi DeltaStreamer job in continuous mode crashes at some point after consuming all available memory #2888

@PavelPetukhov

Description


Hi,

I am facing the following issue:

After starting the job via spark-submit (the full command with parameters is attached below), it fails with:

Application application_1617982296136_0040 failed 2 times due to AM Container for appattempt_1617982296136_0040_000002 exited with exitCode: -104
For more detailed output, check the application tracking page: http://xxx:8088/cluster/app/application_1617982296136_0040 Then click on links to logs of each attempt.
Diagnostics: Container [pid=32089,containerID=container_e37_1617982296136_0040_02_000001] is running beyond physical memory limits. Current usage: 10.0 GB of 10 GB physical memory used; 17.3 GB of 21 GB virtual memory used. Killing container.
Dump of the process-tree for container_e37_1617982296136_0040_02_000001 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE

I've added hudi logs below as well

Note 1: even after increasing the memory limits, spark-submit still crashes once it has consumed all available memory
Note 2: it works fine without the --continuous parameter
Note 3: with --continuous it stores data as expected, but fails at some point
Note 4: dynamic resource allocation didn't help either, e.g. specifying
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.shuffleTracking.enabled=true
--conf spark.shuffle.service.enabled=true
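One thing I have not tried yet is putting an explicit upper bound on the executor count; without it, dynamic allocation can keep requesting executors until the cluster's memory is exhausted. A minimal sketch (the min/max values here are illustrative assumptions, not what I actually ran):

```shell
# Sketch: dynamic allocation with an explicit cap on executors, so the
# job's total memory footprint stays bounded. The numeric values below
# are illustrative assumptions, not taken from the failing job.
spark-submit \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=1 \
  --conf spark.dynamicAllocation.maxExecutors=4 \
  ...
```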

  • Hudi version : 0.6.0

  • Spark version : 2.4.7

  • Hadoop version : 2.7

  • Storage (HDFS/S3/GCS..) : hdfs

  • Running on Docker? (yes/no) : yes

Spark submit command:
/usr/local/spark/bin/spark-submit
--conf "spark.eventLog.enabled=true"
--conf "spark.eventLog.dir=hdfs://xxx:8020/eventLogging"
--conf "spark.driver.extraJavaOptions=-DsparkAappName=mlops827.ml_training_data.smth.v1.private -DlogIndex=GOLANG_JSON -DappName=data-lake-extractors-streamer -DlogFacility=stdout"
--conf spark.executor.memoryOverhead=4096
--conf spark.driver.memoryOverhead=4096
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.shuffleTracking.enabled=true
--conf spark.shuffle.service.enabled=true
--packages org.apache.hudi:hudi-utilities-bundle_2.11:0.7.0,org.apache.spark:spark-avro_2.11:2.4.4
--master yarn
--deploy-mode cluster
--driver-memory 10G
--executor-memory 10G
--name mlops827.ml_training_data.smth.v1.private
--conf spark.yarn.submit.waitAppCompletion=false
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer hoodie-utilities.jar
--op BULK_INSERT
--table-type MERGE_ON_READ
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource
--source-ordering-field __null_ts_ms
--target-base-path /user/hdfs/raw_data/public/ml_training_data/smth
--target-table mlops827.ml_training_data.smth.v1.private
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider
--hoodie-conf hoodie.upsert.shuffle.parallelism=2
--hoodie-conf hoodie.insert.shuffle.parallelism=2
--hoodie-conf hoodie.delete.shuffle.parallelism=2
--hoodie-conf hoodie.bulkinsert.shuffle.parallelism=2
--hoodie-conf hoodie.embed.timeline.server=true
--hoodie-conf hoodie.filesystem.view.type=EMBEDDED_KV_STORE
--hoodie-conf hoodie.compact.inline=false
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
--hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.type="DATE_STRING"
--hoodie-conf hoodie.deltastreamer.keygen.timebased.input.dateformat="yyyy-MM-dd'T'HH:mm:ssZ,yyyy-MM-dd'T'HH:mm:ss.SSSZ"
--hoodie-conf hoodie.deltastreamer.keygen.timebased.input.dateformat.list.delimiter.regex=""
--hoodie-conf hoodie.deltastreamer.keygen.timebased.input.timezone=""
--hoodie-conf hoodie.deltastreamer.keygen.timebased.output.dateformat="yyyy/MM/dd"
--hoodie-conf hoodie.datasource.write.recordkey.field=id
--hoodie-conf hoodie.datasource.write.partitionpath.field=date
--hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://xxx/subjects/yyy.ml_train.smth.v1.private-value/versions/latest
--hoodie-conf hoodie.deltastreamer.source.kafka.topic=yyy.ml_train.smth.v1.private
--hoodie-conf bootstrap.servers=xxx:9092
--hoodie-conf auto.offset.reset=earliest
--hoodie-conf group.id=hudi_group
--hoodie-conf schema.registry.url=http://xxx
--hoodie-conf hoodie.datasource.hive_sync.enable=true
--hoodie-conf hoodie.datasource.hive_sync.table=smth
--hoodie-conf hoodie.datasource.hive_sync.partition_fields=date
--hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
--hoodie-conf hoodie.datasource.hive_sync.jdbcurl="hdfs://xxx:8020/"
--enable-sync
--continuous
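Since the crash only shows up with --continuous and the job uses the embedded timeline server with an EMBEDDED_KV_STORE view, one variant I am considering is spilling the file-system view to disk and keeping the active timeline short, so state accumulated across continuous syncs stays bounded. A sketch of the relevant flags (the values are illustrative assumptions, not a confirmed fix):

```shell
# Sketch of a configuration variant for continuous mode; all values are
# illustrative assumptions, not a confirmed fix for this crash.
spark-submit ... \
  --hoodie-conf hoodie.filesystem.view.type=SPILLABLE_DISK \
  --hoodie-conf hoodie.cleaner.commits.retained=10 \
  --hoodie-conf hoodie.keep.min.commits=20 \
  --hoodie-conf hoodie.keep.max.commits=30 \
  --min-sync-interval-seconds 60 \
  --continuous
```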

  • Stacktrace
    21/04/10 06:59:14 INFO service.FileSystemViewHandler: TimeTakenMillis[Total=161, Refresh=0, handle=161, Check=0], Success=true, Query=partition=2021%2F04%2F08&maxinstant=20210410065719&basepath=%2Fuser%2Fdelta%2Fraw_data%2Fdelivery%2Forders&lastinstantts=20210410065905&timelinehash=ada30e15bdcb74559290e5c426f394b27bd4fb2c7c737f7047a1ffa84c615260, Host=xxx:43469, synced=false
    21/04/10 06:59:14 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL TERM
    21/04/10 06:59:14 INFO collection.RocksDBDAO: Prefix Search for (query=type=slice,part=2021/04/08,id=) on hudi_view__user_delta_raw_data_delivery_orders. Total Time Taken (msec)=21. Serialization Time taken(micro)=14909, num entries=2462
    21/04/10 06:59:14 INFO spark.SparkContext: Invoking stop() from shutdown hook
    21/04/10 06:59:14 INFO server.AbstractConnector: Stopped Spark@3898238{HTTP/1.1,[http/1.1]}{0.0.0.0:0}
    21/04/10 06:59:14 INFO ui.SparkUI: Stopped Spark web UI at http://xxx:37127
    21/04/10 06:59:14 INFO scheduler.DAGScheduler: Job 20064 failed: collect at HoodieSparkEngineContext.java:73, took 4.417391 s
    21/04/10 06:59:14 INFO scheduler.DAGScheduler: ResultStage 23409 (collect at HoodieSparkEngineContext.java:73) failed in 4.416 s due to Stage cancelled because SparkContext was shut down
    21/04/10 06:59:14 ERROR deltastreamer.HoodieDeltaStreamer: Shutting down delta-sync due to exception
    org.apache.spark.SparkException: Job 20064 cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:954)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:952)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:952)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2164)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2077)
    at org.apache.spark.SparkContext$$anonfun$stop$6.apply$mcV$sp(SparkContext.scala:1949)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1948)
    at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:575)
    at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
    at scala.util.Try$.apply(Try.scala:192)


    Labels

    area:performance (Performance optimizations), priority:high (Significant impact; potential bugs)
