Skip to content
Permalink
Browse files
[hotfix][table] Add objectReuse parameter for AsyncLookupJoinITCase. (#…
  • Loading branch information
lincoln-lil committed May 17, 2022
1 parent 772ac63 commit d5821eb96755c22bb7a0e681f7a5bf97830bd63f
Showing 1 changed file with 14 additions and 9 deletions.
@@ -38,7 +38,10 @@ import java.util.{Collection => JCollection}
import scala.collection.JavaConversions._

@RunWith(classOf[Parameterized])
class AsyncLookupJoinITCase(legacyTableSource: Boolean, backend: StateBackendMode)
class AsyncLookupJoinITCase(
legacyTableSource: Boolean,
backend: StateBackendMode,
objectReuse: Boolean)
extends StreamingWithStateTestBase(backend) {

val data = List(
@@ -53,9 +56,11 @@ class AsyncLookupJoinITCase(legacyTableSource: Boolean, backend: StateBackendMod
@Before
override def before(): Unit = {
super.before()
// TODO: remove this until [FLINK-12351] is fixed.
// currently AsyncWaitOperator doesn't copy input element which is a bug
env.getConfig.disableObjectReuse()
if (objectReuse) {
env.getConfig.enableObjectReuse()
} else {
env.getConfig.disableObjectReuse()
}

createScanTable("src", data)
createLookupTable("user_table", userData)
@@ -298,13 +303,13 @@ class AsyncLookupJoinITCase(legacyTableSource: Boolean, backend: StateBackendMod
}

object AsyncLookupJoinITCase {
@Parameterized.Parameters(name = "LegacyTableSource={0}, StateBackend={1}")
@Parameterized.Parameters(name = "LegacyTableSource={0}, StateBackend={1}, ObjectReuse={2}")
def parameters(): JCollection[Array[Object]] = {
Seq[Array[AnyRef]](
Array(JBoolean.TRUE, HEAP_BACKEND),
Array(JBoolean.TRUE, ROCKSDB_BACKEND),
Array(JBoolean.FALSE, HEAP_BACKEND),
Array(JBoolean.FALSE, ROCKSDB_BACKEND)
Array(JBoolean.TRUE, HEAP_BACKEND, JBoolean.TRUE),
Array(JBoolean.TRUE, ROCKSDB_BACKEND, JBoolean.FALSE),
Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.FALSE),
Array(JBoolean.FALSE, ROCKSDB_BACKEND, JBoolean.TRUE)
)
}
}

0 comments on commit d5821eb

Please sign in to comment.