
Iceberg specific files (manifest files, manifest list file, metadata file) are not created, only Flink specific manifest file is created #2033

Closed
elkhand opened this issue Jan 5, 2021 · 25 comments

@elkhand
Contributor

elkhand commented Jan 5, 2021

[ Update (Jan 12, 2021): the description below has been revised to describe the problem more accurately. ]

Hello, Iceberg community,

I'm facing an issue with metadata files in S3 for an Iceberg table. After the job has been running for a while and has been suspended via savepoint and restarted a few times, Flink only creates Flink-specific manifest files; Iceberg-specific files are not created. Only at the next job suspension will Flink create the Iceberg-specific files and remove the Flink-specific manifest files.

The Flink job consists of a single Kafka source, a mapper, and a FlinkSink.

Iceberg Flink connector version 0.10.0 is used.

The Flink job ingests data into S3, with a checkpointing interval of 1 hour.

Normal scenario - expected behavior:
every hour, 3 files are created in the metadata folder:

  • Metadata file: 00001-180a83f3-c229-4ed5-a9dc-2f9c235f6d52.metadata.json
  • Manifest list file: snap-5807373598091371828-1-87p0b872-3f55-9b79-8cee-b1d354f2c378.avro
  • Manifest file: 39d0b872-4f56-4b79-8cee-c0a354f2c575-m0.avro

After a few successful checkpoints, the job is suspended via savepoint and then started again:
every hour, only a single file is created in the metadata folder:

  • Manifest file : 4c806ffdb03c41e09337b90f18781570-00000-0-466-00001.avro

The metadata file and manifest list file are NOT created or updated.

As a result, new data (partitions) is not available until the metadata file and manifest list file are created.

The next time the Flink job is suspended (via savepoint), it will create the Iceberg-specific files for all missing checkpoints (so the already-ingested but missing partitions become visible in Iceberg), remove the Flink-specific manifest files, and then shut down.

============================
[ BELOW is the outdated original description - please ignore ]

Hello, Iceberg community,

I'm facing an issue with metadata files in S3 for the Iceberg table when bucket versioning is enabled for the S3 bucket.

Iceberg Flink connector version 0.10.0 is used.

The Flink job ingests data into S3, with a checkpointing interval of 1 hour.

For an S3 bucket with versioning disabled:
every hour, 3 files are created in the metadata folder:

  • Metadata file: 00001-180a83f3-c229-4ed5-a9dc-2f9c235f6d52.metadata.json
  • Manifest list file: snap-5807373598091371828-1-87p0b872-3f55-9b79-8cee-b1d354f2c378.avro
  • Manifest file: 39d0b872-4f56-4b79-8cee-c0a354f2c575-m0.avro

For an S3 bucket with versioning enabled:
every hour, only a single file is created in the metadata folder:

  • Manifest file : 4c806ffdb03c41e09337b90f18781570-00000-0-466-00001.avro

The metadata file and manifest list file are NOT created or updated.

As a result, new data (partitions) is not available until the metadata file and manifest list file are created.

If I restart the Flink job, the new metadata and manifest list files are created, and the missing partitions become visible again.

A few questions:

  1. Did anyone face a similar issue with the S3 bucket versioning?

  2. Why are metadata files and manifest list files not created/updated with every checkpoint?

  3. How can S3 bucket versioning impact the manifest file version? ("-m0.avro" suffix for V1 vs ".avro" suffix for V2)

Thank you.

@openinx
Member

openinx commented Jan 6, 2021

every hour in the metadata folder only a single file is created
Manifest file : 4c806ffdb03c41e09337b90f18781570-00000-0-466-00001.avro

@elkhand , the manifest file is actually not Iceberg metadata; it's a manifest file that was designed for Flink (please see here). The reason we designed the Flink-specific manifest is https://github.com/apache/iceberg/pull/1185/files#r478772412 . When committing the Iceberg transaction while completing a checkpoint, the iceberg-files-committer will read all data files from this Flink-specific manifest file to construct new Iceberg manifest files. You can see more details in issue #1959.
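For illustration, here is a rough sketch of this two-phase flow (simplified placeholder code, not the actual connector implementation; writeToFlinkManifest and readDataFiles are made-up helpers):

import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

// Simplified sketch: every checkpoint only writes a Flink-specific manifest,
// and the Iceberg commit (metadata.json, manifest list, Iceberg manifests)
// only happens when checkpoint completion is notified.
class TwoPhaseCommitSketch {
  private final Table table;
  private final NavigableMap<Long, byte[]> flinkManifestPerCheckpoint = new TreeMap<>();
  private final List<DataFile> filesSinceLastCheckpoint;

  TwoPhaseCommitSketch(Table table, List<DataFile> filesSinceLastCheckpoint) {
    this.table = table;
    this.filesSinceLastCheckpoint = filesSinceLastCheckpoint;
  }

  // Called for every checkpoint: produces only the Flink-specific manifest file.
  void snapshotState(long checkpointId) {
    flinkManifestPerCheckpoint.put(checkpointId, writeToFlinkManifest(filesSinceLastCheckpoint));
    filesSinceLastCheckpoint.clear();
  }

  // Called when the checkpoint is confirmed: reads the data files back from the
  // Flink manifests and commits them to the Iceberg table in one transaction.
  void notifyCheckpointComplete(long checkpointId) {
    AppendFiles append = table.newAppend();
    for (byte[] flinkManifest : flinkManifestPerCheckpoint.headMap(checkpointId, true).values()) {
      readDataFiles(flinkManifest).forEach(append::appendFile);
    }
    append.set("flink.max-committed-checkpoint-id", String.valueOf(checkpointId));
    append.commit();
    flinkManifestPerCheckpoint.headMap(checkpointId, true).clear();
  }

  private byte[] writeToFlinkManifest(List<DataFile> files) { return new byte[0]; } // placeholder
  private List<DataFile> readDataFiles(byte[] manifest) { return List.of(); }       // placeholder
}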

For your issue, I'm pretty sure that Flink did not commit any Iceberg transaction after you enabled the versioning feature on the S3 bucket, but I'm not sure what is causing this. Do you have any logs indicating that the Flink job failed over (an error log or stacktrace)?

@openinx openinx self-assigned this Jan 6, 2021
@kbendick
Contributor

I have been thinking about this, and I have some questions related to the bucket having versioning enabled.

It's not uncommon to have a versioned S3 bucket which has neither a policy that removes expired object deletion markers nor a policy that expires non-current versions; by default, versioned buckets have neither. In such a situation, it's not uncommon to end up with either a very large number of object deletion markers or a single key with a very high number of versions (sometimes in the millions), which can greatly affect your S3 throughput.

I have personally encountered this issue when using a versioned bucket with Flink (without Iceberg) for storing checkpoint and savepoint data. In Flink, it's typical for the job manager to delete checkpoints depending on how many are configured to be retained, so with regular checkpointing it's very easy to accumulate a very large number of object deletion markers that are never expired. Additionally, it's not uncommon to set up a Flink job to checkpoint to a bucket where much of the data shares a very similar key prefix (and therefore likely winds up in the same physical partition). For example, when using a per-job cluster, where the job ids are always 0000000000000000, it's easy for your checkpoint and savepoint data to wind up with a long, consistent prefix in the key name (Flink provides a configuration to add randomness wherever desired in the checkpoint path).

Additionally, I know that for the RocksDB state backend in Flink there is a /shared directory used with incremental checkpointing, which I have observed grow more or less indefinitely. At my work we have special logic in place to remove this folder when a valid savepoint is taken (amongst other criteria).

TLDR: For versioned S3 buckets, particularly for PUT and DELETE requests, the likelihood of getting a 503 Slow Down response increases quite a lot, due to the combination of many object versions, a very large number of retained object deletion markers, and per-partition throughput limitations. When using Apache Flink without a policy in place to aggressively remove expired object versions and object deletion markers, it's not uncommon in my experience to run into 503 Slow Down issues.

What you can do to debug this issue: First and foremost, if you have access to the console (or if you're the one managing the bucket), I'd make sure that when enabling versioning the required lifecycle policies are in place: expiring noncurrent versions, removing object deletion markers, and removing stale / failed parts of multipart uploads (more on that below). One thing you can do to debug your current bucket, without having to create additional buckets just for testing, is to enable logging for your S3 bucket. You can enable basic server access logs, which have no added cost beyond the writes to the S3 bucket, following the instructions here: https://docs.aws.amazon.com/AmazonS3/latest/user-guide/server-access-logging.html. Additionally, you can enable lifecycle logging and check for the relevant lifecycle events in CloudTrail to see what's happening with versions in your bucket: https://docs.aws.amazon.com/AmazonS3/latest/dev/lifecycle-and-other-bucket-config.html

You can read about this at the bottom of this page: https://docs.aws.amazon.com/AmazonS3/latest/dev/ObjectVersioning.html. I've quoted the relevant part below. It does not mention it, but not having a bucket policy in place to remove expired object deletion markers will also cause this issue (I believe the underlying cause is that it affects HEAD requests, which are needed for both PUT and DELETE on versioned buckets).

If you notice a significant increase in the number of HTTP 503-slow down responses received for Amazon S3 PUT or DELETE object requests to a bucket that has S3 Versioning enabled, you might have one or more objects in the bucket for which there are millions of versions. For more information, see Troubleshooting Amazon S3.

I would also be interested in any error logs you have, @elkhand, as Iceberg retries requests but will eventually error out, so error logs would be helpful.

Have you tested this using a fresh bucket, with no preexisting object keys? Did any transactions ever complete once versioning was enabled, or did the problem only appear after some time? Additionally, have you observed this issue with a bucket that started its life as a versioned bucket (or at least did not have any non-versioned keys in it)? I've also encountered instances where versioning policies were added to buckets after the fact, and a large number of objects remained in the bucket indefinitely because I had forgotten to remove them.


Lastly, and perhaps most importantly, here is the documentation on lifecycle rules: https://docs.aws.amazon.com/AmazonS3/latest/dev/intro-lifecycle-rules.html. I have personally run into issues when writing Flink savepoints and checkpoints to versioned S3 buckets, mostly because of the high frequency with which Flink can create and delete objects, combined with not having the proper lifecycle rules to expire old versions, remove object deletion markers, and remove failed in-flight multipart uploads (parts of a multipart upload that never successfully completed). While there's no definitive way for the bucket to know whether an upload has failed, it's common to simply pick a large enough time frame after which incomplete multipart uploads are removed - I typically use 24 hours or even 7 days; the most important thing is just having the policy in place, which AWS does not add by default.
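As a concrete illustration (not from this thread), here is roughly what such a lifecycle configuration could look like with the AWS SDK for Java v2; the bucket name, rule id, and day counts are example values:

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.AbortIncompleteMultipartUpload;
import software.amazon.awssdk.services.s3.model.BucketLifecycleConfiguration;
import software.amazon.awssdk.services.s3.model.ExpirationStatus;
import software.amazon.awssdk.services.s3.model.LifecycleExpiration;
import software.amazon.awssdk.services.s3.model.LifecycleRule;
import software.amazon.awssdk.services.s3.model.LifecycleRuleFilter;
import software.amazon.awssdk.services.s3.model.NoncurrentVersionExpiration;
import software.amazon.awssdk.services.s3.model.PutBucketLifecycleConfigurationRequest;

public class VersionedBucketCleanup {
  public static void main(String[] args) {
    try (S3Client s3 = S3Client.create()) {
      LifecycleRule rule = LifecycleRule.builder()
          .id("cleanup-versioned-bucket")                                   // example rule id
          .status(ExpirationStatus.ENABLED)
          .filter(LifecycleRuleFilter.builder().prefix("").build())         // apply to the whole bucket
          // Remove delete markers once no non-current versions remain behind them.
          .expiration(LifecycleExpiration.builder().expiredObjectDeleteMarker(true).build())
          // Permanently delete non-current object versions after 7 days.
          .noncurrentVersionExpiration(NoncurrentVersionExpiration.builder().noncurrentDays(7).build())
          // Abort multipart uploads that never completed within 7 days.
          .abortIncompleteMultipartUpload(AbortIncompleteMultipartUpload.builder().daysAfterInitiation(7).build())
          .build();

      s3.putBucketLifecycleConfiguration(PutBucketLifecycleConfigurationRequest.builder()
          .bucket("my-flink-checkpoint-bucket")                             // example bucket name
          .lifecycleConfiguration(BucketLifecycleConfiguration.builder().rules(rule).build())
          .build());
    }
  }
}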

If one does not explicitly add these policies to the bucket, these objects and their metadata will remain forever and severely impact S3 performance on versioned buckets. Additionally, there is a cost associated with storing all of this useless (or potentially useless) data, such as very old object versions, since AWS still bills you for it. So it's extra important to ensure that these policies are all in place.

Without error logs, I'm not sure I can be of much more help. But I've been thinking about this issue recently and thought I'd add my personal experience with using versioned S3 buckets with Flink. I have been able to use Flink to read and write checkpoint data as well as data files to versioned S3 buckets, so I don't personally think that alone is the issue. However, I have experienced a lot of headaches when writing to versioned buckets without having aggressive policies in place to remove files with S3 lifecycle policies, as well as having a separate process in place for removing files from the /shared directory for rocksdb incremental checkpoints stored on S3. Having a large number of objects (which include different object versions as well as object deletion markers) is relatively easy to do with Flink without a good lifecycle policy in place.

Lastly, it might also be important to note that if you've enabled versioning on a bucket, it can never technically be reverted to a non-versioned bucket. When turning off versioning on a versioned S3 bucket, it technically becomes a version-suspended S3 bucket. This means that your old files, including object deletion markers and non-current object versions, still exist and that only going forward will the changes take place. So if you've enabled versioning for some time and then turned it off, it's important to ensure that any unneeded non-current object versions / deleted object markers are removed.

@kbendick
Contributor

kbendick commented Jan 12, 2021

Sorry for the large wall of text, but I've personally dealt with some weird issues on versioned S3 buckets (particularly when using Flink), and I thought I'd share what information I have. I try not to use them if possible due to performance issues, but with strong bucket policies in place they're manageable.

@openinx As you've assigned this issue to yourself, and since I've written a small essay already in this issue, feel free to reach out on the ASF slack and I'd be happy to help in any way that I can (though I imagine you know more about S3 buckets than I do, but perhaps you're more accustomed to traditional HDFS). I'm not sure what investigation can be done, beyond testing writing to an empty versioned bucket, without error logs. As @elkhand mentioned that the issue occurred after enabling versioning on an existing bucket, it's quite possible that there are a large number of writers / many small files and that 503-slow down exceptions accrued before the transaction could be completed, especially if there are other jobs writing to this same bucket (like with a smaller checkpoint time or a shorter interval between Iceberg commits). As I mentioned, I've personally encountered this situation when I first started using Flink on S3 as the number of jobs, with varying checkpoint intervals, writing to the same bucket increased.

@elkhand
Contributor Author

elkhand commented Jan 12, 2021

@openinx Thanks for clarifying that the Flink-specific manifest file is different from the Iceberg manifest file.

Correction on the issue based on observations:

  • this is not related to S3 bucket versioning.
  • this is not related to the S3 bucket policy setup.

Unfortunately, there is nothing in the logs and no failover has happened. The job continues running normally, producing data files and Flink-specific manifest files, but it is not creating the Iceberg manifest file, manifest list file, or metadata file.

I'm still investigating the root cause on and off for this Flink job and will share the findings on this issue.

A few characteristics of this job:

  • Checkpointing frequency is every hour.
  • Each Iceberg table has date/hour partitioning.

The issue can be reproduced.
Here is how to reproduce the failure scenario, @openinx.
The issue (having only Flink-specific manifest files and missing Iceberg-specific files) occurs when:

  • the job completes several checkpoints successfully;
  • the job is suspended (via savepoint) a few minutes after the last checkpoint;
  • before the job is shut down and the savepoint is completed (during the suspension flow), Flink produces the Iceberg-specific files that were missing for the checkpoints since the job started and removes the Flink-specific files;
  • but when you start the job the next time from the saved savepoint, Flink only produces Flink-specific manifest files and does not create Iceberg-specific files. You need to suspend the job again so that Flink creates the Iceberg-specific files and removes the Flink-specific manifest files.

Thanks.

@elkhand elkhand changed the title from "Manifest list file and metadata file are not created when versioning is enabled for S3 bucket" to "Iceberg specific files (manifest files, manifest list file, metadata file) are not created, only Flink specific manifest file is created" on Jan 12, 2021
@elkhand
Contributor Author

elkhand commented Jan 14, 2021

@openinx I tried with an Apache Iceberg build from master (01fca3d0); this issue still occurs.

Basically, when the job is suspended, a savepoint is created; the job is then started from that saved savepoint.
The Flink Iceberg connector will only create the Flink-specific manifest file (.avro), and none of the Iceberg-specific files will be created for any checkpoint going forward.

@elkhand
Contributor Author

elkhand commented Jan 22, 2021

This can be reproduced locally too (running in a local Flink cluster):

  • start the job and let a few checkpoints succeed
  • stop the job with the final savepoint
./bin/flink stop --savepointPath /tmp/flink-savepoints $JOB_ID
  • restart the job from the savepoint
./bin/flink run \
--detached \
--fromSavepoint /tmp/flink-savepoints/savepoint-f0d209-025f264f2001 \
-c com.demo.MainClassOfYourPipeline \
yourJar.jar \
arg1 arg2

The result:

  • only Flink specific manifest files are created, and Iceberg specific files are NOT created.

@elkhand
Contributor Author

elkhand commented Jan 23, 2021

The root cause of the issue is related to this part of the code in the IcebergFilesCommitter class:

 @Override
  public void notifyCheckpointComplete(long checkpointId) throws Exception {
    super.notifyCheckpointComplete(checkpointId);
    // It's possible that we have the following events:
    //   1. snapshotState(ckpId);
    //   2. snapshotState(ckpId+1);
    //   3. notifyCheckpointComplete(ckpId+1);
    //   4. notifyCheckpointComplete(ckpId);
    // For step#4, we don't need to commit iceberg table again because in step#3 we've committed all the files,
    // Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
    if (checkpointId > maxCommittedCheckpointId) {
      commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
      this.maxCommittedCheckpointId = checkpointId;
    }
  }

When the savepoint is created, flink.max-committed-checkpoint-id in the metadata.json file is set to Long.MAX_VALUE (9223372036854775807):

...
{
    "snapshot-id" : 1466502797494274716,
    "parent-snapshot-id" : 7039191958488319023,
    "timestamp-ms" : 1611375333522,
    "summary" : {
         ...
         "flink.max-committed-checkpoint-id" : "9223372036854775807",
         ...
...

The condition if (checkpointId > maxCommittedCheckpointId) then always evaluates to false, so the transaction is never committed and the Iceberg-specific files are not created.
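To spell out the failure, here is a simplified illustration (not the exact connector code; the real committer walks the snapshot history for the matching Flink job id):

// On restore, the committer reads flink.max-committed-checkpoint-id back out of the
// Iceberg snapshot summary (simplified here to the current snapshot):
long maxCommittedCheckpointId = Long.parseLong(
    table.currentSnapshot().summary().get("flink.max-committed-checkpoint-id"));
// => 9223372036854775807 (Long.MAX_VALUE), written by endInput() during stop-with-savepoint.

// Every checkpoint after the restart then fails the guard with a small id (5, 6, 7, ...):
long checkpointId = 5L;
if (checkpointId > maxCommittedCheckpointId) {  // 5 > Long.MAX_VALUE is always false
  // never reached, so no Iceberg snapshot is committed after the restore
}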

When the Flink job is started from the saved savepoint, it gets the checkpoint ID correctly (I created the savepoint after checkpoint 4):

# Triggering savepoint

2021-01-22 20:15:32,092 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Triggering stop-with-savepoint for job 5a5b5ce39afbdfaf3afe58b3fbecae5e.
2021-01-22 20:15:32,096 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 4 (type=SYNC_SAVEPOINT) @ 1611375332094 for job 5a5b5ce39afbdfaf3afe58b3fbecae5e.
2021-01-22 20:15:33,064 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 4 for job 5a5b5ce39afbdfaf3afe58b3fbecae5e (8754 bytes in 969 ms).
...
# After starting job from savepoint

2021-01-22 20:33:35,600 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Starting job 6ed8c20ab75edf9a741b266f145de214 from savepoint /tmp/flink-savepoints/savepoint-5a5b5c-1a65b8fad3ea ()
2021-01-22 20:33:35,605 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Reset the checkpoint ID of job 6ed8c20ab75edf9a741b266f145de214 to 5.
2021-01-22 20:33:35,605 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Restoring job 6ed8c20ab75edf9a741b266f145de214 from latest valid checkpoint: Checkpoint 4 @ 0 for 6ed8c20ab75edf9a741b266f145de214.

@openinx can you please explain the reasoning behind the "Flush the buffered data files into 'dataFilesPerCheckpoint' firstly." case - why is currentCheckpointId set to Long.MAX_VALUE?

In the IcebergFilesCommitter class:

  @Override
  public void endInput() throws IOException {
    // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
    long currentCheckpointId = Long.MAX_VALUE;
    dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId));
    dataFilesOfCurrentCheckpoint.clear();

    commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, currentCheckpointId);
  }

@kezhuw
Member

kezhuw commented Jan 23, 2021

@sunhaibotb @zhijiangW @rkhachatryan @pnowojski @kl0u @zentol @aljoscha

I think it is a bug in Flink that was introduced in FLINK-14228 (apache/flink#9854, apache/flink#10151). The changes for FLINK-14228 rely on StreamTask.afterInvoke and OperatorChain.closeOperators only being invoked after end of input. But that has not been true since FLIP-34 (Terminate/Suspend Job with Savepoint), which landed long before: a task can enter a finished state after a synchronous savepoint, which is an expected job suspension and stop. I am also aware of FLIP-147 (Support Checkpoints After Tasks Finished); I think maybe the three should agree on terminology.

Besides, I think the FLIP-27 source does not work with synchronous savepoints. @becketqin @StephanEwen

I have pushed the branch synchronous-savepoint-conflict-with-bounded-end-input-case to my repository. Both test cases fail:

  • SavepointITCase.testStopSavepointWithBoundedInput fails because BoundedOneInput.endInput is called.
  • SavepointITCase.testStopSavepointWithFlip27Source fails due to a timeout.

FLINK-14228: https://issues.apache.org/jira/browse/FLINK-14228
FLIP-34: https://cwiki.apache.org/confluence/x/JAglBg
FLIP-147: https://cwiki.apache.org/confluence/x/mw-ZCQ
StreamTask.finishTask: https://github.com/apache/flink/blob/d20867a58d8f9ba7520d611d5fab01e6f98253a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L467

@kezhuw
Member

kezhuw commented Jan 25, 2021

Hi @elkhand, I have reported your investigation and my speculation in https://issues.apache.org/jira/browse/FLINK-21132. Hopefully there will be a response in the next few days.

@pnowojski

pnowojski commented Jan 25, 2021

Is it possible for you to verify whether the problem goes away if you disable operator chaining for the IcebergFilesCommitter operator? If IcebergFilesCommitter is an operator, you could override the default AbstractStreamOperator#chainingStrategy field. If it's a sink function, I think you could use the startNewChain() or disableChaining() methods.
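For reference, a sketch of those two options on the DataStream API (kafkaSource, mapper and icebergSinkFunction are placeholders for the job's actual source, mapper and sink function):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Option 1: disable chaining for the whole job.
env.disableOperatorChaining();

// Option 2: only break the chain in front of the sink/committer.
DataStream<RowData> rows = env.addSource(kafkaSource).map(mapper);
rows.addSink(icebergSinkFunction)
    .disableChaining();  // or call startNewChain() on an upstream operator to start a fresh chain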

@elkhand
Contributor Author

elkhand commented Jan 25, 2021

Thank you @kezhuw @pnowojski

This is the call order of endInput():
[screenshot of the endInput() call stack omitted]

New findings:
This issue occurs when you take a savepoint that also terminates the job:

./bin/flink stop --savepointPath /tmp/flink-savepoints $JOB_ID

Suspending job "c74e13c841e468b0ce0c75ecc810ecf3" with a savepoint.
Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-c74e13-8a50ac842048

But if you just take a savepoint and do NOT terminate the job, flink.max-committed-checkpoint-id is set to the expected value.

./bin/flink savepoint \
$JOB_ID \
/tmp/flink-savepoints

One way to bypass this issue:

  • Take a manual savepoint and then cancel the job, instead of creating the savepoint with job stop/terminate.

For already corrupted metadata files, overwriting flink.max-committed-checkpoint-id with an expected value might be one possible fix (though not the best one).

Any other suggestions?

@pnowojski the problem does not go away if I break the chain between IcebergStreamWriter and IcebergFilesCommitter.

@pnowojski is there a way to take a savepoint and suspend the job, instead of terminating it?

The current behavior of the command ./bin/flink stop --savepointPath /tmp/flink-savepoints $JOB_ID is that it takes a savepoint and then terminates the job.

If there were a way to take a savepoint and stop/cancel the job (so that the job can be started from this savepoint in the future), that would be helpful here. Because the job is a streaming job, when we stop it we do not want it to be terminated (and do not want endInput() to be called); we want a savepoint to be taken and the job to be stopped/canceled.

Is there a way to achieve this in Flink 1.11 or 1.12?

@elkhand
Contributor Author

elkhand commented Jan 25, 2021

I tried this:
https://flink.apache.org/news/2019/08/22/release-1.9.0.html#stop-with-savepoint-flip-34
Stop-with-Savepoint (FLIP-34)
Cancelling with a savepoint

bin/flink stop -p [:targetDirectory] :jobId

This did not work; it seems endInput() was still called and flink.max-committed-checkpoint-id was set to Long.MAX_VALUE.

The deprecated command bin/flink cancel -s [:targetDirectory] :jobId works correctly though:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#cancel-job-with-savepoint

This will atomically trigger a savepoint for the job with ID :jobid and cancel the job.

bin/flink cancel -s /tmp/flink-savepoints d76df1dca8592a2901f82af7ae510f75
DEPRECATION WARNING: Cancelling a job with savepoint is deprecated. Use "stop" instead.
Cancelling job d76df1dca8592a2901f82af7ae510f75 with savepoint to /tmp/flink-savepoints.
Cancelled job d76df1dca8592a2901f82af7ae510f75. Savepoint stored in file:/tmp/flink-savepoints/savepoint-d76df1-fba95e37a600.

@pnowojski

pnowojski commented Jan 25, 2021

@elkhand and what happens if you disable chaining for the whole job (StreamExecutionEnvironment.disableOperatorChaining())?

@tweise

tweise commented Jan 25, 2021

@pnowojski can you share how chaining interacts with job termination? What we want is suspend, we don't want any form of drain or final watermark processing.

see https://issues.apache.org/jira/browse/FLINK-21132

@tweise

tweise commented Jan 25, 2021

The difference we observe is that between "SUSPEND" and "TERMINATE" on
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212

@elkhand
Contributor Author

elkhand commented Jan 25, 2021

@pnowojski after disabling operator chaining for the whole job (via StreamExecutionEnvironment.disableOperatorChaining()), I'm still facing the same issue:

bin/flink stop -p /tmp/flink-savepoints 20ec3ed84657bbf43ec746bb771ae4fb - this command still has the same weird issue: flink.max-committed-checkpoint-id is set to Long.MAX_VALUE (which is set in endInput())

./bin/flink stop --savepointPath /tmp/flink-savepoints 475bc652baabb1a5d52a6c175f576267 - this command still has the same weird issue: flink.max-committed-checkpoint-id is set to Long.MAX_VALUE (which is set in endInput())

@stevenzwu
Contributor

I think one of the problems is how IcebergFilesCommitter uses maxCommittedCheckpointId to de-dup committed manifests upon recovery. That is why it checkpoints and restores the checkpointId, which is what gets corrupted by the Long.MAX_VALUE written in endInput().

In our internal implementation, we store a hash value for each committed manifest file path. Upon recovery, we use that metadata in the snapshot summary to filter out already-committed manifests.
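A rough sketch of that de-dup idea (illustrative only - the summary property name, hashing, and helper structure are made up and not the actual internal implementation):

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

class ManifestDedupSketch {
  // Example summary property name, chosen for illustration.
  private static final String COMMITTED_MANIFESTS_PROP = "flink.committed-manifest-hashes";

  // At commit time, record a hash of every committed manifest path in the snapshot summary.
  static void recordCommitted(AppendFiles append, List<String> manifestPaths) {
    String hashes = manifestPaths.stream()
        .map(ManifestDedupSketch::hash)
        .collect(Collectors.joining(","));
    append.set(COMMITTED_MANIFESTS_PROP, hashes);
  }

  // On restore, keep only the pending manifests whose hash is not already in the latest summary.
  static List<String> filterUncommitted(Table table, List<String> pendingManifestPaths) {
    Snapshot current = table.currentSnapshot();
    String committed = current == null ? "" : current.summary().getOrDefault(COMMITTED_MANIFESTS_PROP, "");
    Set<String> committedHashes = new HashSet<>(Arrays.asList(committed.split(",")));
    return pendingManifestPaths.stream()
        .filter(path -> !committedHashes.contains(hash(path)))
        .collect(Collectors.toList());
  }

  private static String hash(String path) {
    return Integer.toHexString(Arrays.hashCode(path.getBytes(StandardCharsets.UTF_8)));
  }
}

The advantage of keying idempotence on manifest content rather than on the checkpoint id is that a bogus checkpoint id (such as the Long.MAX_VALUE sentinel above) no longer poisons recovery.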

@kezhuw
Member

kezhuw commented Jan 26, 2021

@pnowojski I think changing the chaining strategy does not help. A synchronous savepoint causes successful job termination (!isCanceledOrFailed), which causes the partition writer of the legacy source to finish. The downstream operator will eventually receive an EndOfPartitionEvent.

@elkhand
Contributor Author

elkhand commented Jan 29, 2021

For anyone facing the same issue, here is a way to bypass it until this bug is fixed on the Flink side.
You do not need to fix corrupted metadata files where flink.max-committed-checkpoint-id is set to Long.MAX_VALUE.

Just follow this workflow for stateful upgrades going forward - it works as expected and you no longer get corrupted metadata files:

  • take a manual savepoint, without stopping or canceling the job: ./bin/flink savepoint ${JOB_ID} /tmp/flink-savepoints
  • cancel your job
  • make your code changes and upload a new jar of your job
  • start your job from the last saved savepoint

After this flow, flink.max-committed-checkpoint-id will be set to the correct checkpointId.

@elkhand
Contributor Author

elkhand commented Jan 29, 2021

Verified with @RussellSpitzer (thanks!):

You do not need to fix the values of flink.max-committed-checkpoint-id in existing Iceberg metadata files either. You should be safe going forward: Flink/Presto/HMS/Spark/other engines should not be impacted by the wrong value (Long.MAX_VALUE) of the flink.max-committed-checkpoint-id field.

If you follow the above-mentioned alternative flow, Flink will also not be impacted by the wrong value in the Iceberg metadata files.

The snapshot summary is used as a way to store engine/platform-specific metadata which a user may need to know about and which some engines use for their own record keeping.
The Flink property above is only referenced (written or read) by Flink's IcebergFilesCommitter, so there is no impact on any other system.
There should be no issue with this property having a wrong value for any system other than Flink.

@openinx
Member

openinx commented Feb 1, 2021

Thanks @elkhand for your patient debugging - it's really impressive, and your suggested workflow looks like a good workaround until this bug is resolved in Flink. Thanks also to @kezhuw for reporting the bug to Apache Flink. I think this issue exposes a problem with the current integration of Flink and Iceberg: we did not have unit tests covering the Flink savepoint path, so I opened an issue to address this: #2190.

@pnowojski

Thanks for your patience and help in investigating/discovering this issue. We will be merging the fix today. I don't have a timeline for when the respective bug-fix releases will be prepared and published, but 1.12.2 should be the first release containing the fix for this issue, hopefully sometime next week.

@kezhuw
Member

kezhuw commented Feb 23, 2021

@stevenzwu I think you are right, at least partially - it is a Flink bug in any case. Before FLIP-147 (Support Checkpoints After Tasks Finished), sink writers sit in a dilemma: there is no reliable way to commit the final result to an external system. endInput was not designed for committing the final result, but it has actually been used as a workaround/last resort. There is no strong guarantee that endInput will be invoked only once, so basically, if sink writers want to commit the final result in endInput, they should be prepared for situations where endInput runs multiple times. Thus, @openinx, the test should not focus (or focus only) on stop-with-savepoint, but on multiple runs of endInput.
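A minimal sketch of that defensive pattern, reusing the field and method names from the endInput() snippet quoted earlier in this thread (illustrative, not a proposed fix for the Flink bug):

private boolean endInputCommitted = false;

@Override
public void endInput() throws IOException {
  if (endInputCommitted) {
    return;  // the final results were already flushed and committed once
  }
  long currentCheckpointId = Long.MAX_VALUE;
  dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId));
  dataFilesOfCurrentCheckpoint.clear();
  commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, currentCheckpointId);
  endInputCommitted = true;
}

Note that a run-once flag only protects a single task instance; it does not undo the Long.MAX_VALUE already persisted to the table summary, which is why a content-based de-dup (as @stevenzwu describes above) is attractive: it avoids keying idempotence on the checkpoint id at all.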

I want to quote some comments from our discussions here.

@aljoscha said that:

The motivation for introducing endOfInput() were things like hash-join in the SQL runner where an operator would read from the build side until getting an end-of-input, at which point it would switch over to reading from the probe side. With these use cases in mind sending an endOfInput() is a bug. The same is true for sinks, which will do some bookkeeping based on knowing that all the input data has been read.

@gaoyunhaii said that:

First of all, I think there would be some problem fro the current implementation that committed in endOfInput(), considering if the job is bounded (namely the first case). The problem is that if the failover happens right after commit() in the endOfInput, then the job will be restarted and fallback to the last checkpoint, which will cause replay of the data committed in endOfInput. FLIP-147: Support checkpoint after tasks finished is a precedent step to solve the commit problem of the sinks in the first case.

cc @pnowojski @aljoscha @gaoyunhaii @rkhachatryan @becketqin @StephanEwen @tillrohrmann


This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

@github-actions github-actions bot added the stale label Mar 14, 2024

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'

@github-actions github-actions bot closed this as not planned Mar 28, 2024