diff --git a/README.md b/README.md index 6495c30..0eb4312 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,4 @@ +[![Review Assignment Due Date](https://classroom.github.com/assets/deadline-readme-button-22041afd0340ce965d47ae6ef1cefeee28c7c493a6346c4f15d667ab976d596c.svg)](https://classroom.github.com/a/AwTYhPar) # Лабораторная работа № 1: определение достижимости параллелизма и реализация параллельных алгоритмов. Шаги выполнения: diff --git a/artifacts/graphSize.png b/artifacts/graphSize.png new file mode 100644 index 0000000..55d6e95 Binary files /dev/null and b/artifacts/graphSize.png differ diff --git a/artifacts/jmh/graph-size.txt b/artifacts/jmh/graph-size.txt new file mode 100644 index 0000000..dce6cae --- /dev/null +++ b/artifacts/jmh/graph-size.txt @@ -0,0 +1,29 @@ +Benchmark (graphSize) Mode Cnt Score Error Units +JmhBfsTest.forkJoinBfs 100 avgt 4 0.043 ± 0.026 ms/op +JmhBfsTest.forkJoinBfs 1000 avgt 4 0.226 ± 0.066 ms/op +JmhBfsTest.forkJoinBfs 10000 avgt 4 2.217 ± 0.288 ms/op +JmhBfsTest.forkJoinBfs 50000 avgt 4 11.955 ± 2.038 ms/op +JmhBfsTest.forkJoinBfs 100000 avgt 4 23.961 ± 2.808 ms/op +JmhBfsTest.forkJoinBfs 1000000 avgt 4 234.495 ± 69.199 ms/op +JmhBfsTest.forkJoinBfs 2000000 avgt 4 437.866 ± 152.712 ms/op +JmhBfsTest.parallelBfs 100 avgt 4 2.571 ± 0.988 ms/op +JmhBfsTest.parallelBfs 1000 avgt 4 2.468 ± 0.873 ms/op +JmhBfsTest.parallelBfs 10000 avgt 4 4.318 ± 1.568 ms/op +JmhBfsTest.parallelBfs 50000 avgt 4 12.274 ± 1.017 ms/op +JmhBfsTest.parallelBfs 100000 avgt 4 26.078 ± 1.949 ms/op +JmhBfsTest.parallelBfs 1000000 avgt 4 459.465 ± 286.051 ms/op +JmhBfsTest.parallelBfs 2000000 avgt 4 772.640 ± 393.633 ms/op +JmhBfsTest.parallelFrontierBfs 100 avgt 4 2.187 ± 1.741 ms/op +JmhBfsTest.parallelFrontierBfs 1000 avgt 4 2.117 ± 0.881 ms/op +JmhBfsTest.parallelFrontierBfs 10000 avgt 4 5.444 ± 3.305 ms/op +JmhBfsTest.parallelFrontierBfs 50000 avgt 4 16.782 ± 11.075 ms/op +JmhBfsTest.parallelFrontierBfs 100000 avgt 4 32.159 ± 9.104 ms/op +JmhBfsTest.parallelFrontierBfs 1000000 avgt 4 354.178 ± 72.852 ms/op +JmhBfsTest.parallelFrontierBfs 2000000 avgt 4 626.251 ± 216.028 ms/op +JmhBfsTest.serialBfs 100 avgt 4 0.002 ± 0.001 ms/op +JmhBfsTest.serialBfs 1000 avgt 4 0.046 ± 0.005 ms/op +JmhBfsTest.serialBfs 10000 avgt 4 0.721 ± 0.172 ms/op +JmhBfsTest.serialBfs 50000 avgt 4 5.449 ± 3.297 ms/op +JmhBfsTest.serialBfs 100000 avgt 4 20.219 ± 4.305 ms/op +JmhBfsTest.serialBfs 1000000 avgt 4 383.653 ± 34.153 ms/op +JmhBfsTest.serialBfs 2000000 avgt 4 622.543 ± 70.652 ms/op diff --git a/artifacts/jmh/threads.txt b/artifacts/jmh/threads.txt new file mode 100644 index 0000000..7ab66aa --- /dev/null +++ b/artifacts/jmh/threads.txt @@ -0,0 +1,15 @@ +Benchmark (runtime) Mode Cnt Score Error Units +JmhBfsResourceTest.parallelBfs 1 avgt 4 811.736 ± 532.065 ms/op +JmhBfsResourceTest.parallelBfs 2 avgt 4 800.443 ± 213.065 ms/op +JmhBfsResourceTest.parallelBfs 4 avgt 4 786.551 ± 188.788 ms/op +JmhBfsResourceTest.parallelBfs 6 avgt 4 746.592 ± 310.785 ms/op +JmhBfsResourceTest.parallelBfs 12 avgt 4 726.113 ± 108.879 ms/op +JmhBfsResourceTest.parallelBfs 16 avgt 4 746.983 ± 143.357 ms/op +JmhBfsResourceTest.parallelBfs 24 avgt 4 787.820 ± 211.231 ms/op +JmhBfsResourceTest.parallelFrontierBfs 1 avgt 4 967.328 ± 479.491 ms/op +JmhBfsResourceTest.parallelFrontierBfs 2 avgt 4 623.180 ± 302.594 ms/op +JmhBfsResourceTest.parallelFrontierBfs 4 avgt 4 558.315 ± 282.455 ms/op +JmhBfsResourceTest.parallelFrontierBfs 6 avgt 4 732.789 ± 298.786 ms/op +JmhBfsResourceTest.parallelFrontierBfs 12 avgt 4 798.175 ± 286.191 ms/op +JmhBfsResourceTest.parallelFrontierBfs 16 avgt 4 746.080 ± 201.253 ms/op +JmhBfsResourceTest.parallelFrontierBfs 24 avgt 4 775.214 ± 253.786 ms/op diff --git a/artifacts/threads.png b/artifacts/threads.png new file mode 100644 index 0000000..1831877 Binary files /dev/null and b/artifacts/threads.png differ diff --git a/build.gradle.kts b/build.gradle.kts index 3341beb..d2c376d 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,6 +1,9 @@ plugins { kotlin("jvm") version "1.9.20" + java application + id("me.champeau.jmh") version "0.7.3" + // id("org.openjdk.jcstress") version "0.15" } group = "org.itmo" @@ -12,6 +15,8 @@ repositories { dependencies { testImplementation(kotlin("test")) + testImplementation("org.openjdk.jcstress:jcstress-core:0.16") + testAnnotationProcessor("org.openjdk.jcstress:jcstress-core:0.16") } tasks.test { @@ -19,9 +24,35 @@ tasks.test { } kotlin { - jvmToolchain(8) + jvmToolchain(21) } application { mainClass.set("MainKt") -} \ No newline at end of file +} + +jmh { + jvmArgs = listOf("--enable-native-access=ALL-UNNAMED", "--add-opens=java.base/java.lang=ALL-UNNAMED") +} + +sourceSets { + test { + java.srcDirs("src/test/kotlin", "src/test/java") + } +} + +// JCStress runner task: runs JCStress tests located on the test runtime classpath +// Use: ./gradlew jcstress [-PjcstressArgs="-v -m quick"] +tasks.register("jcstress") { + group = "verification" + description = "Run JCStress stress tests" + mainClass.set("org.openjdk.jcstress.Main") + classpath = sourceSets.test.get().runtimeClasspath + dependsOn("testClasses") + + val argsProp = project.findProperty("jcstressArgs") as String? + if (!argsProp.isNullOrBlank()) { + args = argsProp.split("\\s+".toRegex()) + } +} + diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 62f495d..b82aa23 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.2-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME diff --git a/src/jmh/kotlin/org/itmo/bfs/JmhBfsResourceTest.kt b/src/jmh/kotlin/org/itmo/bfs/JmhBfsResourceTest.kt new file mode 100644 index 0000000..7178381 --- /dev/null +++ b/src/jmh/kotlin/org/itmo/bfs/JmhBfsResourceTest.kt @@ -0,0 +1,63 @@ +package org.itmo.bfs + +import org.itmo.Graph +import org.itmo.RandomGraphGenerator +import org.itmo.bfs.impl.ParallelBfs +import org.itmo.bfs.impl.ParallelFrontierBfs +import org.openjdk.jmh.annotations.Benchmark +import org.openjdk.jmh.annotations.BenchmarkMode +import org.openjdk.jmh.annotations.Fork +import org.openjdk.jmh.annotations.Measurement +import org.openjdk.jmh.annotations.Mode +import org.openjdk.jmh.annotations.OutputTimeUnit +import org.openjdk.jmh.annotations.Param +import org.openjdk.jmh.annotations.Scope +import org.openjdk.jmh.annotations.Setup +import org.openjdk.jmh.annotations.State +import org.openjdk.jmh.annotations.Warmup +import org.openjdk.jmh.infra.Blackhole +import java.util.concurrent.TimeUnit +import kotlin.math.min + +@Fork(1) +@State(Scope.Benchmark) +@Warmup(iterations = 1, time = 5) +@Measurement(iterations = 4, time = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +open class JmhBfsResourceTest { + + @Param("1", "2", "4", "6", "12", "16", "24") + private var runtime = 0 + + private var connections = 0 + private lateinit var graph: Graph + + private lateinit var parallelFrontierBfs: Bfs + private lateinit var parallelBfs: Bfs + + @Setup + fun setup() { + val graphSize = 2_000_000 + connections = min(graphSize * 10, 10_000_000) + + println("Генерация графа размером $graphSize с $connections рёбрами...") + + graph = RandomGraphGenerator().generateGraph(java.util.Random(42), graphSize, connections) + + println("Генерация завершена.") + + parallelBfs = ParallelBfs(runtime) + parallelFrontierBfs = ParallelFrontierBfs(runtime) + } + + @Benchmark + fun parallelBfs(bh: Blackhole) { + bh.consume(parallelBfs.bfs(graph, 0)) + } + + @Benchmark + fun parallelFrontierBfs(bh: Blackhole) { + bh.consume(parallelFrontierBfs.bfs(graph, 0)) + } +} diff --git a/src/jmh/kotlin/org/itmo/bfs/JmhBfsTest.kt b/src/jmh/kotlin/org/itmo/bfs/JmhBfsTest.kt new file mode 100644 index 0000000..8a1e160 --- /dev/null +++ b/src/jmh/kotlin/org/itmo/bfs/JmhBfsTest.kt @@ -0,0 +1,75 @@ +package org.itmo.bfs + +import org.itmo.Graph +import org.itmo.RandomGraphGenerator +import org.itmo.bfs.impl.ForkJoinBfs +import org.itmo.bfs.impl.ParallelBfs +import org.itmo.bfs.impl.ParallelFrontierBfs +import org.itmo.bfs.impl.SimpleBfs +import org.openjdk.jmh.annotations.Benchmark +import org.openjdk.jmh.annotations.BenchmarkMode +import org.openjdk.jmh.annotations.Fork +import org.openjdk.jmh.annotations.Measurement +import org.openjdk.jmh.annotations.Mode +import org.openjdk.jmh.annotations.OutputTimeUnit +import org.openjdk.jmh.annotations.Param +import org.openjdk.jmh.annotations.Scope +import org.openjdk.jmh.annotations.Setup +import org.openjdk.jmh.annotations.State +import org.openjdk.jmh.annotations.Warmup +import org.openjdk.jmh.infra.Blackhole +import java.util.concurrent.TimeUnit +import kotlin.math.min + +@Fork(1) +@State(Scope.Benchmark) +@Warmup(iterations = 1, time = 5) +@Measurement(iterations = 4, time = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +open class JmhBfsTest { + + @Param("10", "100", "1000", "10000", "50000", "100000", "1000000", "2000000") + private var graphSize = 0 + + private var connections = 0 + private lateinit var graph: Graph + + private val runtime = Runtime.getRuntime().availableProcessors() * 2 + + private val simpleBfs = SimpleBfs() + private val forkJoinBfs = ForkJoinBfs() + private val parallelFrontierBfs = ParallelFrontierBfs(runtime) + private val parallelBfs = ParallelBfs(runtime) + + @Setup + fun setup() { + connections = min(graphSize * 10, 10_000_000) + + println("Генерация графа размером $graphSize с $connections рёбрами...") + + graph = RandomGraphGenerator().generateGraph(java.util.Random(42), graphSize, connections) + + println("Генерация завершена.") + } + + @Benchmark + fun serialBfs(bh: Blackhole) { + bh.consume(simpleBfs.bfs(graph, 0)) + } + + @Benchmark + fun parallelBfs(bh: Blackhole) { + bh.consume(parallelBfs.bfs(graph, 0)) + } + + @Benchmark + fun parallelFrontierBfs(bh: Blackhole) { + bh.consume(parallelFrontierBfs.bfs(graph, 0)) + } + + @Benchmark + fun forkJoinBfs(bh: Blackhole) { + bh.consume(forkJoinBfs.bfs(graph, 0)) + } +} diff --git a/src/main/java/org/itmo/Graph.java b/src/main/java/org/itmo/Graph.java index 141a0b6..8c3de6c 100644 --- a/src/main/java/org/itmo/Graph.java +++ b/src/main/java/org/itmo/Graph.java @@ -1,16 +1,13 @@ package org.itmo; import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -class Graph { - private final int V; +public class Graph { + private final int size; private final ArrayList[] adjList; Graph(int vertices) { - this.V = vertices; + this.size = vertices; adjList = new ArrayList[vertices]; for (int i = 0; i < vertices; ++i) { adjList[i] = new ArrayList<>(); @@ -23,28 +20,12 @@ void addEdge(int src, int dest) { } } - void parallelBFS(int startVertex) { + public int getSize() { + return size; } - //Generated by ChatGPT - void bfs(int startVertex) { - boolean[] visited = new boolean[V]; - - LinkedList queue = new LinkedList<>(); - - visited[startVertex] = true; - queue.add(startVertex); - - while (!queue.isEmpty()) { - startVertex = queue.poll(); - - for (int n : adjList[startVertex]) { - if (!visited[n]) { - visited[n] = true; - queue.add(n); - } - } - } + // not safe, but today I don't care + public ArrayList[] getAdjList() { + return adjList; } - } diff --git a/src/main/java/org/itmo/bfs/Bfs.java b/src/main/java/org/itmo/bfs/Bfs.java new file mode 100644 index 0000000..1970d86 --- /dev/null +++ b/src/main/java/org/itmo/bfs/Bfs.java @@ -0,0 +1,7 @@ +package org.itmo.bfs; + +import org.itmo.Graph; + +public interface Bfs { + void bfs(Graph graph, int startVertex); +} diff --git a/src/main/java/org/itmo/bfs/impl/ForkJoinBfs.java b/src/main/java/org/itmo/bfs/impl/ForkJoinBfs.java new file mode 100644 index 0000000..df265ba --- /dev/null +++ b/src/main/java/org/itmo/bfs/impl/ForkJoinBfs.java @@ -0,0 +1,33 @@ +package org.itmo.bfs.impl; + +import java.util.Collection; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicIntegerArray; +import org.itmo.Graph; +import org.itmo.bfs.Bfs; + +public class ForkJoinBfs implements Bfs { + + @Override + public void bfs(Graph graph, int startVertex) { + var visited = new AtomicIntegerArray(graph.getSize()); + visited.set(startVertex, 1); + + // we can use faster collection, bz we don't remove elements and queue is sequential (linearizable) on CAS + Collection frontier = new ConcurrentLinkedQueue<>(); + frontier.add(startVertex); + + while (!frontier.isEmpty()) { + Queue nextFrontier = new ConcurrentLinkedQueue<>(); + + // actually this need bulk optimization for unbalanced graphs + frontier.parallelStream() + .flatMap(v -> graph.getAdjList()[v].stream()) + .filter(neighbor -> visited.compareAndSet(neighbor, 0, 1)) + .forEach(nextFrontier::add); + + frontier = nextFrontier; + } + } +} diff --git a/src/main/java/org/itmo/bfs/impl/ParallelBfs.java b/src/main/java/org/itmo/bfs/impl/ParallelBfs.java new file mode 100644 index 0000000..c44c912 --- /dev/null +++ b/src/main/java/org/itmo/bfs/impl/ParallelBfs.java @@ -0,0 +1,82 @@ +package org.itmo.bfs.impl; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerArray; +import java.util.stream.IntStream; +import org.itmo.Graph; +import org.itmo.bfs.Bfs; + +// unfortunately, this implementation is not working +public class ParallelBfs implements Bfs { + + private final int threadNumber; + + public ParallelBfs(int threadNumber) { + this.threadNumber = threadNumber; + } + + @Override + public void bfs(Graph graph, int startVertex) { + AtomicInteger activeNodes = new AtomicInteger(1); + AtomicIntegerArray distances = new AtomicIntegerArray(graph.getSize()); + ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + + queue.add(startVertex); + + IntStream.range(0, threadNumber) + .mapToObj(i -> + new Thread(new BfsWorker(graph, distances, queue, activeNodes)) + ) + .peek(Thread::start) + .forEach(thread -> { + try { + thread.join(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + public record BfsWorker( + Graph graph, + AtomicIntegerArray distances, + Queue queue, + AtomicInteger activeNodes + ) implements Runnable { + + @Override + public void run() { + var adjList = graph.getAdjList(); + while (activeNodes.get() > 0) { + var currentVertex = queue.poll(); + if (currentVertex == null) { + continue; + } + int d = distances.get(currentVertex); + for (var neighbor : adjList[currentVertex]) { + if (updateDistanceIfLower(neighbor, d - 1)) { + queue.add(neighbor); + activeNodes.incrementAndGet(); + } + } + activeNodes.decrementAndGet(); + } + } + + private boolean updateDistanceIfLower(int node, int newDistance) { + while (true) { + int currentDistance = distances.get(node); + + if (newDistance >= currentDistance || currentDistance != 0) { + return false; + } + + if (distances.compareAndSet(node, currentDistance, newDistance)) { + return true; + } + } + } + } +} diff --git a/src/main/java/org/itmo/bfs/impl/ParallelFrontierBfs.java b/src/main/java/org/itmo/bfs/impl/ParallelFrontierBfs.java new file mode 100644 index 0000000..d00ff87 --- /dev/null +++ b/src/main/java/org/itmo/bfs/impl/ParallelFrontierBfs.java @@ -0,0 +1,69 @@ +package org.itmo.bfs.impl; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicIntegerArray; +import org.itmo.Graph; +import org.itmo.bfs.Bfs; + +public class ParallelFrontierBfs implements Bfs { + + private final int threadNumber; + + public ParallelFrontierBfs(int threadNumber) { + this.threadNumber = threadNumber; + } + + @Override + public void bfs(Graph graph, int startVertex) { + var adjList = graph.getAdjList(); + var visited = new AtomicIntegerArray(graph.getSize()); + visited.set(startVertex, 1); + + List frontier = new ArrayList<>(); + frontier.add(startVertex); + + try (var es = Executors.newFixedThreadPool(threadNumber)) { + while (!frontier.isEmpty()) { + final List currentFrontier = frontier; + + // we only need write synchronization, java doesn't have such list, writing it vpadlu + List nextFrontier = Collections.synchronizedList(new ArrayList<>()); + + var butchSize = Math.max(1, frontier.size() / threadNumber); + + List> futures = new ArrayList<>(); + for (int i = 0; i < frontier.size(); i += butchSize) { + var startIndex = i; + var lastIndex = Math.min(i + butchSize, frontier.size()); + + var future = es.submit(() -> + { + for (int vertex = startIndex; vertex < lastIndex; vertex++) { + for (int neighbor : adjList[currentFrontier.get(vertex)]) { + if (visited.compareAndSet(neighbor, 0, 1)) { + nextFrontier.add(neighbor); + } + } + } + } + ); + + futures.add(future); + } + + for (Future f : futures) { + f.get(); + } + + frontier = nextFrontier; + } + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/main/java/org/itmo/bfs/impl/SimpleBfs.java b/src/main/java/org/itmo/bfs/impl/SimpleBfs.java new file mode 100644 index 0000000..21f3d31 --- /dev/null +++ b/src/main/java/org/itmo/bfs/impl/SimpleBfs.java @@ -0,0 +1,31 @@ +package org.itmo.bfs.impl; + +import java.util.LinkedList; +import org.itmo.Graph; +import org.itmo.bfs.Bfs; + +public class SimpleBfs implements Bfs { + + //Generated by ChatGPT + @Override + public void bfs(Graph graph, int startVertex) { + boolean[] visited = new boolean[graph.getSize()]; + + LinkedList queue = new LinkedList<>(); + + visited[startVertex] = true; + queue.add(startVertex); + + var adjList = graph.getAdjList(); + while (!queue.isEmpty()) { + startVertex = queue.poll(); + + for (int n : adjList[startVertex]) { + if (!visited[n]) { + visited[n] = true; + queue.add(n); + } + } + } + } +} diff --git a/src/test/java/org/itmo/BFSTest.java b/src/test/java/org/itmo/BFSTest.java index 7bf9098..da138ec 100644 --- a/src/test/java/org/itmo/BFSTest.java +++ b/src/test/java/org/itmo/BFSTest.java @@ -1,14 +1,15 @@ package org.itmo; -import org.junit.jupiter.api.Test; - import java.io.FileWriter; import java.io.IOException; -import java.nio.Buffer; -import java.util.HashSet; import java.util.Random; -import java.util.function.BiFunction; -import java.util.stream.IntStream; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import org.itmo.bfs.impl.ForkJoinBfs; +import org.itmo.bfs.impl.ParallelBfs; +import org.itmo.bfs.impl.ParallelFrontierBfs; +import org.itmo.bfs.impl.SimpleBfs; +import org.junit.jupiter.api.Test; public class BFSTest { @@ -24,9 +25,13 @@ public void bfsTest() throws IOException { Graph g = new RandomGraphGenerator().generateGraph(r, sizes[i], connections[i]); System.out.println("Generation completed!\nStarting bfs"); long serialTime = executeSerialBfsAndGetTime(g); + long forkJoinTime = executeForkJoinBfs(g); + long frontierTime = executeParallelFrontierBfsAndGetTime(g); long parallelTime = executeParallelBfsAndGetTime(g); fw.append("Times for " + sizes[i] + " vertices and " + connections[i] + " connections: "); fw.append("\nSerial: " + serialTime); + fw.append("\nForkJoin: " + forkJoinTime); + fw.append("\nFrontier: " + frontierTime); fw.append("\nParallel: " + parallelTime); fw.append("\n--------\n"); } @@ -34,19 +39,70 @@ public void bfsTest() throws IOException { } } + @Test + public void bfsThreadsTest() throws IOException { + int graphSize = 2_000_000; + int connections = 10_000_000; + int[] threadCounts = new int[]{1, 2, 4, 8, 12, 16, 24}; + Random r = new Random(42); + + System.out.println("Generating graph of size " + graphSize + " with " + connections + " connections... wait"); + Graph g = new RandomGraphGenerator().generateGraph(r, graphSize, connections); + System.out.println("Graph generation completed!"); + + try (FileWriter fw = new FileWriter("tmp/threads_results.txt")) { + for (int threads : threadCounts) { + System.out.println("--------------------------"); + System.out.println("Running BFS with " + threads + " threads"); + var bfs = new ParallelFrontierBfs(threads); + long startTime = System.nanoTime(); + bfs.bfs(g, 0); + long endTime = System.nanoTime(); + long result = (endTime - startTime) / 1_000_000; + + fw.append("Times for " + graphSize + " vertices, " + connections + " connections and " + threads + " threads:\n"); + fw.append("Frontier: " + result + "\n"); + fw.append("--------\n"); + fw.flush(); + } + } + } private long executeSerialBfsAndGetTime(Graph g) { - long startTime = System.currentTimeMillis(); - g.bfs(0); - long endTime = System.currentTimeMillis(); - return endTime - startTime; + var bfs = new SimpleBfs(); + long startTime = System.nanoTime(); + bfs.bfs(g, 0); + long endTime = System.nanoTime(); + return (endTime - startTime) / 1_000_000; } private long executeParallelBfsAndGetTime(Graph g) { - long startTime = System.currentTimeMillis(); - g.parallelBFS(0); - long endTime = System.currentTimeMillis(); - return endTime - startTime; + var runtime = Runtime.getRuntime().availableProcessors() * 2; + var bfs = new ParallelBfs(runtime); + long startTime = System.nanoTime(); + bfs.bfs(g, 0); + long endTime = System.nanoTime(); + return (endTime - startTime) / 1_000_000; } + private long executeParallelFrontierBfsAndGetTime(Graph g) { + var runtime = Runtime.getRuntime().availableProcessors() * 2; + var bfs = new ParallelFrontierBfs(runtime); + long startTime = System.nanoTime(); + bfs.bfs(g, 0); + long endTime = System.nanoTime(); + return (endTime - startTime) / 1_000_000; + } + + private long executeForkJoinBfs(Graph g) { + var bfs = new ForkJoinBfs(); + long startTime = System.nanoTime(); + try (var pool = ForkJoinPool.commonPool()) { + pool.submit(() -> bfs.bfs(g, 0)).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + long endTime = System.nanoTime(); + return (endTime - startTime) / 1_000_000; + } } diff --git a/src/test/java/org/itmo/ParallelBfsTest.java b/src/test/java/org/itmo/ParallelBfsTest.java new file mode 100644 index 0000000..d467ff7 --- /dev/null +++ b/src/test/java/org/itmo/ParallelBfsTest.java @@ -0,0 +1,114 @@ +package org.itmo; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.Random; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerArray; +import org.itmo.bfs.impl.ParallelBfs.BfsWorker; +import org.openjdk.jcstress.annotations.Actor; +import org.openjdk.jcstress.annotations.Arbiter; +import org.openjdk.jcstress.annotations.Expect; +import org.openjdk.jcstress.annotations.JCStressTest; +import org.openjdk.jcstress.annotations.Outcome; +import org.openjdk.jcstress.annotations.State; +import org.openjdk.jcstress.infra.results.I_Result; + +@JCStressTest +@Outcome(id = "0", expect = Expect.ACCEPTABLE, desc = "Нет нарушений") +@State +public class ParallelBfsTest { + + private final Graph graph = new RandomGraphGenerator().generateGraph(new Random(42), 1000, 100000); + + private final AtomicInteger activeNodes = new AtomicInteger(1); + private final AtomicIntegerArray distances = new AtomicIntegerArray(graph.getSize()); + private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + + { + queue.add(0); + } + + @Actor + public void actor1() { + Thread t = new Thread(new BfsWorker(graph, distances, queue, activeNodes)); + t.start(); + + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Actor + public void actor2() { + Thread t = new Thread(new BfsWorker(graph, distances, queue, activeNodes)); + t.start(); + + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Actor + public void actor3() { + Thread t = new Thread(new BfsWorker(graph, distances, queue, activeNodes)); + t.start(); + + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Actor + public void actor4() { + Thread t = new Thread(new BfsWorker(graph, distances, queue, activeNodes)); + t.start(); + + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Arbiter + public void arbiter(I_Result r) { + List distancesSimple = new ArrayList<>(); + for (int i = 0; i < distances.length(); i++) { + distancesSimple.add(0); + } + + Queue queue = new LinkedList<>(); + + queue.add(0); + + while (!queue.isEmpty()) { + int vertex = queue.poll(); + + for (int neighbor : graph.getAdjList()[vertex]) { + if (distancesSimple.get(neighbor).equals(0)) { + distancesSimple.set(neighbor, distancesSimple.get(vertex) - 1); + queue.add(neighbor); + } + } + } + + int result = 0; + for (int i = 0; i < graph.getSize(); i++) { + if (!distancesSimple.get(i).equals(distances.get(i))) { + result++; + } + } + + r.r1 = result; + } +} diff --git a/src/test/java/org/itmo/RandomGraphGenerator.java b/src/test/java/org/itmo/RandomGraphGenerator.java index fdb888c..efd7885 100644 --- a/src/test/java/org/itmo/RandomGraphGenerator.java +++ b/src/test/java/org/itmo/RandomGraphGenerator.java @@ -1,7 +1,9 @@ package org.itmo; import java.util.Arrays; +import java.util.HashSet; import java.util.Random; +import java.util.Set; import java.util.SplittableRandom; import java.util.concurrent.ForkJoinPool; import java.util.stream.IntStream; @@ -11,22 +13,27 @@ public class RandomGraphGenerator { private long pack(int u, int v) { return (((long) u) << 32) | (v & 0xffffffffL); } + private int unpackU(long key) { return (int) (key >>> 32); } + private int unpackV(long key) { return (int) (key & 0xffffffffL); } - Graph generateGraph(Random r, int size, int numEdges) { + public Graph generateGraph(Random r, int size, int numEdges) { + if (size < 1) throw new IllegalArgumentException("size must be >= 1"); if (numEdges < size - 1) throw new IllegalArgumentException("We need min size-1 edges"); long maxDirected = (long) size * (size - 1); if (numEdges > maxDirected) throw new IllegalArgumentException("Too many edges for directed graph without self-loops"); - int[] perm = java.util.stream.IntStream.range(0, size).toArray(); - for (int i = size - 1; i > 1; i--) { - int j = 1 + r.nextInt(i); - int tmp = perm[i]; perm[i] = perm[j]; perm[j] = tmp; + int[] perm = IntStream.range(0, size).toArray(); + for (int i = size - 1; i > 0; i--) { + int j = r.nextInt(i + 1); + int tmp = perm[i]; + perm[i] = perm[j]; + perm[j] = tmp; } final int chainCount = size - 1; @@ -74,7 +81,7 @@ Graph generateGraph(Random r, int size, int numEdges) { while (unique < numEdges) { int missing = numEdges - unique; - int extra = Math.max(missing / 2, 10_000); // небольшой запас + int extra = Math.max(missing / 2, 10_000); int add = missing + extra; long[] more = new long[unique + add]; @@ -109,6 +116,31 @@ Graph generateGraph(Random r, int size, int numEdges) { keys = more; } + Set chainSet = new HashSet<>(chainCount * 2); + for (int i = 1; i < size; i++) { + chainSet.add(pack(perm[i - 1], perm[i])); + } + + int p = 0; + for (int i = 0; i < unique && p < chainCount; i++) { + long e = keys[i]; + if (chainSet.remove(e)) { + // swap keys[p] и keys[i] + long tmp = keys[p]; + keys[p] = keys[i]; + keys[i] = tmp; + p++; + } + } + + SplittableRandom shuf = base.split(); + for (int i = p; i < numEdges; i++) { + int j = i + shuf.nextInt(unique - i); + long tmp = keys[i]; + keys[i] = keys[j]; + keys[j] = tmp; + } + Graph g = new Graph(size); for (int i = 0; i < numEdges; i++) { long key = keys[i]; @@ -118,5 +150,4 @@ Graph generateGraph(Random r, int size, int numEdges) { } return g; } - } diff --git a/tmp/results.txt b/tmp/results.txt index 027e7f9..fae4a4f 100644 --- a/tmp/results.txt +++ b/tmp/results.txt @@ -1,32 +1,54 @@ Times for 10 vertices and 50 connections: Serial: 0 -Parallel: 0 +ForkJoin: 1 +Frontier: 1 +Parallel: 3 -------- Times for 100 vertices and 500 connections: Serial: 0 -Parallel: 0 +ForkJoin: 3 +Frontier: 5 +Parallel: 3 -------- Times for 1000 vertices and 5000 connections: Serial: 1 -Parallel: 0 +ForkJoin: 7 +Frontier: 25 +Parallel: 20 -------- Times for 10000 vertices and 50000 connections: -Serial: 3 -Parallel: 0 +Serial: 4 +ForkJoin: 15 +Frontier: 33 +Parallel: 33 -------- Times for 10000 vertices and 100000 connections: -Serial: 2 -Parallel: 0 +Serial: 7 +ForkJoin: 4 +Frontier: 19 +Parallel: 5 -------- Times for 50000 vertices and 1000000 connections: -Serial: 30 -Parallel: 0 +Serial: 28 +ForkJoin: 21 +Frontier: 38 +Parallel: 43 -------- Times for 100000 vertices and 1000000 connections: -Serial: 18 -Parallel: 0 +Serial: 47 +ForkJoin: 36 +Frontier: 66 +Parallel: 60 -------- Times for 1000000 vertices and 10000000 connections: -Serial: 307 -Parallel: 0 +Serial: 525 +ForkJoin: 304 +Frontier: 538 +Parallel: 576 +-------- +Times for 2000000 vertices and 10000000 connections: +Serial: 876 +ForkJoin: 531 +Frontier: 702 +Parallel: 1061 -------- diff --git a/tmp/threads_results.txt b/tmp/threads_results.txt new file mode 100644 index 0000000..8f52319 --- /dev/null +++ b/tmp/threads_results.txt @@ -0,0 +1,21 @@ +Times for 2000000 vertices, 10000000 connections and 1 threads: +Frontier: 1027 +-------- +Times for 2000000 vertices, 10000000 connections and 2 threads: +Frontier: 767 +-------- +Times for 2000000 vertices, 10000000 connections and 4 threads: +Frontier: 694 +-------- +Times for 2000000 vertices, 10000000 connections and 8 threads: +Frontier: 733 +-------- +Times for 2000000 vertices, 10000000 connections and 12 threads: +Frontier: 816 +-------- +Times for 2000000 vertices, 10000000 connections and 16 threads: +Frontier: 887 +-------- +Times for 2000000 vertices, 10000000 connections and 24 threads: +Frontier: 814 +--------