diff --git a/build.gradle.kts b/build.gradle.kts index 3341beb..5370e10 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,5 +1,6 @@ plugins { kotlin("jvm") version "1.9.20" + java application } @@ -12,6 +13,8 @@ repositories { dependencies { testImplementation(kotlin("test")) + testImplementation("org.openjdk.jcstress:jcstress-core:0.16") + testAnnotationProcessor("org.openjdk.jcstress:jcstress-core:0.16") } tasks.test { @@ -24,4 +27,20 @@ kotlin { application { mainClass.set("MainKt") -} \ No newline at end of file +} + +// JCStress runner task: runs JCStress tests located on the test runtime classpath +// Use: ./gradlew jcstress [-PjcstressArgs="-v -m quick"] +tasks.register("jcstress") { + group = "verification" + description = "Run JCStress stress tests" + mainClass.set("org.openjdk.jcstress.Main") + classpath = sourceSets.test.get().runtimeClasspath + dependsOn("testClasses") + + val argsProp = project.findProperty("jcstressArgs") as String? + if (!argsProp.isNullOrBlank()) { + args = argsProp.split("\\s+".toRegex()) + } +} + diff --git a/src/main/java/org/itmo/Graph.java b/src/main/java/org/itmo/Graph.java index 141a0b6..485d039 100644 --- a/src/main/java/org/itmo/Graph.java +++ b/src/main/java/org/itmo/Graph.java @@ -1,13 +1,18 @@ package org.itmo; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collector; +import java.util.stream.Collectors; class Graph { private final int V; private final ArrayList[] adjList; + private final ExecutorService executor; Graph(int vertices) { this.V = vertices; @@ -15,6 +20,8 @@ class Graph { for (int i = 0; i < vertices; ++i) { adjList[i] = new ArrayList<>(); } + + executor = Executors.newFixedThreadPool(12); } void addEdge(int src, int dest) { @@ -24,9 +31,45 @@ void addEdge(int src, int dest) { } void parallelBFS(int startVertex) { + AtomicBoolean[] visited = new AtomicBoolean[V]; + for (Integer i = 0; i < V; i++) { + visited[i] = new AtomicBoolean(false); + } + + QueueBalancer balancer = new QueueBalancer<>(12); + Queue nextVertexes = new ConcurrentLinkedQueue<>(); + + visited[startVertex].set(true); + nextVertexes.add(startVertex); + + while (!nextVertexes.isEmpty()) { + while (!nextVertexes.isEmpty()) { + balancer.add(nextVertexes.poll()); + } + + balancer.getQueues().stream() + .map((queue) -> executor.submit(() -> { + while (!queue.isEmpty()) { + Integer vertex = queue.poll(); + + for (int n : adjList[vertex]) { + Boolean isVisited = visited[n].getAndSet(true); + if (!isVisited) { + nextVertexes.add(n); + } + } + } + })).forEach((future) -> { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + }); + } } - //Generated by ChatGPT + // Generated by ChatGPT void bfs(int startVertex) { boolean[] visited = new boolean[V]; diff --git a/src/main/java/org/itmo/QueueBalancer.java b/src/main/java/org/itmo/QueueBalancer.java new file mode 100644 index 0000000..c961549 --- /dev/null +++ b/src/main/java/org/itmo/QueueBalancer.java @@ -0,0 +1,26 @@ +package org.itmo; + +import java.util.ArrayDeque; +import java.util.List; +import java.util.Queue; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +class QueueBalancer { + private Integer nextQueue; + private final List> queues; + + QueueBalancer(Integer size) { + nextQueue = 0; + queues = Stream.>generate(ArrayDeque::new).limit(size).collect(Collectors.toList()); + } + + void add(T value) { + nextQueue = (nextQueue + 1) % queues.size(); + queues.get(nextQueue).add(value); + } + + List> getQueues() { + return queues; + } +} diff --git a/src/main/java/org/itmo/UnsafeCounter.java b/src/main/java/org/itmo/UnsafeCounter.java new file mode 100644 index 0000000..1041a21 --- /dev/null +++ b/src/main/java/org/itmo/UnsafeCounter.java @@ -0,0 +1,13 @@ +package org.itmo; + +public class UnsafeCounter { + private int counter = 0; + + public void increment() { + counter++; // <-- гонка данных + } + + public int get() { + return counter; + } +} diff --git a/src/test/java/org/itmo/BFSTest.java b/src/test/java/org/itmo/BFSTest.java index 7bf9098..77af020 100644 --- a/src/test/java/org/itmo/BFSTest.java +++ b/src/test/java/org/itmo/BFSTest.java @@ -4,18 +4,15 @@ import java.io.FileWriter; import java.io.IOException; -import java.nio.Buffer; -import java.util.HashSet; +import java.util.List; import java.util.Random; -import java.util.function.BiFunction; -import java.util.stream.IntStream; public class BFSTest { @Test public void bfsTest() throws IOException { - int[] sizes = new int[]{10, 100, 1000, 10_000, 10_000, 50_000, 100_000, 1_000_000, 2_000_000}; - int[] connections = new int[]{50, 500, 5000, 50_000, 100_000, 1_000_000, 1_000_000, 10_000_000, 10_000_000}; + int[] sizes = new int[] { 10, 100, 1000, 10_000, 10_000, 50_000, 100_000, 1_000_000, 2_000_000 }; + int[] connections = new int[] { 50, 500, 5000, 50_000, 100_000, 1_000_000, 1_000_000, 10_000_000, 10_000_000 }; Random r = new Random(42); try (FileWriter fw = new FileWriter("tmp/results.txt")) { for (int i = 0; i < sizes.length; i++) { @@ -34,7 +31,6 @@ public void bfsTest() throws IOException { } } - private long executeSerialBfsAndGetTime(Graph g) { long startTime = System.currentTimeMillis(); g.bfs(0); diff --git a/src/test/java/org/itmo/RandomGraphGenerator.java b/src/test/java/org/itmo/RandomGraphGenerator.java index fdb888c..3f02e49 100644 --- a/src/test/java/org/itmo/RandomGraphGenerator.java +++ b/src/test/java/org/itmo/RandomGraphGenerator.java @@ -1,7 +1,9 @@ package org.itmo; import java.util.Arrays; +import java.util.HashSet; import java.util.Random; +import java.util.Set; import java.util.SplittableRandom; import java.util.concurrent.ForkJoinPool; import java.util.stream.IntStream; @@ -11,22 +13,31 @@ public class RandomGraphGenerator { private long pack(int u, int v) { return (((long) u) << 32) | (v & 0xffffffffL); } + private int unpackU(long key) { return (int) (key >>> 32); } + private int unpackV(long key) { return (int) (key & 0xffffffffL); } Graph generateGraph(Random r, int size, int numEdges) { - if (numEdges < size - 1) throw new IllegalArgumentException("We need min size-1 edges"); + if (size < 1) + throw new IllegalArgumentException("size must be >= 1"); + if (numEdges < size - 1) + throw new IllegalArgumentException("We need min size-1 edges"); long maxDirected = (long) size * (size - 1); - if (numEdges > maxDirected) throw new IllegalArgumentException("Too many edges for directed graph without self-loops"); + if (numEdges > maxDirected) + throw new IllegalArgumentException("Too many edges for directed graph without self-loops"); + + int[] perm = IntStream.range(0, size).toArray(); + for (int i = size - 1; i > 0; i--) { + int j = r.nextInt(i + 1); + int tmp = perm[i]; + perm[i] = perm[j]; + perm[j] = tmp; - int[] perm = java.util.stream.IntStream.range(0, size).toArray(); - for (int i = size - 1; i > 1; i--) { - int j = 1 + r.nextInt(i); - int tmp = perm[i]; perm[i] = perm[j]; perm[j] = tmp; } final int chainCount = size - 1; @@ -48,7 +59,8 @@ Graph generateGraph(Random r, int size, int numEdges) { final SplittableRandom base = new SplittableRandom(r.nextLong()); final SplittableRandom[] seeds = new SplittableRandom[threads]; - for (int t = 0; t < threads; t++) seeds[t] = base.split(); + for (int t = 0; t < threads; t++) + seeds[t] = base.split(); long[] finalKeys = keys; IntStream.range(0, threads).parallel().forEach(t -> { @@ -58,7 +70,8 @@ Graph generateGraph(Random r, int size, int numEdges) { for (int i = start; i < end; i++) { int u = rnd.nextInt(size); int v = rnd.nextInt(size - 1); - if (v >= u) v++; + if (v >= u) + v++; finalKeys[i] = pack(u, v); } }); @@ -74,7 +87,7 @@ Graph generateGraph(Random r, int size, int numEdges) { while (unique < numEdges) { int missing = numEdges - unique; - int extra = Math.max(missing / 2, 10_000); // небольшой запас + int extra = Math.max(missing / 2, 10_000); int add = missing + extra; long[] more = new long[unique + add]; @@ -82,7 +95,8 @@ Graph generateGraph(Random r, int size, int numEdges) { final SplittableRandom base2 = base.split(); final SplittableRandom[] seeds2 = new SplittableRandom[threads]; - for (int t = 0; t < threads; t++) seeds2[t] = base2.split(); + for (int t = 0; t < threads; t++) + seeds2[t] = base2.split(); final int offset2 = unique; final int chunk2 = (add + threads - 1) / threads; @@ -93,7 +107,8 @@ Graph generateGraph(Random r, int size, int numEdges) { for (int i = start; i < end; i++) { int u = rnd.nextInt(size); int v = rnd.nextInt(size - 1); - if (v >= u) v++; + if (v >= u) + v++; more[i] = pack(u, v); } }); @@ -109,6 +124,31 @@ Graph generateGraph(Random r, int size, int numEdges) { keys = more; } + Set chainSet = new HashSet<>(chainCount * 2); + for (int i = 1; i < size; i++) { + chainSet.add(pack(perm[i - 1], perm[i])); + } + + int p = 0; + for (int i = 0; i < unique && p < chainCount; i++) { + long e = keys[i]; + if (chainSet.remove(e)) { + // swap keys[p] и keys[i] + long tmp = keys[p]; + keys[p] = keys[i]; + keys[i] = tmp; + p++; + } + } + + SplittableRandom shuf = base.split(); + for (int i = p; i < numEdges; i++) { + int j = i + shuf.nextInt(unique - i); + long tmp = keys[i]; + keys[i] = keys[j]; + keys[j] = tmp; + } + Graph g = new Graph(size); for (int i = 0; i < numEdges; i++) { long key = keys[i]; diff --git a/src/test/java/org/itmo/UnsafeCounterTest.java b/src/test/java/org/itmo/UnsafeCounterTest.java new file mode 100644 index 0000000..a831605 --- /dev/null +++ b/src/test/java/org/itmo/UnsafeCounterTest.java @@ -0,0 +1,27 @@ +package org.itmo; + +import org.openjdk.jcstress.annotations.*; +import org.openjdk.jcstress.infra.results.I_Result; + +@JCStressTest +@Outcome(id = "5", expect = Expect.ACCEPTABLE, desc = "Все 5 инкрементов выполнены корректно") +@Outcome(id = "1", expect = Expect.ACCEPTABLE_INTERESTING, desc = "Гонка данных: часть инкрементов потерялась") +@Outcome(id = "2", expect = Expect.ACCEPTABLE_INTERESTING, desc = "Гонка данных: часть инкрементов потерялась") +@Outcome(id = "3", expect = Expect.ACCEPTABLE_INTERESTING, desc = "Гонка данных: часть инкрементов потерялась") +@Outcome(id = "4", expect = Expect.ACCEPTABLE_INTERESTING, desc = "Гонка данных: часть инкрементов потерялась") +@State +public class UnsafeCounterTest { + + private UnsafeCounter counter = new UnsafeCounter(); + + @Actor public void actor1() { counter.increment(); } + @Actor public void actor2() { counter.increment(); } + @Actor public void actor3() { counter.increment(); } + @Actor public void actor4() { counter.increment(); } + @Actor public void actor5() { counter.increment(); } + + @Arbiter + public void arbiter(I_Result r) { + r.r1 = counter.get(); + } +} diff --git a/tmp/results.txt b/tmp/results.txt index 027e7f9..0109e02 100644 --- a/tmp/results.txt +++ b/tmp/results.txt @@ -1,32 +1,36 @@ Times for 10 vertices and 50 connections: Serial: 0 -Parallel: 0 +Parallel: 6 -------- Times for 100 vertices and 500 connections: -Serial: 0 -Parallel: 0 +Serial: 1 +Parallel: 4 -------- Times for 1000 vertices and 5000 connections: Serial: 1 -Parallel: 0 +Parallel: 6 -------- Times for 10000 vertices and 50000 connections: -Serial: 3 -Parallel: 0 +Serial: 4 +Parallel: 9 -------- Times for 10000 vertices and 100000 connections: Serial: 2 -Parallel: 0 +Parallel: 9 -------- Times for 50000 vertices and 1000000 connections: -Serial: 30 -Parallel: 0 +Serial: 20 +Parallel: 31 -------- Times for 100000 vertices and 1000000 connections: -Serial: 18 -Parallel: 0 +Serial: 19 +Parallel: 37 -------- Times for 1000000 vertices and 10000000 connections: -Serial: 307 -Parallel: 0 +Serial: 629 +Parallel: 536 +-------- +Times for 2000000 vertices and 10000000 connections: +Serial: 1112 +Parallel: 2922 --------