Skip to content

Commit

Permalink
Fixed pattern match order
Browse files Browse the repository at this point in the history
Updated insert syntax to INSERT INTO ..SELECT in order to use parse_json method for snowflake variant column
  • Loading branch information
Manasaveena Ravindran committed Dec 1, 2022
1 parent fb692b9 commit 93d7db2
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ object SqlColumnType {

final val JSON = SqlColumnType(
"json",
Types.OTHER,
Types.VARCHAR,
productConfig = Map(
Mysql -> withName("JSON", sharedDefaultConfig),
Postgresql -> withName("JSONB", sharedDefaultConfig),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,12 +281,15 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
.identifier(database, schema, table)
.append(" (")
buildColumnList()
sqlBuilder.append(")\nVALUES (")
sqlBuilder.append(")\nSELECT ")
Range(0, columns.length).foreach { i =>
sqlBuilder.append("?")
(columns(i).dataType,product) match {
case (SqlColumnType.JSON,SupportedDatabase.Snowflake) => sqlBuilder.append("parse_json(?)")
case _ => sqlBuilder.append("?")
}
if (i < columns.length - 1) sqlBuilder.append(", ")
}
sqlBuilder.append(")")
// sqlBuilder.append(")")
product match {
case SupportedDatabase.Postgresql =>
if (!isTimescale) {
Expand Down Expand Up @@ -604,16 +607,14 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
data: Map[String, Any],
statement: PreparedStatement,
element: E): Unit = {

val encoder = new JsonRowEncoder[Map[String,Any]]()
columns.zipWithIndex.map(x => (x._1, x._2 + 1)).foreach {
case (column, i) =>
data.get(column.name) match {
case Some(v) =>
val value = v match {
case null | None => null
case Some(x) => _matcher(x)
case x => _matcher(x)
case null | None => null
}
statement.setObject(i, value, column.dataType.jdbcType)
case None =>
Expand Down

0 comments on commit 93d7db2

Please sign in to comment.