[FLINK-33641][test] Suppress the DirectoryNotEmptyException in StreamingWithStateTestBase to prevent test failures #23914

Merged · 4 commits · Dec 14, 2023

Changes from all commits
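For context, the change makes the checkpoint-directory cleanup in `after()` best-effort: a `DirectoryNotEmptyException` (surfacing as an `IOException`) thrown while a concurrently running test still holds files in the directory is logged instead of failing the test. Below is a minimal sketch of that pattern in isolation, reusing the same `FileUtils.deleteDirectory` call as the diff; the `CheckpointDirCleanup` object and `deleteQuietly` name are illustrative only and are not part of the PR.

```scala
import java.io.{File, IOException}

import org.apache.flink.util.FileUtils
import org.slf4j.LoggerFactory

// Illustrative helper only (not part of the PR): isolates the suppression pattern.
object CheckpointDirCleanup {
  private val log = LoggerFactory.getLogger(getClass)

  /** Best-effort deletion: if the directory cannot be removed because files are
    * still being written (e.g. by a concurrently executed test), log the failure
    * and move on instead of letting the cleanup error fail the test itself. */
  def deleteQuietly(baseCheckpointPath: File): Unit = {
    try {
      FileUtils.deleteDirectory(baseCheckpointPath)
    } catch {
      case e: IOException =>
        log.error(s"Temporary files under $baseCheckpointPath could not be deleted gracefully.", e)
    }
  }
}
```

The actual diff below additionally lists the remaining files via `FileUtils.listFilesInDirectory`, so the offending test can be identified from the logs.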
@@ -27,23 +27,23 @@ import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.functions.source.FromElementsFunction
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.data.{RowData, StringData}
import org.apache.flink.table.data.binary.BinaryRowData
import org.apache.flink.table.data.writer.BinaryRowWriter
import org.apache.flink.table.data.{RowData, StringData}
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
import org.apache.flink.table.planner.utils.TableTestUtil
import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
import org.apache.flink.table.types.logical.RowType
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters
import org.apache.flink.testutils.junit.utils.TempDirUtils

import org.apache.flink.util.FileUtils
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.{AfterEach, BeforeEach}
import org.slf4j.LoggerFactory

import java.io.File
import java.io.{File, IOException}
import java.util

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -55,6 +55,8 @@ class StreamingWithStateTestBase(state: StateBackendMode) extends StreamingTestBase
case ROCKSDB_BACKEND => true
}

private val log = LoggerFactory.getLogger(classOf[StreamingWithStateTestBase])

private val classLoader = Thread.currentThread.getContextClassLoader

var baseCheckpointPath: File = _
@@ -63,7 +65,9 @@ class StreamingWithStateTestBase(state: StateBackendMode) extends StreamingTestBase
override def before(): Unit = {
super.before()
// set state backend
baseCheckpointPath = tempFolder.toFile

// a subfolder is managed here because the tests could fail during cleanup when executed concurrently (see FLINK-33820)
baseCheckpointPath = TempDirUtils.newFolder(tempFolder)
state match {
case HEAP_BACKEND =>
val conf = new Configuration()
@@ -82,6 +86,19 @@ class StreamingWithStateTestBase(state: StateBackendMode) extends StreamingTestBase
@AfterEach
override def after(): Unit = {
super.after()
try {
FileUtils.deleteDirectory(baseCheckpointPath)
} catch {
case e: IOException =>
if (baseCheckpointPath.exists) {
log.error(
s"The temporary files are not being deleted gracefully, remaining files " +
s"${FileUtils.listFilesInDirectory(baseCheckpointPath.toPath, _ => true)}.",
e)
} else {
log.error("The temporary files are not being deleted gracefully.", e)
}
}
assertThat(FailingCollectionSource.failedBefore).isTrue
}
