From 10ceeae5f8e4852663405927224f58d98aca7605 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=94=D0=B0=D0=B2?= =?UTF-8?q?=D1=8B=D0=B4=D0=BE=D0=B2?= Date: Tue, 7 Oct 2025 15:55:21 +0300 Subject: [PATCH] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=B8=D0=BB=20Cu?= =?UTF-8?q?ncurrentQueueBalancer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/itmo/CuncurrentQueueBalancer.java | 27 +++++++++++ src/main/java/org/itmo/Graph.java | 47 ++++++++++--------- src/main/java/org/itmo/QueueBalancer.java | 26 ---------- tmp/results.txt | 34 +++++++------- 4 files changed, 70 insertions(+), 64 deletions(-) create mode 100644 src/main/java/org/itmo/CuncurrentQueueBalancer.java delete mode 100644 src/main/java/org/itmo/QueueBalancer.java diff --git a/src/main/java/org/itmo/CuncurrentQueueBalancer.java b/src/main/java/org/itmo/CuncurrentQueueBalancer.java new file mode 100644 index 0000000..e0a7710 --- /dev/null +++ b/src/main/java/org/itmo/CuncurrentQueueBalancer.java @@ -0,0 +1,27 @@ +package org.itmo; + +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +class CuncurrentQueueBalancer { + private AtomicInteger nextQueue; + private final List> queues; + + CuncurrentQueueBalancer(Integer size) { + nextQueue = new AtomicInteger(0); + queues = Stream.>generate(ConcurrentLinkedQueue::new).limit(size).collect(Collectors.toList()); + } + + void add(T value) { + Integer nextQueueIdx = nextQueue.getAndUpdate((val) -> (val + 1) % queues.size()); + queues.get(nextQueueIdx).add(value); + } + + List> getQueues() { + return queues; + } +} diff --git a/src/main/java/org/itmo/Graph.java b/src/main/java/org/itmo/Graph.java index 485d039..f0a8c43 100644 --- a/src/main/java/org/itmo/Graph.java +++ b/src/main/java/org/itmo/Graph.java @@ -1,15 +1,15 @@ package org.itmo; import java.util.*; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collector; import java.util.stream.Collectors; class Graph { + private final Integer CORES = 12; private final int V; private final ArrayList[] adjList; private final ExecutorService executor; @@ -21,7 +21,7 @@ class Graph { adjList[i] = new ArrayList<>(); } - executor = Executors.newFixedThreadPool(12); + executor = Executors.newFixedThreadPool(CORES); } void addEdge(int src, int dest) { @@ -35,19 +35,16 @@ void parallelBFS(int startVertex) { for (Integer i = 0; i < V; i++) { visited[i] = new AtomicBoolean(false); } - - QueueBalancer balancer = new QueueBalancer<>(12); - Queue nextVertexes = new ConcurrentLinkedQueue<>(); - visited[startVertex].set(true); - nextVertexes.add(startVertex); - while (!nextVertexes.isEmpty()) { - while (!nextVertexes.isEmpty()) { - balancer.add(nextVertexes.poll()); - } + CuncurrentQueueBalancer balancer = new CuncurrentQueueBalancer<>(CORES); + balancer.add(startVertex); - balancer.getQueues().stream() + while (true) { + CuncurrentQueueBalancer nextBalancer = new CuncurrentQueueBalancer<>(CORES); + + List> futures = balancer.getQueues().stream() + .filter((queue) -> !queue.isEmpty()) .map((queue) -> executor.submit(() -> { while (!queue.isEmpty()) { Integer vertex = queue.poll(); @@ -55,17 +52,25 @@ void parallelBFS(int startVertex) { for (int n : adjList[vertex]) { Boolean isVisited = visited[n].getAndSet(true); if (!isVisited) { - nextVertexes.add(n); + nextBalancer.add(n); } } } - })).forEach((future) -> { - try { - future.get(); - } catch (InterruptedException | ExecutionException e) { - e.printStackTrace(); - } - }); + })).collect(Collectors.toList()); + + if (futures.isEmpty()) { + break; + } + + for (Future f : futures) { + try { + f.get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + } + + balancer = nextBalancer; } } diff --git a/src/main/java/org/itmo/QueueBalancer.java b/src/main/java/org/itmo/QueueBalancer.java deleted file mode 100644 index c961549..0000000 --- a/src/main/java/org/itmo/QueueBalancer.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.itmo; - -import java.util.ArrayDeque; -import java.util.List; -import java.util.Queue; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -class QueueBalancer { - private Integer nextQueue; - private final List> queues; - - QueueBalancer(Integer size) { - nextQueue = 0; - queues = Stream.>generate(ArrayDeque::new).limit(size).collect(Collectors.toList()); - } - - void add(T value) { - nextQueue = (nextQueue + 1) % queues.size(); - queues.get(nextQueue).add(value); - } - - List> getQueues() { - return queues; - } -} diff --git a/tmp/results.txt b/tmp/results.txt index 0109e02..c864868 100644 --- a/tmp/results.txt +++ b/tmp/results.txt @@ -1,36 +1,36 @@ Times for 10 vertices and 50 connections: Serial: 0 -Parallel: 6 +Parallel: 3 -------- Times for 100 vertices and 500 connections: -Serial: 1 -Parallel: 4 +Serial: 0 +Parallel: 2 -------- Times for 1000 vertices and 5000 connections: -Serial: 1 -Parallel: 6 +Serial: 0 +Parallel: 3 -------- Times for 10000 vertices and 50000 connections: -Serial: 4 -Parallel: 9 +Serial: 3 +Parallel: 13 -------- Times for 10000 vertices and 100000 connections: -Serial: 2 -Parallel: 9 +Serial: 1 +Parallel: 17 -------- Times for 50000 vertices and 1000000 connections: -Serial: 20 -Parallel: 31 +Serial: 17 +Parallel: 22 -------- Times for 100000 vertices and 1000000 connections: -Serial: 19 -Parallel: 37 +Serial: 15 +Parallel: 39 -------- Times for 1000000 vertices and 10000000 connections: -Serial: 629 -Parallel: 536 +Serial: 651 +Parallel: 378 -------- Times for 2000000 vertices and 10000000 connections: -Serial: 1112 -Parallel: 2922 +Serial: 1025 +Parallel: 3005 --------