Skip to content

Commit

Permalink
ARTEMIS-2496 Revert catch up with zero-copy, as it's causing issues i…
Browse files Browse the repository at this point in the history
…nto some integration usage

Revert "ARTEMIS-2336 Use zero copy to replicate journal/page/large message file"

This reverts commit 85b93f0.
  • Loading branch information
clebertsuconic committed Sep 18, 2019
1 parent 320381a commit 70c2200
Show file tree
Hide file tree
Showing 13 changed files with 149 additions and 513 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.activemq.artemis.core.protocol.core;

import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.concurrent.locks.Lock;

import org.apache.activemq.artemis.api.core.ActiveMQException;
Expand Down Expand Up @@ -68,20 +66,6 @@ public interface Channel {
*/
boolean send(Packet packet);

/**
* Sends a packet and file on this channel.
*
* @param packet the packet to send
* @param raf the file to send
* @param fileChannel the file channel retrieved from raf
* @param offset the position of the raf
* @param dataSize the data size to send
* @param callback callback after send
* @return false if the packet was rejected by an outgoing interceptor; true if the send was
* successful
*/
boolean send(Packet packet, RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize, Callback callback);

/**
* Sends a packet on this channel.
*
Expand Down Expand Up @@ -263,8 +247,4 @@ public interface Channel {
* @param transferring whether the channel is transferring
*/
void setTransferring(boolean transferring);

interface Callback {
void done(boolean success);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl;

import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand All @@ -27,7 +25,6 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import io.netty.channel.ChannelFutureListener;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
Expand Down Expand Up @@ -277,104 +274,67 @@ private void waitForFailOver(String timeoutMsg) {
}
}

private ActiveMQBuffer beforeSend(final Packet packet, final int reconnectID) {
packet.setChannelID(id);

if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
}

if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id);
// This must never called by more than one thread concurrently
private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) {
if (invokeInterceptors(packet, interceptors, connection) != null) {
return false;
}

ActiveMQBuffer buffer = packet.encode(connection);

lock.lock();
synchronized (sendLock) {
packet.setChannelID(id);

try {
if (failingOver) {
waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send");
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
}

// Sanity check
if (transferring) {
throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover();
if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id);
}

if (resendCache != null && packet.isRequiresConfirmations()) {
addResendPacket(packet);
}
ActiveMQBuffer buffer = packet.encode(connection);

} finally {
lock.unlock();
}
lock.lock();

if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id);
}
try {
if (failingOver) {
waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send");
}

checkReconnectID(reconnectID);
// Sanity check
if (transferring) {
throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover();
}

//We do this outside the lock as ResponseCache is threadsafe and allows responses to come in,
//As the send could block if the response cache cannot add, preventing responses to be handled.
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
while (!responseAsyncCache.add(packet)) {
try {
Thread.sleep(1);
} catch (Exception e) {
// Ignore
if (resendCache != null && packet.isRequiresConfirmations()) {
addResendPacket(packet);
}
}
}

return buffer;
}
} finally {
lock.unlock();
}

// This must never called by more than one thread concurrently
private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) {
if (invokeInterceptors(packet, interceptors, connection) != null) {
return false;
}
if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id);
}

synchronized (sendLock) {
ActiveMQBuffer buffer = beforeSend(packet, reconnectID);
checkReconnectID(reconnectID);

// The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
// buffer is full, preventing any incoming buffers being handled and blocking failover
try {
connection.getTransportConnection().write(buffer, flush, batch);
} catch (Throwable t) {
//If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full.
//The client would get still know about this as the exception bubbles up the call stack instead.
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
responseAsyncCache.remove(packet.getCorrelationID());
//We do this outside the lock as ResponseCache is threadsafe and allows responses to come in,
//As the send could block if the response cache cannot add, preventing responses to be handled.
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
while (!responseAsyncCache.add(packet)) {
try {
Thread.sleep(1);
} catch (Exception e) {
// Ignore
}
}
throw t;
}
return true;
}
}

@Override
public boolean send(Packet packet,
RandomAccessFile raf,
FileChannel fileChannel,
long offset,
int dataSize,
Callback callback) {
if (invokeInterceptors(packet, interceptors, connection) != null) {
return false;
}

synchronized (sendLock) {
ActiveMQBuffer buffer = beforeSend(packet, -1);

// The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
// buffer is full, preventing any incoming buffers being handled and blocking failover
try {
connection.getTransportConnection().write(buffer);
connection.getTransportConnection().write(raf, fileChannel, offset, dataSize, callback == null ? null : (ChannelFutureListener) future -> callback.done(future == null || future.isSuccess()));
connection.getTransportConnection().write(buffer, flush, batch);
} catch (Throwable t) {
//If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full.
//The client would get still know about this as the exception bubbles up the call stack instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,11 +336,7 @@ protected void encodeHeader(ActiveMQBuffer buffer) {
}

protected void encodeSize(ActiveMQBuffer buffer) {
encodeSize(buffer, buffer.writerIndex());
}

protected void encodeSize(ActiveMQBuffer buffer, int size) {
this.size = size;
size = buffer.writerIndex();

// The length doesn't include the actual length byte
int len = size - DataConstants.SIZE_INT;
Expand All @@ -349,10 +345,9 @@ protected void encodeSize(ActiveMQBuffer buffer, int size) {
}

protected ActiveMQBuffer createPacket(CoreRemotingConnection connection) {
return createPacket(connection, expectedEncodeSize());
}

protected ActiveMQBuffer createPacket(CoreRemotingConnection connection, int size) {
int size = expectedEncodeSize();

if (connection == null) {
return new ChannelBufferWrapper(Unpooled.buffer(size));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
*/
package org.apache.activemq.artemis.core.remoting.impl.netty;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.SocketAddress;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -32,8 +29,6 @@
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedFile;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
Expand Down Expand Up @@ -355,18 +350,6 @@ private boolean canWrite(final int requiredCapacity) {
return canWrite;
}

private Object getFileObject(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize) {
if (channel.pipeline().get(SslHandler.class) == null) {
return new NonClosingDefaultFileRegion(fileChannel, offset, dataSize);
} else {
try {
return new ChunkedFile(raf, offset, dataSize, 8192);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

@Override
public final void write(ActiveMQBuffer buffer,
final boolean flush,
Expand Down Expand Up @@ -407,30 +390,6 @@ public final void write(ActiveMQBuffer buffer,
}
}

@Override
public void write(RandomAccessFile raf,
FileChannel fileChannel,
long offset,
int dataSize,
final ChannelFutureListener futureListener) {
final int readableBytes = dataSize;
if (logger.isDebugEnabled()) {
final int remainingBytes = this.writeBufferHighWaterMark - readableBytes;
if (remainingBytes < 0) {
logger.debug("a write request is exceeding by " + (-remainingBytes) + " bytes the writeBufferHighWaterMark size [ " + this.writeBufferHighWaterMark + " ] : consider to set it at least of " + readableBytes + " bytes");
}
}

//no need to lock because the Netty's channel is thread-safe
//and the order of write is ensured by the order of the write calls
final Channel channel = this.channel;
assert readableBytes >= 0;
ChannelFuture channelFuture = channel.writeAndFlush(getFileObject(raf, fileChannel, offset, dataSize));
if (futureListener != null) {
channelFuture.addListener(futureListener);
}
}

private static void flushAndWait(final Channel channel, final ChannelPromise promise) {
if (!channel.eventLoop().inEventLoop()) {
waitFor(promise, DEFAULT_WAIT_MILLIS);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.activemq.artemis.spi.core.remoting;

import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.concurrent.TimeUnit;

import io.netty.channel.ChannelFutureListener;
Expand Down Expand Up @@ -103,8 +101,6 @@ default boolean blockUntilWritable(final int requiredCapacity, final long timeou
*/
void write(ActiveMQBuffer buffer);

void write(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize, ChannelFutureListener futureListener);

/**
* This should close the internal channel without calling any listeners.
* This is to avoid a situation where the broker is busy writing on an internal thread.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package org.apache.activemq.artemis.core.protocol.core.impl;

import javax.security.auth.Subject;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -394,15 +392,6 @@ public void write(ActiveMQBuffer buffer) {

}

@Override
public void write(RandomAccessFile raf,
FileChannel fileChannel,
long offset,
int dataSize,
ChannelFutureListener channelFutureListener) {

}

@Override
public void forceClose() {

Expand Down
Loading

0 comments on commit 70c2200

Please sign in to comment.