Skip to content

Commit

Permalink
Replace synchronized blocks with a ReentrantLock. The synchronized bl…
Browse files Browse the repository at this point in the history
…ocks were removed and replaced by lock.lock() and lock.unlock(). This avoids that the carrier thread (OS thread) is pinned when running on virtual threads which were introduced as a preview feature in JDK 19.

The code inside the replaced synchronized blocks in ConnectionProxy performed IO, pinning the carrier thread for at least the duration of the IO operation.

https://openjdk.org/jeps/425
`There are two scenarios in which a virtual thread cannot be unmounted during blocking operations because it is pinned to its carrier:

When it executes code inside a synchronized block or method, or
When it executes a native method or a foreign function.`
  • Loading branch information
janickr committed Jan 15, 2023
1 parent 8fcccc7 commit b834c20
Showing 1 changed file with 53 additions and 35 deletions.
88 changes: 53 additions & 35 deletions src/main/user-impl/java/com/mysql/cj/jdbc/ha/ConnectionProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

/**
Expand All @@ -75,6 +76,8 @@ public class ConnectionProxy implements ICurrentConnectionProvider, InvocationHa
private HostInfo currentHostInfo;
private JdbcConnection currentConnection;

private final ReentrantLock lock = new ReentrantLock();

public ConnectionProxy(ConnectionUrl connectionUrl) throws SQLException {
this(connectionUrl, null);
}
Expand Down Expand Up @@ -179,32 +182,37 @@ public void setCurrentConnection(JdbcConnection connection, HostInfo info) {
}

@Override
public synchronized Object invoke(Object proxy, Method method, Object[] args)
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
final String methodName = method.getName();
lock.lock();
try {
final String methodName = method.getName();

if (isDirectExecute(methodName)) {
return executeMethodDirectly(methodName, args);
}
if (isDirectExecute(methodName)) {
return executeMethodDirectly(methodName, args);
}

Object[] argsCopy = args == null ? null : Arrays.copyOf(args, args.length);
Object[] argsCopy = args == null ? null : Arrays.copyOf(args, args.length);

try {
Object result = this.pluginManager.execute(
this.currentConnection.getClass(),
methodName,
() -> method.invoke(currentConnection, args),
argsCopy);
return proxyIfReturnTypeIsJdbcInterface(method.getReturnType(), result);
} catch (Exception e) {
// Check if the captured exception must be wrapped by an unchecked exception.
Class<?>[] declaredExceptions = method.getExceptionTypes();
for (Class<?> declaredException : declaredExceptions) {
if (declaredException.isAssignableFrom(e.getClass())) {
throw e;
try {
Object result = this.pluginManager.execute(
this.currentConnection.getClass(),
methodName,
() -> method.invoke(currentConnection, args),
argsCopy);
return proxyIfReturnTypeIsJdbcInterface(method.getReturnType(), result);
} catch (Exception e) {
// Check if the captured exception must be wrapped by an unchecked exception.
Class<?>[] declaredExceptions = method.getExceptionTypes();
for (Class<?> declaredException : declaredExceptions) {
if (declaredException.isAssignableFrom(e.getClass())) {
throw e;
}
}
throw new IllegalStateException(e.getMessage(), e);
}
throw new IllegalStateException(e.getMessage(), e);
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -303,6 +311,8 @@ private boolean isDirectExecute(String methodName) {
class JdbcInterfaceProxy implements InvocationHandler {
Object invokeOn;

private final ReentrantLock lock = new ReentrantLock();

JdbcInterfaceProxy(Object toInvokeOn) {
this.invokeOn = toInvokeOn;
}
Expand All @@ -329,22 +339,30 @@ private Object executeMethodDirectly(String methodName, Object[] args) {
return null;
}

public synchronized Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
final String methodName = method.getName();
if (isDirectExecute(methodName)) {
return executeMethodDirectly(methodName, args);
}

Object[] argsCopy = args == null ? null : Arrays.copyOf(args, args.length);
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
lock.lock();
try {
final String methodName = method.getName();
if (isDirectExecute(methodName)) {
return executeMethodDirectly(methodName, args);
}

synchronized(ConnectionProxy.this) {
Object result =
ConnectionProxy.this.pluginManager.execute(
this.invokeOn.getClass(),
methodName,
() -> method.invoke(this.invokeOn, args),
argsCopy);
return proxyIfReturnTypeIsJdbcInterface(method.getReturnType(), result);
Object[] argsCopy = args == null ? null : Arrays.copyOf(args, args.length);

ConnectionProxy.this.lock.lock();
try {
Object result =
ConnectionProxy.this.pluginManager.execute(
this.invokeOn.getClass(),
methodName,
() -> method.invoke(this.invokeOn, args),
argsCopy);
return proxyIfReturnTypeIsJdbcInterface(method.getReturnType(), result);
} finally {
ConnectionProxy.this.lock.unlock();
}
} finally {
lock.unlock();
}
}
}
Expand Down

0 comments on commit b834c20

Please sign in to comment.