Skip to content

Commit

Permalink
Tidy up JDBCService for documentation.
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-lawrey committed Apr 12, 2016
1 parent 291582b commit 39cbf0e
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 129 deletions.
76 changes: 76 additions & 0 deletions src/main/java/net/openhft/chronicle/queue/JDBCComponent.java
@@ -0,0 +1,76 @@
package net.openhft.chronicle.queue;

import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.util.ThrowingSupplier;
import net.openhft.chronicle.wire.Marshallable;
import net.openhft.chronicle.wire.WireOut;
import org.jetbrains.annotations.NotNull;

import java.sql.*;
import java.util.Iterator;

/**
* Created by peter on 12/04/16.
*/
public class JDBCComponent implements JDBCStatement {
private final Connection connection;
private final JDBCResult result;

public JDBCComponent(ThrowingSupplier<Connection, SQLException> connectionSupplier, JDBCResult result) throws SQLException {
connection = connectionSupplier.get();
this.result = result;
}

@Override
public void executeUpdate(String query, Object... args) {
try (PreparedStatement ps = connection.prepareStatement(query)) {
for (int i = 0; i < args.length; i++)
ps.setObject(i + 1, args[i]);
int count = ps.executeUpdate();
// record the count.
result.updateResult(count, query, args);
} catch (Throwable t) {
result.updateThrown(t, query, args);
}
}

@Override
public void executeQuery(String query, Class<? extends Marshallable> resultType, Object... args) {
try (PreparedStatement ps = connection.prepareStatement(query)) {
for (int i = 0; i < args.length; i++)
ps.setObject(i + 1, args[i]);
ResultSet resultSet = ps.executeQuery();
ResultSetMetaData metaData = resultSet.getMetaData();
int columnCount = metaData.getColumnCount();
result.queryResult(new Iterator<Marshallable>() {
@Override
public boolean hasNext() {
try {
return resultSet.next();
} catch (SQLException e) {
throw Jvm.rethrow(e);
}
}

@Override
public Marshallable next() {
return new Marshallable() {
@Override
public void writeMarshallable(@NotNull WireOut wire) {
try {
for (int i = 1; i <= columnCount; i++) {
wire.writeEventName(metaData.getCatalogName(i))
.object(resultSet.getObject(i));
}
} catch (SQLException e) {
throw Jvm.rethrow(e);
}
}
};
}
}, query, args);
} catch (Throwable t) {
result.queryThrown(t, query, args);
}
}
}
108 changes: 23 additions & 85 deletions src/main/java/net/openhft/chronicle/queue/JDBCService.java
@@ -1,60 +1,48 @@
package net.openhft.chronicle.queue;

import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.util.ThrowingSupplier;
import net.openhft.chronicle.threads.LongPauser;
import net.openhft.chronicle.threads.NamedThreadFactory;
import net.openhft.chronicle.threads.Pauser;
import net.openhft.chronicle.wire.Marshallable;
import net.openhft.chronicle.wire.WireOut;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.Iterator;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* Created by peter on 06/04/16.
*/
public class JDBCService implements Runnable, Closeable, JDBCStatement {
public class JDBCService implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(JDBCService.class);
private final ChronicleQueue in;
private final ChronicleQueue out;
private final Connection connection;
private final ExecutorService service;
private final JDBCStatement statement;
private final ThrowingSupplier<Connection, SQLException> connectionSupplier;
private volatile boolean closed = false;

public JDBCService(ChronicleQueue in, ChronicleQueue out, ThrowingSupplier<Connection, SQLException> connectionSupplier) throws SQLException {
this.in = in;
this.out = out;
this.connection = connectionSupplier.get();
String name = in.file().getName();
service = Executors.newSingleThreadExecutor(new NamedThreadFactory(name + "-JDBCService", true));
service.execute(this);
statement = in.createAppender().methodWriterBuilder(JDBCStatement.class).recordHistory(true).get();
}

@Override
public void executeQuery(String query, Class<? extends Marshallable> resultType, Object... args) {
statement.executeQuery(query, resultType, args);
}
this.connectionSupplier = connectionSupplier;

@Override
public void executeUpdate(String query, Object... args) {
statement.executeUpdate(query, args);
service = Executors.newSingleThreadExecutor(
new NamedThreadFactory(in.file().getName() + "-JDBCService", true));
service.execute(this::runLoop);
service.shutdown(); // stop when the task exits.
}

@Override
public void run() {
void runLoop() {
try {
JDBCResult result = out.createAppender().methodWriterBuilder(JDBCResult.class).recordHistory(true).get();
JSImpl js = new JSImpl(result);
JDBCResult result = out.createAppender()
.methodWriterBuilder(JDBCResult.class)
.recordHistory(true)
.get();
JDBCComponent js = new JDBCComponent(connectionSupplier, result);
MethodReader reader = in.createTailer().afterLastWritten(out).methodReader(js);
Pauser pauser = new LongPauser(50, 200, 1, 10, TimeUnit.MILLISECONDS);
while (!closed) {
Expand All @@ -73,64 +61,14 @@ public void close() {
closed = true;
}

class JSImpl implements JDBCStatement {
private final JDBCResult result;

public JSImpl(JDBCResult result) {
this.result = result;
}

@Override
public void executeQuery(String query, Class<? extends Marshallable> resultType, Object... args) {
try (PreparedStatement ps = connection.prepareStatement(query)) {
for (int i = 0; i < args.length; i++)
ps.setObject(i + 1, args[i]);
ResultSet resultSet = ps.executeQuery();
ResultSetMetaData metaData = resultSet.getMetaData();
int columnCount = metaData.getColumnCount();
result.queryResult(new Iterator<Marshallable>() {
@Override
public boolean hasNext() {
try {
return resultSet.next();
} catch (SQLException e) {
throw Jvm.rethrow(e);
}
}

@Override
public Marshallable next() {
return new Marshallable() {
@Override
public void writeMarshallable(@NotNull WireOut wire) {
try {
for (int i = 1; i <= columnCount; i++) {
wire.writeEventName(metaData.getCatalogName(i))
.object(resultSet.getObject(i));
}
} catch (SQLException e) {
throw Jvm.rethrow(e);
}
}
};
}
}, query, args);
} catch (Throwable t) {
result.queryThrown(t, query, args);
}
}
public JDBCStatement createWriter() {
return in.createAppender()
.methodWriterBuilder(JDBCStatement.class)
.recordHistory(true)
.get();
}

@Override
public void executeUpdate(String query, Object... args) {
try (PreparedStatement ps = connection.prepareStatement(query)) {
for (int i = 0; i < args.length; i++)
ps.setObject(i + 1, args[i]);
int count = ps.executeUpdate();
// record the count.
result.updateResult(count, query, args);
} catch (Throwable t) {
result.updateThrown(t, query, args);
}
}
public MethodReader createReader(JDBCResult result) {
return out.createTailer().methodReader(result);
}
}
52 changes: 16 additions & 36 deletions src/main/java/net/openhft/chronicle/queue/MethodReader.java
Expand Up @@ -83,6 +83,22 @@ public MethodReader(ExcerptTailer tailer, Object... objects) {
}
}

static void logMessage(CharSequence s, ValueIn v) {
String name = s.toString();
String rest;

if (v.wireIn() instanceof BinaryWire) {
Bytes bytes = Bytes.elasticByteBuffer((int) (v.wireIn().bytes().readRemaining() * 3 / 2 + 64));
long pos = v.wireIn().bytes().readPosition();
v.wireIn().copyTo(new TextWire(bytes));
v.wireIn().bytes().readPosition(pos);
rest = bytes.toString();
} else {
rest = v.toString();
}
LOGGER.debug("read " + name + " - " + rest);
}

public void addParseletForMethod(Object o, Method m, Class<?> parameterType) {
Class msgClass = parameterType;
m.setAccessible(true); // turn of security check to make a little faster
Expand Down Expand Up @@ -144,42 +160,6 @@ public void addParseletForMethod(Object o, Method m, Class[] parameterTypes) {
});
}

public void addParseletForMethod(Object o, Method m, Class[] parameterTypes) {
m.setAccessible(true); // turn of security check to make a little faster
Object[] args = new Object[parameterTypes.length];
BiConsumer<Object[], ValueIn> sequenceReader = (a, v) -> {
int i = 0;
for(Class clazz: parameterTypes) {
a[i++] = v.object(clazz);
}
};
wireParser.register(m::getName, (s, v, $) -> {
try {
if (Jvm.isDebug())
logMessage(s, v);

v.sequence(args, sequenceReader);
m.invoke(o, args);
} catch (Exception i) {
LOGGER.error("Failure to dispatch message: " + m.getName() + " " + Arrays.toString(args), i);
}
});
} static void logMessage(CharSequence s, ValueIn v) {
String name = s.toString();
String rest;

if (v.wireIn() instanceof BinaryWire) {
Bytes bytes = Bytes.elasticByteBuffer((int) (v.wireIn().bytes().readRemaining() * 3 / 2 + 64));
long pos = v.wireIn().bytes().readPosition();
v.wireIn().copyTo(new TextWire(bytes));
v.wireIn().bytes().readPosition(pos);
rest = bytes.toString();
} else {
rest = v.toString();
}
LOGGER.debug("read " + name + " - " + rest);
}

/**
* reads one message
*
Expand Down
16 changes: 8 additions & 8 deletions src/test/java/net/openhft/chronicle/queue/JDBCServiceTest.java
Expand Up @@ -2,6 +2,7 @@

import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.io.IOTools;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.wire.Marshallable;
Expand Down Expand Up @@ -42,32 +43,31 @@ public void doCreateTable(int repeats, int noUpdates) throws SQLException, IOExc
try (ChronicleQueue in = SingleChronicleQueueBuilder.binary(path1).build();
ChronicleQueue out = SingleChronicleQueueBuilder.binary(path2).build()) {

JDBCService jdbcService = new JDBCService(in, out, () -> DriverManager.getConnection("jdbc:hsqldb:file:" + file.getAbsolutePath(), "SA", ""));
JDBCService service = new JDBCService(in, out, () -> DriverManager.getConnection("jdbc:hsqldb:file:" + file.getAbsolutePath(), "SA", ""));

JDBCStatement service = in.createAppender().methodWriter(JDBCStatement.class);

service.executeUpdate("CREATE TABLE tableName (\n" +
JDBCStatement writer = service.createWriter();
writer.executeUpdate("CREATE TABLE tableName (\n" +
"name VARCHAR(64) NOT NULL,\n" +
"num INT\n" +
")\n");

for (int i = 1; i < (long) noUpdates; i++)
service.executeUpdate("INSERT INTO tableName (name, num)\n" +
writer.executeUpdate("INSERT INTO tableName (name, num)\n" +
"VALUES (?, ?)", "name", i);


written = System.nanoTime() - start;
AtomicLong queries = new AtomicLong();
AtomicLong updates = new AtomicLong();
MethodReader methodReader = out.createTailer().methodReader(new CountingJDBCResult(queries, updates));
CountingJDBCResult countingJDBCResult = new CountingJDBCResult(queries, updates);
MethodReader methodReader = service.createReader(countingJDBCResult);
int counter = 0;
while (updates.get() < noUpdates) {
if (methodReader.readOne())
counter++;
else
Thread.yield();
}
jdbcService.close();
Closeable.closeQuietly(service);

// System.out.println(in.dump());
// System.out.println(out.dump());
Expand Down

0 comments on commit 39cbf0e

Please sign in to comment.