[NU-1473] Table API sink (#5653)
add sql based table api sink
mslabek committed Mar 19, 2024
1 parent 9be7dc9 commit b2ab247
Showing 27 changed files with 526 additions and 700 deletions.
2 changes: 1 addition & 1 deletion designer/client/package.json
@@ -248,7 +248,7 @@
"start:backend-remote": "npm run clean-translations && webpack serve",
"start:backend-staging": "BACKEND_DOMAIN=https://staging.nussknacker.io npm run start:backend-remote",
"start:backend-demo": "BACKEND_DOMAIN=https://demo.nussknacker.io npm run start:backend-remote",
"backend:docker": "docker run -i -p ${BE_PORT:-8080}:8080 -e CONFIG_FORCE_usageStatisticsReports_enabled=false -e SCHEMA_REGISTRY_URL=http://dummy:8888 -e KAFKA_ADDRESS=dummy:9092 -e OPENAPI_SERVICE_URL=http://dummy:5000 -e SQL_ENRICHER_URL=dummy:5432 -e CONFIG_FORCE_scenarioTypes_streaming__dev_deploymentConfig_type=stub -e CONFIG_FILE=/opt/nussknacker/conf/dev-application.conf -v `readlink -f ../../nussknacker-dist/src/universal/conf/dev-application.conf`:/opt/nussknacker/conf/dev-application.conf --pull ${DOCKER_PULL_OPTION:=always} -P touk/nussknacker:${NUSSKNACKER_VERSION:=staging-latest}",
"backend:docker": "docker run -i -p ${BE_PORT:-8080}:8080 -e CONFIG_FORCE_usageStatisticsReports_enabled=false -e SCHEMA_REGISTRY_URL=http://dummy:8888 -e KAFKA_ADDRESS=dummy:9092 -e OPENAPI_SERVICE_URL=http://dummy:5000 -e SQL_ENRICHER_URL=dummy:5432 -e CONFIG_FORCE_scenarioTypes_streaming__dev_deploymentConfig_type=stub -e CONFIG_FORCE_scenarioTypes_streaming__dev_modelConfig_components_flinkTable_disabled=true -e CONFIG_FILE=/opt/nussknacker/conf/dev-application.conf -v `readlink -f ../../nussknacker-dist/src/universal/conf/dev-application.conf`:/opt/nussknacker/conf/dev-application.conf --pull ${DOCKER_PULL_OPTION:=always} -P touk/nussknacker:${NUSSKNACKER_VERSION:=staging-latest}",
"pretest": "npm run check",
"test:unit": "jest --reporters=default --reporters=jest-junit",
"test:e2e": "cypress run",
@@ -1,2 +1 @@
pl.touk.nussknacker.engine.flink.table.TableComponentProvider
pl.touk.nussknacker.engine.flink.table.SqlComponentProvider
pl.touk.nussknacker.engine.flink.table.FlinkTableComponentProvider
@@ -5,10 +5,11 @@
import com.typesafe.scalalogging.LazyLogging
import net.ceedubs.ficus.readers.ArbitraryTypeReader._
import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, ComponentProvider, NussknackerVersion}
import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies
import pl.touk.nussknacker.engine.flink.table.extractor.DataSourceSqlExtractor.extractTablesFromFlinkRuntime
import pl.touk.nussknacker.engine.flink.table.extractor.TableExtractor.extractTablesFromFlinkRuntime
import pl.touk.nussknacker.engine.flink.table.extractor.SqlStatementReader
import pl.touk.nussknacker.engine.flink.table.extractor.SqlStatementReader.SqlStatement
import pl.touk.nussknacker.engine.flink.table.extractor.{DataSourceTableDefinition, SqlStatementReader}
import pl.touk.nussknacker.engine.flink.table.source.SqlSourceFactory
import pl.touk.nussknacker.engine.flink.table.sink.TableSinkFactory
import pl.touk.nussknacker.engine.flink.table.source.TableSourceFactory
import pl.touk.nussknacker.engine.util.ResourceLoader
import pl.touk.nussknacker.engine.util.config.ConfigEnrichments.RichConfig

@@ -25,26 +26,30 @@
import scala.util.{Failure, Success, Try}
* kafka and schema registry addresses are provided as environment variables that are different for designer and
* jobmanager/taskmanager services. For reference see the nussknacker-quickstart repository.
*/
class SqlComponentProvider extends ComponentProvider with LazyLogging {
class FlinkTableComponentProvider extends ComponentProvider with LazyLogging {

import net.ceedubs.ficus.Ficus._

override def providerName: String = "sqlFile"
override def providerName: String = "flinkTable"
private val tableComponentName = "table"

override def resolveConfigForExecution(config: Config): Config = config

override def create(config: Config, dependencies: ProcessObjectDependencies): List[ComponentDefinition] = {
val parsedConfig = config.rootAs[SqlFileDataSourcesConfig]
val parsedConfig = config.rootAs[TableComponentProviderConfig]

val configs = extractDataSourceConfigFromSqlFileOrThrow(parsedConfig.sqlFilePath)
val definition = extractTableDefinitionsFromSqlFileOrThrow(parsedConfig.tableDefinitionFilePath)

ComponentDefinition(
"tableApi-source-sql",
new SqlSourceFactory(configs)
tableComponentName,
new TableSourceFactory(definition)
) :: ComponentDefinition(
tableComponentName,
new TableSinkFactory(definition)
) :: Nil
}

private def extractDataSourceConfigFromSqlFileOrThrow(filePath: String): SqlDataSourcesDefinition = {
private def extractTableDefinitionsFromSqlFileOrThrow(filePath: String) = {
val sqlStatements = readSqlFromFile(Paths.get(filePath))
val results = extractTablesFromFlinkRuntime(sqlStatements)

@@ -56,13 +61,13 @@ class SqlComponentProvider extends ComponentProvider with LazyLogging {
)
}

SqlDataSourcesDefinition(results.tableDefinitions, sqlStatements)
TableSqlDefinitions(results.tableDefinitions, sqlStatements)
}

private def readSqlFromFile(pathToFile: Path): List[SqlStatement] = Try(ResourceLoader.load(pathToFile)) match {
private def readSqlFromFile(pathToFile: Path) = Try(ResourceLoader.load(pathToFile)) match {
case Failure(exception) =>
throw new IllegalStateException(
s"""Sql file with configuration of sql data source components was not found under specified path: $pathToFile.
s"""Sql file with configuration of sql data source components was not found under specified path: $pathToFile.
|Exception: $exception""".stripMargin
)
case Success(fileContent) =>
@@ -75,9 +80,9 @@ class SqlComponentProvider extends ComponentProvider with LazyLogging {

}

final case class SqlDataSourcesDefinition(
tableDefinitions: List[DataSourceTableDefinition],
final case class TableSqlDefinitions(
tableDefinitions: List[TableDefinition],
sqlStatements: List[SqlStatement]
)

final case class SqlFileDataSourcesConfig(sqlFilePath: String)
final case class TableComponentProviderConfig(tableDefinitionFilePath: String)
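
A hedged sketch of how configuration for this provider might be assembled. The nesting under components.flinkTable and the file path are assumptions inferred from the flinkTable provider name and the CONFIG_FORCE_..._components_flinkTable_disabled override added to package.json above; they are not a verified reference.

import com.typesafe.config.ConfigFactory

// Hypothetical provider configuration: the only field the provider reads is
// tableDefinitionFilePath, which points at the SQL file with CREATE TABLE statements.
val flinkTableComponentsConfig = ConfigFactory.parseString(
  """
    |components {
    |  flinkTable {
    |    tableDefinitionFilePath: "/opt/nussknacker/conf/tables-definition.sql"
    |  }
    |}
    |""".stripMargin
)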

This file was deleted.

@@ -0,0 +1,7 @@
package pl.touk.nussknacker.engine.flink.table

import org.apache.flink.table.types.DataType
import pl.touk.nussknacker.engine.api.typed.typing.TypingResult

final case class TableDefinition(tableName: String, typingResult: TypingResult, columns: List[ColumnDefinition])
final case class ColumnDefinition(columnName: String, typingResult: TypingResult, flinkDataType: DataType)
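
A hypothetical instance, just to show how the two case classes fit together; the table and column names are made up for illustration.

import org.apache.flink.table.api.DataTypes
import pl.touk.nussknacker.engine.api.typed.typing.Typed
import pl.touk.nussknacker.engine.flink.table.{ColumnDefinition, TableDefinition}

// Each column keeps both the Nussknacker typing result and the original Flink DataType.
val columns = List(
  ColumnDefinition("clientId", Typed.typedClass(classOf[String]), DataTypes.STRING()),
  ColumnDefinition("amount", Typed.typedClass(classOf[java.lang.Integer]), DataTypes.INT())
)

// The table-level typingResult is the record of all column types.
val transactionsTable = TableDefinition(
  tableName = "transactions",
  typingResult = Typed.record(columns.map(c => c.columnName -> c.typingResult).toMap),
  columns = columns
)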
@@ -8,7 +8,7 @@ object SqlStatementReader extends LazyLogging {

type SqlStatement = String

// TODO: if configurator forgets a ';' - how do we signal it?
// TODO: if administrator forgets a ';' - how do we signal it?
def readSql(value: String): List[SqlStatement] = value.split(separatorPattern).toList.map(_.trim).filterNot(_.isEmpty)

}
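
A minimal sketch of how readSql is expected to behave, assuming the separator pattern is the ';' statement terminator (the pattern itself is not shown in this hunk) and using illustrative CREATE TABLE statements.

import pl.touk.nussknacker.engine.flink.table.extractor.SqlStatementReader

val fileContent =
  """CREATE TABLE transactions (clientId STRING, amount INT) WITH ('connector' = 'datagen');
    |CREATE TABLE output_transactions (clientId STRING, amount INT) WITH ('connector' = 'blackhole');
    |""".stripMargin

// Splits on the statement separator, trims whitespace and drops empty fragments,
// so this should yield two SqlStatement values.
val statements = SqlStatementReader.readSql(fileContent)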
@@ -2,23 +2,23 @@ package pl.touk.nussknacker.engine.flink.table.extractor

import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import pl.touk.nussknacker.engine.api.typed.typing.TypingResult
import pl.touk.nussknacker.engine.flink.table.TableDefinition
import pl.touk.nussknacker.engine.flink.table.extractor.SqlStatementNotExecutedError.statementNotExecutedErrorDescription
import pl.touk.nussknacker.engine.flink.table.extractor.SqlStatementReader.SqlStatement
import pl.touk.nussknacker.engine.flink.table.extractor.TypeExtractor.extractTypingResult

import scala.jdk.OptionConverters.RichOptional
import scala.util.{Failure, Success, Try}

object DataSourceSqlExtractor extends LazyLogging {
object TableExtractor extends LazyLogging {

import scala.jdk.CollectionConverters._

// TODO: Make this extractor more memory/cpu efficient and ensure closing of resources. For more details see
// https://github.com/TouK/nussknacker/pull/5627#discussion_r1512881038
def extractTablesFromFlinkRuntime(
sqlStatements: List[SqlStatement]
): DataSourceSqlExtractorResult = {
): TableExtractorResult = {
val settings = EnvironmentSettings
.newInstance()
.build()
@@ -44,24 +44,19 @@
.getOrElse(
throw new IllegalStateException(s"Table extractor could not locate a created table with path: $tablePath")
)
tableSchemaTypingResult = extractTypingResult(table)
} yield DataSourceTableDefinition(tableName, tableSchemaTypingResult)
typedTable = extractTypingResult(table)
} yield TableDefinition(tableName, typedTable.typingResult, typedTable.columns)

DataSourceSqlExtractorResult(tableDefinitions, sqlErrors)
TableExtractorResult(tableDefinitions, sqlErrors)
}

}

final case class DataSourceSqlExtractorResult(
tableDefinitions: List[DataSourceTableDefinition],
final case class TableExtractorResult(
tableDefinitions: List[TableDefinition],
sqlStatementExecutionErrors: List[SqlStatementNotExecutedError]
)

final case class DataSourceTableDefinition(
tableName: String,
schemaTypingResult: TypingResult,
)

final case class SqlStatementNotExecutedError(
statement: SqlStatement,
exception: Throwable,
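
A hedged usage sketch of the extractor in this file: the SQL statements are executed against a local TableEnvironment and the outcome is split into table definitions and per-statement errors. The CREATE TABLE content is illustrative only.

import pl.touk.nussknacker.engine.flink.table.extractor.{SqlStatementReader, TableExtractor}

val sqlStatements = SqlStatementReader.readSql(
  "CREATE TABLE transactions (clientId STRING, amount INT) WITH ('connector' = 'datagen');"
)
val extractionResult = TableExtractor.extractTablesFromFlinkRuntime(sqlStatements)

// Statements that Flink rejected are reported as SqlStatementNotExecutedError values ...
extractionResult.sqlStatementExecutionErrors.foreach(error => println(s"Statement failed: ${error.statement}"))
// ... while successfully created tables come back as TableDefinition values with their typing.
extractionResult.tableDefinitions.foreach(table => println(s"Registered table: ${table.tableName}"))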
@@ -3,25 +3,26 @@ package pl.touk.nussknacker.engine.flink.table.extractor
import org.apache.flink.table.api.Table
import org.apache.flink.table.types.DataType
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult}
import pl.touk.nussknacker.engine.flink.table.ColumnDefinition

object TypeExtractor {

import scala.jdk.CollectionConverters._

private case class ColumnTypingResult(columnName: String, typingResult: TypingResult)
final case class TableTypingResult(typingResult: TypingResult, columns: List[ColumnDefinition])

def extractTypingResult(tableFromEnv: Table): TypingResult = {
def extractTypingResult(tableFromEnv: Table): TableTypingResult = {
val columnsTypingData = tableFromEnv.getResolvedSchema.getColumns.asScala.map { column =>
ColumnTypingResult(column.getName, flinkTypeToTypingResult(column.getDataType))
ColumnDefinition(column.getName, flinkTypeToTypingResult(column.getDataType), column.getDataType)
}.toList
typedColumnsToRecordTypingResult(columnsTypingData)
TableTypingResult(typedColumnsToRecordTypingResult(columnsTypingData), columnsTypingData)
}

// TODO: handle complex types like maps, lists, rows, raws
private def flinkTypeToTypingResult(dataType: DataType) =
Typed.typedClass(dataType.getLogicalType.getDefaultConversion)

private def typedColumnsToRecordTypingResult(columns: List[ColumnTypingResult]) =
private def typedColumnsToRecordTypingResult(columns: List[ColumnDefinition]) =
Typed.record(columns.map(c => c.columnName -> c.typingResult).toMap)

}
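
To make the mapping above concrete, a small sketch of how a column's Flink DataType becomes a Nussknacker TypingResult and how the columns roll up into a record type; the column names are illustrative.

import org.apache.flink.table.api.DataTypes
import pl.touk.nussknacker.engine.api.typed.typing.Typed

// The typing result is derived from the default conversion class of the column's
// logical type, e.g. INT -> java.lang.Integer, STRING -> java.lang.String.
val amountType   = Typed.typedClass(DataTypes.INT().getLogicalType.getDefaultConversion)
val clientIdType = Typed.typedClass(DataTypes.STRING().getLogicalType.getDefaultConversion)

// typedColumnsToRecordTypingResult then exposes the whole table as a record of column name -> type.
val tableRecordType = Typed.record(Map("clientId" -> clientIdType, "amount" -> amountType))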
@@ -2,28 +2,31 @@ package pl.touk.nussknacker.engine.flink.table.sink

import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink, SingleOutputStreamOperator}
import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.table.api.Expressions.$
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.flink.table.api.{Schema, Table}
import org.apache.flink.types.Row
import pl.touk.nussknacker.engine.api.{Context, LazyParameter, ValueWithContext}
import pl.touk.nussknacker.engine.flink.api.process.{FlinkCustomNodeContext, FlinkSink}
import pl.touk.nussknacker.engine.flink.table.utils.HardcodedSchema.{intColumnName, stringColumnName}
import pl.touk.nussknacker.engine.flink.table.utils.TableUtils.buildTableDescriptor
import pl.touk.nussknacker.engine.flink.table.DataSourceConfig
import pl.touk.nussknacker.engine.flink.table.utils.HardcodedSchema
import pl.touk.nussknacker.engine.flink.table.TableDefinition
import pl.touk.nussknacker.engine.flink.table.extractor.SqlStatementReader.SqlStatement
import pl.touk.nussknacker.engine.flink.table.utils.NestedRowConversions._
import pl.touk.nussknacker.engine.flink.table.utils.RowConversions.mapToRowUnsafe

class TableSink(config: DataSourceConfig, value: LazyParameter[java.util.Map[String, Any]]) extends FlinkSink {
class TableSink(
tableDefinition: TableDefinition,
sqlStatements: List[SqlStatement],
value: LazyParameter[AnyRef]
) extends FlinkSink {

override type Value = java.util.Map[String, Any]
override type Value = AnyRef

override def prepareValue(
dataStream: DataStream[Context],
flinkNodeContext: FlinkCustomNodeContext
): DataStream[ValueWithContext[Value]] = {
dataStream.flatMap(
flinkNodeContext.lazyParameterHelper.lazyMapFunction(value),
flinkNodeContext.valueWithContextInfo.forType(HardcodedSchema.typingResult)
flinkNodeContext.valueWithContextInfo.forType(tableDefinition.typingResult)
)
}

@@ -34,9 +37,6 @@ class TableSink(config: DataSourceConfig, value: LazyParameter[java.util.Map[String, Any]]) extends FlinkSink {
val env = dataStream.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// TODO: this will not work with multiple sinks
val outputTableName = "some_sink_table_name"

/*
DataStream to Table transformation:
1. Map the dataStream to dataStream[Row] to fit schema in later step
@@ -48,38 +48,30 @@
6. Put the insert operation in the statementSet and do attachAsDataStream on it
7. Continue with a DiscardingSink as DataStream
*/
val streamOfRows: SingleOutputStreamOperator[Row] =
dataStream.map(ctx => HardcodedSchema.MapRowConversion.fromMap(ctx.value))
val streamOfRows: SingleOutputStreamOperator[Row] = dataStream
.map(valueWithContext => {
mapToRowUnsafe(valueWithContext.value.asInstanceOf[java.util.Map[String, Any]], tableDefinition.columns)
})

/*
This "f0" value is name given by flink at conversion of one element stream. For details read:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/.
This "f0" value is name given by flink at conversion of one element stream. For details read:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/.
*/
val nestedRowColumnName = "f0"
// TODO: avoid this step by mapping datastream directly without this intermediate table with nested row
val nestedRowSchema = Schema
.newBuilder()
.column(
nestedRowColumnName,
HardcodedSchema.rowDataType
)
.build()
// TODO: avoid this step by mapping DataStream directly without this intermediate table with nested row
val nestedRowSchema = columnsToSingleRowFlinkSchema(tableDefinition.columns)

val tableWithNestedRow: Table = tableEnv.fromDataStream(
streamOfRows,
nestedRowSchema
)

val flatInputValueTable = tableWithNestedRow
.select(
$(nestedRowColumnName).get(stringColumnName).as(stringColumnName),
$(nestedRowColumnName).get(intColumnName).as(intColumnName)
)
val inputValueTable = tableWithNestedRow
.select(flatteningSelectExpressions(tableDefinition.columns): _*)

val sinkTableDescriptor = buildTableDescriptor(config, HardcodedSchema.schema)
tableEnv.createTable(outputTableName, sinkTableDescriptor)
sqlStatements.foreach(tableEnv.executeSql)

val statementSet = tableEnv.createStatementSet();
statementSet.add(flatInputValueTable.insertInto(outputTableName))
val statementSet = tableEnv.createStatementSet()
statementSet.add(inputValueTable.insertInto(tableDefinition.tableName))
statementSet.attachAsDataStream()

/*

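The "f0" handling in the sink can be illustrated with a sketch of what a flattening select presumably looks like. The real flatteningSelectExpressions helper lives in NestedRowConversions and is not shown in this diff, so treat the shape below as an assumption.

import org.apache.flink.table.api.Expressions.$
import org.apache.flink.table.expressions.Expression

// One select expression per column: pull each field out of the single nested "f0" row
// that fromDataStream produced and expose it again under the original column name.
def flatteningSelectSketch(columnNames: List[String]): List[Expression] =
  columnNames.map(name => $("f0").get(name).as(name))

// e.g. tableWithNestedRow.select(flatteningSelectSketch(List("clientId", "amount")): _*)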