
Test: Fix flaky testOlderThanTimestamp in TestRemoveOrphanFilesAction3 #4825

Merged: rdblue merged 1 commit into apache:master from sumeetgajjar:fix-flaky-testOlderThanTimestamp on Jun 12, 2022

Conversation

sumeetgajjar (Contributor) commented May 20, 2022

We at Cloudera run the Iceberg tests on the Apache master branch daily.
Ever since #4711 was merged, org.apache.iceberg.spark.actions.TestRemoveOrphanFilesAction3.testOlderThanTimestamp has been failing in our internal builds with the following assertion error:

java.lang.AssertionError: Should delete only 2 files expected:<2> but was:<3>
	at org.junit.Assert.fail(Assert.java:89)
	at org.junit.Assert.failNotEquals(Assert.java:835)
	at org.junit.Assert.assertEquals(Assert.java:647)
	at org.apache.iceberg.spark.actions.TestRemoveOrphanFilesAction.testOlderThanTimestamp(TestRemoveOrphanFilesAction.java:433)

We identified the root cause as the following:
The code execution on our VMs is so fast that the millisecond precision used for selecting a candidate orphan file is simply not enough here.

The test code is as follows:

df.write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA");
df.write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA");
waitUntilAfter(System.currentTimeMillis());
long timestamp = System.currentTimeMillis();
waitUntilAfter(System.currentTimeMillis());
df.write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA");
SparkActions actions = SparkActions.get();
DeleteOrphanFiles.Result result = actions.deleteOrphanFiles(table)
    .olderThan(timestamp)
    .execute();
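
waitUntilAfter is a helper from the Spark test base that ensures System.currentTimeMillis() has advanced past the given value. As a rough illustration (a minimal sketch assuming the helper simply spins on the wall clock; the real implementation lives in the Iceberg test base classes), it behaves like:

// Sketch only: block until the wall clock has moved past timestampMillis.
static void waitUntilAfter(long timestampMillis) {
  long current = System.currentTimeMillis();
  while (current <= timestampMillis) {
    current = System.currentTimeMillis();
  }
}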

The predicate used to test if a given file should be considered an orphan or not is as follows:

Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;

Our Hypothesis

The code execution for creating files via the three append statements in the above test code is so fast that all three files end up with the same milliseconds in their modificationTime.
Thus, when the DeleteOrphanFiles action is executed, it classifies all three files as orphans, thereby failing the test.

Running the test locally or in GitHub Actions does not show any failure. The local machine runs many other processes apart from the test code, so execution is relatively slow, giving the predicate enough time to work correctly.

Hypothesis test

We used the following patch to validate our hypothesis; the result is clearly visible in the stdout of the test:

diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index bcabe38e7..3e3b6b9a1 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -424,6 +424,20 @@ public abstract class TestRemoveOrphanFilesAction extends SparkTestBase {
 
     df.write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA");
 
+    try {
+      System.out.println("OlderThanTimeStamp -> " + timestamp);
+      Path dfDir = new Path(tableLocation + "/data/c2_trunc=AA/c3=AAAA");
+      FileSystem fs = dfDir.getFileSystem(spark.sparkContext().hadoopConfiguration());
+      String[] inputFiles = spark.read().parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA").inputFiles();
+      for (FileStatus fileStatus : fs.listStatus(dfDir)) {
+        if (Arrays.stream(inputFiles).anyMatch(inputFileName -> inputFileName.contains(fileStatus.getPath().getName()))) {
+          System.out.println("SparkInputFileMTime -> " + fileStatus.getPath() + " -> " + fileStatus.getModificationTime());
+        }
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
     SparkActions actions = SparkActions.get();
 
     DeleteOrphanFiles.Result result = actions.deleteOrphanFiles(table) 

Stdout of test with the above addition without the fix

OlderThanTimeStamp -> 1653037291034
SparkInputFileMTime -> file:/tmp/junit4128601999154387775/junit5914981474932610748/data/c2_trunc=AA/c3=AAAA/part-00000-6f36c2a7-a937-4e60-bbf5-062316b365b5-c000.snappy.parquet -> 1653037290000
SparkInputFileMTime -> file:/tmp/junit4128601999154387775/junit5914981474932610748/data/c2_trunc=AA/c3=AAAA/part-00000-b0e3b4eb-c5cf-47df-9979-b9ec5ac6c326-c000.snappy.parquet -> 1653037291000
SparkInputFileMTime -> file:/tmp/junit4128601999154387775/junit5914981474932610748/data/c2_trunc=AA/c3=AAAA/part-00000-13ff79b6-1625-4280-86d5-ee70a341bb3d-c000.snappy.parquet -> 1653037291000
SparkInputFileMTime -> file:/tmp/junit4128601999154387775/junit5914981474932610748/data/c2_trunc=AA/c3=AAAA/00000-0-5e8a0bd0-2b14-4416-8f33-77bc081aa064-00001.parquet -> 1653037290000 
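
Plugging these values into the predicate above makes the failure concrete: the file written after timestamp has its modification time truncated to the whole second, so it still compares as older than the threshold.

// Values taken from the failing run above.
long olderThanTimestamp = 1653037291034L; // millisecond-precision threshold captured by the test
long newestOrphanMtime = 1653037291000L;  // mtime of the part file written AFTER the threshold,
                                          // truncated to whole seconds by the local file system

// The orphan predicate wrongly selects the newest file as well: 3 deletions instead of 2.
boolean isOrphan = newestOrphanMtime < olderThanTimestamp; // true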

Stdout of test with the above addition and the fix

OlderThanTimeStamp -> 1653037621575
SparkInputFileMTime -> file:/tmp/junit1543487080421345175/junit4445606422769491498/data/c2_trunc=AA/c3=AAAA/part-00000-72c3ce45-561e-4d3d-b39f-36f2b935431e-c000.snappy.parquet -> 1653037621000
SparkInputFileMTime -> file:/tmp/junit1543487080421345175/junit4445606422769491498/data/c2_trunc=AA/c3=AAAA/part-00000-08156057-da54-41c6-b3b4-ff243be98469-c000.snappy.parquet -> 1653037621000
SparkInputFileMTime -> file:/tmp/junit1543487080421345175/junit4445606422769491498/data/c2_trunc=AA/c3=AAAA/00000-0-b215cd5b-0d7d-4eac-b738-71107c1c22ed-00001.parquet -> 1653037620000
SparkInputFileMTime -> file:/tmp/junit1543487080421345175/junit4445606422769491498/data/c2_trunc=AA/c3=AAAA/part-00000-df7f77e7-c71d-40a3-bfa3-1f3907abbb97-c000.snappy.parquet -> 1653037622000

sumeetgajjar (Contributor Author):

Hi @kbendick @rdblue @ulmako,
Can you please review this PR?

The github-actions bot added the spark label on May 20, 2022.
Review thread on the changed line:

    long timestamp = System.currentTimeMillis();

-   waitUntilAfter(System.currentTimeMillis());
+   waitUntilAfter(System.currentTimeMillis() + 1000L);
Contributor:

What would be better, if possible, would be to access the timestamp from the snapshot summary and then wait until after that (which is what we do in many other tests).

However, given that this is a Spark test, accessing the summary from the commit would likely take longer than any of the waitUntilAfter calls (i.e. that time would likely have already passed by at least a few hundred milliseconds on any machine).

Contributor Author:

Ok, let me try that.

Contributor Author:

I believe that in this particular case, using the timestamp from the snapshot summary won't be useful. The primary reason is that we are trying to delete Parquet files at the "data/c2_trunc=AA/c3=AAAA" location, which are not managed by Iceberg.

df.write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA");
df.write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA");

Had the supposed-to-be-orphan files been created through Iceberg, we could have leveraged the timestamp from the snapshot summary.

Contributor:

I think the point is still valid. waitUntilAfter should be used so that you know that a certain amount of time, in milliseconds, has elapsed. That gives you the ability to create one file, wait until the next millisecond, create another, wait, etc. to make sure they don't have the same write time. It doesn't need to be a snapshot timestamp, but you shouldn't need to wait for an entire second.

Contributor Author:

That gives you the ability to create one file, wait until the next millisecond, create another, wait, etc. to make sure they don't have the same write time.

Hi @rdblue - I agree with you.
Writing the file at the very next millisecond should do the trick here, but I believe that is what the original code is already doing, and we are still seeing the failure.

We experimented with various values less than 1000L milliseconds but none of them got us a 100% success rate for the test. 1000L was the lowest value that gave a 100% success rate.

I can investigate more if required, or use the 1000L value. Using a value of 1000L would make this test equivalent to what we had before #4711.

Please let me know your thoughts.

Contributor:

@sumeetgajjar, I left a comment on the main page of this PR, but check out the solution we came up with in another PR / issue discussion: #4859

We noticed a different test in this suite fail in GitHub CI as well. There, we simply moved the olderThan argument into the future to account for the time-precision issue without excessive busy waiting.

Contributor:

@sumeetgajjar, sorry, I think this was my misunderstanding. After going through your write-up more carefully, I see that the timestamps reported by the files are in seconds. So you're right: to ensure that the timestamp is different, we need to wait a full second.
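
A rough illustration (hypothetical values) of why waiting a full second is enough: after waitUntilAfter(System.currentTimeMillis() + 1000L), the third write necessarily lands in a later wall-clock second, so even a second-truncated modification time no longer compares as older than timestamp.

// Hypothetical values for illustration only.
long timestamp = 1653037291034L;                       // captured before the wait
long thirdWriteTime = timestamp + 1001L;               // earliest possible wall-clock time of the third write
long thirdFileMtime = (thirdWriteTime / 1000) * 1000;  // file system truncates mtime to whole seconds

// thirdFileMtime falls in a strictly later second than timestamp,
// so the orphan predicate (mtime < olderThanTimestamp) no longer matches it.
assert thirdFileMtime > timestamp;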

Review thread anchored at:

@@ -420,7 +420,7 @@ public void testOlderThanTimestamp() throws InterruptedException {

Contributor:

I can't comment on it, but do any of the other waitUntilAfter calls need this change? There's one right above this comment.

I would think the one below would be sufficient, as it's two in a row (essentially this is sort of like Thread.sleep(1000)).

Contributor Author (sumeetgajjar), May 22, 2022:

I can't comment on it, but do any of the other waitUntilAfter calls need this change?

This is the only place where it is necessary.
I took a look at all the other tests in this suite when I filed the PR, and I did not find any other place where such a change would be required.

as it's two in a row (essentially this is sort of like thread.sleep(1000)).

Yes, exactly :-)

kbendick (Contributor) commented May 25, 2022

I’m not sure if you have seen this PR I opened today, but I noticed one of these tests fail in CI (details and link in PR summary). I opened the following PR for that test case (admittedly I forgot about this one so we can close mine if you’d like) - #4859

TLDR - As suggested by Russell, for the test case in my PR we changed the "olderThan" argument to be further in the future. The benefit is that there is no unnecessary busy waiting and all of the files are still caught. I went with 5 seconds: the solution in the other PR adds no busy waiting, and that specific test removes every orphan file, so the olderThan argument doesn't need to be very precise - just far enough "in the future" relative to the timestamps of the files that Spark writes to catch them all.
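
For reference, a minimal sketch of that #4859-style approach (not the code in this PR; the 5-second offset is simply the value mentioned above): when a test wants every orphan file removed, the threshold can be placed in the future instead of waiting between writes.

// Sketch: pick an olderThan threshold in the future so every file already written
// is guaranteed to match, with no busy waiting at all.
DeleteOrphanFiles.Result result = SparkActions.get()
    .deleteOrphanFiles(table)
    .olderThan(System.currentTimeMillis() + 5_000L) // 5 seconds in the future
    .execute();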

sumeetgajjar (Contributor Author):

Hi @kbendick - thanks for the suggestion. However, the test you are fixing in #4859 (TestRemoveOrphanFilesAction#orphanedFileRemovedWithParallelTasks) requires all the files to be removed, so providing a future time ensures the predicate selects all of them as candidates for removal.

However, for this PR (TestRemoveOrphanFilesAction#testOlderThanTimestamp), we intentionally want to wait until a full second has passed, to handle file systems where the reported modification time is truncated to whole seconds and we lose millisecond precision when reading the last-modification time.

The flow for this test is as follows:

  • spark writes non-table files to tableLocation
  • spark writes non-table files to tableLocation
  • spark writes non-table files to tableLocation
  • delete orphan files such that only files created from the first two writes are removed and the file created from the third write is preserved

And because of this requirement to preserve the file from the third write, we have to busy wait for a full second.
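
Putting it together, after this fix the relevant part of the test roughly looks like the sketch below (condensed from the snippets above, not the verbatim test source):

// Two non-table writes that should be classified as orphans older than the threshold.
df.write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA");
df.write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA");

long timestamp = System.currentTimeMillis();

// Wait a full second so the next file lands in a later second, even if the
// file system truncates modification times to whole seconds.
waitUntilAfter(System.currentTimeMillis() + 1000L);

// Third non-table write: must NOT be removed by the action below.
df.write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA");

DeleteOrphanFiles.Result result = SparkActions.get()
    .deleteOrphanFiles(table)
    .olderThan(timestamp)
    .execute();
// Expectation: exactly the two files from the first two writes are deleted.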

rdblue merged commit 9f5b0d7 into apache:master on Jun 12, 2022.
rdblue (Contributor) commented Jun 12, 2022

Thanks, @sumeetgajjar!

namrathamyske pushed a commit to namrathamyske/iceberg that referenced this pull request on Jul 10, 2022.