Skip to content

etl-engine 融合查询语法

hw2499 edited this page Oct 27, 2023 · 4 revisions

流批一体数据交换 etl-engine 融合查询语法
流批一体融合查询

场景1

流批一体融合查询

场景2

融合查询语法

etl-engine 的融合查询能够将来自多个数据源的数据加载到内存中进行关联查询，并输出查询结果。融合查询语法遵循 ANSI SQL 标准，与常规 MySQL 查询语法非常相似。

  • 支持对从多种类型数据库中读取的数据进行融合查询。
  • 支持对消息流传输过程中动态产生的数据与多种类型数据库中的数据进行流计算查询。
  • 融合查询语法遵循ANSI SQL标准。

样本

-- Inner-join t_o to t_u (on u_id) and to t_p (on p_id), sorted by the
-- suffix of t_u.u_id taken from position 3 and cast to INTEGER.
SELECT
    t_u.u_id,
    t_u.u_name,
    t_u.u_phone,
    t_o.o_id,
    t_o.o_price,
    t_o.o_number,
    t_o.o_money,
    t_o.o_writetime,
    t_p.p_id,
    t_p.p_name,
    t_p.p_contacts,
    t_p.p_desc
FROM t_u
INNER JOIN t_o
    ON t_o.u_id = t_u.u_id
INNER JOIN t_p
    ON t_o.p_id = t_p.p_id
ORDER BY
    INTEGER(SUBSTRING(t_u.u_id, 3)) ASC
-- Left join sample 1: t_r2 merges the rows of t_u1 and t_u2 (UNION ALL keeps
-- duplicates) and masks u_phone by keeping 4 characters starting at SUBSTR
-- position 7, left-padded with '*' to 11 characters. Every t_r1 row is kept;
-- u_id/u_name/u_phone are NULL when no t_r2 row matches, while NULL sj_*
-- values fall back to 0 or '未知'.

WITH t_r2 (u_id, u_name, u_phone) AS (
    SELECT u_id, u_name, LPAD(SUBSTR(u_phone, 7, 4), 11, '*') AS u_phone
    FROM t_u1
    UNION ALL
    SELECT u_id, u_name, LPAD(SUBSTR(u_phone, 7, 4), 11, '*') AS u_phone
    FROM t_u2
)
SELECT
    sj_uuid                        AS rdi_uuid,
    u_id                           AS rdi_uid,
    u_name                         AS rdi_uname,
    u_phone                        AS rdi_uphone,
    COALESCE(sj_name, '未知')      AS rdi_sname,
    COALESCE(sj_score, 0)          AS rdi_score,
    COALESCE(sj_writetime, '未知') AS rdi_writetime
FROM t_r1
LEFT JOIN t_r2
    ON t_r1.sj_uid = t_r2.u_id


-- Left join sample 2: t_r2 is built exactly as in sample 1 (t_u1 UNION ALL
-- t_u2, u_phone masked to 4 characters from SUBSTR position 7, left-padded
-- with '*' to 11 characters). t_r3 keeps only the t_r2 rows with u_id > 100.
-- Every t_r1 row is kept, and NULLs in every output column except rdi_uuid
-- are replaced with 0 or '未知'.

WITH t_r2 (u_id, u_name, u_phone) AS (
    SELECT u_id, u_name, LPAD(SUBSTR(u_phone, 7, 4), 11, '*') AS u_phone
    FROM t_u1
    UNION ALL
    SELECT u_id, u_name, LPAD(SUBSTR(u_phone, 7, 4), 11, '*') AS u_phone
    FROM t_u2
),
t_r3 (u_id, u_name, u_phone) AS (
    SELECT u_id, u_name, u_phone
    FROM t_r2
    WHERE u_id > 100
)
SELECT
    sj_uuid                        AS rdi_uuid,
    COALESCE(u_id, 0)              AS rdi_uid,
    COALESCE(u_name, '未知')       AS rdi_uname,
    COALESCE(u_phone, '未知')      AS rdi_uphone,
    COALESCE(sj_name, '未知')      AS rdi_sname,
    COALESCE(sj_score, 0)          AS rdi_score,
    COALESCE(sj_writetime, '未知') AS rdi_writetime
FROM t_r1
LEFT JOIN t_r3
    ON t_r1.sj_uid = t_r3.u_id

融合查询API

  • CommonFusionQuery
    融合查询使用样本
    // Fusion query over three in-memory datasets aliased t_u, t_o and t_p.
	federationQuery := "SELECT t_u.u_id,t_u.u_name,t_u.u_phone,t_o.o_id,t_o.o_price,t_o.o_number,t_o.o_money,t_o.o_writetime,t_p.p_id,t_p.p_name,t_p.p_contacts,t_p.p_desc  FROM  t_u inner JOIN  t_o  ON t_o.u_id=t_u.u_id inner JOIN  t_p ON t_o.p_id=t_p.p_id ORDER BY  INTEGER(SUBSTRING(t_u.u_id,3))  ASC "

	// Table aliases referenced by the query.
	// NOTE(review): tableAliasName[i] presumably names the dataset in
	// jsonRows[i], so both slices must stay in the same order — confirm
	// against common.CommonFusionQuery.
	tableAliasName := []string{"t_o", "t_u", "t_p"}

	// The "rows" field of each JSON input, extracted as a string.
	jsonRows := []string{
		gjson.Parse(newRows).Get("rows").String(),         // t_o (assumed pairing)
		gjson.Parse(userInfoRows).Get("rows").String(),    // t_u (assumed pairing)
		gjson.Parse(productInfoRows).Get("rows").String(), // t_p (assumed pairing)
	}

	// Keep the result in its own variable instead of overwriting newRows,
	// which already holds one of the query inputs above.
	fusedRows, err := common.CommonFusionQuery(federationQuery, tableAliasName, jsonRows)
	if err != nil {
		fmt.Println("融合查询失败:", err)
		return "", err
	}
	return fusedRows, nil

语法格式

  • SELECT 语句语法
SELECT
      [DISTINCT] field [, field ...]
      FROM table [, table ...]
      [WHERE condition]
      [GROUP BY field [, field ...]]
      [HAVING condition]
      [ORDER BY order_item [, order_item ...]]
      [LIMIT number_of_records [OFFSET number_of_records]]
  • table 可以是连接表达式，支持 INNER JOIN、LEFT JOIN、RIGHT JOIN、OUTER JOIN、CROSS JOIN 等连接方式。

运算符支持

+ - * / = == < <= > >= <> != || IS BETWEEN IN LIKE NOT ANY AND OR INTERSECT UNION EXCEPT EXISTS

函数支持

逻辑函数

COALESCE(value [, value ...])
IF(condition, value1, value2)
IFNULL(value1, value2)
NULLIF(value1, value2)

数字函数(40个)

ABS(number)
ACOS(number)
ACOSH(number)
ASIN(number)
ASINH(number)
ATAN(number)
ATAN2(number2, number1)
ATANH(number)
CBRT(number)
CEIL(number)
COS(number)
COSH(number)
EXP(number)
EXP2(number)
EXPM1(number)
FLOOR(number)
IS_INF(number [, sign])
IS_NAN(number)
LOG(number)
LOG10(number)
LOG1P(number)
LOG2(number)
LOGB(number)
POW(base, exponent)
ROUND(number)
SIN(number)
SINH(number)
SQRT(number)
TAN(number)
TANH(number)
BIN_TO_DEC(bin)
OCT_TO_DEC(oct)
HEX_TO_DEC(hex)
ENOTATION_TO_DEC(enotation)
BIN(integer)
OCT(integer)
HEX(integer)
ENOTATION(float)
NUMBER_FORMAT(number [, precision, decimalPoint, thousandsSeparator, decimalSeparator])
RAND(min, max)

日期函数(30多个)

NOW()
DATETIME_FORMAT(datetime, format)
YEAR(datetime)
MONTH(datetime)
DAY(datetime)
HOUR(datetime)
MINUTE(datetime)
SECOND(datetime)
MILLISECOND(datetime)
MICROSECOND(datetime)
NANOSECOND(datetime)
WEEKDAY(datetime)
UNIX_TIME(datetime)
UNIX_NANO_TIME(datetime)
DAY_OF_YEAR(datetime)
WEEK_OF_YEAR(datetime)
ADD_YEAR(datetime, duration)
ADD_MONTH(datetime, duration)
ADD_DAY(datetime, duration)
ADD_HOUR(datetime, duration)
ADD_MINUTE(datetime, duration)
ADD_SECOND(datetime, duration)
ADD_MILLI(datetime, duration)
ADD_MICRO(datetime, duration)
ADD_NANO(datetime, duration)
TRUNC_MONTH(datetime)
TRUNC_DAY(datetime)
TRUNC_TIME(datetime)
TRUNC_MINUTE(datetime)
TRUNC_SECOND(datetime)
TRUNC_MILLI(datetime)
TRUNC_MICRO(datetime)
TRUNC_NANO(datetime)
DATE_DIFF(datetime1, datetime2)
TIME_DIFF(datetime1, datetime2)
TIME_NANO_DIFF(datetime1, datetime2)
UTC(datetime)
MILLI_TO_DATETIME(unix_milliseconds)
NANO_TO_DATETIME(unix_nano_time)

字符串函数(20多个，下表列出常用部分；上文示例中使用的 LPAD 未在此列出)

TRIM(str)
LTRIM(str)
RTRIM(str)
UPPER(str)
LOWER(str)
BASE64_ENCODE(str)
BASE64_DECODE(str)
HEX_ENCODE(str)
HEX_DECODE(str)
LEN(str)
SUBSTRING(str, position [, len])
SUBSTR(str, position [, len])
INSTR(str, substr)
LIST_ELEM(str, sep, index)
REPLACE(str, old, new)
REGEXP_MATCH(str, regexp [, flags])
REGEXP_FIND(str, regexp [, flags])
TITLE_CASE(str)

哈希(摘要)函数

MD5(str)
SHA1(str)
SHA256(str)
SHA512(str)
MD5_HMAC(str,key_str)
SHA1_HMAC(str,key_str)
SHA256_HMAC(str,key_str)
SHA512_HMAC(str,key_str)

类型转换函数

STRING(value)
INTEGER(value)
FLOAT(value)
DATETIME(value [, timezone])
BOOLEAN(value)

聚合函数

COUNT([DISTINCT] *)
MIN(expr)
MAX(expr)
SUM([DISTINCT] expr)
AVG([DISTINCT] expr)

参考资料

  etl-engine使用手册(https://github.com/hw2499/etl-engine)
  etl-crontab使用手册(https://github.com/hw2499/etl-engine/wiki/etl-crontab%E8%B0%83%E5%BA%A6)
  嵌入脚本开发(https://github.com/hw2499/etl-engine/wiki/B-%E5%B5%8C%E5%85%A5%E8%84%9A%E6%9C%AC%E5%BC%80%E5%8F%91)
  etl-engine配置样例(https://github.com/hw2499/etl-engine/wiki/etl-engine%E4%BD%BF%E7%94%A8%E6%A0%B7%E4%BE%8B)