diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/SqlColumnType.scala b/src/main/scala/io/epiphanous/flinkrunner/model/SqlColumnType.scala index 4353ae1..cb6cb78 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/SqlColumnType.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/SqlColumnType.scala @@ -188,7 +188,7 @@ object SqlColumnType { final val JSON = SqlColumnType( "json", - Types.OTHER, + Types.VARCHAR, productConfig = Map( Mysql -> withName("JSON", sharedDefaultConfig), Postgresql -> withName("JSONB", sharedDefaultConfig), diff --git a/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala b/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala index 958a8d2..812f2c7 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/model/sink/JdbcSinkConfig.scala @@ -281,12 +281,15 @@ case class JdbcSinkConfig[ADT <: FlinkEvent]( .identifier(database, schema, table) .append(" (") buildColumnList() - sqlBuilder.append(")\nVALUES (") + sqlBuilder.append(")\nSELECT ") Range(0, columns.length).foreach { i => - sqlBuilder.append("?") + (columns(i).dataType,product) match { + case (SqlColumnType.JSON,SupportedDatabase.Snowflake) => sqlBuilder.append("parse_json(?)") + case _ => sqlBuilder.append("?") + } if (i < columns.length - 1) sqlBuilder.append(", ") } - sqlBuilder.append(")") + // sqlBuilder.append(")") product match { case SupportedDatabase.Postgresql => if (!isTimescale) { @@ -604,16 +607,14 @@ case class JdbcSinkConfig[ADT <: FlinkEvent]( data: Map[String, Any], statement: PreparedStatement, element: E): Unit = { - - val encoder = new JsonRowEncoder[Map[String,Any]]() columns.zipWithIndex.map(x => (x._1, x._2 + 1)).foreach { case (column, i) => data.get(column.name) match { case Some(v) => val value = v match { + case null | None => null case Some(x) => _matcher(x) case x => _matcher(x) - case null | None => null } statement.setObject(i, value, column.dataType.jdbcType) case None =>