New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spark sql在喜马拉雅的使用之xql #21

Open
cjuexuan opened this Issue Dec 15, 2016 · 13 comments

Comments

Projects
None yet
5 participants
@cjuexuan
Owner

cjuexuan commented Dec 15, 2016

spark sql在喜马拉雅的使用之xql

什么是xql

在大数据领域,发展到今天hive依然是一个非常有意义的项目,因为它把工程师都懂的sql直接用到了大数据领域,但是它也存在一些问题,比如运行速度慢这个是最明显的,大数据领域中的先驱者们都为之做了各种优化,和hive集成的项目还是非常多的。
spark作为目前一站式的数据处理框架,在etl,数据分析与统计,流式处理,机器学习中都展现了很强的竞争力,而spark sql是目前spark社区中投入最多,而且期望最高的一个核心处理组件,由于可以使用现有和hive表,共用hive的metadata,而且使用spark sql能很大程度上提高我们的生产力,所以我们将sql也带到了喜马拉雅,并且开发了一套基于spark sql上层的dsl engine,用于处理etl和数据分析这一块,本文主要介绍我们对spark sql做的一些封装和落地

在使用场景中,我们主要总结了数据分析工程师和数据挖掘工程师的使用习惯,总结了以下需求

  1. bi:我现在有个报表需求,用到的关联表大概有一张mysql的表,两张hive表,和一张hbase表,以及一堆xdcs的落盘日志,这个报表很关键,需要马上出,还要能分享给运营
  2. 数据挖掘工程师:我要分析用户行为,需要做数据探索,还要能画图,我还希望有python的api
  3. 数仓:这个数据数据分析工程师每天都要用,原始数据太多,我需要按照时间每天跑一些etl任务,完成一些导数据的需求
  4. bi实习生:今天小伙伴给我个sql模版,让我写个统计,可是他的sql嵌套太多,我根本看不懂,怎么能写出可读性很好的sql呢

基于这些需求,我们调研了市面上的一些产品,presto在处理数据源管理上还是做的不错的,但他的展示显然达不到算法工程师的要求,apache zeppelin展示可以达到我们的要求,交互也还可以,但处理数据源的关联做的不够好,而且很多时候需求写python代码或者scala代码,而bi更擅长的是写sql,所以也不能完全满足我们的要求。针对以上需求我们整理总结,决定开发一套自己的系统,用于解决数据源的关联问题,而且要提供一些http接口给其他语言的api进行交互,而且类型存储过程的sql编程显然也是我们想要的,我们可以用一批sql去描述我们的逻辑,而不是写一个没人能看懂的复杂sql,所以我们开发了一个叫xql的项目

xql是一个data flow language,我们可以把一行sql的结果注册一个临时表,而且这个临时表可以在后面使用,这使得我们需要写嵌套query的概率大大的降低了。sql这个东西就是简单的query很简单。一旦开始嵌套了,就离地狱不远了

嵌套

语法简介

我们在常规的sql的query语法中又加入两个的dsl的语法:load和save

  • load用于加载数据源,也就是去标明一份可用的数据源
  • save用户保存结果,可以将query结果落地到存储介质,以便多次使用

load 语法

load format.`params` (schema) condition as tableName

其中format就是我们加载的数据源的格式,比如jdbc,hbase等数据格式,而params表示这种格式下的特殊含义,比如在jdbc中这里就是实例名.库名.表名,而在文件格式下这里就是路径,在hbase中则是表名,schema则是一个映射关系,比如hbase是一个二级的map,那我们要变成sql中平铺关系,就需要一个这样的映射关系,再比如我们的数据是存的string,我在spark中想变成int类型,那么也可以在这里指定类型,如
(:uid as uid,cf1:name as realname,cf1:age int),而condition主要用来做数据的filter,比如我的hdfs路径按照天存,我们就关心2016/01/01到2016/01/10这几天的数据,虽然本身有partition discovery,但我们还是提供了另一种方式,比如where path between '?' and '?'这种方式去进行筛选,这个同样可以用在hbase中,在hbase中则可以指定rowKey的范围来作为一个scan的filter,而在jdbc中,这就会作为条件被拼接到jdbc的查询语句中,而最后的tableName则是spark sql的一个tempTable的名字,因为我们会按照用户的逻辑注册tempTable,所以这个名字也就是你在后续逻辑中可以使用的一个名字

save 语法

save saveMode tableName as format.`params` partitionBy column1,coulmn2,udf(column3)

其中format和params和之前作用是一致的,而tableName则是一张已经存在的表名,而saveMode则如果保存的目标位置有数据时候的策略,比如是追加还是覆盖,或者报错,最后的partitionBy则用于分区,这个选项可以在落盘到hdfs时候按照一些字段和udf进行分区落盘

query语法

query语法和原生的spark sql基本类似,由于我们需要将查询的结果注册成表,所以在已有sql语句后面加入了一个as tempTable的语法,类似select * from xxxxx as my_temp_table_name,这个my_temp_table_name在后续逻辑中就可以被复用

综合的一个小demo

load jdbc.`mysql1.tb_v_user` as mysql_tb_user;
select * from mysql_tb_user limit 100 as result_csv;
save result_csv as csv.`/tmp/todd/csv_test`;
load csv.`/tmp/todd/csv_test` as csv_input;
select * from csv_input limit 10 as csv_input_result;
save csv_input_result as json.`/tmp/todd/result_json` partitionBy uid;

组件介绍

  1. xql-engine 主要用于dsl的解析执行以及与spark sql的交互
  2. xql-transport主要提供了与本地jvm和跑在yarn上的spark dirver jvm的通信,对外提供了提交任务和查询执行逻辑的接口,业务方可以直接使用而不用关注通信层面
  3. xql-rest-wrapper基于通信模块进一步封装,主要用于rest接口的包装,因为我们的web提供了多种部署方式,所以抽象service逻辑,放在该组件中
  4. xql-play-http 基于playframework的http模块,比较重量级一点,通常用于生产环境
  5. xql-akka-http 基于akka-http的http模块,比较轻量级一点,通常用于ide内部调试以及测试环境测试,由于我们用maven进行项目管理,而play和maven的集成还是比较差的,调试起来很不方便,每次都需要打包,所以有了这个项目
  6. xql-cli 主要基于jline开发了一个cli工具,用于调试和命令行操作,可以指定任意部署的xql-server实例,通信都通过http

如何解决我们设计之初的痛点问题

  1. 针对bi部门关联任意数据源的需求,我们可以通过load不同数据源很容易的解决这个痛点
  2. 针对数据挖掘工程师对数据进行探索的需求,我们通过python走http接口进行交互,而数据挖掘工程师的操作都在Jupyter中,他们可以用pandas等高效的数据分析库进行数据探索和图表展示
  3. 针对etl的定时任务,我们可以通过rest接口和crontab进行定时任务的提交执行,通过save到不同数据源完成数据的清洗和转换工作(ps:现阶段的data-task-controller项目可基于data-task项目进行更为复杂的调度)
  4. 至于xql是step by step的,所以可读性就非常的好,但原生的sql也能使用,所以你依然可以写出神鬼莫测的sql,天使和魔鬼,xql让你有两个选择

效果图

image

image

image

image

image

@mumutu

This comment has been minimized.

mumutu commented Dec 15, 2016

GJ

@cjuexuan cjuexuan added the bigdata label Dec 16, 2016

@Rayn-liuwei

This comment has been minimized.

Rayn-liuwei commented Dec 29, 2016

期待开源

@cjuexuan cjuexuan referenced this issue Dec 31, 2016

Open

2016年总结 #23

14 of 19 tasks complete
@cjuexuan

This comment has been minimized.

Owner

cjuexuan commented Nov 30, 2017

@Zhifeiyu

This comment has been minimized.

Zhifeiyu commented Feb 8, 2018

期待开源

@Zhifeiyu

This comment has been minimized.

Zhifeiyu commented Feb 28, 2018

多个xql任务是串行还是并行的呢?多个sparksession?

@cjuexuan

This comment has been minimized.

Owner

cjuexuan commented Feb 28, 2018

@Zhifeiyu 多个sparkSession,另外你可以上面的ppt,我们架构现在升级成了cluster模式了

@Zhifeiyu

This comment has been minimized.

Zhifeiyu commented Feb 28, 2018

好的。tks。

@Zhifeiyu

This comment has been minimized.

Zhifeiyu commented Feb 28, 2018

怎么在一个jvm创建多个sparksession呢?
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true.

已经设置了true了,conf.set("spark.driver.allowMultipleContexts", "true");

@cjuexuan

This comment has been minimized.

Owner

cjuexuan commented Feb 28, 2018

@Zhifeiyu 你没搞清楚sparkContext和sparkSession的区别吧,建议把官方文档过一下

@Rayn-liuwei

This comment has been minimized.

Rayn-liuwei commented Mar 21, 2018

@Zhifeiyu 在 SparkSession类里面有一个这个方法:public SparkSession newSession(),,你可以参考 API :http://spark.apache.org/docs/latest/api/java/index.html

@Zhifeiyu

This comment has been minimized.

Zhifeiyu commented Mar 22, 2018

@Rayn-liuwei 好的。谢了

@junjiem

This comment has been minimized.

junjiem commented Jun 13, 2018

@cjuexuan 有准备什么时候开源出来吗

@cjuexuan

This comment has been minimized.

Owner

cjuexuan commented Jun 15, 2018

@junjiem 在公司提过,只能说我们还在努力争取中,但是思路是可以开源的,这里或者邮件都可以问我

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment