diff --git a/pom.xml b/pom.xml index 671370c..2e80983 100644 --- a/pom.xml +++ b/pom.xml @@ -18,6 +18,7 @@ 3.4.5 33.4.0-jre 3.12.0 + 2.0.12 @@ -37,6 +38,7 @@ guava ${guava.version} + org.junit.jupiter @@ -56,6 +58,12 @@ ${commons-lang3.version} test + + + com.alibaba + fastjson + ${fastjson.version} + diff --git a/src/main/java/multi_threading/fixed_operating_sequence/FixedOperatingSequenceByLock.java b/src/main/java/multi_threading/fixed_operating_sequence/FixedOperatingSequenceByLock.java new file mode 100644 index 0000000..d4a5214 --- /dev/null +++ b/src/main/java/multi_threading/fixed_operating_sequence/FixedOperatingSequenceByLock.java @@ -0,0 +1,42 @@ +package multi_threading.fixed_operating_sequence; + +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Require always to run the t2 thread first before running other threads. + */ +public class FixedOperatingSequenceByLock { + private static final ReentrantLock lock = new ReentrantLock(); + private static final Condition cond = lock.newCondition(); + private static boolean ready = false; + + public static void main(String[] args) throws InterruptedException { + new Thread(() -> { + try { + lock.lock(); + while (!ready) { + try { + cond.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } finally { + lock.unlock(); + } + System.out.println("t1 run..."); + }, "t1").start(); + + new Thread(() -> { + try { + lock.lock(); + System.out.println("t2 run..."); + ready = true; + cond.signal(); + } finally { + lock.unlock(); + } + }, "t2").start(); + } +} diff --git a/src/main/java/multi_threading/fixed_operating_sequence/FixedOperatingSequenceByPark.java b/src/main/java/multi_threading/fixed_operating_sequence/FixedOperatingSequenceByPark.java new file mode 100644 index 0000000..13fcf90 --- /dev/null +++ b/src/main/java/multi_threading/fixed_operating_sequence/FixedOperatingSequenceByPark.java @@ -0,0 +1,21 @@ +package multi_threading.fixed_operating_sequence; + +import java.util.concurrent.locks.LockSupport; + +/** + * Require always to run the t2 thread first before running other threads. + */ +public class FixedOperatingSequenceByPark { + public static void main(String[] args) { + Thread t1 = new Thread(() -> { + LockSupport.park(); + System.out.println("t1 run..."); + }, "t1"); + t1.start(); + + new Thread(() -> { + System.out.println("t2 run..."); + LockSupport.unpark(t1); + }, "t2").start(); + } +} diff --git a/src/main/java/multi_threading/fixed_operating_sequence/FixedOperatingSequenceByWaitNotify.java b/src/main/java/multi_threading/fixed_operating_sequence/FixedOperatingSequenceByWaitNotify.java new file mode 100644 index 0000000..3ea5e5d --- /dev/null +++ b/src/main/java/multi_threading/fixed_operating_sequence/FixedOperatingSequenceByWaitNotify.java @@ -0,0 +1,32 @@ +package multi_threading.fixed_operating_sequence; + +/** + * Require always to run the t2 thread first before running other threads. + */ +public class FixedOperatingSequenceByWaitNotify { + private static final Object obj = new Object(); + private static boolean ready = false; + + public static void main(String[] args) throws InterruptedException { + new Thread(() -> { + synchronized (obj) { + while (!ready) { + try { + obj.wait(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + System.out.println("t1 run..."); + } + }, "t1").start(); + + new Thread(() -> { + synchronized (obj) { + System.out.println("t2 run..."); + ready = true; + obj.notify(); + } + }, "t2").start(); + } +} diff --git a/src/main/java/multi_threading/guarded_suspension/GuardedSuspensionMod.java b/src/main/java/multi_threading/guarded_suspension/GuardedSuspensionMod.java new file mode 100644 index 0000000..a04ebf3 --- /dev/null +++ b/src/main/java/multi_threading/guarded_suspension/GuardedSuspensionMod.java @@ -0,0 +1,99 @@ +package multi_threading.guarded_suspension; + +import com.alibaba.fastjson.JSONObject; +import com.google.common.collect.Lists; + +import java.util.List; + +public class GuardedSuspensionMod { + + @SuppressWarnings("unchecked") + public static void main(String[] args) { + GuardObject guardObject = new GuardObject(); + new Thread(() -> { + System.out.println("waiting for result..."); +// List o = (List) guardObject.get(); +// List o = (List) guardObject.get(100); + List o = (List) guardObject.get(3000); + System.out.println(STR."result:\{JSONObject.toJSONString(o)}"); + }, "t1").start(); + + new Thread(() -> { + System.out.println("process downloading..."); + guardObject.complete(new Download().downloadMock()); + }, "t2").start(); + } + +} + +class GuardObject { + private Object response; + + public Object get() { + synchronized (this) { + while (response == null) { + try { + this.wait(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + return response; + } + } + + /** + * Waiting with timeout period, + * The principle of the join method in Thread + * + * @param timeout + * @return + */ + public Object get(long timeout) { + synchronized (this) { + long begin = System.currentTimeMillis(); + // elapsed time + long passTime = 0; + while (response == null) { + try { + // the time to wait for this cycle + long waitTime = timeout - passTime; + if (waitTime <= 0) { + break; + } + this.wait(waitTime); + passTime = System.currentTimeMillis() - begin; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + return response; + } + } + + public void complete(Object response) { + synchronized (this) { + this.response = response; + this.notifyAll(); + } + } +} + +class Download { + public List downloadMock() { + try { + //mock download progress + Thread.sleep(2500); + return buildMockResult(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private List buildMockResult() { + String s = "success"; + List res = Lists.newArrayList(); + res.add(s); + return res; + } +} diff --git a/src/main/java/multi_threading/producer_consumer/Message.java b/src/main/java/multi_threading/producer_consumer/Message.java new file mode 100644 index 0000000..41a072f --- /dev/null +++ b/src/main/java/multi_threading/producer_consumer/Message.java @@ -0,0 +1,16 @@ +package multi_threading.producer_consumer; + +import lombok.Getter; +import lombok.Setter; + +@Setter +@Getter +public class Message { + private int id; + private String value; + + @Override + public String toString() { + return STR."Message{id=\{id}, value='\{value}'}"; + } +} diff --git a/src/main/java/multi_threading/producer_consumer/MessageQueue.java b/src/main/java/multi_threading/producer_consumer/MessageQueue.java new file mode 100644 index 0000000..b6ef744 --- /dev/null +++ b/src/main/java/multi_threading/producer_consumer/MessageQueue.java @@ -0,0 +1,44 @@ +package multi_threading.producer_consumer; + +import com.google.common.collect.Lists; + +import java.util.List; + +public class MessageQueue { + public int capacity = 10; + + private final List queue = Lists.newArrayList(); + + public Message take() { + synchronized (queue) { + while (queue.isEmpty()) { + try { + queue.wait(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + Message message = queue.removeFirst(); + queue.notifyAll(); + return message; + } + } + + public void put(Message message) { + synchronized (queue) { + while (queue.size() >= capacity) { + try { + queue.wait(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + queue.add(message); + queue.notifyAll(); + } + } + + public int size() { + return queue.size(); + } +} diff --git a/src/main/java/multi_threading/thread_alternate_running/ThreadAlternateRunningByLock.java b/src/main/java/multi_threading/thread_alternate_running/ThreadAlternateRunningByLock.java new file mode 100644 index 0000000..7d0b270 --- /dev/null +++ b/src/main/java/multi_threading/thread_alternate_running/ThreadAlternateRunningByLock.java @@ -0,0 +1,55 @@ +package multi_threading.thread_alternate_running; + +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +public class ThreadAlternateRunningByLock { + public static void main(String[] args) throws InterruptedException { + AwaitSignal awaitSignalObj = new AwaitSignal(5); + Condition a = awaitSignalObj.newCondition(); + Condition b = awaitSignalObj.newCondition(); + Condition c = awaitSignalObj.newCondition(); + + new Thread(() -> { + awaitSignalObj.process("a", a, b); + }, "t1").start(); + new Thread(() -> { + awaitSignalObj.process("b", b, c); + }, "t2").start(); + new Thread(() -> { + awaitSignalObj.process("c", c, a); + }, "t3").start(); + + Thread.sleep(1000); + try { + awaitSignalObj.lock(); + a.signal(); + } finally { + awaitSignalObj.unlock(); + } + } +} + +class AwaitSignal extends ReentrantLock { + private final int loopNumber; + + public AwaitSignal(int loopNumber) { + this.loopNumber = loopNumber; + } + + public void process(String str, Condition current, Condition next) { + for (int i = 0; i < loopNumber; i++) { + lock(); + try { + current.await(); + System.out.print(str); + next.signal(); + } catch (InterruptedException e) { +// Thread.currentThread().interrupt(); // restore interrupted state + throw new RuntimeException(e); + } finally { + unlock(); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/multi_threading/thread_alternate_running/ThreadAlternateRunningByPark.java b/src/main/java/multi_threading/thread_alternate_running/ThreadAlternateRunningByPark.java new file mode 100644 index 0000000..2e783f1 --- /dev/null +++ b/src/main/java/multi_threading/thread_alternate_running/ThreadAlternateRunningByPark.java @@ -0,0 +1,41 @@ +package multi_threading.thread_alternate_running; + +import java.util.concurrent.locks.LockSupport; + +public class ThreadAlternateRunningByPark { + static Thread t1; + static Thread t2; + static Thread t3; + public static void main(String[] args) { + ParkUnpark parkUnparkObj = new ParkUnpark(5); + t1 = new Thread(() -> { + parkUnparkObj.process("a", t2); + }, "t1"); + t2 = new Thread(() -> { + parkUnparkObj.process("b", t3); + }, "t2"); + t3 = new Thread(() -> { + parkUnparkObj.process("c", t1); + }, "t3"); + t1.start(); + t2.start(); + t3.start(); + + LockSupport.unpark(t1); + } +} + +class ParkUnpark { + private final int loopNumber; + ParkUnpark(int loopNumber) { + this.loopNumber = loopNumber; + } + + public void process(String str, Thread t) { + for (int i = 0; i < loopNumber; i++) { + LockSupport.park(); + System.out.print(str); + LockSupport.unpark(t); + } + } +} diff --git a/src/main/java/multi_threading/thread_alternate_running/ThreadAlternateRunningByWaitNotify.java b/src/main/java/multi_threading/thread_alternate_running/ThreadAlternateRunningByWaitNotify.java new file mode 100644 index 0000000..66848d1 --- /dev/null +++ b/src/main/java/multi_threading/thread_alternate_running/ThreadAlternateRunningByWaitNotify.java @@ -0,0 +1,43 @@ +package multi_threading.thread_alternate_running; + +public class ThreadAlternateRunningByWaitNotify { + public static void main(String[] args) { + WaitNotify waitNotifyObj = new WaitNotify(1, 5); + new Thread(() -> { + waitNotifyObj.process("a", 1, 2); + }, "t1").start(); + new Thread(() -> { + waitNotifyObj.process("b", 2, 3); + }, "t2").start(); + new Thread(() -> { + waitNotifyObj.process("c", 3, 1); + }, "t3").start(); + } +} + +class WaitNotify { + private int flag; + private final int loopNumber; + + public WaitNotify(int flag, int loopNumber) { + this.flag = flag; + this.loopNumber = loopNumber; + } + + public void process(String str, int waitFlag, int nextFlag) { + for (int i = 0; i < loopNumber; i++) { + synchronized (this) { + while (waitFlag != flag) { + try { + this.wait(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + System.out.print(str); + flag = nextFlag; + this.notifyAll(); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/multi_threading/two_phase_stop/TwoPhaseStopModByInterrupt.java b/src/main/java/multi_threading/two_phase_stop/TwoPhaseStopModByInterrupt.java new file mode 100644 index 0000000..83fa70a --- /dev/null +++ b/src/main/java/multi_threading/two_phase_stop/TwoPhaseStopModByInterrupt.java @@ -0,0 +1,39 @@ +package multi_threading.two_phase_stop; + +public class TwoPhaseStopModByInterrupt { + public static void main(String[] args) throws InterruptedException { + MonitorVersion1 demo = new MonitorVersion1(); + demo.start(); + Thread.sleep(3500); + demo.stop(); + } +} + +class MonitorVersion1 { + private Thread monitor; + + public void start() { + monitor = new Thread(() -> { + while (true) { + Thread current = Thread.currentThread(); + if (current.isInterrupted()) { + System.out.println("Thread interrupted......"); + break; + } + try { + Thread.sleep(1000); + System.out.println("Simulate the execution of business logic......"); + } catch (InterruptedException e) { + //Because the sleep thread is marked as false by interrupt, i.e. isInterrupted returns false + //At this point, it is necessary to interrupt again and mark the interrupt as true so that it will exit in the next round of while judgment + current.interrupt(); + } + } + }); + monitor.start(); + } + + public void stop() { + monitor.interrupt(); + } +} diff --git a/src/main/java/multi_threading/two_phase_stop/TwoPhaseStopModByVolatile.java b/src/main/java/multi_threading/two_phase_stop/TwoPhaseStopModByVolatile.java new file mode 100644 index 0000000..123c558 --- /dev/null +++ b/src/main/java/multi_threading/two_phase_stop/TwoPhaseStopModByVolatile.java @@ -0,0 +1,37 @@ +package multi_threading.two_phase_stop; + +public class TwoPhaseStopModByVolatile { + public static void main(String[] args) throws InterruptedException { + MonitorVersion2 demo = new MonitorVersion2(); + demo.start(); + Thread.sleep(3500); + demo.stop(); + } +} + +class MonitorVersion2 { + private Thread monitor; + private volatile boolean stop = false; + + public void start() { + monitor = new Thread(() -> { + while (true) { + if (stop) { + System.out.println("Thread interrupted......"); + break; + } + try { + Thread.sleep(1000); + System.out.println("Simulate the execution of business logic......"); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + monitor.start(); + } + + public void stop() { + stop = true; + } +} \ No newline at end of file diff --git a/src/test/java/producer_consumer/ProducerConsumerModTest.java b/src/test/java/producer_consumer/ProducerConsumerModTest.java new file mode 100644 index 0000000..bf958d8 --- /dev/null +++ b/src/test/java/producer_consumer/ProducerConsumerModTest.java @@ -0,0 +1,256 @@ +package producer_consumer; + +import multi_threading.producer_consumer.Message; +import multi_threading.producer_consumer.MessageQueue; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ProducerConsumerModTest { + + /** + * Test basic functions: Produce a message and then consume it + */ + @Test + public void testBasicFunctionality() { + System.out.println("Test basic functions..."); + MessageQueue queue = new MessageQueue(); + queue.capacity = 10; + + // producer thread + Thread producer = new Thread(() -> { + Message msg = new Message(); + msg.setId(1); + msg.setValue("Hello World"); + queue.put(msg); + System.out.println(STR."Producer: send message - \{msg}"); + }); + + // consumer thread + Thread consumer = new Thread(() -> { + Message msg = queue.take(); + System.out.println(STR."Consumer: receive message - \{msg}"); + if (msg.getId() != 1 || !"Hello World".equals(msg.getValue())) { + throw new AssertionError("The message content is incorrect!"); + } + }); + + // Activate the consumer first and ensure that it is waiting for a message + consumer.start(); + try { + Thread.sleep(100); // Ensure that consumers initiate first + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + // Start the producer + producer.start(); + + // Waiting for thread completion + try { + producer.join(); + consumer.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + /** + * Test multiple producers and a single consumer + * + * @throws InterruptedException + */ + @Test + public void testMultipleProducersSingleConsumer() throws InterruptedException { + System.out.println("Test multiple producers and a single consumer..."); + MessageQueue queue = new MessageQueue(); + queue.capacity = 10; + + int producerCount = 5; + int messageCount = 20; + final CountDownLatch latch = new CountDownLatch(producerCount + 1); + List consumedMessages = new ArrayList<>(); + + // create producers + for (int i = 0; i < producerCount; i++) { + final int producerId = i + 1; + new Thread(() -> { + Random random = new Random(); + for (int j = 0; j < messageCount / producerCount; j++) { + try { + Thread.sleep(random.nextInt(50)); // random delay + + Message msg = new Message(); + msg.setId(producerId * 100 + j); + msg.setValue(STR."Producer \{producerId} - Msg \{j}"); + queue.put(msg); + + System.out.println(STR."Producer \{producerId}: send message - \{msg.getId()}"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + latch.countDown(); + }).start(); + } + + // create consumer + new Thread(() -> { + for (int i = 0; i < messageCount; i++) { + try { + Message msg = queue.take(); + consumedMessages.add(msg.getId()); + System.out.println(STR."Consumer: receive message - \{msg.getId()}"); + Thread.sleep(50); // consumption delay + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + latch.countDown(); + }).start(); + + // Waiting for all tasks to be completed + latch.await(5, TimeUnit.SECONDS); + + System.out.println(STR."Number of messages consumed: \{consumedMessages.size()} (should be \{messageCount})"); + assertSame(consumedMessages.size(), messageCount); + } + + + /** + * Test multiple consumers for a single producer + * + * @throws InterruptedException + */ + @Test + public void testSingleProducerMultipleConsumers() throws InterruptedException { + System.out.println("Test multiple consumers for a single producer..."); + MessageQueue queue = new MessageQueue(); + queue.capacity = 10; + + int consumerCount = 3; + int messageCount = 15; + final CountDownLatch latch = new CountDownLatch(1 + consumerCount); + List consumedMessages = new ArrayList<>(); + + // create producer + new Thread(() -> { + for (int i = 0; i < messageCount; i++) { + try { + Message msg = new Message(); + msg.setId(i + 1); + msg.setValue(STR."Message \{i + 1}"); + queue.put(msg); + System.out.println(STR."producer: send message - \{msg.getId()}"); + Thread.sleep(30); // production delay + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + latch.countDown(); + }).start(); + + // create consumers + for (int i = 0; i < consumerCount; i++) { + final int consumerId = i + 1; + new Thread(() -> { + while (true) { + try { + Message msg = queue.take(); + synchronized (consumedMessages) { + consumedMessages.add(msg.getId()); + } + System.out.println(STR."Consumer \{consumerId}: receive message - \{msg.getId()}"); + } catch (Exception e) { + break; + } + } + }).start(); + } + + // Waiting for the producer to complete + latch.await(3, TimeUnit.SECONDS); + + // Give consumers time to process remaining messages + Thread.sleep(500); + + System.out.println(STR."Number of messages consumed: \{consumedMessages.size()} (should be \{messageCount})"); + assertSame(consumedMessages.size(), messageCount); + } + + /** + * Test multiple producers and consumers + * + * @throws InterruptedException + */ + @Test + public void testMultipleProducersMultipleConsumers() throws InterruptedException { + System.out.println("Test multiple producers and consumers..."); + MessageQueue queue = new MessageQueue(); + queue.capacity = 20; + + int producerCount = 4; + int consumerCount = 4; + int messagesPerProducer = 10; + int totalMessages = producerCount * messagesPerProducer; + final CountDownLatch latch = new CountDownLatch(producerCount + consumerCount); + List consumedMessages = new ArrayList<>(); + + // create producers + for (int i = 0; i < producerCount; i++) { + final int producerId = i + 1; + new Thread(() -> { + Random random = new Random(); + for (int j = 0; j < messagesPerProducer; j++) { + try { + Thread.sleep(random.nextInt(50)); // random delay + + Message msg = new Message(); + msg.setId(producerId * 1000 + j); + msg.setValue(STR."P\{producerId}-Msg\{j}"); + queue.put(msg); + + System.out.println(STR."Producer \{producerId}: send message - \{msg.getId()}"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + latch.countDown(); + }).start(); + } + + // create consumers + for (int i = 0; i < consumerCount; i++) { + final int consumerId = i + 1; + new Thread(() -> { + for (int j = 0; j < totalMessages / consumerCount; j++) { + try { + Message msg = queue.take(); + synchronized (consumedMessages) { + consumedMessages.add(msg.getId()); + } + System.out.println(STR."Consumer \{consumerId}: receive message - \{msg.getId()}"); + Thread.sleep(30); // consumption delay + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + latch.countDown(); + }).start(); + } + + // Waiting for all tasks to be completed + boolean completed = latch.await(10, TimeUnit.SECONDS); + + System.out.println(STR."Number of messages consumed: \{consumedMessages.size()} (should be \{totalMessages})"); + assertSame(consumedMessages.size(), totalMessages); + assertTrue(completed); + } +}