Skip to content

Advanced Concurrent Processor in Java #208

@siddharthrgade21-a11y

Description

@siddharthrgade21-a11y

import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;
import java.util.stream.IntStream;

/**
 * Demonstrates a bounded producer-consumer pipeline built on {@code java.util.concurrent}.
 *
 * <p>One producer task puts string items on a bounded {@link BlockingQueue}; several consumer
 * tasks take items off it and record a "processed" line for each. Completion is signalled with a
 * sentinel value (a "poison pill") that every consumer puts back on the queue before exiting, so
 * that the remaining consumers see it too.
 */
public class AdvancedConcurrentProcessor {

    /** Sentinel the producer enqueues after the last item; a consumer stops when it takes it. */
    private static final String POISON_PILL = "POISON_PILL";

    /** Capacity of the hand-off queue; {@code put} blocks while this many items are waiting. */
    private static final int QUEUE_CAPACITY = 10;

    /**
     * Runs the demo pipeline (20 items, 2 consumers, 50 ms production delay, 100 ms processing
     * delay) and prints the upper-cased results whose text contains {@code "Data-1"}.
     *
     * @param args ignored
     * @throws InterruptedException if the calling thread is interrupted while waiting for the
     *     worker tasks to finish
     */
    public static void main(String[] args) throws InterruptedException {
        List<String> results = process(20, 2, 50L, 100L);

        System.out.println("\n--- Processing Complete. Total Results: " + results.size() + " ---");
        results.stream()
                // Substring match: selects "Data-1" and also "Data-10" through "Data-19".
                .filter(s -> s.contains("Data-1"))
                .map(String::toUpperCase)
                .forEach(System.out::println);
    }

    /**
     * Runs one producer and {@code consumerCount} consumers to completion and returns what the
     * consumers recorded.
     *
     * <p>The queue and the result list are created per call, so the method can be invoked
     * repeatedly. If the workers have not finished within one minute they are cancelled and the
     * results gathered so far are returned.
     *
     * @param itemCount number of items ({@code "Data-0"} .. {@code "Data-(itemCount-1)"}) to produce
     * @param consumerCount number of concurrent consumer tasks; must be at least 1
     * @param produceDelayMillis pause after producing each item, in milliseconds
     * @param processDelayMillis pause after processing each item, in milliseconds
     * @return a snapshot of the recorded lines, each of the form
     *     {@code "<thread name> processed Data-<i>"}; order depends on thread scheduling
     * @throws IllegalArgumentException if {@code itemCount} or a delay is negative, or
     *     {@code consumerCount} is less than 1
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static List<String> process(int itemCount, int consumerCount,
            long produceDelayMillis, long processDelayMillis) throws InterruptedException {
        if (itemCount < 0) {
            throw new IllegalArgumentException("itemCount must be >= 0: " + itemCount);
        }
        if (consumerCount < 1) {
            throw new IllegalArgumentException("consumerCount must be >= 1: " + consumerCount);
        }
        if (produceDelayMillis < 0 || processDelayMillis < 0) {
            throw new IllegalArgumentException("delays must be >= 0: "
                    + produceDelayMillis + ", " + processDelayMillis);
        }

        // Bounded queue: put() blocks when full, take() blocks when empty.
        final BlockingQueue<String> dataQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
        // Written concurrently by every consumer task.
        final List<String> results = new CopyOnWriteArrayList<>();

        // 1. Producer task: generates the items, then signals completion.
        Runnable producer = () -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    String data = "Data-" + i;
                    dataQueue.put(data);
                    System.out.println(Thread.currentThread().getName() + " produced: " + data);
                    Thread.sleep(produceDelayMillis); // Simulate production time
                }
                dataQueue.put(POISON_PILL);
            } catch (InterruptedException e) {
                // Restore the flag so the pool thread observes the interruption.
                Thread.currentThread().interrupt();
            }
        };

        // 2. Consumer task: processes items until it sees the poison pill.
        Runnable consumer = () -> {
            try {
                while (true) {
                    String data = dataQueue.take();
                    if (POISON_PILL.equals(data)) {
                        // Put the pill back so the other consumers also terminate.
                        dataQueue.put(POISON_PILL);
                        break;
                    }
                    results.add(Thread.currentThread().getName() + " processed " + data);
                    Thread.sleep(processDelayMillis); // Simulate heavy processing
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        // One thread per task so the producer and all consumers run concurrently.
        ExecutorService executor = Executors.newFixedThreadPool(consumerCount + 1);
        try {
            executor.submit(producer);
            for (int c = 0; c < consumerCount; c++) {
                executor.submit(consumer);
            }
            executor.shutdown(); // No new tasks; already-submitted ones keep running.
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                System.err.println("Workers did not finish within the timeout; cancelling.");
            }
        } finally {
            // On timeout or interruption, interrupt the workers so their threads can exit.
            if (!executor.isTerminated()) {
                executor.shutdownNow();
            }
        }

        return new ArrayList<>(results);
    }
}

Metadata

Assignees

No one assigned

    Labels

    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions