Closed
Labels: area:ingest (Ingestion into Hudi), priority:high (Significant impact; potential bugs)
Description
- Have you gone through our FAQs?
- Yes
Describe the problem you faced
Exception when running HoodieDeltaStreamer: Could not load class org.apache.hudi.utilities.sources.AvroKafkaSource
I want to use the streaming ingestion feature via DeltaStreamer but run into java.lang.NoSuchMethodException: org.apache.hudi.utilities.sources.AvroKafkaSource.<init>(org.apache.hudi.common.config.TypedProperties, org.apache.spark.api.java.JavaSparkContext, org.apache.spark.sql.SparkSession, org.apache.hudi.utilities.schema.SchemaProvider)
It looks like a version mismatch, and I might be missing some obvious configuration. :)
To Reproduce
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/spark/hudi-utilities-bundle_2.12-0.12.1.jar \
--props /opt/spark/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--source-ordering-field f1 \
--target-base-path /home/azureuser/hudi-t1a \
--target-table hudi-t1a \
--op INSERT \
--filter-dupes \
--table-type COPY_ON_WRITE \
--continuous
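The exception surfaces as a missing constructor, but a NoSuchMethodException from reflective loading often points at a Spark/Scala line mismatch: the stacktrace below shows hudi-utilities-bundle_2.11-0.10.1.jar running on Spark 3.1.3, whose own jars (e.g. spark-unsafe_2.12-3.1.3.jar) are built for Scala 2.12. A minimal pre-flight check one could run before spark-submit, assuming the jars follow the usual `_<scala-version>` naming convention (`scala_suffix` is a hypothetical helper, not part of Hudi or Spark):

```shell
# Compare the Scala binary version baked into a Spark jar name with the
# one in the Hudi utilities bundle name; a _2.11 bundle cannot load its
# classes on a Scala 2.12 Spark.
scala_suffix() {
  # e.g. "hudi-utilities-bundle_2.11-0.10.1.jar" -> "2.11"
  printf '%s\n' "$1" | sed -n 's/.*_\(2\.1[0-9]\).*/\1/p'
}

spark_jar="spark-unsafe_2.12-3.1.3.jar"          # from the reflective-access warning in the log
hudi_jar="hudi-utilities-bundle_2.11-0.10.1.jar" # from the stacktrace below

if [ "$(scala_suffix "$spark_jar")" != "$(scala_suffix "$hudi_jar")" ]; then
  echo "Scala mismatch: Spark uses $(scala_suffix "$spark_jar"), bundle targets $(scala_suffix "$hudi_jar")"
fi
```

For the jars in the stacktrace this reports a mismatch; pairing a `_2.12` utilities bundle (as in the command above) with Spark 3.1.3 removes that particular incompatibility.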
Expected behavior
Streaming ingestion
Environment Description
- Hudi version : 0.10, 0.11, 0.12
- Spark version : 3.1.3
- Hive version :
- Hadoop version : 3.2
- Storage (HDFS/S3/GCS..) : local
- Running on Docker? (yes/no) : No
Additional context
I tried to run it on an Azure Spark pool, but I face the same errors when running on my local machine.
Stacktrace
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/spark/hudi-utilities-bundle_2.11-0.10.1.jar --props /opt/spark/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider --source-class org.apache.hudi.utilities.sources.AvroKafkaSource --source-ordering-field f1 --target-base-path /opt/spark/hudi-t1a --target-table hudi-t1a --op INSERT --filter-dupes --table-type COPY_ON_WRITE --continuous
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/opt/spark/jars/spark-unsafe_2.12-3.1.3.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
23/01/03 10:11:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
log4j:WARN No appenders could be found for logger (org.apache.hudi.utilities.deltastreamer.SchedulerConfGenerator).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
23/01/03 10:11:51 INFO SparkContext: Running Spark version 3.1.3
23/01/03 10:11:51 INFO ResourceUtils: ==============================================================
23/01/03 10:11:51 INFO ResourceUtils: No custom resources configured for spark.driver.
23/01/03 10:11:51 INFO ResourceUtils: ==============================================================
23/01/03 10:11:51 INFO SparkContext: Submitted application: delta-streamer-hudi-t1a
23/01/03 10:11:51 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
23/01/03 10:11:51 INFO ResourceProfile: Limiting resource is cpu
23/01/03 10:11:51 INFO ResourceProfileManager: Added ResourceProfile id: 0
23/01/03 10:11:51 INFO SecurityManager: Changing view acls to: azureuser
23/01/03 10:11:51 INFO SecurityManager: Changing modify acls to: azureuser
23/01/03 10:11:51 INFO SecurityManager: Changing view acls groups to:
23/01/03 10:11:51 INFO SecurityManager: Changing modify acls groups to:
23/01/03 10:11:51 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(azureuser); groups with view permissions: Set(); users with modify permissions: Set(azureuser); groups with modify permissions: Set()
23/01/03 10:11:51 INFO deprecation: mapred.output.compression.codec is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.codec
23/01/03 10:11:51 INFO deprecation: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
23/01/03 10:11:51 INFO deprecation: mapred.output.compression.type is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.type
23/01/03 10:11:51 INFO Utils: Successfully started service 'sparkDriver' on port 41851.
23/01/03 10:11:51 INFO SparkEnv: Registering MapOutputTracker
23/01/03 10:11:51 INFO SparkEnv: Registering BlockManagerMaster
23/01/03 10:11:51 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
23/01/03 10:11:51 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
23/01/03 10:11:51 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
23/01/03 10:11:51 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-30a8d33e-9d76-4fa9-963f-161561751946
23/01/03 10:11:51 INFO MemoryStore: MemoryStore started with capacity 434.4 MiB
23/01/03 10:11:51 INFO SparkEnv: Registering OutputCommitCoordinator
23/01/03 10:11:52 INFO Utils: Successfully started service 'SparkUI' on port 4040.
23/01/03 10:11:52 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://kafka-vm.internal.cloudapp.net:4040
23/01/03 10:11:52 INFO SparkContext: Added JAR file:/opt/spark/hudi-utilities-bundle_2.11-0.10.1.jar at spark://kafka-vm.internal.cloudapp.net:41851/jars/hudi-utilities-bundle_2.11-0.10.1.jar with timestamp 1672740711075
23/01/03 10:11:52 INFO Executor: Starting executor ID driver on host kafka-vm.internal.cloudapp.net
23/01/03 10:11:52 INFO Executor: Fetching spark://kafka-vm.internal.cloudapp.net:41851/jars/hudi-utilities-bundle_2.11-0.10.1.jar with timestamp 1672740711075
23/01/03 10:11:52 INFO TransportClientFactory: Successfully created connection to kafka-vm.internal.cloudapp.net/10.0.0.4:41851 after 47 ms (0 ms spent in bootstraps)
23/01/03 10:11:52 INFO Utils: Fetching spark://kafka-vm.internal.cloudapp.net:41851/jars/hudi-utilities-bundle_2.11-0.10.1.jar to /tmp/spark-3d253876-11e3-4216-80df-73ab6e1072bc/userFiles-5bee7627-ccf8-4614-b6ce-baaf31425b20/fetchFileTemp4471654452255410462.tmp
23/01/03 10:11:53 INFO Executor: Adding file:/tmp/spark-3d253876-11e3-4216-80df-73ab6e1072bc/userFiles-5bee7627-ccf8-4614-b6ce-baaf31425b20/hudi-utilities-bundle_2.11-0.10.1.jar to class loader
23/01/03 10:11:53 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40931.
23/01/03 10:11:53 INFO NettyBlockTransferService: Server created on kafka-vm.internal.cloudapp.net:40931
23/01/03 10:11:53 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
23/01/03 10:11:53 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, kafka-vm.internal.cloudapp.net, 40931, None)
23/01/03 10:11:53 INFO BlockManagerMasterEndpoint: Registering block manager kafka-vm.internal.cloudapp.net:40931 with 434.4 MiB RAM, BlockManagerId(driver, kafka-vm.internal.cloudapp.net, 40931, None)
23/01/03 10:11:53 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, kafka-vm.internal.cloudapp.net, 40931, None)
23/01/03 10:11:53 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, kafka-vm.internal.cloudapp.net, 40931, None)
23/01/03 10:11:53 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
23/01/03 10:11:53 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
23/01/03 10:11:53 WARN SparkContext: Using an existing SparkContext; some configuration may not take effect.
23/01/03 10:11:54 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from /opt/spark/hudi-t1a
23/01/03 10:11:54 INFO HoodieTableConfig: Loading table properties from /opt/spark/hudi-t1a/.hoodie/hoodie.properties
23/01/03 10:11:54 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from /opt/spark/hudi-t1a
23/01/03 10:11:54 INFO HoodieDeltaStreamer: Creating delta streamer with configs:
//: kafka-source-properties
auto.offset.reset: smallest
hoodie.datasource.write.partitionpath.field: f1
hoodie.datasource.write.reconcile.schema: false
hoodie.datasource.write.recordkey.field: f1
hoodie.deltastreamer.schemaprovider.registry.url: http://20.245.4.243:8082/subjects/t1-a-value/versions/latest
hoodie.deltastreamer.source.kafka.topic: t1-a
metadata.broker.list: 20.245.4.243:9092
schema.registry.url: http://20.245.4.243:8081
23/01/03 10:11:54 INFO HoodieSparkKeyGeneratorFactory: The value of hoodie.datasource.write.keygenerator.type is empty, use SIMPLE
23/01/03 10:11:54 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from /opt/spark/hudi-t1a
23/01/03 10:11:54 INFO HoodieTableConfig: Loading table properties from /opt/spark/hudi-t1a/.hoodie/hoodie.properties
23/01/03 10:11:54 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from /opt/spark/hudi-t1a
23/01/03 10:11:54 INFO HoodieActiveTimeline: Loaded instants upto : Optional.empty
23/01/03 10:11:54 INFO SparkUI: Stopped Spark web UI at http://kafka-vm.internal.cloudapp.net:4040
23/01/03 10:11:54 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/01/03 10:11:54 INFO MemoryStore: MemoryStore cleared
23/01/03 10:11:54 INFO BlockManager: BlockManager stopped
23/01/03 10:11:54 INFO BlockManagerMaster: BlockManagerMaster stopped
23/01/03 10:11:54 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/01/03 10:11:54 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" java.io.IOException: Could not load source class org.apache.hudi.utilities.sources.AvroKafkaSource
at org.apache.hudi.utilities.UtilHelpers.createSource(UtilHelpers.java:119)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.<init>(DeltaSync.java:234)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:611)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:142)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:114)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:514)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hudi.exception.HoodieException: Unable to instantiate class org.apache.hudi.utilities.sources.AvroKafkaSource
at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:91)
at org.apache.hudi.utilities.UtilHelpers.createSource(UtilHelpers.java:113)
... 17 more
Caused by: java.lang.NoSuchMethodException: org.apache.hudi.utilities.sources.AvroKafkaSource.<init>(org.apache.hudi.common.config.TypedProperties, org.apache.spark.api.java.JavaSparkContext, org.apache.spark.sql.SparkSession, org.apache.hudi.utilities.schema.SchemaProvider)
at java.base/java.lang.Class.getConstructor0(Class.java:3349)
at java.base/java.lang.Class.getConstructor(Class.java:2151)
at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:89)
... 18 more
23/01/03 10:11:54 INFO ShutdownHookManager: Shutdown hook called
23/01/03 10:11:54 INFO ShutdownHookManager: Deleting directory /tmp/spark-3d253876-11e3-4216-80df-73ab6e1072bc
23/01/03 10:11:54 INFO ShutdownHookManager: Deleting directory /tmp/spark-10d1602c-cc91-456b-a764-0576b06680ef