11## 1.格式:
22```
3- 数据现在支持json格式{"xx":"bb","cc":"dd"}
43
54CREATE TABLE tableName(
65 colName colType,
@@ -15,9 +14,8 @@ CREATE TABLE tableName(
1514 topic ='topicName',
1615 groupId='test',
1716 parallelism ='parllNum',
18- --timezone='America/Los_Angeles',
1917 timezone='Asia/Shanghai',
20- sourcedatatype ='json ' #可不设置
18+ sourcedatatype ='dt_nest ' #可不设置
2119 );
2220```
2321
@@ -47,7 +45,9 @@ CREATE TABLE tableName(
4745|topicIsPattern | topic是否是正则表达式格式(true| ; false) |否| false
4846| offsetReset | 读取的topic 的offset初始位置[ latest| ; earliest| ; 指定offset值({"0":12312,"1":12321,"2":12312},{"partition_no": offset_value })] | 否| latest|
4947| parallelism | 并行度设置| 否| 1|
50- | sourcedatatype | 数据类型| 否| json|
48+ | sourcedatatype | 数据类型,avro,csv,json,dt_nest。dt_nest为默认JSON解析器,能够解析嵌套JSON数据类型,其他仅支持非嵌套格式| 否| dt_nest|
49+ | schemaInfo | avro类型使用的schema信息| 否||
50+ | fieldDelimiter | csv类型使用的数据分隔符| 否| | |
5151|timezone|时区设置[ timezone支持的参数] ( timeZone.md ) |否|'Asia/Shanghai'
5252** kafka相关参数可以自定义,使用kafka.开头即可。**
5353```
@@ -169,24 +169,10 @@ CREATE TABLE MyTable(
169169 parallelism ='1'
170170 );
171171```
172- # 二、csv格式数据源
173- 根据字段分隔符进行数据分隔,按顺序匹配sql中配置的列。如数据分隔列数和sql中配置的列数相等直接匹配;如不同参照lengthcheckpolicy策略处理。
174- ## 1.参数:
175-
176- | 参数名称| 含义| 是否必填| 默认值|
177- | ----| ---| ---| ---|
178- | type | kafka09 | 是||
179- | bootstrapServers | kafka bootstrap-server 地址信息(多个用逗号隔开)| 是||
180- | zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔)| 是||
181- | topic | 需要读取的 topic 名称| 是||
182- | offsetReset | 读取的topic 的offset初始位置[ latest| ; earliest] | 否| latest|
183- | parallelism | 并行度设置 | 否| 1|
184- | sourcedatatype | 数据类型| 是 | csv|
185- | fielddelimiter | 字段分隔符| 是 ||
186- | lengthcheckpolicy | 单行字段条数检查策略 | 否| 可选,默认为SKIP,其它可选值为EXCEPTION、PAD。SKIP:字段数目不符合时跳过 。EXCEPTION:字段数目不符合时抛出异常。PAD:按顺序填充,不存在的置为null。|
187- ** kafka相关参数可以自定义,使用kafka.开头即可。**
188172
189- ## 2.样例:
173+ ## 7.csv格式数据源
174+
175+
190176```
191177CREATE TABLE MyTable(
192178 name varchar,
@@ -203,186 +189,28 @@ CREATE TABLE MyTable(
203189 --topic ='mqTest.*',
204190 --topicIsPattern='true'
205191 parallelism ='1',
206- sourcedatatype ='csv',
207- fielddelimiter ='\|',
208- lengthcheckpolicy = 'PAD'
192+ sourceDatatype ='csv'
209193 );
210194 ```
211- # 三、text格式数据源UDF自定义拆分
212- Kafka源表数据解析流程:Kafka Source Table -> UDTF ->Realtime Compute -> SINK。从Kakfa读入的数据,都是VARBINARY(二进制)格式,对读入的每条数据,都需要用UDTF将其解析成格式化数据。
213- 与其他格式不同,本格式定义DDL必须与以下SQL一摸一样,表中的五个字段顺序务必保持一致:
214-
215- ## 1. 定义源表,注意:kafka源表DDL字段必须与以下例子一模一样。WITH中参数可改。
216- ```
217- create table kafka_stream(
218- _topic STRING,
219- _messageKey STRING,
220- _message STRING,
221- _partition INT,
222- _offset BIGINT,
223- ) with (
224- type ='kafka09',
225- bootstrapServers ='172.16.8.198:9092',
226- zookeeperQuorum ='172.16.8.198:2181/kafka',
227- offsetReset ='latest',
228- topic ='nbTest1',
229- parallelism ='1',
230- sourcedatatype='text'
231- )
232- ```
233- ## 2.参数:
234-
235- | 参数名称| 含义| 是否必填| 默认值|
236- | ----| ---| ---| ---|
237- | type | kafka09 | 是||
238- | bootstrapServers | kafka bootstrap-server 地址信息(多个用逗号隔开)| 是||
239- | zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔)| 是||
240- | topic | 需要读取的 topic 名称| 是||
241- | offsetReset | 读取的topic 的offset初始位置[ latest| ; earliest] | 否| latest|
242- | parallelism | 并行度设置| 否| 1|
243- | sourcedatatype | 数据类型| 否| text|
244- ** kafka相关参数可以自定义,使用kafka.开头即可。**
195+ ## 8.avro格式数据源
245196
246- ## 2.自定义:
247- 从kafka读出的数据,需要进行窗口计算。 按照实时计算目前的设计,滚窗/滑窗等窗口操作,需要(且必须)在源表DDL上定义Watermark。Kafka源表比较特殊。如果要以kafka中message字段中的的Event Time进行窗口操作,
248- 需要先从message字段,使用UDX解析出event time,才能定义watermark。 在kafka源表场景中,需要使用计算列。 假设,kafka中写入的数据如下:
249- 2018-11-11 00:00:00|1|Anna|female整个计算流程为:Kafka SOURCE->UDTF->Realtime Compute->RDS SINK(单一分隔符可直接使用类csv格式模板,自定义适用于更复杂的数据类型,本说明只做参考)
250-
251- ** SQL**
252197```
253- -- 定义解析Kakfa message的UDTF
254- CREATE FUNCTION kafkapaser AS 'com.XXXX.kafkaUDTF';
255- CREATE FUNCTION kafkaUDF AS 'com.XXXX.kafkaUDF';
256- -- 定义源表,注意:kafka源表DDL字段必须与以下例子一模一样。WITH中参数可改。
257- create table kafka_src (
258- _topic STRING,
259- _messageKey STRING,
260- _message STRING,
261- _partition INT,
262- _offset BIGINT,
263- ctime AS TO_TIMESTAMP(kafkaUDF(_message)), -- 定义计算列,计算列可理解为占位符,源表中并没有这一列,其中的数据可经过下游计算得出。注意计算里的类型必须为timestamp才能在做watermark。
264- watermark for ctime as withoffset(ctime,0) -- 在计算列上定义watermark
265- ) WITH (
266- type = 'kafka010', -- Kafka Source类型,与Kafka版本强相关,目前支持的Kafka版本请参考本文档
267- topic = 'test_kafka_topic',
268- ...
269- );
270- create table rds_sink (
271- name VARCHAR,
272- age INT,
273- grade VARCHAR,
274- updateTime TIMESTAMP
275- ) WITH(
276- type='mysql',
277- url='jdbc:mysql://localhost:3306/test',
278- tableName='test4',
279- userName='test',
280- password='XXXXXX'
198+ CREATE TABLE MyTable(
199+ channel varchar,
200+ pv varchar
201+ --xctime bigint
202+ )WITH(
203+ type='kafka',
204+ bootstrapServers='172.16.8.107:9092',
205+ groupId='mqTest01',
206+ offsetReset='latest',
207+ topic='mqTest01',
208+ parallelism ='1',
209+ topicIsPattern ='false',
210+ kafka.group.id='mqTest',
211+ sourceDataType ='avro',
212+ schemaInfo = '{"type":"record","name":"MyResult","fields":[{"name":"channel","type":"string"},{"name":"pv","type":"string"}]}'
281213 );
282- -- 使用UDTF,将二进制数据解析成格式化数据
283- CREATE VIEW input_view (
284- name,
285- age,
286- grade,
287- updateTime
288- ) AS
289- SELECT
290- COUNT(*) as cnt,
291- T.ctime,
292- T.order,
293- T.name,
294- T.sex
295- from
296- kafka_src as S,
297- LATERAL TABLE (kafkapaser _message)) as T (
298- ctime,
299- order,
300- name,
301- sex
302- )
303- Group BY T.sex,
304- TUMBLE(ctime, INTERVAL '1' MINUTE);
305- -- 对input_view中输出的数据做计算
306- CREATE VIEW view2 (
307- cnt,
308- sex
309- ) AS
310- SELECT
311- COUNT(*) as cnt,
312- T.sex
313- from
314- input_view
315- Group BY sex, TUMBLE(ctime, INTERVAL '1' MINUTE);
316- -- 使用解析出的格式化数据进行计算,并将结果输出到RDS中
317- insert into rds_sink
318- SELECT
319- cnt,sex
320- from view2;
321- ```
322- ** UDF&UDTF**
214+
323215```
324- package com.XXXX;
325- import com.XXXX.fastjson.JSONObject;
326- import org.apache.flink.table.functions.TableFunction;
327- import org.apache.flink.table.types.DataType;
328- import org.apache.flink.table.types.DataTypes;
329- import org.apache.flink.types.Row;
330- import java.io.UnsupportedEncodingException;
331- /**
332- 以下例子解析输入Kafka中的JSON字符串,并将其格式化输出
333- **/
334- public class kafkaUDTF extends TableFunction<Row> {
335- public void eval(byte[] message) {
336- try {
337- // 读入一个二进制数据,并将其转换为String格式
338- String msg = new String(message, "UTF-8");
339- // 提取JSON Object中各字段
340- String ctime = Timestamp.valueOf(data.split('\\|')[0]);
341- String order = data.split('\\|')[1];
342- String name = data.split('\\|')[2];
343- String sex = data.split('\\|')[3];
344- // 将解析出的字段放到要输出的Row()对象
345- Row row = new Row(4);
346- row.setField(0, ctime);
347- row.setField(1, age);
348- row.setField(2, grade);
349- row.setField(3, updateTime);
350- System.out.println("Kafka message str ==>" + row.toString());
351- // 输出一行
352- collect(row);
353- } catch (ClassCastException e) {
354- System.out.println("Input data format error. Input data " + msg + "is not json string");
355- }
356- } catch (UnsupportedEncodingException e) {
357- e.printStackTrace();
358- }
359- }
360- @Override
361- // 如果返回值是Row,就必须重载实现这个方法,显式地告诉系统返回的字段类型
362- // 定义输出Row()对象的字段类型
363- public DataType getResultType(Object[] arguments, Class[] argTypes) {
364- return DataTypes.createRowType(DataTypes.TIMESTAMP,DataTypes.STRING, DataTypes.Integer, DataTypes.STRING,DataTypes.STRING);
365- }
366- }
367-
368- package com.dp58;
369- package com.dp58.sql.udx;
370- import org.apache.flink.table.functions.FunctionContext;
371- import org.apache.flink.table.functions.ScalarFunction;
372- public class KafkaUDF extends ScalarFunction {
373- // 可选,open方法可以不写
374- // 需要import org.apache.flink.table.functions.FunctionContext;
375- public String eval(byte[] message) {
376- // 读入一个二进制数据,并将其转换为String格式
377- String msg = new String(message, "UTF-8");
378- return msg.split('\\|')[0];
379- }
380- public long eval(String b, String c) {
381- return eval(b) + eval(c);
382- }
383- //可选,close方法可以不写
384- @Override
385- public void close() {
386- }
387- }
388- ```
216+
0 commit comments