[NU-1665] Table source test data generation (#6195)
add data generation support for table source
mslabek committed Jul 11, 2024
1 parent b974e65 commit 3f655e4
Showing 14 changed files with 388 additions and 12 deletions.
11 changes: 6 additions & 5 deletions build.sbt
@@ -105,9 +105,6 @@ lazy val publishSettings = Seq(
def defaultMergeStrategy: String => MergeStrategy = {
// remove JPMS module descriptors (a proper solution would be to merge them)
case PathList(ps @ _*) if ps.last == "module-info.class" => MergeStrategy.discard
// this prevents problem with table api in runtime:
// https://stackoverflow.com/questions/60436823/issue-when-flink-upload-a-job-with-stream-sql-query
case PathList("org", "codehaus", "janino", "CompilerFactory.class") => MergeStrategy.discard
// we override Spring's class and we want to keep only our implementation
case PathList(ps @ _*) if ps.last == "NumberUtils.class" => MergeStrategy.first
// merge Netty version information files
@@ -1805,6 +1802,7 @@ lazy val flinkKafkaComponents = (project in flink("components/kafka"))
componentsUtils % Provided
)

// TODO: check if any flink-table / connector / format dependencies' scope can be limited
lazy val flinkTableApiComponents = (project in flink("components/table"))
.settings(commonSettings)
.settings(assemblyNoScala("flinkTable.jar"): _*)
@@ -1817,6 +1815,9 @@ lazy val flinkTableApiComponents = (project in flink("components/table"))
"org.apache.flink" % "flink-table-api-java-bridge" % flinkV,
"org.apache.flink" % "flink-table-planner-loader" % flinkV,
"org.apache.flink" % "flink-table-runtime" % flinkV,
"org.apache.flink" % "flink-clients" % flinkV,
"org.apache.flink" % "flink-connector-files" % flinkV, // needed for testing data generation
"org.apache.flink" % "flink-json" % flinkV, // needed for testing data generation
)
}
)
@@ -1987,7 +1988,7 @@ lazy val designer = (project in file("designer/server"))
"org.scala-lang.modules" %% "scala-parallel-collections" % "1.0.4",
"org.scala-lang.modules" %% "scala-xml" % "2.1.0"
)
case _ => Seq(),
case _ => Seq()
}
}
)
@@ -2041,7 +2042,7 @@ lazy val e2eTests = (project in file("e2e-tests"))
)
.enablePlugins(BuildInfoPlugin)
.settings(buildInfoSettings)
.dependsOn(testUtils % Test)
.dependsOn(testUtils % Test, scenarioApi % Test, designer % Test)

lazy val doTest = Seq(
Test / testOptions += Tests.Setup { () =>
@@ -0,0 +1,21 @@
scenarioTypes {
"batch": {
deploymentConfig: {
type: "flinkStreaming"
restUrl: ${?FLINK_REST_URL}
}
modelConfig: {
classPath: ["model/defaultModel.jar", "components/flink/flinkBase.jar", "components/flink-table/flinkTable.jar", "model/flinkExecutor.jar", "flink-dropwizard-metrics-deps/"]
rocksDB: {
enable: true
}
components.flinkTable {
// The table configuration file should be mounted on the Flink Taskmanager container under this path. You can
// override this path through the 'TABLES_DEFINITION_FILE' env variable for a given Flink service.
tableDefinitionFilePath: ${TABLES_DEFINITION_FILE}
enableFlinkBatchExecutionMode: true
}
}
category: "Default"
}
}
@@ -0,0 +1,9 @@
CREATE TABLE transactions (
datetime TIMESTAMP,
client_id STRING,
amount DECIMAL(15, 2)
) WITH (
'connector' = 'filesystem',
'path' = 'file:///opt/flink/data/transactions-data/transactions',
'format' = 'csv'
);
@@ -0,0 +1,15 @@
services:

designer:
environment:
CONFIG_FILE: "/opt/nussknacker/conf/application.conf,/opt/nussknacker/conf/application-customizations.conf,/opt/nussknacker/conf/batch-customizations.conf"
TABLES_DEFINITION_FILE: "/opt/nussknacker/conf/tables-definition.sql"
volumes:
- ../../e2e-tests/src/test/resources/spec-setup/batch-config/batch-customizations.conf:/opt/nussknacker/conf/batch-customizations.conf
- ../../e2e-tests/src/test/resources/spec-setup/batch-config/tables-definition.sql:/opt/nussknacker/conf/tables-definition.sql

flink-jobmanager:
environment:
TABLES_DEFINITION_FILE: "/opt/flink/data/test-data/tables-definition.sql"
volumes:
- ../../e2e-tests/src/test/resources/spec-setup/batch-config/tables-definition.sql:/opt/flink/data/test-data/tables-definition.sql
@@ -0,0 +1,76 @@
package pl.touk.nussknacker

import io.circe.syntax.EncoderOps
import io.restassured.RestAssured.`given`
import io.restassured.module.scala.RestAssuredSupport.AddThenToResponse
import org.scalatest.freespec.AnyFreeSpecLike
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.config.WithE2EInstallationExampleRestAssuredUsersExtensions
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.spel.SpelExtension._
import pl.touk.nussknacker.test.{NuRestAssureExtensions, NuRestAssureMatchers, VeryPatientScalaFutures}
import pl.touk.nussknacker.ui.process.marshall.CanonicalProcessConverter.toScenarioGraph

class BatchDataGenerationSpec
extends AnyFreeSpecLike
with DockerBasedInstallationExampleNuEnvironment
with Matchers
with VeryPatientScalaFutures
with NuRestAssureExtensions
with NuRestAssureMatchers
with WithE2EInstallationExampleRestAssuredUsersExtensions {

private val simpleBatchTableScenario = ScenarioBuilder
.streaming("SumTransactions")
.source("sourceId", "table", "Table" -> "'transactions'".spel)
.emptySink("end", "dead-end")

private val designerServiceUrl = "http://localhost:8080"

"Generate file endpoint should generate records with randomized values for scenario with table source" in {
given()
.applicationState(
createBatchScenario(simpleBatchTableScenario.name.value)
)
.when()
.request()
.basicAuthAdmin()
.jsonBody(toScenarioGraph(simpleBatchTableScenario).asJson.spaces2)
.post(
s"$designerServiceUrl/api/testInfo/${simpleBatchTableScenario.name.value}/generate/10"
)
.Then()
.statusCode(200)
.body(
matchAllNdJsonWithRegexValues(s"""
|{
| "sourceId": "sourceId",
| "record": {
| "datetime": "${regexes.localDateRegex}",
| "client_id": "[a-z\\\\d]{100}",
| "amount": "${regexes.decimalRegex}"
| }
|}
|""".stripMargin)
)
}

private def createBatchScenario(scenarioName: String): Unit = {
given()
.when()
.request()
.basicAuthAdmin()
.jsonBody(s"""
|{
| "name" : "$scenarioName",
| "category" : "Default",
| "isFragment" : false,
| "processingMode" : "Bounded-Stream"
|}
|""".stripMargin)
.post(s"$designerServiceUrl/api/processes")
.Then()
.statusCode(201)
}

}
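
The endpoint exercised above returns the generated test records as newline-delimited JSON, one object per record with a sourceId and a record field. A minimal sketch of parsing such a body on the client side with circe; the helper name and error handling are illustrative and not part of this commit:

import io.circe.Json
import io.circe.parser.parse

object GeneratedRecordsParser {

  // Splits the NDJSON response body into lines and parses each one as JSON;
  // fails fast on the first malformed line.
  def parseGeneratedRecords(ndJsonBody: String): List[Json] =
    ndJsonBody.linesIterator
      .map(_.trim)
      .filter(_.nonEmpty)
      .map { line =>
        parse(line).fold(
          error => throw new IllegalArgumentException(s"Invalid generated record: ${error.message}", error),
          identity
        )
      }
      .toList

}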
@@ -65,6 +65,7 @@ object DockerBasedInstallationExampleNuEnvironment extends LazyLogging {
composeFiles = Seq(
new JFile("examples/installation/docker-compose.yml"),
new JFile(Resource.getUrl("spec-setup/spec-setup.override.yml").toURI),
new JFile(Resource.getUrl("spec-setup/batch-nu-designer.override.yml").toURI),
new JFile(Resource.getUrl("spec-setup/debuggable-nu-designer.override.yml").toURI)
),
env = Map(
@@ -0,0 +1,15 @@
package pl.touk.nussknacker.config

import io.restassured.specification.RequestSpecification
import pl.touk.nussknacker.test.NuRestAssureExtensions

trait WithE2EInstallationExampleRestAssuredUsersExtensions extends NuRestAssureExtensions {

implicit class UsersBasicAuth[T <: RequestSpecification](requestSpecification: T) {

def basicAuthAdmin(): RequestSpecification =
requestSpecification.preemptiveBasicAuth("admin", "admin")

}

}
@@ -0,0 +1,31 @@
package pl.touk.nussknacker.engine.flink.table.source

import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.flink.table.extractor.SqlStatementReader
import pl.touk.nussknacker.engine.flink.table.extractor.SqlTestData.SimpleTypesTestCase

class TableSourceDataGenerationTest extends AnyFunSuite with Matchers {

/*
Note: Testing features like data generation or scenario testing (like ad hoc tests) requires a full e2e test where
the Designer is deployed separately (e.g. in a Docker container). This is because these features rely on the Flink
MiniCluster, which requires a proper classloader setup that is hard to reproduce in simple tests. The tests below
are useful for checking the output of these methods, but passing them does not guarantee that the feature works e2e.
*/
test("table source should generate random records with given schema") {
val tableSource = new TableSource(
tableDefinition = SimpleTypesTestCase.tableDefinition,
sqlStatements = SqlStatementReader.readSql(SimpleTypesTestCase.sqlStatement),
enableFlinkBatchExecutionMode = true
)
val records = tableSource.generateTestData(10)

records.testRecords.size shouldBe 10
val mapRecord = records.testRecords.head.json.asObject.get.toMap
mapRecord("someString").isString shouldBe true
mapRecord("someVarChar").isString shouldBe true
mapRecord("someInt").isNumber shouldBe true
}

}
@@ -1,7 +1,18 @@
package pl.touk.nussknacker.engine.flink.table

import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.types.DataType
import pl.touk.nussknacker.engine.api.typed.typing.TypingResult

final case class TableDefinition(tableName: String, typingResult: TypingResult, columns: List[ColumnDefinition])
import scala.jdk.CollectionConverters._

final case class TableDefinition(tableName: String, typingResult: TypingResult, columns: List[ColumnDefinition]) {

def toFlinkSchema: Schema = {
val cols = columns.map(c => DataTypes.FIELD(c.columnName, c.flinkDataType)).asJava
Schema.newBuilder().fromRowDataType(DataTypes.ROW(cols)).build()
}

}

final case class ColumnDefinition(columnName: String, typingResult: TypingResult, flinkDataType: DataType)
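
A minimal sketch of how the new toFlinkSchema helper can be used: it wraps the column definitions into a top-level ROW data type and builds a Flink Schema from it. The column names, data types and typing results below are illustrative only; in the commit they come from the parsed SQL table definition:

import org.apache.flink.table.api.DataTypes
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, Unknown}

// Illustrative columns - the real ones are derived from the CREATE TABLE statement
val columns = List(
  ColumnDefinition("client_id", Typed[String], DataTypes.STRING()),
  ColumnDefinition("amount", Typed[java.math.BigDecimal], DataTypes.DECIMAL(15, 2))
)

// Unknown keeps the example short - the commit derives a proper typing result for the whole row
val transactionsTable = TableDefinition("transactions", Unknown, columns)

// Produces a schema equivalent to ROW<client_id STRING, amount DECIMAL(15, 2)>
val flinkSchema = transactionsTable.toFlinkSchema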
@@ -0,0 +1,149 @@
package pl.touk.nussknacker.engine.flink.table.source

import cats.implicits._
import com.typesafe.scalalogging.LazyLogging
import io.circe.parser.parse
import org.apache.commons.io.FileUtils
import org.apache.flink.configuration.{Configuration, CoreOptions, PipelineOptions}
import org.apache.flink.table.api.{EnvironmentSettings, Schema, TableDescriptor, TableEnvironment}
import pl.touk.nussknacker.engine.api.test.{TestData, TestRecord}
import pl.touk.nussknacker.engine.flink.table.source.FlinkMiniClusterDataGenerator._
import pl.touk.nussknacker.engine.util.ThreadUtils

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import java.util.UUID
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try, Using}

class FlinkMiniClusterDataGenerator(flinkTableSchema: Schema) extends LazyLogging {

private val env = ThreadUtils.withThisAsContextClassLoader(getClass.getClassLoader) {
TableEnvironment.create(miniClusterTestingEnvConfig)
}

// TODO: check if the minicluster releases memory properly and, if not, refactor to reuse one minicluster across all usages
def generateTestData(amountOfRecordsToGenerate: Int): TestData =
// setting the context classloader because Flink relies on it in multiple places, and without this temporary override
// it doesn't have access to the necessary classes
ThreadUtils.withThisAsContextClassLoader(getClass.getClassLoader) {
val inputTableName = createGeneratorTable(amountOfRecordsToGenerate)
val (tempDirForOutputTable, outputTableName) = createOutputFileTable
val generatedRows = Try {
insertDataAndAwait(inputTableName, outputTableName)
readRecordsFromFilesUnderPath(tempDirForOutputTable)
}
cleanup(tempDirForOutputTable, List(inputTableName, outputTableName))
val rows = generatedRows.get
TestData(rows.map(TestRecord(_)))
}

private def readRecordsFromFilesUnderPath(path: Path) = {
val filesUnderPath = Using(Files.newDirectoryStream(path)) { dirStream =>
dirStream.asScala.toList
}.get
val parsedRecords = filesUnderPath
.flatMap(f => FileUtils.readLines(f.toFile, StandardCharsets.UTF_8).asScala)
.map(parse)
.sequence
parsedRecords match {
case Left(ex) =>
throw new IllegalStateException("Couldn't parse record from test data dump", ex)
case Right(records) => records
}
}

private def insertDataAndAwait(inputTableName: String, outputTableName: String): Unit = {
val inputTable = env.from(inputTableName)
// TODO: Avoid blocking the thread. Refactor `generateTestData` to return a Future and use a separate blocking
// thread pool here
inputTable.insertInto(outputTableName).execute().await()
}

private def createGeneratorTable(amountOfRecordsToGenerate: Int): String = {
val tableName = generateTestDataInputTableName
env.createTemporaryTable(
tableName,
TableDescriptor
.forConnector("datagen")
.option("number-of-rows", amountOfRecordsToGenerate.toString)
.schema(flinkTableSchema)
.build()
)
tableName
}

private def createOutputFileTable: (Path, String) = {
val tempDir = Files.createTempDirectory(tempTestDataOutputTablePrefix)
logger.debug(s"Created temporary directory for dumping test data at: '${tempDir.toUri.toURL}'")
val tableName = generateTestDataOutputTableName
env.createTemporaryTable(
tableName,
TableDescriptor
.forConnector("filesystem")
.option("path", tempDir.toUri.toURL.toString)
.format("json")
.schema(flinkTableSchema)
.build()
)
tempDir -> tableName
}

private def cleanup(dir: Path, tableNames: List[String]): Unit = {
delete(dir)
tableNames.foreach(deleteTable)
}

private def delete(dir: Path): Unit = Try {
Files
.walk(dir)
.sorted(java.util.Comparator.reverseOrder())
.forEach(path => Files.deleteIfExists(path))
logger.debug(s"Successfully deleted temporary test data dumping directory at: '${dir.toUri.toURL}'")
} match {
case Failure(e) =>
logger.error(
s"Couldn't properly delete temporary test data dumping directory at: '${dir.toUri.toURL}'",
e
)
case Success(_) => ()
}

private def deleteTable(tableName: String): Unit = {
if (!env.dropTemporaryTable(tableName)) {
logger.error(s"Couldn't properly delete temporary table: '$tableName'")
}
}

}

object FlinkMiniClusterDataGenerator {

private def tableNameValidRandomValue = UUID.randomUUID().toString.replaceAll("-", "")
private def generateTestDataInputTableName = s"testDataInputTable_$tableNameValidRandomValue"
private def generateTestDataOutputTableName = s"testDataOutputTable_$tableNameValidRandomValue"
private val miniClusterTestingEnvParallelism = Int.box(1)
private val tempTestDataOutputTablePrefix = "tableSourceDataDump-"

// TODO: how to get path of jar cleaner? Through config?
private val classPathUrlsForMiniClusterTestingEnv = List(
"components/flink-table/flinkTable.jar"
).map(Path.of(_).toUri.toURL)

private val miniClusterTestingEnvConfig = {
val conf = new Configuration()

// parent-first - otherwise linkage error (loader constraint violation, a different class with the same name was
// previously loaded by 'app') for class 'org.apache.commons.math3.random.RandomDataGenerator'
conf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first")

// without this, the classloader at the Flink taskmanager level is basically empty
conf.set(
PipelineOptions.CLASSPATHS,
classPathUrlsForMiniClusterTestingEnv.map(_.toString).asJava
)
conf.set(CoreOptions.DEFAULT_PARALLELISM, miniClusterTestingEnvParallelism)
EnvironmentSettings.newInstance().withConfiguration(conf).build()
}

}
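
For orientation, a sketch of how this generator is presumably wired into TableSource.generateTestData (the method called in the tests above); the delegation below is an assumption based on the available constructor and method signatures, not code from the commit:

import pl.touk.nussknacker.engine.api.test.TestData
import pl.touk.nussknacker.engine.flink.table.TableDefinition

// Assumed delegation: build the Flink schema from the table definition and let the
// minicluster-backed generator produce the requested number of records.
def generateTestDataFor(tableDefinition: TableDefinition, size: Int): TestData = {
  val generator = new FlinkMiniClusterDataGenerator(tableDefinition.toFlinkSchema)
  generator.generateTestData(amountOfRecordsToGenerate = size)
}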