bug(corgi-spark): Oracle DATE data type maps to Hive DATE data type, losing hours, minutes, and seconds
Cast the DATE type to TIMESTAMP.

closes #9
dyingbleed committed Jan 18, 2019
1 parent 79cea2c commit d19daa4
Showing 14 changed files with 175 additions and 164 deletions.
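The fix in a nutshell: Oracle's DATE type stores hours, minutes, and seconds, but mapping it to a Hive/Spark DATE truncates the value to the day, so the generated SELECT now wraps every DATE column in CAST(... AS TIMESTAMP). Below is a minimal, self-contained sketch of the behavior this commit adds; the object name and column names are hypothetical, not taken from the repository.

import java.sql.Types

// Sketch only: restates the Column helper and the Oracle branch of
// toSelectExpr from this commit, with made-up column names.
object SelectExprSketch extends App {
  case class Column(name: String, dataType: Int) {
    def isDate: Boolean = dataType == Types.DATE
  }

  val columns = Seq(
    Column("ID", Types.NUMERIC),
    Column("UPDATED_AT", Types.DATE) // Oracle DATE carries a time-of-day
  )

  // DATE columns are cast to TIMESTAMP so the time part survives the
  // JDBC read into Spark; all other columns pass through unchanged.
  val selectExpr = columns.map { c =>
    if (c.isDate) s"CAST(${c.name} AS TIMESTAMP) AS ${c.name}" else c.name
  }.mkString(",")

  println(selectExpr) // ID,CAST(UPDATED_AT AS TIMESTAMP) AS UPDATED_AT
}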
@@ -24,4 +24,6 @@ private[spark] case class Column (name: String, dataType: Int) {
     dataType == Types.NCHAR
   }
 
+  def isDate: Boolean = dataType == Types.DATE
+
 }
@@ -162,4 +162,22 @@ case class Table (
     JDBCUtils.getDistinct(conn, db, table, columnName, tsColumnName.get, beginTime, endTime).toSet
   })
 
+  /**
+    * Converts the given columns to a SQL SELECT expression
+    * @param columns the columns
+    * @return the SQL SELECT expression
+    */
+  def toSelectExpr(columns: Seq[Column]): String = {
+    vendor match {
+      case MYSQL => columns.map(c => c.name).mkString(",")
+      case ORACLE => {
+        columns.map(c => {
+          // fix bug #9
+          if (c.isDate) s"CAST(${c.name} AS TIMESTAMP) AS ${c.name}"
+          else c.name
+        }).mkString(",")
+      }
+    }
+  }
+
 }
@@ -130,6 +130,7 @@ trait DataSource {
     }
   }
 
+  // TODO: ugly implementation
   protected final def toSQLExpr(v: Any): String = {
     tableMeta.vendor match {
       case MYSQL => {
@@ -6,15 +6,14 @@ import com.dyingbleed.corgi.spark.core.ODSMode._
 import com.dyingbleed.corgi.spark.core.{Conf, Constants}
 import com.dyingbleed.corgi.spark.util.DataSourceUtils
 import com.google.inject.Inject
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.functions.{col, date_format, lit}
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.joda.time.LocalDateTime
 
 /**
  * Created by 李震 on 2018/3/2.
  */
-private[spark] class EL extends Logging {
+private[spark] class EL {
 
   @Inject
   var conf: Conf = _
@@ -1,13 +1,12 @@
 package com.dyingbleed.corgi.spark.ds.el
 
 import com.dyingbleed.corgi.spark.ds.DataSource
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.DataFrame
 
 /**
  * Created by 李震 on 2018/6/26.
  */
-private[spark] abstract class IncrementalDataSource extends DataSource with Logging {
+private[spark] abstract class IncrementalDataSource extends DataSource {
 
   /**
    * Load the data source
@@ -13,19 +13,18 @@ private[spark] class MySQLCompleteDataSource extends CompleteDataSource {
       case COMPLETE => {
         s"""
           |(SELECT
-          | *
+          | ${tableMeta.toSelectExpr(tableMeta.columns)}
           |FROM ${tableMeta.db}.${tableMeta.table}
           |) t
         """.stripMargin
       }
       case UPDATE | APPEND => {
-        val selectExr = tableMeta.columns
+        val normalColumns = tableMeta.columns
           .filter(c => !c.name.equals(tableMeta.tsColumnName.get))
-          .map(c => c.name).mkString(",")
 
         s"""
           |(SELECT
-          | $selectExr,
+          | ${tableMeta.toSelectExpr(normalColumns)},
           | IFNULL(${tableMeta.tsColumnName.get}, TIMESTAMP('${tableMeta.tsDefaultVal.toString(Constants.DATETIME_FORMAT)}')) AS ${tableMeta.tsColumnName.get}
           |FROM ${tableMeta.db}.${tableMeta.table}
           |WHERE ${tableMeta.tsColumnName.get} < TIMESTAMP('${executeDateTime.toString(Constants.DATETIME_FORMAT)}')
@@ -10,7 +10,7 @@ private[spark] class MySQLIncrementalDataSource extends IncrementalDataSource {
   override protected def incrementalSQL: String = {
     s"""
       |(SELECT
-      | *
+      | ${tableMeta.toSelectExpr(tableMeta.columns)}
       |FROM ${tableMeta.db}.${tableMeta.table}
       |WHERE ${tableMeta.tsColumnName.get} > TIMESTAMP('${lastExecuteDateTime.toString(Constants.DATETIME_FORMAT)}')
       |AND ${tableMeta.tsColumnName.get} < TIMESTAMP('${executeDateTime.toString(Constants.DATETIME_FORMAT)}')
@@ -13,19 +13,17 @@ private[spark] class OracleCompleteDataSource extends CompleteDataSource {
       case COMPLETE => {
         s"""
           |(SELECT
-          | *
+          | ${tableMeta.toSelectExpr(tableMeta.columns)}
           |FROM ${tableMeta.db}.${tableMeta.table}
           |) t
         """.stripMargin
       }
       case UPDATE | APPEND => {
-        val selectExp = tableMeta.columns
-          .filter(c => !c.name.equals(tableMeta.tsColumnName.get))
-          .map(c => c.name).mkString(",")
+        val normalColumns = tableMeta.columns.filter(c => !c.name.equals(tableMeta.tsColumnName.get))
 
         s"""
           |(SELECT
-          | $selectExp,
+          | ${tableMeta.toSelectExpr(normalColumns)},
           | NVL(${tableMeta.tsColumnName.get}, TO_DATE('${tableMeta.tsDefaultVal.toString(Constants.DATETIME_FORMAT)}', 'yyyy-mm-dd hh24:mi:ss')) AS ${tableMeta.tsColumnName.get}
           |FROM ${tableMeta.db}.${tableMeta.table}
           |WHERE ${tableMeta.tsColumnName.get} < TO_DATE('${executeDateTime.toString(Constants.DATETIME_FORMAT)}', 'yyyy-mm-dd hh24:mi:ss')
@@ -10,10 +10,10 @@ private[spark] class OracleIncrementalDataSource extends IncrementalDataSource {
   override protected def incrementalSQL: String = {
     s"""
       |(SELECT
-      | *
-      |FROM ${conf.sourceDb}.${conf.sourceTable}
-      |WHERE ${conf.sourceTimeColumn} > TO_DATE('${lastExecuteDateTime.toString(Constants.DATETIME_FORMAT)}', 'yyyy-mm-dd hh24:mi:ss')
-      |AND ${conf.sourceTimeColumn} < TO_DATE('${executeDateTime.toString(Constants.DATETIME_FORMAT)}', 'yyyy-mm-dd hh24:mi:ss')
+      | ${tableMeta.toSelectExpr(tableMeta.columns)}
+      |FROM ${tableMeta.db}.${tableMeta.table}
+      |WHERE ${tableMeta.tsColumnName.get} > TO_DATE('${lastExecuteDateTime.toString(Constants.DATETIME_FORMAT)}', 'yyyy-mm-dd hh24:mi:ss')
+      |AND ${tableMeta.tsColumnName.get} < TO_DATE('${executeDateTime.toString(Constants.DATETIME_FORMAT)}', 'yyyy-mm-dd hh24:mi:ss')
       |) t
     """.stripMargin
   }
@@ -11,24 +11,32 @@ import org.joda.time.{Days, LocalDate}
 private[spark] class MySQLCompleteSplitDataSource extends CompleteSplitDataSource {
 
   override protected def loadPKRangeSplitDF: DataFrame = {
-    val sql = if (tableMeta.tsColumnName.isEmpty) s"${tableMeta.db}.${tableMeta.table}" else {
-      val selectExp = tableMeta.columns
-        .filter(c => !c.name.equals(tableMeta.tsColumnName.get))
-        .map(c => c.name)
-        .mkString(",")
-
-      s"""
-        |(SELECT
-        | *
-        |FROM (
-        | SELECT
-        | $selectExp,
-        | IFNULL(${tableMeta.tsColumnName.get}, TIMESTAMP('${tableMeta.tsDefaultVal.toString(Constants.DATETIME_FORMAT)}')) AS ${tableMeta.tsColumnName.get}
-        | FROM ${tableMeta.db}.${tableMeta.table}
-        |) s
-        |WHERE ${tableMeta.tsColumnName.get} < TIMESTAMP('${executeDateTime.toString(Constants.DATETIME_FORMAT)}')
-        |) t
+    val sql = conf.mode match {
+      case COMPLETE => {
+        s"""
+          |(SELECT
+          | ${tableMeta.toSelectExpr(tableMeta.columns)}
+          |FROM ${tableMeta.db}.${tableMeta.table}
+          |WHERE ${tableMeta.tsColumnName.get} < TIMESTAMP('${executeDateTime.toString(Constants.DATETIME_FORMAT)}')
+          |) t
+        """.stripMargin
+      }
+      case UPDATE | APPEND => {
+        val normalColumns = tableMeta.columns.filter(c => !c.name.equals(tableMeta.tsColumnName.get))
+
+        s"""
+          |(SELECT
+          | *
+          |FROM (
+          | SELECT
+          | ${tableMeta.toSelectExpr(normalColumns)},
+          | IFNULL(${tableMeta.tsColumnName.get}, TIMESTAMP('${tableMeta.tsDefaultVal.toString(Constants.DATETIME_FORMAT)}')) AS ${tableMeta.tsColumnName.get}
+          | FROM ${tableMeta.db}.${tableMeta.table}
+          |) s
+          |WHERE ${tableMeta.tsColumnName.get} < TIMESTAMP('${executeDateTime.toString(Constants.DATETIME_FORMAT)}')
+          |) t
         """.stripMargin
+      }
+    }
 
     val pkStats = tableMeta.stats(tableMeta.pk.last.name)
@@ -45,33 +53,33 @@ private[spark] class MySQLCompleteSplitDataSource extends CompleteSplitDataSource {
       tableMeta.pk.last.name
     }
 
-    val sql = if (tableMeta.tsColumnName.isEmpty) {
-      s"""
-        |(SELECT
-        | *
-        |FROM ${tableMeta.db}.${tableMeta.table}
-        |WHERE MOD(CONV(MD5($hashExpr), 16, 10), ${Constants.DEFAULT_PARALLEL}) = $mod
-        |) t
+    val sql = conf.mode match {
+      case COMPLETE => {
+        s"""
+          |(SELECT
+          | ${tableMeta.toSelectExpr(tableMeta.columns)}
+          |FROM ${tableMeta.db}.${tableMeta.table}
+          |WHERE MOD(CONV(MD5($hashExpr), 16, 10), ${Constants.DEFAULT_PARALLEL}) = $mod
+          |) t
         """.stripMargin
-    } else {
-      val selectExp = tableMeta.columns
-        .filter(c => !c.name.equals(tableMeta.tsColumnName.get))
-        .map(c => c.name)
-        .mkString(",")
+      }
+      case UPDATE | APPEND => {
+        val normalColumns = tableMeta.columns.filter(c => !c.name.equals(tableMeta.tsColumnName.get))
 
-      s"""
-        |(SELECT
-        | *
-        |FROM (
-        | SELECT
-        | $selectExp,
-        | IFNULL(${tableMeta.tsColumnName.get}, TIMESTAMP('${tableMeta.tsDefaultVal.toString(Constants.DATETIME_FORMAT)}')) AS ${tableMeta.tsColumnName.get}
-        | FROM ${tableMeta.db}.${tableMeta.table}
-        |) s
-        |WHERE ${tableMeta.tsColumnName.get} < TIMESTAMP('${executeDateTime.toString(Constants.DATETIME_FORMAT)}')
-        |AND MOD(CONV(MD5($hashExpr), 16, 10), ${Constants.DEFAULT_PARALLEL}) = $mod
-        |) t
+        s"""
+          |(SELECT
+          | *
+          |FROM (
+          | SELECT
+          | ${tableMeta.toSelectExpr(normalColumns)},
+          | IFNULL(${tableMeta.tsColumnName.get}, TIMESTAMP('${tableMeta.tsDefaultVal.toString(Constants.DATETIME_FORMAT)}')) AS ${tableMeta.tsColumnName.get}
+          | FROM ${tableMeta.db}.${tableMeta.table}
+          |) s
+          |WHERE ${tableMeta.tsColumnName.get} < TIMESTAMP('${executeDateTime.toString(Constants.DATETIME_FORMAT)}')
+          |AND MOD(CONV(MD5($hashExpr), 16, 10), ${Constants.DEFAULT_PARALLEL}) = $mod
+          |) t
        """.stripMargin
+      }
     }
 
     val df = jdbcDF(sql)
@@ -91,18 +99,15 @@ private[spark] class MySQLCompleteSplitDataSource extends CompleteSplitDataSource {
     if (conf.mode == UPDATE || conf.mode == APPEND) {
       val days = Days.daysBetween(tableMeta.tsDefaultVal, LocalDate.now()).getDays
       for (d <- 0 to days) {
-        val selectExp = tableMeta.columns
-          .filter(c => !c.name.equals(tableMeta.tsColumnName.get))
-          .map(c => c.name)
-          .mkString(",")
+        val normalColumns = tableMeta.columns.filter(c => !c.name.equals(tableMeta.tsColumnName.get))
 
         val sql =
           s"""
             |(SELECT
             | s.*
             |FROM (
             | SELECT
-            | $selectExp,
+            | ${tableMeta.toSelectExpr(normalColumns)},
             | NVL(${tableMeta.tsColumnName.get}, TO_DATE('${tableMeta.tsDefaultVal.toString(Constants.DATETIME_FORMAT)}', 'yyyy-mm-dd hh24:mi:ss')) AS ${tableMeta.tsColumnName.get}
             | FROM ${tableMeta.db}.${tableMeta.table}
             |) s
@@ -125,7 +130,7 @@ private[spark] class MySQLCompleteSplitDataSource extends CompleteSplitDataSource {
         val sql =
           s"""
             |(SELECT
-            | *
+            | ${tableMeta.toSelectExpr(tableMeta.columns)}
             |FROM ${tableMeta.db}.${tableMeta.table}
             |WHERE $partitionColumnName = ${toSQLExpr(v)}
             |) t
@@ -138,6 +143,8 @@ private[spark] class MySQLCompleteSplitDataSource extends CompleteSplitDataSource {
           unionDF = unionDF.union(df)
         }
       }
+    } else {
+      throw new RuntimeException("Splitting by partition column is not supported")
     }
 
     unionDF
@@ -1,7 +1,6 @@
 package com.dyingbleed.corgi.spark.ds.el.split
 import com.dyingbleed.corgi.spark.core.Constants
 import org.apache.spark.sql.DataFrame
-import org.joda.time.{Days, LocalDate}
 
 /**
  * Created by 李震 on 2019/1/8.
@@ -11,7 +10,7 @@ private[spark] class MySQLIncrementalSplitDataSource extends IncrementalSplitDataSource {
   override protected def loadPKRangeSplitDF: DataFrame = {
     val sql = s"""
       |(SELECT
-      | *
+      | ${tableMeta.toSelectExpr(tableMeta.columns)}
       |FROM ${tableMeta.db}.${tableMeta.table}
       |WHERE ${tableMeta.tsColumnName.get} > TIMESTAMP('${lastExecuteDateTime.toString(Constants.DATETIME_FORMAT)}')
       |AND ${tableMeta.tsColumnName.get} < TIMESTAMP('${executeDateTime.toString(Constants.DATETIME_FORMAT)}')
@@ -34,7 +33,7 @@ private[spark] class MySQLIncrementalSplitDataSource extends IncrementalSplitDataSource {

     val sql = s"""
       |(SELECT
-      | *
+      | ${tableMeta.toSelectExpr(tableMeta.columns)}
       |FROM ${tableMeta.db}.${tableMeta.table}
       |WHERE MOD(CONV(MD5($hashExpr), 16, 10), ${Constants.DEFAULT_PARALLEL}) = $mod
       |AND ${tableMeta.tsColumnName.get} > TIMESTAMP('${lastExecuteDateTime.toString(Constants.DATETIME_FORMAT)}')
@@ -59,16 +58,15 @@ private[spark] class MySQLIncrementalSplitDataSource extends IncrementalSplitDataSource {
       val partitionColumnName = conf.partitionColumns(1)
 
       for (v <- tableMeta.distinct(partitionColumnName, lastExecuteDateTime, executeDateTime)) {
-        val sql =
-          s"""
-            |(SELECT
-            | *
-            |FROM ${tableMeta.db}.${tableMeta.table}
-            |WHERE ${tableMeta.tsColumnName.get} > TIMESTAMP('${lastExecuteDateTime.toString(Constants.DATETIME_FORMAT)}')
-            |AND ${tableMeta.tsColumnName.get} < TIMESTAMP('${executeDateTime.toString(Constants.DATETIME_FORMAT)}')
-            |AND $partitionColumnName = ${toSQLExpr(v)}
-            |) t
-          """.stripMargin
+        val sql = s"""
+          |(SELECT
+          | ${tableMeta.toSelectExpr(tableMeta.columns)}
+          |FROM ${tableMeta.db}.${tableMeta.table}
+          |WHERE ${tableMeta.tsColumnName.get} > TIMESTAMP('${lastExecuteDateTime.toString(Constants.DATETIME_FORMAT)}')
+          |AND ${tableMeta.tsColumnName.get} < TIMESTAMP('${executeDateTime.toString(Constants.DATETIME_FORMAT)}')
+          |AND $partitionColumnName = ${toSQLExpr(v)}
+          |) t
+        """.stripMargin
 
         val df = jdbcDF(sql)
         if (unionDF == null) {