Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-2239 Zero-copy NIO/MAPPED TimedBuffer #2522

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;

import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
Expand Down Expand Up @@ -58,7 +59,7 @@ public abstract class AbstractSequentialFile implements SequentialFile {
* Instead of having AIOSequentialFile implementing the Observer, I have done it on an inner class.
* This is the class returned to the factory when the file is being activated.
*/
protected final TimedBufferObserver timedBufferObserver = new LocalBufferObserver();
protected final TimedBufferObserver timedBufferObserver = createTimedBufferObserver();

/**
* @param file
Expand All @@ -74,6 +75,10 @@ public AbstractSequentialFile(final File directory,
this.factory = factory;
}

protected TimedBufferObserver createTimedBufferObserver() {
return new LocalBufferObserver();
}

// Public --------------------------------------------------------

@Override
Expand Down Expand Up @@ -252,43 +257,6 @@ protected File getFile() {
return file;
}

private static final class DelegateCallback implements IOCallback {

final List<IOCallback> delegates;

private DelegateCallback(final List<IOCallback> delegates) {
this.delegates = delegates;
}

@Override
public void done() {
final int size = delegates.size();
for (int i = 0; i < size; i++) {
try {
delegates.get(i).done();
} catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e);
}
}
}

@Override
public void onError(final int errorCode, final String errorMessage) {
if (logger.isTraceEnabled()) {
logger.trace("onError" + " code: " + errorCode + " message: " + errorMessage);
}

final int size = delegates.size();
for (int i = 0; i < size; i++) {
try {
delegates.get(i).onError(errorCode, errorMessage);
} catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e);
}
}
}
}

protected ByteBuffer newBuffer(int size, int limit) {
size = factory.calculateBlockSize(size);
limit = factory.calculateBlockSize(limit);
Expand All @@ -301,21 +269,19 @@ protected ByteBuffer newBuffer(int size, int limit) {
protected class LocalBufferObserver implements TimedBufferObserver {
franz1981 marked this conversation as resolved.
Show resolved Hide resolved

@Override
public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOCallback> callbacks) {
buffer.flip();

if (buffer.limit() == 0) {
factory.releaseBuffer(buffer);
} else {
public void flushBuffer(final ByteBuf byteBuf, final boolean requestedSync, final List<IOCallback> callbacks) {
final int bytes = byteBuf.readableBytes();
if (bytes > 0) {
final ByteBuffer buffer = newBuffer(byteBuf.capacity(), bytes);
buffer.limit(bytes);
byteBuf.getBytes(byteBuf.readerIndex(), buffer);
buffer.flip();
writeDirect(buffer, requestedSync, new DelegateCallback(callbacks));
} else {
IOCallback.done(callbacks);
}
}

@Override
public ByteBuffer newBuffer(final int size, final int limit) {
return AbstractSequentialFile.this.newBuffer(size, limit);
}

@Override
public int getRemainingBytes() {
if (fileSize - position.get() > Integer.MAX_VALUE) {
Expand Down
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.io;

import java.util.Collection;
import java.util.Objects;

/**
* It is a utility class to allow several {@link IOCallback}s to be used as one.
*/
public final class DelegateCallback implements IOCallback {

private final Collection<? extends IOCallback> delegates;

/**
* It doesn't copy defensively the given {@code delegates}.
*
* @throws NullPointerException if {@code delegates} is {@code null}
*/
public DelegateCallback(final Collection<? extends IOCallback> delegates) {
Objects.requireNonNull(delegates, "delegates cannot be null!");
this.delegates = delegates;
}

@Override
public void done() {
IOCallback.done(delegates);
}

@Override
public void onError(final int errorCode, final String errorMessage) {
IOCallback.onError(delegates, errorCode, errorMessage);
}

}
Expand Up @@ -16,6 +16,10 @@
*/
package org.apache.activemq.artemis.core.io;

import java.util.Collection;

import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;

/**
* The interface used for AIO Callbacks.
*/
Expand All @@ -32,4 +36,24 @@ public interface IOCallback {
* Observation: The whole file will be probably failing if this happens. Like, if you delete the file, you will start to get errors for these operations
*/
void onError(int errorCode, String errorMessage);

static void done(Collection<? extends IOCallback> delegates) {
delegates.forEach(callback -> {
try {
callback.done();
} catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e);
}
});
}

static void onError(Collection<? extends IOCallback> delegates, int errorCode, final String errorMessage) {
delegates.forEach(callback -> {
try {
callback.onError(errorCode, errorMessage);
} catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e);
}
});
}
}
Expand Up @@ -358,13 +358,7 @@ public boolean flushBatch() {
bytesFlushed.addAndGet(pos);
}

final ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
//bufferObserver::newBuffer doesn't necessary return a buffer with limit == pos or limit == bufferSize!!
bufferToFlush.limit(pos);
//perform memcpy under the hood due to the off heap buffer
buffer.getBytes(0, bufferToFlush);

bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
bufferObserver.flushBuffer(buffer.byteBuf(), pendingSync, callbacks);
franz1981 marked this conversation as resolved.
Show resolved Hide resolved

stopSpin();

Expand Down
Expand Up @@ -16,38 +16,22 @@
*/
package org.apache.activemq.artemis.core.io.buffer;

import java.nio.ByteBuffer;
import java.util.List;

import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.core.io.IOCallback;

public interface TimedBufferObserver {

// Constants -----------------------------------------------------

// Attributes ----------------------------------------------------

// Static --------------------------------------------------------

// Constructors --------------------------------------------------

// Public --------------------------------------------------------

void flushBuffer(ByteBuffer buffer, boolean syncRequested, List<IOCallback> callbacks);
/**
* It flushes {@link ByteBuf#readableBytes()} of {@code buffer} without changing its reader/writer indexes.<br>
* It just use {@code buffer} temporary: it can be reused by the caller right after this call.
*/
void flushBuffer(ByteBuf buffer, boolean syncRequested, List<IOCallback> callbacks);

/**
* Return the number of remaining bytes that still fit on the observer (file)
*/
int getRemainingBytes();

ByteBuffer newBuffer(int size, int limit);

// Package protected ---------------------------------------------

// Protected -----------------------------------------------------

// Private -------------------------------------------------------

// Inner classes -------------------------------------------------

}
Expand Up @@ -22,8 +22,11 @@
import java.nio.ByteBuffer;
import java.util.List;

import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.core.io.DummyCallback;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
Expand All @@ -32,7 +35,6 @@
import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;

final class TimedSequentialFile implements SequentialFile {

Expand Down Expand Up @@ -239,88 +241,44 @@ public File getJavaFile() {
return this.sequentialFile.getJavaFile();
}

private static void invokeDoneOn(List<? extends IOCallback> callbacks) {
final int size = callbacks.size();
for (int i = 0; i < size; i++) {
try {
final IOCallback callback = callbacks.get(i);
callback.done();
} catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e);
}
}
}

private static void invokeOnErrorOn(final int errorCode,
final String errorMessage,
List<? extends IOCallback> callbacks) {
final int size = callbacks.size();
for (int i = 0; i < size; i++) {
try {
final IOCallback callback = callbacks.get(i);
callback.onError(errorCode, errorMessage);
} catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e);
}
}
}

private static final class DelegateCallback implements IOCallback {

List<IOCallback> delegates;

private DelegateCallback() {
this.delegates = null;
}

@Override
public void done() {
invokeDoneOn(delegates);
}

@Override
public void onError(final int errorCode, final String errorMessage) {
invokeOnErrorOn(errorCode, errorMessage, delegates);
}
}

private final class LocalBufferObserver implements TimedBufferObserver {

private final DelegateCallback delegateCallback = new DelegateCallback();

@Override
public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOCallback> callbacks) {
buffer.flip();

if (buffer.limit() == 0) {
try {
invokeDoneOn(callbacks);
} finally {
factory.releaseBuffer(buffer);
}
} else {
if (callbacks.isEmpty()) {
try {
sequentialFile.writeDirect(buffer, requestedSync);
} catch (Exception e) {
throw new IllegalStateException(e);
}
public void flushBuffer(final ByteBuf byteBuf, final boolean requestedSync, final List<IOCallback> callbacks) {
final int bytes = byteBuf.readableBytes();
if (bytes > 0) {
final boolean releaseBuffer;
final ByteBuffer buffer;
if (byteBuf.nioBufferCount() == 1) {
//any ByteBuffer is fine with the MAPPED journal
releaseBuffer = false;
buffer = byteBuf.internalNioBuffer(byteBuf.readerIndex(), bytes);
} else {
delegateCallback.delegates = callbacks;
try {
sequentialFile.writeDirect(buffer, requestedSync, delegateCallback);
} finally {
delegateCallback.delegates = null;
//perform the copy on buffer
releaseBuffer = true;
buffer = factory.newBuffer(byteBuf.capacity());
buffer.limit(bytes);
byteBuf.getBytes(byteBuf.readerIndex(), buffer);
buffer.flip();
}
try {
blockingWriteDirect(buffer, requestedSync, releaseBuffer);
IOCallback.done(callbacks);
} catch (Throwable t) {
final int code;
if (t instanceof IOException) {
code = ActiveMQExceptionType.IO_ERROR.getCode();
factory.onIOError(new ActiveMQIOErrorException(t.getMessage(), t), t.getMessage(), TimedSequentialFile.this.sequentialFile);
} else {
code = ActiveMQExceptionType.GENERIC_EXCEPTION.getCode();
}
IOCallback.onError(callbacks, code, t.getMessage());
}
} else {
IOCallback.done(callbacks);
}
}

@Override
public ByteBuffer newBuffer(final int size, final int limit) {
return factory.newBuffer(limit);
}

@Override
public int getRemainingBytes() {
try {
Expand Down