diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala index ef69983a..6c830f88 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala @@ -8,6 +8,7 @@ import io.epiphanous.flinkrunner.operator.CreateTableJdbcSinkFunction import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat +import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.StatementExecutorFactory import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor import org.apache.flink.connector.jdbc.{ @@ -70,7 +71,7 @@ case class JdbcSinkConfig[ADT <: FlinkEvent]( config.getBooleanOpt(pfx("table.create.if.not.exists")).getOrElse(true) val schema: String = config.getStringOpt("table.schema").getOrElse("_default_") - val table: String = config.getString("table.name") + val table: String = config.getString(pfx("table.name")) val columns: Seq[JdbcSinkColumn] = config .getObjectList(pfx("table.columns")) .map(_.toConfig) @@ -103,7 +104,7 @@ case class JdbcSinkConfig[ADT <: FlinkEvent]( .filter(_.primaryKey.nonEmpty) .sortBy(_.primaryKey.get) .map(_.name) - .mkString(s" primary key pk_$table(", ", ", ")")} + .mkString(s"CONSTRAINT pk_$table PRIMARY KEY(", ", ", ")")} |) |""".stripMargin) else None @@ -194,12 +195,15 @@ case class JdbcSinkConfig[ADT <: FlinkEvent]( getJdbcConnectionOptions ), getJdbcExecutionOptions, - (_: RuntimeContext) => - JdbcBatchStatementExecutor.simple( - queryDml, - getStatementBuilder[E], - JavaFunction.identity[E] - ), + new StatementExecutorFactory[JdbcBatchStatementExecutor[E]] { + override def apply(t: RuntimeContext): JdbcBatchStatementExecutor[E] = { + JdbcBatchStatementExecutor.simple( + queryDml, + getStatementBuilder[E], + JavaFunction.identity[E] + ) + } + }, JdbcOutputFormat.RecordExtractor.identity[E] ) dataStream @@ -214,4 +218,4 @@ case class JdbcSinkConfig[ADT <: FlinkEvent]( object JdbcSinkConfig { final val DEFAULT_CONNECTION_TIMEOUT = 5 final val QUOTE_CHAR = "\"" -} +} \ No newline at end of file