
[FLINK-33641][test] Suppress the DirectoryNotEmptyException in StreamingWithStateTestBase to prevent test failures #23914

Merged: 4 commits merged into apache:master on Dec 14, 2023

Conversation

Jiabao-Sun (Contributor)

What is the purpose of the change

[FLINK-33641][test] Suppress the DirectoryNotEmptyException in StreamingWithStateTestBase to prevent test failures

Brief change log

Suppress the DirectoryNotEmptyException in StreamingWithStateTestBase to prevent test failures
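
For illustration only, here is a minimal Scala sketch of the kind of suppression described above; it is not the actual patch, and the helper name deleteQuietly and the visitor-based recursive delete are assumptions made for this example.

import java.io.IOException
import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.{DirectoryNotEmptyException, FileVisitResult, Files, Path, SimpleFileVisitor}

object TempDirCleanup {

  /** Hypothetical helper (not the actual patch): recursively deletes `dir`,
    * swallowing the DirectoryNotEmptyException that can occur when another
    * thread (e.g. an async snapshot cleanup) still writes into the directory. */
  def deleteQuietly(dir: Path): Unit = {
    try {
      Files.walkFileTree(
        dir,
        new SimpleFileVisitor[Path] {
          override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
            Files.deleteIfExists(file)
            FileVisitResult.CONTINUE
          }

          override def postVisitDirectory(d: Path, exc: IOException): FileVisitResult = {
            Files.deleteIfExists(d)
            FileVisitResult.CONTINUE
          }
        }
      )
    } catch {
      case _: DirectoryNotEmptyException =>
        // Ignore: the directory was repopulated concurrently; failing the test
        // here is exactly what this change wants to avoid.
    }
  }
}

As discussed further down in this thread, the updated change additionally prints the files that could not be deleted when an IOException is thrown.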

Verifying this change

This change is already covered by existing tests.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not documented)

@flinkbot (Collaborator) commented Dec 12, 2023

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@XComp (Contributor) left a comment

Btw., while going through the code and the Jira once more: I second what @snuyanzin suggests in his comment on FLINK-33641. StreamingTestBase#tempFolder is a class field, which makes all tests use the same temporary folder. You might want to make it an object field instead. Otherwise we might run into the same issue in other tests later on if a contributor overlooks that the temporary folder is shared between test classes. That should fix FLINK-33641. WDYT?

@@ -22,10 +22,14 @@
import org.apache.flink.test.junit5.MiniClusterExtension;

import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Base class for unit tests that run multiple tests and want to reuse the same Flink cluster. */
public class StreamAbstractTestBase {
Contributor:
StreamAbstractTestBase is only used by StreamingTestBase. Can't we merge the two? 🤔

Jiabao-Sun (Contributor Author):

I made some attempts to put the MiniClusterExtension in StreamingTestBase, but it violated the ArchUnit test rules (the Scala code could not be recognized well).

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=55472&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4&l=11485


/** Base class for unit tests that run multiple tests and want to reuse the same Flink cluster. */
public class StreamAbstractTestBase {

protected final Logger log = LoggerFactory.getLogger(getClass());
Contributor:

nit: The logger is not used anywhere except in StreamingWithStateTestBase. It feels like a premature optimization, with the risk that other test classes will miss that a protected log field is already present and create their own logger anyway. That's a proposal you can reject if you want, but to me it would be good enough to have the change done in StreamingWithStateTestBase as a private field.
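
For illustration, the private-field variant proposed in this nit could look roughly like the following sketch; the class name StreamingWithStateTestBaseSketch is hypothetical.

import org.slf4j.{Logger, LoggerFactory}

// Sketch only: keep the logger private to the class that actually uses it
// instead of exposing a protected field in the shared base class.
class StreamingWithStateTestBaseSketch {
  private val log: Logger = LoggerFactory.getLogger(getClass)
}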

@@ -63,7 +64,7 @@ class StreamingWithStateTestBase(state: StateBackendMode) extends StreamingTestB
override def before(): Unit = {
super.before()
// set state backend
baseCheckpointPath = tempFolder.toFile
baseCheckpointPath = Files.createTempDirectory("junit").toFile
Contributor:

Suggested change
baseCheckpointPath = Files.createTempDirectory("junit").toFile
baseCheckpointPath = Files.createTempDirectory(tempFolder, "junit").toFile

Maybe we should still use the temporary folder that is created in the parent class, so that we benefit from JUnit 5's temporary-folder feature.
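
For reference, a small sketch of the difference between the two Files.createTempDirectory overloads touched by this suggestion; the object name, the prefixes, and the stand-in parent directory are illustrative only.

import java.nio.file.{Files, Path}

object CreateTempDirectoryOverloads {
  def main(args: Array[String]): Unit = {
    // No parent given: the directory is created under the JVM default temp
    // location (java.io.tmpdir) and is not tied to JUnit's @TempDir lifecycle.
    val detached: Path = Files.createTempDirectory("junit")

    // Explicit parent: here `parent` stands in for the JUnit-managed tempFolder
    // from the base class; the new directory is created inside it, so it is
    // cleaned up together with the @TempDir.
    val parent: Path = Files.createTempDirectory("parent-example")
    val nested: Path = Files.createTempDirectory(parent, "junit")

    println(s"detached: $detached")
    println(s"nested:   $nested")
  }
}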

@snuyanzin (Contributor) commented Dec 12, 2023

@Jiabao-Sun, I would suggest updating JUnit (#23917), where they improved the error output for TempDir removal: see https://github.com/junit-team/junit5/pull/3249/files#diff-722173375ef777095d4d3ba36f4e5063f25c21f23701ad4cbd470018c2888fa8 and junit-team/junit5#3236.

@Jiabao-Sun (Contributor Author)

Thanks @XComp, @snuyanzin. The comments were addressed; we now print the files that are not deleted when an IOException is thrown.

StreamingTestBase#tempFolder is a class field which makes all tests use the same temporary folder

The var tempFolder: Path = _ defined in Scala is already an object (instance) field; it is different for each @Test method. However, if test methods of the same test class are allowed to execute concurrently, that could also cause path errors.
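
A minimal Scala sketch of the per-test behaviour described here, assuming JUnit 5's @TempDir field injection as used for the tempFolder field; the class and method names are hypothetical.

import java.nio.file.Path

import org.junit.jupiter.api.Test
import org.junit.jupiter.api.io.TempDir

// Hypothetical example: a non-static @TempDir field is injected with a fresh
// directory before every @Test method and cleaned up afterwards, so two test
// methods of the same class never see the same path.
class TempDirPerMethodExample {

  @TempDir
  var tempFolder: Path = _

  @Test
  def firstTest(): Unit = {
    // tempFolder points at a directory created only for this invocation.
  }

  @Test
  def secondTest(): Unit = {
    // A different directory than the one firstTest() saw.
  }
}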

@XComp (Contributor) commented Dec 13, 2023

The var tempFolder: Path = _ defined in Scala is already an object (instance) field; it is different for each @Test method.

Thanks for the clarification. My lack of Scala knowledge made me mix up the class and the companion object declaration.

However, if test methods of the same test class are allowed to execute concurrently, that could also cause path errors.

That, I still don't get. Why would concurrent execution of tests cause this issue if each test class/instance has its own temporary directory (because the tempFolder field is an object field)?

Or do you mean that test methods are run concurrently? As far as I understand, that shouldn't happen: concurrent test method execution has to be enabled explicitly using @Execution(ExecutionMode.CONCURRENT) (see the parent pom.xml:1745ff). Or am I missing something here? 🤔
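
For context, a small sketch of what opting a JUnit 5 test class into concurrent method execution looks like; the class is hypothetical, and the annotation only takes effect once junit.jupiter.execution.parallel.enabled=true is set (e.g. via the surefire configuration referenced above).

import org.junit.jupiter.api.Test
import org.junit.jupiter.api.parallel.{Execution, ExecutionMode}

// Hypothetical test class: without this annotation (and without enabling
// parallel execution on the platform), JUnit 5 runs the methods sequentially.
@Execution(ExecutionMode.CONCURRENT)
class ConcurrentMethodsExample {

  @Test
  def methodA(): Unit = {
    // May run at the same time as methodB() once parallel execution is enabled.
  }

  @Test
  def methodB(): Unit = {
    // Concurrent with methodA() under the same conditions.
  }
}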

@snuyanzin (Contributor) commented Dec 13, 2023

@Jiabao-Sun, @XComp, it seems I found the reason.

JUnit 5.10.1 makes it fail consistently, and it becomes a bit clearer: there are two threads involved:

  1. JUnit 5 trying to delete the directory
  2. the snapshot cleanup:
	at org.apache.flink.runtime.state.SnapshotDirectory.cleanup(SnapshotDirectory.java:93)
	at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase$NativeRocksDBSnapshotResources.release(RocksDBSnapshotStrategyBase.java:384)
	at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.cleanupProvidedResources(SnapshotStrategyRunner.java:97)
	at org.apache.flink.runtime.state.AsyncSnapshotCallable.cleanup(AsyncSnapshotCallable.java:163)
	at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:87)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:508)
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191)
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

I suspect that JUnit 5 now performs the removal in @AfterEach, which makes it concurrent with the checkpoint cleanup...

Since JUnit 5.10.1 makes it fail even locally, as a WA I replaced the TempDir with

  val baseCheckpointPath = Files.createTempDirectory(getClass.getCanonicalName)
  Files.deleteIfExists(baseCheckpointPath)

and locally it helps:
https://dev.azure.com/snuyanzin/flink/_build/results?buildId=2676&view=results
now it is running on my ci to see whether it helps or not

@XComp (Contributor) commented Dec 13, 2023

Thanks for sharing this finding, @snuyanzin. That would mean that we should wait for the snapshot cleanup to finish before calling @AfterEach-annotated methods, wouldn't it?
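
Conceptually, that could look like the sketch below. It is purely illustrative: the cleanupFuture handle and the base class are assumptions for this example, not an existing Flink API.

import java.util.concurrent.{CompletableFuture, TimeUnit}

import org.junit.jupiter.api.AfterEach

// Hypothetical base class: the test wiring completes `cleanupFuture` once the
// asynchronous snapshot cleanup has finished. Because @AfterEach methods run
// before JUnit deletes the @TempDir, waiting here removes the race.
abstract class WaitForSnapshotCleanupTestBase {

  // Assumption for this sketch: not a real Flink handle.
  protected val cleanupFuture: CompletableFuture[Void] = new CompletableFuture[Void]()

  @AfterEach
  def awaitSnapshotCleanup(): Unit = {
    cleanupFuture.get(30, TimeUnit.SECONDS)
  }
}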

@Jiabao-Sun (Contributor Author)

Thanks @snuyanzin @XComp, I'll try to fix it.

@snuyanzin (Contributor)

That would mean that we should wait for the snapshot cleanup to finish before calling @AfterEach-annotated methods, wouldn't it?

ideally yes

@Jiabao-Sun (Contributor Author)

Hi @snuyanzin, I think the SnapshotDirectory.cleanup may not be the root cause.

I debugged it locally, and the directory cleaned up by SnapshotDirectory starts with minicluster, which is different from the directory structure that caused the error in CI:

/var/folders/7_/y05pf91d5x57xkzdbg05jyxm0000gn/T/junit5535370187595258518/junit5409077863179965513/minicluster_090d7d63818a49f384322545ef962853/tm_0/tmp/job_ba1234fb7067bbd96eebc77ce0e59f0b_op_SlicingWindowOperator_f6dc7f4d2283f4605b127b9364e21148__4_4__uuid_6d8887f8-f218-45e1-a6e5-3e2395b8a09c/chk-1

Suppressed: java.nio.file.DirectoryNotEmptyException: /tmp/junit7546196118280881501/2e0195724b946eddf625188a385b28e8

I'd prefer to use this PR to avoid too many CI failures until we find the root cause. WDYT? 🤔️

@snuyanzin (Contributor) commented Dec 13, 2023

I tested with JUnit 5.10.1 and I think it also introduced some changes. If you look at PR #23917, which contains only the JUnit bump 5.9.1 -> 5.10.1, the CI failure is also different from what we have in master:

2023-12-13T01:01:30.0309316Z Dec 13 01:01:29 	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
2023-12-13T01:01:30.0310427Z Dec 13 01:01:29 	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
2023-12-13T01:01:30.0312635Z Dec 13 01:01:29 	Suppressed: java.nio.file.NoSuchFileException: /tmp/junit8675027143221640473/b26755eb2623b363024e04d5db7543aa/chk-3
2023-12-13T01:01:30.0313874Z Dec 13 01:01:29 		at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
2023-12-13T01:01:30.0314987Z Dec 13 01:01:29 		at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
2023-12-13T01:01:30.0316077Z Dec 13 01:01:29 		at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
2023-12-13T01:01:30.0317253Z Dec 13 01:01:29 		at sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55)
2023-12-13T01:01:30.0318483Z Dec 13 01:01:29 		at sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:144)
2023-12-13T01:01:30.0319812Z Dec 13 01:01:29 		at sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:99)
2023-12-13T01:01:30.0320982Z Dec 13 01:01:29 		at java.nio.file.Files.readAttributes(Files.java:1737)
2023-12-13T01:01:30.0322005Z Dec 13 01:01:29 		at java.nio.file.FileTreeWalker.getAttributes(FileTreeWalker.java:219)
2023-12-13T01:01:30.0323049Z Dec 13 01:01:29 		at java.nio.file.FileTreeWalker.visit(FileTreeWalker.java:276)
2023-12-13T01:01:30.0324067Z Dec 13 01:01:29 		at java.nio.file.FileTreeWalker.next(FileTreeWalker.java:372)
2023-12-13T01:01:30.0325253Z Dec 13 01:01:29 		at java.nio.file.Files.walkFileTree(Files.java:2706)
2023-12-13T01:01:30.0326179Z Dec 13 01:01:29 		at java.nio.file.Files.walkFileTree(Files.java:2742)
2023-12-13T01:01:30.0326926Z Dec 13 01:01:29 		... 40 more
2023-12-13T01:01:30.0328421Z Dec 13 01:01:29 		Suppressed: java.nio.file.NoSuchFileException: /tmp/junit8675027143221640473/b26755eb2623b363024e04d5db7543aa/chk-3
2023-12-13T01:01:30.0329745Z Dec 13 01:01:29 			at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
2023-12-13T01:01:30.0330947Z Dec 13 01:01:29 			at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
2023-12-13T01:01:30.0332078Z Dec 13 01:01:29 			at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
2023-12-13T01:01:30.0333192Z Dec 13 01:01:29 			at sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:244)
2023-12-13T01:01:30.0334389Z Dec 13 01:01:29 			at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
2023-12-13T01:01:30.0335445Z Dec 13 01:01:29 			at java.nio.file.Files.delete(Files.java:1126)
2023-12-13T01:01:30.0336348Z Dec 13 01:01:29 			at java.nio.file.Files.walkFileTree(Files.java:2672)
2023-12-13T01:01:30.0337105Z Dec 13 01:01:29 			... 41 more

Maybe they fixed something between these versions (they have a number of statements about @TempDir in their release notes). My statement about the checkpoint failure applies to JUnit 5.10.1 (which is not in master yet), since with the current master I cannot reproduce the problem locally.

I would propose going with the newer JUnit to avoid fixing this again after the update.

@snuyanzin (Contributor)

@Jiabao-Sun

I'd prefer to use this PR to avoid too many CI failures until we find the root cause. WDYT? 🤔️

I put a commit with the WA I've described above at #23917, and it is green.

Moreover, I've scheduled it both on the Flink CI and on my own CI, more than 5 times in total, and there are no failures at all.

If there are no objections, we can merge it to avoid too many CI failures and continue looking for a more optimal solution in a calmer way.

https://dev.azure.com/snuyanzin/flink/_build/results?buildId=2678&view=results
https://dev.azure.com/snuyanzin/flink/_build/results?buildId=2676&view=results
https://dev.azure.com/snuyanzin/flink/_build/results?buildId=2675&view=results
https://dev.azure.com/snuyanzin/flink/_build/results?buildId=2679&view=results
https://dev.azure.com/snuyanzin/flink/_build/results?buildId=2681&view=results

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=55493&view=results
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=55487&view=results

@XComp (Contributor) left a comment

Interesting catch with the ArchUnit failure. Does it mean that ArchUnit is not able to recognize Scala? Because the MiniClusterExtension was still present, just in a different parent class (which is written in Scala). 🤔

Anyway, the change looks reasonable. 👍 I created FLINK-33820 to cover the investigation efforts and added an in-code comment to your PR referring to it. But we should get the workaround into master sooner rather than later to make master more stable again.

Thanks for the good collaboration, @Jiabao-Sun and @snuyanzin. Much appreciated :-)

XComp merged commit b2b8323 into apache:master on Dec 14, 2023
@Jiabao-Sun (Contributor Author)

Thanks @XComp. The ArchUnit rules require the MiniClusterExtension to be declared as static final, but in Scala we can only declare it as a private val in the companion object, and that cannot be recognized well by the rules. Maybe we can modify the ArchUnit rules to identify this pattern.

object StreamingTestBase {

  @RegisterExtension
  private val _: MiniClusterExtension = new MiniClusterExtension(
    () =>
      new MiniClusterResourceConfiguration.Builder()
        .setNumberTaskManagers(1)
        .setNumberSlotsPerTaskManager(4)
        .build())
}

@XComp (Contributor) commented Dec 14, 2023

Sounds reasonable. Can you create a follow-up ticket for that one?

@snuyanzin (Contributor)

@XComp, @Jiabao-Sun, it seems the current build failure is related; could you please have a look? #23929

XComp mentioned this pull request on Dec 14, 2023
@snuyanzin (Contributor) commented Dec 15, 2023

ArchUnit does not support Scala; it is mentioned in the README:

## How do I test Scala classes?
Scala is not supported by ArchUnit. Although it operates on the byte-code level and can, in general,
process classes compiled from Scala as well, there are Scala-specific constructs that do not work
well in practice. Therefore, all architecture rules should exclude non-Java classes.

@Jiabao-Sun (Contributor Author)

Thanks @snuyanzin, it makes sense to me.
