From 626433aaecd3e260563097d40e9cc0c0ca2aa7c9 Mon Sep 17 00:00:00 2001 From: doleyzi Date: Tue, 7 May 2024 16:45:12 +0800 Subject: [PATCH 1/8] Clean up Elasticsearch and ClickHouse related code of audit-store --- inlong-audit/audit-docker/Dockerfile | 19 +- inlong-audit/audit-docker/audit-docker.sh | 45 ++- .../inlong/audit/cache/RealTimeQuery.java | 10 +- .../inlong/audit/config/ClickHouseConfig.java | 50 --- .../inlong/audit/config/DataServerConfig.java | 47 --- .../audit/config/ElasticsearchConfig.java | 125 -------- .../inlong/audit/config/StoreConfig.java | 14 +- .../apache/inlong/audit/db/DruidConfig.java | 80 ----- .../audit/db/DruidDataSourceProperties.java | 47 --- .../inlong/audit/db/dao/AuditDataDao.java | 28 -- .../inlong/audit/db/entities/AuditDataPo.java | 46 --- .../audit/db/entities/ClickHouseDataPo.java | 45 --- .../inlong/audit/db/entities/ESDataPo.java | 49 --- .../audit/service/AuditMsgConsumerServer.java | 27 -- .../audit/service/ClickHouseService.java | 217 ------------- .../audit/service/ElasticsearchService.java | 288 ------------------ .../inlong/audit/service/MySqlService.java | 65 ---- .../main/resources/mapper/AuditDataDao.xml | 51 ---- .../service/ElasticsearchServiceTest.java | 96 ------ .../service/consume/KafkaConsumeTest.java | 15 +- .../service/consume/TubeConsumeTest.java | 17 +- .../test/resources/mapper/AuditDataDao.xml | 47 --- inlong-audit/conf/mapper/AuditDataDao.xml | 47 --- .../sql/apache_inlong_audit_clickhouse.sql | 47 --- 24 files changed, 40 insertions(+), 1482 deletions(-) delete mode 100644 inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/ClickHouseConfig.java delete mode 100644 inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/DataServerConfig.java delete mode 100644 inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/ElasticsearchConfig.java delete mode 100644 inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/DruidConfig.java delete mode 100644 inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/DruidDataSourceProperties.java delete mode 100644 inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/dao/AuditDataDao.java delete mode 100644 inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/AuditDataPo.java delete mode 100644 inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java delete mode 100644 inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ESDataPo.java delete mode 100644 inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java delete mode 100644 inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java delete mode 100644 inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java delete mode 100644 inlong-audit/audit-store/src/main/resources/mapper/AuditDataDao.xml delete mode 100644 inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/ElasticsearchServiceTest.java delete mode 100644 inlong-audit/audit-store/src/test/resources/mapper/AuditDataDao.xml delete mode 100644 inlong-audit/conf/mapper/AuditDataDao.xml delete mode 100644 inlong-audit/sql/apache_inlong_audit_clickhouse.sql diff --git a/inlong-audit/audit-docker/Dockerfile b/inlong-audit/audit-docker/Dockerfile index 6717eca684e..f6c26954724 100644 --- a/inlong-audit/audit-docker/Dockerfile +++ b/inlong-audit/audit-docker/Dockerfile @@ -35,23 +35,12 @@ ENV TUBE_AUDIT_TOPIC="inlong-audit" ENV AUDIT_DBNAME="apache_inlong_audit" # proxy/store/all, start audit module individually, or all ENV START_MODE="all" -# mysql / clickhouse / elasticsearch / starrocks +# mysql / starrocks ENV STORE_MODE=mysql # mysql -ENV JDBC_URL=127.0.0.1:3306 -ENV USERNAME=root -ENV PASSWORD=inlong -# clickhouse -ENV STORE_CK_URL=127.0.0.1:8123 -ENV STORE_CK_USERNAME=default -ENV STORE_CK_PASSWD=default -ENV STORE_CK_DBNAME="apache_inlong_audit" -# elasticsearch -ENV STORE_ES_HOST=127.0.0.1 -ENV STORE_ES_PORT=9200 -ENV STORE_ES_AUTHENABLE=false -ENV STORE_ES_USERNAME=elastic -ENV STORE_ES_PASSWD=inlong +ENV ENV_MYSQL_JDBC_URL=127.0.0.1:3306 +ENV ENV_MYSQL_USERNAME=root +ENV ENV_MYSQL_PASSWORD=inlong # starrocks ENV STORE_SR_URL=127.0.0.1:9030 ENV STORE_SR_USERNAME=default diff --git a/inlong-audit/audit-docker/audit-docker.sh b/inlong-audit/audit-docker/audit-docker.sh index 71c71f5545b..201c6b3542e 100755 --- a/inlong-audit/audit-docker/audit-docker.sh +++ b/inlong-audit/audit-docker/audit-docker.sh @@ -17,14 +17,17 @@ # file_path=$(cd "$(dirname "$0")"/../;pwd) -# store config -store_conf_file=${file_path}/conf/application.properties -# proxy config -proxy_conf_file=${file_path}/conf/audit-proxy-${MQ_TYPE}.conf + +#SQL file sql_mysql_file="${file_path}"/sql/apache_inlong_audit_mysql.sql -sql_ck_file="${file_path}"/sql/apache_inlong_audit_clickhouse.sql sql_sr_file="${file_path}"/sql/apache_inlong_audit_starrocks.sql +# proxy config +proxy_conf_file=${file_path}/conf/audit-proxy-${MQ_TYPE}.conf + +# store config +store_conf_file=${file_path}/conf/application.properties + # audit-service config service_conf_file=${file_path}/conf/audit-service.properties @@ -52,46 +55,35 @@ if [ -n "${STORE_MODE}" ]; then sed -i "s/audit.config.store.mode=.*$/audit.config.store.mode=${STORE_MODE}/g" "${store_conf_file}" fi # DB -sed -i "s/127.0.0.1:3306\/apache_inlong_audit/${JDBC_URL}\/${AUDIT_DBNAME}/g" "${store_conf_file}" -sed -i "s/spring.datasource.druid.username=.*$/spring.datasource.druid.username=${USERNAME}/g" "${store_conf_file}" -sed -i "s/spring.datasource.druid.password=.*$/spring.datasource.druid.password=${PASSWORD}/g" "${store_conf_file}" +sed -i "s/127.0.0.1:3306\/apache_inlong_audit/${ENV_MYSQL_JDBC_URL}\/${AUDIT_DBNAME}/g" "${store_conf_file}" +sed -i "s/spring.datasource.druid.username=.*$/spring.datasource.druid.username=${ENV_MYSQL_USERNAME}/g" "${store_conf_file}" +sed -i "s/spring.datasource.druid.password=.*$/spring.datasource.druid.password=${ENV_MYSQL_PASSWORD}/g" "${store_conf_file}" # mysql file for audit sed -i "s/apache_inlong_audit/${AUDIT_DBNAME}/g" "${sql_mysql_file}" -# clickhouse -sed -i "s/clickhouse.url=.*$/clickhouse.url=jdbc:clickhouse:\/\/${STORE_CK_URL}\/${STORE_CK_DBNAME}/g" "${store_conf_file}" -sed -i "s/clickhouse.username=.*$/clickhouse.username=${STORE_CK_USERNAME}/g" "${store_conf_file}" -sed -i "s/clickhouse.password=.*$/clickhouse.password=${STORE_CK_PASSWD}/g" "${store_conf_file}" -# mysql file for clickhouse -sed -i "s/apache_inlong_audit/${STORE_CK_DBNAME}/g" "${sql_ck_file}" -# elasticsearch -sed -i "s/elasticsearch.host=.*$/elasticsearch.host=${STORE_ES_HOST}/g" "${store_conf_file}" -sed -i "s/elasticsearch.port=.*$/elasticsearch.port=${STORE_ES_PORT}/g" "${store_conf_file}" -sed -i "s/elasticsearch.authEnable=.*$/elasticsearch.authEnable=${STORE_ES_AUTHENABLE}/g" "${store_conf_file}" -sed -i "s/elasticsearch.username=.*$/elasticsearch.username=${STORE_ES_USERNAME}/g" "${store_conf_file}" -sed -i "s/elasticsearch.password=.*$/elasticsearch.password=${STORE_ES_PASSWD}/g" "${store_conf_file}" # StarRocks SQL file for audit sed -i "s/apache_inlong_audit/${AUDIT_DBNAME}/g" "${sql_sr_file}" + # StarRocks sed -i "s/jdbc.url=.*$/jdbc.url=jdbc:mysql:\/\/${STORE_SR_URL}\/${STORE_SR_DBNAME}/g" "${store_conf_file}" sed -i "s/jdbc.username=.*$/jdbc.username=${STORE_SR_USERNAME}/g" "${store_conf_file}" sed -i "s/jdbc.password=.*$/jdbc.password=${STORE_SR_PASSWD}/g" "${store_conf_file}" # audit-service config -sed -i "s/mysql.jdbc.url=.*$/mysql.jdbc.url=jdbc:mysql:\/\/${JDBC_URL}\/${AUDIT_DBNAME}/g" "${service_conf_file}" -sed -i "s/mysql.jdbc.username=.*$/mysql.jdbc.username=${USERNAME}/g" "${service_conf_file}" -sed -i "s/mysql.jdbc.password=.*$/mysql.jdbc.password=${PASSWORD}/g" "${service_conf_file}" +sed -i "s/mysql.jdbc.url=.*$/mysql.jdbc.url=jdbc:mysql:\/\/${ENV_MYSQL_JDBC_URL}\/${AUDIT_DBNAME}/g" "${service_conf_file}" +sed -i "s/mysql.jdbc.username=.*$/mysql.jdbc.username=${ENV_MYSQL_USERNAME}/g" "${service_conf_file}" +sed -i "s/mysql.jdbc.password=.*$/mysql.jdbc.password=${ENV_MYSQL_PASSWORD}/g" "${service_conf_file}" # Whether the database table exists. If it does not exist, initialize the database and skip if it exists. -if [[ "${JDBC_URL}" =~ (.+):([0-9]+) ]]; then +if [[ "${ENV_MYSQL_JDBC_URL}" =~ (.+):([0-9]+) ]]; then datasource_hostname=${BASH_REMATCH[1]} datasource_port=${BASH_REMATCH[2]} select_db_sql="SELECT COUNT(*) FROM information_schema.TABLES WHERE table_schema = 'apache_inlong_audit'" - inlong_audit_count=$(mysql -h${datasource_hostname} -P${datasource_port} -u${USERNAME} -p${PASSWORD} -e "${select_db_sql}") + inlong_audit_count=$(mysql -h${datasource_hostname} -P${datasource_port} -u${ENV_MYSQL_USERNAME} -p${ENV_MYSQL_PASSWORD} -e "${select_db_sql}") inlong_num=$(echo "$inlong_audit_count" | tr -cd "[0-9]") if [ "${inlong_num}" = 0 ]; then - mysql -h${datasource_hostname} -P${datasource_port} -u${USERNAME} -p${PASSWORD} < sql/apache_inlong_audit_mysql.sql + mysql -h${datasource_hostname} -P${datasource_port} -u${ENV_MYSQL_USERNAME} -p${ENV_MYSQL_PASSWORD} < sql/apache_inlong_audit_mysql.sql fi fi @@ -108,6 +100,7 @@ if [ "${START_MODE}" = "all" ] || [ "${START_MODE}" = "proxy" ]; then bash +x ./bin/proxy-start.sh tubemq fi fi + # start store if [ "${START_MODE}" = "all" ] || [ "${START_MODE}" = "store" ]; then bash +x ./bin/store-start.sh diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java index ff9de5c6d53..fd9288d425c 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java @@ -139,8 +139,8 @@ public List queryLogTs(String startTime, String endTime, String inlong }); futures.add(future); } - CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join(); - LOGGER.info("Query log ts by params: {} {} {} {} {}, cost {} ms", startTime, endTime, inlongGroupId, + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + LOGGER.info("Query log ts by params: {} {} {} {} {}, total cost {} ms", startTime, endTime, inlongGroupId, inlongStreamId, auditId, System.currentTimeMillis() - currentTime); return filterMaxAuditVersion(statDataList); } @@ -165,8 +165,7 @@ public List filterMaxAuditVersion(List allStatData) { for (Map.Entry> entry : allData.entrySet()) { long maxAuditVersion = Long.MIN_VALUE; for (StatData maxData : entry.getValue()) { - maxAuditVersion = - maxData.getAuditVersion() > maxAuditVersion ? maxData.getAuditVersion() : maxAuditVersion; + maxAuditVersion = Math.max(maxData.getAuditVersion(), maxAuditVersion); } for (StatData statData : entry.getValue()) { if (statData.getAuditVersion() == maxAuditVersion) { @@ -191,6 +190,7 @@ public List filterMaxAuditVersion(List allStatData) { */ private List doQueryLogTs(DataSource dataSource, String startTime, String endTime, String inlongGroupId, String inlongStreamId, String auditId) { + long currentTime = System.currentTimeMillis(); List result = new LinkedList<>(); try (Connection connection = dataSource.getConnection(); PreparedStatement pstat = connection.prepareStatement(queryLogTsSql)) { @@ -219,6 +219,8 @@ private List doQueryLogTs(DataSource dataSource, String startTime, Str } catch (Exception exception) { LOGGER.error("Query log time has exception!, datasource={} ", dataSource, exception); } + LOGGER.info("Query log ts by params: {} {} {} {} {}, cost {} ms", startTime, endTime, inlongGroupId, + inlongStreamId, auditId, System.currentTimeMillis() - currentTime); return result; } diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/ClickHouseConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/ClickHouseConfig.java deleted file mode 100644 index 3ed02e69a1b..00000000000 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/ClickHouseConfig.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.audit.config; - -import lombok.Getter; -import lombok.Setter; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Configuration; - -@Configuration -@Getter -@Setter -public class ClickHouseConfig { - - @Value("${clickhouse.driver}") - private String driver; - - @Value("${clickhouse.url}") - private String url; - - @Value("${clickhouse.username}") - private String username; - - @Value("${clickhouse.password}") - private String password; - - @Value("${clickhouse.batchIntervalMs:1000}") - private int batchIntervalMs; - - @Value("${clickhouse.batchThreshold:500}") - private int batchThreshold; - - @Value("${clickhouse.processIntervalMs:100}") - private int processIntervalMs; -} diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/DataServerConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/DataServerConfig.java deleted file mode 100644 index c366e6cd459..00000000000 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/DataServerConfig.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.audit.config; - -import org.mybatis.spring.annotation.MapperScan; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.context.annotation.ComponentScan.Filter; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.FilterType; -import org.springframework.context.annotation.PropertySource; -import org.springframework.context.annotation.PropertySources; -import org.springframework.context.support.PropertySourcesPlaceholderConfigurer; -import org.springframework.stereotype.Component; -import org.springframework.stereotype.Service; - -@Configuration -@ComponentScan(basePackages = "org.apache.inlong.audit", useDefaultFilters = false, includeFilters = { - @Filter(type = FilterType.ANNOTATION, value = Component.class), - @Filter(type = FilterType.ANNOTATION, value = Service.class)}) -@MapperScan(basePackages = "org.apache.inlong.audit.db.dao") -@PropertySources({ - @PropertySource("classpath:application.properties"), -}) - -public class DataServerConfig { - - @Bean - public static PropertySourcesPlaceholderConfigurer propertyPlaceholderConfigurer() { - return new PropertySourcesPlaceholderConfigurer(); - } -} \ No newline at end of file diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/ElasticsearchConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/ElasticsearchConfig.java deleted file mode 100644 index 9b8e197f355..00000000000 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/ElasticsearchConfig.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.audit.config; - -import lombok.Getter; -import lombok.Setter; -import org.apache.commons.lang3.StringUtils; -import org.apache.http.HttpHost; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestClientBuilder; -import org.elasticsearch.client.RestHighLevelClient; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -@Configuration -@Getter -@Setter -public class ElasticsearchConfig { - - @Value("${elasticsearch.host}") - private String host; - - @Value("${elasticsearch.port}") - private int port; - - @Value("${elasticsearch.connTimeout:3000}") - private int connTimeout; - - @Value("${elasticsearch.socketTimeout:5000}") - private int socketTimeout; - - @Value("${elasticsearch.connectionRequestTimeout:500}") - private int connectionRequestTimeout; - - @Value("${elasticsearch.authEnable:false}") - private boolean authEnable; - - @Value("${elasticsearch.username}") - private String username; - - @Value("${elasticsearch.password}") - private String password; - - @Value("${elasticsearch.shardsNum:5}") - private int shardsNum; - - @Value("${elasticsearch.replicaNum:1}") - private int replicaNum; - - @Value("${elasticsearch.indexDeleteDay:5}") - private int indexDeleteDay; - - @Value("${elasticsearch.enableCustomDocId:true}") - private boolean enableCustomDocId; - - @Value("${elasticsearch.bulkInterval:10}") - private int bulkInterval; - - @Value("${elasticsearch.bulkThreshold:5000}") - private int bulkThreshold; - - @Value("${elasticsearch.auditIdSet}") - private String auditIdSet; - - @Bean(destroyMethod = "close", name = "restClient") - public RestHighLevelClient initRestClient() { - - // support es cluster with multi hosts - List hosts = new ArrayList<>(); - String[] hostArrays = host.split(","); - for (String host : hostArrays) { - if (StringUtils.isNotEmpty(host)) { - hosts.add(new HttpHost(host.trim(), port, "http")); - } - } - - RestClientBuilder restClientBuilder = RestClient.builder(hosts.toArray(new HttpHost[0])); - - // configurable auth - if (authEnable) { - final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); - restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder - .setDefaultCredentialsProvider(credentialsProvider)); - } - - restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder - .setConnectTimeout(connTimeout).setSocketTimeout(socketTimeout) - .setConnectionRequestTimeout(connectionRequestTimeout)); - - return new RestHighLevelClient(restClientBuilder); - } - - public List getAuditIdList() { - List auditIdList = new ArrayList<>(); - if (!StringUtils.isEmpty(auditIdSet)) { - auditIdList = Arrays.asList(auditIdSet.split(",")); - } - return auditIdList; - } -} diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java index c4a2db33289..cdad13e3494 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java @@ -27,21 +27,9 @@ @Setter public class StoreConfig { - @Value("${audit.config.store.mode:mysql}") + @Value("${audit.config.store.mode:jdbc}") private String store; - public boolean isMysqlStore() { - return store.contains("mysql"); - } - - public boolean isElasticsearchStore() { - return store.contains("elasticsearch"); - } - - public boolean isClickHouseStore() { - return store.contains("clickhouse"); - } - public boolean isJdbc() { return store.contains("jdbc"); } diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/DruidConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/DruidConfig.java deleted file mode 100644 index 1e8fc91619f..00000000000 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/DruidConfig.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.audit.db; - -import org.apache.inlong.audit.config.StoreConfig; - -import com.alibaba.druid.pool.DruidDataSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import javax.sql.DataSource; - -import java.sql.SQLException; - -@Configuration -@EnableConfigurationProperties({DruidDataSourceProperties.class}) -public class DruidConfig { - - private static final Logger LOGGER = LoggerFactory.getLogger(DruidConfig.class); - - @Autowired - protected StoreConfig storeConfig; - @Autowired - private DruidDataSourceProperties properties; - - @Bean - @ConditionalOnMissingBean - public DataSource druidDataSource() { - LOGGER.info("druidDataSource url = {} ", properties.getUrl()); - DruidDataSource druidDataSource = new DruidDataSource(); - druidDataSource.setDriverClassName(properties.getDriverClassName()); - druidDataSource.setUrl(properties.getUrl()); - druidDataSource.setUsername(properties.getUsername()); - druidDataSource.setPassword(properties.getPassword()); - druidDataSource.setInitialSize(properties.getInitialSize()); - druidDataSource.setMinIdle(properties.getMinIdle()); - druidDataSource.setMaxActive(properties.getMaxActive()); - druidDataSource.setMaxWait(properties.getMaxWait()); - druidDataSource.setTimeBetweenEvictionRunsMillis(properties - .getTimeBetweenEvictionRunsMillis()); - druidDataSource.setMinEvictableIdleTimeMillis(properties.getMinEvictableIdleTimeMillis()); - druidDataSource.setValidationQuery(properties.getValidationQuery()); - druidDataSource.setTestWhileIdle(properties.isTestWhileIdle()); - druidDataSource.setTestOnBorrow(properties.isTestOnBorrow()); - druidDataSource.setTestOnReturn(properties.isTestOnReturn()); - druidDataSource.setPoolPreparedStatements(properties.isPoolPreparedStatements()); - druidDataSource.setMaxPoolPreparedStatementPerConnectionSize(properties - .getMaxPoolPreparedStatementPerConnectionSize()); - try { - druidDataSource.setFilters(properties.getFilters()); - if (storeConfig.isMysqlStore()) { - druidDataSource.init(); - } - } catch (SQLException e) { - LOGGER.error("init druidDataSource failed: ", e); - } - return druidDataSource; - } - -} diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/DruidDataSourceProperties.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/DruidDataSourceProperties.java deleted file mode 100644 index e27399a177b..00000000000 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/DruidDataSourceProperties.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.audit.db; - -import lombok.Getter; -import lombok.Setter; -import org.springframework.boot.context.properties.ConfigurationProperties; - -@ConfigurationProperties(prefix = "spring.datasource.druid") -@Getter -@Setter -public class DruidDataSourceProperties { - - private String driverClassName; - private String url; - private String username; - private String password; - private int initialSize; - private int minIdle; - private int maxActive = 100; - private long maxWait; - private long timeBetweenEvictionRunsMillis; - private long minEvictableIdleTimeMillis; - private String validationQuery; - private boolean testWhileIdle; - private boolean testOnBorrow; - private boolean testOnReturn; - private boolean poolPreparedStatements; - private int maxPoolPreparedStatementPerConnectionSize; - private String filters; - -} \ No newline at end of file diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/dao/AuditDataDao.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/dao/AuditDataDao.java deleted file mode 100644 index cb3227334ba..00000000000 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/dao/AuditDataDao.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.audit.db.dao; - -import org.apache.inlong.audit.db.entities.AuditDataPo; - -import org.springframework.stereotype.Repository; - -@Repository -public interface AuditDataDao { - - int insert(AuditDataPo auditDataPo); -} diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/AuditDataPo.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/AuditDataPo.java deleted file mode 100644 index 60a9a1766e6..00000000000 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/AuditDataPo.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.audit.db.entities; - -import lombok.Getter; -import lombok.Setter; - -import java.sql.Timestamp; -import java.util.Date; - -@Getter -@Setter -public class AuditDataPo { - - private String ip; - private String dockerId; - private String threadId; - private Date sdkTs; - private Long packetId; - private Date logTs; - private String inlongGroupId; - private String inlongStreamId; - private String auditId; - private String auditTag; - private long auditVersion; - private Long count; - private Long size; - private Long delay; - private Timestamp updateTime; - -} diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java deleted file mode 100644 index 7bebd26090c..00000000000 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.audit.db.entities; - -import lombok.Getter; -import lombok.Setter; - -import java.sql.Timestamp; - -@Getter -@Setter -public class ClickHouseDataPo { - - private String ip; - private String dockerId; - private String threadId; - private Timestamp sdkTs; - private Long packetId; - private Timestamp logTs; - private String inlongGroupId; - private String inlongStreamId; - private String auditId; - private String auditTag; - private long auditVersion; - private Long count; - private Long size; - private Long delay; - private Timestamp updateTime; - -} diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ESDataPo.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ESDataPo.java deleted file mode 100644 index c41a182f62b..00000000000 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ESDataPo.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.audit.db.entities; - -import lombok.Getter; -import lombok.Setter; - -import java.util.Date; - -@Getter -@Setter -public class ESDataPo { - - private String ip; - private String dockerId; - private String threadId; - private long sdkTs; - private Date logTs; - private String inlongGroupId; - private String inlongStreamId; - private String auditId; - private String auditTag; - private long auditVersion; - private long count; - private long size; - private long delay; - private long packetId; - - public String getDocId() { - String docId = ip + dockerId + threadId + sdkTs + packetId + logTs + inlongGroupId + inlongStreamId - + auditId + auditTag + count; - return docId; - } -} diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java index 2e2f4d94696..bf32271a690 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java @@ -17,12 +17,10 @@ package org.apache.inlong.audit.service; -import org.apache.inlong.audit.config.ClickHouseConfig; import org.apache.inlong.audit.config.JdbcConfig; import org.apache.inlong.audit.config.MessageQueueConfig; import org.apache.inlong.audit.config.StoreConfig; import org.apache.inlong.audit.consts.ConfigConstants; -import org.apache.inlong.audit.db.dao.AuditDataDao; import org.apache.inlong.audit.file.RemoteConfigJson; import org.apache.inlong.audit.service.consume.BaseConsume; import org.apache.inlong.audit.service.consume.KafkaConsume; @@ -60,16 +58,8 @@ public class AuditMsgConsumerServer implements InitializingBean { @Autowired private MessageQueueConfig mqConfig; @Autowired - private AuditDataDao auditDataDao; - @Autowired - private ElasticsearchService esService; - @Autowired private StoreConfig storeConfig; @Autowired - private ClickHouseConfig chConfig; - // ClickHouseService - private ClickHouseService ckService; - @Autowired private JdbcConfig jdbcConfig; private JdbcService jdbcService; private static final String DEFAULT_CONFIG_PROPERTIES = "application.properties"; @@ -104,12 +94,6 @@ public void afterPropertiesSet() { if (mqConsume == null) { LOG.error("Unknown MessageQueue {}", mqConfig.getMqType()); } - if (storeConfig.isElasticsearchStore()) { - esService.startTimerRoutine(); - } - if (storeConfig.isClickHouseStore()) { - ckService.start(); - } if (storeConfig.isJdbc()) { jdbcService.start(); } @@ -123,17 +107,6 @@ public void afterPropertiesSet() { */ private List getInsertServiceList() { List insertServiceList = new ArrayList<>(); - if (storeConfig.isMysqlStore()) { - insertServiceList.add(new MySqlService(auditDataDao)); - } - if (storeConfig.isElasticsearchStore()) { - insertServiceList.add(esService); - } - if (storeConfig.isClickHouseStore()) { - // create ck object - ckService = new ClickHouseService(chConfig); - insertServiceList.add(ckService); - } if (storeConfig.isJdbc()) { // create jdbc object jdbcService = new JdbcService(jdbcConfig); diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java deleted file mode 100644 index 47c63aa3952..00000000000 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.audit.service; - -import org.apache.inlong.audit.config.ClickHouseConfig; -import org.apache.inlong.audit.db.entities.ClickHouseDataPo; -import org.apache.inlong.audit.protocol.AuditData; - -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.MessageId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.Timestamp; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -/** - * ClickHouseService - */ -public class ClickHouseService implements InsertData, AutoCloseable { - - private static final Logger LOG = LoggerFactory.getLogger(ClickHouseService.class); - private static final String INSERT_SQL = "insert into audit_data (ip, docker_id, thread_id, \r\n" - + " sdk_ts, packet_id, log_ts, \r\n" - + " inlong_group_id, inlong_stream_id, audit_id, audit_tag, audit_version, \r\n" - + " count, size, delay, \r\n" - + " update_time)\r\n" - + " values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; - - private ClickHouseConfig chConfig; - - private ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor(); - private LinkedBlockingQueue batchQueue; - private AtomicBoolean needBatchOutput = new AtomicBoolean(false); - private AtomicInteger batchCounter = new AtomicInteger(0); - private AtomicLong lastCheckTime = new AtomicLong(System.currentTimeMillis()); - private Connection conn; - - /** - * Constructor - * - * @param chConfig ClickHouse service config, such as jdbc url, jdbc username, jdbc password. - */ - public ClickHouseService(ClickHouseConfig chConfig) { - this.chConfig = chConfig; - } - - /** - * start - */ - public void start() { - // queue - this.batchQueue = new LinkedBlockingQueue<>( - chConfig.getBatchThreshold() * chConfig.getBatchIntervalMs() / chConfig.getProcessIntervalMs()); - // connection - try { - Class.forName(chConfig.getDriver()); - this.reconnect(); - } catch (Exception e) { - LOG.error("ClickHouseService start failure!", e); - } - // start timer - timerService.scheduleWithFixedDelay(this::processOutput, - chConfig.getProcessIntervalMs(), - chConfig.getProcessIntervalMs(), TimeUnit.MILLISECONDS); - } - - /** - * processOutput - */ - private void processOutput() { - if (!this.needBatchOutput.get() - && (System.currentTimeMillis() - lastCheckTime.get() < chConfig.getBatchIntervalMs())) { - return; - } - // output - try (PreparedStatement pstat = this.conn.prepareStatement(INSERT_SQL)) { - int counter = 0; - // output data to clickhouse - ClickHouseDataPo data = this.batchQueue.poll(); - while (data != null) { - pstat.setString(1, data.getIp()); - pstat.setString(2, data.getDockerId()); - pstat.setString(3, data.getThreadId()); - pstat.setTimestamp(4, data.getSdkTs()); - pstat.setLong(5, data.getPacketId()); - pstat.setTimestamp(6, data.getLogTs()); - pstat.setString(7, data.getInlongGroupId()); - pstat.setString(8, data.getInlongStreamId()); - pstat.setString(9, data.getAuditId()); - pstat.setString(10, data.getAuditTag()); - pstat.setLong(11, data.getAuditVersion()); - pstat.setLong(12, data.getCount()); - pstat.setLong(13, data.getSize()); - pstat.setLong(14, data.getDelay()); - pstat.setTimestamp(15, data.getUpdateTime()); - pstat.addBatch(); - this.batchCounter.decrementAndGet(); - if (++counter >= chConfig.getBatchThreshold()) { - pstat.executeBatch(); - this.conn.commit(); - counter = 0; - } - data = this.batchQueue.poll(); - } - if (counter > 0) { - pstat.executeBatch(); - this.conn.commit(); - } - } catch (Exception e1) { - LOG.error("Execute output to clickhouse failure!", e1); - // re-connect clickhouse - try { - this.reconnect(); - } catch (SQLException e2) { - LOG.error("Re-connect clickhouse failure!", e2); - } - } - // recover flag - lastCheckTime.set(System.currentTimeMillis()); - this.needBatchOutput.compareAndSet(true, false); - } - - /** - * reconnect - * - * @throws SQLException Exception when creating connection. - */ - private void reconnect() throws SQLException { - if (this.conn != null) { - try { - this.conn.close(); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } - this.conn = null; - } - this.conn = DriverManager.getConnection(chConfig.getUrl(), chConfig.getUsername(), - chConfig.getPassword()); - this.conn.setAutoCommit(false); - } - - /** - * insert - * - * @param msgBody audit data reading from Pulsar or other MessageQueue. - */ - @Override - public void insert(AuditData msgBody) { - ClickHouseDataPo data = new ClickHouseDataPo(); - data.setIp(msgBody.getIp()); - data.setThreadId(msgBody.getThreadId()); - data.setDockerId(msgBody.getDockerId()); - data.setPacketId(msgBody.getPacketId()); - data.setSdkTs(new Timestamp(msgBody.getSdkTs())); - data.setLogTs(new Timestamp(msgBody.getLogTs())); - data.setAuditId(msgBody.getAuditId()); - data.setAuditTag(msgBody.getAuditTag()); - data.setAuditVersion(msgBody.getAuditVersion()); - data.setCount(msgBody.getCount()); - data.setDelay(msgBody.getDelay()); - data.setInlongGroupId(msgBody.getInlongGroupId()); - data.setInlongStreamId(msgBody.getInlongStreamId()); - data.setSize(msgBody.getSize()); - data.setUpdateTime(new Timestamp(System.currentTimeMillis())); - try { - this.batchQueue.offer(data, Long.MAX_VALUE, TimeUnit.MILLISECONDS); - if (this.batchCounter.incrementAndGet() >= chConfig.getBatchThreshold()) { - this.needBatchOutput.compareAndSet(false, true); - } - } catch (InterruptedException e) { - LOG.error(e.getMessage(), e); - throw new RuntimeException(e.getMessage(), e); - } - } - - @Override - public void insert(AuditData msgBody, Consumer consumer, MessageId messageId) { - - } - - /** - * close - * - * @throws Exception Exception when closing ClickHouse connection. - */ - @Override - public void close() throws Exception { - this.conn.close(); - this.timerService.shutdown(); - } -} diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java deleted file mode 100644 index 3de234146f3..00000000000 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java +++ /dev/null @@ -1,288 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.audit.service; - -import org.apache.inlong.audit.config.ElasticsearchConfig; -import org.apache.inlong.audit.db.entities.ESDataPo; -import org.apache.inlong.audit.protocol.AuditData; - -import com.google.gson.FieldNamingPolicy; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.MessageId; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; -import org.elasticsearch.action.admin.indices.get.GetIndexRequest; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.rest.RestStatus; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.stereotype.Service; - -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.Date; -import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; - -@Service -public class ElasticsearchService implements InsertData, AutoCloseable { - - private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchService.class); - - private static ScheduledExecutorService timerService = Executors.newScheduledThreadPool(1); - private final Semaphore semaphore = new Semaphore(1); - private List datalist = new ArrayList<>(); - @Autowired - @Qualifier("restClient") - private RestHighLevelClient client; - @Autowired - private ElasticsearchConfig esConfig; - - private final static String X_CONTENT_BUILDER_TYPE = "type"; - private final static String X_CONTENT_BUILDER_LONG_VALUE = "long"; - private final static String X_CONTENT_BUILDER_KEYWORD_VALUE = "keyword"; - - public void startTimerRoutine() { - timerService.scheduleAtFixedRate((new Runnable() { - - @Override - public void run() { - try { - deleteTimeoutIndices(); - } catch (IOException e) { - LOG.error("deleteTimeoutIndices has err: ", e); - } - } - }), 1, 1, TimeUnit.DAYS); - - timerService.scheduleWithFixedDelay((new Runnable() { - - @Override - public void run() { - try { - bulkInsert(); - } catch (IOException e) { - LOG.error("bulkInsert has err: ", e); - } - } - }), esConfig.getBulkInterval(), esConfig.getBulkInterval(), TimeUnit.SECONDS); - } - - public void insertData(ESDataPo data) { - if (datalist.size() >= esConfig.getBulkThreshold()) { - try { - if (bulkInsert()) { - LOG.info("success bulk insert {} docs", esConfig.getBulkThreshold()); - } else { - LOG.error("failed to bulk insert"); - } - } catch (IOException e) { - LOG.error("bulkInsert has err: ", e); - } - } - try { - semaphore.acquire(); - datalist.add(data); - semaphore.release(); - } catch (InterruptedException e) { - LOG.error("datalist semaphore has err: ", e); - } - } - - protected boolean createIndex(String index) throws IOException { - if (existsIndex(index)) { - return true; - } - CreateIndexRequest createIndexRequest = new CreateIndexRequest(index); - createIndexRequest.settings(Settings.builder().put("index.number_of_shards", esConfig.getShardsNum()) - .put("index.number_of_replicas", esConfig.getReplicaNum())); - createIndexRequest.mapping("_doc", generateBuilder()); - CreateIndexResponse response = client.indices().create(createIndexRequest, RequestOptions.DEFAULT); - boolean res = response.isAcknowledged(); - if (res) { - LOG.info("success creating index {}", index); - } else { - LOG.info("fail to create index {}", index); - } - return res; - } - - protected boolean existsIndex(String index) throws IOException { - GetIndexRequest getIndexRequest = new GetIndexRequest(); - getIndexRequest.indices(index); - return client.indices().exists(getIndexRequest, RequestOptions.DEFAULT); - } - - protected boolean bulkInsert() throws IOException { - if (datalist.isEmpty()) { - return true; - } - BulkRequest bulkRequest = new BulkRequest(); - try { - semaphore.acquire(); - for (ESDataPo esDataPo : datalist) { - SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd"); - String index = formatter.format(esDataPo.getLogTs()) + "_" + esDataPo.getAuditId(); - GsonBuilder gsonBuilder = new GsonBuilder(); - gsonBuilder.setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES) - .setDateFormat("yyyy-MM-dd HH:mm:ss"); - Gson gson = gsonBuilder.create(); - String esJson = gson.toJson(esDataPo); - if (!createIndex(index)) { - LOG.error("fail to create index {}", index); - continue; - } - IndexRequest indexRequest; - if (esConfig.isEnableCustomDocId()) { - indexRequest = new IndexRequest(index).type("_doc").id(esDataPo.getDocId()) - .source(esJson, XContentType.JSON); - } else { - indexRequest = new IndexRequest(index).type("_doc").source(esJson, XContentType.JSON); - } - bulkRequest.add(indexRequest); - } - BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT); - datalist.clear(); - semaphore.release(); - return bulkResponse.status().equals(RestStatus.OK); - } catch (InterruptedException e) { - LOG.error("datalist semaphore has err: ", e); - } - return false; - } - - protected void deleteTimeoutIndices() throws IOException { - List auditIdList = esConfig.getAuditIdList(); - if (auditIdList.isEmpty()) { - return; - } - SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd"); - Calendar calendar = Calendar.getInstance(); - calendar.add(Calendar.DATE, -esConfig.getIndexDeleteDay()); - Date deleteDay = calendar.getTime(); - String preIndex = formatter.format(deleteDay); - for (String auditId : auditIdList) { - String index = preIndex + "_" + auditId; - deleteSingleIndex(index); - } - - } - - protected boolean deleteSingleIndex(String index) throws IOException { - if (!existsIndex(index)) { - return true; - } - DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index); - AcknowledgedResponse deleteIndexResponse = client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT); - boolean res = deleteIndexResponse.isAcknowledged(); - if (res) { - LOG.info("success deleting index {}", index); - } else { - LOG.error("fail to delete index {}", index); - } - return res; - } - - @Override - public void close() { - try { - bulkInsert(); - } catch (IOException e) { - LOG.error("bulkInsert has err: ", e); - } - timerService.shutdown(); - } - - protected XContentBuilder generateBuilder() throws IOException { - XContentBuilder builder = XContentFactory.jsonBuilder(); - builder.startObject(); - builder.startObject("properties"); - doBuild(builder, "audit_id", X_CONTENT_BUILDER_KEYWORD_VALUE); - doBuild(builder, "audit_tag", X_CONTENT_BUILDER_KEYWORD_VALUE); - doBuild(builder, "audit_version", X_CONTENT_BUILDER_KEYWORD_VALUE); - doBuild(builder, "inlong_group_id", X_CONTENT_BUILDER_KEYWORD_VALUE); - doBuild(builder, "inlong_stream_id", X_CONTENT_BUILDER_KEYWORD_VALUE); - doBuild(builder, "docker_id", X_CONTENT_BUILDER_KEYWORD_VALUE); - doBuild(builder, "thread_id", X_CONTENT_BUILDER_KEYWORD_VALUE); - doBuild(builder, "ip", X_CONTENT_BUILDER_KEYWORD_VALUE); - doBuild(builder, "log_ts", X_CONTENT_BUILDER_KEYWORD_VALUE); - doBuild(builder, "sdk_ts", X_CONTENT_BUILDER_LONG_VALUE); - doBuild(builder, "count", X_CONTENT_BUILDER_LONG_VALUE); - doBuild(builder, "size", X_CONTENT_BUILDER_LONG_VALUE); - doBuild(builder, "delay", X_CONTENT_BUILDER_LONG_VALUE); - doBuild(builder, "packet_id", X_CONTENT_BUILDER_LONG_VALUE); - builder.endObject(); - builder.endObject(); - return builder; - } - - private void doBuild(XContentBuilder builder, String name, String value) throws IOException { - builder.startObject(name); - builder.field(X_CONTENT_BUILDER_TYPE, value); - builder.endObject(); - } - - /** - * insert - * - * @param msgBody - */ - @Override - public void insert(AuditData msgBody) { - ESDataPo esPo = new ESDataPo(); - esPo.setIp(msgBody.getIp()); - esPo.setThreadId(msgBody.getThreadId()); - esPo.setDockerId(msgBody.getDockerId()); - esPo.setSdkTs(new Date(msgBody.getSdkTs()).getTime()); - esPo.setLogTs(new Date(msgBody.getLogTs())); - esPo.setAuditId(msgBody.getAuditId()); - esPo.setAuditTag(msgBody.getAuditTag()); - esPo.setAuditVersion(msgBody.getAuditVersion()); - esPo.setCount(msgBody.getCount()); - esPo.setDelay(msgBody.getDelay()); - esPo.setInlongGroupId(msgBody.getInlongGroupId()); - esPo.setInlongStreamId(msgBody.getInlongStreamId()); - esPo.setSize(msgBody.getSize()); - esPo.setPacketId(msgBody.getPacketId()); - this.insertData(esPo); - } - - @Override - public void insert(AuditData msgBody, Consumer consumer, MessageId messageId) { - - } -} diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java deleted file mode 100644 index 21f471d003f..00000000000 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.audit.service; - -import org.apache.inlong.audit.db.dao.AuditDataDao; -import org.apache.inlong.audit.db.entities.AuditDataPo; -import org.apache.inlong.audit.protocol.AuditData; - -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.MessageId; - -import java.util.Date; - -/** - * MySqlService - */ -public class MySqlService implements InsertData { - - private final AuditDataDao dao; - - public MySqlService(AuditDataDao dao) { - this.dao = dao; - } - - @Override - public void insert(AuditData msgBody) { - AuditDataPo po = new AuditDataPo(); - po.setIp(msgBody.getIp()); - po.setThreadId(msgBody.getThreadId()); - po.setDockerId(msgBody.getDockerId()); - po.setPacketId(msgBody.getPacketId()); - po.setSdkTs(new Date(msgBody.getSdkTs())); - po.setLogTs(new Date(msgBody.getLogTs())); - po.setAuditId(msgBody.getAuditId()); - po.setAuditTag(msgBody.getAuditTag()); - po.setAuditVersion(msgBody.getAuditVersion()); - po.setCount(msgBody.getCount()); - po.setDelay(msgBody.getDelay()); - po.setInlongGroupId(msgBody.getInlongGroupId()); - po.setInlongStreamId(msgBody.getInlongStreamId()); - po.setSize(msgBody.getSize()); - dao.insert(po); - } - - @Override - public void insert(AuditData msgBody, Consumer consumer, MessageId messageId) { - - } - -} diff --git a/inlong-audit/audit-store/src/main/resources/mapper/AuditDataDao.xml b/inlong-audit/audit-store/src/main/resources/mapper/AuditDataDao.xml deleted file mode 100644 index aadf7e47cff..00000000000 --- a/inlong-audit/audit-store/src/main/resources/mapper/AuditDataDao.xml +++ /dev/null @@ -1,51 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - insert into audit_data (ip, docker_id, thread_id, - sdk_ts, packet_id, log_ts, - inlong_group_id, inlong_stream_id, audit_id, audit_tag, audit_version, - `count`, size, delay) - values (#{ip,jdbcType=VARCHAR}, #{dockerId,jdbcType=VARCHAR}, #{threadId,jdbcType=VARCHAR}, - #{sdkTs,jdbcType=TIMESTAMP}, #{packetId,jdbcType=BIGINT}, #{logTs,jdbcType=TIMESTAMP}, - #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR}, #{auditId,jdbcType=VARCHAR}, - #{auditTag,jdbcType=VARCHAR}, #{auditVersion,jdbcType=BIGINT}, - #{count,jdbcType=BIGINT}, #{size,jdbcType=BIGINT}, #{delay,jdbcType=BIGINT}) - - \ No newline at end of file diff --git a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/ElasticsearchServiceTest.java b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/ElasticsearchServiceTest.java deleted file mode 100644 index 6aa5e3835b3..00000000000 --- a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/ElasticsearchServiceTest.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.audit.service; - -import org.apache.inlong.audit.db.entities.ESDataPo; - -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mockito; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.ActiveProfiles; -import org.springframework.test.context.junit4.SpringRunner; - -import java.io.IOException; -import java.util.Date; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -@RunWith(SpringRunner.class) -@ActiveProfiles(value = {"test"}) -@SpringBootTest(classes = ElasticsearchServiceTest.class) -public class ElasticsearchServiceTest { - - private static ElasticsearchService elasticsearchService; - - private final String index = "20220112_1"; - - @BeforeClass - public static void setUp() throws IOException { - elasticsearchService = mock(ElasticsearchService.class); - when(elasticsearchService.existsIndex(Mockito.anyString())).thenReturn(true); - when(elasticsearchService.createIndex(Mockito.anyString())).thenReturn(true); - when(elasticsearchService.deleteSingleIndex(Mockito.anyString())).thenReturn(true); - } - - @Test - public void testExistsIndex() throws IOException { - boolean res = elasticsearchService.createIndex(index); - Assert.assertTrue(res); - - res = elasticsearchService.existsIndex(index); - Assert.assertTrue(res); - } - - @Test - public void testInsertData() { - for (int i = 0; i < 5; i++) { - ESDataPo po = new ESDataPo(); - po.setIp("0.0.0.0"); - po.setThreadId(String.valueOf(i)); - po.setDockerId(String.valueOf(i)); - po.setSdkTs(new Date().getTime()); - po.setLogTs(new Date()); - po.setAuditId("1"); - po.setCount(i); - po.setDelay(i); - po.setInlongGroupId(String.valueOf(i)); - po.setInlongStreamId(String.valueOf(i)); - po.setSize(i); - po.setPacketId(i); - elasticsearchService.insertData(po); - } - } - - @Test - public void testDeleteSingleIndex() throws IOException { - boolean res = elasticsearchService.createIndex(index); - Assert.assertTrue(res); - res = elasticsearchService.deleteSingleIndex(index); - Assert.assertTrue(res); - } - - @Test - public void testDeleteTimeoutIndices() throws IOException { - elasticsearchService.deleteTimeoutIndices(); - } - -} diff --git a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java index e7f55fd594d..5ddee887bd6 100644 --- a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java +++ b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java @@ -17,14 +17,11 @@ package org.apache.inlong.audit.service.consume; -import org.apache.inlong.audit.config.ClickHouseConfig; +import org.apache.inlong.audit.config.JdbcConfig; import org.apache.inlong.audit.config.MessageQueueConfig; import org.apache.inlong.audit.config.StoreConfig; -import org.apache.inlong.audit.db.dao.AuditDataDao; -import org.apache.inlong.audit.service.ClickHouseService; -import org.apache.inlong.audit.service.ElasticsearchService; import org.apache.inlong.audit.service.InsertData; -import org.apache.inlong.audit.service.MySqlService; +import org.apache.inlong.audit.service.JdbcService; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -42,11 +39,9 @@ public class KafkaConsumeTest { private KafkaConsumer consumer; - private AuditDataDao auditDataDao; - private ElasticsearchService esService; - private ClickHouseConfig ckConfig; private StoreConfig storeConfig; private MessageQueueConfig mqConfig; + private JdbcConfig jdbcConfig; private String topic = "inlong-audit"; private ConsumerRecords records; @@ -77,9 +72,7 @@ public void testConsumer() { */ private List getInsertServiceList() { List insertData = new ArrayList<>(); - insertData.add(new MySqlService(auditDataDao)); - insertData.add(esService); - insertData.add(new ClickHouseService(ckConfig)); + insertData.add(new JdbcService(jdbcConfig)); return insertData; } } diff --git a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java index 12a5e97efc9..4085f295149 100644 --- a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java +++ b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java @@ -17,14 +17,11 @@ package org.apache.inlong.audit.service.consume; -import org.apache.inlong.audit.config.ClickHouseConfig; +import org.apache.inlong.audit.config.JdbcConfig; import org.apache.inlong.audit.config.MessageQueueConfig; import org.apache.inlong.audit.config.StoreConfig; -import org.apache.inlong.audit.db.dao.AuditDataDao; -import org.apache.inlong.audit.service.ClickHouseService; -import org.apache.inlong.audit.service.ElasticsearchService; import org.apache.inlong.audit.service.InsertData; -import org.apache.inlong.audit.service.MySqlService; +import org.apache.inlong.audit.service.JdbcService; import org.apache.inlong.tubemq.client.consumer.ConsumerResult; import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer; import org.apache.inlong.tubemq.client.exception.TubeClientException; @@ -41,9 +38,7 @@ public class TubeConsumeTest { private PullMessageConsumer pullMessageConsumer; - private AuditDataDao auditDataDao; - private ElasticsearchService esService; - private ClickHouseConfig chConfig; + private JdbcConfig jdbcConfig; private StoreConfig storeConfig; private MessageQueueConfig mqConfig; private String topic = "inlong-audit"; @@ -63,6 +58,7 @@ public void setUp() throws TubeClientException { /** * testConsume + * * @throws InterruptedException */ @Test @@ -76,13 +72,12 @@ public void testConsume() throws InterruptedException { /** * getInsertServiceList + * * @return */ private List getInsertServiceList() { List insertServiceList = new ArrayList<>(); - insertServiceList.add(new MySqlService(auditDataDao)); - insertServiceList.add(esService); - insertServiceList.add(new ClickHouseService(chConfig)); + insertServiceList.add(new JdbcService(jdbcConfig)); return insertServiceList; } } diff --git a/inlong-audit/audit-store/src/test/resources/mapper/AuditDataDao.xml b/inlong-audit/audit-store/src/test/resources/mapper/AuditDataDao.xml deleted file mode 100644 index 1215347a037..00000000000 --- a/inlong-audit/audit-store/src/test/resources/mapper/AuditDataDao.xml +++ /dev/null @@ -1,47 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - insert into audit_data (ip, docker_id, thread_id, - sdk_ts, packet_id, log_ts, - inlong_group_id, inlong_stream_id, audit_id, - count, size, delay) - values (#{ip,jdbcType=VARCHAR}, #{dockerId,jdbcType=VARCHAR}, #{threadId,jdbcType=VARCHAR}, - #{sdkTs,jdbcType=TIMESTAMP}, #{packetId,jdbcType=BIGINT}, #{logTs,jdbcType=TIMESTAMP}, - #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR}, #{auditId,jdbcType=VARCHAR}, - #{count,jdbcType=BIGINT}, #{size,jdbcType=BIGINT}, #{delay,jdbcType=BIGINT}) - - \ No newline at end of file diff --git a/inlong-audit/conf/mapper/AuditDataDao.xml b/inlong-audit/conf/mapper/AuditDataDao.xml deleted file mode 100644 index ffb6ede2d3c..00000000000 --- a/inlong-audit/conf/mapper/AuditDataDao.xml +++ /dev/null @@ -1,47 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - insert into audit_data (ip, docker_id, thread_id, - sdk_ts, packet_id, log_ts, - inlong_group_id, inlong_stream_id, audit_id, - count, size, delay) - values (#{ip,jdbcType=VARCHAR}, #{dockerId,jdbcType=VARCHAR}, #{threadId,jdbcType=VARCHAR}, - #{sdkTs,jdbcType=TIMESTAMP}, #{packetId,jdbcType=BIGINT}, #{logTs,jdbcType=TIMESTAMP}, - #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR}, #{auditId,jdbcType=VARCHAR}, - #{count,jdbcType=BIGINT}, #{size,jdbcType=BIGINT}, #{delay,jdbcType=BIGINT}) - - diff --git a/inlong-audit/sql/apache_inlong_audit_clickhouse.sql b/inlong-audit/sql/apache_inlong_audit_clickhouse.sql deleted file mode 100644 index 7e96f5e8bc0..00000000000 --- a/inlong-audit/sql/apache_inlong_audit_clickhouse.sql +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - --- ---------------------------- --- Database for InLong Audit --- ---------------------------- -CREATE DATABASE IF NOT EXISTS apache_inlong_audit; - -USE apache_inlong_audit; - --- ---------------------------- --- Table structure for audit_data --- The table creation statement of the audit flow table is used to record the real-time flow data of the audit. --- ---------------------------- -CREATE TABLE IF NOT EXISTS `audit_data` -( - `ip` String COMMENT 'Client IP', - `docker_id` String COMMENT 'Client docker id', - `thread_id` String COMMENT 'Client thread id', - `sdk_ts` DateTime COMMENT 'SDK timestamp', - `packet_id` Int64 COMMENT 'Packet id', - `log_ts` DateTime COMMENT 'Log timestamp', - `inlong_group_id` String COMMENT 'The target inlong group id', - `inlong_stream_id` String COMMENT 'The target inlong stream id', - `audit_id` String COMMENT 'Audit id', - `audit_tag` String COMMENT 'Audit tag', - `count` Int64 COMMENT 'Message count', - `size` Int64 COMMENT 'Message size', - `delay` Int64 COMMENT 'Message delay', - `update_time` DateTime COMMENT 'Update time' -) ENGINE = MergeTree -ORDER BY inlong_group_id -SETTINGS index_granularity = 8192; From c89afeb550bda984380098e0e875ef4ca0af666a Mon Sep 17 00:00:00 2001 From: doleyzi Date: Tue, 7 May 2024 17:58:54 +0800 Subject: [PATCH 2/8] Adjust thread pool --- .../inlong/audit/cache/RealTimeQuery.java | 13 +++- inlong-audit/conf/application.properties | 62 ++----------------- 2 files changed, 16 insertions(+), 59 deletions(-) diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java index fd9288d425c..6eea81ec9d1 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java @@ -39,6 +39,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_DETECT_INTERVAL_MS; import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS; @@ -48,6 +51,8 @@ import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_IDLE_CONNECTIONS; import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS; import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MIN_IDLE_CONNECTIONS; +import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE; +import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_THREAD_POOL_SIZE; import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_IDS_SQL; import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_IPS_SQL; import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_MINUTE_SQL; @@ -68,8 +73,12 @@ public class RealTimeQuery { private final String queryLogTsSql; private final String queryIdsByIpSql; private final String queryReportIpsSql; + private final ExecutorService executor = + Executors.newFixedThreadPool( + Configuration.getInstance().get(KEY_API_THREAD_POOL_SIZE, DEFAULT_API_THREAD_POOL_SIZE)); private RealTimeQuery() { + List jdbcConfigList = ConfigService.getInstance().getAllAuditSource(); for (JdbcConfig jdbcConfig : jdbcConfigList) { BasicDataSource dataSource = new BasicDataSource(); @@ -126,7 +135,7 @@ public static RealTimeQuery getInstance() { public List queryLogTs(String startTime, String endTime, String inlongGroupId, String inlongStreamId, String auditId) { long currentTime = System.currentTimeMillis(); - List statDataList = new LinkedList<>(); + List statDataList = new CopyOnWriteArrayList<>(); if (dataSourceList.isEmpty()) { return statDataList; } @@ -136,7 +145,7 @@ public List queryLogTs(String startTime, String endTime, String inlong List statDataListTemp = doQueryLogTs(dataSource, startTime, endTime, inlongGroupId, inlongStreamId, auditId); statDataList.addAll(statDataListTemp); - }); + }, executor); futures.add(future); } CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); diff --git a/inlong-audit/conf/application.properties b/inlong-audit/conf/application.properties index d721e375cbb..b1587668b33 100644 --- a/inlong-audit/conf/application.properties +++ b/inlong-audit/conf/application.properties @@ -16,37 +16,11 @@ # specific language governing permissions and limitations # under the License. # -# datasource config -# datasource config, set org.postgresql.Driver if using PostgreSQL -spring.datasource.druid.driver-class-name=com.mysql.cj.jdbc.Driver -spring.datasource.druid.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_audit?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2b8&rewriteBatchedStatements=true&allowMultiQueries=true&zeroDateTimeBehavior=CONVERT_TO_NULL&allowPublicKeyRetrieval=true -spring.datasource.druid.username=root -spring.datasource.druid.password=inlong -spring.datasource.druid.filters=stat,log4j,config -spring.datasource.druid.max-active=100 -spring.datasource.druid.initial-size=1 -spring.datasource.druid.max-wait=60000 -spring.datasource.druid.min-idle=1 -spring.datasource.druid.time-between-eviction-runs-millis=60000 -spring.datasource.druid.min-evictable-idle-time-millis=300000 -spring.datasource.druid.validation-query=select 'x' -spring.datasource.druid.test-while-idle=true -spring.datasource.druid.test-on-borrow=false -spring.datasource.druid.test-on-return=false -spring.datasource.druid.pool-prepared-statements=true -spring.datasource.druid.filter.wall.config.multi-statement-allow=true -spring.datasource.druid.max-open-prepared-statements=50 -spring.datasource.druid.max-pool-prepared-statement-per-connection-size=20 - -# mybatis config -mybatis.mapper-locations=classpath*:mapper/*.xml -mybatis.type-aliases-package=org.apache.inlong.audit.db.entities - # proxy.type: pulsar / tube / kafka audit.config.proxy.type=pulsar -# store.server: mysql / clickhouse / elasticsearch -audit.config.store.mode=mysql +# Supports common JDBC protocol +audit.config.store.mode=jdbc # manger config manager.hosts=127.0.0.1:8083 @@ -70,35 +44,9 @@ audit.kafka.topic.replicationFactor=2 audit.kafka.consumer.name=inlong-audit-consumer audit.kafka.group.id=audit-consumer-group -# Invalid data will be discarded if exceed the threshold days -msg.valid.threshold.days=7 - -# es config -elasticsearch.host=127.0.0.1 -elasticsearch.port=9200 -elasticsearch.authEnable=false -elasticsearch.username=elastic -elasticsearch.password=inlong -elasticsearch.shardsNum=5 -elasticsearch.replicaNum=1 -elasticsearch.indexDeleteDay=5 -elasticsearch.enableDocId=true -elasticsearch.bulkInterval=10 -elasticsearch.bulkThreshold=5000 -elasticsearch.auditIdSet=1,2 - -# clickhouse config -clickhouse.driver=ru.yandex.clickhouse.ClickHouseDriver -clickhouse.url=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit -clickhouse.username=default -clickhouse.password=default -clickhouse.batchIntervalMs=1000 -clickhouse.batchThreshold=500 -clickhouse.processIntervalMs=100 - # Generic jdbc storage jdbc.driver=com.mysql.cj.jdbc.Driver -jdbc.url=jdbc:mysql://127.0.0.1:9020/apache_inlong_audit?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2b8&rewriteBatchedStatements=true&allowMultiQueries=true&zeroDateTimeBehavior=CONVERT_TO_NULL -jdbc.username=******* -jdbc.password=******** +jdbc.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_audit?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2b8&rewriteBatchedStatements=true&allowMultiQueries=true&zeroDateTimeBehavior=CONVERT_TO_NULL +jdbc.username=root +jdbc.password=inlong From b0f61a2d60a5ebca5a445a9e21bdc3fb3ed9027a Mon Sep 17 00:00:00 2001 From: doleyzi Date: Tue, 7 May 2024 18:49:46 +0800 Subject: [PATCH 3/8] Optimize variable names --- inlong-audit/audit-docker/Dockerfile | 6 +++--- inlong-audit/audit-docker/audit-docker.sh | 18 +++++++++--------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/inlong-audit/audit-docker/Dockerfile b/inlong-audit/audit-docker/Dockerfile index f6c26954724..aee83b780a1 100644 --- a/inlong-audit/audit-docker/Dockerfile +++ b/inlong-audit/audit-docker/Dockerfile @@ -38,9 +38,9 @@ ENV START_MODE="all" # mysql / starrocks ENV STORE_MODE=mysql # mysql -ENV ENV_MYSQL_JDBC_URL=127.0.0.1:3306 -ENV ENV_MYSQL_USERNAME=root -ENV ENV_MYSQL_PASSWORD=inlong +ENV AUDIT_MYSQL_JDBC_URL=127.0.0.1:3306 +ENV AUDIT_MYSQL_USERNAME=root +ENV AUDIT_MYSQL_PASSWORD=inlong # starrocks ENV STORE_SR_URL=127.0.0.1:9030 ENV STORE_SR_USERNAME=default diff --git a/inlong-audit/audit-docker/audit-docker.sh b/inlong-audit/audit-docker/audit-docker.sh index 201c6b3542e..78411f2aece 100755 --- a/inlong-audit/audit-docker/audit-docker.sh +++ b/inlong-audit/audit-docker/audit-docker.sh @@ -55,9 +55,9 @@ if [ -n "${STORE_MODE}" ]; then sed -i "s/audit.config.store.mode=.*$/audit.config.store.mode=${STORE_MODE}/g" "${store_conf_file}" fi # DB -sed -i "s/127.0.0.1:3306\/apache_inlong_audit/${ENV_MYSQL_JDBC_URL}\/${AUDIT_DBNAME}/g" "${store_conf_file}" -sed -i "s/spring.datasource.druid.username=.*$/spring.datasource.druid.username=${ENV_MYSQL_USERNAME}/g" "${store_conf_file}" -sed -i "s/spring.datasource.druid.password=.*$/spring.datasource.druid.password=${ENV_MYSQL_PASSWORD}/g" "${store_conf_file}" +sed -i "s/127.0.0.1:3306\/apache_inlong_audit/${AUDIT_MYSQL_JDBC_URL}\/${AUDIT_DBNAME}/g" "${store_conf_file}" +sed -i "s/spring.datasource.druid.username=.*$/spring.datasource.druid.username=${AUDIT_MYSQL_USERNAME}/g" "${store_conf_file}" +sed -i "s/spring.datasource.druid.password=.*$/spring.datasource.druid.password=${AUDIT_MYSQL_PASSWORD}/g" "${store_conf_file}" # mysql file for audit sed -i "s/apache_inlong_audit/${AUDIT_DBNAME}/g" "${sql_mysql_file}" @@ -70,20 +70,20 @@ sed -i "s/jdbc.username=.*$/jdbc.username=${STORE_SR_USERNAME}/g" "${store_conf_ sed -i "s/jdbc.password=.*$/jdbc.password=${STORE_SR_PASSWD}/g" "${store_conf_file}" # audit-service config -sed -i "s/mysql.jdbc.url=.*$/mysql.jdbc.url=jdbc:mysql:\/\/${ENV_MYSQL_JDBC_URL}\/${AUDIT_DBNAME}/g" "${service_conf_file}" -sed -i "s/mysql.jdbc.username=.*$/mysql.jdbc.username=${ENV_MYSQL_USERNAME}/g" "${service_conf_file}" -sed -i "s/mysql.jdbc.password=.*$/mysql.jdbc.password=${ENV_MYSQL_PASSWORD}/g" "${service_conf_file}" +sed -i "s/mysql.jdbc.url=.*$/mysql.jdbc.url=jdbc:mysql:\/\/${AUDIT_MYSQL_JDBC_URL}\/${AUDIT_DBNAME}/g" "${service_conf_file}" +sed -i "s/mysql.jdbc.username=.*$/mysql.jdbc.username=${AUDIT_MYSQL_USERNAME}/g" "${service_conf_file}" +sed -i "s/mysql.jdbc.password=.*$/mysql.jdbc.password=${AUDIT_MYSQL_PASSWORD}/g" "${service_conf_file}" # Whether the database table exists. If it does not exist, initialize the database and skip if it exists. -if [[ "${ENV_MYSQL_JDBC_URL}" =~ (.+):([0-9]+) ]]; then +if [[ "${AUDIT_MYSQL_JDBC_URL}" =~ (.+):([0-9]+) ]]; then datasource_hostname=${BASH_REMATCH[1]} datasource_port=${BASH_REMATCH[2]} select_db_sql="SELECT COUNT(*) FROM information_schema.TABLES WHERE table_schema = 'apache_inlong_audit'" - inlong_audit_count=$(mysql -h${datasource_hostname} -P${datasource_port} -u${ENV_MYSQL_USERNAME} -p${ENV_MYSQL_PASSWORD} -e "${select_db_sql}") + inlong_audit_count=$(mysql -h${datasource_hostname} -P${datasource_port} -u${AUDIT_MYSQL_USERNAME} -p${AUDIT_MYSQL_PASSWORD} -e "${select_db_sql}") inlong_num=$(echo "$inlong_audit_count" | tr -cd "[0-9]") if [ "${inlong_num}" = 0 ]; then - mysql -h${datasource_hostname} -P${datasource_port} -u${ENV_MYSQL_USERNAME} -p${ENV_MYSQL_PASSWORD} < sql/apache_inlong_audit_mysql.sql + mysql -h${datasource_hostname} -P${datasource_port} -u${AUDIT_MYSQL_USERNAME} -p${AUDIT_MYSQL_PASSWORD} < sql/apache_inlong_audit_mysql.sql fi fi From 41076c5bb35f70b74c0a249764a298e4e4562a5f Mon Sep 17 00:00:00 2001 From: doleyzi Date: Tue, 7 May 2024 19:00:28 +0800 Subject: [PATCH 4/8] Modify the default storage mode --- inlong-audit/audit-docker/Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/inlong-audit/audit-docker/Dockerfile b/inlong-audit/audit-docker/Dockerfile index aee83b780a1..48c124dfd61 100644 --- a/inlong-audit/audit-docker/Dockerfile +++ b/inlong-audit/audit-docker/Dockerfile @@ -35,8 +35,8 @@ ENV TUBE_AUDIT_TOPIC="inlong-audit" ENV AUDIT_DBNAME="apache_inlong_audit" # proxy/store/all, start audit module individually, or all ENV START_MODE="all" -# mysql / starrocks -ENV STORE_MODE=mysql +# # Supports common JDBC protocol. Such as mysql / starrocks +ENV STORE_MODE=jdbc # mysql ENV AUDIT_MYSQL_JDBC_URL=127.0.0.1:3306 ENV AUDIT_MYSQL_USERNAME=root From 61779de8f802cc8aa56581053feda5c4ff9d73e0 Mon Sep 17 00:00:00 2001 From: doleyzi Date: Tue, 7 May 2024 19:25:36 +0800 Subject: [PATCH 5/8] Modify the default value of the configuration --- inlong-audit/audit-docker/Dockerfile | 8 +----- inlong-audit/audit-docker/audit-docker.sh | 26 +++++-------------- .../inlong/audit/config/StoreConfig.java | 1 + .../sql/apache_inlong_audit_mysql.sql | 6 ++--- 4 files changed, 12 insertions(+), 29 deletions(-) diff --git a/inlong-audit/audit-docker/Dockerfile b/inlong-audit/audit-docker/Dockerfile index 48c124dfd61..9c882aa8ab6 100644 --- a/inlong-audit/audit-docker/Dockerfile +++ b/inlong-audit/audit-docker/Dockerfile @@ -35,17 +35,11 @@ ENV TUBE_AUDIT_TOPIC="inlong-audit" ENV AUDIT_DBNAME="apache_inlong_audit" # proxy/store/all, start audit module individually, or all ENV START_MODE="all" -# # Supports common JDBC protocol. Such as mysql / starrocks -ENV STORE_MODE=jdbc # mysql ENV AUDIT_MYSQL_JDBC_URL=127.0.0.1:3306 ENV AUDIT_MYSQL_USERNAME=root ENV AUDIT_MYSQL_PASSWORD=inlong -# starrocks -ENV STORE_SR_URL=127.0.0.1:9030 -ENV STORE_SR_USERNAME=default -ENV STORE_SR_PASSWD=default -ENV STORE_SR_DBNAME="apache_inlong_audit" + # jvm ENV AUDIT_JVM_HEAP_OPTS="-XX:+UseContainerSupport -XX:InitialRAMPercentage=40.0 -XX:MaxRAMPercentage=80.0 -XX:-UseAdaptiveSizePolicy" WORKDIR /opt/inlong-audit diff --git a/inlong-audit/audit-docker/audit-docker.sh b/inlong-audit/audit-docker/audit-docker.sh index 78411f2aece..8e8f91f2ed2 100755 --- a/inlong-audit/audit-docker/audit-docker.sh +++ b/inlong-audit/audit-docker/audit-docker.sh @@ -20,7 +20,6 @@ file_path=$(cd "$(dirname "$0")"/../;pwd) #SQL file sql_mysql_file="${file_path}"/sql/apache_inlong_audit_mysql.sql -sql_sr_file="${file_path}"/sql/apache_inlong_audit_starrocks.sql # proxy config proxy_conf_file=${file_path}/conf/audit-proxy-${MQ_TYPE}.conf @@ -31,7 +30,7 @@ store_conf_file=${file_path}/conf/application.properties # audit-service config service_conf_file=${file_path}/conf/audit-service.properties -# replace the configuration for audit proxy +# replace the configuration for audit-proxy sed -i "s/manager.hosts=.*$/manager.hosts=${MANAGER_OPENAPI_IP}:${MANAGER_OPENAPI_PORT}/g" "${store_conf_file}" sed -i "s/proxy.cluster.tag=.*$/proxy.cluster.tag=${CLUSTER_TAG}/g" "${store_conf_file}" if [ "${MQ_TYPE}" = "pulsar" ]; then @@ -50,26 +49,15 @@ if [ "${MQ_TYPE}" = "tubemq" ]; then sed -i "s/agent1.sinks.tube-sink-msg2.topic = .*$/agent1.sinks.tube-sink-msg2.topic = ${TUBE_AUDIT_TOPIC}/g" "${proxy_conf_file}" fi -# replace the configuration for audit store -if [ -n "${STORE_MODE}" ]; then - sed -i "s/audit.config.store.mode=.*$/audit.config.store.mode=${STORE_MODE}/g" "${store_conf_file}" -fi -# DB -sed -i "s/127.0.0.1:3306\/apache_inlong_audit/${AUDIT_MYSQL_JDBC_URL}\/${AUDIT_DBNAME}/g" "${store_conf_file}" -sed -i "s/spring.datasource.druid.username=.*$/spring.datasource.druid.username=${AUDIT_MYSQL_USERNAME}/g" "${store_conf_file}" -sed -i "s/spring.datasource.druid.password=.*$/spring.datasource.druid.password=${AUDIT_MYSQL_PASSWORD}/g" "${store_conf_file}" -# mysql file for audit +# replace the audit db name for audit sql file sed -i "s/apache_inlong_audit/${AUDIT_DBNAME}/g" "${sql_mysql_file}" -# StarRocks SQL file for audit -sed -i "s/apache_inlong_audit/${AUDIT_DBNAME}/g" "${sql_sr_file}" - -# StarRocks -sed -i "s/jdbc.url=.*$/jdbc.url=jdbc:mysql:\/\/${STORE_SR_URL}\/${STORE_SR_DBNAME}/g" "${store_conf_file}" -sed -i "s/jdbc.username=.*$/jdbc.username=${STORE_SR_USERNAME}/g" "${store_conf_file}" -sed -i "s/jdbc.password=.*$/jdbc.password=${STORE_SR_PASSWD}/g" "${store_conf_file}" +# replace the configuration for audit-store +sed -i "s/127.0.0.1:3306\/apache_inlong_audit/${AUDIT_MYSQL_JDBC_URL}\/${AUDIT_DBNAME}/g" "${store_conf_file}" +sed -i "s/jdbc.username=.*$/jdbc.username=${AUDIT_MYSQL_USERNAME}/g" "${store_conf_file}" +sed -i "s/jdbc.password=.*$/jdbc.password=${AUDIT_MYSQL_PASSWORD}/g" "${store_conf_file}" -# audit-service config +# replace the configuration for audit-service sed -i "s/mysql.jdbc.url=.*$/mysql.jdbc.url=jdbc:mysql:\/\/${AUDIT_MYSQL_JDBC_URL}\/${AUDIT_DBNAME}/g" "${service_conf_file}" sed -i "s/mysql.jdbc.username=.*$/mysql.jdbc.username=${AUDIT_MYSQL_USERNAME}/g" "${service_conf_file}" sed -i "s/mysql.jdbc.password=.*$/mysql.jdbc.password=${AUDIT_MYSQL_PASSWORD}/g" "${service_conf_file}" diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java index cdad13e3494..ca3358701ed 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java @@ -27,6 +27,7 @@ @Setter public class StoreConfig { + // Supports common JDBC protocol. Such as mysql / StarRocks @Value("${audit.config.store.mode:jdbc}") private String store; diff --git a/inlong-audit/sql/apache_inlong_audit_mysql.sql b/inlong-audit/sql/apache_inlong_audit_mysql.sql index 4914fd78546..333052c97f1 100644 --- a/inlong-audit/sql/apache_inlong_audit_mysql.sql +++ b/inlong-audit/sql/apache_inlong_audit_mysql.sql @@ -102,7 +102,7 @@ CREATE TABLE IF NOT EXISTS `leader_selector` `leader_id` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `last_seen_active` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`service_id`) -) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = 'selector db' +) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = 'selector db'; -- ---------------------------- -- Table structure for audit id config @@ -113,7 +113,7 @@ CREATE TABLE IF NOT EXISTS `audit_id_config` `status` int(11) DEFAULT '1' COMMENT 'Audit source config status. 0:Offline,1:Online', `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time', PRIMARY KEY (`audit_id`) -) ENGINE = InnoDB DEFAULT CHARSET = UTF8 COMMENT = 'Audit id config' +) ENGINE = InnoDB DEFAULT CHARSET = UTF8 COMMENT = 'Audit id config'; -- ---------------------------- @@ -130,5 +130,5 @@ CREATE TABLE IF NOT EXISTS `audit_source_config` `status` int(11) DEFAULT '1' COMMENT 'Audit source config status. 0:Offline,1:Online', `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time', PRIMARY KEY (`source_name`, `jdbc_url`) -) ENGINE = InnoDB DEFAULT CHARSET = UTF8 COMMENT = 'Audit source config' +) ENGINE = InnoDB DEFAULT CHARSET = UTF8 COMMENT = 'Audit source config'; From 03ca4824646cf835949c3580dd7814a90a199303 Mon Sep 17 00:00:00 2001 From: doleyzi Date: Tue, 7 May 2024 19:39:26 +0800 Subject: [PATCH 6/8] Modify variable name. --- inlong-audit/audit-docker/Dockerfile | 8 ++++---- inlong-audit/audit-docker/audit-docker.sh | 18 +++++++++--------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/inlong-audit/audit-docker/Dockerfile b/inlong-audit/audit-docker/Dockerfile index 9c882aa8ab6..f1849afa830 100644 --- a/inlong-audit/audit-docker/Dockerfile +++ b/inlong-audit/audit-docker/Dockerfile @@ -35,10 +35,10 @@ ENV TUBE_AUDIT_TOPIC="inlong-audit" ENV AUDIT_DBNAME="apache_inlong_audit" # proxy/store/all, start audit module individually, or all ENV START_MODE="all" -# mysql -ENV AUDIT_MYSQL_JDBC_URL=127.0.0.1:3306 -ENV AUDIT_MYSQL_USERNAME=root -ENV AUDIT_MYSQL_PASSWORD=inlong +# MySQL / StarRocks +ENV AUDIT_JDBC_URL=127.0.0.1:3306 +ENV AUDIT_USERNAME=root +ENV AUDIT_PASSWORD=inlong # jvm ENV AUDIT_JVM_HEAP_OPTS="-XX:+UseContainerSupport -XX:InitialRAMPercentage=40.0 -XX:MaxRAMPercentage=80.0 -XX:-UseAdaptiveSizePolicy" diff --git a/inlong-audit/audit-docker/audit-docker.sh b/inlong-audit/audit-docker/audit-docker.sh index 8e8f91f2ed2..480e760676b 100755 --- a/inlong-audit/audit-docker/audit-docker.sh +++ b/inlong-audit/audit-docker/audit-docker.sh @@ -53,25 +53,25 @@ fi sed -i "s/apache_inlong_audit/${AUDIT_DBNAME}/g" "${sql_mysql_file}" # replace the configuration for audit-store -sed -i "s/127.0.0.1:3306\/apache_inlong_audit/${AUDIT_MYSQL_JDBC_URL}\/${AUDIT_DBNAME}/g" "${store_conf_file}" -sed -i "s/jdbc.username=.*$/jdbc.username=${AUDIT_MYSQL_USERNAME}/g" "${store_conf_file}" -sed -i "s/jdbc.password=.*$/jdbc.password=${AUDIT_MYSQL_PASSWORD}/g" "${store_conf_file}" +sed -i "s/127.0.0.1:3306\/apache_inlong_audit/${AUDIT_JDBC_URL}\/${AUDIT_DBNAME}/g" "${store_conf_file}" +sed -i "s/jdbc.username=.*$/jdbc.username=${AUDIT_USERNAME}/g" "${store_conf_file}" +sed -i "s/jdbc.password=.*$/jdbc.password=${AUDIT_PASSWORD}/g" "${store_conf_file}" # replace the configuration for audit-service -sed -i "s/mysql.jdbc.url=.*$/mysql.jdbc.url=jdbc:mysql:\/\/${AUDIT_MYSQL_JDBC_URL}\/${AUDIT_DBNAME}/g" "${service_conf_file}" -sed -i "s/mysql.jdbc.username=.*$/mysql.jdbc.username=${AUDIT_MYSQL_USERNAME}/g" "${service_conf_file}" -sed -i "s/mysql.jdbc.password=.*$/mysql.jdbc.password=${AUDIT_MYSQL_PASSWORD}/g" "${service_conf_file}" +sed -i "s/mysql.jdbc.url=.*$/mysql.jdbc.url=jdbc:mysql:\/\/${AUDIT_JDBC_URL}\/${AUDIT_DBNAME}/g" "${service_conf_file}" +sed -i "s/mysql.jdbc.username=.*$/mysql.jdbc.username=${AUDIT_USERNAME}/g" "${service_conf_file}" +sed -i "s/mysql.jdbc.password=.*$/mysql.jdbc.password=${AUDIT_PASSWORD}/g" "${service_conf_file}" # Whether the database table exists. If it does not exist, initialize the database and skip if it exists. -if [[ "${AUDIT_MYSQL_JDBC_URL}" =~ (.+):([0-9]+) ]]; then +if [[ "${AUDIT_JDBC_URL}" =~ (.+):([0-9]+) ]]; then datasource_hostname=${BASH_REMATCH[1]} datasource_port=${BASH_REMATCH[2]} select_db_sql="SELECT COUNT(*) FROM information_schema.TABLES WHERE table_schema = 'apache_inlong_audit'" - inlong_audit_count=$(mysql -h${datasource_hostname} -P${datasource_port} -u${AUDIT_MYSQL_USERNAME} -p${AUDIT_MYSQL_PASSWORD} -e "${select_db_sql}") + inlong_audit_count=$(mysql -h${datasource_hostname} -P${datasource_port} -u${AUDIT_USERNAME} -p${AUDIT_PASSWORD} -e "${select_db_sql}") inlong_num=$(echo "$inlong_audit_count" | tr -cd "[0-9]") if [ "${inlong_num}" = 0 ]; then - mysql -h${datasource_hostname} -P${datasource_port} -u${AUDIT_MYSQL_USERNAME} -p${AUDIT_MYSQL_PASSWORD} < sql/apache_inlong_audit_mysql.sql + mysql -h${datasource_hostname} -P${datasource_port} -u${AUDIT_USERNAME} -p${AUDIT_PASSWORD} < sql/apache_inlong_audit_mysql.sql fi fi From bc9fcdb138eabb4cef7ad449a669fe24380bfe22 Mon Sep 17 00:00:00 2001 From: doleyzi Date: Tue, 7 May 2024 19:44:02 +0800 Subject: [PATCH 7/8] Modify api.cache.expired.hours default value to 18 --- .../java/org/apache/inlong/audit/config/OpenApiConstants.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java index 05643c43bf3..5e186716e5a 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java @@ -45,7 +45,7 @@ public class OpenApiConstants { public static final int DEFAULT_API_CACHE_MAX_SIZE = 50000000; public static final String KEY_API_CACHE_EXPIRED_HOURS = "api.cache.expired.hours"; - public static final int DEFAULT_API_CACHE_EXPIRED_HOURS = 12; + public static final int DEFAULT_API_CACHE_EXPIRED_HOURS = 18; // Http config public static final String PARAMS_START_TIME = "startTime"; From aa4114b2571fcc23aee667384b411c7dbbfc367d Mon Sep 17 00:00:00 2001 From: doleyzi Date: Wed, 8 May 2024 09:54:33 +0800 Subject: [PATCH 8/8] Remove duplicate dependencies --- inlong-audit/audit-docker/Dockerfile | 4 ++-- inlong-audit/audit-docker/audit-docker.sh | 12 ++++++------ inlong-audit/audit-service/pom.xml | 6 ------ 3 files changed, 8 insertions(+), 14 deletions(-) diff --git a/inlong-audit/audit-docker/Dockerfile b/inlong-audit/audit-docker/Dockerfile index f1849afa830..e578b98c4b6 100644 --- a/inlong-audit/audit-docker/Dockerfile +++ b/inlong-audit/audit-docker/Dockerfile @@ -37,8 +37,8 @@ ENV AUDIT_DBNAME="apache_inlong_audit" ENV START_MODE="all" # MySQL / StarRocks ENV AUDIT_JDBC_URL=127.0.0.1:3306 -ENV AUDIT_USERNAME=root -ENV AUDIT_PASSWORD=inlong +ENV AUDIT_JDBC_USERNAME=root +ENV AUDIT_JDBC_PASSWORD=inlong # jvm ENV AUDIT_JVM_HEAP_OPTS="-XX:+UseContainerSupport -XX:InitialRAMPercentage=40.0 -XX:MaxRAMPercentage=80.0 -XX:-UseAdaptiveSizePolicy" diff --git a/inlong-audit/audit-docker/audit-docker.sh b/inlong-audit/audit-docker/audit-docker.sh index 480e760676b..cdc6c103cfa 100755 --- a/inlong-audit/audit-docker/audit-docker.sh +++ b/inlong-audit/audit-docker/audit-docker.sh @@ -54,13 +54,13 @@ sed -i "s/apache_inlong_audit/${AUDIT_DBNAME}/g" "${sql_mysql_file}" # replace the configuration for audit-store sed -i "s/127.0.0.1:3306\/apache_inlong_audit/${AUDIT_JDBC_URL}\/${AUDIT_DBNAME}/g" "${store_conf_file}" -sed -i "s/jdbc.username=.*$/jdbc.username=${AUDIT_USERNAME}/g" "${store_conf_file}" -sed -i "s/jdbc.password=.*$/jdbc.password=${AUDIT_PASSWORD}/g" "${store_conf_file}" +sed -i "s/jdbc.username=.*$/jdbc.username=${AUDIT_JDBC_USERNAME}/g" "${store_conf_file}" +sed -i "s/jdbc.password=.*$/jdbc.password=${AUDIT_JDBC_PASSWORD}/g" "${store_conf_file}" # replace the configuration for audit-service sed -i "s/mysql.jdbc.url=.*$/mysql.jdbc.url=jdbc:mysql:\/\/${AUDIT_JDBC_URL}\/${AUDIT_DBNAME}/g" "${service_conf_file}" -sed -i "s/mysql.jdbc.username=.*$/mysql.jdbc.username=${AUDIT_USERNAME}/g" "${service_conf_file}" -sed -i "s/mysql.jdbc.password=.*$/mysql.jdbc.password=${AUDIT_PASSWORD}/g" "${service_conf_file}" +sed -i "s/mysql.jdbc.username=.*$/mysql.jdbc.username=${AUDIT_JDBC_USERNAME}/g" "${service_conf_file}" +sed -i "s/mysql.jdbc.password=.*$/mysql.jdbc.password=${AUDIT_JDBC_PASSWORD}/g" "${service_conf_file}" # Whether the database table exists. If it does not exist, initialize the database and skip if it exists. if [[ "${AUDIT_JDBC_URL}" =~ (.+):([0-9]+) ]]; then @@ -68,10 +68,10 @@ if [[ "${AUDIT_JDBC_URL}" =~ (.+):([0-9]+) ]]; then datasource_port=${BASH_REMATCH[2]} select_db_sql="SELECT COUNT(*) FROM information_schema.TABLES WHERE table_schema = 'apache_inlong_audit'" - inlong_audit_count=$(mysql -h${datasource_hostname} -P${datasource_port} -u${AUDIT_USERNAME} -p${AUDIT_PASSWORD} -e "${select_db_sql}") + inlong_audit_count=$(mysql -h${datasource_hostname} -P${datasource_port} -u${AUDIT_JDBC_USERNAME} -p${AUDIT_JDBC_PASSWORD} -e "${select_db_sql}") inlong_num=$(echo "$inlong_audit_count" | tr -cd "[0-9]") if [ "${inlong_num}" = 0 ]; then - mysql -h${datasource_hostname} -P${datasource_port} -u${AUDIT_USERNAME} -p${AUDIT_PASSWORD} < sql/apache_inlong_audit_mysql.sql + mysql -h${datasource_hostname} -P${datasource_port} -u${AUDIT_JDBC_USERNAME} -p${AUDIT_JDBC_PASSWORD} < sql/apache_inlong_audit_mysql.sql fi fi diff --git a/inlong-audit/audit-service/pom.xml b/inlong-audit/audit-service/pom.xml index e71c33586b3..2a67f41afc8 100644 --- a/inlong-audit/audit-service/pom.xml +++ b/inlong-audit/audit-service/pom.xml @@ -86,12 +86,6 @@ junit test - - org.apache.inlong - audit-common - ${project.version} - compile -