Skip to content

Commit

Permalink
White space and minimal formatting
Browse files Browse the repository at this point in the history
Signed-off-by: Arjan Tijms <arjan.tijms@gmail.com>
  • Loading branch information
arjantijms committed Mar 16, 2022
1 parent 5285f3f commit 98635a3
Show file tree
Hide file tree
Showing 15 changed files with 289 additions and 229 deletions.
Expand Up @@ -16,17 +16,18 @@

package org.glassfish.grizzly.thrift;

import java.io.IOException;

import org.apache.thrift.TConfiguration;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.utils.BufferOutputStream;

import java.io.IOException;

/**
* Abstract class for implementing thrift's TTransport.
* By using BufferOutputStream, the output buffer will be increased automatically by given MemoryManager if it doesn't have enough spaces.
* Abstract class for implementing thrift's TTransport. By using
* BufferOutputStream, the output buffer will be increased automatically by
* given MemoryManager if it doesn't have enough spaces.
*
* @author Bongjae Chang
*/
Expand Down
Expand Up @@ -16,16 +16,6 @@

package org.glassfish.grizzly.thrift;

import org.apache.thrift.transport.TTransportException;
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.Connection;
import org.glassfish.grizzly.Grizzly;
import org.glassfish.grizzly.GrizzlyFuture;
import org.glassfish.grizzly.attributes.Attribute;
import org.glassfish.grizzly.memory.MemoryManager;
import org.glassfish.grizzly.thrift.client.GrizzlyThriftClient;
import org.glassfish.grizzly.utils.BufferOutputStream;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
Expand All @@ -34,14 +24,25 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.thrift.transport.TTransportException;
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.Connection;
import org.glassfish.grizzly.Grizzly;
import org.glassfish.grizzly.GrizzlyFuture;
import org.glassfish.grizzly.Processor;
import org.glassfish.grizzly.attributes.Attribute;
import org.glassfish.grizzly.filterchain.FilterChain;
import org.glassfish.grizzly.memory.MemoryManager;
import org.glassfish.grizzly.thrift.client.GrizzlyThriftClient;
import org.glassfish.grizzly.utils.BufferOutputStream;

/**
* TGrizzlyClientTransport is the client-side TTransport.
* <p>
* BlockingQueue which belongs to ThriftClientFilter has input messages when server's response are arrived.
* Only TTransport#flush() will be called, output messages will be written. Before flush(), output messages will be stored in buffer.
* BlockingQueue which belongs to ThriftClientFilter has input messages when
* server's response are arrived. Only TTransport#flush() will be called, output
* messages will be written. Before flush(), output messages will be stored in
* buffer.
*
* @author Bongjae Chang
*/
Expand All @@ -50,8 +51,15 @@ public class TGrizzlyClientTransport extends AbstractTGrizzlyTransport {
private static final long DEFAULT_READ_TIMEOUT_MILLIS = -1L; // never timed out
private static final long DEFAULT_WRITE_TIMEOUT_MILLIS = -1L; // never timed out

private final Attribute<BlockingQueue<Buffer>> inputBuffersQueueAttribute =
Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute(GrizzlyThriftClient.INPUT_BUFFERS_QUEUE_ATTRIBUTE_NAME);
private Buffer input = null;
private final Connection connection;
private final BlockingQueue<Buffer> inputBuffersQueue;
private final BufferOutputStream outputStream;
private final long readTimeoutMillis;
private final long writeTimeoutMillis;

private final Attribute<BlockingQueue<Buffer>> inputBuffersQueueAttribute = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
.createAttribute(GrizzlyThriftClient.INPUT_BUFFERS_QUEUE_ATTRIBUTE_NAME);

private final AtomicBoolean running = new AtomicBoolean();

Expand All @@ -67,6 +75,7 @@ public static TGrizzlyClientTransport create(final Connection connection, final
if (connection == null) {
throw new IllegalStateException("connection should not be null.");
}

final Processor processor = connection.getProcessor();
if (!(processor instanceof FilterChain)) {
throw new IllegalStateException("connection's processor has to be a FilterChain.");
Expand All @@ -76,33 +85,21 @@ public static TGrizzlyClientTransport create(final Connection connection, final
if (idx == -1) {
throw new IllegalStateException("connection has to have ThriftClientFilter in the FilterChain.");
}
final ThriftClientFilter thriftClientFilter =
(ThriftClientFilter) connectionFilterChain.get(idx);
final ThriftClientFilter thriftClientFilter = (ThriftClientFilter) connectionFilterChain.get(idx);
if (thriftClientFilter == null) {
throw new IllegalStateException("thriftClientFilter should not be null.");
}
return new TGrizzlyClientTransport(connection, readTimeoutMillis, writeTimeoutMillis);
}

private Buffer input = null;
private final Connection connection;
private final BlockingQueue<Buffer> inputBuffersQueue;
private final BufferOutputStream outputStream;
private final long readTimeoutMillis;
private final long writeTimeoutMillis;

private TGrizzlyClientTransport(final Connection connection,
final long readTimeoutMillis,
final long writeTimeoutMillis) {
private TGrizzlyClientTransport(final Connection connection, final long readTimeoutMillis, final long writeTimeoutMillis) {
this.connection = connection;
this.inputBuffersQueue = new LinkedTransferQueue<>();
inputBuffersQueueAttribute.set(connection, this.inputBuffersQueue);
this.outputStream = new BufferOutputStream(
connection.getTransport().getMemoryManager()) {
this.outputStream = new BufferOutputStream(connection.getTransport().getMemoryManager()) {

@Override
protected Buffer allocateNewBuffer(
final MemoryManager memoryManager, final int size) {
protected Buffer allocateNewBuffer(final MemoryManager memoryManager, final int size) {
final Buffer b = memoryManager.allocate(size);
b.allowBufferDispose(true);
return b;
Expand Down
Expand Up @@ -23,7 +23,8 @@
/**
* TGrizzlyServerTransport is the server-side TTransport.
* <p>
* Input and output buffers are already allocated by Grizzly or ThriftServerFilter.
* Input and output buffers are already allocated by Grizzly or
* ThriftServerFilter.
*
* @author Bongjae Chang
*/
Expand Down
34 changes: 18 additions & 16 deletions src/main/java/org/glassfish/grizzly/thrift/ThriftClientFilter.java
Expand Up @@ -16,6 +16,13 @@

package org.glassfish.grizzly.thrift;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TransferQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.thrift.TServiceClient;
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.Connection;
Expand All @@ -28,19 +35,14 @@
import org.glassfish.grizzly.thrift.client.GrizzlyThriftClient;
import org.glassfish.grizzly.thrift.client.pool.ObjectPool;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TransferQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* ThriftClientFilter is a client-side filter for Thrift RPC processors.
* <p>
* Read-messages will be queued in LinkedBlockingQueue from which TGrizzlyClientTransport will read it.
* Read-messages will be queued in LinkedBlockingQueue from which
* TGrizzlyClientTransport will read it.
* <p>
* Usages:
*
* <pre>
* {@code
* final FilterChainBuilder clientFilterChainBuilder = FilterChainBuilder.stateless();
Expand Down Expand Up @@ -71,12 +73,12 @@ public class ThriftClientFilter<T extends TServiceClient> extends BaseFilter {

private static final Logger logger = Grizzly.logger(ThriftClientFilter.class);

private final Attribute<ObjectPool<SocketAddress, T>> connectionPoolAttribute =
Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute(GrizzlyThriftClient.CONNECTION_POOL_ATTRIBUTE_NAME);
private final Attribute<T> connectionClientAttribute =
Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute(GrizzlyThriftClient.CLIENT_ATTRIBUTE_NAME);
private final Attribute<BlockingQueue<Buffer>> inputBuffersQueueAttribute =
Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute(GrizzlyThriftClient.INPUT_BUFFERS_QUEUE_ATTRIBUTE_NAME);
private final Attribute<ObjectPool<SocketAddress, T>> connectionPoolAttribute = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
.createAttribute(GrizzlyThriftClient.CONNECTION_POOL_ATTRIBUTE_NAME);
private final Attribute<T> connectionClientAttribute = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
.createAttribute(GrizzlyThriftClient.CLIENT_ATTRIBUTE_NAME);
private final Attribute<BlockingQueue<Buffer>> inputBuffersQueueAttribute = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
.createAttribute(GrizzlyThriftClient.INPUT_BUFFERS_QUEUE_ATTRIBUTE_NAME);

static final Buffer POISON = Buffers.EMPTY_BUFFER;

Expand Down Expand Up @@ -107,8 +109,7 @@ public NextAction handleClose(FilterChainContext ctx) throws IOException {
final Connection<SocketAddress> connection = ctx.getConnection();
if (connection != null) {
boolean hasWaitingConsumer = false;
final TransferQueue<Buffer> inputBuffersQueue =
(TransferQueue<Buffer>) inputBuffersQueueAttribute.remove(connection);
final TransferQueue<Buffer> inputBuffersQueue = (TransferQueue<Buffer>) inputBuffersQueueAttribute.remove(connection);
if (inputBuffersQueue != null) {
hasWaitingConsumer = inputBuffersQueue.tryTransfer(POISON);
}
Expand All @@ -125,6 +126,7 @@ public NextAction handleClose(FilterChainContext ctx) throws IOException {
}
}
}

return ctx.getInvokeAction();
}
}
Expand Up @@ -16,6 +16,8 @@

package org.glassfish.grizzly.thrift;

import java.io.IOException;

import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.Connection;
import org.glassfish.grizzly.Grizzly;
Expand All @@ -26,12 +28,12 @@
import org.glassfish.grizzly.memory.Buffers;
import org.glassfish.grizzly.memory.MemoryManager;

import java.io.IOException;

/**
* ThriftFrameFilter supports TFramedTranport that ensures a fully read message by preceding messages with a 4-byte frame size.
* ThriftFrameFilter supports TFramedTranport that ensures a fully read message
* by preceding messages with a 4-byte frame size.
* <p>
* If the frame size exceeds the max size which you can set by constructor's parameter, exception will be thrown.
* If the frame size exceeds the max size which you can set by constructor's
* parameter, exception will be thrown.
*
* @author Bongjae Chang
*/
Expand Down
Expand Up @@ -16,6 +16,8 @@

package org.glassfish.grizzly.thrift;

import java.io.IOException;

import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
Expand All @@ -29,14 +31,14 @@
import org.glassfish.grizzly.memory.MemoryManager;
import org.glassfish.grizzly.utils.BufferOutputStream;

import java.io.IOException;

/**
* ThriftServerFilter is a server-side filter for Thrift RPC processors.
* <p>
* You can set the specific response size by constructor for optimal performance.
* You can set the specific response size by constructor for optimal
* performance.
* <p>
* Usages:
*
* <pre>
* {@code
* final FilterChainBuilder serverFilterChainBuilder = FilterChainBuilder.stateless();
Expand Down

0 comments on commit 98635a3

Please sign in to comment.