Skip to content

Commit

Permalink
for #1205, sync cached connections when get connection for concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Sep 23, 2018
1 parent ae60469 commit 945ed17
Showing 1 changed file with 13 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

/**
* Adapter for {@code Connection}.
Expand Down Expand Up @@ -102,19 +103,26 @@ public final List<Connection> getConnections(final ConnectionMode connectionMode
ShardingEventBusInstance.getInstance().post(new GetConnectionStartEvent(dataSourceName));
DataSource dataSource = getDataSourceMap().get(dataSourceName);
Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName);
Collection<Connection> connections = cachedConnections.get(dataSourceName);
Collection<Connection> connections;
synchronized (cachedConnections) {
connections = cachedConnections.get(dataSourceName);
}
List<Connection> result;
if (connections.size() >= connectionSize) {
result = new ArrayList<>(cachedConnections.get(dataSourceName)).subList(0, connectionSize);
result = new ArrayList<>(connections).subList(0, connectionSize);
} else if (!connections.isEmpty()) {
result = new ArrayList<>(connectionSize);
result.addAll(connections);
List<Connection> newConnections = createConnections(connectionMode, dataSource, connectionSize - connections.size());
result.addAll(newConnections);
cachedConnections.putAll(dataSourceName, newConnections);
synchronized (cachedConnections) {
cachedConnections.putAll(dataSourceName, newConnections);
}
} else {
result = new ArrayList<>(createConnections(connectionMode, dataSource, connectionSize));
cachedConnections.putAll(dataSourceName, result);
synchronized (cachedConnections) {
cachedConnections.putAll(dataSourceName, result);
}
}
postGetConnectionEvent(result);
return result;
Expand Down Expand Up @@ -223,7 +231,7 @@ public final void close() throws SQLException {
forceExecuteTemplateForClose.execute(cachedConnections.entries(), new ForceExecuteCallback<Map.Entry<String, Connection>>() {

@Override
public void execute(final Map.Entry<String, Connection> cachedConnectionsEntrySet) throws SQLException {
public void execute(final Entry<String, Connection> cachedConnectionsEntrySet) throws SQLException {
Connection connection = cachedConnectionsEntrySet.getValue();
ShardingEventBusInstance.getInstance().post(
new CloseConnectionStartEvent(cachedConnectionsEntrySet.getKey(), DataSourceMetaDataFactory.newInstance(databaseType, connection.getMetaData().getURL())));
Expand Down

0 comments on commit 945ed17

Please sign in to comment.