diff --git a/src/main/java/lessons/lesson11/pubsub/PubSubApplication.java b/src/main/java/lessons/lesson11/pubsub/PubSubApplication.java new file mode 100644 index 0000000..94b6cdd --- /dev/null +++ b/src/main/java/lessons/lesson11/pubsub/PubSubApplication.java @@ -0,0 +1,47 @@ +package lessons.lesson11.pubsub; + +import java.util.Scanner; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; + +public class PubSubApplication { + public static void main(String[] args) { + Utils.clearTerminal(); + + BlockingDeque messages = new LinkedBlockingDeque<>(); + + ScreenService screenService = new ScreenService(); + Subscriber subscriber = new Subscriber(messages, screenService); + Publisher publisher = new Publisher(messages); + + Thread subscriberThread = new Thread(subscriber, "Subscriber"); + subscriberThread.start(); + + screenService.printScreen(); + + try (Scanner scanner = new Scanner(System.in)) { + while (true) { + String message = scanner.nextLine(); + + if(message.isEmpty()) { + screenService.printScreen(); + continue; + } + + publisher.add(message); + + if ("exit".equals(message)) break; + } + } catch (Exception e) { + System.out.printf("Error - %s\n", e.getMessage()); + } finally { + System.out.println("\nExit"); + + try { + subscriberThread.join(2000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } +} diff --git a/src/main/java/lessons/lesson11/pubsub/Publisher.java b/src/main/java/lessons/lesson11/pubsub/Publisher.java new file mode 100644 index 0000000..478658f --- /dev/null +++ b/src/main/java/lessons/lesson11/pubsub/Publisher.java @@ -0,0 +1,15 @@ +package lessons.lesson11.pubsub; + +import java.util.concurrent.BlockingDeque; + +public class Publisher { + private final BlockingDeque messages; + + public Publisher(BlockingDeque messages) { + this.messages = messages; + } + + public void add(final String message) { + messages.add(message); + } +} diff --git a/src/main/java/lessons/lesson11/pubsub/ScreenService.java b/src/main/java/lessons/lesson11/pubsub/ScreenService.java new file mode 100644 index 0000000..360f6be --- /dev/null +++ b/src/main/java/lessons/lesson11/pubsub/ScreenService.java @@ -0,0 +1,87 @@ +package lessons.lesson11.pubsub; + +import java.util.ArrayList; +import java.util.List; + +public class ScreenService { + private final static int DEFAULT_FORDER_LENGTH = 50; + private final static int PADDING_LENGTH = 3; + + private final List prevMessages; + + public ScreenService() { + this.prevMessages = new ArrayList<>(); + } + + + public void addMessage(final String message) { + prevMessages.add(message); + } + + private int getBorderLength() { + if (prevMessages.isEmpty()) return DEFAULT_FORDER_LENGTH; + + int maxWordLength = 0; + + for (String message : prevMessages) { + if (message.length() > maxWordLength) { + maxWordLength = message.length(); + } + } + + int borderLen = maxWordLength + (PADDING_LENGTH * 2); + + return Math.max(borderLen, DEFAULT_FORDER_LENGTH); + + } + + public void printScreen() { + Utils.clearTerminal(); + + int borderLength = getBorderLength(); + + String padding = " ".repeat(PADDING_LENGTH); + + System.out.println("-".repeat(borderLength)); + + System.out.println(getHeader(borderLength)); + + System.out.println("-".repeat(borderLength)); + System.out.printf("|%s|\n", " ".repeat(borderLength - 2)); + + if (prevMessages.isEmpty()) { + System.out.printf("|%s|\n", " ".repeat(borderLength - 2)); + System.out.printf("|%s|\n", " ".repeat(borderLength - 2)); + } else { + for (String message : prevMessages) { + int paddingRightLen = borderLength - message.length() - PADDING_LENGTH - 2; + + String paddingRight = " ".repeat(paddingRightLen); + + System.out.printf("|%s%s%s|\n", padding, message, paddingRight); + } + } + + System.out.printf("|%s|\n", " ".repeat(borderLength - 2)); + + System.out.println("-".repeat(borderLength)); + System.out.print("\n\n"); + + System.out.print("Enter message: "); + } + + + private String getHeader(final int borderLength) { + String headerTitle = "MESSAGES"; + int headerPaddingLength = (borderLength - headerTitle.length()) / 2; + + String headerPaddingLeft = " ".repeat(headerPaddingLength - 1); + String headerPaddingRight = headerPaddingLeft; + + if (borderLength % 2 != 0) { + headerPaddingRight = " ".repeat(headerPaddingLength); + } + + return String.format("|%s%s%s|", headerPaddingLeft, headerTitle, headerPaddingRight); + } +} diff --git a/src/main/java/lessons/lesson11/pubsub/Subscriber.java b/src/main/java/lessons/lesson11/pubsub/Subscriber.java new file mode 100644 index 0000000..5e528a1 --- /dev/null +++ b/src/main/java/lessons/lesson11/pubsub/Subscriber.java @@ -0,0 +1,29 @@ +package lessons.lesson11.pubsub; + +import java.util.concurrent.BlockingDeque; + +public class Subscriber implements Runnable { + private final BlockingDeque messages; + private final ScreenService screenService; + + + public Subscriber(BlockingDeque messages, ScreenService screenService) { + this.messages = messages; + this.screenService = screenService; + } + + @Override + public void run() { + while (true) { + if (messages.isEmpty()) continue; + + String message = messages.poll(); + + if ("exit".equals(message)) break; + + screenService.addMessage(message); + + screenService.printScreen(); + } + } +} diff --git a/src/main/java/lessons/lesson11/pubsub/Utils.java b/src/main/java/lessons/lesson11/pubsub/Utils.java new file mode 100644 index 0000000..a15ee7b --- /dev/null +++ b/src/main/java/lessons/lesson11/pubsub/Utils.java @@ -0,0 +1,8 @@ +package lessons.lesson11.pubsub; + +public class Utils { + public static void clearTerminal() { + System.out.print("\033[H\033[2J"); + System.out.flush(); + } +} diff --git a/src/main/java/lessons/lesson11/simplepool/SimpleThreadPool.java b/src/main/java/lessons/lesson11/simplepool/SimpleThreadPool.java new file mode 100644 index 0000000..c3975ed --- /dev/null +++ b/src/main/java/lessons/lesson11/simplepool/SimpleThreadPool.java @@ -0,0 +1,55 @@ +package lessons.lesson11.simplepool; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class SimpleThreadPool { + private final BlockingQueue tasks; + private final List workers; + private final AtomicBoolean isRunning; + static final Runnable STOPPER = () -> {}; + + public SimpleThreadPool(int workerCount) { + this.isRunning = new AtomicBoolean(true); + this.tasks = new LinkedBlockingDeque<>(); + this.workers = new ArrayList<>(); + + for (int i = 0; i < workerCount; i++) { + String workerName = "Worker " + (i + 1); + + Thread worker = new Thread(() -> { + while (isRunning.get() || !tasks.isEmpty()) { + try { + Runnable task = tasks.poll(2, TimeUnit.SECONDS); + + if (task != null && !task.equals(STOPPER)) { + task.run(); + } + } catch (InterruptedException e) { + System.out.printf("Error - %s", e.getMessage()); + } + } + }, workerName); + + workers.add(worker); + worker.start(); + } + + } + + public void submit(final Runnable task) { + tasks.add(task); + } + + public void shutdown() { + isRunning.set(false); + + for (int i = 0; i < workers.size(); i++) { + tasks.add(STOPPER); + } + } +} diff --git a/src/main/java/lessons/lesson11/simplepool/ThreadPoolApplication.java b/src/main/java/lessons/lesson11/simplepool/ThreadPoolApplication.java new file mode 100644 index 0000000..3da2731 --- /dev/null +++ b/src/main/java/lessons/lesson11/simplepool/ThreadPoolApplication.java @@ -0,0 +1,17 @@ +package lessons.lesson11.simplepool; + +public class ThreadPoolApplication { + public static void main(String[] args) { + SimpleThreadPool threadPool = new SimpleThreadPool(10); + + for (int i = 0; i < 30; i++) { + int n = i; + + threadPool.submit(() -> { + System.out.printf("Task number %s\n", n); + }); + } + + threadPool.shutdown(); + } +}