Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
plugins {
kotlin("jvm") version "1.9.20"
java
application
}

Expand All @@ -12,6 +13,8 @@ repositories {

dependencies {
testImplementation(kotlin("test"))
testImplementation("org.openjdk.jcstress:jcstress-core:0.16")
testAnnotationProcessor("org.openjdk.jcstress:jcstress-core:0.16")
}

tasks.test {
Expand All @@ -24,4 +27,20 @@ kotlin {

application {
mainClass.set("MainKt")
}
}

// JCStress runner task: runs JCStress tests located on the test runtime classpath
// Use: ./gradlew jcstress [-PjcstressArgs="-v -m quick"]
tasks.register<JavaExec>("jcstress") {
group = "verification"
description = "Run JCStress stress tests"
mainClass.set("org.openjdk.jcstress.Main")
classpath = sourceSets.test.get().runtimeClasspath
dependsOn("testClasses")

val argsProp = project.findProperty("jcstressArgs") as String?
if (!argsProp.isNullOrBlank()) {
args = argsProp.split("\\s+".toRegex())
}
}

49 changes: 46 additions & 3 deletions src/main/java/org/itmo/Graph.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
package org.itmo;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class Graph {
private final int V;
private final ArrayList<Integer>[] adjList;
private final ExecutorService executor;

Graph(int vertices) {
this.V = vertices;
adjList = new ArrayList[vertices];
for (int i = 0; i < vertices; ++i) {
adjList[i] = new ArrayList<>();
}

executor = Executors.newFixedThreadPool(12);
}

void addEdge(int src, int dest) {
Expand All @@ -24,9 +31,45 @@ void addEdge(int src, int dest) {
}

void parallelBFS(int startVertex) {
AtomicBoolean[] visited = new AtomicBoolean[V];
for (Integer i = 0; i < V; i++) {
visited[i] = new AtomicBoolean(false);
}

QueueBalancer<Integer> balancer = new QueueBalancer<>(12);
Queue<Integer> nextVertexes = new ConcurrentLinkedQueue<>();

visited[startVertex].set(true);
nextVertexes.add(startVertex);

while (!nextVertexes.isEmpty()) {
while (!nextVertexes.isEmpty()) {
balancer.add(nextVertexes.poll());
}

balancer.getQueues().stream()
.map((queue) -> executor.submit(() -> {
while (!queue.isEmpty()) {
Integer vertex = queue.poll();

for (int n : adjList[vertex]) {
Boolean isVisited = visited[n].getAndSet(true);
if (!isVisited) {
nextVertexes.add(n);
}
}
}
})).forEach((future) -> {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
}

//Generated by ChatGPT
// Generated by ChatGPT
void bfs(int startVertex) {
boolean[] visited = new boolean[V];

Expand Down
26 changes: 26 additions & 0 deletions src/main/java/org/itmo/QueueBalancer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.itmo;

import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class QueueBalancer<T> {
private Integer nextQueue;
private final List<Queue<T>> queues;

QueueBalancer(Integer size) {
nextQueue = 0;
queues = Stream.<Queue<T>>generate(ArrayDeque<T>::new).limit(size).collect(Collectors.toList());
}

void add(T value) {
nextQueue = (nextQueue + 1) % queues.size();
queues.get(nextQueue).add(value);
}

List<Queue<T>> getQueues() {
return queues;
}
}
13 changes: 13 additions & 0 deletions src/main/java/org/itmo/UnsafeCounter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.itmo;

public class UnsafeCounter {
private int counter = 0;

public void increment() {
counter++; // <-- гонка данных
}

public int get() {
return counter;
}
}
10 changes: 3 additions & 7 deletions src/test/java/org/itmo/BFSTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,15 @@

import java.io.FileWriter;
import java.io.IOException;
import java.nio.Buffer;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.function.BiFunction;
import java.util.stream.IntStream;

public class BFSTest {

@Test
public void bfsTest() throws IOException {
int[] sizes = new int[]{10, 100, 1000, 10_000, 10_000, 50_000, 100_000, 1_000_000, 2_000_000};
int[] connections = new int[]{50, 500, 5000, 50_000, 100_000, 1_000_000, 1_000_000, 10_000_000, 10_000_000};
int[] sizes = new int[] { 10, 100, 1000, 10_000, 10_000, 50_000, 100_000, 1_000_000, 2_000_000 };
int[] connections = new int[] { 50, 500, 5000, 50_000, 100_000, 1_000_000, 1_000_000, 10_000_000, 10_000_000 };
Random r = new Random(42);
try (FileWriter fw = new FileWriter("tmp/results.txt")) {
for (int i = 0; i < sizes.length; i++) {
Expand All @@ -34,7 +31,6 @@ public void bfsTest() throws IOException {
}
}


private long executeSerialBfsAndGetTime(Graph g) {
long startTime = System.currentTimeMillis();
g.bfs(0);
Expand Down
62 changes: 51 additions & 11 deletions src/test/java/org/itmo/RandomGraphGenerator.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package org.itmo;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.SplittableRandom;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;
Expand All @@ -11,22 +13,31 @@ public class RandomGraphGenerator {
private long pack(int u, int v) {
return (((long) u) << 32) | (v & 0xffffffffL);
}

private int unpackU(long key) {
return (int) (key >>> 32);
}

private int unpackV(long key) {
return (int) (key & 0xffffffffL);
}

Graph generateGraph(Random r, int size, int numEdges) {
if (numEdges < size - 1) throw new IllegalArgumentException("We need min size-1 edges");
if (size < 1)
throw new IllegalArgumentException("size must be >= 1");
if (numEdges < size - 1)
throw new IllegalArgumentException("We need min size-1 edges");
long maxDirected = (long) size * (size - 1);
if (numEdges > maxDirected) throw new IllegalArgumentException("Too many edges for directed graph without self-loops");
if (numEdges > maxDirected)
throw new IllegalArgumentException("Too many edges for directed graph without self-loops");

int[] perm = IntStream.range(0, size).toArray();
for (int i = size - 1; i > 0; i--) {
int j = r.nextInt(i + 1);
int tmp = perm[i];
perm[i] = perm[j];
perm[j] = tmp;

int[] perm = java.util.stream.IntStream.range(0, size).toArray();
for (int i = size - 1; i > 1; i--) {
int j = 1 + r.nextInt(i);
int tmp = perm[i]; perm[i] = perm[j]; perm[j] = tmp;
}

final int chainCount = size - 1;
Expand All @@ -48,7 +59,8 @@ Graph generateGraph(Random r, int size, int numEdges) {

final SplittableRandom base = new SplittableRandom(r.nextLong());
final SplittableRandom[] seeds = new SplittableRandom[threads];
for (int t = 0; t < threads; t++) seeds[t] = base.split();
for (int t = 0; t < threads; t++)
seeds[t] = base.split();

long[] finalKeys = keys;
IntStream.range(0, threads).parallel().forEach(t -> {
Expand All @@ -58,7 +70,8 @@ Graph generateGraph(Random r, int size, int numEdges) {
for (int i = start; i < end; i++) {
int u = rnd.nextInt(size);
int v = rnd.nextInt(size - 1);
if (v >= u) v++;
if (v >= u)
v++;
finalKeys[i] = pack(u, v);
}
});
Expand All @@ -74,15 +87,16 @@ Graph generateGraph(Random r, int size, int numEdges) {

while (unique < numEdges) {
int missing = numEdges - unique;
int extra = Math.max(missing / 2, 10_000); // небольшой запас
int extra = Math.max(missing / 2, 10_000);
int add = missing + extra;

long[] more = new long[unique + add];
System.arraycopy(keys, 0, more, 0, unique);

final SplittableRandom base2 = base.split();
final SplittableRandom[] seeds2 = new SplittableRandom[threads];
for (int t = 0; t < threads; t++) seeds2[t] = base2.split();
for (int t = 0; t < threads; t++)
seeds2[t] = base2.split();

final int offset2 = unique;
final int chunk2 = (add + threads - 1) / threads;
Expand All @@ -93,7 +107,8 @@ Graph generateGraph(Random r, int size, int numEdges) {
for (int i = start; i < end; i++) {
int u = rnd.nextInt(size);
int v = rnd.nextInt(size - 1);
if (v >= u) v++;
if (v >= u)
v++;
more[i] = pack(u, v);
}
});
Expand All @@ -109,6 +124,31 @@ Graph generateGraph(Random r, int size, int numEdges) {
keys = more;
}

Set<Long> chainSet = new HashSet<>(chainCount * 2);
for (int i = 1; i < size; i++) {
chainSet.add(pack(perm[i - 1], perm[i]));
}

int p = 0;
for (int i = 0; i < unique && p < chainCount; i++) {
long e = keys[i];
if (chainSet.remove(e)) {
// swap keys[p] и keys[i]
long tmp = keys[p];
keys[p] = keys[i];
keys[i] = tmp;
p++;
}
}

SplittableRandom shuf = base.split();
for (int i = p; i < numEdges; i++) {
int j = i + shuf.nextInt(unique - i);
long tmp = keys[i];
keys[i] = keys[j];
keys[j] = tmp;
}

Graph g = new Graph(size);
for (int i = 0; i < numEdges; i++) {
long key = keys[i];
Expand Down
27 changes: 27 additions & 0 deletions src/test/java/org/itmo/UnsafeCounterTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.itmo;

import org.openjdk.jcstress.annotations.*;
import org.openjdk.jcstress.infra.results.I_Result;

@JCStressTest
@Outcome(id = "5", expect = Expect.ACCEPTABLE, desc = "Все 5 инкрементов выполнены корректно")
@Outcome(id = "1", expect = Expect.ACCEPTABLE_INTERESTING, desc = "Гонка данных: часть инкрементов потерялась")
@Outcome(id = "2", expect = Expect.ACCEPTABLE_INTERESTING, desc = "Гонка данных: часть инкрементов потерялась")
@Outcome(id = "3", expect = Expect.ACCEPTABLE_INTERESTING, desc = "Гонка данных: часть инкрементов потерялась")
@Outcome(id = "4", expect = Expect.ACCEPTABLE_INTERESTING, desc = "Гонка данных: часть инкрементов потерялась")
@State
public class UnsafeCounterTest {

private UnsafeCounter counter = new UnsafeCounter();

@Actor public void actor1() { counter.increment(); }
@Actor public void actor2() { counter.increment(); }
@Actor public void actor3() { counter.increment(); }
@Actor public void actor4() { counter.increment(); }
@Actor public void actor5() { counter.increment(); }

@Arbiter
public void arbiter(I_Result r) {
r.r1 = counter.get();
}
}
30 changes: 17 additions & 13 deletions tmp/results.txt
Original file line number Diff line number Diff line change
@@ -1,32 +1,36 @@
Times for 10 vertices and 50 connections:
Serial: 0
Parallel: 0
Parallel: 6
--------
Times for 100 vertices and 500 connections:
Serial: 0
Parallel: 0
Serial: 1
Parallel: 4
--------
Times for 1000 vertices and 5000 connections:
Serial: 1
Parallel: 0
Parallel: 6
--------
Times for 10000 vertices and 50000 connections:
Serial: 3
Parallel: 0
Serial: 4
Parallel: 9
--------
Times for 10000 vertices and 100000 connections:
Serial: 2
Parallel: 0
Parallel: 9
--------
Times for 50000 vertices and 1000000 connections:
Serial: 30
Parallel: 0
Serial: 20
Parallel: 31
--------
Times for 100000 vertices and 1000000 connections:
Serial: 18
Parallel: 0
Serial: 19
Parallel: 37
--------
Times for 1000000 vertices and 10000000 connections:
Serial: 307
Parallel: 0
Serial: 629
Parallel: 536
--------
Times for 2000000 vertices and 10000000 connections:
Serial: 1112
Parallel: 2922
--------