-
Notifications
You must be signed in to change notification settings - Fork 90
/
ScenarioTestService.scala
159 lines (143 loc) · 6.81 KB
/
ScenarioTestService.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package pl.touk.nussknacker.ui.process.test
import com.carrotsearch.sizeof.RamUsageEstimator
import com.typesafe.scalalogging.LazyLogging
import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api.process.ProcessIdWithName
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.definition.test.{ModelDataTestInfoProvider, TestInfoProvider, TestingCapabilities}
import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults
import pl.touk.nussknacker.restmodel.definition.UISourceParameters
import pl.touk.nussknacker.engine.api.test.ScenarioTestData
import pl.touk.nussknacker.restmodel.displayedgraph.DisplayableProcess
import pl.touk.nussknacker.ui.api.{TestDataSettings, TestSourceParameters}
import pl.touk.nussknacker.ui.definition.UIProcessObjectsFactory
import pl.touk.nussknacker.ui.process.deployment.ScenarioTestExecutorService
import pl.touk.nussknacker.ui.process.processingtypedata.ProcessingTypeDataProvider
import pl.touk.nussknacker.ui.processreport.{NodeCount, ProcessCounter, RawCount}
import pl.touk.nussknacker.ui.security.api.LoggedUser
import pl.touk.nussknacker.ui.uiresolving.UIProcessResolving
import scala.concurrent.{ExecutionContext, Future}
object ScenarioTestService {
def apply(
providers: ProcessingTypeDataProvider[ModelData, _],
testDataSettings: TestDataSettings,
processResolving: UIProcessResolving,
processCounter: ProcessCounter,
testExecutorService: ScenarioTestExecutorService,
): ScenarioTestService = {
new ScenarioTestService(
providers.mapValues(new ModelDataTestInfoProvider(_)),
testDataSettings,
new PreliminaryScenarioTestDataSerDe(testDataSettings),
processResolving,
processCounter,
testExecutorService,
)
}
}
class ScenarioTestService(
testInfoProviders: ProcessingTypeDataProvider[TestInfoProvider, _],
testDataSettings: TestDataSettings,
preliminaryScenarioTestDataSerDe: PreliminaryScenarioTestDataSerDe,
processResolving: UIProcessResolving,
processCounter: ProcessCounter,
testExecutorService: ScenarioTestExecutorService,
) extends LazyLogging {
def getTestingCapabilities(displayableProcess: DisplayableProcess): TestingCapabilities = {
val testInfoProvider = testInfoProviders.forTypeUnsafe(displayableProcess.processingType)
val canonical = toCanonicalProcess(displayableProcess)
testInfoProvider.getTestingCapabilities(canonical)
}
def testParametersDefinition(displayableProcess: DisplayableProcess): List[UISourceParameters] = {
val testInfoProvider = testInfoProviders.forTypeUnsafe(displayableProcess.processingType)
val canonical = toCanonicalProcess(displayableProcess)
testInfoProvider
.getTestParameters(canonical)
.map { case (id, params) => UISourceParameters(id, params.map(UIProcessObjectsFactory.createUIParameter)) }
.toList
}
def generateData(displayableProcess: DisplayableProcess, testSampleSize: Int): Either[String, RawScenarioTestData] = {
val testInfoProvider = testInfoProviders.forTypeUnsafe(displayableProcess.processingType)
val canonical = toCanonicalProcess(displayableProcess)
for {
_ <- Either.cond(
testSampleSize <= testDataSettings.maxSamplesCount,
(),
s"Too many samples requested, limit is ${testDataSettings.maxSamplesCount}"
)
generatedData <- testInfoProvider
.generateTestData(canonical, testSampleSize)
.toRight("Test data could not be generated for scenario")
rawTestData <- preliminaryScenarioTestDataSerDe.serialize(generatedData)
} yield rawTestData
}
def performTest[T](
idWithName: ProcessIdWithName,
displayableProcess: DisplayableProcess,
rawTestData: RawScenarioTestData,
testResultsVariableEncoder: Any => T
)(implicit ec: ExecutionContext, user: LoggedUser): Future[ResultsWithCounts[T]] = {
val testInfoProvider = testInfoProviders.forTypeUnsafe(displayableProcess.processingType)
for {
preliminaryScenarioTestData <- preliminaryScenarioTestDataSerDe
.deserialize(rawTestData)
.fold(error => Future.failed(new IllegalArgumentException(error)), Future.successful)
canonical = toCanonicalProcess(displayableProcess)
scenarioTestData <- testInfoProvider
.prepareTestData(preliminaryScenarioTestData, canonical)
.fold(error => Future.failed(new IllegalArgumentException(error)), Future.successful)
testResults <- testExecutorService.testProcess(
idWithName,
canonical,
displayableProcess.category,
displayableProcess.processingType,
scenarioTestData,
testResultsVariableEncoder
)
_ <- assertTestResultsAreNotTooBig(testResults)
} yield ResultsWithCounts(testResults, computeCounts(canonical, testResults))
}
def performTest[T](
idWithName: ProcessIdWithName,
displayableProcess: DisplayableProcess,
parameterTestData: TestSourceParameters,
testResultsVariableEncoder: Any => T
)(implicit ec: ExecutionContext, user: LoggedUser): Future[ResultsWithCounts[T]] = {
val canonical = toCanonicalProcess(displayableProcess)
for {
testResults <- testExecutorService.testProcess(
idWithName,
canonical,
displayableProcess.category,
displayableProcess.processingType,
ScenarioTestData(parameterTestData.sourceId, parameterTestData.parameterExpressions),
testResultsVariableEncoder
)
_ <- assertTestResultsAreNotTooBig(testResults)
} yield ResultsWithCounts(testResults, computeCounts(canonical, testResults))
}
private def toCanonicalProcess(displayableProcess: DisplayableProcess): CanonicalProcess = {
val validationResult = processResolving.validateBeforeUiResolving(displayableProcess)
processResolving.resolveExpressions(displayableProcess, validationResult.typingInfo)
}
private def assertTestResultsAreNotTooBig(testResults: TestResults[_]): Future[Unit] = {
val testDataResultApproxByteSize = RamUsageEstimator.sizeOf(testResults)
if (testDataResultApproxByteSize > testDataSettings.resultsMaxBytes) {
logger.info(
s"Test data limit exceeded. Approximate test data size: $testDataResultApproxByteSize, but limit is: ${testDataSettings.resultsMaxBytes}"
)
Future.failed(new RuntimeException("Too much test data. Please decrease test input data size."))
} else {
Future.successful(())
}
}
private def computeCounts(canonical: CanonicalProcess, results: TestResults[_]): Map[String, NodeCount] = {
val counts = results.nodeResults.map { case (key, nresults) =>
key -> RawCount(
nresults.size.toLong,
results.exceptions.find(_.nodeComponentInfo.map(_.nodeId).contains(key)).size.toLong
)
}
processCounter.computeCounts(canonical, counts.get)
}
}