-
Notifications
You must be signed in to change notification settings - Fork 694
Open
Description
import java.util.concurrent.*;
import java.util.Random;
public class ProducerConsumerDemo {
// Shared buffer using a BlockingQueue with a capacity of 5
private static final BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(5);
private static final int NUM_PRODUCERS = 2;
private static final int NUM_CONSUMERS = 2;
private static final int TASKS_PER_PRODUCER = 5;
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
System.out.println("Starting Producer-Consumer Demo with BlockingQueue");
// Start producers
for (int i = 0; i < NUM_PRODUCERS; i++) {
executorService.submit(new Producer(i + 1, TASKS_PER_PRODUCER));
}
// Start consumers
for (int i = 0; i < NUM_CONSUMERS; i++) {
executorService.submit(new Consumer(i + 1));
}
// Shutdown the executor service after all tasks are submitted
executorService.shutdown();
try {
// Wait for all tasks to complete, up to 1 minute
if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
System.out.println("Timeout occurred, forcing exit.");
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("All tasks finished.");
}
// Producer class
static class Producer implements Runnable {
private final int id;
private final int tasksToProduce;
private final Random random = new Random();
Producer(int id, int tasksToProduce) {
this.id = id;
this.tasksToProduce = tasksToProduce;
}
@Override
public void run() {
for (int i = 0; i < tasksToProduce; i++) {
int data = random.nextInt(100);
try {
// put() blocks if the queue is full
buffer.put(data);
System.out.println("Producer " + id + " produced: " + data + " (Queue size: " + buffer.size() + ")");
Thread.sleep(random.nextInt(500)); // Simulate some work
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
}
// Consumer class
static class Consumer implements Runnable {
private final int id;
Consumer(int id) {
this.id = id;
}
@Override
public void run() {
while (true) {
try {
// take() blocks if the queue is empty
Integer data = buffer.take();
System.out.println("Consumer " + id + " consumed: " + data + " (Queue size: " + buffer.size() + ")");
Thread.sleep(random.nextInt(1000)); // Simulate processing time
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
Metadata
Metadata
Assignees
Labels
No labels