Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 69 additions & 0 deletions .idea/workspace.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions src/main/java/lessons/lesson11/task1/Main.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package lessons.lesson11.task1;

public class Main {
public static void main(String[] args) {
QueueMessage queueMessage = new QueueMessage();
Publisher publisher = new Publisher(queueMessage);
Subscriber subscriber = new Subscriber(queueMessage);

Thread publisherThread = new Thread(publisher);
Thread subscriberThread = new Thread(subscriber);

publisherThread.start();
subscriberThread.start();
}
}
25 changes: 25 additions & 0 deletions src/main/java/lessons/lesson11/task1/Publisher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package lessons.lesson11.task1;

import java.util.Scanner;

public class Publisher implements Runnable {
private final QueueMessage queue;

public Publisher(QueueMessage queue) {
this.queue = queue;
}

@Override
public void run() {
Scanner scanner = new Scanner(System.in);
System.out.println("Input a word: (exit - to exit)");
while (true) {
String input = scanner.next();
queue.add(input);
if ("exit".equalsIgnoreCase(input)) {
break;
}
}
scanner.close();
}
}
24 changes: 24 additions & 0 deletions src/main/java/lessons/lesson11/task1/QueueMessage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package lessons.lesson11.task1;

import java.util.LinkedList;
import java.util.Queue;

public class QueueMessage {
private final Queue<String> messages = new LinkedList<>();

public synchronized void add(String message) {
messages.add(message);
notify();
}

public synchronized String get() {
while (messages.isEmpty()) {
try {
wait();
} catch (Exception e) {
Thread.currentThread().interrupt();
}
}
return messages.poll();
}
}
21 changes: 21 additions & 0 deletions src/main/java/lessons/lesson11/task1/Subscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package lessons.lesson11.task1;

public class Subscriber implements Runnable {
private final QueueMessage queue;

public Subscriber(QueueMessage queue) {
this.queue = queue;
}

@Override
public void run() {
while (true) {
String message = queue.get();
if ("exit".equalsIgnoreCase(message)) {
System.out.println("Subscriber exited");
break;
}
System.out.println("Subscriber got:" + message + "\n");
}
}
}
29 changes: 29 additions & 0 deletions src/main/java/lessons/lesson11/task2/Main.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package lessons.lesson11.task2;

public class Main {
public static void main(String[] args) {
SimpleThreadPool pool = new SimpleThreadPool(4);

for (int i = 1; i <= 10; i++) {
int taskNum = i;
pool.submit(() -> {
System.out.println(Thread.currentThread().getName() + " task: " + taskNum);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}

try {
Thread.sleep(3000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

pool.shutdown();
System.out.println("Completing task...");
}
}

26 changes: 26 additions & 0 deletions src/main/java/lessons/lesson11/task2/MyThread.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package lessons.lesson11.task2;

public class MyThread extends Thread {
private SimpleThreadPool pool;

public MyThread(SimpleThreadPool pool, String name) {
super(name);
this.pool = pool;
}

@Override
public void run() {
while (true) {
Runnable task = pool.getTask();
if (task == null) {
break;
}
try {
task.run();
} catch (RuntimeException e) {
System.out.println(getName() + " error:" + e.getMessage());
}
}
System.out.println(getName() + " is finished.");
}
}
49 changes: 49 additions & 0 deletions src/main/java/lessons/lesson11/task2/SimpleThreadPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package lessons.lesson11.task2;

import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

public class SimpleThreadPool {
private final Queue<Runnable> tasks = new LinkedList<>();
private List<MyThread> threads = new LinkedList<>();
private volatile boolean isRunning = true;
private final int counter;

public SimpleThreadPool(int counter) {
this.counter = counter;
for (int i = 0; i < counter; i++) {
MyThread myThread = new MyThread(this, "Pull: " + (i + 1));
threads.add(myThread);
myThread.start();
}
}

public synchronized void submit(Runnable task) {
tasks.add(task);
notify();
}

public synchronized Runnable getTask() {
while (tasks.isEmpty()) {
if (!isRunning) {
return null;
}
try {
wait();
} catch (Exception e) {
Thread.currentThread().interrupt();
if (tasks.isEmpty() && !isRunning) {
return null;
}
}
}
return tasks.poll();
}

public synchronized void shutdown() {
isRunning = false;
notifyAll();
System.out.println("All tasks submitted, shutting down pool...");
}
}