Skip to content

Commit

Permalink
work on #4 blocking connection pools
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmoten committed Sep 27, 2017
1 parent 686d2e6 commit 5247244
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 9 deletions.
13 changes: 12 additions & 1 deletion src/main/java/org/davidmoten/rx/jdbc/Database.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.sql.DataSource;

import org.davidmoten.rx.jdbc.exceptions.SQLRuntimeException;
import org.davidmoten.rx.jdbc.pool.ConnectionProviderBlockingPool;
import org.davidmoten.rx.jdbc.pool.Pools;
import org.davidmoten.rx.pool.Pool;

Expand Down Expand Up @@ -54,6 +56,14 @@ public static Database from(@Nonnull Pool<Connection> pool) {
return new Database(pool.member().cast(Connection.class), () -> pool.close());
}

public static Database fromBlocking(@Nonnull ConnectionProvider cp) {
return Database.from(new ConnectionProviderBlockingPool(cp));
}

public static Database fromBlocking(@Nonnull DataSource dataSource) {
return fromBlocking(Util.connectionProvider(dataSource));
}

public static Database test(int maxPoolSize) {
Preconditions.checkArgument(maxPoolSize > 0, "maxPoolSize must be greater than 0");
return Database.from( //
Expand Down Expand Up @@ -110,7 +120,8 @@ public Connection get() {
latch.countDown();
} else {
if (!latch.await(1, TimeUnit.MINUTES)) {
throw new SQLRuntimeException("waited 1 minute but test database was not created");
throw new SQLRuntimeException(
"waited 1 minute but test database was not created");
}
}
return c;
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/org/davidmoten/rx/jdbc/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.stream.Collectors;

import javax.annotation.RegEx;
import javax.sql.DataSource;

import org.apache.commons.io.IOUtils;
import org.davidmoten.rx.jdbc.annotations.Column;
Expand Down Expand Up @@ -1013,6 +1014,7 @@ public static ConnectionProvider connectionProvider(String url) {
@Override
public Connection get() {
try {

return DriverManager.getConnection(url);
} catch (SQLException e) {
throw new SQLRuntimeException(e);
Expand All @@ -1039,4 +1041,23 @@ static Connection toTransactedConnection(AtomicReference<Connection> connection,
return c2;
}
}

public static ConnectionProvider connectionProvider(DataSource dataSource) {
return new ConnectionProvider() {

@Override
public Connection get() {
try {
return dataSource.getConnection();
} catch (SQLException e) {
throw new SQLRuntimeException(e);
}
}

@Override
public void close() {
// do nothing
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.Properties;
import java.util.concurrent.Executor;

import org.davidmoten.rx.pool.Member;
import org.davidmoten.rx.pool.MemberWithValue;
import org.davidmoten.rx.pool.NonBlockingMember;
import org.davidmoten.rx.pool.NonBlockingPool;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package org.davidmoten.rx.jdbc.pool;

import java.sql.Connection;
import java.sql.SQLException;

import org.davidmoten.rx.jdbc.ConnectionProvider;
import org.davidmoten.rx.jdbc.exceptions.SQLRuntimeException;
import org.davidmoten.rx.pool.Member;
import org.davidmoten.rx.pool.MemberWithValue;
import org.davidmoten.rx.pool.Pool;

import io.reactivex.Single;
import io.reactivex.plugins.RxJavaPlugins;

public final class ConnectionProviderBlockingPool implements Pool<Connection> {

private final ConnectionProvider connectionProvider;

public ConnectionProviderBlockingPool(ConnectionProvider connectionProvider) {
this.connectionProvider = connectionProvider;
}

@Override
public Single<Member<Connection>> member() {
return Single.fromCallable(() -> {
return new MemberWithValue<Connection>() {

Connection connection;
volatile boolean shutdown = false;

@Override
public void close() throws Exception {
connectionProvider.close();
}

@Override
public MemberWithValue<Connection> checkout() {
// blocking
connection = connectionProvider.get();
return this;
}

@Override
public void checkin() {
try {
connection.close();
} catch (SQLException e) {
throw new SQLRuntimeException(e);
}
}

@Override
public void shutdown() {
shutdown = true;
try {
connection.close();
} catch (SQLException e) {
RxJavaPlugins.onError(e);
}
}

@Override
public boolean isShutdown() {
return shutdown;
}

@Override
public Connection value() {
return connection;
}

};
});
}

@Override
public void close() throws Exception {
connectionProvider.close();
}

}
22 changes: 20 additions & 2 deletions src/test/java/org/davidmoten/rx/jdbc/DatabaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.hsqldb.jdbc.JDBCClobFile;
import org.junit.Assert;
import org.junit.FixMethodOrder;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
Expand Down Expand Up @@ -96,6 +97,10 @@ private static Database db() {
return DatabaseCreator.create(1);
}

private static Database blocking() {
return DatabaseCreator.createBlocking();
}

private static Database db(int poolSize) {
return DatabaseCreator.create(poolSize);
}
Expand Down Expand Up @@ -660,7 +665,7 @@ public void testSelectTransactedTupleN() {
assertTrue(list.get(1).isComplete());
assertEquals(2, list.size());
}

@Test
public void testSelectTransactedCount() {
db() //
Expand All @@ -685,7 +690,7 @@ public void testSelectTransactedGetAs() {
.assertValueCount(4) //
.assertComplete();
}

@Test
public void testSelectTransactedGetAsOptional() {
List<Tx<Optional<String>>> list = db() //
Expand Down Expand Up @@ -2018,6 +2023,19 @@ public void testAutomappedObjectsWhenDefaultMethodInvoked() {
p1.nameLower();
}

@Test
@Ignore
public void testBlockingDatabase() {
blocking().select("select score from person where name=?") //
.parameters("FRED", "JOSEPH") //
.getAs(Integer.class) //
.test() //
.awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
.assertNoErrors() //
.assertValues(21, 34) //
.assertComplete();
}

interface PersonWithDefaultMethod {
@Column
String name();
Expand Down
16 changes: 11 additions & 5 deletions src/test/java/org/davidmoten/rx/jdbc/pool/DatabaseCreator.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ public class DatabaseCreator {

private static final AtomicInteger dbNumber = new AtomicInteger();

public static Database createBlocking() {
return Database.fromBlocking(connectionProvider());
}

public static Database create(int maxSize) {
return create(maxSize, false, Schedulers.from(Executors.newFixedThreadPool(maxSize)));
}
Expand Down Expand Up @@ -73,15 +77,16 @@ private static void createDatabaseDerby(Connection c) throws SQLException {

public static Database create(int maxSize, boolean big, Scheduler scheduler) {
return Database.from(Pools.nonBlocking() //
.connectionProvider(connectionProvider(nextUrl(), big)).maxPoolSize(maxSize) //
.connectionProvider(connectionProvider(nextUrl(), big)) //
.maxPoolSize(maxSize) //
.scheduler(scheduler) //
.build());
}

public static ConnectionProvider connectionProvider() {
return connectionProvider(nextUrl(), false);
}

private static ConnectionProvider connectionProvider(String url, boolean big) {
return new ConnectionProvider() {

Expand Down Expand Up @@ -124,12 +129,13 @@ private static void createDatabase(Connection c, boolean big) {
"create table person (name varchar(50) primary key, score int not null, date_of_birth date, registered timestamp)")
.execute();
if (big) {
List<String> lines = IOUtils.readLines(DatabaseCreator.class.getResourceAsStream("/big.txt"),
List<String> lines = IOUtils.readLines(
DatabaseCreator.class.getResourceAsStream("/big.txt"),
StandardCharsets.UTF_8);
lines.stream().map(line -> line.split("\t")).forEach(items -> {
try {
c.prepareStatement("insert into person(name,score) values('" + items[0] + "',"
+ Integer.parseInt(items[1]) + ")").execute();
c.prepareStatement("insert into person(name,score) values('" + items[0]
+ "'," + Integer.parseInt(items[1]) + ")").execute();
} catch (SQLException e) {
throw new SQLRuntimeException(e);
}
Expand Down

0 comments on commit 5247244

Please sign in to comment.