Skip to content

Commit

Permalink
[CONJ-259] fixing race condition on prepared statement cache permitti…
Browse files Browse the repository at this point in the history
…ng deallocate statement during use
  • Loading branch information
rusher committed Feb 23, 2016
1 parent 2657f16 commit 048b927
Show file tree
Hide file tree
Showing 10 changed files with 318 additions and 92 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -6,7 +6,7 @@
<artifactId>mariadb-java-client</artifactId>
<packaging>jar</packaging>
<name>mariadb-java-client</name>
<version>1.3.6</version>
<version>1.3.7-SNAPSHOT</version>
<description>JDBC driver for MariaDB and MySQL</description>
<url>https://mariadb.com/kb/en/mariadb/about-mariadb-connector-j/</url>

Expand Down
Expand Up @@ -88,7 +88,6 @@ public MariaDbServerPreparedStatement(MariaDbConnection connection, String sql,
}

private void prepare(String sql) throws SQLException {
checkClose();
try {
lock.lock();
try {
Expand All @@ -100,12 +99,12 @@ private void prepare(String sql) throws SQLException {
} finally {
lock.unlock();
}
parameterCount = prepareResult.parameters.length;
currentParameterHolder = new ParameterHolder[prepareResult.parameters.length];
parameterCount = prepareResult.getParameters().length;
currentParameterHolder = new ParameterHolder[prepareResult.getParameters().length];
returnTableAlias = protocol.getOptions().useOldAliasMetadataBehavior;
metadata = new MariaDbResultSetMetaData(prepareResult.columns,
metadata = new MariaDbResultSetMetaData(prepareResult.getColumns(),
protocol.getDataTypeMappingFlags(), returnTableAlias);
parameterMetaData = new MariaDbParameterMetaData(prepareResult.parameters);
parameterMetaData = new MariaDbParameterMetaData(prepareResult.getParameters());
} catch (QueryException e) {
try {
this.close();
Expand Down Expand Up @@ -350,7 +349,7 @@ public int executeUpdate() throws SQLException {

@Override
public void clearParameters() throws SQLException {
currentParameterHolder = new ParameterHolder[prepareResult.parameters.length];
currentParameterHolder = new ParameterHolder[prepareResult.getParameters().length];
}

@Override
Expand Down Expand Up @@ -406,7 +405,7 @@ public void close() throws SQLException {
cachedResultSets.clear();
if (protocol != null && protocol.isConnected()) {
try {
protocol.releasePrepareStatement(sql, prepareResult.statementId);
protocol.releasePrepareStatement(sql, prepareResult);
} catch (QueryException e) {
//if (log.isDebugEnabled()) log.debug("Error releasing preparedStatement", e);
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/mariadb/jdbc/MariaDbStatement.java
Expand Up @@ -102,7 +102,7 @@ public class MariaDbStatement implements Statement {
private int fetchSize;
private boolean isStreaming = false;
private int maxRows;
protected ReentrantLock lock;
protected final ReentrantLock lock;

public static final Pattern deleteEndSemicolonPattern = Pattern.compile("[;][ ]*$", Pattern.CASE_INSENSITIVE);

Expand Down
Expand Up @@ -7,6 +7,7 @@
import org.mariadb.jdbc.internal.stream.MaxAllowedPacketException;
import org.mariadb.jdbc.internal.util.ExceptionMapper;
import org.mariadb.jdbc.internal.util.PrepareStatementCache;
import org.mariadb.jdbc.internal.util.dao.PrepareStatementCacheKey;
import org.mariadb.jdbc.internal.util.dao.QueryException;
import org.mariadb.jdbc.internal.util.constant.ServerStatus;
import org.mariadb.jdbc.internal.util.buffer.Reader;
Expand Down Expand Up @@ -136,10 +137,12 @@ public static String hexdump(ByteBuffer bb, int offset) {
public PrepareResult prepare(String sql) throws QueryException {
checkClose();
try {
if (urlParser.getOptions().cachePrepStmts && prepareStatementCache.containsKey(sql)) {
PrepareResult pr = prepareStatementCache.get(sql);
pr.addUse();
return pr;
PrepareStatementCacheKey prepareStatementCacheKey = new PrepareStatementCacheKey(database, sql);
if (urlParser.getOptions().cachePrepStmts) {
PrepareResult pr = prepareStatementCache.get(prepareStatementCacheKey);
if (pr != null && pr.incrementShareCounter()) {
return pr;
}
}

SendPrepareStatementPacket sendPrepareStatementPacket = new SendPrepareStatementPacket(sql);
Expand Down Expand Up @@ -180,9 +183,9 @@ public PrepareResult prepare(String sql) throws QueryException {
}
PrepareResult prepareResult = new PrepareResult(statementId, columns, params);
if (urlParser.getOptions().cachePrepStmts && sql != null && sql.length() < urlParser.getOptions().prepStmtCacheSqlLimit) {
prepareStatementCache.putIfNone(sql, prepareResult);
PrepareResult cachedPrepareResult = prepareStatementCache.put(prepareStatementCacheKey, prepareResult);
return cachedPrepareResult != null ? cachedPrepareResult : prepareResult;
}
// if (log.isDebugEnabled()) log.debug("prepare statementId : " + prepareResult.statementId);
return prepareResult;
} else {
throw new QueryException("Unexpected packet returned by server, first byte " + bit);
Expand All @@ -194,23 +197,6 @@ public PrepareResult prepare(String sql) throws QueryException {
}
}

@Override
public void closePreparedStatement(int statementId) throws QueryException {
lock.lock();
try {
writer.startPacket(0);
writer.write(0x19); /*COM_STMT_CLOSE*/
writer.write(statementId);
writer.finishPacket();
} catch (IOException e) {
throw new QueryException(e.getMessage(), -1,
ExceptionMapper.SqlStates.CONNECTION_EXCEPTION.getSqlState(),
e);
} finally {
lock.unlock();
}
}

@Override
public boolean getAutocommit() {
lock.lock();
Expand Down Expand Up @@ -569,11 +555,11 @@ public AbstractQueryResult executePreparedQuery(String sql, ParameterHolder[] pa
//send binary data in a separate stream
for (int i = 0; i < parameterCount; i++) {
if (parameters[i].isLongData()) {
SendPrepareParameterPacket.send(i, (LongDataParameterHolder) parameters[i], prepareResult.statementId, writer);
SendPrepareParameterPacket.send(i, (LongDataParameterHolder) parameters[i], prepareResult.getStatementId(), writer);
}
}
//send execute query
SendExecutePrepareStatementPacket packet = new SendExecutePrepareStatementPacket(prepareResult.statementId, parameters,
SendExecutePrepareStatementPacket packet = new SendExecutePrepareStatementPacket(prepareResult.getStatementId(), parameters,
parameterCount, parameterTypeHeader);
packet.send(writer);

Expand All @@ -599,19 +585,41 @@ public AbstractQueryResult executePreparedQuery(String sql, ParameterHolder[] pa
}
}

/**
* Deallocate prepare statement if not used anymore.
* @param sql sql query
* @param prepareResult allocation result
* @throws QueryException if deallocation failed.
*/
@Override
public void releasePrepareStatement(String sql, int statementId) throws QueryException {
checkClose();
public void releasePrepareStatement(String sql, PrepareResult prepareResult) throws QueryException {
//If prepared cache is enable, the PrepareResult can be shared in many PrepStatement, so synchronised use count indicator will be decrement.
prepareResult.decrementShareCounter();

//deallocate from server only if last use of this prepareResult
if (prepareResult.canBeDeallocate()) {
forceReleasePrepareStatement(prepareResult.getStatementId());

//if prepareResult is in cache, remove it since not used anymore.
if (urlParser.getOptions().cachePrepStmts) {
prepareStatementCache.remove(new PrepareStatementCacheKey(database, sql));
}

}
}

/**
* Force release of prepare statement that are not used.
* This method will be call when adding a new preparestatement in cache, so the packet can be send to server without
* problem.
*
* @param statementId prepared statement Id to remove.
* @throws QueryException if connection exception.
*/
private void forceReleasePrepareStatement(int statementId) throws QueryException {
lock.lock();
try {
if (urlParser.getOptions().cachePrepStmts && prepareStatementCache.containsKey(sql)) {
PrepareResult pr = prepareStatementCache.get(sql);
pr.removeUse();
if (!pr.hasToBeClose()) {
return;
}
prepareStatementCache.remove(sql);
}
checkClose();
final SendClosePrepareStatementPacket packet = new SendClosePrepareStatementPacket(statementId);
try {
packet.send(writer);
Expand Down
Expand Up @@ -70,8 +70,6 @@ WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWIS
public interface Protocol {
PrepareResult prepare(String sql) throws QueryException;

void closePreparedStatement(int statementId) throws QueryException;

boolean getAutocommit();

boolean noBackslashEscapes();
Expand Down Expand Up @@ -187,7 +185,7 @@ public interface Protocol {
AbstractQueryResult executePreparedQuery(String sql, ParameterHolder[] parameters, PrepareResult prepareResult, MariaDbType[] parameterTypeHeader,
boolean isStreaming) throws QueryException;

void releasePrepareStatement(String sql, int statementId) throws QueryException;
void releasePrepareStatement(String sql, PrepareResult prepareResult) throws QueryException;

AbstractQueryResult executePreparedQueryAfterFailover(String sql, ParameterHolder[] parameters, PrepareResult oldPrepareResult,
MariaDbType[] parameterTypeHeader, boolean isStreaming) throws QueryException; //used
Expand Down
Expand Up @@ -50,12 +50,13 @@ WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWIS
package org.mariadb.jdbc.internal.util;

import org.mariadb.jdbc.internal.util.dao.PrepareResult;

import org.mariadb.jdbc.internal.util.dao.PrepareStatementCacheKey;
import java.util.LinkedHashMap;
import java.util.Map;

public class PrepareStatementCache extends LinkedHashMap<String, PrepareResult> {
private int maxSize;

public final class PrepareStatementCache extends LinkedHashMap<PrepareStatementCacheKey, PrepareResult> {
private final int maxSize;

private PrepareStatementCache(int size) {
super(size, .75f, true);
Expand All @@ -67,21 +68,44 @@ public static PrepareStatementCache newInstance(int size) {
}

/**
* Add prepared statement to cache.
* @param key sql
* @param value prepareResult
* @return PrepareResult to avoid to prepare statement.
* Remove eldestEntry.
* @param eldest eldest entry
* @return true if eldest entry must be removed
*/
public PrepareResult putIfNone(String key, PrepareResult value) {
if (!containsKey(key)) {
put(key, value);
}
return value;
@Override
public boolean removeEldestEntry(Map.Entry eldest) {
return this.size() > maxSize;
}


/**
* Associates the specified value with the specified key in this map.
* If the map previously contained a mapping for the key,
* the existing cached prepared result shared counter will be incremented.
* @param key key
* @param result new prepare result.
* @return the previous value associated with key if not been deallocate, or null if there was no mapping for key.
*/
@Override
protected boolean removeEldestEntry(Map.Entry<String, PrepareResult> eldest) {
return this.size() > maxSize;
public synchronized PrepareResult put(PrepareStatementCacheKey key, PrepareResult result) {
PrepareResult cachedPrepareResult = super.get(key);
//if there is already some cached data (and not been deallocate), return existing cached data
if (cachedPrepareResult != null && cachedPrepareResult.incrementShareCounter()) {
return cachedPrepareResult;
}
//if no cache data, or been deallocate, put new result in cache
super.put(key, result);
return null;
}

@Override
public String toString() {
StringBuilder stringBuilder = new StringBuilder("PrepareStatementCache.map[");
for (Map.Entry<PrepareStatementCacheKey, PrepareResult> entry : this.entrySet()) {
stringBuilder.append("\n").append(entry.getKey().getDatabase()).append("-").append(entry.getKey().getQuery())
.append("-").append(entry.getValue().getShareCounter());
}
stringBuilder.append("]");
return stringBuilder.toString();
}
}
@@ -1,10 +1,10 @@
package org.mariadb.jdbc.internal.util.constant;

public final class Version {
public static final String version = "1.3.6";
public static final String version = "1.3.7-SNAPSHOT";
public static final int majorVersion = 1;
public static final int minorVersion = 3;
public static final int patchVersion = 6;
public static final String qualifier = "";
public static final int patchVersion = 7;
public static final String qualifier = "SNAPSHOT";

}
58 changes: 45 additions & 13 deletions src/main/java/org/mariadb/jdbc/internal/util/dao/PrepareResult.java
Expand Up @@ -52,10 +52,13 @@ WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWIS
import org.mariadb.jdbc.internal.packet.dao.ColumnInformation;

public class PrepareResult {
public int statementId;
public ColumnInformation[] columns;
public ColumnInformation[] parameters;
private int useTime = 1;
private final int statementId;
private ColumnInformation[] columns;
private ColumnInformation[] parameters;

//share indicator
private volatile int shareCounter = 1;
private volatile boolean isBeingDeallocate;

/**
* PrepareStatement Result object.
Expand All @@ -69,20 +72,49 @@ public PrepareResult(int statementId, ColumnInformation[] columns, ColumnInforma
this.parameters = parameters;
}

public synchronized void addUse() {
useTime++;
/**
* Increment share counter.
* @return true if can be used (is not been deallocate).
*/
public synchronized boolean incrementShareCounter() {
if (isBeingDeallocate) {
return false;
}
shareCounter++;
return true;
}

public synchronized void decrementShareCounter() {
shareCounter--;
}

/**
* Asked if can be deallocate (is not shared in other statement) and set deallocate flag to true if so.
*
* @return true if can be deallocate
*/
public synchronized boolean canBeDeallocate() {
if (shareCounter > 0 || isBeingDeallocate) {
return false;
}
isBeingDeallocate = true;
return true;
}

//for unit test
public synchronized int getShareCounter() {
return shareCounter;
}

public synchronized void removeUse() {
useTime--;
public int getStatementId() {
return statementId;
}

public synchronized boolean hasToBeClose() {
return useTime <= 0;
public ColumnInformation[] getColumns() {
return columns;
}

//for test unit
public synchronized int getUseTime() {
return useTime;
public ColumnInformation[] getParameters() {
return parameters;
}
}

0 comments on commit 048b927

Please sign in to comment.