Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
/*
* Copyright (c) 2023 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.oceanbase.odc.core.datasource;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.oceanbase.odc.core.shared.exception.OverLimitException;

import lombok.extern.slf4j.Slf4j;

/**
* Connection count manager for tracking database connections by url+username
*/
@Slf4j
public class ConnectionCountManager {

private static volatile ConnectionCountManager instance;
private final Map<String, AtomicInteger> connectionCountMap = new ConcurrentHashMap<>();
private volatile long maxConnectionCount = -1;

private ConnectionCountManager() {}

public static ConnectionCountManager getInstance() {
if (instance == null) {
synchronized (ConnectionCountManager.class) {
if (instance == null) {
instance = new ConnectionCountManager();
}
}
}
return instance;
}

/**
* Initialize the max connection count limit
*
* @param maxConnectionCount max connection count, -1 means no limit
*/
public void setMaxConnectionCount(long maxConnectionCount) {
this.maxConnectionCount = maxConnectionCount;
log.info("Set max connection count to {}", maxConnectionCount);
}

/**
* Get the max connection count limit
*
* @return max connection count, -1 means no limit
*/
public long getMaxConnectionCount() {
return maxConnectionCount;
}

/**
* Generate key from url and username
* Extracts host:port from JDBC URL and combines with username
*
* @param url database url (JDBC URL format)
* @param username database username
* @return key string in format "host:port:username"
*/
public static String generateKey(String url, String username) {
if (url == null) {
url = "";
}
if (username == null) {
username = "";
}
String hostPort = extractHostAndPort(url);
return hostPort + ":" + username;
}

/**
* Extract host:port from JDBC URL
* Supports formats:
* - jdbc:mysql://host:port/database?params
* - jdbc:oceanbase://host:port/database?params
* - jdbc:postgresql://host:port/database?params
* - jdbc:oracle:thin:@host:port:database
* - jdbc:oracle:thin:@//host:port/database
*
* @param jdbcUrl JDBC URL
* @return host:port string, or original url if parsing fails
*/
private static String extractHostAndPort(String jdbcUrl) {
if (jdbcUrl == null || jdbcUrl.isEmpty()) {
return "";
}

// Pattern for MySQL/OceanBase/PostgreSQL: jdbc:type://host:port/...
Pattern mysqlPattern = Pattern.compile("jdbc:(mysql|oceanbase|postgresql)://([^/:]+)(?::([0-9]+))?");
Matcher mysqlMatcher = mysqlPattern.matcher(jdbcUrl);
if (mysqlMatcher.find()) {
String host = mysqlMatcher.group(2);
String port = mysqlMatcher.group(3);
if (port != null && !port.isEmpty()) {
return host + ":" + port;
} else {
// Use default port based on database type
String dbType = mysqlMatcher.group(1);
int defaultPort = getDefaultPort(dbType);
return host + ":" + defaultPort;
}
}

// Pattern for Oracle SID format: jdbc:oracle:thin:@host:port:database
Pattern oracleSidPattern = Pattern.compile("jdbc:oracle:thin:@([^:/]+):([0-9]+)");
Matcher oracleSidMatcher = oracleSidPattern.matcher(jdbcUrl);
if (oracleSidMatcher.find()) {
return oracleSidMatcher.group(1) + ":" + oracleSidMatcher.group(2);
}

// Pattern for Oracle Service Name format: jdbc:oracle:thin:@//host:port/database
Pattern oracleServicePattern = Pattern.compile("jdbc:oracle:thin:@//([^:/]+):([0-9]+)");
Matcher oracleServiceMatcher = oracleServicePattern.matcher(jdbcUrl);
if (oracleServiceMatcher.find()) {
return oracleServiceMatcher.group(1) + ":" + oracleServiceMatcher.group(2);
}

// If no pattern matches, log warning and return original URL
log.warn("Failed to extract host:port from JDBC URL: {}, using original URL as key", jdbcUrl);
return jdbcUrl;
}

/**
* Get default port for database type
*
* @param dbType database type (mysql, oceanbase, postgresql, etc.)
* @return default port number
*/
private static int getDefaultPort(String dbType) {
if (dbType == null) {
return 3306; // Default to MySQL port
}
switch (dbType.toLowerCase()) {
case "mysql":
return 3306;
case "oceanbase":
return 2883;
case "postgresql":
return 5432;
default:
return 3306; // Default to MySQL port
}
}

/**
* Increment connection count for the given key
*
* @param key connection key (url+username)
* @return current count after increment
* @throws OverLimitException if connection count exceeds limit
*/
public int incrementConnectionCount(String key) {
if (maxConnectionCount > 0) {
AtomicInteger count = connectionCountMap.computeIfAbsent(key, k -> new AtomicInteger(0));
int currentCount = count.incrementAndGet();
log.debug("Increment connection count, key={}, currentCount={}, maxCount={}", key, currentCount,
maxConnectionCount);
if (currentCount > maxConnectionCount) {
count.decrementAndGet();
String message = String.format("数据库连接数超限, 当前值=%d, 最大值=%d",
currentCount, maxConnectionCount);
throw new IllegalStateException(String.format(message, maxConnectionCount));
}
return currentCount;
} else {
// No limit, just increment
AtomicInteger count = connectionCountMap.computeIfAbsent(key, k -> new AtomicInteger(0));
int currentCount = count.incrementAndGet();
log.debug("Increment connection count (no limit), key={}, currentCount={}", key, currentCount);
return currentCount;
}
}

/**
* Decrement connection count for the given key
*
* @param key connection key (url+username)
* @return current count after decrement
*/
public int decrementConnectionCount(String key) {
AtomicInteger count = connectionCountMap.get(key);
if (count == null) {
log.warn("Attempt to decrement connection count for non-existent key: {}", key);
return 0;
}
int currentCount = count.decrementAndGet();
log.debug("Decrement connection count, key={}, currentCount={}", key, currentCount);
if (currentCount <= 0) {
connectionCountMap.remove(key);
log.debug("Removed connection count entry for key: {}", key);
}
return currentCount;
}

/**
* Get current connection count for the given key
*
* @param key connection key (url+username)
* @return current count
*/
public int getConnectionCount(String key) {
AtomicInteger count = connectionCountMap.get(key);
return count == null ? 0 : count.get();
}

/**
* Clear all connection counts (mainly for testing)
*/
public void clear() {
connectionCountMap.clear();
log.info("Cleared all connection counts");
}

/**
* Get all connection counts (mainly for monitoring)
*
* @return snapshot of connection count map
*/
public Map<String, Integer> getAllConnectionCounts() {
Map<String, Integer> snapshot = new ConcurrentHashMap<>();
connectionCountMap.forEach((key, count) -> snapshot.put(key, count.get()));
return snapshot;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public class SingleConnectionDataSource extends BaseClassBasedDataSource impleme
private ScheduledExecutorService keepAliveScheduler;
@Setter
private long timeOutMillis = 10 * 1000;
private String connectionKey;
private static final ConnectionCountManager connectionCountManager = ConnectionCountManager.getInstance();

public SingleConnectionDataSource() {
this(false, false);
Expand Down Expand Up @@ -189,6 +191,16 @@ private void closeConnection() {
log.error("Failed to close the connection", throwable);
}
}
// Decrement connection count when connection is closed
if (this.connectionKey != null) {
try {
connectionCountManager.decrementConnectionCount(this.connectionKey);
log.info("Decremented connection count, key={}", this.connectionKey);
} catch (Exception e) {
log.warn("Failed to decrement connection count", e);
}
this.connectionKey = null;
}
}

private boolean tryLock(Lock lock) {
Expand Down Expand Up @@ -262,13 +274,26 @@ private synchronized Connection innerCreateConnection() throws SQLException {
throw new IllegalStateException("Connection is not null");
}
try {
log.info("Incremented connection count, key={}", this.connectionKey);
Connection connection = newConnectionFromDriver(getUsername(), getPassword());
// Generate connection key and check/increment connection count
this.connectionKey = ConnectionCountManager.generateKey(getUrl(), getUsername());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的url指的是啥,username是数据库连接用户名称吗

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

url是数据库连接串,username是数据库连接用户名

connectionCountManager.incrementConnectionCount(this.connectionKey);
prepareConnection(connection);
this.connection = connection;
this.lock = new ReentrantLock();
log.info("Established shared JDBC Connection, lock={}", this.lock.hashCode());
return getConnectionProxy(this.connection, this.lock);
} catch (Throwable e) {
// If connection creation fails, decrement the count
if (this.connectionKey != null) {
try {
connectionCountManager.decrementConnectionCount(this.connectionKey);
log.info("Decremented connection count due to creation failure, key={}", this.connectionKey);
} catch (Exception ex) {
log.warn("Failed to decrement connection count after creation failure", ex);
}
}
publishEvent(new GetConnectionFailedEvent(Optional.ofNullable(connection)));
throw new SQLException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public enum LimitMetric implements Translatable {
TRANSACTION_QUERY_LIMIT,
SESSION_COUNT,
USER_COUNT,
USER_DATASOURCE_SESSION_COUNT,
DATASOURCE_CONNECTION_COUNT,
EXPORT_OBJECT_COUNT,
TABLE_NAME_LENGTH,
WORKSHEET_CHANGE_COUNT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ com.oceanbase.odc.LimitMetric.FILE_COUNT=file count
com.oceanbase.odc.LimitMetric.TRANSACTION_QUERY_LIMIT=query limit
com.oceanbase.odc.LimitMetric.SESSION_COUNT=session count
com.oceanbase.odc.LimitMetric.USER_COUNT=user count
com.oceanbase.odc.LimitMetric.USER_DATASOURCE_SESSION_COUNT=user datasource session count
com.oceanbase.odc.LimitMetric.DATASOURCE_CONNECTION_COUNT=datasource connection count
com.oceanbase.odc.LimitMetric.EXPORT_OBJECT_COUNT=export object count
com.oceanbase.odc.LimitMetric.TABLE_NAME_LENGTH=table name length
com.oceanbase.odc.LimitMetric.WORKSHEET_CHANGE_COUNT=The number of changed worksheets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ com.oceanbase.odc.LimitMetric.FILE_COUNT=文件数量
com.oceanbase.odc.LimitMetric.TRANSACTION_QUERY_LIMIT=查询结果集大小
com.oceanbase.odc.LimitMetric.SESSION_COUNT=数据库 SESSION 数量
com.oceanbase.odc.LimitMetric.USER_COUNT=数据库连接用户数
com.oceanbase.odc.LimitMetric.USER_DATASOURCE_SESSION_COUNT=用户对数据源的连接数
com.oceanbase.odc.LimitMetric.DATASOURCE_CONNECTION_COUNT=数据源连接数
com.oceanbase.odc.LimitMetric.EXPORT_OBJECT_COUNT=导出对象数量
com.oceanbase.odc.LimitMetric.TABLE_NAME_LENGTH=表名长度
com.oceanbase.odc.LimitMetric.WORKSHEET_CHANGE_COUNT=变更的工作簿数量
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ com.oceanbase.odc.LimitMetric.FILE_COUNT=文件數量
com.oceanbase.odc.LimitMetric.TRANSACTION_QUERY_LIMIT=查詢結果集大小
com.oceanbase.odc.LimitMetric.SESSION_COUNT=數據庫 SESSION 數量
com.oceanbase.odc.LimitMetric.USER_COUNT=數據庫連接用戶數
com.oceanbase.odc.LimitMetric.USER_DATASOURCE_SESSION_COUNT=用戶對數據源的連接數
com.oceanbase.odc.LimitMetric.DATASOURCE_CONNECTION_COUNT=數據源連接數
com.oceanbase.odc.LimitMetric.EXPORT_OBJECT_COUNT=導出對象數量
com.oceanbase.odc.LimitMetric.TABLE_NAME_LENGTH=表名長度
com.oceanbase.odc.LimitMetric.WORKSHEET_CHANGE_COUNT=变更的工作簿数量
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,4 +259,21 @@ public ListResponse<PartitionPlanPreViewResp> preview(@PathVariable String sessi
SidUtils.getSessionId(sessionId), req.getTableConfigs(), req.isOnlyForPartitionName()));
}

/**
* 删除并释放某个用户某个数据源下的所有数据库连接,当 dataSourceId 为空时关闭该用户的所有连接
*
* @param userId 用户ID
* @param dataSourceId 数据源ID,可为空
* @return 关闭的会话数量
*/
@ApiOperation(value = "closeUserDatasourceSessions", notes = "删除并释放指定用户指定数据源下的数据库连接;若未指定数据源则关闭该用户的全部连接")
@RequestMapping(value = {"/users/{userId:[\\d]+}/datasources/sessions",
"/users/{userId:[\\d]+}/datasources/{dataSourceId:[\\d]+}/sessions"}, method = RequestMethod.DELETE)
public SuccessResponse<Integer> closeUserDatasourceSessions(
@PathVariable Long userId,
@PathVariable(value = "dataSourceId", required = false) Long dataSourceId) {
int closedCount = sessionService.closeUserDatasourceSessions(userId, dataSourceId);
return Responses.success(closedCount);
}

}
5 changes: 5 additions & 0 deletions server/odc-server/src/main/resources/data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,8 @@ INSERT INTO config_system_configuration(`key`, `value`, `description`) VALUES('o
-- 连接管理
INSERT INTO config_system_configuration(`key`, `value`, `description`) VALUES('odc.connect.temp.expire-after-inactive-interval-seconds',
'86400', '临时连接不活跃之后的保留周期,单位:秒,默认值 86400') ON DUPLICATE KEY UPDATE `id`=`id`;
INSERT INTO config_system_configuration(`key`, `value`, `description`) VALUES('odc.connect.datasource.max-connection-count',
'-1', '单个数据源(url+username)的最大连接数,默认 -1,表示不限制') ON DUPLICATE KEY UPDATE `id`=`id`;
INSERT INTO config_system_configuration(`key`, `value`, `description`) VALUES('odc.connect.temp.expire-check-interval-millis',
'600000', '临时连接配置清理检查周期,单位:毫秒,默认值 600000 表示 10 分钟') ON DUPLICATE KEY UPDATE `id`=`id`;
update config_system_configuration set `value`='120000',`description`=
Expand Down Expand Up @@ -523,6 +525,9 @@ INSERT INTO config_system_configuration(`key`, `value`, `description`) VALUES('o
INSERT INTO config_system_configuration(`key`, `value`, `description`) VALUES('odc.security.file.upload.safe-suffix-list',
'*', '允许上传的文件名扩展名,默认 *,表示允许所有文件扩展名') ON DUPLICATE KEY UPDATE `id`=`id`;

INSERT INTO config_system_configuration(`key`, `value`, `description`) VALUES('odc.session.sql-execute.user-datasource-max-count',
'-1', '用户对单个数据源的最大连接数,默认 -1,表示不限制') ON DUPLICATE KEY UPDATE `id`=`id`;

--
-- cloud object-storage
--
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ public class ConnectProperties {
@Value("${odc.connect.persistent-connection-operations:create,delete,update,read}")
private Set<String> persistentConnectionOperations = new HashSet<>();

/**
* 单个数据源(url+username)的最大连接数,默认 -1,表示不限制
*/
@Value("${odc.connect.datasource.max-connection-count:-1}")
private long datasourceMaxConnectionCount = -1;

public Set<String> getConnectionSupportedOperations(boolean temp, Set<String> permittedActions) {
// temp connection can only be private connection, skip permittedActions heere
if (temp) {
Expand Down
Loading