From 9c8fae2981c5e3ca6ac41bddbbcfb8b6c20b0f9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ha=CC=8Avard=20Ottestad?= Date: Sat, 18 May 2024 22:47:50 +0200 Subject: [PATCH] GH-2998 add benchmarks --- .../OverflowBenchmarkConcurrent.java | 240 ++++++++++++++++++ .../OverflowBenchmarkConcurrent.java | 240 ++++++++++++++++++ 2 files changed, 480 insertions(+) create mode 100644 core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkConcurrent.java create mode 100644 core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/benchmark/OverflowBenchmarkConcurrent.java diff --git a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkConcurrent.java b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkConcurrent.java new file mode 100644 index 00000000000..9eba59ca615 --- /dev/null +++ b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/OverflowBenchmarkConcurrent.java @@ -0,0 +1,240 @@ +/******************************************************************************* + * Copyright (c) 2024 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ + +package org.eclipse.rdf4j.sail.lmdb.benchmark; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.commons.io.FileUtils; +import org.assertj.core.util.Files; +import org.eclipse.rdf4j.model.IRI; +import org.eclipse.rdf4j.model.Model; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.model.impl.SimpleValueFactory; +import org.eclipse.rdf4j.model.util.Values; +import org.eclipse.rdf4j.repository.RepositoryResult; +import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; +import org.eclipse.rdf4j.rio.RDFFormat; +import org.eclipse.rdf4j.rio.Rio; +import org.eclipse.rdf4j.sail.NotifyingSail; +import org.eclipse.rdf4j.sail.NotifyingSailConnection; +import org.eclipse.rdf4j.sail.SailConnectionListener; +import org.eclipse.rdf4j.sail.SailException; +import org.eclipse.rdf4j.sail.helpers.NotifyingSailConnectionWrapper; +import org.eclipse.rdf4j.sail.helpers.NotifyingSailWrapper; +import org.eclipse.rdf4j.sail.lmdb.LmdbStore; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.slf4j.LoggerFactory; + +import ch.qos.logback.classic.Logger; + +/** + * @author Håvard Ottestad + */ +@State(Scope.Benchmark) +@Warmup(iterations = 0) +@BenchmarkMode({ Mode.AverageTime }) +@Fork(value = 1, jvmArgs = { "-Xms1G", "-Xmx1G", "-XX:+UseParallelGC" }) +@Measurement(iterations = 10, batchSize = 1, time = 1, timeUnit = TimeUnit.MILLISECONDS) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class OverflowBenchmarkConcurrent { + + @Setup(Level.Trial) + public void setup() { + ((Logger) (LoggerFactory + .getLogger("org.eclipse.rdf4j.sail.lmdb.MemoryOverflowModel"))) + .setLevel(ch.qos.logback.classic.Level.DEBUG); + } + + public static void main(String[] args) throws RunnerException { + Options opt = new OptionsBuilder() + .include("OverflowBenchmarkConcurrent") // adapt to run other benchmark tests + .build(); + + new Runner(opt).run(); + } + + @Benchmark + public void manyConcurrentTransactions() throws IOException { + File temporaryFolder = Files.newTemporaryFolder(); + SailRepository sailRepository = new SailRepository(new NotifySailWrapper(new NotifySailWrapper( + new NotifySailWrapper( + new NotifySailWrapper(new NotifySailWrapper(new LmdbStore(temporaryFolder))))))); + ExecutorService executorService = Executors.newFixedThreadPool(10); + + try { + + Model parse; + try (InputStream resourceAsStream = getClass().getClassLoader() + .getResourceAsStream("benchmarkFiles/datagovbe-valid.ttl")) { + parse = Rio.parse(resourceAsStream, RDFFormat.TURTLE); + } + + List> futureList = new ArrayList<>(); + + CountDownLatch countDownLatch = new CountDownLatch(1); + + for (int i = 0; i < 38; i++) { + var seed = i + 485924; + { + Future submit = executorService.submit(() -> { + try { + countDownLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + try (SailRepositoryConnection connection = sailRepository.getConnection()) { + + int addSize = new Random(seed).nextInt(parse.size()); + IRI context = Values.iri("http://example.org/" + new Random(seed + 1).nextInt(10)); + List collect = parse.stream() + .skip(addSize) + .limit(10_000) + .map(s -> SimpleValueFactory.getInstance() + .createStatement(s.getSubject(), s.getPredicate(), s.getObject(), context)) + .collect(Collectors.toList()); + StringWriter stringWriter = new StringWriter(); + Rio.write(collect, stringWriter, RDFFormat.TRIG); + String string = stringWriter.toString(); + + connection.prepareUpdate("INSERT DATA { GRAPH " + string + " }").execute(); + + System.out.println("Added"); + } + }); + futureList.add(submit); + } + { + Future submit = executorService.submit(() -> { + try { + countDownLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + try (SailRepositoryConnection connection = sailRepository.getConnection()) { + System.out.println("Waiting"); + long l = System.currentTimeMillis(); + while (!connection.isEmpty()) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + return; + } + if (System.currentTimeMillis() - l > 1000) { + break; + } + } + System.out.println("Removing"); + connection.begin(); + try (RepositoryResult statements = connection.getStatements(null, null, null)) { + statements.stream().limit(10_000).forEach(connection::remove); + } + connection.commit(); + + System.out.println("Removed"); + } + }); + futureList.add(submit); + } + } + + countDownLatch.countDown(); + + for (int i = 0; i < futureList.size(); i++) { + Future future = futureList.get(i); + try { + future.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + System.out.println("Done: " + i); + } + + } finally { + try { + executorService.shutdownNow(); + } finally { + try { + sailRepository.shutDown(); + } finally { + FileUtils.deleteDirectory(temporaryFolder); + } + } + } + + } + + static class NotifySailWrapper extends NotifyingSailWrapper { + + public NotifySailWrapper(NotifyingSail baseSail) { + super(baseSail); + } + + @Override + public NotifyingSailConnection getConnection() throws SailException { + return new Connection(super.getConnection()); + } + } + + static class Connection extends NotifyingSailConnectionWrapper implements SailConnectionListener { + + Set addedStatements = new HashSet<>(); + Set removedStatements = new HashSet<>(); + + public Connection(NotifyingSailConnection wrappedCon) { + super(wrappedCon); + addConnectionListener(this); + } + + @Override + public void statementAdded(Statement st) { + removedStatements.remove(st); + addedStatements.add(st); + } + + @Override + public void statementRemoved(Statement st) { + addedStatements.remove(st); + removedStatements.add(st); + } + + } + +} diff --git a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/benchmark/OverflowBenchmarkConcurrent.java b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/benchmark/OverflowBenchmarkConcurrent.java new file mode 100644 index 00000000000..826f1386749 --- /dev/null +++ b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/benchmark/OverflowBenchmarkConcurrent.java @@ -0,0 +1,240 @@ +/******************************************************************************* + * Copyright (c) 2024 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ + +package org.eclipse.rdf4j.sail.nativerdf.benchmark; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.commons.io.FileUtils; +import org.assertj.core.util.Files; +import org.eclipse.rdf4j.model.IRI; +import org.eclipse.rdf4j.model.Model; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.model.impl.SimpleValueFactory; +import org.eclipse.rdf4j.model.util.Values; +import org.eclipse.rdf4j.repository.RepositoryResult; +import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; +import org.eclipse.rdf4j.rio.RDFFormat; +import org.eclipse.rdf4j.rio.Rio; +import org.eclipse.rdf4j.sail.NotifyingSail; +import org.eclipse.rdf4j.sail.NotifyingSailConnection; +import org.eclipse.rdf4j.sail.SailConnectionListener; +import org.eclipse.rdf4j.sail.SailException; +import org.eclipse.rdf4j.sail.helpers.NotifyingSailConnectionWrapper; +import org.eclipse.rdf4j.sail.helpers.NotifyingSailWrapper; +import org.eclipse.rdf4j.sail.nativerdf.NativeStore; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.slf4j.LoggerFactory; + +import ch.qos.logback.classic.Logger; + +/** + * @author Håvard Ottestad + */ +@State(Scope.Benchmark) +@Warmup(iterations = 0) +@BenchmarkMode({ Mode.AverageTime }) +@Fork(value = 1, jvmArgs = { "-Xms1G", "-Xmx1G", "-XX:+UseParallelGC" }) +@Measurement(iterations = 10, batchSize = 1, time = 1, timeUnit = TimeUnit.MILLISECONDS) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class OverflowBenchmarkConcurrent { + + @Setup(Level.Trial) + public void setup() { + ((Logger) (LoggerFactory + .getLogger("org.eclipse.rdf4j.sail.nativerdf.MemoryOverflowModel"))) + .setLevel(ch.qos.logback.classic.Level.DEBUG); + } + + public static void main(String[] args) throws RunnerException { + Options opt = new OptionsBuilder() + .include("OverflowBenchmarkConcurrent") // adapt to run other benchmark tests + .build(); + + new Runner(opt).run(); + } + + @Benchmark + public void manyConcurrentTransactions() throws IOException { + File temporaryFolder = Files.newTemporaryFolder(); + SailRepository sailRepository = new SailRepository(new NotifySailWrapper(new NotifySailWrapper( + new NotifySailWrapper( + new NotifySailWrapper(new NotifySailWrapper(new NativeStore(temporaryFolder))))))); + ExecutorService executorService = Executors.newFixedThreadPool(10); + + try { + + Model parse; + try (InputStream resourceAsStream = getClass().getClassLoader() + .getResourceAsStream("benchmarkFiles/datagovbe-valid.ttl")) { + parse = Rio.parse(resourceAsStream, RDFFormat.TURTLE); + } + + List> futureList = new ArrayList<>(); + + CountDownLatch countDownLatch = new CountDownLatch(1); + + for (int i = 0; i < 38; i++) { + var seed = i + 485924; + { + Future submit = executorService.submit(() -> { + try { + countDownLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + try (SailRepositoryConnection connection = sailRepository.getConnection()) { + + int addSize = new Random(seed).nextInt(parse.size()); + IRI context = Values.iri("http://example.org/" + new Random(seed + 1).nextInt(10)); + List collect = parse.stream() + .skip(addSize) + .limit(10_000) + .map(s -> SimpleValueFactory.getInstance() + .createStatement(s.getSubject(), s.getPredicate(), s.getObject(), context)) + .collect(Collectors.toList()); + StringWriter stringWriter = new StringWriter(); + Rio.write(collect, stringWriter, RDFFormat.TRIG); + String string = stringWriter.toString(); + + connection.prepareUpdate("INSERT DATA { GRAPH " + string + " }").execute(); + + System.out.println("Added"); + } + }); + futureList.add(submit); + } + { + Future submit = executorService.submit(() -> { + try { + countDownLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + try (SailRepositoryConnection connection = sailRepository.getConnection()) { + System.out.println("Waiting"); + long l = System.currentTimeMillis(); + while (!connection.isEmpty()) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + return; + } + if (System.currentTimeMillis() - l > 1000) { + break; + } + } + System.out.println("Removing"); + connection.begin(); + try (RepositoryResult statements = connection.getStatements(null, null, null)) { + statements.stream().limit(10_000).forEach(connection::remove); + } + connection.commit(); + + System.out.println("Removed"); + } + }); + futureList.add(submit); + } + } + + countDownLatch.countDown(); + + for (int i = 0; i < futureList.size(); i++) { + Future future = futureList.get(i); + try { + future.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + System.out.println("Done: " + i); + } + + } finally { + try { + executorService.shutdownNow(); + } finally { + try { + sailRepository.shutDown(); + } finally { + FileUtils.deleteDirectory(temporaryFolder); + } + } + } + + } + + static class NotifySailWrapper extends NotifyingSailWrapper { + + public NotifySailWrapper(NotifyingSail baseSail) { + super(baseSail); + } + + @Override + public NotifyingSailConnection getConnection() throws SailException { + return new Connection(super.getConnection()); + } + } + + static class Connection extends NotifyingSailConnectionWrapper implements SailConnectionListener { + + Set addedStatements = new HashSet<>(); + Set removedStatements = new HashSet<>(); + + public Connection(NotifyingSailConnection wrappedCon) { + super(wrappedCon); + addConnectionListener(this); + } + + @Override + public void statementAdded(Statement st) { + removedStatements.remove(st); + addedStatements.add(st); + } + + @Override + public void statementRemoved(Statement st) { + addedStatements.remove(st); + removedStatements.add(st); + } + + } + +}