Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/main/java/org/itmo/CuncurrentQueueBalancer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.itmo;

import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CuncurrentQueueBalancer<T> {
private AtomicInteger nextQueue;
private final List<Queue<T>> queues;

CuncurrentQueueBalancer(Integer size) {
nextQueue = new AtomicInteger(0);
queues = Stream.<Queue<T>>generate(ConcurrentLinkedQueue<T>::new).limit(size).collect(Collectors.toList());
}

void add(T value) {
Integer nextQueueIdx = nextQueue.getAndUpdate((val) -> (val + 1) % queues.size());
queues.get(nextQueueIdx).add(value);
}

List<Queue<T>> getQueues() {
return queues;
}
}
47 changes: 26 additions & 21 deletions src/main/java/org/itmo/Graph.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package org.itmo;

import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class Graph {
private final Integer CORES = 12;
private final int V;
private final ArrayList<Integer>[] adjList;
private final ExecutorService executor;
Expand All @@ -21,7 +21,7 @@ class Graph {
adjList[i] = new ArrayList<>();
}

executor = Executors.newFixedThreadPool(12);
executor = Executors.newFixedThreadPool(CORES);
}

void addEdge(int src, int dest) {
Expand All @@ -35,37 +35,42 @@ void parallelBFS(int startVertex) {
for (Integer i = 0; i < V; i++) {
visited[i] = new AtomicBoolean(false);
}

QueueBalancer<Integer> balancer = new QueueBalancer<>(12);
Queue<Integer> nextVertexes = new ConcurrentLinkedQueue<>();

visited[startVertex].set(true);
nextVertexes.add(startVertex);

while (!nextVertexes.isEmpty()) {
while (!nextVertexes.isEmpty()) {
balancer.add(nextVertexes.poll());
}
CuncurrentQueueBalancer<Integer> balancer = new CuncurrentQueueBalancer<>(CORES);
balancer.add(startVertex);

balancer.getQueues().stream()
while (true) {
CuncurrentQueueBalancer<Integer> nextBalancer = new CuncurrentQueueBalancer<>(CORES);

List<Future<?>> futures = balancer.getQueues().stream()
.filter((queue) -> !queue.isEmpty())
.map((queue) -> executor.submit(() -> {
while (!queue.isEmpty()) {
Integer vertex = queue.poll();

for (int n : adjList[vertex]) {
Boolean isVisited = visited[n].getAndSet(true);
if (!isVisited) {
nextVertexes.add(n);
nextBalancer.add(n);
}
}
}
})).forEach((future) -> {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
})).collect(Collectors.toList());

if (futures.isEmpty()) {
break;
}

for (Future<?> f : futures) {
try {
f.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}

balancer = nextBalancer;
}
}

Expand Down
26 changes: 0 additions & 26 deletions src/main/java/org/itmo/QueueBalancer.java

This file was deleted.

34 changes: 17 additions & 17 deletions tmp/results.txt
Original file line number Diff line number Diff line change
@@ -1,36 +1,36 @@
Times for 10 vertices and 50 connections:
Serial: 0
Parallel: 6
Parallel: 3
--------
Times for 100 vertices and 500 connections:
Serial: 1
Parallel: 4
Serial: 0
Parallel: 2
--------
Times for 1000 vertices and 5000 connections:
Serial: 1
Parallel: 6
Serial: 0
Parallel: 3
--------
Times for 10000 vertices and 50000 connections:
Serial: 4
Parallel: 9
Serial: 3
Parallel: 13
--------
Times for 10000 vertices and 100000 connections:
Serial: 2
Parallel: 9
Serial: 1
Parallel: 17
--------
Times for 50000 vertices and 1000000 connections:
Serial: 20
Parallel: 31
Serial: 17
Parallel: 22
--------
Times for 100000 vertices and 1000000 connections:
Serial: 19
Parallel: 37
Serial: 15
Parallel: 39
--------
Times for 1000000 vertices and 10000000 connections:
Serial: 629
Parallel: 536
Serial: 651
Parallel: 378
--------
Times for 2000000 vertices and 10000000 connections:
Serial: 1112
Parallel: 2922
Serial: 1025
Parallel: 3005
--------