Skip to content

Commit

Permalink
[FLINK-33641][test] Suppress the DirectoryNotEmptyException in Stream…
Browse files Browse the repository at this point in the history
…ingWithStateTestBase to prevent test failures (#23914)
  • Loading branch information
Jiabao-Sun committed Dec 14, 2023
1 parent 07d159b commit b2b8323
Showing 1 changed file with 22 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,23 @@ import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.functions.source.FromElementsFunction
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.data.{RowData, StringData}
import org.apache.flink.table.data.binary.BinaryRowData
import org.apache.flink.table.data.writer.BinaryRowWriter
import org.apache.flink.table.data.{RowData, StringData}
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
import org.apache.flink.table.planner.utils.TableTestUtil
import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
import org.apache.flink.table.types.logical.RowType
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters
import org.apache.flink.testutils.junit.utils.TempDirUtils

import org.apache.flink.util.FileUtils
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.{AfterEach, BeforeEach}
import org.slf4j.LoggerFactory

import java.io.File
import java.io.{File, IOException}
import java.util

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
Expand All @@ -55,6 +55,8 @@ class StreamingWithStateTestBase(state: StateBackendMode) extends StreamingTestB
case ROCKSDB_BACKEND => true
}

private val log = LoggerFactory.getLogger(classOf[StreamingWithStateTestBase])

private val classLoader = Thread.currentThread.getContextClassLoader

var baseCheckpointPath: File = _
Expand All @@ -63,7 +65,9 @@ class StreamingWithStateTestBase(state: StateBackendMode) extends StreamingTestB
override def before(): Unit = {
super.before()
// set state backend
baseCheckpointPath = tempFolder.toFile

// subfolder are managed here because the tests could fail during cleanup when concurrently executed (see FLINK-33820)
baseCheckpointPath = TempDirUtils.newFolder(tempFolder)
state match {
case HEAP_BACKEND =>
val conf = new Configuration()
Expand All @@ -82,6 +86,19 @@ class StreamingWithStateTestBase(state: StateBackendMode) extends StreamingTestB
@AfterEach
override def after(): Unit = {
super.after()
try {
FileUtils.deleteDirectory(baseCheckpointPath)
} catch {
case e: IOException =>
if (baseCheckpointPath.exists) {
log.error(
s"The temporary files are not being deleted gracefully, remaining files " +
s"${FileUtils.listFilesInDirectory(baseCheckpointPath.toPath, _ => true)}.",
e)
} else {
log.error("The temporary files are not being deleted gracefully.", e)
}
}
assertThat(FailingCollectionSource.failedBefore).isTrue
}

Expand Down

0 comments on commit b2b8323

Please sign in to comment.