Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 12 additions & 17 deletions reactor/src/main/java/com/iluwatar/reactor/app/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,27 @@

package com.iluwatar.reactor.app;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.iluwatar.reactor.framework.AbstractNioChannel;
import com.iluwatar.reactor.framework.ChannelHandler;
import com.iluwatar.reactor.framework.Dispatcher;
import com.iluwatar.reactor.framework.NioDatagramChannel;
import com.iluwatar.reactor.framework.NioReactor;
import com.iluwatar.reactor.framework.NioServerSocketChannel;
import com.iluwatar.reactor.framework.ThreadPoolDispatcher;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* This application demonstrates Reactor pattern. The example demonstrated is a Distributed Logging
* Service where it listens on multiple TCP or UDP sockets for incoming log requests.
*
* <p>
* <i>INTENT</i> <br>
*
* <p><i>INTENT</i> <br>
* The Reactor design pattern handles service requests that are delivered concurrently to an
* application by one or more clients. The application can register specific handlers for processing
* which are called by reactor on specific events.
*
* <p>
* <i>PROBLEM</i> <br>
*
* <p><i>PROBLEM</i> <br>
* Server applications in a distributed system must handle multiple clients that send them service
* requests. Following forces need to be resolved:
* <ul>
Expand All @@ -55,9 +52,8 @@
* <li>Programming Simplicity</li>
* <li>Adaptability</li>
* </ul>
*
* <p>
* <i>PARTICIPANTS</i> <br>
*
* <p><i>PARTICIPANTS</i> <br>
* <ul>
* <li>Synchronous Event De-multiplexer
* <p>
Expand Down Expand Up @@ -89,7 +85,6 @@
* separate thread for each client, which provides better scalability under load (number of clients
* increase).
* The example uses Java NIO framework to implement the Reactor.
*
*/
public class App {

Expand All @@ -100,7 +95,7 @@ public class App {
/**
* Creates an instance of App which will use provided dispatcher for dispatching events on
* reactor.
*
*
* @param dispatcher the dispatcher that will be used to dispatch events.
*/
public App(Dispatcher dispatcher) {
Expand Down Expand Up @@ -142,9 +137,9 @@ public void start() throws IOException {

/**
* Stops the NIO reactor. This is a blocking call.
*
*
* @throws InterruptedException if interrupted while stopping the reactor.
* @throws IOException if any I/O error occurs
* @throws IOException if any I/O error occurs
*/
public void stop() throws InterruptedException, IOException {
reactor.stop();
Expand Down
7 changes: 3 additions & 4 deletions reactor/src/main/java/com/iluwatar/reactor/app/AppClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@

package com.iluwatar.reactor.app;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -39,6 +36,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Represents the clients of Reactor pattern. Multiple clients are run concurrently and send logging
Expand Down Expand Up @@ -158,7 +157,7 @@ static class UdpLoggingClient implements Runnable {
* Creates a new UDP logging client.
*
* @param clientName the name of the client to be sent in logging requests.
* @param port the port on which client will send logging requests.
* @param port the port on which client will send logging requests.
* @throws UnknownHostException if localhost is unknown
*/
public UdpLoggingClient(String clientName, int port) throws UnknownHostException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@

package com.iluwatar.reactor.app;

import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;

import com.iluwatar.reactor.framework.AbstractNioChannel;
import com.iluwatar.reactor.framework.ChannelHandler;
import com.iluwatar.reactor.framework.NioDatagramChannel.DatagramPacket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -63,7 +62,11 @@ public void handleChannelRead(AbstractNioChannel channel, Object readObject, Sel
}
}

private static void sendReply(AbstractNioChannel channel, DatagramPacket incomingPacket, SelectionKey key) {
private static void sendReply(
AbstractNioChannel channel,
DatagramPacket incomingPacket,
SelectionKey key
) {
/*
* Create a reply acknowledgement datagram packet setting the receiver to the sender of incoming
* message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,12 @@
/**
* This represents the <i>Handle</i> of Reactor pattern. These are resources managed by OS which can
* be submitted to {@link NioReactor}.
*
* <p>
* This class serves has the responsibility of reading the data when a read event occurs and writing
* the data back when the channel is writable. It leaves the reading and writing of data on the
* concrete implementation. It provides a block writing mechanism wherein when any
* {@link ChannelHandler} wants to write data back, it queues the data in pending write queue and
* clears it in block manner. This provides better throughput.
*
* <p>This class serves has the responsibility of reading the data when a read event occurs and
* writing the data back when the channel is writable. It leaves the reading and writing of data on
* the concrete implementation. It provides a block writing mechanism wherein when any {@link
* ChannelHandler} wants to write data back, it queues the data in pending write queue and clears it
* in block manner. This provides better throughput.
*/
public abstract class AbstractNioChannel {

Expand All @@ -53,7 +52,7 @@ public abstract class AbstractNioChannel {

/**
* Creates a new channel.
*
*
* @param handler which will handle events occurring on this channel.
* @param channel a NIO channel to be wrapped.
*/
Expand All @@ -70,39 +69,43 @@ void setReactor(NioReactor reactor) {
}

/**
* Get channel.
*
* @return the wrapped NIO channel.
*/
public SelectableChannel getJavaChannel() {
return channel;
}

/**
* The operation in which the channel is interested, this operation is provided to
* {@link Selector}.
*
* The operation in which the channel is interested, this operation is provided to {@link
* Selector}.
*
* @return interested operation.
* @see SelectionKey
*/
public abstract int getInterestedOps();

/**
* Binds the channel on provided port.
*
*
* @throws IOException if any I/O error occurs.
*/
public abstract void bind() throws IOException;

/**
* Reads the data using the key and returns the read data. The underlying channel should be
* fetched using {@link SelectionKey#channel()}.
*
*
* @param key the key on which read event occurred.
* @return data read.
* @throws IOException if any I/O error occurs.
*/
public abstract Object read(SelectionKey key) throws IOException;

/**
* Get handler.
*
* @return the handler associated with this channel.
*/
public ChannelHandler getHandler() {
Expand Down Expand Up @@ -130,34 +133,33 @@ void flush(SelectionKey key) throws IOException {

/**
* Writes the data to the channel.
*
*
* @param pendingWrite the data to be written on channel.
* @param key the key which is writable.
* @param key the key which is writable.
* @throws IOException if any I/O error occurs.
*/
protected abstract void doWrite(Object pendingWrite, SelectionKey key) throws IOException;

/**
* Queues the data for writing. The data is not guaranteed to be written on underlying channel
* when this method returns. It will be written when the channel is flushed.
*
* <p>
* This method is used by the {@link ChannelHandler} to send reply back to the client. <br>
*
* <p>This method is used by the {@link ChannelHandler} to send reply back to the client. <br>
* Example:
*
*
* <pre>
* <code>
* {@literal @}Override
* public void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key) {
* byte[] data = ((ByteBuffer)readObject).array();
* public void handleChannelRead(AbstractNioChannel channel, Object readObj, SelectionKey key) {
* byte[] data = ((ByteBuffer)readObj).array();
* ByteBuffer buffer = ByteBuffer.wrap("Server reply".getBytes());
* channel.write(buffer, key);
* }
* </code>
* </pre>
*
*
* @param data the data to be written on underlying channel.
* @param key the key which is writable.
* @param key the key which is writable.
*/
public void write(Object data, SelectionKey key) {
Queue<Object> pendingWrites = this.channelToPendingWrites.get(key.channel());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,19 @@
/**
* Represents the <i>EventHandler</i> of Reactor pattern. It handles the incoming events dispatched
* to it by the {@link Dispatcher}. This is where the application logic resides.
*
* <p>
* A {@link ChannelHandler} can be associated with one or many {@link AbstractNioChannel}s, and
* whenever an event occurs on any of the associated channels, the handler is notified of the event.
*
* <p>A {@link ChannelHandler} can be associated with one or many {@link AbstractNioChannel}s, and
* whenever an event occurs on any of the associated channels, the handler is notified of the
* event.
*/
public interface ChannelHandler {

/**
* Called when the {@code channel} receives some data from remote peer.
*
* @param channel the channel from which the data was received.
*
* @param channel the channel from which the data was received.
* @param readObject the data read.
* @param key the key on which read event occurred.
* @param key the key on which read event occurred.
*/
void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,12 @@
* Represents the event dispatching strategy. When {@link NioReactor} senses any event on the
* registered {@link AbstractNioChannel}s then it de-multiplexes the event type, read or write or
* connect, and then calls the {@link Dispatcher} to dispatch the read events. This decouples the
* I/O processing from application specific processing. <br>
* Dispatcher should call the {@link ChannelHandler} associated with the channel on which event
* occurred.
*
* <p>
* The application can customize the way in which event is dispatched such as using the reactor
* I/O processing from application specific processing. <br> Dispatcher should call the {@link
* ChannelHandler} associated with the channel on which event occurred.
*
* <p>The application can customize the way in which event is dispatched such as using the reactor
* thread to dispatch event to channels or use a worker pool to do the non I/O processing.
*
*
* @see SameThreadDispatcher
* @see ThreadPoolDispatcher
*/
Expand All @@ -45,19 +43,18 @@ public interface Dispatcher {
* This hook method is called when read event occurs on particular channel. The data read is
* provided in <code>readObject</code>. The implementation should dispatch this read event to the
* associated {@link ChannelHandler} of <code>channel</code>.
*
* <p>
* The type of <code>readObject</code> depends on the channel on which data was received.
*
* @param channel on which read event occurred
*
* <p>The type of <code>readObject</code> depends on the channel on which data was received.
*
* @param channel on which read event occurred
* @param readObject object read by channel
* @param key on which event occurred
* @param key on which event occurred
*/
void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key);

/**
* Stops dispatching events and cleans up any acquired resources such as threads.
*
*
* @throws InterruptedException if interrupted while stopping dispatcher.
*/
void stop() throws InterruptedException;
Expand Down
Loading