Skip to content

Commit

Permalink
ARTEMIS-4401 improving JDBC Performance with Paging by a significant …
Browse files Browse the repository at this point in the history
…factor
  • Loading branch information
clebertsuconic committed Sep 12, 2023
1 parent d830f04 commit 6a8cd17
Show file tree
Hide file tree
Showing 93 changed files with 4,234 additions and 2,548 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,13 @@ public void printDataJDBC(Configuration configuration, PrintStream out) throws E

printBanner(out, BINDINGS_BANNER);

DescribeJournal.printSurvivingRecords(storageManager.getBindingsJournal(), out, safe);
DescribeJournal bindings = DescribeJournal.printSurvivingRecords(storageManager.getBindingsJournal(), out, safe);

printBanner(out, MESSAGES_BANNER);

DescribeJournal describeJournal = DescribeJournal.printSurvivingRecords(storageManager.getMessageJournal(), out, safe);

printPages(describeJournal, storageManager, pagingmanager, out, safe, maxPages, null);
printPages(describeJournal, storageManager, pagingmanager, out, safe, maxPages, bindings);

cleanup();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@
<jdbc-lock-expiration>${jdbcLockExpiration}</jdbc-lock-expiration>
<jdbc-lock-renew-period>${jdbcLockRenewPeriod}</jdbc-lock-renew-period>
<jdbc-network-timeout>${jdbcNetworkTimeout}</jdbc-network-timeout>
<jdbc-max-page-size-bytes>100K</jdbc-max-page-size-bytes>
</database-store>
</store>
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,8 @@ public static String getDefaultHapolicyBackupStrategy() {

private static final long DEFAULT_JDBC_ALLOWED_TIME_DIFF_MILLIS = 250;

private static final int DEFAULT_JDBC_MAX_PAGE_SIZE_BYTES = 100 * 1024;

// Default period to wait between connection TTL checks
public static final long DEFAULT_CONNECTION_TTL_CHECK_INTERVAL = 2000;

Expand Down Expand Up @@ -1512,6 +1514,10 @@ public static long getDefaultJdbcAllowedTimeDiffMillis() {
return DEFAULT_JDBC_ALLOWED_TIME_DIFF_MILLIS;
}

public static int getDefaultJdbcMaxPageSizeBytes() {
return DEFAULT_JDBC_MAX_PAGE_SIZE_BYTES;
}

public static long getDefaultConnectionTtlCheckInterval() {
return DEFAULT_CONNECTION_TTL_CHECK_INTERVAL;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ private void createTableIfNotExists(String tableName, String... sqls) throws SQL
}
}

public SQLProvider getSqlProvider() {
return sqlProvider;
}

public void setSqlProvider(SQLProvider sqlProvider) {
this.sqlProvider = sqlProvider;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ public void addDerbyShutdownHook() {
}
}

public int getNetworkTimeoutMillis() {
return networkTimeoutMillis;
}

public void setNetworkTimeout(Executor executor, int milliseconds) {
this.networkTimeoutExecutor = executor;
this.networkTimeoutMillis = milliseconds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public static DataSource getDataSource(String dataSourceClassName, Map<String, O
.map(key -> key + "=" + (key.equalsIgnoreCase("password") ? "****" : dataSourceProperties.get(key)))
.collect(Collectors.joining(", ", "{", "}")));
try {
DataSource dataSource = (DataSource) Class.forName(dataSourceClassName).newInstance();
DataSource dataSource = (DataSource) Class.forName(dataSourceClassName).getDeclaredConstructor().newInstance();
for (Map.Entry<String, Object> entry : dataSourceProperties.entrySet()) {
PropertyUtils.setProperty(dataSource, entry.getKey(), entry.getValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,32 @@

import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.jctools.queues.MpscUnboundedArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;

public class JDBCSequentialFile implements SequentialFile {

Expand Down Expand Up @@ -64,12 +71,24 @@ public class JDBCSequentialFile implements SequentialFile {

private final JDBCSequentialFileFactoryDriver dbDriver;

MpscUnboundedArrayQueue<ScheduledWrite> writeQueue = new MpscUnboundedArrayQueue<>(8192);

// Allows DB Drivers to cache meta data.
private final Map<Object, Object> metaData = new ConcurrentHashMap<>();

final JDBCPageWriteScheduler pageWriteScheduler;

final ScheduledExecutorService scheduledExecutorService;

private final ReusableLatch pendingWrites = new ReusableLatch();

final long syncDelay;

JDBCSequentialFile(final JDBCSequentialFileFactory fileFactory,
final String filename,
final Executor executor,
final ScheduledExecutorService scheduledExecutorService,
final long syncDelay,
final JDBCSequentialFileFactoryDriver driver,
final Object writeLock) throws SQLException {
this.fileFactory = fileFactory;
Expand All @@ -78,6 +97,9 @@ public class JDBCSequentialFile implements SequentialFile {
this.executor = executor;
this.writeLock = writeLock;
this.dbDriver = driver;
this.scheduledExecutorService = scheduledExecutorService;
this.syncDelay = syncDelay;
this.pageWriteScheduler = new JDBCPageWriteScheduler(scheduledExecutorService, executor, syncDelay);
}

void setWritePosition(long writePosition) {
Expand Down Expand Up @@ -168,9 +190,9 @@ public void delete() throws IOException, InterruptedException, ActiveMQException
}
}

private synchronized int internalWrite(byte[] data, IOCallback callback, boolean append) {
private synchronized int jdbcWrite(byte[] data, IOCallback callback, boolean append) {
try {
open();
logger.debug("Writing {} bytes into {}", data.length, filename);
synchronized (writeLock) {
int noBytes = dbDriver.writeToFile(this, data, append);
seek(append ? writePosition + noBytes : noBytes);
Expand All @@ -189,52 +211,104 @@ private synchronized int internalWrite(byte[] data, IOCallback callback, boolean
return 0;
}

public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) {
return internalWrite(buffer, callback, true);
public synchronized int jdbcWrite(ActiveMQBuffer buffer, IOCallback callback) {
return jdbcWrite(buffer, callback, true);
}

public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback, boolean append) {
public synchronized int jdbcWrite(ActiveMQBuffer buffer, IOCallback callback, boolean append) {
byte[] data = new byte[buffer.readableBytes()];
buffer.readBytes(data);
return internalWrite(data, callback, append);
return jdbcWrite(data, callback, append);
}

private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) {
final byte[] data;
if (buffer.hasArray() && buffer.arrayOffset() == 0 && buffer.position() == 0 && buffer.limit() == buffer.array().length) {
data = buffer.array();
} else {
byte[] copy = new byte[buffer.remaining()];
buffer.get(copy);
data = copy;


private void pollWrites() {
if (writeQueue.isEmpty()) {
return;
}

logger.debug("polling {} elements on {}", writeQueue.size(), this.filename);

ArrayList<ScheduledWrite> writeList = new ArrayList<>(writeQueue.size()); // the size here is just an estimate

byte[] bytes = extractBytes(writeList);

jdbcWrite(bytes, null, true);
writeList.forEach(this::doCallback);
}

/* Even though I would love to have a reusable byte[] for the following buffer
PreparedStatement.setData takes a byte[] without any sizing on the interface.
Blob interface would support setBytes with an offset and size, but some of the databases we are using
(DB2 specifically) is not allowing us to use Blob (at least during our dev time).
for that reason I'm using this byte[] with the very specific size that needs to be written
Also Notice that our PagingManager will make sure that this size wouldn't go beyond our page-size limit
which we also limit at the JDBC storage, which should be 100K. */
private byte[] extractBytes(ArrayList<ScheduledWrite> writeList) {
int totalSize = 0;
ScheduledWrite write;
while ((write = writeQueue.poll()) != null) {
writeList.add(write);
totalSize += write.readable();
}
byte[] bytes = new byte[totalSize];

int writePosition = 0;

for (ScheduledWrite el : writeList) {
writePosition += el.readAt(bytes, writePosition);
el.releaseBuffer();
}

return bytes;
}

private void doCallback(ScheduledWrite write) {
if (write != null && write.callback != null) {
write.callback.done();
}
return internalWrite(data, callback, true);
pendingWrites.countDown();
}

private void scheduleWrite(final ActiveMQBuffer bytes, final IOCallback callback, boolean append) {
executor.execute(() -> {
internalWrite(bytes, callback, append);
});
scheduleWrite(new ScheduledWrite(bytes, callback, append));
}

private void scheduleWrite(ScheduledWrite scheduledWrite) {
logger.debug("offering {} bytes into {}", scheduledWrite.readable(), filename);
pendingWrites.countUp();
writeQueue.offer(scheduledWrite);
this.pageWriteScheduler.delay();
}

private void scheduleWrite(final ByteBuffer bytes, final IOCallback callback) {
executor.execute(() -> {
internalWrite(bytes, callback);
});
scheduleWrite(new ScheduledWrite(bytes, callback, true));
}

synchronized void seek(long noBytes) {
writePosition = noBytes;
}

public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback, boolean append) throws Exception {
// We ignore sync since we schedule writes straight away.
public void sendToDB(ActiveMQBuffer bytes, IOCallback callback, boolean append) throws Exception {
SimpleWaitIOCallback waitIOCallback = null;

if (callback == null) {
waitIOCallback = new SimpleWaitIOCallback();
callback = waitIOCallback;
}

scheduleWrite(bytes, callback, append);

if (callback != null) {
waitIOCallback.waitCompletion();
}
}

@Override
public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws Exception {
write(bytes, sync, callback, true);
sendToDB(bytes, callback, true);
}

@Override
Expand All @@ -246,7 +320,7 @@ public void write(ActiveMQBuffer bytes, boolean sync) throws Exception {
public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws Exception {
ActiveMQBuffer data = ActiveMQBuffers.fixedBuffer(bytes.getEncodeSize());
bytes.encode(data);
write(data, sync, callback, true);
sendToDB(data, callback, true);
}

@Override
Expand Down Expand Up @@ -330,15 +404,27 @@ public void close(boolean waitOnSync, boolean block) throws Exception {
fileFactory.sequentialFileClosed(this);
}

public int getNetworkTimeoutMillis() {
return dbDriver.getJdbcConnectionProvider().getNetworkTimeoutMillis();
}

@Override
public void sync() throws IOException {
final SimpleWaitIOCallback callback = new SimpleWaitIOCallback();
executor.execute(callback::done);

try {
callback.waitCompletion();
int syncTimeout = getNetworkTimeoutMillis();

if (syncTimeout >= 0) {
if (!pendingWrites.await(syncTimeout, TimeUnit.MILLISECONDS)) {
fileFactory.onIOError(new ActiveMQIOErrorException("Database not responding to syncs before timeout"), "Error during JDBC file sync.", this);
}
} else {
// waiting forever however logger.debug while doing so
while (!pendingWrites.await(1, TimeUnit.SECONDS)) {
logger.debug("Awaiting syncs from database for page file {}", this.filename);
}
}

} catch (Exception e) {
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "Error during JDBC file sync.");
fileFactory.onIOError(e, "Error during JDBC file sync.", this);
}
}
Expand All @@ -363,7 +449,7 @@ public void renameTo(String newFileName) throws Exception {
@Override
public SequentialFile cloneFile() {
try {
JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, executor, dbDriver, writeLock);
JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, executor, scheduledExecutorService, syncDelay, dbDriver, writeLock);
clone.setWritePosition(this.writePosition);
return clone;
} catch (Exception e) {
Expand Down Expand Up @@ -421,4 +507,20 @@ public void addMetaData(Object key, Object value) {
public Object getMetaData(Object key) {
return metaData.get(key);
}



private class JDBCPageWriteScheduler extends ActiveMQScheduledComponent {

JDBCPageWriteScheduler(ScheduledExecutorService scheduledExecutorService,
Executor executor,
long checkPeriod) {
super(scheduledExecutorService, executor, checkPeriod, TimeUnit.MILLISECONDS, true);
}

@Override
public void run() {
pollWrites();
}
}
}

0 comments on commit 6a8cd17

Please sign in to comment.