Skip to content

Codes #190

@siddharthrgade21-a11y

Description

@siddharthrgade21-a11y

import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;
import java.util.stream.IntStream;

/**
 * Demonstrates a bounded producer-consumer pipeline built on
 * {@link BlockingQueue} and an {@link ExecutorService}.
 *
 * <p>One producer task puts {@code "Data-<i>"} strings on a bounded queue and
 * finishes by queueing a sentinel ("poison pill"). Each consumer task takes
 * items until it sees the sentinel, re-queues the sentinel so the remaining
 * consumers also see it, and then stops.
 *
 * <p>All mutable state (queue, result list, thread pool) is created per call
 * to {@link #process}, so the pipeline can be run repeatedly within one JVM.
 */
public class AdvancedConcurrentProcessor {

    /** Sentinel queued after the last real item to tell consumers to stop. */
    private static final String POISON_PILL = "POISON_PILL";

    /** Maximum number of pending items; the producer blocks once the queue holds this many. */
    private static final int QUEUE_CAPACITY = 10;

    /** How long to wait for the workers to finish before cancelling them. */
    private static final long TIMEOUT_MINUTES = 1;

    /**
     * Runs the pipeline with 20 items, 2 consumers, a 50 ms production delay
     * and a 100 ms processing delay, then prints the upper-cased results whose
     * text contains {@code "Data-1"}.
     *
     * @param args ignored
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting for the workers to finish
     */
    public static void main(String[] args) throws InterruptedException {
        List<String> results = process(20, 2, 50, 100);

        // 3. Process Results using Streams API
        System.out.println("\n--- Processing Complete. Total Results: " + results.size() + " ---");
        results.stream()
                // Substring match: selects "Data-1" as well as "Data-10" .. "Data-19"
                .filter(s -> s.contains("Data-1"))
                .map(String::toUpperCase) // Transform
                .forEach(System.out::println);
    }

    /**
     * Produces {@code itemCount} items on one thread and processes them on
     * {@code consumerCount} consumer threads.
     *
     * @param itemCount          number of {@code "Data-<i>"} items to produce; must be &gt;= 0
     * @param consumerCount      number of concurrent consumer tasks; must be &gt;= 1
     * @param produceDelayMillis pause after producing each item, in milliseconds; must be &gt;= 0
     * @param processDelayMillis pause after processing each item, in milliseconds; must be &gt;= 0
     * @return a snapshot of the processed entries, each of the form
     *         {@code "<thread name> processed Data-<i>"}; if the workers did not
     *         finish within the timeout the snapshot holds only what was
     *         processed so far
     * @throws IllegalArgumentException if any argument is out of range
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting for the workers to finish
     */
    public static List<String> process(int itemCount, int consumerCount,
            long produceDelayMillis, long processDelayMillis) throws InterruptedException {
        if (itemCount < 0) {
            throw new IllegalArgumentException("itemCount must be >= 0: " + itemCount);
        }
        if (consumerCount < 1) {
            throw new IllegalArgumentException("consumerCount must be >= 1: " + consumerCount);
        }
        if (produceDelayMillis < 0 || processDelayMillis < 0) {
            throw new IllegalArgumentException("delays must be >= 0: "
                    + produceDelayMillis + ", " + processDelayMillis);
        }

        // Per-run state: nothing (in particular no stale poison pill) survives between calls.
        BlockingQueue<String> dataQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
        List<String> results = new CopyOnWriteArrayList<>();

        // 1. Producer Task: Generates data
        Runnable producer = () -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    String data = "Data-" + i;
                    dataQueue.put(data); // Blocks if queue is full
                    System.out.println(Thread.currentThread().getName() + " produced: " + data);
                    Thread.sleep(produceDelayMillis); // Simulate production time
                }
                dataQueue.put(POISON_PILL); // Signal completion
            } catch (InterruptedException e) {
                // Only cancellation interrupts this task; restore the flag and stop.
                Thread.currentThread().interrupt();
            }
        };

        // 2. Consumer Tasks: Processes data concurrently
        Runnable consumer = () -> {
            try {
                while (true) {
                    String data = dataQueue.take(); // Blocks if queue is empty
                    if (POISON_PILL.equals(data)) {
                        // Pass the signal on. The slot just freed by take() guarantees room,
                        // because the producer queues nothing after the pill.
                        dataQueue.put(POISON_PILL);
                        break;
                    }
                    // Heavy processing simulation
                    String processed = Thread.currentThread().getName() + " processed " + data;
                    results.add(processed);
                    Thread.sleep(processDelayMillis);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        // One thread per task, so the producer and every consumer can run at the same time.
        ExecutorService executor = Executors.newFixedThreadPool(consumerCount + 1);
        try {
            executor.submit(producer);
            for (int i = 0; i < consumerCount; i++) {
                executor.submit(consumer);
            }

            // Stop accepting work and wait for the submitted tasks to drain the queue.
            executor.shutdown();
            if (!executor.awaitTermination(TIMEOUT_MINUTES, TimeUnit.MINUTES)) {
                System.err.println("Timed out after " + TIMEOUT_MINUTES
                        + " minute(s); cancelling remaining workers");
            }
        } finally {
            // No effect once the pool has terminated; otherwise (timeout or the caller
            // was interrupted) interrupts the blocked workers so they do not linger.
            executor.shutdownNow();
        }

        return new ArrayList<>(results);
    }

}

Metadata

Assignees

No one assigned

    Labels

    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions