
[SUPPORT] Error upsetting bucketType UPDATE for partition :20240119 #10639

Open
zjq888 opened this issue Feb 8, 2024 · 1 comment
Labels
flink Issues related to flink

Comments


zjq888 commented Feb 8, 2024

  • Hudi version: 0.13.1

  • Flink version: 1.13

Hudi Flink Config:
'connector' = 'hudi',
'path' = 's3://bnb-datalake-hudi/**********',
'table.type' = 'COPY_ON_WRITE',
'write.batch.size' = '512',
'write.tasks' = '4',
'write.bucket_assign.tasks' = '4',
'write.operation' = 'upsert', 'write.task.max.size' = '4096',
'write.merge.max_memory' = '3072',
'write.precombine' = 'true',
'precombine.field' = 'update_time',
'hive_sync.enable' = 'true',
'hive_sync.db' = '---',
'hive_sync.table' = '---',
'hive_sync.mode' = 'GLUE',
'hive_sync.partition_fields' = 'date_key',
'write.rate.limit' = '15000',
'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
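For context, options like these are passed in the WITH clause of a Flink SQL CREATE TABLE statement. A minimal sketch follows; the table name, columns, and primary key (fact_orders, order_id, update_time, date_key) are hypothetical, since the reporter's actual schema is not shown in the issue.

```sql
-- Hypothetical DDL illustrating where the Hudi connector options above go.
-- Only a subset of the reported options is repeated here for brevity.
CREATE TABLE fact_orders (
  order_id STRING,
  update_time TIMESTAMP(3),
  date_key STRING,
  PRIMARY KEY (order_id) NOT ENFORCED   -- record key used by upsert
) PARTITIONED BY (date_key) WITH (
  'connector' = 'hudi',
  'path' = 's3://bnb-datalake-hudi/**********',
  'table.type' = 'COPY_ON_WRITE',
  'write.operation' = 'upsert',
  'precombine.field' = 'update_time'    -- newest update_time wins on key collision
);
```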

The job runs normally at first, but after some time the following error is reported (each sync reports it for more than 10 partitions).
Exception trace during the upsert:

@timestamp 2024-02-06T20:38:41.222Z,

    • app_id application_1678281252174_81026,
    • app_name bnb_dwd,
    • container_id container_e03_1678281252174_81026_01_000004,
    • content org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor [] - Error upsetting bucketType UPDATE for partition :20240119
      com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AbortedException:
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.internal.SdkFilterInputStream.abortIfNeeded(SdkFilterInputStream.java:61) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.internal.SdkFilterInputStream.markSupported(SdkFilterInputStream.java:125) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.makeResettable(AmazonHttpClient.java:999) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.beforeRequest(AmazonHttpClient.java:966) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:807) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5453) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5400) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.access$300(AmazonS3Client.java:421) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client$PutObjectStrategy.invokeServiceCall(AmazonS3Client.java:6528) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadObject(AmazonS3Client.java:1861) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1821) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.s3.lite.call.PutObjectCall.performCall(PutObjectCall.java:35) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.s3.lite.call.PutObjectCall.performCall(PutObjectCall.java:10) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.s3.lite.call.AbstractUploadingS3Call.perform(AbstractUploadingS3Call.java:87) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor$CallPerformer.call(GlobalS3Executor.java:111) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:138) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:191) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:186) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.putObject(AmazonS3LiteClient.java:107) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.s3.upload.dispatch.DefaultSinglePartUploadDispatcher.create(DefaultSinglePartUploadDispatcher.java:39) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.uploadSingleCompleteFile(S3FSOutputStream.java:386) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.doClose(S3FSOutputStream.java:225) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.close(S3FSOutputStream.java:201) ~[emrfs-hadoop-assembly-2.51.0.jar:?]
      at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74) ~[hadoop-common-2.10.1-amzn-4.jar:?]
      at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108) ~[hadoop-common-2.10.1-amzn-4.jar:?]
      at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74) ~[hadoop-common-2.10.1-amzn-4.jar:?]
      at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108) ~[hadoop-common-2.10.1-amzn-4.jar:?]
      at org.apache.hudi.common.fs.SizeAwareFSDataOutputStream.close(SizeAwareFSDataOutputStream.java:75) ~[hudi.jar:0.13.1]
      at org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64) ~[hudi.jar:0.13.1]
      at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:1106) ~[hudi.jar:0.13.1]
      at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:132) ~[hudi.jar:0.13.1]
      at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:319) ~[hudi.jar:0.13.1]
      at org.apache.hudi.io.storage.HoodieAvroParquetWriter.close(HoodieAvroParquetWriter.java:84) ~[hudi.jar:0.13.1]
      at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:415) ~[hudi.jar:0.13.1]
      at org.apache.hudi.io.FlinkMergeHandle.close(FlinkMergeHandle.java:172) ~[hudi.jar:0.13.1]
      at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:168) ~[hudi.jar:0.13.1]
      at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.handleUpdateInternal(BaseFlinkCommitActionExecutor.java:227) ~[hudi.jar:0.13.1]
      at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.handleUpdate(BaseFlinkCommitActionExecutor.java:218) ~[hudi.jar:0.13.1]
      at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.handleUpsertPartition(BaseFlinkCommitActionExecutor.java:189) ~[hudi.jar:0.13.1]
      at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:107) ~[hudi.jar:0.13.1]
      at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:69) ~[hudi.jar:0.13.1]
      at org.apache.hudi.table.action.commit.FlinkWriteHelper.write(FlinkWriteHelper.java:77) ~[hudi.jar:0.13.1]
      at org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor.execute(FlinkUpsertCommitActionExecutor.java:51) ~[hudi.jar:0.13.1]
      at org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.upsert(HoodieFlinkCopyOnWriteTable.java:111) ~[hudi.jar:0.13.1]
      at org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:147) ~[hudi.jar:0.13.1]
      at org.apache.hudi.sink.StreamWriteFunction.lambda$initWriteFunction$1(StreamWriteFunction.java:189) ~[hudi.jar:0.13.1]
      at org.apache.hudi.sink.StreamWriteFunction.lambda$flushRemaining$7(StreamWriteFunction.java:472) ~[hudi.jar:0.13.1]
      at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[?:1.8.0_372]
      at org.apache.hudi.sink.StreamWriteFunction.flushRemaining(StreamWriteFunction.java:464) ~[hudi.jar:0.13.1]
      at org.apache.hudi.sink.StreamWriteFunction.snapshotState(StreamWriteFunction.java:136) ~[hudi.jar:0.13.1]
      at org.apache.hudi.sink.common.AbstractStreamWriteFunction.snapshotState(AbstractStreamWriteFunction.java:167) ~[hudi.jar:0.13.1]
      at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:218) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:60) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) ~[flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) [flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) [flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:782) [flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [flink-dist_2.12-1.13.2-SNAPSHOT.jar:1.13.2-SNAPSHOT]
      at java.lang.Thread.run(Thread.java:750) [?:1.8.0_372],
    • hostname ip-10-116-20-255,
    • level ERROR,
    • log_name taskmanager_bnb_dwd.fact_main_c2c_order_di_hudi.log,
    • timestamp 2024-02-06 20:38:41,222
      }
@codope codope added the flink Issues related to flink label Feb 22, 2024
@ad1happy2go (Collaborator) commented:

@danny0405 Any insights here?

Status: Awaiting Triage
3 participants