diff --git a/README.md b/README.md
index 3241a57a5..2fb722c6d 100644
--- a/README.md
+++ b/README.md
@@ -4,6 +4,7 @@
 > > * Custom CREATE VIEW syntax
 > > * Custom CREATE FUNCTION syntax
 > > * Joins between streams and side (dimension) tables
+> > * Supports all native Flink SQL syntax
 
 # Supported features
   * Source tables: Kafka 0.9, 1.x
@@ -14,6 +15,8 @@
   * Added Oracle side-table and result-table support
   * Added SQLServer side-table and result-table support
   * Added Kafka result-table support
+  * Added CEP support in SQL
+  * Side-table snapshots
 
 ## 1 Quick start
 ### 1.1 Run modes
@@ -26,7 +29,7 @@
 ### 1.2 Execution environment
 * Java: JDK 8 or above
-* Flink cluster: 1.4 (local mode does not require a Flink cluster)
+* Flink cluster: 1.4, 1.5 (local mode does not require a Flink cluster)
 * Operating system: unrestricted in principle
 
 ### 1.3 Build
@@ -44,12 +47,12 @@ mvn clean package -Dmaven.test.skip
 #### 1.4.1 Launch command
 ```
-sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack/150_flinkplugin/sqlplugin -localSqlPluginPath D:\gitspace\flinkStreamSQL\plugins -mode yarn -flinkconf D:\flink_home\kudu150etc -yarnconf D:\hadoop\etc\hadoopkudu -confProp {\"time.characteristic\":\"EventTime\",\"sql.checkpoint.interval\":10000}
+sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack/150_flinkplugin/sqlplugin -localSqlPluginPath D:\gitspace\flinkStreamSQL\plugins -addjar \["udf.jar\"\] -mode yarn -flinkconf D:\flink_home\kudu150etc -yarnconf D:\hadoop\etc\hadoopkudu -confProp \{\"time.characteristic\":\"EventTime\",\"sql.checkpoint.interval\":10000\}
 ```
 
 #### 1.4.2 Command-line options
-* **model**
+* **mode**
   * Description: execution mode, i.e. the working mode of the Flink cluster
     * local: local mode
     * standalone: a standalone Flink cluster
@@ -79,6 +82,7 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
 * **addjar**
   * Description: paths of extension jars, currently mainly jars containing UDF definitions;
+  * Format: JSON
   * Required: no
   * Default: none
@@ -139,12 +143,16 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
 ## 3 Example
 ```
+
+CREATE (scala|table) FUNCTION CHARACTER_LENGTH WITH com.dtstack.Kun
+
+
 CREATE TABLE MyTable(
     name varchar,
     channel varchar,
     pv int,
     xctime bigint,
-    CHARACTER_LENGTH(channel) AS timeLeng
+    CHARACTER_LENGTH(channel) AS timeLeng //user-defined function
 )WITH(
     type ='kafka09',
     bootstrapServers ='172.16.8.198:9092',
@@ -182,7 +190,7 @@ CREATE TABLE sideTable(
     cf:name varchar as name,
     cf:info varchar as info,
     PRIMARY KEY(name),
-    PERIOD FOR SYSTEM_TIME
+    PERIOD FOR SYSTEM_TIME //side-table marker
 )WITH(
     type ='hbase',
     zookeeperQuorum ='rdos1:2181',
diff --git a/core/src/main/java/com/dtstack/flink/sql/Main.java b/core/src/main/java/com/dtstack/flink/sql/Main.java
index a2f149018..610abf21f 100644
--- a/core/src/main/java/com/dtstack/flink/sql/Main.java
+++ b/core/src/main/java/com/dtstack/flink/sql/Main.java
@@ -91,8 +91,6 @@ public class Main {
 
     private static final Logger LOG = LoggerFactory.getLogger(Main.class);
 
-    private static final String LOCAL_MODE = "local";
-
     private static final int failureRate = 3;
 
     private static final int failureInterval = 6; //min
@@ -141,7 +139,7 @@ public static void main(String[] args) throws Exception {
         Thread.currentThread().setContextClassLoader(dtClassLoader);
 
         URLClassLoader parentClassloader;
-        if(!LOCAL_MODE.equals(deployMode)){
+        if(!ClusterMode.local.name().equals(deployMode)){
             parentClassloader = (URLClassLoader) threadClassLoader.getParent();
         }else{
             parentClassloader = dtClassLoader;
         }
@@ -313,7 +311,7 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
     }
 
     private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws IOException {
-        StreamExecutionEnvironment env = !LOCAL_MODE.equals(deployMode) ?
+        StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ?
                 StreamExecutionEnvironment.getExecutionEnvironment() :
                 new MyLocalStreamEnvironment();
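The Main.java hunks above swap the hard-coded `LOCAL_MODE` string for a check against a `ClusterMode` enum. A minimal sketch of the new comparison, assuming `ClusterMode` defines constants matching the `-mode` option values (the enum itself is not part of this diff):

```
// Sketch only: this ClusterMode stands in for the project's enum,
// which is not shown in the diff.
public class ModeCheckSketch {

    enum ClusterMode { local, standalone, yarn }

    // Replaces the former comparison against LOCAL_MODE = "local".
    static boolean isLocalMode(String deployMode) {
        return ClusterMode.local.name().equals(deployMode);
    }

    public static void main(String[] args) {
        System.out.println(isLocalMode("local")); // true
        System.out.println(isLocalMode("yarn"));  // false
    }
}
```

Deriving the string from the enum keeps the `-mode` option values and the code paths that branch on them from drifting apart.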
diff --git a/docs/kafkaSource.md b/docs/kafkaSource.md
index 2968dbe22..f382ba9ad 100644
--- a/docs/kafkaSource.md
+++ b/docs/kafkaSource.md
@@ -38,7 +38,7 @@ CREATE TABLE tableName(
 |bootstrapServers | kafka bootstrap-server addresses (comma-separated)|yes||
 |zookeeperQuorum | kafka zookeeper addresses (comma-separated)|yes||
 |topic | name of the topic to read|yes||
-|offsetReset | initial offset position in the topic [latest\|earliest]|no|latest|
+|offsetReset | initial offset position in the topic [latest\|earliest\|specific offsets ({"0":12312,"1":12321,"2":12312}, i.e. {"partition_no":offset_value})]|no|latest|
 |parallelism | parallelism|no|1|
 
 ## 5. Example:
diff --git a/kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java b/kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java
index 8486b9883..00988518a 100644
--- a/kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java
+++ b/kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java
@@ -23,16 +23,20 @@
 import com.dtstack.flink.sql.source.IStreamSourceGener;
 import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
 import com.dtstack.flink.sql.table.SourceTableInfo;
+import com.dtstack.flink.sql.util.PluginUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -76,7 +80,20 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
         //earliest,latest
         if("earliest".equalsIgnoreCase(kafka09SourceTableInfo.getOffsetReset())){
             kafkaSrc.setStartFromEarliest();
-        }else{
+        }else if(kafka09SourceTableInfo.getOffsetReset().startsWith("{")){
+            try {
+                // {"0":12312,"1":12321,"2":12312}
+                Properties properties = PluginUtil.jsonStrToObject(kafka09SourceTableInfo.getOffsetReset(), Properties.class);
+                Map<String, Object> offsetMap = PluginUtil.ObjectToMap(properties);
+                Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();
+                for(Map.Entry<String, Object> entry : offsetMap.entrySet()){
+                    specificStartupOffsets.put(new KafkaTopicPartition(topicName, Integer.valueOf(entry.getKey())), Long.valueOf(entry.getValue().toString()));
+                }
+                kafkaSrc.setStartFromSpecificOffsets(specificStartupOffsets);
+            } catch (Exception e) {
+                throw new RuntimeException("not support offsetReset type:" + kafka09SourceTableInfo.getOffsetReset());
+            }
+        }else {
             kafkaSrc.setStartFromLatest();
         }
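The new `offsetReset` branch turns a JSON object of partition/offset pairs into the map that `setStartFromSpecificOffsets` expects. A self-contained sketch of that translation, with the JSON already parsed into a plain `Map` so the example does not depend on `PluginUtil`; topic name, partitions, and offsets are illustrative:

```
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

import java.util.HashMap;
import java.util.Map;

public class SpecificOffsetsSketch {

    // {"0":12312,"1":12321} (already parsed) -> per-partition start offsets
    // in the form the Flink Kafka consumer understands.
    static Map<KafkaTopicPartition, Long> toSpecificOffsets(String topic, Map<String, String> offsetMap) {
        Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();
        for (Map.Entry<String, String> entry : offsetMap.entrySet()) {
            specificStartupOffsets.put(
                    new KafkaTopicPartition(topic, Integer.valueOf(entry.getKey())),
                    Long.valueOf(entry.getValue()));
        }
        return specificStartupOffsets;
    }

    public static void main(String[] args) {
        Map<String, String> offsetMap = new HashMap<>();
        offsetMap.put("0", "12312");
        offsetMap.put("1", "12321");
        System.out.println(toSpecificOffsets("demo_topic", offsetMap));
    }
}
```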
diff --git a/kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java b/kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java
index ffc42f7ef..c9eaf05bc 100644
--- a/kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java
+++ b/kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java
@@ -23,15 +23,19 @@
 import com.dtstack.flink.sql.source.IStreamSourceGener;
 import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
 import com.dtstack.flink.sql.table.SourceTableInfo;
+import com.dtstack.flink.sql.util.PluginUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -75,7 +79,20 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
         //earliest,latest
         if("earliest".equalsIgnoreCase(kafka010SourceTableInfo.getOffsetReset())){
             kafkaSrc.setStartFromEarliest();
-        }else{
+        }else if(kafka010SourceTableInfo.getOffsetReset().startsWith("{")){
+            try {
+                // {"0":12312,"1":12321,"2":12312}
+                Properties properties = PluginUtil.jsonStrToObject(kafka010SourceTableInfo.getOffsetReset(), Properties.class);
+                Map<String, Object> offsetMap = PluginUtil.ObjectToMap(properties);
+                Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();
+                for(Map.Entry<String, Object> entry : offsetMap.entrySet()){
+                    specificStartupOffsets.put(new KafkaTopicPartition(topicName, Integer.valueOf(entry.getKey())), Long.valueOf(entry.getValue().toString()));
+                }
+                kafkaSrc.setStartFromSpecificOffsets(specificStartupOffsets);
+            } catch (Exception e) {
+                throw new RuntimeException("not support offsetReset type:" + kafka010SourceTableInfo.getOffsetReset());
+            }
+        }else {
             kafkaSrc.setStartFromLatest();
         }
diff --git a/kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java b/kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java
index 8bfec8107..5f6146cc9 100644
--- a/kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java
+++ b/kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java
@@ -23,15 +23,19 @@
 import com.dtstack.flink.sql.source.IStreamSourceGener;
 import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
 import com.dtstack.flink.sql.table.SourceTableInfo;
+import com.dtstack.flink.sql.util.PluginUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -75,6 +79,19 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
         //earliest,latest
         if("earliest".equalsIgnoreCase(kafka011SourceTableInfo.getOffsetReset())){
             kafkaSrc.setStartFromEarliest();
+        }else if(kafka011SourceTableInfo.getOffsetReset().startsWith("{")){
+            try {
+                // {"0":12312,"1":12321,"2":12312}
+                Properties properties = PluginUtil.jsonStrToObject(kafka011SourceTableInfo.getOffsetReset(), Properties.class);
+                Map<String, Object> offsetMap = PluginUtil.ObjectToMap(properties);
+                Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();
+                for(Map.Entry<String, Object> entry : offsetMap.entrySet()){
+                    specificStartupOffsets.put(new KafkaTopicPartition(topicName, Integer.valueOf(entry.getKey())), Long.valueOf(entry.getValue().toString()));
+                }
+                kafkaSrc.setStartFromSpecificOffsets(specificStartupOffsets);
+            } catch (Exception e) {
+                throw new RuntimeException("not support offsetReset type:" + kafka011SourceTableInfo.getOffsetReset());
+            }
         }else{
             kafkaSrc.setStartFromLatest();
         }
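The same three-way dispatch (earliest, specific offsets, latest) is now duplicated across the 0.9, 0.10, and 0.11 sources. A condensed sketch of the shape of that logic against the 0.11 consumer, reusing `toSpecificOffsets` from the sketch above; topic, schema, and properties are illustrative:

```
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Map;
import java.util.Properties;

public class StartPositionSketch {

    static FlinkKafkaConsumer011<String> configure(String topic, String offsetReset,
                                                   Properties props, Map<String, String> parsedOffsets) {
        FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props);
        if ("earliest".equalsIgnoreCase(offsetReset)) {
            consumer.setStartFromEarliest();
        } else if (offsetReset.startsWith("{")) {
            // JSON form: delegate to the partition/offset translation above
            consumer.setStartFromSpecificOffsets(
                    SpecificOffsetsSketch.toSpecificOffsets(topic, parsedOffsets));
        } else {
            consumer.setStartFromLatest();
        }
        return consumer;
    }
}
```

Since the branch is identical in all three modules, it is a natural candidate for a shared helper in a common base class.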
diff --git a/mysql/mysql-side/mysql-all-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAllReqRow.java b/mysql/mysql-side/mysql-all-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAllReqRow.java
index 812223a46..a8839e951 100644
--- a/mysql/mysql-side/mysql-all-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAllReqRow.java
+++ b/mysql/mysql-side/mysql-all-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAllReqRow.java
@@ -71,7 +71,6 @@ public MysqlAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List
-        Map cacheInfo = (Map) sideInput;
diff --git a/redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java b/redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java
index 08c53a8a7..a541c9300 100644
--- a/redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java
+++ b/redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java
@@ -144,14 +144,14 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
         List<String> value = async.keys(key + ":*").get();
         String[] values = value.toArray(new String[value.size()]);
         RedisFuture<List<KeyValue<String, String>>> future = async.mget(values);
-        while (future.isDone()){
-            try {
-                List<KeyValue<String, String>> kvList = future.get();
-                if (kvList.size() != 0){
-                    for (int i=0; i<kvList.size(); i++){
+        future.thenAccept(new Consumer<List<KeyValue<String, String>>>() {
+            @Override
+            public void accept(List<KeyValue<String, String>> keyValues) {
+                if (keyValues.size() != 0){
+                    for (int i=0; i<keyValues.size(); i++){
@@ ... @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
                         putCache(key, CacheMissVal.getMissKeyObj());
                     }
                 }
-            } catch (InterruptedException e1) {
-                e1.printStackTrace();
-            } catch (ExecutionException e1) {
-                e1.printStackTrace();
             }
-        }
-
+        });
     }
 
     private String buildCacheKey(List<Object> keyData) {
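The redis side-table change replaces the `while (future.isDone())` poll, which only processes the result if the future happens to be complete already, with a `thenAccept` callback; Lettuce's `RedisFuture` implements `CompletionStage`, so the callback runs once the `mget` result arrives. A standalone sketch of the pattern, with connection setup and cache handling omitted:

```
import io.lettuce.core.KeyValue;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.async.RedisAsyncCommands;

import java.util.List;

public class ThenAcceptSketch {

    // Attach a callback instead of polling isDone(): runs when mget completes.
    static void fetchAsync(RedisAsyncCommands<String, String> async, String... keys) {
        RedisFuture<List<KeyValue<String, String>>> future = async.mget(keys);
        future.thenAccept(keyValues -> {
            for (KeyValue<String, String> kv : keyValues) {
                // mget yields an empty KeyValue for keys that do not exist
                if (kv.hasValue()) {
                    System.out.println(kv.getKey() + " -> " + kv.getValue());
                }
            }
        });
    }
}
```

This also removes the need to catch `InterruptedException`/`ExecutionException` from a blocking `get()`, which the old loop only printed with `printStackTrace()`.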