Skip to content

Commit

Permalink
initialize useUnfairLock with true and do not change it at setMaxWait
Browse files Browse the repository at this point in the history
  • Loading branch information
zrlw authored and lizongbo committed Jan 22, 2024
1 parent 74e0520 commit 6ec6c00
Show file tree
Hide file tree
Showing 8 changed files with 355 additions and 98 deletions.
1 change: 0 additions & 1 deletion core/pom.xml
Expand Up @@ -17,7 +17,6 @@
<properties>
<spring.version>4.3.20.RELEASE</spring.version>
<junit.version>4.13.1</junit.version>
<jmh.version>1.19</jmh.version>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jdk.version>1.8</jdk.version>
Expand Down
Expand Up @@ -254,7 +254,7 @@ public abstract class DruidAbstractDataSource extends WrapperAdapter implements
static final AtomicLongFieldUpdater<DruidAbstractDataSource> destroyCountUpdater = AtomicLongFieldUpdater.newUpdater(DruidAbstractDataSource.class, "destroyCount");
static final AtomicLongFieldUpdater<DruidAbstractDataSource> createStartNanosUpdater = AtomicLongFieldUpdater.newUpdater(DruidAbstractDataSource.class, "createStartNanos");

private Boolean useUnfairLock;
private Boolean useUnfairLock = true;
private boolean useLocalSessionState = true;
private boolean keepConnectionUnderlyingTransactionIsolation;

Expand Down Expand Up @@ -368,7 +368,7 @@ public boolean isUseUnfairLock() {
}

public void setUseUnfairLock(boolean useUnfairLock) {
if (lock.isFair() == !useUnfairLock && this.useUnfairLock != null) {
if (lock.isFair() == !useUnfairLock) {
return;
}

Expand Down Expand Up @@ -1087,20 +1087,6 @@ public void setMaxWait(long maxWaitMillis) {
return;
}

if (maxWaitMillis > 0 && useUnfairLock == null && !this.inited) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if ((!this.inited) && (!lock.isFair())) {
this.lock = new ReentrantLock(true);
this.notEmpty = this.lock.newCondition();
this.empty = this.lock.newCondition();
}
} finally {
lock.unlock();
}
}

if (inited) {
LOG.error("maxWait changed : " + this.maxWait + " -> " + maxWaitMillis);
}
Expand Down
111 changes: 37 additions & 74 deletions core/src/main/java/com/alibaba/druid/pool/DruidDataSource.java
Expand Up @@ -1688,12 +1688,12 @@ private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLExce
throw new DataSourceDisableException();
}

final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
final int maxWaitThreadCount = this.maxWaitThreadCount;

DruidConnectionHolder holder;

long startTime = System.currentTimeMillis(); //进入循环等待之前,先记录开始尝试获取连接的时间
final long expiredTime = startTime + maxWait;
for (boolean createDirect = false; ; ) {
if (createDirect) {
try {
Expand Down Expand Up @@ -1803,21 +1803,20 @@ private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLExce
}

if (maxWait > 0) {
long maxWaitNanos = nanos - TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - startTime);
if (maxWaitNanos > 0) {
holder = pollLast(maxWaitNanos);
if (System.currentTimeMillis() < expiredTime) {
holder = pollLast(startTime, expiredTime);
} else {
holder = null;
break;
}
} else {
holder = takeLast();
holder = takeLast(startTime);
}

if (holder != null) {
if (holder.discard) {
holder = null;
if (maxWait > 0 && System.currentTimeMillis() - startTime >= maxWait) {
if (maxWait > 0 && System.currentTimeMillis() >= expiredTime) {
break;
}
continue;
Expand Down Expand Up @@ -2342,25 +2341,46 @@ boolean putLast(DruidConnectionHolder e, long lastActiveTimeMillis) {
return true;
}

DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
private DruidConnectionHolder takeLast(long startTime) throws InterruptedException, SQLException {
return pollLast(startTime, 0);
}

private DruidConnectionHolder pollLast(long startTime, long expiredTime) throws InterruptedException, SQLException {
try {
long awaitStartTime;
long estimate = 0;
while (poolingCount == 0) {
emptySignal(); // send signal to CreateThread create connection
// send signal to CreateThread create connection
emptySignal();

if (failFast && isFailContinuous()) {
throw new DataSourceNotAvailableException(createError);
}

awaitStartTime = System.currentTimeMillis();
if (expiredTime != 0) {
estimate = expiredTime - awaitStartTime;
if (estimate <= 0) {
return null;
}
}

notEmptyWaitThreadCount++;
if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
}
try {
notEmpty.await(); // signal by recycle or creator
// signal by recycle or creator
if (estimate == 0) {
notEmpty.await();
} else {
notEmpty.await(estimate, TimeUnit.MILLISECONDS);
}
} finally {
notEmptyWaitThreadCount--;
notEmptyWaitCount++;
notEmptyWaitNanos += TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - awaitStartTime);
}
notEmptyWaitCount++;

if (!enable) {
connectErrorCountUpdater.incrementAndGet(this);
Expand All @@ -2370,6 +2390,10 @@ DruidConnectionHolder takeLast() throws InterruptedException, SQLException {

throw new DataSourceDisableException();
}

if (poolingCount == 0) {
continue;
}
}
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
Expand All @@ -2381,71 +2405,10 @@ DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;

return last;
}
long waitNanos = System.currentTimeMillis() - startTime;
last.setLastNotEmptyWaitNanos(waitNanos);

private DruidConnectionHolder pollLast(long maxWaitNanos) throws InterruptedException, SQLException {
long estimate = maxWaitNanos;

for (; ; ) {
if (poolingCount == 0) {
emptySignal(); // send signal to CreateThread create connection

if (failFast && isFailContinuous()) {
throw new DataSourceNotAvailableException(createError);
}

if (estimate <= 0) {
return null;
}

notEmptyWaitThreadCount++;
if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
}

try {
long startEstimate = estimate;
estimate = notEmpty.awaitNanos(estimate); // signal by
// recycle or
// creator
notEmptyWaitCount++;
notEmptyWaitNanos += (startEstimate - estimate);

if (!enable) {
connectErrorCountUpdater.incrementAndGet(this);

if (disableException != null) {
throw disableException;
}

throw new DataSourceDisableException();
}
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
notEmptySignalCount++;
throw ie;
} finally {
notEmptyWaitThreadCount--;
}

if (poolingCount == 0) {
if (estimate > 0) {
continue;
}
return null;
}
}

decrementPoolingCount();
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;

long waitNanos = maxWaitNanos - estimate;
last.setLastNotEmptyWaitNanos(waitNanos);

return last;
}
return last;
}

private final void decrementPoolingCount() {
Expand Down
Expand Up @@ -25,22 +25,22 @@ public void test_fair() throws Exception {
Assert.assertEquals(false, ((ReentrantLock) dataSource.getLock()).isFair());
dataSource.setMaxWait(100);

Assert.assertEquals(true, ((ReentrantLock) dataSource.getLock()).isFair());
Assert.assertEquals(false, ((ReentrantLock) dataSource.getLock()).isFair());
{
Connection conn = dataSource.getConnection();
conn.close();
}
dataSource.setMaxWait(110);

Assert.assertEquals(true, ((ReentrantLock) dataSource.getLock()).isFair());
Assert.assertEquals(false, ((ReentrantLock) dataSource.getLock()).isFair());
{
Connection conn = dataSource.getConnection();
conn.close();
}

dataSource.setMaxWait(0);

Assert.assertEquals(true, ((ReentrantLock) dataSource.getLock()).isFair());
Assert.assertEquals(false, ((ReentrantLock) dataSource.getLock()).isFair());
{
Connection conn = dataSource.getConnection();
conn.close();
Expand Down
@@ -0,0 +1,101 @@
package com.alibaba.druid.bvt.pool;

import com.alibaba.druid.mock.MockConnection;
import com.alibaba.druid.mock.MockDriver;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.stat.DruidDataSourceStatManager;
import org.junit.Assert;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.sql.Connection;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

@BenchmarkMode(Mode.AverageTime)
@Warmup(iterations = 1, time = 3)
@Measurement(iterations = 3, time = 3)
// Threads.MAX means using Runtime.getRuntime().availableProcessors().
@Threads(Threads.MAX)
@Fork(1)
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Benchmark)
public class UsingDefaultLockModeBenchmarkTest {
private DruidDataSource dataSource;

@Setup(Level.Trial)
public void setUp() throws Exception {
DruidDataSourceStatManager.clear();

dataSource = new DruidDataSource();
dataSource.setRemoveAbandoned(true);
dataSource.setRemoveAbandonedTimeoutMillis(100);
dataSource.setLogAbandoned(true);
dataSource.setTimeBetweenEvictionRunsMillis(10);
dataSource.setMinEvictableIdleTimeMillis(300 * 1000);
dataSource.setUrl("jdbc:mock:xxx");
dataSource.setDriver(new SlowDriver());
int poolSize = Runtime.getRuntime().availableProcessors() / 2;
dataSource.setMaxActive(poolSize);
dataSource.setInitialSize(poolSize);
dataSource.setMaxWait(2000);
dataSource.init();
}

public static class SlowDriver extends MockDriver {
public MockConnection createMockConnection(MockDriver driver, String url, Properties connectProperties) {
try {
Thread.sleep(1000 * 1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return super.createMockConnection(driver, url, connectProperties);
}
}

@TearDown(Level.Trial)
public void tearDown() throws Exception {
dataSource.close();
Assert.assertEquals(0, DruidDataSourceStatManager.getInstance().getDataSourceList().size());
}

@Benchmark
public void test_activeTrace() throws Exception {
int count = 1000_00;
int i = 0;
try {
for (; i < count; ++i) {
Connection conn = dataSource.getConnection();
Assert.assertNotNull(conn);
conn.close();
Assert.assertTrue(conn.isClosed());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
Assert.assertEquals(count, i);
}
}

public static void main(String[] args) throws RunnerException {
Options options = new OptionsBuilder()
.include(UsingDefaultLockModeBenchmarkTest.class.getSimpleName())
.build();
new Runner(options).run();
}
}

0 comments on commit 6ec6c00

Please sign in to comment.