Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 5 additions & 22 deletions inlong-audit/audit-docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -35,28 +35,11 @@ ENV TUBE_AUDIT_TOPIC="inlong-audit"
ENV AUDIT_DBNAME="apache_inlong_audit"
# proxy/store/all, start audit module individually, or all
ENV START_MODE="all"
# mysql / clickhouse / elasticsearch / starrocks
ENV STORE_MODE=mysql
# mysql
ENV JDBC_URL=127.0.0.1:3306
ENV USERNAME=root
ENV PASSWORD=inlong
# clickhouse
ENV STORE_CK_URL=127.0.0.1:8123
ENV STORE_CK_USERNAME=default
ENV STORE_CK_PASSWD=default
ENV STORE_CK_DBNAME="apache_inlong_audit"
# elasticsearch
ENV STORE_ES_HOST=127.0.0.1
ENV STORE_ES_PORT=9200
ENV STORE_ES_AUTHENABLE=false
ENV STORE_ES_USERNAME=elastic
ENV STORE_ES_PASSWD=inlong
# starrocks
ENV STORE_SR_URL=127.0.0.1:9030
ENV STORE_SR_USERNAME=default
ENV STORE_SR_PASSWD=default
ENV STORE_SR_DBNAME="apache_inlong_audit"
# MySQL / StarRocks
ENV AUDIT_JDBC_URL=127.0.0.1:3306
ENV AUDIT_JDBC_USERNAME=root
ENV AUDIT_JDBC_PASSWORD=inlong

# jvm
ENV AUDIT_JVM_HEAP_OPTS="-XX:+UseContainerSupport -XX:InitialRAMPercentage=40.0 -XX:MaxRAMPercentage=80.0 -XX:-UseAdaptiveSizePolicy"
WORKDIR /opt/inlong-audit
Expand Down
61 changes: 21 additions & 40 deletions inlong-audit/audit-docker/audit-docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@
#

file_path=$(cd "$(dirname "$0")"/../;pwd)
# store config
store_conf_file=${file_path}/conf/application.properties

#SQL file
sql_mysql_file="${file_path}"/sql/apache_inlong_audit_mysql.sql

# proxy config
proxy_conf_file=${file_path}/conf/audit-proxy-${MQ_TYPE}.conf
sql_mysql_file="${file_path}"/sql/apache_inlong_audit_mysql.sql
sql_ck_file="${file_path}"/sql/apache_inlong_audit_clickhouse.sql
sql_sr_file="${file_path}"/sql/apache_inlong_audit_starrocks.sql

# store config
store_conf_file=${file_path}/conf/application.properties

# audit-service config
service_conf_file=${file_path}/conf/audit-service.properties

# replace the configuration for audit proxy
# replace the configuration for audit-proxy
sed -i "s/manager.hosts=.*$/manager.hosts=${MANAGER_OPENAPI_IP}:${MANAGER_OPENAPI_PORT}/g" "${store_conf_file}"
sed -i "s/proxy.cluster.tag=.*$/proxy.cluster.tag=${CLUSTER_TAG}/g" "${store_conf_file}"
if [ "${MQ_TYPE}" = "pulsar" ]; then
Expand All @@ -47,51 +49,29 @@ if [ "${MQ_TYPE}" = "tubemq" ]; then
sed -i "s/agent1.sinks.tube-sink-msg2.topic = .*$/agent1.sinks.tube-sink-msg2.topic = ${TUBE_AUDIT_TOPIC}/g" "${proxy_conf_file}"
fi

# replace the configuration for audit store
if [ -n "${STORE_MODE}" ]; then
sed -i "s/audit.config.store.mode=.*$/audit.config.store.mode=${STORE_MODE}/g" "${store_conf_file}"
fi
# DB
sed -i "s/127.0.0.1:3306\/apache_inlong_audit/${JDBC_URL}\/${AUDIT_DBNAME}/g" "${store_conf_file}"
sed -i "s/spring.datasource.druid.username=.*$/spring.datasource.druid.username=${USERNAME}/g" "${store_conf_file}"
sed -i "s/spring.datasource.druid.password=.*$/spring.datasource.druid.password=${PASSWORD}/g" "${store_conf_file}"
# mysql file for audit
# replace the audit db name for audit sql file
sed -i "s/apache_inlong_audit/${AUDIT_DBNAME}/g" "${sql_mysql_file}"
# clickhouse
sed -i "s/clickhouse.url=.*$/clickhouse.url=jdbc:clickhouse:\/\/${STORE_CK_URL}\/${STORE_CK_DBNAME}/g" "${store_conf_file}"
sed -i "s/clickhouse.username=.*$/clickhouse.username=${STORE_CK_USERNAME}/g" "${store_conf_file}"
sed -i "s/clickhouse.password=.*$/clickhouse.password=${STORE_CK_PASSWD}/g" "${store_conf_file}"
# mysql file for clickhouse
sed -i "s/apache_inlong_audit/${STORE_CK_DBNAME}/g" "${sql_ck_file}"
# elasticsearch
sed -i "s/elasticsearch.host=.*$/elasticsearch.host=${STORE_ES_HOST}/g" "${store_conf_file}"
sed -i "s/elasticsearch.port=.*$/elasticsearch.port=${STORE_ES_PORT}/g" "${store_conf_file}"
sed -i "s/elasticsearch.authEnable=.*$/elasticsearch.authEnable=${STORE_ES_AUTHENABLE}/g" "${store_conf_file}"
sed -i "s/elasticsearch.username=.*$/elasticsearch.username=${STORE_ES_USERNAME}/g" "${store_conf_file}"
sed -i "s/elasticsearch.password=.*$/elasticsearch.password=${STORE_ES_PASSWD}/g" "${store_conf_file}"

# StarRocks SQL file for audit
sed -i "s/apache_inlong_audit/${AUDIT_DBNAME}/g" "${sql_sr_file}"
# StarRocks
sed -i "s/jdbc.url=.*$/jdbc.url=jdbc:mysql:\/\/${STORE_SR_URL}\/${STORE_SR_DBNAME}/g" "${store_conf_file}"
sed -i "s/jdbc.username=.*$/jdbc.username=${STORE_SR_USERNAME}/g" "${store_conf_file}"
sed -i "s/jdbc.password=.*$/jdbc.password=${STORE_SR_PASSWD}/g" "${store_conf_file}"
# replace the configuration for audit-store
sed -i "s/127.0.0.1:3306\/apache_inlong_audit/${AUDIT_JDBC_URL}\/${AUDIT_DBNAME}/g" "${store_conf_file}"
sed -i "s/jdbc.username=.*$/jdbc.username=${AUDIT_JDBC_USERNAME}/g" "${store_conf_file}"
sed -i "s/jdbc.password=.*$/jdbc.password=${AUDIT_JDBC_PASSWORD}/g" "${store_conf_file}"

# audit-service config
sed -i "s/mysql.jdbc.url=.*$/mysql.jdbc.url=jdbc:mysql:\/\/${JDBC_URL}\/${AUDIT_DBNAME}/g" "${service_conf_file}"
sed -i "s/mysql.jdbc.username=.*$/mysql.jdbc.username=${USERNAME}/g" "${service_conf_file}"
sed -i "s/mysql.jdbc.password=.*$/mysql.jdbc.password=${PASSWORD}/g" "${service_conf_file}"
# replace the configuration for audit-service
sed -i "s/mysql.jdbc.url=.*$/mysql.jdbc.url=jdbc:mysql:\/\/${AUDIT_JDBC_URL}\/${AUDIT_DBNAME}/g" "${service_conf_file}"
sed -i "s/mysql.jdbc.username=.*$/mysql.jdbc.username=${AUDIT_JDBC_USERNAME}/g" "${service_conf_file}"
sed -i "s/mysql.jdbc.password=.*$/mysql.jdbc.password=${AUDIT_JDBC_PASSWORD}/g" "${service_conf_file}"

# Whether the database table exists. If it does not exist, initialize the database and skip if it exists.
if [[ "${JDBC_URL}" =~ (.+):([0-9]+) ]]; then
if [[ "${AUDIT_JDBC_URL}" =~ (.+):([0-9]+) ]]; then
datasource_hostname=${BASH_REMATCH[1]}
datasource_port=${BASH_REMATCH[2]}

select_db_sql="SELECT COUNT(*) FROM information_schema.TABLES WHERE table_schema = 'apache_inlong_audit'"
inlong_audit_count=$(mysql -h${datasource_hostname} -P${datasource_port} -u${USERNAME} -p${PASSWORD} -e "${select_db_sql}")
inlong_audit_count=$(mysql -h${datasource_hostname} -P${datasource_port} -u${AUDIT_JDBC_USERNAME} -p${AUDIT_JDBC_PASSWORD} -e "${select_db_sql}")
inlong_num=$(echo "$inlong_audit_count" | tr -cd "[0-9]")
if [ "${inlong_num}" = 0 ]; then
mysql -h${datasource_hostname} -P${datasource_port} -u${USERNAME} -p${PASSWORD} < sql/apache_inlong_audit_mysql.sql
mysql -h${datasource_hostname} -P${datasource_port} -u${AUDIT_JDBC_USERNAME} -p${AUDIT_JDBC_PASSWORD} < sql/apache_inlong_audit_mysql.sql
fi
fi

Expand All @@ -108,6 +88,7 @@ if [ "${START_MODE}" = "all" ] || [ "${START_MODE}" = "proxy" ]; then
bash +x ./bin/proxy-start.sh tubemq
fi
fi

# start store
if [ "${START_MODE}" = "all" ] || [ "${START_MODE}" = "store" ]; then
bash +x ./bin/store-start.sh
Expand Down
6 changes: 0 additions & 6 deletions inlong-audit/audit-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,6 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>audit-common</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<resources>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_DETECT_INTERVAL_MS;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS;
Expand All @@ -48,6 +51,8 @@
import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_IDLE_CONNECTIONS;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MIN_IDLE_CONNECTIONS;
import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE;
import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_THREAD_POOL_SIZE;
import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_IDS_SQL;
import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_IPS_SQL;
import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_MINUTE_SQL;
Expand All @@ -68,8 +73,12 @@ public class RealTimeQuery {
private final String queryLogTsSql;
private final String queryIdsByIpSql;
private final String queryReportIpsSql;
private final ExecutorService executor =
Executors.newFixedThreadPool(
Configuration.getInstance().get(KEY_API_THREAD_POOL_SIZE, DEFAULT_API_THREAD_POOL_SIZE));

private RealTimeQuery() {

List<JdbcConfig> jdbcConfigList = ConfigService.getInstance().getAllAuditSource();
for (JdbcConfig jdbcConfig : jdbcConfigList) {
BasicDataSource dataSource = new BasicDataSource();
Expand Down Expand Up @@ -126,7 +135,7 @@ public static RealTimeQuery getInstance() {
public List<StatData> queryLogTs(String startTime, String endTime, String inlongGroupId,
String inlongStreamId, String auditId) {
long currentTime = System.currentTimeMillis();
List<StatData> statDataList = new LinkedList<>();
List<StatData> statDataList = new CopyOnWriteArrayList<>();
if (dataSourceList.isEmpty()) {
return statDataList;
}
Expand All @@ -136,11 +145,11 @@ public List<StatData> queryLogTs(String startTime, String endTime, String inlong
List<StatData> statDataListTemp =
doQueryLogTs(dataSource, startTime, endTime, inlongGroupId, inlongStreamId, auditId);
statDataList.addAll(statDataListTemp);
});
}, executor);
futures.add(future);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();
LOGGER.info("Query log ts by params: {} {} {} {} {}, cost {} ms", startTime, endTime, inlongGroupId,
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
LOGGER.info("Query log ts by params: {} {} {} {} {}, total cost {} ms", startTime, endTime, inlongGroupId,
inlongStreamId, auditId, System.currentTimeMillis() - currentTime);
return filterMaxAuditVersion(statDataList);
}
Expand All @@ -165,8 +174,7 @@ public List<StatData> filterMaxAuditVersion(List<StatData> allStatData) {
for (Map.Entry<String, List<StatData>> entry : allData.entrySet()) {
long maxAuditVersion = Long.MIN_VALUE;
for (StatData maxData : entry.getValue()) {
maxAuditVersion =
maxData.getAuditVersion() > maxAuditVersion ? maxData.getAuditVersion() : maxAuditVersion;
maxAuditVersion = Math.max(maxData.getAuditVersion(), maxAuditVersion);
}
for (StatData statData : entry.getValue()) {
if (statData.getAuditVersion() == maxAuditVersion) {
Expand All @@ -191,6 +199,7 @@ public List<StatData> filterMaxAuditVersion(List<StatData> allStatData) {
*/
private List<StatData> doQueryLogTs(DataSource dataSource, String startTime, String endTime, String inlongGroupId,
String inlongStreamId, String auditId) {
long currentTime = System.currentTimeMillis();
List<StatData> result = new LinkedList<>();
try (Connection connection = dataSource.getConnection();
PreparedStatement pstat = connection.prepareStatement(queryLogTsSql)) {
Expand Down Expand Up @@ -219,6 +228,8 @@ private List<StatData> doQueryLogTs(DataSource dataSource, String startTime, Str
} catch (Exception exception) {
LOGGER.error("Query log time has exception!, datasource={} ", dataSource, exception);
}
LOGGER.info("Query log ts by params: {} {} {} {} {}, cost {} ms", startTime, endTime, inlongGroupId,
inlongStreamId, auditId, System.currentTimeMillis() - currentTime);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class OpenApiConstants {
public static final int DEFAULT_API_CACHE_MAX_SIZE = 50000000;

public static final String KEY_API_CACHE_EXPIRED_HOURS = "api.cache.expired.hours";
public static final int DEFAULT_API_CACHE_EXPIRED_HOURS = 12;
public static final int DEFAULT_API_CACHE_EXPIRED_HOURS = 18;

// Http config
public static final String PARAMS_START_TIME = "startTime";
Expand Down

This file was deleted.

This file was deleted.

Loading