diff --git a/cassandra/cassandra-side/cassandra-all-side/pom.xml b/cassandra/cassandra-side/cassandra-all-side/pom.xml new file mode 100644 index 000000000..74c62afdb --- /dev/null +++ b/cassandra/cassandra-side/cassandra-all-side/pom.xml @@ -0,0 +1,88 @@ + + + + sql.side.cassandra + com.dtstack.flink + 1.0-SNAPSHOT + ../pom.xml + + 4.0.0 + + sql.side.all.cassandra + cassandra-all-side + + jar + + + + com.dtstack.flink + sql.side.cassandra.core + 1.0-SNAPSHOT + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 1.4 + + + package + + shade + + + + + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + maven-antrun-plugin + 1.2 + + + copy-resources + + package + + run + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java b/cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java new file mode 100644 index 000000000..e8ccc739f --- /dev/null +++ b/cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flink.sql.side.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.HostDistance; +import com.datastax.driver.core.PoolingOptions; +import com.datastax.driver.core.QueryOptions; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SocketOptions; +import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy; +import com.datastax.driver.core.policies.RetryPolicy; +import com.dtstack.flink.sql.side.AllReqRow; +import com.dtstack.flink.sql.side.FieldInfo; +import com.dtstack.flink.sql.side.JoinInfo; +import com.dtstack.flink.sql.side.SideTableInfo; +import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo; +import org.apache.calcite.sql.JoinType; +import org.apache.commons.collections.CollectionUtils; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.calcite.shaded.com.google.common.collect.Lists; +import org.apache.flink.calcite.shaded.com.google.common.collect.Maps; +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo; +import org.apache.flink.types.Row; +import org.apache.flink.util.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Reason: + * Date: 2018/11/22 + * + * @author xuqianjin + */ +public class CassandraAllReqRow extends AllReqRow { + + private static final long serialVersionUID = 54015343561288219L; + + private static final Logger LOG = LoggerFactory.getLogger(CassandraAllReqRow.class); + + private static final String 
cassandra_DRIVER = "com.cassandra.jdbc.Driver"; + + private static final int CONN_RETRY_NUM = 3; + + private static final int FETCH_SIZE = 1000; + + private transient Cluster cluster; + private transient Session session = null; + + private AtomicReference>>> cacheRef = new AtomicReference<>(); + + public CassandraAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List outFieldInfoList, SideTableInfo sideTableInfo) { + super(new com.dtstack.flink.sql.side.cassandra.CassandraAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo)); + } + + @Override + protected Row fillData(Row input, Object sideInput) { + Map cacheInfo = (Map) sideInput; + Row row = new Row(sideInfo.getOutFieldInfoList().size()); + for (Map.Entry entry : sideInfo.getInFieldIndex().entrySet()) { + Object obj = input.getField(entry.getValue()); + boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass()); + + //Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long. 
+ if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) { + obj = ((Timestamp) obj).getTime(); + } + row.setField(entry.getKey(), obj); + } + + for (Map.Entry entry : sideInfo.getSideFieldNameIndex().entrySet()) { + if (cacheInfo == null) { + row.setField(entry.getKey(), null); + } else { + row.setField(entry.getKey(), cacheInfo.get(entry.getValue())); + } + } + + return row; + } + + @Override + protected void initCache() throws SQLException { + Map>> newCache = Maps.newConcurrentMap(); + cacheRef.set(newCache); + loadData(newCache); + } + + @Override + protected void reloadCache() { + //reload cacheRef and replace to old cacheRef + Map>> newCache = Maps.newConcurrentMap(); + try { + loadData(newCache); + } catch (SQLException e) { + LOG.error("", e); + } + + cacheRef.set(newCache); + LOG.info("----- cassandra all cacheRef reload end:{}", Calendar.getInstance()); + } + + + @Override + public void flatMap(Row value, Collector out) throws Exception { + List inputParams = Lists.newArrayList(); + for (Integer conValIndex : sideInfo.getEqualValIndex()) { + Object equalObj = value.getField(conValIndex); + if (equalObj == null) { + out.collect(null); + } + + inputParams.add(equalObj); + } + + String key = buildKey(inputParams); + List> cacheList = cacheRef.get().get(key); + if (CollectionUtils.isEmpty(cacheList)) { + if (sideInfo.getJoinType() == JoinType.LEFT) { + Row row = fillData(value, null); + out.collect(row); + } else { + return; + } + + return; + } + + for (Map one : cacheList) { + out.collect(fillData(value, one)); + } + + } + + private String buildKey(List equalValList) { + StringBuilder sb = new StringBuilder(""); + for (Object equalVal : equalValList) { + sb.append(equalVal).append("_"); + } + + return sb.toString(); + } + + private String buildKey(Map val, List equalFieldList) { + StringBuilder sb = new StringBuilder(""); + for (String equalField : equalFieldList) { + sb.append(val.get(equalField)).append("_"); + } + + return sb.toString(); + } + + 
private Session getConn(CassandraSideTableInfo tableInfo) { + try { + if (session == null) { + QueryOptions queryOptions = new QueryOptions(); + //The default consistency level for queries: ConsistencyLevel.TWO. + queryOptions.setConsistencyLevel(ConsistencyLevel.QUORUM); + Integer maxRequestsPerConnection = tableInfo.getMaxRequestsPerConnection() == null ? 1 : tableInfo.getMaxRequestsPerConnection(); + Integer coreConnectionsPerHost = tableInfo.getCoreConnectionsPerHost() == null ? 8 : tableInfo.getCoreConnectionsPerHost(); + Integer maxConnectionsPerHost = tableInfo.getMaxConnectionsPerHost() == null ? 32768 : tableInfo.getMaxConnectionsPerHost(); + Integer maxQueueSize = tableInfo.getMaxQueueSize() == null ? 100000 : tableInfo.getMaxQueueSize(); + Integer readTimeoutMillis = tableInfo.getReadTimeoutMillis() == null ? 60000 : tableInfo.getReadTimeoutMillis(); + Integer connectTimeoutMillis = tableInfo.getConnectTimeoutMillis() == null ? 60000 : tableInfo.getConnectTimeoutMillis(); + Integer poolTimeoutMillis = tableInfo.getPoolTimeoutMillis() == null ? 60000 : tableInfo.getPoolTimeoutMillis(); + Integer cassandraPort = 0; + String address = tableInfo.getAddress(); + String userName = tableInfo.getUserName(); + String password = tableInfo.getPassword(); + String database = tableInfo.getDatabase(); + + ArrayList serversList = new ArrayList(); + //Read timeout or connection timeout Settings + SocketOptions so = new SocketOptions() + .setReadTimeoutMillis(readTimeoutMillis) + .setConnectTimeoutMillis(connectTimeoutMillis); + + //The cluster USES hostdistance.local in the same machine room + //Hostdistance. REMOTE is used for different machine rooms + //Ignore use HostDistance. 
IGNORED + PoolingOptions poolingOptions = new PoolingOptions() + //Each connection allows a maximum of 64 concurrent requests + .setMaxRequestsPerConnection(HostDistance.LOCAL, maxRequestsPerConnection) + //Have at least two connections to each machine in the cluster + .setCoreConnectionsPerHost(HostDistance.LOCAL, coreConnectionsPerHost) + //There are up to eight connections to each machine in the cluster + .setMaxConnectionsPerHost(HostDistance.LOCAL, maxConnectionsPerHost) + .setMaxQueueSize(maxQueueSize) + .setPoolTimeoutMillis(poolTimeoutMillis); + //重试策略 + RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE; + + for (String server : address.split(",")) { + cassandraPort = Integer.parseInt(server.split(":")[1]); + serversList.add(InetAddress.getByName(server.split(":")[0])); + } + + if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) { + cluster = Cluster.builder().addContactPoints(serversList).withRetryPolicy(retryPolicy) + .withPort(cassandraPort) + .withPoolingOptions(poolingOptions).withSocketOptions(so) + .withQueryOptions(queryOptions).build(); + } else { + cluster = Cluster.builder().addContactPoints(serversList).withRetryPolicy(retryPolicy) + .withPort(cassandraPort) + .withPoolingOptions(poolingOptions).withSocketOptions(so) + .withCredentials(userName, password) + .withQueryOptions(queryOptions).build(); + } + // 建立连接 连接已存在的键空间 + session = cluster.connect(database); + LOG.info("connect cassandra is successed!"); + } + } catch (Exception e) { + LOG.error("connect cassandra is error:" + e.getMessage()); + } + return session; + } + + + private void loadData(Map>> tmpCache) throws SQLException { + CassandraSideTableInfo tableInfo = (CassandraSideTableInfo) sideInfo.getSideTableInfo(); + Session session = null; + + try { + for (int i = 0; i < CONN_RETRY_NUM; i++) { + try { + session = getConn(tableInfo); + break; + } catch (Exception e) { + if (i == CONN_RETRY_NUM - 1) { + throw new RuntimeException("", 
e); + } + try { + String connInfo = "address:" + tableInfo.getAddress() + ";userName:" + tableInfo.getUserName() + + ",pwd:" + tableInfo.getPassword(); + LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo); + Thread.sleep(5 * 1000); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + + } + + //load data from table + String sql = sideInfo.getSqlCondition() + " limit " + FETCH_SIZE; + ResultSet resultSet = session.execute(sql); + String[] sideFieldNames = sideInfo.getSideSelectFields().split(","); + for (com.datastax.driver.core.Row row : resultSet) { + Map oneRow = Maps.newHashMap(); + for (String fieldName : sideFieldNames) { + oneRow.put(fieldName.trim(), row.getObject(fieldName.trim())); + } + String cacheKey = buildKey(oneRow, sideInfo.getEqualFieldList()); + List> list = tmpCache.computeIfAbsent(cacheKey, key -> Lists.newArrayList()); + list.add(oneRow); + } + } catch (Exception e) { + LOG.error("", e); + } finally { + try { + if (session != null) { + session.close(); + } + } catch (Exception e) { + LOG.error("Error while closing session.", e); + } + try { + if (cluster != null) { + cluster.close(); + } + } catch (Exception e) { + LOG.error("Error while closing cluster.", e); + } + } + } +} diff --git a/cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllSideInfo.java b/cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllSideInfo.java new file mode 100644 index 000000000..bba39fc3e --- /dev/null +++ b/cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllSideInfo.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flink.sql.side.cassandra; + +import com.dtstack.flink.sql.side.FieldInfo; +import com.dtstack.flink.sql.side.JoinInfo; +import com.dtstack.flink.sql.side.SideInfo; +import com.dtstack.flink.sql.side.SideTableInfo; +import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo; +import org.apache.calcite.sql.SqlBasicCall; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.commons.collections.CollectionUtils; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.calcite.shaded.com.google.common.collect.Lists; + +import java.util.List; + +/** + * Reason: + * Date: 2018/11/22 + * + * @author xuqianjin + */ +public class CassandraAllSideInfo extends SideInfo { + + private static final long serialVersionUID = -8690814317653033557L; + + public CassandraAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List outFieldInfoList, SideTableInfo sideTableInfo) { + super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo); + } + + @Override + public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) { + CassandraSideTableInfo cassandraSideTableInfo = (CassandraSideTableInfo) sideTableInfo; + + sqlCondition = "select ${selectField} from ${tableName} "; + sqlCondition = sqlCondition.replace("${tableName}", cassandraSideTableInfo.getDatabase() + "." 
+ + cassandraSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields); + System.out.println("---------side_exe_sql-----\n" + sqlCondition); + } + + @Override + public void parseSelectFields(JoinInfo joinInfo) { + String sideTableName = joinInfo.getSideTableName(); + String nonSideTableName = joinInfo.getNonSideTable(); + List fields = Lists.newArrayList(); + + int sideIndex = 0; + for (int i = 0; i < outFieldInfoList.size(); i++) { + FieldInfo fieldInfo = outFieldInfoList.get(i); + if (fieldInfo.getTable().equalsIgnoreCase(sideTableName)) { + fields.add(fieldInfo.getFieldName()); + sideFieldIndex.put(i, sideIndex); + sideFieldNameIndex.put(i, fieldInfo.getFieldName()); + sideIndex++; + } else if (fieldInfo.getTable().equalsIgnoreCase(nonSideTableName)) { + int nonSideIndex = rowTypeInfo.getFieldIndex(fieldInfo.getFieldName()); + inFieldIndex.put(i, nonSideIndex); + } else { + throw new RuntimeException("unknown table " + fieldInfo.getTable()); + } + } + + if (fields.size() == 0) { + throw new RuntimeException("select non field from table " + sideTableName); + } + + //add join on condition field to select fields + SqlNode conditionNode = joinInfo.getCondition(); + + List sqlNodeList = Lists.newArrayList(); + if (conditionNode.getKind() == SqlKind.AND) { + sqlNodeList.addAll(Lists.newArrayList(((SqlBasicCall) conditionNode).getOperands())); + } else { + sqlNodeList.add(conditionNode); + } + + for (SqlNode sqlNode : sqlNodeList) { + dealOneEqualCon(sqlNode, sideTableName); + } + + if (CollectionUtils.isEmpty(equalFieldList)) { + throw new RuntimeException("no join condition found after table " + joinInfo.getLeftTableName()); + } + + for (String equalField : equalFieldList) { + if (fields.contains(equalField)) { + continue; + } + + fields.add(equalField); + } + + sideSelectFields = String.join(",", fields); + } +} diff --git a/cassandra/cassandra-side/cassandra-async-side/pom.xml b/cassandra/cassandra-side/cassandra-async-side/pom.xml new file mode 
100644 index 000000000..cd709fecd --- /dev/null +++ b/cassandra/cassandra-side/cassandra-async-side/pom.xml @@ -0,0 +1,103 @@ + + + + sql.side.cassandra + com.dtstack.flink + 1.0-SNAPSHOT + ../pom.xml + + 4.0.0 + + sql.side.async.cassandra + + cassandra-async-side + + jar + + + + + + io.vertx + vertx-jdbc-client + 3.5.2 + + + + io.vertx + vertx-core + 3.5.2 + + + + com.dtstack.flink + sql.side.cassandra.core + 1.0-SNAPSHOT + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 1.4 + + + package + + shade + + + + + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + maven-antrun-plugin + 1.2 + + + copy-resources + + package + + run + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java b/cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java new file mode 100644 index 000000000..94c8e6fb6 --- /dev/null +++ b/cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package com.dtstack.flink.sql.side.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.HostDistance; +import com.datastax.driver.core.PoolingOptions; +import com.datastax.driver.core.QueryOptions; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SocketOptions; +import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy; +import com.datastax.driver.core.policies.RetryPolicy; +import com.dtstack.flink.sql.enums.ECacheContentType; +import com.dtstack.flink.sql.side.AsyncReqRow; +import com.dtstack.flink.sql.side.CacheMissVal; +import com.dtstack.flink.sql.side.FieldInfo; +import com.dtstack.flink.sql.side.JoinInfo; +import com.dtstack.flink.sql.side.SideTableInfo; +import com.dtstack.flink.sql.side.cache.CacheObj; +import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo; +import com.google.common.base.Function; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import io.vertx.core.json.JsonArray; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo; +import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * Reason: + * Date: 
2018/11/22 + * + * @author xuqianjin + */ +public class CassandraAsyncReqRow extends AsyncReqRow { + + private static final long serialVersionUID = 6631584128079864735L; + + private static final Logger LOG = LoggerFactory.getLogger(CassandraAsyncReqRow.class); + + private final static int DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE = 10; + + private final static int DEFAULT_VERTX_WORKER_POOL_SIZE = 20; + + private final static int DEFAULT_MAX_DB_CONN_POOL_SIZE = 20; + + private transient Cluster cluster; + private transient ListenableFuture session; + private transient CassandraSideTableInfo cassandraSideTableInfo; + + public CassandraAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List outFieldInfoList, SideTableInfo sideTableInfo) { + super(new com.dtstack.flink.sql.side.cassandra.CassandraAsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo)); + } + + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + cassandraSideTableInfo = (CassandraSideTableInfo) sideInfo.getSideTableInfo(); + connCassandraDB(cassandraSideTableInfo); + } + + private void connCassandraDB(CassandraSideTableInfo tableInfo) { + try { + if (session == null) { + QueryOptions queryOptions = new QueryOptions(); + //The default consistency level for queries: ConsistencyLevel.TWO. + queryOptions.setConsistencyLevel(ConsistencyLevel.QUORUM); + Integer maxRequestsPerConnection = tableInfo.getMaxRequestsPerConnection() == null ? 1 : tableInfo.getMaxRequestsPerConnection(); + Integer coreConnectionsPerHost = tableInfo.getCoreConnectionsPerHost() == null ? 8 : tableInfo.getCoreConnectionsPerHost(); + Integer maxConnectionsPerHost = tableInfo.getMaxConnectionsPerHost() == null ? 32768 : tableInfo.getMaxConnectionsPerHost(); + Integer maxQueueSize = tableInfo.getMaxQueueSize() == null ? 100000 : tableInfo.getMaxQueueSize(); + Integer readTimeoutMillis = tableInfo.getReadTimeoutMillis() == null ? 
60000 : tableInfo.getReadTimeoutMillis(); + Integer connectTimeoutMillis = tableInfo.getConnectTimeoutMillis() == null ? 60000 : tableInfo.getConnectTimeoutMillis(); + Integer poolTimeoutMillis = tableInfo.getPoolTimeoutMillis() == null ? 60000 : tableInfo.getPoolTimeoutMillis(); + Integer cassandraPort = 0; + String address = tableInfo.getAddress(); + String userName = tableInfo.getUserName(); + String password = tableInfo.getPassword(); + String database = tableInfo.getDatabase(); + + ArrayList serversList = new ArrayList(); + //Read timeout or connection timeout Settings + SocketOptions so = new SocketOptions() + .setReadTimeoutMillis(readTimeoutMillis) + .setConnectTimeoutMillis(connectTimeoutMillis); + + //The cluster USES hostdistance.local in the same machine room + //Hostdistance. REMOTE is used for different machine rooms + //Ignore use HostDistance. IGNORED + PoolingOptions poolingOptions = new PoolingOptions() + //Each connection allows a maximum of 64 concurrent requests + .setMaxRequestsPerConnection(HostDistance.LOCAL, maxRequestsPerConnection) + //Have at least two connections to each machine in the cluster + .setCoreConnectionsPerHost(HostDistance.LOCAL, coreConnectionsPerHost) + //There are up to eight connections to each machine in the cluster + .setMaxConnectionsPerHost(HostDistance.LOCAL, maxConnectionsPerHost) + .setMaxQueueSize(maxQueueSize) + .setPoolTimeoutMillis(poolTimeoutMillis); + //重试策略 + RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE; + + for (String server : address.split(",")) { + cassandraPort = Integer.parseInt(server.split(":")[1]); + serversList.add(InetAddress.getByName(server.split(":")[0])); + } + + if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) { + cluster = Cluster.builder().addContactPoints(serversList).withRetryPolicy(retryPolicy) + .withPort(cassandraPort) + .withPoolingOptions(poolingOptions).withSocketOptions(so) + .withQueryOptions(queryOptions).build(); + } 
else { + cluster = Cluster.builder().addContactPoints(serversList).withRetryPolicy(retryPolicy) + .withPort(cassandraPort) + .withPoolingOptions(poolingOptions).withSocketOptions(so) + .withCredentials(userName, password) + .withQueryOptions(queryOptions).build(); + } + // 建立连接 连接已存在的键空间 + session = cluster.connectAsync(database); + LOG.info("connect cassandra is successed!"); + } + } catch (Exception e) { + LOG.error("connect cassandra is error:" + e.getMessage()); + } + } + + @Override + public void asyncInvoke(Row input, ResultFuture resultFuture) throws Exception { + + JsonArray inputParams = new JsonArray(); + StringBuffer stringBuffer = new StringBuffer(); + String sqlWhere = " where "; + + for (int i = 0; i < sideInfo.getEqualFieldList().size(); i++) { + Integer conValIndex = sideInfo.getEqualValIndex().get(i); + Object equalObj = input.getField(conValIndex); + if (equalObj == null) { + resultFuture.complete(null); + } + inputParams.add(equalObj); + stringBuffer.append(sideInfo.getEqualFieldList().get(i)) + .append(" = ").append("'" + equalObj + "'") + .append(" and "); + } + + String key = buildCacheKey(inputParams); + sqlWhere = sqlWhere + stringBuffer.toString().substring(0, stringBuffer.lastIndexOf(" and ")); + + if (openCache()) { + CacheObj val = getFromCache(key); + if (val != null) { + + if (ECacheContentType.MissVal == val.getType()) { + dealMissKey(input, resultFuture); + return; + } else if (ECacheContentType.MultiLine == val.getType()) { + + for (Object rowArray : (List) val.getContent()) { + Row row = fillData(input, rowArray); + resultFuture.complete(Collections.singleton(row)); + } + + } else { + throw new RuntimeException("not support cache obj type " + val.getType()); + } + return; + } + } + + //connect Cassandra + connCassandraDB(cassandraSideTableInfo); + + String sqlCondition = sideInfo.getSqlCondition() + " " + sqlWhere; + System.out.println("sqlCondition:" + sqlCondition); + + ListenableFuture resultSet = Futures.transformAsync(session, 
+ new AsyncFunction() { + @Override + public ListenableFuture apply(Session session) throws Exception { + return session.executeAsync(sqlCondition); + } + }); + + ListenableFuture> data = Futures.transform(resultSet, + new Function>() { + @Override + public List apply(ResultSet rs) { + return rs.all(); + } + }); + + Futures.addCallback(data, new FutureCallback>() { + @Override + public void onSuccess(List rows) { + cluster.closeAsync(); + if (rows.size() > 0) { + List cacheContent = Lists.newArrayList(); + for (com.datastax.driver.core.Row line : rows) { + Row row = fillData(input, line); + if (openCache()) { + cacheContent.add(line); + } + resultFuture.complete(Collections.singleton(row)); + } + + if (openCache()) { + putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent)); + } + } else { + dealMissKey(input, resultFuture); + if (openCache()) { + putCache(key, CacheMissVal.getMissKeyObj()); + } + resultFuture.complete(null); + } + } + + @Override + public void onFailure(Throwable t) { + LOG.error("Failed to retrieve the data: %s%n", + t.getMessage()); + System.out.println("Failed to retrieve the data: " + t.getMessage()); + cluster.closeAsync(); + resultFuture.complete(null); + } + }); + } + + @Override + public Row fillData(Row input, Object line) { + com.datastax.driver.core.Row rowArray = (com.datastax.driver.core.Row) line; + Row row = new Row(sideInfo.getOutFieldInfoList().size()); + for (Map.Entry entry : sideInfo.getInFieldIndex().entrySet()) { + Object obj = input.getField(entry.getValue()); + boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass()); + + if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) { + obj = ((Timestamp) obj).getTime(); + } + + row.setField(entry.getKey(), obj); + } + + for (Map.Entry entry : sideInfo.getSideFieldIndex().entrySet()) { + if (rowArray == null) { + row.setField(entry.getKey(), null); + } else { + 
row.setField(entry.getKey(), rowArray.getObject(entry.getValue())); + } + } + + System.out.println("row:" + row.toString()); + return row; + } + + @Override + public void close() throws Exception { + super.close(); + if (cluster != null) { + cluster.close(); + cluster = null; + } + } + + public String buildCacheKey(JsonArray jsonArray) { + StringBuilder sb = new StringBuilder(); + for (Object ele : jsonArray.getList()) { + sb.append(ele.toString()) + .append("_"); + } + + return sb.toString(); + } +} diff --git a/cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncSideInfo.java b/cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncSideInfo.java new file mode 100644 index 000000000..b1d239440 --- /dev/null +++ b/cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncSideInfo.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package com.dtstack.flink.sql.side.cassandra;

import com.dtstack.flink.sql.side.FieldInfo;
import com.dtstack.flink.sql.side.JoinInfo;
import com.dtstack.flink.sql.side.SideInfo;
import com.dtstack.flink.sql.side.SideTableInfo;
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;

import java.util.List;

/**
 * Join metadata holder for the Cassandra async side table: resolves the
 * equal-join fields from the join condition and prepares the side-table
 * select statement template.
 *
 * Date: 2018/11/22
 *
 * @author xuqianjin
 */
public class CassandraAsyncSideInfo extends SideInfo {

    private static final long serialVersionUID = -4403313049809013362L;

    public CassandraAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
        super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
    }

    /**
     * Splits the join condition into individual equality predicates, resolves
     * each one against the side table, and builds the CQL select template.
     *
     * @param joinInfo      join description carrying the condition and table names
     * @param sideTableInfo side-table configuration (must be a {@link CassandraSideTableInfo})
     */
    @Override
    public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
        CassandraSideTableInfo cassandraSideTableInfo = (CassandraSideTableInfo) sideTableInfo;

        String sideTableName = joinInfo.getSideTableName();
        SqlNode conditionNode = joinInfo.getCondition();

        // An AND condition contributes each operand as a separate equality;
        // anything else is treated as a single predicate.
        List<SqlNode> sqlNodeList = Lists.newArrayList();
        if (conditionNode.getKind() == SqlKind.AND) {
            sqlNodeList.addAll(Lists.newArrayList(((SqlBasicCall) conditionNode).getOperands()));
        } else {
            sqlNodeList.add(conditionNode);
        }

        for (SqlNode sqlNode : sqlNodeList) {
            dealOneEqualCon(sqlNode, sideTableName);
        }

        // NOTE(review): the original printed the generated SQL to stdout; the
        // debug print was removed — use a logger if tracing is needed.
        sqlCondition = "select ${selectField} from ${tableName}"
                .replace("${tableName}", cassandraSideTableInfo.getDatabase() + "." + cassandraSideTableInfo.getTableName())
                .replace("${selectField}", sideSelectFields);
    }

    /**
     * Resolves a single equality predicate: whichever operand belongs to the
     * side table becomes a lookup key; the other operand's index in the stream
     * row is recorded for value extraction.
     *
     * @param sqlNode       an EQUALS node of the form tableA.field = tableB.field
     * @param sideTableName name of the side table in the join
     * @throws RuntimeException if the node is not an equality or neither side references the side table
     */
    @Override
    public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
        if (sqlNode.getKind() != SqlKind.EQUALS) {
            throw new RuntimeException("not equal operator.");
        }

        SqlIdentifier left = (SqlIdentifier) ((SqlBasicCall) sqlNode).getOperands()[0];
        SqlIdentifier right = (SqlIdentifier) ((SqlBasicCall) sqlNode).getOperands()[1];

        String leftTableName = left.getComponent(0).getSimple();
        String leftField = left.getComponent(1).getSimple();
        String rightTableName = right.getComponent(0).getSimple();
        String rightField = right.getComponent(1).getSimple();

        if (leftTableName.equalsIgnoreCase(sideTableName)) {
            registerEqualField(leftField, rightField, sqlNode);
        } else if (rightTableName.equalsIgnoreCase(sideTableName)) {
            registerEqualField(rightField, leftField, sqlNode);
        } else {
            throw new RuntimeException("resolve equalFieldList error:" + sqlNode.toString());
        }
    }

    /**
     * Records one side-table lookup field together with the stream-row index
     * of the field it is compared against. Extracted to remove the verbatim
     * duplication the original carried in both branches of dealOneEqualCon.
     */
    private void registerEqualField(String sideField, String streamField, SqlNode sqlNode) {
        int equalFieldIndex = -1;
        for (int i = 0; i < rowTypeInfo.getFieldNames().length; i++) {
            // Last match wins, matching the original loop (no early break).
            if (rowTypeInfo.getFieldNames()[i].equalsIgnoreCase(streamField)) {
                equalFieldIndex = i;
            }
        }
        if (equalFieldIndex == -1) {
            throw new RuntimeException("can't deal equal field: " + sqlNode.toString());
        }
        equalFieldList.add(sideField);
        equalValIndex.add(equalFieldIndex);
    }
}
com.dtstack.flink + sql.core + 1.0-SNAPSHOT + provided + + + jar + + \ No newline at end of file diff --git a/cassandra/cassandra-side/cassandra-side-core/src/main/java/com/dtstack/flink/sql/side/cassandra/table/CassandraSideParser.java b/cassandra/cassandra-side/cassandra-side-core/src/main/java/com/dtstack/flink/sql/side/cassandra/table/CassandraSideParser.java new file mode 100644 index 000000000..6403a225b --- /dev/null +++ b/cassandra/cassandra-side/cassandra-side-core/src/main/java/com/dtstack/flink/sql/side/cassandra/table/CassandraSideParser.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package com.dtstack.flink.sql.side.cassandra.table;

import com.dtstack.flink.sql.table.AbsSideTableParser;
import com.dtstack.flink.sql.table.TableInfo;
import com.dtstack.flink.sql.util.MathUtil;

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.dtstack.flink.sql.table.TableInfo.PARALLELISM_KEY;

/**
 * Parses the DDL of a Cassandra side (dimension) table into a
 * {@link CassandraSideTableInfo}, reading connection and pooling options
 * from the WITH-properties map.
 *
 * Date: 2018/11/22
 *
 * @author xuqianjin
 */
public class CassandraSideParser extends AbsSideTableParser {

    private final static String SIDE_SIGN_KEY = "sideSignKey";

    // Marks a table declared with "PERIOD FOR SYSTEM_TIME" as a side table.
    private final static Pattern SIDE_TABLE_SIGN = Pattern.compile("(?i)^PERIOD\\s+FOR\\s+SYSTEM_TIME$");

    public static final String ADDRESS_KEY = "address";

    public static final String TABLE_NAME_KEY = "tableName";

    public static final String USER_NAME_KEY = "userName";

    public static final String PASSWORD_KEY = "password";

    public static final String DATABASE_KEY = "database";

    public static final String MAX_REQUEST_PER_CONNECTION_KEY = "maxRequestsPerConnection";

    public static final String CORE_CONNECTIONS_PER_HOST_KEY = "coreConnectionsPerHost";

    public static final String MAX_CONNECTIONS_PER_HOST_KEY = "maxConnectionsPerHost";

    public static final String MAX_QUEUE_SIZE_KEY = "maxQueueSize";

    public static final String READ_TIMEOUT_MILLIS_KEY = "readTimeoutMillis";

    public static final String CONNECT_TIMEOUT_MILLIS_KEY = "connectTimeoutMillis";

    public static final String POOL_TIMEOUT_MILLIS_KEY = "poolTimeoutMillis";

    static {
        keyPatternMap.put(SIDE_SIGN_KEY, SIDE_TABLE_SIGN);
        keyHandlerMap.put(SIDE_SIGN_KEY, CassandraSideParser::dealSideSign);
    }

    /**
     * Builds the side-table description from the parsed field list and the
     * WITH-properties. Property keys are looked up lower-cased, matching how
     * the framework normalizes them.
     *
     * @param tableName  logical table name from the DDL
     * @param fieldsInfo raw field-definition string
     * @param props      lower-cased WITH-properties
     * @return the populated {@link CassandraSideTableInfo}
     */
    @Override
    public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
        // Simple name suffices: CassandraSideTableInfo lives in this package
        // (the original used the fully-qualified name redundantly).
        CassandraSideTableInfo cassandraSideTableInfo = new CassandraSideTableInfo();
        cassandraSideTableInfo.setName(tableName);
        parseFieldsInfo(fieldsInfo, cassandraSideTableInfo);
        parseCacheProp(cassandraSideTableInfo, props);

        cassandraSideTableInfo.setParallelism(MathUtil.getIntegerVal(props.get(PARALLELISM_KEY.toLowerCase())));
        cassandraSideTableInfo.setAddress(MathUtil.getString(props.get(ADDRESS_KEY.toLowerCase())));
        cassandraSideTableInfo.setTableName(MathUtil.getString(props.get(TABLE_NAME_KEY.toLowerCase())));
        cassandraSideTableInfo.setDatabase(MathUtil.getString(props.get(DATABASE_KEY.toLowerCase())));
        cassandraSideTableInfo.setUserName(MathUtil.getString(props.get(USER_NAME_KEY.toLowerCase())));
        cassandraSideTableInfo.setPassword(MathUtil.getString(props.get(PASSWORD_KEY.toLowerCase())));
        cassandraSideTableInfo.setMaxRequestsPerConnection(MathUtil.getIntegerVal(props.get(MAX_REQUEST_PER_CONNECTION_KEY.toLowerCase())));
        cassandraSideTableInfo.setCoreConnectionsPerHost(MathUtil.getIntegerVal(props.get(CORE_CONNECTIONS_PER_HOST_KEY.toLowerCase())));
        cassandraSideTableInfo.setMaxConnectionsPerHost(MathUtil.getIntegerVal(props.get(MAX_CONNECTIONS_PER_HOST_KEY.toLowerCase())));
        cassandraSideTableInfo.setMaxQueueSize(MathUtil.getIntegerVal(props.get(MAX_QUEUE_SIZE_KEY.toLowerCase())));
        cassandraSideTableInfo.setReadTimeoutMillis(MathUtil.getIntegerVal(props.get(READ_TIMEOUT_MILLIS_KEY.toLowerCase())));
        cassandraSideTableInfo.setConnectTimeoutMillis(MathUtil.getIntegerVal(props.get(CONNECT_TIMEOUT_MILLIS_KEY.toLowerCase())));
        cassandraSideTableInfo.setPoolTimeoutMillis(MathUtil.getIntegerVal(props.get(POOL_TIMEOUT_MILLIS_KEY.toLowerCase())));

        return cassandraSideTableInfo;
    }

    // The side-table sign carries no extra configuration; presence alone matters.
    private static void dealSideSign(Matcher matcher, TableInfo tableInfo) {
    }
}
000000000..b1b36f7e8 --- /dev/null +++ b/cassandra/cassandra-side/cassandra-side-core/src/main/java/com/dtstack/flink/sql/side/cassandra/table/CassandraSideTableInfo.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package com.dtstack.flink.sql.side.cassandra.table; + +import com.dtstack.flink.sql.side.SideTableInfo; +import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions; + +/** + * Reason: + * Date: 2018/11/22 + * + * @author xuqianjin + */ +public class CassandraSideTableInfo extends SideTableInfo { + + private static final long serialVersionUID = -5556431094535478915L; + + private static final String CURR_TYPE = "cassandra"; + + public static final String ADDRESS_KEY = "address"; + + public static final String TABLE_NAME_KEY = "tableName"; + + public static final String USER_NAME_KEY = "userName"; + + public static final String PASSWORD_KEY = "password"; + + public static final String DATABASE_KEY = "database"; + + public static final String MAX_REQUEST_PER_CONNECTION_KEY = "maxRequestsPerConnection"; + + public static final String CORE_CONNECTIONS_PER_HOST_KEY = "coreConnectionsPerHost"; + + public static final String MAX_CONNECTIONS_PER_HOST_KEY = 
"maxConnectionsPerHost"; + + public static final String MAX_QUEUE_SIZE_KEY = "maxQueueSize"; + + public static final String READ_TIMEOUT_MILLIS_KEY = "readTimeoutMillis"; + + public static final String CONNECT_TIMEOUT_MILLIS_KEY = "connectTimeoutMillis"; + + public static final String POOL_TIMEOUT_MILLIS_KEY = "poolTimeoutMillis"; + + private String address; + private String tableName; + private String userName; + private String password; + private String database; + private Integer maxRequestsPerConnection; + private Integer coreConnectionsPerHost; + private Integer maxConnectionsPerHost; + private Integer maxQueueSize; + private Integer readTimeoutMillis; + private Integer connectTimeoutMillis; + private Integer poolTimeoutMillis; + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + public String getDatabase() { + return database; + } + + public void setDatabase(String database) { + this.database = database; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public String getUserName() { + return userName; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public Integer getMaxRequestsPerConnection() { + return maxRequestsPerConnection; + } + + public void setMaxRequestsPerConnection(Integer maxRequestsPerConnection) { + this.maxRequestsPerConnection = maxRequestsPerConnection; + } + + public Integer getCoreConnectionsPerHost() { + return coreConnectionsPerHost; + } + + public void setCoreConnectionsPerHost(Integer coreConnectionsPerHost) { + this.coreConnectionsPerHost = coreConnectionsPerHost; + } + + public Integer getMaxConnectionsPerHost() { + return maxConnectionsPerHost; + } + + public void 
setMaxConnectionsPerHost(Integer maxConnectionsPerHost) { + this.maxConnectionsPerHost = maxConnectionsPerHost; + } + + public Integer getMaxQueueSize() { + return maxQueueSize; + } + + public void setMaxQueueSize(Integer maxQueueSize) { + this.maxQueueSize = maxQueueSize; + } + + public Integer getReadTimeoutMillis() { + return readTimeoutMillis; + } + + public void setReadTimeoutMillis(Integer readTimeoutMillis) { + this.readTimeoutMillis = readTimeoutMillis; + } + + public Integer getConnectTimeoutMillis() { + return connectTimeoutMillis; + } + + public void setConnectTimeoutMillis(Integer connectTimeoutMillis) { + this.connectTimeoutMillis = connectTimeoutMillis; + } + + public Integer getPoolTimeoutMillis() { + return poolTimeoutMillis; + } + + public void setPoolTimeoutMillis(Integer poolTimeoutMillis) { + this.poolTimeoutMillis = poolTimeoutMillis; + } + + public CassandraSideTableInfo() { + setType(CURR_TYPE); + } + + @Override + public boolean check() { + Preconditions.checkNotNull(address, "Cassandra field of ADDRESS is required"); + Preconditions.checkNotNull(database, "Cassandra field of database is required"); + Preconditions.checkNotNull(tableName, "Cassandra field of tableName is required"); + return true; + } +} diff --git a/cassandra/cassandra-side/pom.xml b/cassandra/cassandra-side/pom.xml new file mode 100644 index 000000000..92d058900 --- /dev/null +++ b/cassandra/cassandra-side/pom.xml @@ -0,0 +1,23 @@ + + + + sql.cassandra + com.dtstack.flink + 1.0-SNAPSHOT + ../pom.xml + + 4.0.0 + + sql.side.cassandra + cassandra-side + + cassandra-side-core + cassandra-async-side + cassandra-all-side + + + pom + + \ No newline at end of file diff --git a/cassandra/cassandra-sink/pom.xml b/cassandra/cassandra-sink/pom.xml new file mode 100644 index 000000000..4fb20c373 --- /dev/null +++ b/cassandra/cassandra-sink/pom.xml @@ -0,0 +1,82 @@ + + + sql.cassandra + com.dtstack.flink + 1.0-SNAPSHOT + ../pom.xml + + 4.0.0 + + sql.sink.cassandra + jar + + 
cassandra-sink + http://maven.apache.org + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 1.4 + + + package + + shade + + + + + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + maven-antrun-plugin + 1.2 + + + copy-resources + + package + + run + + + + + + + + + + + + + + + + + + diff --git a/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java b/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java new file mode 100644 index 000000000..11fe24b81 --- /dev/null +++ b/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
package com.dtstack.flink.sql.sink.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import com.dtstack.flink.sql.metric.MetricConstant;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.ArrayList;

/**
 * OutputFormat that writes retract-stream tuples into a Cassandra table.
 * Configure through the supplied {@link CassandraFormatBuilder}.
 *
 * @see Tuple
 * @see DriverManager
 */
public class CassandraOutputFormat extends RichOutputFormat<Tuple2> {
    private static final long serialVersionUID = -7994311331389155692L;

    private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormat.class);

    private String address;
    private String tableName;
    private String userName;
    private String password;
    private String database;
    private Integer maxRequestsPerConnection;
    private Integer coreConnectionsPerHost;
    private Integer maxConnectionsPerHost;
    private Integer maxQueueSize;
    private Integer readTimeoutMillis;
    private Integer connectTimeoutMillis;
    private Integer poolTimeoutMillis;

    protected String[] fieldNames;
    TypeInformation[] fieldTypes;

    private int batchInterval = 5000;

    private Cluster cluster;
    private Session session = null;

    private int batchCount = 0;

    private transient Counter outRecords;

    private transient Meter outRecordsRate;

    public CassandraOutputFormat() {
    }

    @Override
    public void configure(Configuration parameters) {
    }

    /**
     * Connects to the Cassandra cluster and opens a session on the configured
     * keyspace.
     *
     * @param taskNumber The number of the parallel instance.
     * @throws IOException if the cluster connection cannot be established.
     *                     (The original swallowed the failure, leaving
     *                     {@code session} null and causing an NPE on the
     *                     first write — now we fail fast.)
     */
    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        try {
            if (session == null) {
                QueryOptions queryOptions = new QueryOptions();
                //The default consistency level for queries: ConsistencyLevel.TWO.
                queryOptions.setConsistencyLevel(ConsistencyLevel.QUORUM);

                // Apply defaults for any unset pooling/timeout option.
                Integer maxRequestsPerConnection = this.maxRequestsPerConnection == null ? 1 : this.maxRequestsPerConnection;
                Integer coreConnectionsPerHost = this.coreConnectionsPerHost == null ? 8 : this.coreConnectionsPerHost;
                Integer maxConnectionsPerHost = this.maxConnectionsPerHost == null ? 32768 : this.maxConnectionsPerHost;
                Integer maxQueueSize = this.maxQueueSize == null ? 100000 : this.maxQueueSize;
                Integer readTimeoutMillis = this.readTimeoutMillis == null ? 60000 : this.readTimeoutMillis;
                Integer connectTimeoutMillis = this.connectTimeoutMillis == null ? 60000 : this.connectTimeoutMillis;
                Integer poolTimeoutMillis = this.poolTimeoutMillis == null ? 60000 : this.poolTimeoutMillis;

                //Read timeout or connection timeout Settings
                SocketOptions so = new SocketOptions()
                        .setReadTimeoutMillis(readTimeoutMillis)
                        .setConnectTimeoutMillis(connectTimeoutMillis);

                //The cluster USES hostdistance.local in the same machine room
                //Hostdistance.REMOTE is used for different machine rooms
                //Ignore use HostDistance.IGNORED
                PoolingOptions poolingOptions = new PoolingOptions()
                        .setMaxRequestsPerConnection(HostDistance.LOCAL, maxRequestsPerConnection)
                        .setCoreConnectionsPerHost(HostDistance.LOCAL, coreConnectionsPerHost)
                        .setMaxConnectionsPerHost(HostDistance.LOCAL, maxConnectionsPerHost)
                        .setMaxQueueSize(maxQueueSize)
                        .setPoolTimeoutMillis(poolTimeoutMillis);

                // Retry policy: downgrade consistency on failure rather than error out.
                RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;

                // Parse "host:port,host:port" — NOTE(review): only the last
                // port wins, as in the original; all hosts are assumed to
                // share one port.
                Integer cassandraPort = 0;
                ArrayList<InetAddress> serversList = new ArrayList<InetAddress>();
                for (String server : address.split(",")) {
                    String[] hostAndPort = server.split(":");
                    cassandraPort = Integer.parseInt(hostAndPort[1]);
                    serversList.add(InetAddress.getByName(hostAndPort[0]));
                }

                Cluster.Builder clusterBuilder = Cluster.builder()
                        .addContactPoints(serversList)
                        .withRetryPolicy(retryPolicy)
                        .withPort(cassandraPort)
                        .withPoolingOptions(poolingOptions)
                        .withSocketOptions(so)
                        .withQueryOptions(queryOptions);
                // Credentials are optional; only attach them when both are supplied.
                if (userName != null && !userName.isEmpty() && password != null && !password.isEmpty()) {
                    clusterBuilder.withCredentials(userName, password);
                }
                cluster = clusterBuilder.build();

                // Connect to the existing keyspace.
                session = cluster.connect(database);
                LOG.info("connect cassandra is successed!");
                initMetric();
            }
        } catch (Exception e) {
            // Fail fast with the full cause instead of logging only e.getMessage().
            throw new IOException("connect cassandra is error", e);
        }
    }

    private void initMetric() {
        outRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
        outRecordsRate = getRuntimeContext().getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_OUT_RATE, new MeterView(outRecords, 20));
    }

    /**
     * Writes one retract-stream record. Only accumulate messages
     * (retract flag == true) are inserted; retract messages are ignored.
     *
     * @param tuple2 (retract flag, row) pair to write.
     * @throws IOException never directly; failures surface as IllegalArgumentException.
     * @see PreparedStatement
     */
    @Override
    public void writeRecord(Tuple2 tuple2) throws IOException {
        Tuple2<Boolean, Row> tupleTrans = tuple2;
        Boolean retract = tupleTrans.getField(0);
        Row row = tupleTrans.getField(1);
        try {
            // Boolean.TRUE.equals guards against a null flag (the original
            // unboxed and would NPE).
            if (Boolean.TRUE.equals(retract)) {
                insertWrite(row);
                outRecords.inc();
            } else {
                //do nothing
            }
        } catch (Exception e) {
            throw new IllegalArgumentException("writeRecord() failed", e);
        }
    }

    /** Builds and executes the INSERT for one row; propagates failures to writeRecord. */
    private void insertWrite(Row row) {
        String cql = buildSql(row);
        if (cql != null) {
            // Propagate execution failures instead of swallowing them — the
            // original logged e.getMessage() and silently dropped the row.
            ResultSet resultSet = session.execute(cql);
            resultSet.wasApplied();
        }
    }

    /**
     * Renders an INSERT statement from the non-null fields of the row.
     *
     * @return the CQL statement, or null when every field is null (the
     *         original threw StringIndexOutOfBoundsException in that case).
     */
    private String buildSql(Row row) {
        StringBuilder fields = new StringBuilder();
        StringBuilder values = new StringBuilder();
        for (int index = 0; index < row.getArity(); index++) {
            Object value = row.getField(index);
            if (value != null) {
                fields.append(fieldNames[index]).append(",");
                // SECURITY(review): values are concatenated with simple
                // quoting — a value containing a quote breaks the statement.
                // Consider the driver's prepared statements / QueryBuilder.
                values.append("'").append(value).append("',");
            }
        }
        if (fields.length() == 0) {
            return null;
        }
        fields.deleteCharAt(fields.length() - 1);
        values.deleteCharAt(values.length() - 1);
        return "INSERT INTO " + database + "." + tableName + " (" + fields.toString() + ") "
                + " VALUES (" + values.toString() + ")";
    }

    /**
     * Closes the session and cluster; each is closed independently so a
     * failure in one does not leak the other.
     *
     * @throws IOException never; close failures are logged.
     */
    @Override
    public void close() throws IOException {
        try {
            if (session != null) {
                session.close();
            }
        } catch (Exception e) {
            LOG.error("Error while closing session.", e);
        }
        try {
            if (cluster != null) {
                cluster.close();
            }
        } catch (Exception e) {
            LOG.error("Error while closing cluster.", e);
        }
        LOG.info("close cassandra is successed!");
    }

    public static CassandraFormatBuilder buildOutputFormat() {
        return new CassandraFormatBuilder();
    }

    /** Fluent builder for {@link CassandraOutputFormat}; call {@link #finish()} to validate. */
    public static class CassandraFormatBuilder {
        private final CassandraOutputFormat format;

        protected CassandraFormatBuilder() {
            this.format = new CassandraOutputFormat();
        }

        public CassandraFormatBuilder setUsername(String username) {
            format.userName = username;
            return this;
        }

        public CassandraFormatBuilder setPassword(String password) {
            format.password = password;
            return this;
        }

        public CassandraFormatBuilder setAddress(String address) {
            format.address = address;
            return this;
        }

        public CassandraFormatBuilder setTableName(String tableName) {
            format.tableName = tableName;
            return this;
        }

        public CassandraFormatBuilder setDatabase(String database) {
            format.database = database;
            return this;
        }

        public CassandraFormatBuilder setFieldNames(String[] fieldNames) {
            format.fieldNames = fieldNames;
            return this;
        }

        public CassandraFormatBuilder setFieldTypes(TypeInformation[] fieldTypes) {
            format.fieldTypes = fieldTypes;
            return this;
        }

        public CassandraFormatBuilder setMaxRequestsPerConnection(Integer maxRequestsPerConnection) {
            format.maxRequestsPerConnection = maxRequestsPerConnection;
            return this;
        }

        public CassandraFormatBuilder setCoreConnectionsPerHost(Integer coreConnectionsPerHost) {
            format.coreConnectionsPerHost = coreConnectionsPerHost;
            return this;
        }

        public CassandraFormatBuilder setMaxConnectionsPerHost(Integer maxConnectionsPerHost) {
            format.maxConnectionsPerHost = maxConnectionsPerHost;
            return this;
        }

        public CassandraFormatBuilder setMaxQueueSize(Integer maxQueueSize) {
            format.maxQueueSize = maxQueueSize;
            return this;
        }

        public CassandraFormatBuilder setReadTimeoutMillis(Integer readTimeoutMillis) {
            format.readTimeoutMillis = readTimeoutMillis;
            return this;
        }

        public CassandraFormatBuilder setConnectTimeoutMillis(Integer connectTimeoutMillis) {
            format.connectTimeoutMillis = connectTimeoutMillis;
            return this;
        }

        public CassandraFormatBuilder setPoolTimeoutMillis(Integer poolTimeoutMillis) {
            format.poolTimeoutMillis = poolTimeoutMillis;
            return this;
        }

        /**
         * Finalizes the configuration and checks validity.
         *
         * @return the configured CassandraOutputFormat
         * @throws IllegalArgumentException if address, database or tableName is missing
         */
        public CassandraOutputFormat finish() {
            if (format.userName == null) {
                LOG.info("Username was not supplied separately.");
            }
            if (format.password == null) {
                LOG.info("Password was not supplied separately.");
            }
            if (format.address == null) {
                throw new IllegalArgumentException("No address URL supplied.");
            }
            if (format.database == null) {
                // Fixed typo'd message ("No dababase suplied").
                throw new IllegalArgumentException("No database supplied.");
            }
            if (format.tableName == null) {
                throw new IllegalArgumentException("No tableName supplied");
            }
            return format;
        }
    }
}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package com.dtstack.flink.sql.sink.cassandra; + + +import com.dtstack.flink.sql.sink.IStreamSinkGener; +import com.dtstack.flink.sql.sink.cassandra.table.CassandraTableInfo; +import com.dtstack.flink.sql.table.TargetTableInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.table.sinks.RetractStreamTableSink; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.types.Row; + +/** + * Reason: + * Date: 2018/11/22 + * + * @author xuqianjin + */ +public class CassandraSink implements RetractStreamTableSink, IStreamSinkGener { + + + protected String[] fieldNames; + TypeInformation[] fieldTypes; + protected String address; + protected String tableName; + protected String userName; + protected String password; + protected String database; + protected Integer maxRequestsPerConnection; + protected Integer coreConnectionsPerHost; + protected Integer maxConnectionsPerHost; + protected Integer maxQueueSize; + protected Integer readTimeoutMillis; + protected 
Integer connectTimeoutMillis; + protected Integer poolTimeoutMillis; + + public CassandraSink() { + // TO DO NOTHING + } + + @Override + public CassandraSink genStreamSink(TargetTableInfo targetTableInfo) { + CassandraTableInfo cassandraTableInfo = (CassandraTableInfo) targetTableInfo; + this.address = cassandraTableInfo.getAddress(); + this.tableName = cassandraTableInfo.getTableName(); + this.userName = cassandraTableInfo.getUserName(); + this.password = cassandraTableInfo.getPassword(); + this.database = cassandraTableInfo.getDatabase(); + this.maxRequestsPerConnection = cassandraTableInfo.getMaxRequestsPerConnection(); + this.coreConnectionsPerHost = cassandraTableInfo.getCoreConnectionsPerHost(); + this.maxConnectionsPerHost = cassandraTableInfo.getMaxConnectionsPerHost(); + this.maxQueueSize = cassandraTableInfo.getMaxQueueSize(); + this.readTimeoutMillis = cassandraTableInfo.getReadTimeoutMillis(); + this.connectTimeoutMillis = cassandraTableInfo.getConnectTimeoutMillis(); + this.poolTimeoutMillis = cassandraTableInfo.getPoolTimeoutMillis(); + return this; + } + + @Override + public void emitDataStream(DataStream> dataStream) { + CassandraOutputFormat.CassandraFormatBuilder builder = CassandraOutputFormat.buildOutputFormat(); + builder.setAddress(this.address) + .setDatabase(this.database) + .setTableName(this.tableName) + .setPassword(this.password) + .setUsername(this.userName) + .setMaxRequestsPerConnection(this.maxRequestsPerConnection) + .setCoreConnectionsPerHost(this.coreConnectionsPerHost) + .setMaxConnectionsPerHost(this.maxConnectionsPerHost) + .setMaxQueueSize(this.maxQueueSize) + .setReadTimeoutMillis(this.readTimeoutMillis) + .setConnectTimeoutMillis(this.connectTimeoutMillis) + .setPoolTimeoutMillis(this.poolTimeoutMillis) + .setFieldNames(this.fieldNames) + .setFieldTypes(this.fieldTypes); + + CassandraOutputFormat outputFormat = builder.finish(); + RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat); + 
dataStream.addSink(richSinkFunction); + } + + @Override + public TableSink> configure(String[] fieldNames, TypeInformation[] fieldTypes) { + this.fieldNames = fieldNames; + this.fieldTypes = fieldTypes; + return this; + } + + @Override + public TupleTypeInfo> getOutputType() { + return new TupleTypeInfo(org.apache.flink.table.api.Types.BOOLEAN(), getRecordType()); + } + + @Override + public TypeInformation getRecordType() { + return new RowTypeInfo(fieldTypes, fieldNames); + } + + @Override + public String[] getFieldNames() { + return fieldNames; + } + + @Override + public TypeInformation[] getFieldTypes() { + return fieldTypes; + } + +} diff --git a/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/table/CassandraSinkParser.java b/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/table/CassandraSinkParser.java new file mode 100644 index 000000000..4c68e71ae --- /dev/null +++ b/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/table/CassandraSinkParser.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package com.dtstack.flink.sql.sink.cassandra.table; + +import com.dtstack.flink.sql.table.AbsTableParser; +import com.dtstack.flink.sql.table.TableInfo; +import com.dtstack.flink.sql.util.MathUtil; + +import java.util.Map; + +import static com.dtstack.flink.sql.table.TableInfo.PARALLELISM_KEY; + +/** + * Reason: + * Date: 2018/11/22 + * + * @author xuqianjin + */ +public class CassandraSinkParser extends AbsTableParser { + + public static final String ADDRESS_KEY = "address"; + + public static final String TABLE_NAME_KEY = "tableName"; + + public static final String USER_NAME_KEY = "userName"; + + public static final String PASSWORD_KEY = "password"; + + public static final String DATABASE_KEY = "database"; + + public static final String MAX_REQUEST_PER_CONNECTION_KEY = "maxRequestsPerConnection"; + + public static final String CORE_CONNECTIONS_PER_HOST_KEY = "coreConnectionsPerHost"; + + public static final String MAX_CONNECTIONS_PER_HOST_KEY = "maxConnectionsPerHost"; + + public static final String MAX_QUEUE_SIZE_KEY = "maxQueueSize"; + + public static final String READ_TIMEOUT_MILLIS_KEY = "readTimeoutMillis"; + + public static final String CONNECT_TIMEOUT_MILLIS_KEY = "connectTimeoutMillis"; + + public static final String POOL_TIMEOUT_MILLIS_KEY = "poolTimeoutMillis"; + + @Override + public TableInfo getTableInfo(String tableName, String fieldsInfo, Map props) { + CassandraTableInfo cassandraTableInfo = new CassandraTableInfo(); + cassandraTableInfo.setName(tableName); + parseFieldsInfo(fieldsInfo, cassandraTableInfo); + + cassandraTableInfo.setParallelism(MathUtil.getIntegerVal(props.get(PARALLELISM_KEY.toLowerCase()))); + cassandraTableInfo.setAddress(MathUtil.getString(props.get(ADDRESS_KEY.toLowerCase()))); + cassandraTableInfo.setTableName(MathUtil.getString(props.get(TABLE_NAME_KEY.toLowerCase()))); + cassandraTableInfo.setDatabase(MathUtil.getString(props.get(DATABASE_KEY.toLowerCase()))); + 
cassandraTableInfo.setUserName(MathUtil.getString(props.get(USER_NAME_KEY.toLowerCase()))); + cassandraTableInfo.setPassword(MathUtil.getString(props.get(PASSWORD_KEY.toLowerCase()))); + cassandraTableInfo.setMaxRequestsPerConnection(MathUtil.getIntegerVal(props.get(MAX_REQUEST_PER_CONNECTION_KEY.toLowerCase()))); + cassandraTableInfo.setCoreConnectionsPerHost(MathUtil.getIntegerVal(props.get(CORE_CONNECTIONS_PER_HOST_KEY.toLowerCase()))); + cassandraTableInfo.setMaxConnectionsPerHost(MathUtil.getIntegerVal(props.get(MAX_CONNECTIONS_PER_HOST_KEY.toLowerCase()))); + cassandraTableInfo.setMaxQueueSize(MathUtil.getIntegerVal(props.get(MAX_QUEUE_SIZE_KEY.toLowerCase()))); + cassandraTableInfo.setReadTimeoutMillis(MathUtil.getIntegerVal(props.get(READ_TIMEOUT_MILLIS_KEY.toLowerCase()))); + cassandraTableInfo.setConnectTimeoutMillis(MathUtil.getIntegerVal(props.get(CONNECT_TIMEOUT_MILLIS_KEY.toLowerCase()))); + cassandraTableInfo.setPoolTimeoutMillis(MathUtil.getIntegerVal(props.get(POOL_TIMEOUT_MILLIS_KEY.toLowerCase()))); + + return cassandraTableInfo; + } +} diff --git a/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/table/CassandraTableInfo.java b/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/table/CassandraTableInfo.java new file mode 100644 index 000000000..7d52b23bb --- /dev/null +++ b/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/table/CassandraTableInfo.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package com.dtstack.flink.sql.sink.cassandra.table; + +import com.dtstack.flink.sql.table.TargetTableInfo; +import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions; + +/** + * Reason: + * Date: 2018/11/22 + * + * @author xuqianjin + */ +public class CassandraTableInfo extends TargetTableInfo { + + private static final String CURR_TYPE = "cassandra"; + + private String address; + private String tableName; + private String userName; + private String password; + private String database; + private Integer maxRequestsPerConnection; + private Integer coreConnectionsPerHost; + private Integer maxConnectionsPerHost; + private Integer maxQueueSize; + private Integer readTimeoutMillis; + private Integer connectTimeoutMillis; + private Integer poolTimeoutMillis; + + public CassandraTableInfo() { + setType(CURR_TYPE); + } + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + public String getDatabase() { + return database; + } + + public void setDatabase(String database) { + this.database = database; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public String getUserName() { + return userName; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public Integer 
getMaxRequestsPerConnection() { + return maxRequestsPerConnection; + } + + public void setMaxRequestsPerConnection(Integer maxRequestsPerConnection) { + this.maxRequestsPerConnection = maxRequestsPerConnection; + } + + public Integer getCoreConnectionsPerHost() { + return coreConnectionsPerHost; + } + + public void setCoreConnectionsPerHost(Integer coreConnectionsPerHost) { + this.coreConnectionsPerHost = coreConnectionsPerHost; + } + + public Integer getMaxConnectionsPerHost() { + return maxConnectionsPerHost; + } + + public void setMaxConnectionsPerHost(Integer maxConnectionsPerHost) { + this.maxConnectionsPerHost = maxConnectionsPerHost; + } + + public Integer getMaxQueueSize() { + return maxQueueSize; + } + + public void setMaxQueueSize(Integer maxQueueSize) { + this.maxQueueSize = maxQueueSize; + } + + public Integer getReadTimeoutMillis() { + return readTimeoutMillis; + } + + public void setReadTimeoutMillis(Integer readTimeoutMillis) { + this.readTimeoutMillis = readTimeoutMillis; + } + + public Integer getConnectTimeoutMillis() { + return connectTimeoutMillis; + } + + public void setConnectTimeoutMillis(Integer connectTimeoutMillis) { + this.connectTimeoutMillis = connectTimeoutMillis; + } + + public Integer getPoolTimeoutMillis() { + return poolTimeoutMillis; + } + + public void setPoolTimeoutMillis(Integer poolTimeoutMillis) { + this.poolTimeoutMillis = poolTimeoutMillis; + } + + @Override + public boolean check() { + Preconditions.checkNotNull(address, "Cassandra field of ADDRESS is required"); + Preconditions.checkNotNull(database, "Cassandra field of database is required"); + Preconditions.checkNotNull(tableName, "Cassandra field of tableName is required"); + return true; + } + + @Override + public String getType() { + // return super.getType().toLowerCase() + TARGET_SUFFIX; + return super.getType().toLowerCase(); + } +} diff --git a/cassandra/cassandra-sink/src/test/java/com/dtstack/flinkx/AppTest.java 
b/cassandra/cassandra-sink/src/test/java/com/dtstack/flinkx/AppTest.java new file mode 100644 index 000000000..33a0233ac --- /dev/null +++ b/cassandra/cassandra-sink/src/test/java/com/dtstack/flinkx/AppTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +package com.dtstack.flinkx; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * Unit test for simple App. 
+ */ +public class AppTest + extends TestCase +{ + /** + * Create the test case + * + * @param testName name of the test case + */ + public AppTest( String testName ) + { + super( testName ); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() + { + return new TestSuite( AppTest.class ); + } + + /** + * Rigourous Test :-) + */ + public void testApp() + { + assertTrue( true ); + } +} diff --git a/cassandra/pom.xml b/cassandra/pom.xml new file mode 100644 index 000000000..f49de388b --- /dev/null +++ b/cassandra/pom.xml @@ -0,0 +1,39 @@ + + + + flink.sql + com.dtstack.flink + 1.0-SNAPSHOT + + 4.0.0 + sql.cassandra + pom + + + cassandra-sink + cassandra-side + + + + + junit + junit + 3.8.1 + test + + + com.dtstack.flink + sql.core + 1.0-SNAPSHOT + provided + + + com.datastax.cassandra + cassandra-driver-core + 3.6.0 + + + + \ No newline at end of file diff --git a/docs/cassandraSide.md b/docs/cassandraSide.md new file mode 100644 index 000000000..131560047 --- /dev/null +++ b/docs/cassandraSide.md @@ -0,0 +1,85 @@ + +## 1.格式: +``` + CREATE TABLE tableName( + colName cloType, + ... 
+ PRIMARY KEY(keyInfo),
+ PERIOD FOR SYSTEM_TIME
+ )WITH(
+ type ='cassandra',
+ address ='ip:port[,ip:port]',
+ userName='dbUserName',
+ password='dbPwd',
+ tableName='tableName',
+ database='database',
+ cache ='LRU',
+ cacheSize ='10000',
+ cacheTTLMs ='60000',
+ parallelism ='1',
+ partitionedJoin='false'
+ );
+```
+
+## 2.支持版本
+ cassandra-3.6.x
+
+## 3.表结构定义
+
+ |参数名称|含义|
+ |----|---|
+ | tableName | 注册到flink的表名称(可选填;不填默认和cassandra对应的表名称相同)|
+ | colName | 列名称|
+ | colType | 列类型 [colType支持的类型](colType.md)|
+ | PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息|
+ | PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开|
+
+## 4.参数
+
+ |参数名称|含义|是否必填|默认值|
+ |----|---|---|----|
+ | type |表明 输出表类型 cassandra|是||
+ | address | 连接cassandra数据库 jdbcUrl |是||
+ | userName | cassandra连接用户名|否||
+ | password | cassandra连接密码|否||
+ | tableName | cassandra表名称|是||
+ | database | cassandra数据库名称|是||
+ | cache | 维表缓存策略(NONE/LRU)|否|NONE|
+ | partitionedJoin | 是否在维表join之前先根据 设定的key 做一次keyby操作(可以减少维表的数据缓存量)|否|false|
+ | maxRequestsPerConnection | 每个连接允许的最大并发请求数|否|NONE|
+ | coreConnectionsPerHost | 与Cassandra集群每台机器保持的最小连接数|否|NONE|
+ | maxConnectionsPerHost | 与Cassandra集群每台机器保持的最大连接数|否|NONE|
+ | maxQueueSize | Cassandra队列大小|否|NONE|
+ | readTimeoutMillis | Cassandra读超时|否|NONE|
+ | connectTimeoutMillis | Cassandra连接超时|否|NONE|
+ | poolTimeoutMillis | Cassandra线程池超时|否|NONE|
+
+ ----------
+ > 缓存策略
+ * NONE: 不做内存缓存
+ * LRU:
+ * cacheSize: 缓存的条目数量
+ * cacheTTLMs:缓存的过期时间(ms)
+
+
+## 5.样例
+```
+create table sideTable(
+ CHANNEL varchar,
+ XCCOUNT int,
+ PRIMARY KEY(channel),
+ PERIOD FOR SYSTEM_TIME
+ )WITH(
+ type ='cassandra',
+ address ='172.21.32.1:9042,172.21.32.1:9042',
+ database ='test',
+ tableName ='sidetest',
+ cache ='LRU',
+ parallelism ='1',
+ partitionedJoin='false'
+ );
+
+
+```
+
+
diff --git a/docs/cassandraSink.md b/docs/cassandraSink.md new file mode 100644 index 000000000..8ea38e104 --- /dev/null +++ b/docs/cassandraSink.md @@ -0,0 +1,63 @@
+## 1.格式:
+```
+CREATE TABLE tableName(
+ colName colType, 
+ ...
+ colNameX colType
+ )WITH(
+ type ='cassandra',
+ address ='ip:port[,ip:port]',
+ userName ='userName',
+ password ='pwd',
+ database ='databaseName',
+ tableName ='tableName',
+ parallelism ='parllNum'
+ );
+
+```
+
+## 2.支持版本
+ cassandra-3.6.x
+
+## 3.表结构定义
+
+|参数名称|含义|
+|----|---|
+| tableName| 在 sql 中使用的名称;即注册到flink-table-env上的名称|
+| colName | 列名称|
+| colType | 列类型 [colType支持的类型](colType.md)|
+
+## 4.参数:
+
+|参数名称|含义|是否必填|默认值|
+|----|----|----|----|
+|type |表明 输出表类型 cassandra|是||
+|address | 连接cassandra数据库 jdbcUrl |是||
+|userName | cassandra连接用户名|否||
+|password | cassandra连接密码|否||
+|tableName | cassandra表名称|是||
+|database | cassandra数据库名称|是||
+|parallelism | 并行度设置|否|1|
+|maxRequestsPerConnection | 每个连接允许的最大并发请求数|否|NONE|
+|coreConnectionsPerHost | 与Cassandra集群每台机器保持的最小连接数|否|NONE|
+|maxConnectionsPerHost | 与Cassandra集群每台机器保持的最大连接数|否|NONE|
+|maxQueueSize | Cassandra队列大小|否|NONE|
+|readTimeoutMillis | Cassandra读超时|否|NONE|
+|connectTimeoutMillis | Cassandra连接超时|否|NONE|
+|poolTimeoutMillis | Cassandra线程池超时|否|NONE|
+
+## 5.样例:
+```
+CREATE TABLE MyResult(
+ channel VARCHAR,
+ pv VARCHAR
+ )WITH(
+ type ='cassandra',
+ address ='172.21.32.1:9042,172.21.32.1:9042',
+ userName ='dtstack',
+ password ='abc123',
+ database ='test',
+ tableName ='pv',
+ parallelism ='1'
+ )
+ ```
\ No newline at end of file
diff --git a/mongo/mongo-side/mongo-all-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAllReqRow.java b/mongo/mongo-side/mongo-all-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAllReqRow.java index c1c812cd0..d1a38ca5f 100644 --- a/mongo/mongo-side/mongo-all-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAllReqRow.java +++ b/mongo/mongo-side/mongo-all-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAllReqRow.java @@ -152,7 +152,6 @@ public void flatMap(Row value, Collector out) throws Exception { } for (Map one : cacheList) { - System.out.println(fillData(value, one)); out.collect(fillData(value, one)); } } diff --git 
a/pom.xml b/pom.xml index a31cc7392..7ef6c1fdd 100644 --- a/pom.xml +++ b/pom.xml @@ -5,6 +5,9 @@ com.dtstack.flink flink.sql 1.0-SNAPSHOT + pom + flink.sql + http://maven.apache.org core kafka09 @@ -19,11 +22,9 @@ rdb sqlserver oracle + cassandra - pom - flink.sql - http://maven.apache.org UTF-8