Skip to content

Commit

Permalink
[NU-1455] Fix for broken encoding mechanism in tests from file, revert
Browse files Browse the repository at this point in the history
  • Loading branch information
lciolecki committed Mar 25, 2024
1 parent fc70a20 commit d27ff29
Show file tree
Hide file tree
Showing 43 changed files with 535 additions and 344 deletions.
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.api.deployment

import io.circe.Json
import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.api.test.ScenarioTestData
Expand Down Expand Up @@ -73,7 +74,7 @@ case class TestScenarioCommand(
scenarioName: ProcessName,
canonicalProcess: CanonicalProcess,
scenarioTestData: ScenarioTestData
) extends ScenarioCommand[TestResults]
) extends ScenarioCommand[TestResults[Json]]

case class MakeScenarioSavepointCommand(scenarioName: ProcessName, savepointDir: Option[String])
extends ScenarioCommand[SavepointResult]
Expand Down
Expand Up @@ -7,16 +7,12 @@ import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller}
import com.typesafe.scalalogging.LazyLogging
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport
import io.circe.generic.extras.semiauto.deriveConfiguredEncoder
import io.circe.syntax._
import io.circe.{Decoder, Encoder, Json, parser}
import io.dropwizard.metrics5.MetricRegistry
import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.graph.ScenarioGraph
import pl.touk.nussknacker.engine.api.exception.NuExceptionInfo
import pl.touk.nussknacker.engine.api.{Context, DisplayJson}
import pl.touk.nussknacker.engine.testmode.TestProcess._
import pl.touk.nussknacker.engine.util.json.BestEffortJsonEncoder
import pl.touk.nussknacker.restmodel.{CustomActionRequest, CustomActionResponse}
import pl.touk.nussknacker.ui.api.description.NodesApiEndpoints.Dtos.{
TestFromParametersRequest,
Expand All @@ -37,63 +33,21 @@ object ManagementResources {

import pl.touk.nussknacker.engine.api.CirceUtil._

import io.circe.syntax._

implicit val resultsWithCountsEncoder: Encoder[ResultsWithCounts] = deriveConfiguredEncoder

implicit val testResultsEncoder: Encoder[TestResults] = new Encoder[TestResults]() {
private implicit val testResultsEncoder: Encoder[TestResults[Json]] = new Encoder[TestResults[Json]]() {

implicit val anyEncoder: Encoder[Any] = {
case scenarioGraph: DisplayJson =>
def safeString(a: String) = Option(a).map(Json.fromString).getOrElse(Json.Null)

val scenarioGraphJson = scenarioGraph.asJson
scenarioGraph.originalDisplay match {
case None => Json.obj("pretty" -> scenarioGraphJson)
case Some(original) => Json.obj("original" -> safeString(original), "pretty" -> scenarioGraphJson)
}
case null => Json.Null
case a =>
Json.obj(
"pretty" -> BestEffortJsonEncoder(failOnUnknown = false, a.getClass.getClassLoader).circeEncoder.apply(a)
)
}
implicit val nodeResult: Encoder[ResultContext[Json]] = deriveConfiguredEncoder
implicit val expressionInvocationResult: Encoder[ExpressionInvocationResult[Json]] = deriveConfiguredEncoder
implicit val externalInvocationResult: Encoder[ExternalInvocationResult[Json]] = deriveConfiguredEncoder

// TODO: do we want more information here?
implicit val contextEncoder: Encoder[Context] = (a: Context) =>
Json.obj(
"id" -> Json.fromString(a.id),
"variables" -> a.variables.asJson
)

val throwableEncoder: Encoder[Throwable] = Encoder[Option[String]].contramap(th => Option(th.getMessage))
implicit val throwableEncoder: Encoder[Throwable] = Encoder[Option[String]].contramap(th => Option(th.getMessage))
implicit val exceptionResultEncoder: Encoder[ExceptionResult[Json]] = deriveConfiguredEncoder

// It has to be done manually, deriveConfiguredEncoder doesn't work properly with value: Any
implicit val externalInvocationResultEncoder: Encoder[ExternalInvocationResult] =
(value: ExternalInvocationResult) =>
Json.obj(
"name" -> Json.fromString(value.name),
"contextId" -> Json.fromString(value.contextId),
"value" -> value.value.asJson,
)

// It has to be done manually, deriveConfiguredEncoder doesn't work properly with value: Any
implicit val expressionInvocationResultEncoder: Encoder[ExpressionInvocationResult] =
(value: ExpressionInvocationResult) =>
Json.obj(
"name" -> Json.fromString(value.name),
"contextId" -> Json.fromString(value.contextId),
"value" -> value.value.asJson,
)

implicit val exceptionsEncoder: Encoder[NuExceptionInfo[_ <: Throwable]] =
(value: NuExceptionInfo[_ <: Throwable]) =>
Json.obj(
// We don't need componentId on the FE here
"nodeId" -> value.nodeComponentInfo.map(_.nodeId).asJson,
"throwable" -> throwableEncoder(value.throwable),
"context" -> value.context.asJson
)

override def apply(a: TestResults): Json = a match {
override def apply(a: TestResults[Json]): Json = a match {
case TestResults(nodeResults, invocationResults, externalInvocationResults, exceptions) =>
Json.obj(
"nodeResults" -> nodeResults.map { case (node, list) => node -> list.sortBy(_.id) }.asJson,
Expand Down Expand Up @@ -217,9 +171,7 @@ class ManagementResources(
details.isFragment,
RawScenarioTestData(testDataContent)
)
.flatMap {
mapResultsToHttpResponse
}
.flatMap(mapResultsToHttpResponse)
case Left(error) =>
Future.failed(ProcessUnmarshallingError(error.toString))
}
Expand Down Expand Up @@ -253,9 +205,7 @@ class ManagementResources(
details.isFragment,
rawScenarioTestData
)
.flatMap {
mapResultsToHttpResponse
}
.flatMap(mapResultsToHttpResponse)
}
}
}
Expand Down Expand Up @@ -283,9 +233,7 @@ class ManagementResources(
process.isFragment,
testParametersRequest.sourceParameters
)
.flatMap {
mapResultsToHttpResponse
}
.flatMap(mapResultsToHttpResponse)
}
}
}
Expand Down
Expand Up @@ -6,7 +6,7 @@ import pl.touk.nussknacker.engine.api.test.ScenarioTestData
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults
import pl.touk.nussknacker.ui.security.api.LoggedUser

import io.circe.Json
import scala.concurrent.{ExecutionContext, Future}

trait ScenarioTestExecutorService {
Expand All @@ -15,7 +15,7 @@ trait ScenarioTestExecutorService {
id: ProcessIdWithName,
canonicalProcess: CanonicalProcess,
scenarioTestData: ScenarioTestData,
)(implicit loggedUser: LoggedUser, ec: ExecutionContext): Future[TestResults]
)(implicit loggedUser: LoggedUser, ec: ExecutionContext): Future[TestResults[Json]]

}

Expand All @@ -25,8 +25,8 @@ class ScenarioTestExecutorServiceImpl(scenarioResolver: ScenarioResolver, deploy
override def testProcess(
id: ProcessIdWithName,
canonicalProcess: CanonicalProcess,
scenarioTestData: ScenarioTestData
)(implicit loggedUser: LoggedUser, ec: ExecutionContext): Future[TestResults] = {
scenarioTestData: ScenarioTestData,
)(implicit loggedUser: LoggedUser, ec: ExecutionContext): Future[TestResults[Json]] = {
for {
resolvedProcess <- Future.fromTry(scenarioResolver.resolveScenario(canonicalProcess))
testResult <- deploymentManager.processCommand(TestScenarioCommand(id.name, resolvedProcess, scenarioTestData))
Expand Down
@@ -1,6 +1,7 @@
package pl.touk.nussknacker.ui.process.test

import io.circe.Json
import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults
import pl.touk.nussknacker.ui.processreport.NodeCount

final case class ResultsWithCounts(results: TestResults, counts: Map[String, NodeCount])
final case class ResultsWithCounts(results: TestResults[Json], counts: Map[String, NodeCount])
Expand Up @@ -2,6 +2,7 @@ package pl.touk.nussknacker.ui.process.test

import com.carrotsearch.sizeof.RamUsageEstimator
import com.typesafe.scalalogging.LazyLogging
import io.circe.Json
import pl.touk.nussknacker.engine.api.graph.ScenarioGraph
import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName}
import pl.touk.nussknacker.engine.api.test.ScenarioTestData
Expand Down Expand Up @@ -74,7 +75,7 @@ class ScenarioTestService(
idWithName: ProcessIdWithName,
scenarioGraph: ScenarioGraph,
isFragment: Boolean,
rawTestData: RawScenarioTestData
rawTestData: RawScenarioTestData,
)(implicit ec: ExecutionContext, user: LoggedUser): Future[ResultsWithCounts] = {
for {
preliminaryScenarioTestData <- preliminaryScenarioTestDataSerDe
Expand All @@ -87,7 +88,7 @@ class ScenarioTestService(
testResults <- testExecutorService.testProcess(
idWithName,
canonical,
scenarioTestData
scenarioTestData,
)
_ <- {
assertTestResultsAreNotTooBig(testResults)
Expand All @@ -99,14 +100,14 @@ class ScenarioTestService(
idWithName: ProcessIdWithName,
scenarioGraph: ScenarioGraph,
isFragment: Boolean,
parameterTestData: TestSourceParameters
parameterTestData: TestSourceParameters,
)(implicit ec: ExecutionContext, user: LoggedUser): Future[ResultsWithCounts] = {
val canonical = toCanonicalProcess(scenarioGraph, idWithName.name, isFragment)
for {
testResults <- testExecutorService.testProcess(
idWithName,
canonical,
ScenarioTestData(parameterTestData.sourceId, parameterTestData.parameterExpressions)
ScenarioTestData(parameterTestData.sourceId, parameterTestData.parameterExpressions),
)
_ <- assertTestResultsAreNotTooBig(testResults)
} yield ResultsWithCounts(testResults, computeCounts(canonical, isFragment, testResults))
Expand All @@ -120,7 +121,7 @@ class ScenarioTestService(
processResolver.validateAndResolve(scenarioGraph, processName, isFragment)
}

private def assertTestResultsAreNotTooBig(testResults: TestResults): Future[Unit] = {
private def assertTestResultsAreNotTooBig(testResults: TestResults[_]): Future[Unit] = {
val testDataResultApproxByteSize = RamUsageEstimator.sizeOf(testResults)
if (testDataResultApproxByteSize > testDataSettings.resultsMaxBytes) {
logger.info(
Expand All @@ -132,13 +133,13 @@ class ScenarioTestService(
}
}

private def computeCounts(canonical: CanonicalProcess, isFragment: Boolean, results: TestResults)(
private def computeCounts(canonical: CanonicalProcess, isFragment: Boolean, results: TestResults[_])(
implicit loggedUser: LoggedUser
): Map[String, NodeCount] = {
val counts = results.nodeResults.map { case (key, nresults) =>
key -> RawCount(
nresults.size.toLong,
results.exceptions.find(_.nodeComponentInfo.map(_.nodeId).contains(key)).size.toLong
results.exceptions.find(_.nodeId.contains(key)).size.toLong
)
}
processCounter.computeCounts(canonical, isFragment, counts.get)
Expand Down
Expand Up @@ -38,7 +38,7 @@ class ManagementApiHttpServiceBusinessSpec
nodeResults = Map.empty,
invocationResults = Map.empty,
externalInvocationResults = Map.empty,
exceptions = List.empty
exceptions = List.empty,
)
)
)
Expand Down
1 change: 1 addition & 0 deletions docs/Changelog.md
Expand Up @@ -59,6 +59,7 @@
* [#5641](https://github.com/TouK/nussknacker/pull/5641) Fix: fetching/parsing batch periodic json only when needed (stop parsing during status check)
* [#5656](https://github.com/TouK/nussknacker/pull/5656) Added: Decision Table component - detailed validation
* [#5657](https://github.com/TouK/nussknacker/pull/5657) Improved heuristic for eventhub to Azure's schema name mapping.
* [#5754](https://github.com/TouK/nussknacker/pull/5754) Fix for broken encoding mechanism in tests from file with Avro format, revert [0d9b600][https://github.com/TouK/nussknacker/commit/0d9b600]

1.13.2 (7 Mar 2024)
------------------------
Expand Down
5 changes: 5 additions & 0 deletions docs/MigrationGuide.md
Expand Up @@ -142,6 +142,11 @@ To see the biggest differences please consult the [changelog](Changelog.md).
* [#5641](https://github.com/TouK/nussknacker/pull/5641) `PeriodicProcessDeployment`/`DeploymentWithJarData`/`PeriodicProcess` now takes type parameter `CanonicalProcess` or `Unit` to point out whether it contains scenario json.
* [#5656](https://github.com/TouK/nussknacker/pull/5656) `pl.touk.nussknacker.engine.api.expression.Expression#language` method returns `Language` trait instead of `String`
* [#5707](https://github.com/TouK/nussknacker/pull/5707) `ParameterName` data class was introduced. It replaces `String` in whole places where it's used as a parameter name
* [#5754](https://github.com/TouK/nussknacker/pull/5754) Fix for broken encoding mechanism in tests from file with Avro format, revert [0d9b600][https://github.com/TouK/nussknacker/commit/0d9b600]
* Classes `ResultsCollectingListener`, `TestResults`, `ExpressionInvocationResult`, `ExternalInvocationResult` depend on `T`
* Classes `TestResults.nodeResults` uses `ResultContext` instead of `Context`
* Classes `TestResults.exceptions` uses `ExceptionResult` instead of `NuExceptionInfo`
* Added `variableEncoder` to `ResultsCollectingListenerHolder.registerRun`

### REST API changes
* [#5280](https://github.com/TouK/nussknacker/pull/5280)[#5368](https://github.com/TouK/nussknacker/pull/5368) Changes in the definition API:
Expand Down
Expand Up @@ -3,6 +3,7 @@ package pl.touk.nussknacker.development.manager
import cats.data.Validated.valid
import cats.data.ValidatedNel
import com.typesafe.config.Config
import io.circe.Json
import pl.touk.nussknacker.development.manager.MockableDeploymentManagerProvider.MockableDeploymentManager
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus}
Expand Down Expand Up @@ -47,13 +48,13 @@ object MockableDeploymentManagerProvider {
object MockableDeploymentManager extends DeploymentManager with StubbingCommands {

private val scenarioStatuses = new AtomicReference[Map[ScenarioName, StateStatus]](Map.empty)
private val testResults = new AtomicReference[Map[ScenarioName, TestResults]](Map.empty)
private val testResults = new AtomicReference[Map[ScenarioName, TestResults[Json]]](Map.empty)

def configure(scenarioStates: Map[ScenarioName, StateStatus]): Unit = {
scenarioStatuses.set(scenarioStates)
}

def configureTestResults(scenarioTestResults: Map[ScenarioName, TestResults]): Unit = {
def configureTestResults(scenarioTestResults: Map[ScenarioName, TestResults[Json]]): Unit = {
testResults.set(scenarioTestResults)
}

Expand Down
Expand Up @@ -36,7 +36,7 @@ class ForEachTransformerSpec extends AnyFunSuite with FlinkSpec with Matchers wi
val testProcess =
aProcessWithForEachNode(elements = "{'one', 'other'}", resultExpression = s"#$forEachOutputVariableName + '_1'")

val results = collectTestResults[String](model, testProcess, collectingListener)
val results = collectTestResults(model, testProcess, collectingListener)
extractResultValues(results) shouldBe List("one_1", "other_1")
}

Expand All @@ -47,7 +47,7 @@ class ForEachTransformerSpec extends AnyFunSuite with FlinkSpec with Matchers wi
val testProcess =
aProcessWithForEachNode(elements = "{'one', 'other'}", resultExpression = s"#$forEachOutputVariableName + '_1'")

val results = collectTestResults[String](model, testProcess, collectingListener)
val results = collectTestResults(model, testProcess, collectingListener)
extractContextIds(results) shouldBe List("forEachProcess-start-0-0-0", "forEachProcess-start-0-0-1")
}

Expand All @@ -70,15 +70,15 @@ class ForEachTransformerSpec extends AnyFunSuite with FlinkSpec with Matchers wi

val testProcess = aProcessWithForEachNode(elements = "{}")

val results = collectTestResults[String](model, testProcess, collectingListener)
val results = collectTestResults(model, testProcess, collectingListener)
results.nodeResults shouldNot contain key sinkId
}

private def initializeListener = ResultsCollectingListenerHolder.registerRun
private def initializeListener = ResultsCollectingListenerHolder.registerListener

private def modelData(
list: List[TestRecord] = List(),
collectingListener: ResultsCollectingListener
collectingListener: ResultsCollectingListener[Any]
): LocalModelData = {
val modelConfig = ConfigFactory
.empty()
Expand Down Expand Up @@ -110,17 +110,17 @@ class ForEachTransformerSpec extends AnyFunSuite with FlinkSpec with Matchers wi
private def collectTestResults[T](
model: LocalModelData,
testProcess: CanonicalProcess,
collectingListener: ResultsCollectingListener
): TestProcess.TestResults = {
collectingListener: ResultsCollectingListener[T]
): TestProcess.TestResults[T] = {
runProcess(model, testProcess)
collectingListener.results
}

private def extractResultValues(results: TestProcess.TestResults): List[String] = results
private def extractResultValues(results: TestProcess.TestResults[_]): List[String] = results
.nodeResults(sinkId)
.map(_.get[String](resultVariableName).get)
.map(_.variableTyped(resultVariableName).get.asInstanceOf[String])

private def extractContextIds(results: TestProcess.TestResults): List[String] = results
private def extractContextIds(results: TestProcess.TestResults[_]): List[String] = results
.nodeResults(forEachNodeResultId)
.map(_.id)

Expand Down
Expand Up @@ -19,7 +19,7 @@ class PeriodicSourceFactorySpec extends AnyFunSuite with FlinkSpec with PatientS
val sinkId = "sinkId"
val input = "some value"

val collectingListener = ResultsCollectingListenerHolder.registerRun
val collectingListener = ResultsCollectingListenerHolder.registerListener
val model = LocalModelData(
ConfigFactory.empty(),
FlinkBaseComponentProvider.Components,
Expand All @@ -43,7 +43,7 @@ class PeriodicSourceFactorySpec extends AnyFunSuite with FlinkSpec with PatientS
try {
eventually {
val results = collectingListener.results.nodeResults.get(sinkId)
results.flatMap(_.headOption).flatMap(_.get[String]("input")) shouldBe Some(input)
results.flatMap(_.headOption).flatMap(_.variableTyped("input")) shouldBe Some(input)
}
} finally {
stoppableEnv.cancel(id.getJobID)
Expand Down

0 comments on commit d27ff29

Please sign in to comment.