
S3OutputStream - failure to close should persist on subsequent close calls #5311

Merged
merged 59 commits into apache:master on Aug 2, 2022

Conversation

abmo-x
Contributor

@abmo-x abmo-x commented Jul 20, 2022

Fix for #5310 and #4168

Issue

When S3OutputStream fails to upload a file on close() due to some failure, IcebergStreamWriter in Flink still ends up adding the file to completedDataFiles in BaseTaskWriter, resulting in table metadata that points to an S3 data file which was never uploaded.

Steps to Reproduce

  • Flink 1.14 pipeline with Iceberg 0.13

  • A user-defined ProcessFunction<FlinkRecord, Row> that catches all exceptions in processElement (see the sketch after these steps)

  • Configure the pipeline to use S3FileIO and choose a target file size that, for your test data, causes the writer to roll to a new file

  • Trigger an S3 failure on putObject (should be reproducible for MultiPartUpload as well) during shouldRollToNewFile, which calls close --> completeUploads

Stack trace from the failure

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:101)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	...
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:101)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
	... 21 more
Caused by: software.amazon.awssdk.core.exception.SdkClientException: Unable to load credentials from service endpoint.
	at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98)
	at software.amazon.awssdk.auth.credentials.HttpCredentialsProvider.refreshCredentials(HttpCredentialsProvider.java:110)
	at software.amazon.awssdk.utils.cache.CachedSupplier.refreshCache(CachedSupplier.java:132)
	at software.amazon.awssdk.utils.cache.OneCallerBlocks.prefetch(OneCallerBlocks.java:38)
	at software.amazon.awssdk.utils.cache.CachedSupplier.prefetchCache(CachedSupplier.java:116)
	at software.amazon.awssdk.utils.cache.CachedSupplier.get(CachedSupplier.java:91)
	at java.base/java.util.Optional.map(Optional.java:265)
	at software.amazon.awssdk.auth.credentials.HttpCredentialsProvider.resolveCredentials(HttpCredentialsProvider.java:146)
	at software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain.resolveCredentials(AwsCredentialsProviderChain.java:85)
	at software.amazon.awssdk.auth.credentials.internal.LazyAwsCredentialsProvider.resolveCredentials(LazyAwsCredentialsProvider.java:45)
	at software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.resolveCredentials(DefaultCredentialsProvider.java:104)
	at software.amazon.awssdk.awscore.client.handler.AwsClientHandlerUtils.createExecutionContext(AwsClientHandlerUtils.java:76)
	at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.createExecutionContext(AwsSyncClientHandler.java:68)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:97)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:167)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:94)
	at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
	at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:55)
	at software.amazon.awssdk.services.s3.DefaultS3Client.putObject(DefaultS3Client.java:8350)
	at org.apache.iceberg.aws.s3.S3OutputStream.completeUploads(S3OutputStream.java:396)
	at org.apache.iceberg.aws.s3.S3OutputStream.close(S3OutputStream.java:256)
	at org.apache.parquet.io.DelegatingPositionOutputStream.close(DelegatingPositionOutputStream.java:38)
	at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:1106)
	at org.apache.iceberg.parquet.ParquetWriter.close(ParquetWriter.java:239)
	at org.apache.iceberg.io.DataWriter.close(DataWriter.java:82)
	at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.closeCurrent(BaseTaskWriter.java:288)
	at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.write(BaseTaskWriter.java:254)
	at org.apache.iceberg.io.PartitionedFanoutWriter.write(PartitionedFanoutWriter.java:58)
	at org.apache.iceberg.flink.sink.IcebergStreamWriter.processElement(IcebergStreamWriter.java:74)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
	... 27 more
Caused by: software.amazon.awssdk.core.exception.SdkServiceException: Unauthorized

  • The pipeline keeps running even after the above failure; eventually the snapshot barrier is triggered
    • The barrier triggers another close, which ends up adding the data file that was never uploaded to S3
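
The catch-all function in step 2 is the piece that keeps the pipeline alive after the upload fails. A minimal sketch of such a function, assuming Flink 1.14's ProcessFunction API (FlinkRecord and the conversion logic are hypothetical stand-ins for the customer's code):

```java
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// Hypothetical record type standing in for the customer's input class.
class FlinkRecord {}

// A user function that swallows every exception in processElement.
// Because operators are chained, the downstream write can roll to a new
// file (and thus close the S3 stream) synchronously inside collect(),
// as the stack trace above shows; the S3 failure is silently dropped
// here and the pipeline keeps running.
class CatchAllProcessFunction extends ProcessFunction<FlinkRecord, Row> {
  @Override
  public void processElement(FlinkRecord record, Context ctx, Collector<Row> out) {
    try {
      out.collect(toRow(record));
    } catch (Exception e) {
      // catch-all: the write/close failure is ignored (step 2 above)
    }
  }

  private Row toRow(FlinkRecord record) {
    return Row.of(record); // illustrative conversion only
  }
}
```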

Testing

  • Unit tests added
  • Testing on our dev pipeline; will update with results after the pipeline has run for a while

Member

@RussellSpitzer RussellSpitzer left a comment

Is it feasible for us to fix this in BaseTaskWriter? It sounds like a failed "close" call should just stop the commit process. It feels like a bit of a workaround to have subsequent "close" calls throw an exception when we probably shouldn't be making subsequent "close" calls at all.

I could be misunderstanding this, though.

@rdblue
Contributor

rdblue commented Jul 20, 2022

I agree with @RussellSpitzer; I think we should avoid the double close, since that is what is causing the problem (at least as far as I understand it).

@github-actions github-actions bot added the data label Jul 20, 2022
@abmo-x
Contributor Author

abmo-x commented Jul 20, 2022

@rdblue @RussellSpitzer
Added a commit to clear currentWriter on close in BaseTaskWriter, plus two test cases around failures to close and complete (a sketch of the idea follows).
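
A minimal sketch of that intermediate approach, assuming a rolling-writer shape similar to BaseTaskWriter's (names are illustrative, and note this change was later reverted in favor of the S3OutputStream fix):

```java
import java.io.Closeable;
import java.io.IOException;

// Illustrative rolling writer that drops its current writer once close
// has been attempted, so a repeated close() cannot touch a stale,
// half-closed instance.
abstract class RollingWriter implements Closeable {
  private Closeable currentWriter; // the in-flight file writer, if any

  @Override
  public void close() throws IOException {
    if (currentWriter != null) {
      try {
        currentWriter.close(); // may throw if the S3 upload fails
      } finally {
        currentWriter = null;  // never reuse a writer after a close attempt
      }
    }
  }
}
```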

I agree close should only be called once; we rely quite strongly on that behavior when adding the data files.
However, I found that writers are held and closed more than once in various scenarios, which causes this issue: a close fails, yet the writers end up in a bad state that later calls silently paper over (a toy reproduction follows at the end of this comment):

  1. When user-defined functions catch all exceptions and ignore failures on write, as seen in Flink's processElement, which internally triggers a roll to a new file.
  2. This behavior was also observed outside BaseTaskWriter, and a fix was made to not close an already-closed stream in AWS: fix bugs around using S3FileIO for table operations #1749

Let me know your thoughts.
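
To make the failure mode above concrete, here is a self-contained toy reproduction; FlakyWriter and the file name are invented for illustration and do not reflect Iceberg's actual classes:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class DoubleCloseDemo {
  // A writer whose first close() fails (simulating a failed putObject)
  // and whose second close() silently succeeds, as an idempotent
  // close() implementation would.
  static class FlakyWriter {
    private boolean failNextClose = true;

    void close() throws IOException {
      if (failNextClose) {
        failNextClose = false;
        throw new IOException("putObject failed: Unauthorized");
      }
      // already "closed": do nothing
    }
  }

  public static void main(String[] args) throws IOException {
    FlakyWriter writer = new FlakyWriter();
    List<String> completedDataFiles = new ArrayList<>();

    try {
      writer.close(); // first close fails: the file never reaches S3
    } catch (IOException swallowed) {
      // user code catches and ignores the failure (scenario 1 above)
    }

    writer.close(); // second close succeeds silently
    completedDataFiles.add("s3://bucket/data/file.parquet"); // dangling reference
    System.out.println("committed: " + completedDataFiles);
  }
}
```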

@github-actions github-actions bot removed the AWS label Jul 25, 2022
@abmo-x
Contributor Author

abmo-x commented Jul 25, 2022

Reverted the changes to S3OutputStream to keep the close API consistent.

@github-actions github-actions bot added the AWS label Jul 26, 2022
@abmo-x abmo-x changed the title DataWriter - failure to close should not add file to completedDataFiles S3OutputStream - failure to close should persist on subsequent close calls Jul 26, 2022
@abmo-x
Contributor Author

abmo-x commented Jul 26, 2022

After further discussion with @RussellSpitzer, I brought back my changes to S3OutputStream.

Because a failure to close an S3 stream leaves it in a bad state that cannot be recovered, any future close calls on that stream should continue to fail (sketched below). The changes are now simple and confined to S3OutputStream.

cc @rdblue
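
A minimal sketch of the behavior this PR gives S3OutputStream, assuming a stream that uploads its buffered contents on close(); the class and method names here are illustrative, not the actual implementation:

```java
import java.io.IOException;
import java.io.OutputStream;

// Once close() has failed, the stream is unrecoverable: remember the
// failure and rethrow it on every subsequent close() instead of
// silently succeeding.
class UploadOnCloseStream extends OutputStream {
  private boolean closed = false;
  private IOException closeFailure = null; // persisted across close() calls

  @Override
  public void write(int b) throws IOException {
    if (closed) {
      throw new IOException("Stream is closed");
    }
    // buffer the byte to a local staging file (omitted)
  }

  @Override
  public void close() throws IOException {
    if (closeFailure != null) {
      throw closeFailure; // keep failing: the upload never completed
    }
    if (closed) {
      return; // a previously successful close stays idempotent
    }
    try {
      completeUploads(); // e.g. putObject or complete multipart upload
      closed = true;
    } catch (IOException e) {
      closeFailure = e;
      throw e;
    }
  }

  private void completeUploads() throws IOException {
    // upload staged bytes to S3 (omitted)
  }
}
```

With this shape, a caller that swallows the first failure still cannot commit the file, because every later close() on the same stream raises the original exception.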

@abmo-x abmo-x changed the base branch from master to 0.14.x August 2, 2022 06:08
@abmo-x abmo-x changed the base branch from 0.14.x to master August 2, 2022 06:08
@RussellSpitzer RussellSpitzer merged commit d44565b into apache:master Aug 2, 2022
@rdblue rdblue added this to the Iceberg 0.14.1 Release milestone Aug 9, 2022
rdblue pushed a commit to rdblue/iceberg that referenced this pull request Sep 2, 2022
rdblue pushed a commit that referenced this pull request Sep 3, 2022
sunchao pushed a commit to sunchao/iceberg that referenced this pull request May 9, 2023