From 1680de40a3149292908385bdd7437d061ec3972b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Cio=C5=82ecki?= Date: Mon, 23 Oct 2023 19:51:07 +0200 Subject: [PATCH] Use ResultsCollectingListenerHolder --- .../ui/api/ManagementResources.scala | 62 +++++++++++++------ .../ui/process/test/ScenarioTestService.scala | 5 +- .../process/runner/FlinkTestMainSpec.scala | 6 +- .../test/RequestResponseTestMainSpec.scala | 5 +- .../lite/InterpreterTestRunnerTest.scala | 7 ++- .../engine/testmode/TestProcess.scala | 15 +---- .../engine/util/test/TestScenarioRunner.scala | 16 ++--- ...inkProcessCompilerWithTestComponents.scala | 28 +++++---- .../util/test/FlinkTestExceptionHolder.scala | 36 ----------- .../util/test/FlinkTestScenarioRunner.scala | 11 ++-- 10 files changed, 90 insertions(+), 101 deletions(-) delete mode 100644 utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestExceptionHolder.scala diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ManagementResources.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ManagementResources.scala index dc4bab68031..f042f42fc2d 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ManagementResources.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ManagementResources.scala @@ -12,8 +12,10 @@ import io.circe.syntax._ import io.circe.{Decoder, Encoder, Json, parser} import io.dropwizard.metrics5.MetricRegistry import pl.touk.nussknacker.engine.ModelData -import pl.touk.nussknacker.engine.api.DisplayJson +import pl.touk.nussknacker.engine.api.{Context, DisplayJson} +import pl.touk.nussknacker.engine.api.component.{ComponentInfo, NodeComponentInfo} import pl.touk.nussknacker.engine.api.deployment._ +import pl.touk.nussknacker.engine.api.exception.NuExceptionInfo import pl.touk.nussknacker.engine.testmode.TestProcess._ import pl.touk.nussknacker.engine.util.json.BestEffortJsonEncoder import pl.touk.nussknacker.restmodel.displayedgraph.DisplayableProcess @@ -40,6 +42,23 @@ object ManagementResources { import pl.touk.nussknacker.engine.api.CirceUtil._ + // TODO: Why not just use the BestEffortJsonEncoder? + val testResultsVariableEncoder: Any => io.circe.Json = { + case displayable: DisplayJson => + def safeString(a: String) = Option(a).map(Json.fromString).getOrElse(Json.Null) + + val displayableJson = displayable.asJson + displayable.originalDisplay match { + case None => Json.obj("pretty" -> displayableJson) + case Some(original) => Json.obj("original" -> safeString(original), "pretty" -> displayableJson) + } + case null => Json.Null + case a => + Json.obj( + "pretty" -> BestEffortJsonEncoder(failOnUnknown = false, a.getClass.getClassLoader).circeEncoder.apply(a) + ) + } + implicit val resultsWithCountsEncoder: Encoder[ResultsWithCounts[Json]] = deriveConfiguredEncoder implicit val testResultsEncoder: Encoder[TestResults[Json]] = new Encoder[TestResults[Json]]() { @@ -47,10 +66,31 @@ object ManagementResources { implicit val nodeResult: Encoder[NodeResult[Json]] = deriveConfiguredEncoder implicit val expressionInvocationResult: Encoder[ExpressionInvocationResult[Json]] = deriveConfiguredEncoder implicit val externalInvocationResult: Encoder[ExternalInvocationResult[Json]] = deriveConfiguredEncoder + implicit val componentInfo: Encoder[ComponentInfo] = deriveConfiguredEncoder + implicit val nodeComponentInfo: Encoder[NodeComponentInfo] = deriveConfiguredEncoder implicit val resultContext: Encoder[ResultContext[Json]] = deriveConfiguredEncoder // TODO: do we want more information here? - implicit val throwable: Encoder[Throwable] = Encoder[Option[String]].contramap(th => Option(th.getMessage)) - implicit val exceptionResult: Encoder[ExceptionResult[Json]] = deriveConfiguredEncoder + + implicit val mapAnyEncoder: Encoder[Map[String, Any]] = (value: Map[String, Any]) => + value.map { case (key, value) => + key -> testResultsVariableEncoder(value) + }.asJson + + implicit val throwableEncoder: Encoder[Throwable] = Encoder[Option[String]].contramap(th => Option(th.getMessage)) + + implicit val contextEncoder: Encoder[Context] = (a: Context) => + Json.obj( + "id" -> Json.fromString(a.id), + "variables" -> a.variables.asJson + ) + + implicit val exceptionsEncoder: Encoder[NuExceptionInfo[_ <: Throwable]] = + (value: NuExceptionInfo[_ <: Throwable]) => + Json.obj( + "nodeComponentInfo" -> value.nodeComponentInfo.asJson, + "throwable" -> value.throwable.asInstanceOf[Throwable].asJson, + "context" -> value.context.asJson + ) override def apply(a: TestResults[Json]): Json = a match { case TestResults(nodeResults, invocationResults, externalInvocationResults, exceptions, _) => @@ -66,22 +106,6 @@ object ManagementResources { } - val testResultsVariableEncoder: Any => io.circe.Json = { - case displayable: DisplayJson => - def safeString(a: String) = Option(a).map(Json.fromString).getOrElse(Json.Null) - - val displayableJson = displayable.asJson - displayable.originalDisplay match { - case None => Json.obj("pretty" -> displayableJson) - case Some(original) => Json.obj("original" -> safeString(original), "pretty" -> displayableJson) - } - case null => Json.Null - case a => - Json.obj( - "pretty" -> BestEffortJsonEncoder(failOnUnknown = false, a.getClass.getClassLoader).circeEncoder.apply(a) - ) - } - } class ManagementResources( diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/test/ScenarioTestService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/test/ScenarioTestService.scala index b7dc677b6b6..88d869057a2 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/test/ScenarioTestService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/test/ScenarioTestService.scala @@ -148,7 +148,10 @@ class ScenarioTestService( private def computeCounts(canonical: CanonicalProcess, results: TestResults[_]): Map[String, NodeCount] = { val counts = results.nodeResults.map { case (key, nresults) => - key -> RawCount(nresults.size.toLong, results.exceptions.find(_.nodeId.contains(key)).size.toLong) + key -> RawCount( + nresults.size.toLong, + results.exceptions.find(_.nodeComponentInfo.map(_.nodeId).contains(key)).size.toLong + ) } processCounter.computeCounts(canonical, counts.get) } diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala index dba2ce3bc64..0ba086e9d20 100644 --- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala +++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala @@ -207,12 +207,12 @@ class FlinkTestMainSpec extends AnyFunSuite with Matchers with Inside with Befor results.exceptions should have length 2 val exceptionFromExpression = results.exceptions.head - exceptionFromExpression.nodeId shouldBe Some("filter") + exceptionFromExpression.nodeComponentInfo.map(_.nodeId) shouldBe Some("filter") exceptionFromExpression.context.variables("input").asInstanceOf[SimpleRecord].id shouldBe "1" exceptionFromExpression.throwable.getMessage shouldBe "Expression [1 / #input.value1 >= 0] evaluation failed, message: / by zero" val exceptionFromService = results.exceptions.last - exceptionFromService.nodeId shouldBe Some("failing") + exceptionFromService.nodeComponentInfo.map(_.nodeId) shouldBe Some("failing") exceptionFromService.context.variables("input").asInstanceOf[SimpleRecord].id shouldBe "2" exceptionFromService.throwable.getMessage shouldBe "Thrown as expected" } @@ -316,7 +316,7 @@ class FlinkTestMainSpec extends AnyFunSuite with Matchers with Inside with Befor val results = runFlinkTest(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2)))) results.exceptions should have length 1 - results.exceptions.head.nodeId shouldBe Some("out") + results.exceptions.head.nodeComponentInfo.map(_.nodeId) shouldBe Some("out") results.exceptions.head.throwable.getMessage should include("message: / by zero") SinkForInts.data should have length 0 diff --git a/engine/lite/request-response/runtime/src/test/scala/pl/touk/nussknacker/engine/requestresponse/test/RequestResponseTestMainSpec.scala b/engine/lite/request-response/runtime/src/test/scala/pl/touk/nussknacker/engine/requestresponse/test/RequestResponseTestMainSpec.scala index 94758221297..394d8625708 100644 --- a/engine/lite/request-response/runtime/src/test/scala/pl/touk/nussknacker/engine/requestresponse/test/RequestResponseTestMainSpec.scala +++ b/engine/lite/request-response/runtime/src/test/scala/pl/touk/nussknacker/engine/requestresponse/test/RequestResponseTestMainSpec.scala @@ -5,6 +5,7 @@ import io.circe.Json import org.scalatest.BeforeAndAfterEach import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers +import pl.touk.nussknacker.engine.api.Context import pl.touk.nussknacker.engine.api.runtimecontext.IncContextIdGenerator import pl.touk.nussknacker.engine.api.test.{ScenarioTestData, ScenarioTestJsonRecord} import pl.touk.nussknacker.engine.build.ScenarioBuilder @@ -103,8 +104,8 @@ class RequestResponseTestMainSpec extends AnyFunSuite with Matchers with BeforeA ExpressionInvocationResult(secondId, "expression", true) ) results.exceptions should have size 1 - results.exceptions.head.context shouldBe ResultContext(firstId, Map("input" -> Request1("a", "b"))) - results.exceptions.head.nodeId shouldBe Some("occasionallyThrowFilter") + results.exceptions.head.context shouldBe Context(firstId, Map("input" -> Request1("a", "b")), None) + results.exceptions.head.nodeComponentInfo.map(_.nodeId) shouldBe Some("occasionallyThrowFilter") results.exceptions.head.throwable.getMessage shouldBe """Expression [#input.field1() == 'a' ? 1/{0, 1}[0] == 0 : true] evaluation failed, message: / by zero""" } diff --git a/engine/lite/runtime/src/test/scala/pl/touk/nussknacker/engine/lite/InterpreterTestRunnerTest.scala b/engine/lite/runtime/src/test/scala/pl/touk/nussknacker/engine/lite/InterpreterTestRunnerTest.scala index c1e72cf8eda..9bf6580b122 100644 --- a/engine/lite/runtime/src/test/scala/pl/touk/nussknacker/engine/lite/InterpreterTestRunnerTest.scala +++ b/engine/lite/runtime/src/test/scala/pl/touk/nussknacker/engine/lite/InterpreterTestRunnerTest.scala @@ -3,6 +3,7 @@ package pl.touk.nussknacker.engine.lite import io.circe.Json import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers +import pl.touk.nussknacker.engine.api.Context import pl.touk.nussknacker.engine.api.test.{ScenarioTestData, ScenarioTestJsonRecord} import pl.touk.nussknacker.engine.build.{GraphBuilder, ScenarioBuilder} import pl.touk.nussknacker.engine.graph.expression.Expression @@ -194,14 +195,14 @@ class InterpreterTestRunnerTest extends AnyFunSuite with Matchers { results.nodeResults("fragment1") shouldBe List(NodeResult(ResultContext("fragment1", Map("in" -> 0)))) results.nodeResults("fragmentEnd") shouldBe List(NodeResult(ResultContext("fragment1", Map("in" -> 0)))) - results.exceptions.map(e => (e.context, e.nodeId, e.throwable.getMessage)) shouldBe List( + results.exceptions.map(e => (e.context, e.nodeComponentInfo.map(_.nodeId), e.throwable.getMessage)) shouldBe List( ( - ResultContext("fragment1", Map("in" -> 0)), + Context("fragment1", Map("in" -> 0), None), Some("fragmentEnd"), "Expression [4 / #in] evaluation failed, message: / by zero" ), ( - ResultContext("fragment1", Map("in" -> 0)), + Context("fragment1", Map("in" -> 0), None), Some("fragmentEnd"), "Expression [8 / #in] evaluation failed, message: / by zero" ) diff --git a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/testmode/TestProcess.scala b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/testmode/TestProcess.scala index e981c06867e..e9f20b21d40 100644 --- a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/testmode/TestProcess.scala +++ b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/testmode/TestProcess.scala @@ -9,7 +9,7 @@ object TestProcess { nodeResults: Map[String, List[NodeResult[T]]], invocationResults: Map[String, List[ExpressionInvocationResult[T]]], externalInvocationResults: Map[String, List[ExternalInvocationResult[T]]], - exceptions: List[ExceptionResult[T]], + exceptions: List[NuExceptionInfo[_ <: Throwable]], variableEncoder: Any => T ) { @@ -33,15 +33,8 @@ object TestProcess { ) } - def updateExceptionResult(espExceptionInfo: NuExceptionInfo[_ <: Throwable]) = { - copy(exceptions = - exceptions :+ ExceptionResult( - toResult(espExceptionInfo.context), - espExceptionInfo.nodeComponentInfo.map(_.nodeId), - espExceptionInfo.throwable - ) - ) - } + def updateExceptionResult(espExceptionInfo: NuExceptionInfo[_ <: Throwable]) = + copy(exceptions = exceptions :+ espExceptionInfo) // when evaluating e.g. keyBy expression can be invoked more than once... // TODO: is it the best way to handle it?? @@ -70,8 +63,6 @@ object TestProcess { case class ExternalInvocationResult[T](contextId: String, name: String, value: T) - case class ExceptionResult[T](context: ResultContext[T], nodeId: Option[String], throwable: Throwable) - case class ResultContext[T](id: String, variables: Map[String, T]) { def variableTyped[U <: T](name: String): Option[U] = variables.get(name).map(_.asInstanceOf[U]) diff --git a/utils/components-testkit/src/main/scala/pl/touk/nussknacker/engine/util/test/TestScenarioRunner.scala b/utils/components-testkit/src/main/scala/pl/touk/nussknacker/engine/util/test/TestScenarioRunner.scala index af4b1969f23..38ab6524a8b 100644 --- a/utils/components-testkit/src/main/scala/pl/touk/nussknacker/engine/util/test/TestScenarioRunner.scala +++ b/utils/components-testkit/src/main/scala/pl/touk/nussknacker/engine/util/test/TestScenarioRunner.scala @@ -67,21 +67,23 @@ trait TestScenarioRunnerBuilder[R <: TestScenarioRunner, B <: TestScenarioRunner object TestScenarioCollectorHandler { def createHandler(componentUseCase: ComponentUseCase): TestScenarioCollectorHandler = { - val (resultCollector, resultsCollectingHolder) = if (ComponentUseCase.TestRuntime == componentUseCase) { - val collectingListener = ResultsCollectingListenerHolder.registerRun(identity) - (new TestServiceInvocationCollector(collectingListener.runId), Some(collectingListener)) + + val resultsCollectingListener = ResultsCollectingListenerHolder.registerRun(identity) + + val resultCollector = if (ComponentUseCase.TestRuntime == componentUseCase) { + new TestServiceInvocationCollector(resultsCollectingListener.runId) } else { - (ProductionServiceInvocationCollector, None) + ProductionServiceInvocationCollector } - new TestScenarioCollectorHandler(resultCollector, resultsCollectingHolder) + new TestScenarioCollectorHandler(resultCollector, resultsCollectingListener) } final class TestScenarioCollectorHandler( val resultCollector: ResultCollector, - private val resultsCollectingListener: Option[ResultsCollectingListener] + val resultsCollectingListener: ResultsCollectingListener ) extends AutoCloseable { - def close(): Unit = resultsCollectingListener.foreach(_.clean()) + def close(): Unit = resultsCollectingListener.close() } } diff --git a/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkProcessCompilerWithTestComponents.scala b/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkProcessCompilerWithTestComponents.scala index efd42baeacd..dfa7f3a32e3 100644 --- a/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkProcessCompilerWithTestComponents.scala +++ b/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkProcessCompilerWithTestComponents.scala @@ -18,6 +18,7 @@ import pl.touk.nussknacker.engine.definition.ProcessDefinitionExtractor.{ import pl.touk.nussknacker.engine.definition.{GlobalVariableDefinitionExtractor, ProcessObjectDefinitionExtractor} import pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompiler import pl.touk.nussknacker.engine.process.exception.FlinkExceptionHandler +import pl.touk.nussknacker.engine.testmode.ResultsCollectingListener import pl.touk.nussknacker.engine.util.test.TestExtensionsHolder import scala.reflect.ClassTag @@ -29,7 +30,7 @@ class FlinkProcessCompilerWithTestComponents( objectNaming: ObjectNaming, componentUseCase: ComponentUseCase, testExtensionsHolder: TestExtensionsHolder, - testExceptionHolder: FlinkTestExceptionHolder, + resultsCollectingListener: ResultsCollectingListener, ) extends FlinkProcessCompiler(creator, processConfig, diskStateBackendSupport, objectNaming, componentUseCase) { override protected def definitions( @@ -93,6 +94,11 @@ class FlinkProcessCompilerWithTestComponents( (ModelDefinitionWithTypes(definitionsWithTestComponents), dictRegistry) } + override protected def adjustListeners( + defaults: List[ProcessListener], + processObjectDependencies: ProcessObjectDependencies + ): List[ProcessListener] = defaults :+ resultsCollectingListener + override protected def exceptionHandler( metaData: MetaData, processObjectDependencies: ProcessObjectDependencies, @@ -105,19 +111,17 @@ class FlinkProcessCompilerWithTestComponents( RestartStrategies.noRestart() override def handle(exceptionInfo: NuExceptionInfo[_ <: Throwable]): Unit = { - testExceptionHolder.addException(exceptionInfo) + resultsCollectingListener.exceptionThrown(exceptionInfo) } } case _ => - // additionally we forward errors to FlinkTestExceptionHolder - val testExceptionListener = new EmptyProcessListener { - override def exceptionThrown(exceptionInfo: NuExceptionInfo[_ <: Throwable]): Unit = { - testExceptionHolder.addException(exceptionInfo) - } - } - - new FlinkExceptionHandler(metaData, processObjectDependencies, listeners :+ testExceptionListener, classLoader) + new FlinkExceptionHandler( + metaData, + processObjectDependencies, + listeners, + classLoader + ) } private def testComponentsWithCategories[T <: Component: ClassTag] = @@ -125,7 +129,7 @@ class FlinkProcessCompilerWithTestComponents( def this( testExtensionsHolder: TestExtensionsHolder, - testExceptionHolder: FlinkTestExceptionHolder, + resultsCollectingListener: ResultsCollectingListener, modelData: ModelData, componentUseCase: ComponentUseCase ) = this( @@ -135,7 +139,7 @@ class FlinkProcessCompilerWithTestComponents( modelData.objectNaming, componentUseCase, testExtensionsHolder, - testExceptionHolder + resultsCollectingListener ) } diff --git a/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestExceptionHolder.scala b/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestExceptionHolder.scala deleted file mode 100644 index 12ed94a9315..00000000000 --- a/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestExceptionHolder.scala +++ /dev/null @@ -1,36 +0,0 @@ -package pl.touk.nussknacker.engine.flink.util.test - -import pl.touk.nussknacker.engine.api.exception.NuExceptionInfo -import pl.touk.nussknacker.engine.testmode.TestRunId - -final class FlinkTestExceptionHolder(testRunId: TestRunId) extends Serializable with AutoCloseable { - - def addException(exceptionInfo: NuExceptionInfo[_ <: Throwable]): Unit = - FlinkTestExceptionHolder.addException(testRunId, exceptionInfo) - - def getExceptions: List[NuExceptionInfo[_ <: Throwable]] = - FlinkTestExceptionHolder.exceptionsForId(testRunId) - - def close(): Unit = FlinkTestExceptionHolder.clean(testRunId) - -} - -object FlinkTestExceptionHolder { - - private var exceptions = Map[TestRunId, List[NuExceptionInfo[_ <: Throwable]]]().withDefaultValue(Nil) - - def register() = - new FlinkTestExceptionHolder(TestRunId.generate) - - private def exceptionsForId(testRunId: TestRunId): List[NuExceptionInfo[_ <: Throwable]] = - exceptions(testRunId) - - private def addException(testRunId: TestRunId, exceptionInfo: NuExceptionInfo[_ <: Throwable]): Unit = synchronized { - exceptions += (testRunId -> (exceptions(testRunId) ++ List(exceptionInfo))) - } - - private def clean(runId: TestRunId): Unit = synchronized { - exceptions -= runId - } - -} diff --git a/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunner.scala b/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunner.scala index 8685c0157e2..6b82f0c2336 100644 --- a/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunner.scala +++ b/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunner.scala @@ -2,8 +2,9 @@ package pl.touk.nussknacker.engine.flink.util.test import com.typesafe.config.Config import org.apache.flink.api.common.typeinfo.TypeInformation -import pl.touk.nussknacker.engine.api.ProcessVersion -import pl.touk.nussknacker.engine.api.component.ComponentDefinition +import pl.touk.nussknacker.engine.api.{Context, ProcessVersion} +import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, NodeComponentInfo} +import pl.touk.nussknacker.engine.api.exception.NuExceptionInfo import pl.touk.nussknacker.engine.api.process.{ComponentUseCase, EmptyProcessConfigCreator, SourceFactory} import pl.touk.nussknacker.engine.api.typed.typing import pl.touk.nussknacker.engine.api.typed.typing.Typed @@ -116,7 +117,6 @@ class FlinkTestScenarioRunner( // TODO: get flink mini cluster through composition val env = flinkMiniCluster.createExecutionEnvironment() - val exceptionHolder = FlinkTestExceptionHolder.register() val collectorHandler = TestScenarioCollectorHandler.createHandler(componentUseCase) // It's copied from registrar.register only for handling compilation errors.. @@ -124,7 +124,7 @@ class FlinkTestScenarioRunner( val compiler = new FlinkProcessCompilerWithTestComponents( testExtensionsHolder, - exceptionHolder, + collectorHandler.resultsCollectingListener, modelData, componentUseCase ) @@ -150,10 +150,9 @@ class FlinkTestScenarioRunner( env.executeAndWaitForFinished(scenario.id)() - RunUnitResult(errors = exceptionHolder.getExceptions) + RunUnitResult(errors = collectorHandler.resultsCollectingListener.results.exceptions) } } finally { - exceptionHolder.close() collectorHandler.close() } }