[SUPPORT] Flink-Hudi Unable to use Hudi metadata with S3 #11036

Open
ChiehFu opened this issue Apr 16, 2024 · 2 comments
Labels: flink (Issues related to flink), metadata (metadata table)

Comments


ChiehFu commented Apr 16, 2024

Describe the problem you faced

Hi,

I was building a Flink SQL streaming pipeline on AWS EMR that compacts data into a Hudi COW table. Because of S3 slowdown errors that occasionally occurred during Hudi writes, I tried to turn on the metadata table to eliminate S3 file listing, but ran into the following exception saying the S3 filesystem doesn't support atomic creation.

This issue seems specific to Flink: I have another Spark/Hudi batch pipeline running on the same type of EMR cluster, and the Hudi metadata table works as expected there with the S3 filesystem.

Can you please help me with this issue?

Exception:

Caused by: org.apache.hudi.exception.HoodieLockException: Unsupported scheme :s3, since this fs can not support atomic creation
	at org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider.<init>(FileSystemBasedLockProvider.java:89) ~[hudi-flink1.17-bundle-0.14.1.jar:0.14.1]
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_402]
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_402]
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_402]
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_402]
	at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:73) ~[hudi-flink1.17-bundle-0.14.1.jar:0.14.1]
	at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:123) ~[hudi-flink1.17-bundle-0.14.1.jar:0.14.1]
	at org.apache.hudi.client.transaction.lock.LockManager.getLockProvider(LockManager.java:118) ~[hudi-flink1.17-bundle-0.14.1.jar:0.14.1]
	at org.apache.hudi.client.transaction.lock.LockManager.unlock(LockManager.java:109) ~[hudi-flink1.17-bundle-0.14.1.jar:0.14.1]
	at org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataTable(HoodieFlinkTableServiceClient.java:216) ~[hudi-flink1.17-bundle-0.14.1.jar:0.14.1]
	at org.apache.hudi.client.HoodieFlinkWriteClient.initMetadataTable(HoodieFlinkWriteClient.java:318) ~[hudi-flink1.17-bundle-0.14.1.jar:0.14.1]
	at org.apache.hudi.sink.StreamWriteOperatorCoordinator.initMetadataTable(StreamWriteOperatorCoordinator.java:347) ~[hudi-flink1.17-bundle-0.14.1.jar:0.14.1]
	at org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:192) ~[hudi-flink1.17-bundle-0.14.1.jar:0.14.1]
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:181) ~[flink-dist-1.17.1-amzn-1.jar:1.17.1-amzn-1]
	at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:165) ~[flink-dist-1.17.1-amzn-1.jar:1.17.1-amzn-1]
	at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:82) ~[flink-dist-1.17.1-amzn-1.jar:1.17.1-amzn-1]
	at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:618) ~[flink-dist-1.17.1-amzn-1.jar:1.17.1-amzn-1]
	at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1130) ~[flink-dist-1.17.1-amzn-1.jar:1.17.1-amzn-1]
	at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:1047) ~[flink-dist-1.17.1-amzn-1.jar:1.17.1-amzn-1]
	at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:439) ~[flink-dist-1.17.1-amzn-1.jar:1.17.1-amzn-1]
	at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:198) ~[flink-dist-1.17.1-amzn-1.jar:1.17.1-amzn-1]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:622) ~[flink-rpc-akka_f5fd373f-2282-403d-a522-72b822a720aa.jar:1.17.1-amzn-1]
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_f5fd373f-2282-403d-a522-72b822a720aa.jar:1.17.1-amzn-1]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:621) ~[flink-rpc-akka_f5fd373f-2282-403d-a522-72b822a720aa.jar:1.17.1-amzn-1]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:190) ~[flink-rpc-akka_f5fd373f-2282-403d-a522-72b822a720aa.jar:1.17.1-amzn-1]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) ~[flink-scala_2.12-1.17.1-amzn-1.jar:1.17.1-amzn-1]
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) ~[flink-scala_2.12-1.17.1-amzn-1.jar:1.17.1-amzn-1]
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) ~[flink-scala_2.12-1.17.1-amzn-1.jar:1.17.1-amzn-1]
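
For reference, the failure comes from FileSystemBasedLockProvider, which Hudi falls back to when the metadata table needs a lock and no explicit lock provider is configured; it rejects the s3 scheme because S3 offers no atomic file creation. A possible workaround, not confirmed in this thread, is to point Hudi at a lock provider that doesn't depend on the filesystem, such as the DynamoDB-based one from the hudi-aws module. A minimal sketch of the extra table options, where the DynamoDB table name, partition key, and region are placeholders:

 'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',
 'hoodie.write.lock.provider' = 'org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider',
 'hoodie.write.lock.dynamodb.table' = 'hudi_lock_table',       -- placeholder DynamoDB table
 'hoodie.write.lock.dynamodb.partition_key' = 'my_cow_table',  -- placeholder lock key
 'hoodie.write.lock.dynamodb.region' = 'us-east-1'             -- placeholder region

The DynamoDB provider needs the hudi-aws classes on the classpath; they may not ship inside the hudi-flink bundle used here, so the hudi-aws-bundle jar may have to be added separately.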

To Reproduce

Flink SQL / Hudi configs

 'path'='...',
 'connector' = 'hudi',
 'table.type' = 'COPY_ON_WRITE',
 'precombine.field' = 'integ_key',
 'write.precombine' = 'true',
 'write.bucket_assign.tasks' = '10',
 'write.task.max.size' = '2014',
 'write.operation' = 'upsert',
 'hoodie.datasource.write.recordkey.field' = 'key',
 'write.parquet.max.file.size' = '240',
 'index.bootstrap.enabled' = 'false',
 'write.index_bootstrap.tasks' = '200',
 'metadata.enabled' = 'true'
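
For context, a sketch of how these options sit in a full Flink SQL DDL; the table name and schema below are placeholders, and only an abridged subset of the WITH options above is shown. The PRIMARY KEY clause is the Flink SQL equivalent of the 'hoodie.datasource.write.recordkey.field' option:

 CREATE TABLE hudi_sink (
   `key` STRING,
   integ_key STRING,
   payload STRING,
   PRIMARY KEY (`key`) NOT ENFORCED  -- record key, as 'key' in the report
 ) WITH (
   'connector' = 'hudi',
   'path' = 's3://bucket/path',      -- elided as '...' in the report
   'table.type' = 'COPY_ON_WRITE',
   'precombine.field' = 'integ_key',
   'write.operation' = 'upsert',
   'metadata.enabled' = 'true'
 );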

Environment Description

  • Hudi version: 0.14.1
  • EMR version: 6.15.0
  • Hive version: 3.1.3
  • Hadoop version: 3.3.6
  • Storage (HDFS/S3/GCS..): S3
  • Flink version: 1.17.1
  • Running on Docker? (yes/no): No


ChiehFu (Author) commented Apr 29, 2024

Hello, could I please get some help with this issue? Please let me know if any further information is needed. Thanks!

danny0405 (Contributor) commented:

Yeah, the MDT (metadata table) is not recommended for Flink as of 0.13.x; we are striving to make it production-ready for the 1.x release.
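
In other words, until the 1.x line the interim mitigation for this pipeline is to switch the option from the reproduction config back off:

 'metadata.enabled' = 'false'  -- keep the metadata table off for Flink writers on 0.x

This gives up the S3 listing savings but avoids the lock-provider path hit during metadata table initialization.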

@codope added the flink and metadata labels on May 2, 2024
Status: 🏁 Triaged

3 participants