Skip to content

Commit

Permalink
Java 11 migrate remaining q-r (#1121)
Browse files Browse the repository at this point in the history
* Moves queue-load-leveling to Java 11

* Moves reactor to Java 11

* Moves reader-writer-lock to Java 11

* Moves repository to Java 11

* Moves resource-acquisition-is-initialization to Java 11

* Moves retry to Java 11

* Moves role-object to Java 11
  • Loading branch information
anuragagarwal561994 authored and iluwatar committed Jan 4, 2020
1 parent cd2a2e7 commit 20ea465
Show file tree
Hide file tree
Showing 52 changed files with 422 additions and 552 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,17 @@ public static void main(String[] args) {

try {
// Create a MessageQueue object.
MessageQueue msgQueue = new MessageQueue();
var msgQueue = new MessageQueue();

LOGGER.info("Submitting TaskGenerators and ServiceExecutor threads.");

// Create three TaskGenerator threads. Each of them will submit different number of jobs.
final Runnable taskRunnable1 = new TaskGenerator(msgQueue, 5);
final Runnable taskRunnable2 = new TaskGenerator(msgQueue, 1);
final Runnable taskRunnable3 = new TaskGenerator(msgQueue, 2);
final var taskRunnable1 = new TaskGenerator(msgQueue, 5);
final var taskRunnable2 = new TaskGenerator(msgQueue, 1);
final var taskRunnable3 = new TaskGenerator(msgQueue, 2);

// Create e service which should process the submitted jobs.
final Runnable srvRunnable = new ServiceExecutor(msgQueue);
final var srvRunnable = new ServiceExecutor(msgQueue);

// Create a ThreadPool of 2 threads and
// submit all Runnable task for execution to executor..
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class MessageQueue {

// Default constructor when called creates Blocking Queue object.
public MessageQueue() {
this.blkQueue = new ArrayBlockingQueue<Message>(1024);
this.blkQueue = new ArrayBlockingQueue<>(1024);
}

/**
Expand All @@ -62,13 +62,11 @@ public void submitMsg(Message msg) {
* them. Retrieves and removes the head of this queue, or returns null if this queue is empty.
*/
public Message retrieveMsg() {
Message retrievedMsg = null;
try {
retrievedMsg = blkQueue.poll();
return blkQueue.poll();
} catch (Exception e) {
LOGGER.error(e.getMessage());
}

return retrievedMsg;
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public ServiceExecutor(MessageQueue msgQueue) {
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
Message msg = msgQueue.retrieveMsg();
var msg = msgQueue.retrieveMsg();

if (null != msg) {
LOGGER.info(msg.toString() + " is served.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,11 @@ public void submit(Message msg) {
* submission TaskGenerator thread will sleep for 1 second.
*/
public void run() {

int count = this.msgCount;
var count = this.msgCount;

try {
while (count > 0) {
String statusMsg = "Message-" + count + " submitted by " + Thread.currentThread().getName();
var statusMsg = "Message-" + count + " submitted by " + Thread.currentThread().getName();
this.submit(new Message(statusMsg));

LOGGER.info(statusMsg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,12 @@

import org.junit.jupiter.api.Test;

import java.io.IOException;

/**
* Application Test
*/
public class AppTest {
@Test
public void test() throws IOException {
String[] args = {};
App.main(args);
public void test() {
App.main(new String[]{});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,23 @@

package com.iluwatar.queue.load.leveling;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

/**
*
* Test case for submitting and retrieving messages from Blocking Queue.
*
*/
public class MessageQueueTest {

@Test
public void messageQueueTest() {
MessageQueue msgQueue = new MessageQueue();

var msgQueue = new MessageQueue();

// submit message
msgQueue.submitMsg(new Message("MessageQueue Test"));

// retrieve message
assertEquals("MessageQueue Test", msgQueue.retrieveMsg().getMsg());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,21 @@

package com.iluwatar.queue.load.leveling;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

/**
*
* Test case for creating and checking the Message.
*
*/
public class MessageTest {

@Test
public void messageTest() {

// Parameterized constructor test.
String testMsg = "Message Test";
Message msg = new Message(testMsg);
var testMsg = "Message Test";
var msg = new Message(testMsg);
assertEquals(testMsg, msg.getMsg());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,23 @@
import org.junit.jupiter.api.Test;

/**
*
* Test case for submitting Message to Blocking Queue by TaskGenerator
* and retrieve the message by ServiceExecutor.
*
* Test case for submitting Message to Blocking Queue by TaskGenerator and retrieve the message by
* ServiceExecutor.
*/
public class TaskGenSrvExeTest {

@Test
public void taskGeneratorTest() {
MessageQueue msgQueue = new MessageQueue();
var msgQueue = new MessageQueue();

// Create a task generator thread with 1 job to submit.
Runnable taskRunnable = new TaskGenerator(msgQueue, 1);
Thread taskGenThr = new Thread(taskRunnable);
var taskRunnable = new TaskGenerator(msgQueue, 1);
var taskGenThr = new Thread(taskRunnable);
taskGenThr.start();

// Create a service executor thread.
Runnable srvRunnable = new ServiceExecutor(msgQueue);
Thread srvExeThr = new Thread(srvRunnable);
var srvRunnable = new ServiceExecutor(msgQueue);
var srvExeThr = new Thread(srvRunnable);
srvExeThr.start();
}

Expand Down
14 changes: 8 additions & 6 deletions reactor/src/main/java/com/iluwatar/reactor/app/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,17 @@ public void start() throws IOException {
* This represents application specific business logic that dispatcher will call on appropriate
* events. These events are read events in our example.
*/
LoggingHandler loggingHandler = new LoggingHandler();
var loggingHandler = new LoggingHandler();

/*
* Our application binds to multiple channels and uses same logging handler to handle incoming
* log requests.
*/
reactor.registerChannel(tcpChannel(6666, loggingHandler))
reactor
.registerChannel(tcpChannel(6666, loggingHandler))
.registerChannel(tcpChannel(6667, loggingHandler))
.registerChannel(udpChannel(6668, loggingHandler)).start();
.registerChannel(udpChannel(6668, loggingHandler))
.start();
}

/**
Expand All @@ -144,20 +146,20 @@ public void start() throws IOException {
public void stop() throws InterruptedException, IOException {
reactor.stop();
dispatcher.stop();
for (AbstractNioChannel channel : channels) {
for (var channel : channels) {
channel.getJavaChannel().close();
}
}

private AbstractNioChannel tcpChannel(int port, ChannelHandler handler) throws IOException {
NioServerSocketChannel channel = new NioServerSocketChannel(port, handler);
var channel = new NioServerSocketChannel(port, handler);
channel.bind();
channels.add(channel);
return channel;
}

private AbstractNioChannel udpChannel(int port, ChannelHandler handler) throws IOException {
NioDatagramChannel channel = new NioDatagramChannel(port, handler);
var channel = new NioDatagramChannel(port, handler);
channel.bind();
channels.add(channel);
return channel;
Expand Down
27 changes: 13 additions & 14 deletions reactor/src/main/java/com/iluwatar/reactor/app/AppClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
Expand Down Expand Up @@ -55,7 +54,7 @@ public class AppClient {
* @throws IOException if any I/O error occurs.
*/
public static void main(String[] args) throws IOException {
AppClient appClient = new AppClient();
var appClient = new AppClient();
appClient.start();
}

Expand Down Expand Up @@ -118,8 +117,8 @@ public TcpLoggingClient(String clientName, int serverPort) {
@Override
public void run() {
try (Socket socket = new Socket(InetAddress.getLocalHost(), serverPort)) {
OutputStream outputStream = socket.getOutputStream();
PrintWriter writer = new PrintWriter(outputStream);
var outputStream = socket.getOutputStream();
var writer = new PrintWriter(outputStream);
sendLogRequests(writer, socket.getInputStream());
} catch (IOException e) {
LOGGER.error("error sending requests", e);
Expand All @@ -128,12 +127,12 @@ public void run() {
}

private void sendLogRequests(PrintWriter writer, InputStream inputStream) throws IOException {
for (int i = 0; i < 4; i++) {
for (var i = 0; i < 4; i++) {
writer.println(clientName + " - Log request: " + i);
writer.flush();

byte[] data = new byte[1024];
int read = inputStream.read(data, 0, data.length);
var data = new byte[1024];
var read = inputStream.read(data, 0, data.length);
if (read == 0) {
LOGGER.info("Read zero bytes");
} else {
Expand Down Expand Up @@ -167,17 +166,17 @@ public UdpLoggingClient(String clientName, int port) throws UnknownHostException

@Override
public void run() {
try (DatagramSocket socket = new DatagramSocket()) {
for (int i = 0; i < 4; i++) {
try (var socket = new DatagramSocket()) {
for (var i = 0; i < 4; i++) {

String message = clientName + " - Log request: " + i;
DatagramPacket request =
new DatagramPacket(message.getBytes(), message.getBytes().length, remoteAddress);
var message = clientName + " - Log request: " + i;
var bytes = message.getBytes();
var request = new DatagramPacket(bytes, bytes.length, remoteAddress);

socket.send(request);

byte[] data = new byte[1024];
DatagramPacket reply = new DatagramPacket(data, data.length);
var data = new byte[1024];
var reply = new DatagramPacket(data, data.length);
socket.receive(reply);
if (reply.getLength() == 0) {
LOGGER.info("Read zero bytes");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void handleChannelRead(AbstractNioChannel channel, Object readObject, Sel
doLogging((ByteBuffer) readObject);
sendReply(channel, key);
} else if (readObject instanceof DatagramPacket) {
DatagramPacket datagram = (DatagramPacket) readObject;
var datagram = (DatagramPacket) readObject;
doLogging(datagram.getData());
sendReply(channel, datagram, key);
} else {
Expand All @@ -71,14 +71,14 @@ private static void sendReply(
* Create a reply acknowledgement datagram packet setting the receiver to the sender of incoming
* message.
*/
DatagramPacket replyPacket = new DatagramPacket(ByteBuffer.wrap(ACK));
var replyPacket = new DatagramPacket(ByteBuffer.wrap(ACK));
replyPacket.setReceiver(incomingPacket.getSender());

channel.write(replyPacket, key);
}

private static void sendReply(AbstractNioChannel channel, SelectionKey key) {
ByteBuffer buffer = ByteBuffer.wrap(ACK);
var buffer = ByteBuffer.wrap(ACK);
channel.write(buffer, key);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ public abstract class AbstractNioChannel {

private final SelectableChannel channel;
private final ChannelHandler handler;
private final Map<SelectableChannel, Queue<Object>> channelToPendingWrites =
new ConcurrentHashMap<>();
private final Map<SelectableChannel, Queue<Object>> channelToPendingWrites;
private NioReactor reactor;

/**
Expand All @@ -59,6 +58,7 @@ public abstract class AbstractNioChannel {
public AbstractNioChannel(ChannelHandler handler, SelectableChannel channel) {
this.handler = handler;
this.channel = channel;
this.channelToPendingWrites = new ConcurrentHashMap<>();
}

/**
Expand Down Expand Up @@ -117,18 +117,14 @@ public ChannelHandler getHandler() {
* whole pending block of data at once.
*/
void flush(SelectionKey key) throws IOException {
Queue<Object> pendingWrites = channelToPendingWrites.get(key.channel());
while (true) {
Object pendingWrite = pendingWrites.poll();
if (pendingWrite == null) {
// We don't have anything more to write so channel is interested in reading more data
reactor.changeOps(key, SelectionKey.OP_READ);
break;
}

var pendingWrites = channelToPendingWrites.get(key.channel());
Object pendingWrite;
while ((pendingWrite = pendingWrites.poll()) != null) {
// ask the concrete channel to make sense of data and write it to java channel
doWrite(pendingWrite, key);
}
// We don't have anything more to write so channel is interested in reading more data
reactor.changeOps(key, SelectionKey.OP_READ);
}

/**
Expand Down Expand Up @@ -162,7 +158,7 @@ void flush(SelectionKey key) throws IOException {
* @param key the key which is writable.
*/
public void write(Object data, SelectionKey key) {
Queue<Object> pendingWrites = this.channelToPendingWrites.get(key.channel());
var pendingWrites = this.channelToPendingWrites.get(key.channel());
if (pendingWrites == null) {
synchronized (this.channelToPendingWrites) {
pendingWrites = this.channelToPendingWrites.get(key.channel());
Expand Down

0 comments on commit 20ea465

Please sign in to comment.