Skip to content

Commit

Permalink
Use ResultsCollectingListenerHolder
Browse files Browse the repository at this point in the history
  • Loading branch information
Łukasz Ciołecki committed Oct 23, 2023
1 parent 173fe9b commit 1680de4
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 101 deletions.
Expand Up @@ -12,8 +12,10 @@ import io.circe.syntax._
import io.circe.{Decoder, Encoder, Json, parser}
import io.dropwizard.metrics5.MetricRegistry
import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api.DisplayJson
import pl.touk.nussknacker.engine.api.{Context, DisplayJson}
import pl.touk.nussknacker.engine.api.component.{ComponentInfo, NodeComponentInfo}
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.exception.NuExceptionInfo
import pl.touk.nussknacker.engine.testmode.TestProcess._
import pl.touk.nussknacker.engine.util.json.BestEffortJsonEncoder
import pl.touk.nussknacker.restmodel.displayedgraph.DisplayableProcess
Expand All @@ -40,17 +42,55 @@ object ManagementResources {

import pl.touk.nussknacker.engine.api.CirceUtil._

// TODO: Why not just use the BestEffortJsonEncoder?
val testResultsVariableEncoder: Any => io.circe.Json = {
case displayable: DisplayJson =>
def safeString(a: String) = Option(a).map(Json.fromString).getOrElse(Json.Null)

val displayableJson = displayable.asJson
displayable.originalDisplay match {
case None => Json.obj("pretty" -> displayableJson)
case Some(original) => Json.obj("original" -> safeString(original), "pretty" -> displayableJson)
}
case null => Json.Null
case a =>
Json.obj(
"pretty" -> BestEffortJsonEncoder(failOnUnknown = false, a.getClass.getClassLoader).circeEncoder.apply(a)
)
}

implicit val resultsWithCountsEncoder: Encoder[ResultsWithCounts[Json]] = deriveConfiguredEncoder

implicit val testResultsEncoder: Encoder[TestResults[Json]] = new Encoder[TestResults[Json]]() {

implicit val nodeResult: Encoder[NodeResult[Json]] = deriveConfiguredEncoder
implicit val expressionInvocationResult: Encoder[ExpressionInvocationResult[Json]] = deriveConfiguredEncoder
implicit val externalInvocationResult: Encoder[ExternalInvocationResult[Json]] = deriveConfiguredEncoder
implicit val componentInfo: Encoder[ComponentInfo] = deriveConfiguredEncoder
implicit val nodeComponentInfo: Encoder[NodeComponentInfo] = deriveConfiguredEncoder
implicit val resultContext: Encoder[ResultContext[Json]] = deriveConfiguredEncoder
// TODO: do we want more information here?
implicit val throwable: Encoder[Throwable] = Encoder[Option[String]].contramap(th => Option(th.getMessage))
implicit val exceptionResult: Encoder[ExceptionResult[Json]] = deriveConfiguredEncoder

implicit val mapAnyEncoder: Encoder[Map[String, Any]] = (value: Map[String, Any]) =>
value.map { case (key, value) =>
key -> testResultsVariableEncoder(value)
}.asJson

implicit val throwableEncoder: Encoder[Throwable] = Encoder[Option[String]].contramap(th => Option(th.getMessage))

implicit val contextEncoder: Encoder[Context] = (a: Context) =>
Json.obj(
"id" -> Json.fromString(a.id),
"variables" -> a.variables.asJson
)

implicit val exceptionsEncoder: Encoder[NuExceptionInfo[_ <: Throwable]] =
(value: NuExceptionInfo[_ <: Throwable]) =>
Json.obj(
"nodeComponentInfo" -> value.nodeComponentInfo.asJson,
"throwable" -> value.throwable.asInstanceOf[Throwable].asJson,
"context" -> value.context.asJson
)

override def apply(a: TestResults[Json]): Json = a match {
case TestResults(nodeResults, invocationResults, externalInvocationResults, exceptions, _) =>
Expand All @@ -66,22 +106,6 @@ object ManagementResources {

}

val testResultsVariableEncoder: Any => io.circe.Json = {
case displayable: DisplayJson =>
def safeString(a: String) = Option(a).map(Json.fromString).getOrElse(Json.Null)

val displayableJson = displayable.asJson
displayable.originalDisplay match {
case None => Json.obj("pretty" -> displayableJson)
case Some(original) => Json.obj("original" -> safeString(original), "pretty" -> displayableJson)
}
case null => Json.Null
case a =>
Json.obj(
"pretty" -> BestEffortJsonEncoder(failOnUnknown = false, a.getClass.getClassLoader).circeEncoder.apply(a)
)
}

}

class ManagementResources(
Expand Down
Expand Up @@ -148,7 +148,10 @@ class ScenarioTestService(

private def computeCounts(canonical: CanonicalProcess, results: TestResults[_]): Map[String, NodeCount] = {
val counts = results.nodeResults.map { case (key, nresults) =>
key -> RawCount(nresults.size.toLong, results.exceptions.find(_.nodeId.contains(key)).size.toLong)
key -> RawCount(
nresults.size.toLong,
results.exceptions.find(_.nodeComponentInfo.map(_.nodeId).contains(key)).size.toLong
)
}
processCounter.computeCounts(canonical, counts.get)
}
Expand Down
Expand Up @@ -207,12 +207,12 @@ class FlinkTestMainSpec extends AnyFunSuite with Matchers with Inside with Befor
results.exceptions should have length 2

val exceptionFromExpression = results.exceptions.head
exceptionFromExpression.nodeId shouldBe Some("filter")
exceptionFromExpression.nodeComponentInfo.map(_.nodeId) shouldBe Some("filter")
exceptionFromExpression.context.variables("input").asInstanceOf[SimpleRecord].id shouldBe "1"
exceptionFromExpression.throwable.getMessage shouldBe "Expression [1 / #input.value1 >= 0] evaluation failed, message: / by zero"

val exceptionFromService = results.exceptions.last
exceptionFromService.nodeId shouldBe Some("failing")
exceptionFromService.nodeComponentInfo.map(_.nodeId) shouldBe Some("failing")
exceptionFromService.context.variables("input").asInstanceOf[SimpleRecord].id shouldBe "2"
exceptionFromService.throwable.getMessage shouldBe "Thrown as expected"
}
Expand Down Expand Up @@ -316,7 +316,7 @@ class FlinkTestMainSpec extends AnyFunSuite with Matchers with Inside with Befor
val results = runFlinkTest(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))))

results.exceptions should have length 1
results.exceptions.head.nodeId shouldBe Some("out")
results.exceptions.head.nodeComponentInfo.map(_.nodeId) shouldBe Some("out")
results.exceptions.head.throwable.getMessage should include("message: / by zero")

SinkForInts.data should have length 0
Expand Down
Expand Up @@ -5,6 +5,7 @@ import io.circe.Json
import org.scalatest.BeforeAndAfterEach
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.api.Context
import pl.touk.nussknacker.engine.api.runtimecontext.IncContextIdGenerator
import pl.touk.nussknacker.engine.api.test.{ScenarioTestData, ScenarioTestJsonRecord}
import pl.touk.nussknacker.engine.build.ScenarioBuilder
Expand Down Expand Up @@ -103,8 +104,8 @@ class RequestResponseTestMainSpec extends AnyFunSuite with Matchers with BeforeA
ExpressionInvocationResult(secondId, "expression", true)
)
results.exceptions should have size 1
results.exceptions.head.context shouldBe ResultContext(firstId, Map("input" -> Request1("a", "b")))
results.exceptions.head.nodeId shouldBe Some("occasionallyThrowFilter")
results.exceptions.head.context shouldBe Context(firstId, Map("input" -> Request1("a", "b")), None)
results.exceptions.head.nodeComponentInfo.map(_.nodeId) shouldBe Some("occasionallyThrowFilter")
results.exceptions.head.throwable.getMessage shouldBe """Expression [#input.field1() == 'a' ? 1/{0, 1}[0] == 0 : true] evaluation failed, message: / by zero"""
}

Expand Down
Expand Up @@ -3,6 +3,7 @@ package pl.touk.nussknacker.engine.lite
import io.circe.Json
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.api.Context
import pl.touk.nussknacker.engine.api.test.{ScenarioTestData, ScenarioTestJsonRecord}
import pl.touk.nussknacker.engine.build.{GraphBuilder, ScenarioBuilder}
import pl.touk.nussknacker.engine.graph.expression.Expression
Expand Down Expand Up @@ -194,14 +195,14 @@ class InterpreterTestRunnerTest extends AnyFunSuite with Matchers {

results.nodeResults("fragment1") shouldBe List(NodeResult(ResultContext("fragment1", Map("in" -> 0))))
results.nodeResults("fragmentEnd") shouldBe List(NodeResult(ResultContext("fragment1", Map("in" -> 0))))
results.exceptions.map(e => (e.context, e.nodeId, e.throwable.getMessage)) shouldBe List(
results.exceptions.map(e => (e.context, e.nodeComponentInfo.map(_.nodeId), e.throwable.getMessage)) shouldBe List(
(
ResultContext("fragment1", Map("in" -> 0)),
Context("fragment1", Map("in" -> 0), None),
Some("fragmentEnd"),
"Expression [4 / #in] evaluation failed, message: / by zero"
),
(
ResultContext("fragment1", Map("in" -> 0)),
Context("fragment1", Map("in" -> 0), None),
Some("fragmentEnd"),
"Expression [8 / #in] evaluation failed, message: / by zero"
)
Expand Down
Expand Up @@ -9,7 +9,7 @@ object TestProcess {
nodeResults: Map[String, List[NodeResult[T]]],
invocationResults: Map[String, List[ExpressionInvocationResult[T]]],
externalInvocationResults: Map[String, List[ExternalInvocationResult[T]]],
exceptions: List[ExceptionResult[T]],
exceptions: List[NuExceptionInfo[_ <: Throwable]],
variableEncoder: Any => T
) {

Expand All @@ -33,15 +33,8 @@ object TestProcess {
)
}

def updateExceptionResult(espExceptionInfo: NuExceptionInfo[_ <: Throwable]) = {
copy(exceptions =
exceptions :+ ExceptionResult(
toResult(espExceptionInfo.context),
espExceptionInfo.nodeComponentInfo.map(_.nodeId),
espExceptionInfo.throwable
)
)
}
def updateExceptionResult(espExceptionInfo: NuExceptionInfo[_ <: Throwable]) =
copy(exceptions = exceptions :+ espExceptionInfo)

// when evaluating e.g. keyBy expression can be invoked more than once...
// TODO: is it the best way to handle it??
Expand Down Expand Up @@ -70,8 +63,6 @@ object TestProcess {

case class ExternalInvocationResult[T](contextId: String, name: String, value: T)

case class ExceptionResult[T](context: ResultContext[T], nodeId: Option[String], throwable: Throwable)

case class ResultContext[T](id: String, variables: Map[String, T]) {

def variableTyped[U <: T](name: String): Option[U] = variables.get(name).map(_.asInstanceOf[U])
Expand Down
Expand Up @@ -67,21 +67,23 @@ trait TestScenarioRunnerBuilder[R <: TestScenarioRunner, B <: TestScenarioRunner
object TestScenarioCollectorHandler {

def createHandler(componentUseCase: ComponentUseCase): TestScenarioCollectorHandler = {
val (resultCollector, resultsCollectingHolder) = if (ComponentUseCase.TestRuntime == componentUseCase) {
val collectingListener = ResultsCollectingListenerHolder.registerRun(identity)
(new TestServiceInvocationCollector(collectingListener.runId), Some(collectingListener))

val resultsCollectingListener = ResultsCollectingListenerHolder.registerRun(identity)

val resultCollector = if (ComponentUseCase.TestRuntime == componentUseCase) {
new TestServiceInvocationCollector(resultsCollectingListener.runId)
} else {
(ProductionServiceInvocationCollector, None)
ProductionServiceInvocationCollector
}

new TestScenarioCollectorHandler(resultCollector, resultsCollectingHolder)
new TestScenarioCollectorHandler(resultCollector, resultsCollectingListener)
}

final class TestScenarioCollectorHandler(
val resultCollector: ResultCollector,
private val resultsCollectingListener: Option[ResultsCollectingListener]
val resultsCollectingListener: ResultsCollectingListener
) extends AutoCloseable {
def close(): Unit = resultsCollectingListener.foreach(_.clean())
def close(): Unit = resultsCollectingListener.close()
}

}
Expand Down
Expand Up @@ -18,6 +18,7 @@ import pl.touk.nussknacker.engine.definition.ProcessDefinitionExtractor.{
import pl.touk.nussknacker.engine.definition.{GlobalVariableDefinitionExtractor, ProcessObjectDefinitionExtractor}
import pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompiler
import pl.touk.nussknacker.engine.process.exception.FlinkExceptionHandler
import pl.touk.nussknacker.engine.testmode.ResultsCollectingListener
import pl.touk.nussknacker.engine.util.test.TestExtensionsHolder

import scala.reflect.ClassTag
Expand All @@ -29,7 +30,7 @@ class FlinkProcessCompilerWithTestComponents(
objectNaming: ObjectNaming,
componentUseCase: ComponentUseCase,
testExtensionsHolder: TestExtensionsHolder,
testExceptionHolder: FlinkTestExceptionHolder,
resultsCollectingListener: ResultsCollectingListener,
) extends FlinkProcessCompiler(creator, processConfig, diskStateBackendSupport, objectNaming, componentUseCase) {

override protected def definitions(
Expand Down Expand Up @@ -93,6 +94,11 @@ class FlinkProcessCompilerWithTestComponents(
(ModelDefinitionWithTypes(definitionsWithTestComponents), dictRegistry)
}

override protected def adjustListeners(
defaults: List[ProcessListener],
processObjectDependencies: ProcessObjectDependencies
): List[ProcessListener] = defaults :+ resultsCollectingListener

override protected def exceptionHandler(
metaData: MetaData,
processObjectDependencies: ProcessObjectDependencies,
Expand All @@ -105,27 +111,25 @@ class FlinkProcessCompilerWithTestComponents(
RestartStrategies.noRestart()

override def handle(exceptionInfo: NuExceptionInfo[_ <: Throwable]): Unit = {
testExceptionHolder.addException(exceptionInfo)
resultsCollectingListener.exceptionThrown(exceptionInfo)
}

}
case _ =>
// additionally we forward errors to FlinkTestExceptionHolder
val testExceptionListener = new EmptyProcessListener {
override def exceptionThrown(exceptionInfo: NuExceptionInfo[_ <: Throwable]): Unit = {
testExceptionHolder.addException(exceptionInfo)
}
}

new FlinkExceptionHandler(metaData, processObjectDependencies, listeners :+ testExceptionListener, classLoader)
new FlinkExceptionHandler(
metaData,
processObjectDependencies,
listeners,
classLoader
)
}

private def testComponentsWithCategories[T <: Component: ClassTag] =
testExtensionsHolder.components[T].map(cd => cd.name -> WithCategories(cd.component.asInstanceOf[T])).toMap

def this(
testExtensionsHolder: TestExtensionsHolder,
testExceptionHolder: FlinkTestExceptionHolder,
resultsCollectingListener: ResultsCollectingListener,
modelData: ModelData,
componentUseCase: ComponentUseCase
) = this(
Expand All @@ -135,7 +139,7 @@ class FlinkProcessCompilerWithTestComponents(
modelData.objectNaming,
componentUseCase,
testExtensionsHolder,
testExceptionHolder
resultsCollectingListener
)

}

This file was deleted.

Expand Up @@ -2,8 +2,9 @@ package pl.touk.nussknacker.engine.flink.util.test

import com.typesafe.config.Config
import org.apache.flink.api.common.typeinfo.TypeInformation
import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.api.component.ComponentDefinition
import pl.touk.nussknacker.engine.api.{Context, ProcessVersion}
import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, NodeComponentInfo}
import pl.touk.nussknacker.engine.api.exception.NuExceptionInfo
import pl.touk.nussknacker.engine.api.process.{ComponentUseCase, EmptyProcessConfigCreator, SourceFactory}
import pl.touk.nussknacker.engine.api.typed.typing
import pl.touk.nussknacker.engine.api.typed.typing.Typed
Expand Down Expand Up @@ -116,15 +117,14 @@ class FlinkTestScenarioRunner(

// TODO: get flink mini cluster through composition
val env = flinkMiniCluster.createExecutionEnvironment()
val exceptionHolder = FlinkTestExceptionHolder.register()
val collectorHandler = TestScenarioCollectorHandler.createHandler(componentUseCase)

// It's copied from registrar.register only for handling compilation errors..
// TODO: figure how to get compilation result on highest level - registrar.register?
val compiler =
new FlinkProcessCompilerWithTestComponents(
testExtensionsHolder,
exceptionHolder,
collectorHandler.resultsCollectingListener,
modelData,
componentUseCase
)
Expand All @@ -150,10 +150,9 @@ class FlinkTestScenarioRunner(

env.executeAndWaitForFinished(scenario.id)()

RunUnitResult(errors = exceptionHolder.getExceptions)
RunUnitResult(errors = collectorHandler.resultsCollectingListener.results.exceptions)
}
} finally {
exceptionHolder.close()
collectorHandler.close()
}
}
Expand Down

0 comments on commit 1680de4

Please sign in to comment.