Skip to content

Commit

Permalink
Merge e022cba into 4279b13
Browse files Browse the repository at this point in the history
  • Loading branch information
sbhatt-mdsol committed Aug 10, 2022
2 parents 4279b13 + e022cba commit 154ef88
Showing 1 changed file with 13 additions and 9 deletions.
Expand Up @@ -8,6 +8,7 @@ import io.epiphanous.flinkrunner.operator.CreateTableJdbcSinkFunction
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat
import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.StatementExecutorFactory
import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
import org.apache.flink.connector.jdbc.{
Expand Down Expand Up @@ -70,7 +71,7 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
config.getBooleanOpt(pfx("table.create.if.not.exists")).getOrElse(true)
val schema: String =
config.getStringOpt("table.schema").getOrElse("_default_")
val table: String = config.getString("table.name")
val table: String = config.getString(pfx("table.name"))
val columns: Seq[JdbcSinkColumn] = config
.getObjectList(pfx("table.columns"))
.map(_.toConfig)
Expand Down Expand Up @@ -103,7 +104,7 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
.filter(_.primaryKey.nonEmpty)
.sortBy(_.primaryKey.get)
.map(_.name)
.mkString(s" primary key pk_$table(", ", ", ")")}
.mkString(s"CONSTRAINT pk_$table PRIMARY KEY(", ", ", ")")}
|)
|""".stripMargin)
else None
Expand Down Expand Up @@ -194,12 +195,15 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
getJdbcConnectionOptions
),
getJdbcExecutionOptions,
(_: RuntimeContext) =>
JdbcBatchStatementExecutor.simple(
queryDml,
getStatementBuilder[E],
JavaFunction.identity[E]
),
new StatementExecutorFactory[JdbcBatchStatementExecutor[E]] {
override def apply(t: RuntimeContext): JdbcBatchStatementExecutor[E] = {
JdbcBatchStatementExecutor.simple(
queryDml,
getStatementBuilder[E],
JavaFunction.identity[E]
)
}
},
JdbcOutputFormat.RecordExtractor.identity[E]
)
dataStream
Expand All @@ -214,4 +218,4 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
object JdbcSinkConfig {
final val DEFAULT_CONNECTION_TIMEOUT = 5
final val QUOTE_CHAR = "\""
}
}

0 comments on commit 154ef88

Please sign in to comment.