From 3579908e459b957059d40447425b91887122c2b6 Mon Sep 17 00:00:00 2001
From: Laura Schanno
Date: Tue, 17 Sep 2019 11:45:25 -0400
Subject: [PATCH] Move performance tests to accumulo-testing #1200 (#90)

* Move ManySplitIT as HighSplitCreationPT.
* Move BalanceFasterIT as SplitBalancingPT.
* Move DeleteTableDuringSplitIT as TableDeletionDuringSplitPT.
* Move RollWALPerformanceIT as RollWALPT.
* Use Result.result() instead of Result.parameter() when reporting resulting
  values so that they will be put in the appropriate section when the JSON
  files are written.

Related: #1200
---
 .../tests/HighSplitCreationPT.java            |  69 +++++++
 .../testing/performance/tests/RollWALPT.java  | 155 ++++++++++++++++
 .../performance/tests/SplitBalancingPT.java   | 101 +++++++++++
 .../tests/TableDeletionDuringSplitPT.java     | 168 ++++++++++++++++++
 4 files changed, 493 insertions(+)
 create mode 100644 src/main/java/org/apache/accumulo/testing/performance/tests/HighSplitCreationPT.java
 create mode 100644 src/main/java/org/apache/accumulo/testing/performance/tests/RollWALPT.java
 create mode 100644 src/main/java/org/apache/accumulo/testing/performance/tests/SplitBalancingPT.java
 create mode 100644 src/main/java/org/apache/accumulo/testing/performance/tests/TableDeletionDuringSplitPT.java

diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/HighSplitCreationPT.java b/src/main/java/org/apache/accumulo/testing/performance/tests/HighSplitCreationPT.java
new file mode 100644
index 00000000..fe15fbac
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/performance/tests/HighSplitCreationPT.java
@@ -0,0 +1,69 @@
+package org.apache.accumulo.testing.performance.tests;
+
+import java.nio.charset.StandardCharsets;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.testing.performance.Environment;
+import org.apache.accumulo.testing.performance.PerformanceTest;
+import org.apache.accumulo.testing.performance.Report;
+import org.apache.accumulo.testing.performance.SystemConfiguration;
+import org.apache.hadoop.io.Text;
+
+public class HighSplitCreationPT implements PerformanceTest {
+
+  private static final int NUM_SPLITS = 10_000;
+  private static final int MIN_REQUIRED_SPLITS_PER_SECOND = 100;
+  private static final int ONE_SECOND = 1000;
+  private static final String TABLE_NAME = "highSplitCreation";
+  private static final String METADATA_TABLE_SPLITS = "123456789abcde";
+
+  @Override
+  public SystemConfiguration getSystemConfig() {
+    return new SystemConfiguration();
+  }
+
+  @Override
+  public Report runTest(final Environment env) throws Exception {
+    Report.Builder reportBuilder = Report.builder().id("high_split_creation")
+        .description("Evaluate the speed of creating many splits.")
+        .parameter("table_name", TABLE_NAME, "The name of the test table.")
+        .parameter("num_splits", NUM_SPLITS, "The high number of splits to add.")
+        .parameter("min_required_splits_per_second", MIN_REQUIRED_SPLITS_PER_SECOND,
+            "The minimum average number of splits that must be created per second before performance is considered too slow.");
+
+    AccumuloClient client = env.getClient();
+    client.tableOperations().create(TABLE_NAME);
+    // Pre-split the metadata table so it does not become a bottleneck while
+    // the test table's many splits are created.
+    client.tableOperations().addSplits(MetadataTable.NAME, getMetadataTableSplits());
+
+    SortedSet<Text> splits = getTestTableSplits();
+
+    long start = System.currentTimeMillis();
+    client.tableOperations().addSplits(TABLE_NAME, splits);
+    long totalTime = System.currentTimeMillis() - start;
+    // Divide as doubles: integer division would truncate, and would divide by
+    // zero for sub-second runs.
+    double splitsPerSecond = NUM_SPLITS / (totalTime / (double) ONE_SECOND);
+
+    reportBuilder.result("splits_per_second", splitsPerSecond,
+        "The average number of splits created per second.");
+
+    return reportBuilder.build();
+  }
+
+  private SortedSet<Text> getMetadataTableSplits() {
+    SortedSet<Text> splits = new TreeSet<>();
+    for (byte b : METADATA_TABLE_SPLITS.getBytes(StandardCharsets.UTF_8)) {
+      splits.add(new Text(new byte[] {'1', ';', b}));
+    }
+    return splits;
+  }
+
+  private SortedSet<Text> getTestTableSplits() {
+    SortedSet<Text> splits = new TreeSet<>();
+    for (int i = 0; i < NUM_SPLITS; i++) {
+      splits.add(new Text(Integer.toHexString(i)));
+    }
+    return splits;
+  }
+}
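Note on the Result.result() vs. Result.parameter() change called out in the commit message: parameter() records the inputs that configure a run, while result() records measured outcomes, and the JSON report writer files each into its own section. A minimal sketch of the convention, using only the Report.Builder calls that appear in these tests (the id, names, and values here are illustrative, not part of the patch):

    import org.apache.accumulo.testing.performance.Report;

    public class ReportSectionsExample {
      public static Report build() {
        return Report.builder().id("example_pt")
            // Test inputs belong in parameter(): written to the parameters section.
            .parameter("num_entries", 50_000, "How many entries the test writes.")
            // Measured values belong in result(): written to the results section.
            .result("write_time_ms", 1234L, "Hypothetical measured write time.")
            .build();
      }
    }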
diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/RollWALPT.java b/src/main/java/org/apache/accumulo/testing/performance/tests/RollWALPT.java
new file mode 100644
index 00000000..bae6cf54
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/performance/tests/RollWALPT.java
@@ -0,0 +1,155 @@
+package org.apache.accumulo.testing.performance.tests;
+
+import java.security.SecureRandom;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.FastFormat;
+import org.apache.accumulo.testing.performance.Environment;
+import org.apache.accumulo.testing.performance.PerformanceTest;
+import org.apache.accumulo.testing.performance.Report;
+import org.apache.accumulo.testing.performance.SystemConfiguration;
+import org.apache.hadoop.io.Text;
+
+public class RollWALPT implements PerformanceTest {
+
+  private static final String TABLE_SMALL_WAL = "SmallRollWAL";
+  private static final String TABLE_LARGE_WAL = "LargeRollWAL";
+  private static final String SIZE_SMALL_WAL = "5M";
+  private static final String SIZE_LARGE_WAL = "1G";
+  private static final long NUM_SPLITS = 100L;
+  private static final long SPLIT_DISTANCE = Long.MAX_VALUE / NUM_SPLITS;
+  private static final int NUM_ENTRIES = 50_000;
+
+  private final SecureRandom random = new SecureRandom();
+
+  @Override
+  public SystemConfiguration getSystemConfig() {
+    Map<String,String> config = new HashMap<>();
+
+    config.put(Property.TSERV_WAL_REPLICATION.getKey(), "1");
+    config.put(Property.TSERV_WALOG_MAX_REFERENCED.getKey(), "100");
+    config.put(Property.GC_CYCLE_START.getKey(), "1s");
+    config.put(Property.GC_CYCLE_DELAY.getKey(), "1s");
+
+    return new SystemConfiguration().setAccumuloConfig(config);
+  }
+
+  @Override
+  public Report runTest(final Environment env) throws Exception {
+    Report.Builder reportBuilder = Report.builder().id("rollWAL").description(
+        "Evaluate the performance of ingesting a large number of entries across numerous splits given both a small and a large maximum WAL size.")
+        .parameter("small_wal_table", TABLE_SMALL_WAL,
+            "The name of the table used for evaluating performance with a small WAL.")
+        .parameter("large_wal_table", TABLE_LARGE_WAL,
+            "The name of the table used for evaluating performance with a large WAL.")
+        .parameter("num_splits", NUM_SPLITS,
+            "The number of splits that will be added to the tables.")
+        .parameter("split_distance", SPLIT_DISTANCE, "The distance between each split.")
+        .parameter("num_entries", NUM_ENTRIES,
+            "The number of entries that will be written to the tables.")
+        .parameter("small_wal_size", SIZE_SMALL_WAL,
+            "The size of the small WAL used to force many rollovers.")
+        .parameter("large_wal_size", SIZE_LARGE_WAL,
+            "The size of the large WAL used to avoid many rollovers.");
+
+    AccumuloClient client = env.getClient();
+    final long smallWALTime = evalSmallWAL(client);
+    reportBuilder.result("small_wal_write_time", smallWALTime,
+        "The time (in ns) it took to write entries to the table with a small WAL of "
+            + SIZE_SMALL_WAL);
+
+    final long largeWALTime = evalLargeWAL(client);
+    reportBuilder.result("large_wal_write_time", largeWALTime,
+        "The time (in ns) it took to write entries to the table with a large WAL of "
+            + SIZE_LARGE_WAL);
+    return reportBuilder.build();
+  }
+
+  private long evalSmallWAL(final AccumuloClient client) throws AccumuloSecurityException,
+      AccumuloException, TableExistsException, TableNotFoundException {
+    setMaxWALSize(SIZE_SMALL_WAL, client);
+    initTable(TABLE_SMALL_WAL, client);
+    return getTimeToWriteEntries(TABLE_SMALL_WAL, client);
+  }
+
+  private long evalLargeWAL(final AccumuloClient client) throws AccumuloSecurityException,
+      AccumuloException, TableExistsException, TableNotFoundException {
+    setMaxWALSize(SIZE_LARGE_WAL, client);
+    initTable(TABLE_LARGE_WAL, client);
+    return getTimeToWriteEntries(TABLE_LARGE_WAL, client);
+  }
+
+  private void setMaxWALSize(final String size, final AccumuloClient client)
+      throws AccumuloSecurityException, AccumuloException {
+    client.instanceOperations().setProperty(Property.TSERV_WALOG_MAX_SIZE.getKey(), size);
+  }
+
+  private void initTable(final String tableName, final AccumuloClient client)
+      throws AccumuloSecurityException, TableNotFoundException, AccumuloException,
+      TableExistsException {
+    client.tableOperations().create(tableName);
+    client.tableOperations().addSplits(tableName, getSplits());
+    client.instanceOperations().waitForBalance();
+  }
+
+  private SortedSet<Text> getSplits() {
+    SortedSet<Text> splits = new TreeSet<>();
+    for (long i = 0L; i < NUM_SPLITS; i++) {
+      // Multiply so the split points are spread evenly across the keyspace of
+      // zero-padded hex rows; adding SPLIT_DISTANCE would cluster them all
+      // around a single point.
+      splits.add(new Text(String.format("%016x", i * SPLIT_DISTANCE)));
+    }
+    return splits;
+  }
+
+  private long getTimeToWriteEntries(final String tableName, final AccumuloClient client)
+      throws TableNotFoundException, MutationsRejectedException {
+    long start = System.nanoTime();
+    writeEntries(tableName, client);
+    return System.nanoTime() - start;
+  }
+
+  private void writeEntries(final String tableName, final AccumuloClient client)
+      throws TableNotFoundException, MutationsRejectedException {
+    BatchWriter bw = client.createBatchWriter(tableName);
+
+    String instanceId = UUID.randomUUID().toString();
+    final ColumnVisibility cv = new ColumnVisibility();
+    for (int i = 0; i < NUM_ENTRIES; i++) {
+      String value = instanceId + i;
+      Mutation m = genMutation(cv, value);
+      bw.addMutation(m);
+    }
+
+    bw.close();
+  }
+
+  private Mutation genMutation(final ColumnVisibility colVis, final String value) {
+    byte[] rowStr = toZeroPaddedString(getRandomLong(), 16);
+    byte[] colFamStr = toZeroPaddedString(random.nextInt(Short.MAX_VALUE), 4);
+    byte[] colQualStr = toZeroPaddedString(random.nextInt(Short.MAX_VALUE), 4);
+    Mutation mutation = new Mutation(new Text(rowStr));
+    mutation.put(new Text(colFamStr), new Text(colQualStr), colVis, new Value(value));
+    return mutation;
+  }
+
+  private long getRandomLong() {
+    return ((random.nextLong() & 0x7fffffffffffffffL) % (Long.MAX_VALUE));
+  }
+
+  private byte[] toZeroPaddedString(long num, int width) {
+    // Render num as hex, left-padded with zeros to at least the given width.
+    return FastFormat.toZeroPaddedString(num, width, 16, new byte[0]);
+  }
+}
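writeEntries() above closes its BatchWriter explicitly, which leaks the writer if addMutation() throws first. Since BatchWriter is AutoCloseable, a try-with-resources form is the safer idiom. A sketch reusing names from RollWALPT (client, tableName, NUM_ENTRIES, and genMutation are assumed from that context):

    try (BatchWriter writer = client.createBatchWriter(tableName)) {
      String instanceId = UUID.randomUUID().toString();
      ColumnVisibility cv = new ColumnVisibility();
      for (int i = 0; i < NUM_ENTRIES; i++) {
        writer.addMutation(genMutation(cv, instanceId + i));
      }
    } // close() flushes pending mutations and surfaces any rejected ones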
"The name of the table used for evaluating performance with a large WAL.") + .parameter("num_splits", NUM_SPLITS, + "The number of splits that will be added to the tables.") + .parameter("split_distance", SPLIT_DISTANCE, "The distance between each split.") + .parameter("num_entries", NUM_ENTRIES, + "The number of entries that will be written to the tables.") + .parameter("small_wal_size", SIZE_SMALL_WAL, + "The size of the small WAL used to force many rollovers.") + .parameter("large_wal_size", SIZE_LARGE_WAL, + "The size of the large WAL used to avoid many rollovers"); + + AccumuloClient client = env.getClient(); + final long smallWALTime = evalSmallWAL(client); + reportBuilder.result("small_wal_write_time", smallWALTime, + "The time (in ns) it took to write entries to the table with a small WAL of " + + SIZE_SMALL_WAL); + + final long largeWALTime = evalLargeWAL(client); + reportBuilder.result("large_wal_write_time", largeWALTime, + "The time (in ns) it took to write entries to the table with a large WAL of " + + SIZE_LARGE_WAL); + return reportBuilder.build(); + } + + private long evalSmallWAL(final AccumuloClient client) throws AccumuloSecurityException, + AccumuloException, TableExistsException, TableNotFoundException { + setMaxWALSize(SIZE_SMALL_WAL, client); + initTable(TABLE_SMALL_WAL, client); + return getTimeToWriteEntries(TABLE_SMALL_WAL, client); + } + + private long evalLargeWAL(final AccumuloClient client) throws AccumuloSecurityException, + AccumuloException, TableExistsException, TableNotFoundException { + setMaxWALSize(SIZE_LARGE_WAL, client); + initTable(TABLE_LARGE_WAL, client); + return getTimeToWriteEntries(TABLE_LARGE_WAL, client); + } + + private void setMaxWALSize(final String size, final AccumuloClient client) + throws AccumuloSecurityException, AccumuloException { + client.instanceOperations().setProperty(Property.TSERV_WALOG_MAX_SIZE.getKey(), size); + } + + private void initTable(final String tableName, final AccumuloClient client) + throws AccumuloSecurityException, TableNotFoundException, AccumuloException, + TableExistsException { + client.tableOperations().create(tableName); + client.tableOperations().addSplits(tableName, getSplits()); + client.instanceOperations().waitForBalance(); + } + + private SortedSet getSplits() { + SortedSet splits = new TreeSet<>(); + for (long i = 0L; i < NUM_SPLITS; i++) { + splits.add(new Text(String.format("%016x", i + SPLIT_DISTANCE))); + } + return splits; + } + + private long getTimeToWriteEntries(final String tableName, final AccumuloClient client) + throws TableNotFoundException, MutationsRejectedException { + long start = System.nanoTime(); + writeEntries(tableName, client); + return System.nanoTime() - start; + } + + private void writeEntries(final String tableName, final AccumuloClient client) + throws TableNotFoundException, MutationsRejectedException { + BatchWriter bw = client.createBatchWriter(tableName); + + String instanceId = UUID.randomUUID().toString(); + final ColumnVisibility cv = new ColumnVisibility(); + for (int i = 0; i < NUM_ENTRIES; i++) { + String value = instanceId + i; + Mutation m = genMutation(cv, value); + bw.addMutation(m); + } + + bw.close(); + } + + private Mutation genMutation(final ColumnVisibility colVis, final String value) { + byte[] rowStr = toZeroPaddedString(getRandomLong(), 16); + byte[] colFamStr = toZeroPaddedString(random.nextInt(Short.MAX_VALUE), 4); + byte[] colQualStr = toZeroPaddedString(random.nextInt(Short.MAX_VALUE), 4); + Mutation mutation = new Mutation(new Text(rowStr)); + 
diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/TableDeletionDuringSplitPT.java b/src/main/java/org/apache/accumulo/testing/performance/tests/TableDeletionDuringSplitPT.java
new file mode 100644
index 00000000..cfe943e0
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/performance/tests/TableDeletionDuringSplitPT.java
@@ -0,0 +1,168 @@
+package org.apache.accumulo.testing.performance.tests;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.testing.performance.Environment;
+import org.apache.accumulo.testing.performance.PerformanceTest;
+import org.apache.accumulo.testing.performance.Report;
+import org.apache.accumulo.testing.performance.SystemConfiguration;
+import org.apache.hadoop.io.Text;
+
+public class TableDeletionDuringSplitPT implements PerformanceTest {
+
+  private static final int NUM_BATCHES = 12;
+  private static final int BATCH_SIZE = 8;
+  private static final int MAX_THREADS = BATCH_SIZE + 2;
+  private static final int NUM_TABLES = NUM_BATCHES * BATCH_SIZE;
+  private static final int NUM_SPLITS = 100;
+  private static final int HALF_SECOND = 500;
+  private static final String BASE_TABLE_NAME = "tableDeletionDuringSplit";
+  private static final String THREAD_NAME = "concurrent-api-requests";
+
+  @Override
+  public SystemConfiguration getSystemConfig() {
+    return new SystemConfiguration();
+  }
+
+  @Override
+  public Report runTest(final Environment env) throws Exception {
+    Report.Builder reportBuilder = Report.builder().id("TableDeletionDuringSplit")
+        .description("Evaluates the performance of deleting tables during split operations.")
+        .parameter("num_tables", NUM_TABLES, "The number of tables that will be created/deleted.")
+        .parameter("base_table_name", BASE_TABLE_NAME, "The base table name.")
+        .parameter("num_splits", NUM_SPLITS,
+            "The number of splits that will be added to each table.")
+        .parameter("base_thread_name", THREAD_NAME,
+            "The base thread name used for the thread pool.");
+
+    AccumuloClient client = env.getClient();
+    String[] tableNames = getTableNames();
+    createTables(tableNames, client);
+    splitAndDeleteTables(tableNames, client, reportBuilder);
+
+    return reportBuilder.build();
+  }
+
+  private String[] getTableNames() {
+    String[] names = new String[NUM_TABLES];
+    for (int i = 0; i < NUM_TABLES; i++) {
+      names[i] = BASE_TABLE_NAME + i;
+    }
+    return names;
+  }
+
+  private void createTables(final String[] tableNames, final AccumuloClient client)
+      throws TableExistsException, AccumuloSecurityException, AccumuloException {
+    for (String tableName : tableNames) {
+      client.tableOperations().create(tableName);
+    }
+  }
+
+  private void splitAndDeleteTables(final String[] tableNames, final AccumuloClient client,
+      final Report.Builder reportBuilder) throws ExecutionException, InterruptedException {
+    final LongAdder deletionTimes = new LongAdder();
+    final AtomicInteger deletedTables = new AtomicInteger(0);
+    Iterator<Runnable> iter = getTasks(tableNames, client, deletionTimes, deletedTables).iterator();
+    // Name the pool threads after the reported base_thread_name so they can be
+    // identified in stack traces.
+    final AtomicInteger threadCounter = new AtomicInteger(0);
+    ExecutorService pool = Executors.newFixedThreadPool(MAX_THREADS,
+        runnable -> new Thread(runnable, THREAD_NAME + "-" + threadCounter.getAndIncrement()));
+
+    List<Future<?>> results = new ArrayList<>();
+    for (int batch = 0; batch < NUM_BATCHES; batch++) {
+      for (int i = 0; i < BATCH_SIZE; i++) {
+        // Submit the split task and the deletion task for the next table, so
+        // that each deletion races the in-progress splits on the same table.
+        results.add(pool.submit(iter.next()));
+        results.add(pool.submit(iter.next()));
+      }
+
+      for (Future<?> future : results) {
+        future.get();
+      }
+      results.clear();
+    }
+
+    List<Runnable> queued = pool.shutdownNow();
+
+    reportBuilder.result("remaining_pending_tasks", countRemaining(iter),
+        "The number of remaining pending tasks.");
+    reportBuilder.result("remaining_submitted_tasks", queued.size(),
+        "The number of remaining submitted tasks.");
+
+    long totalRemainingTables = Arrays.stream(tableNames)
+        .filter((name) -> client.tableOperations().exists(name)).count();
+    reportBuilder.result("total_remaining_tables", totalRemainingTables,
+        "The total number of tables that could not be deleted.");
+    // Guard against division by zero in case no deletion completed.
+    long avgDeletionTime = deletionTimes.sum() / Math.max(1, deletedTables.get());
+    reportBuilder.result("avg_deletion_time", avgDeletionTime,
+        "The average time (in ms) taken to delete a table.");
+  }
+
+  private List<Runnable> getTasks(final String[] tableNames, final AccumuloClient client,
+      final LongAdder deletionTime, final AtomicInteger deletedTables) {
+    List<Runnable> tasks = new ArrayList<>();
+    final SortedSet<Text> splits = getSplits();
+    for (String tableName : tableNames) {
+      tasks.add(getSplitTask(tableName, client, splits));
+      tasks.add(getDeletionTask(tableName, client, deletionTime, deletedTables));
+    }
+    return tasks;
+  }
+
+  private SortedSet<Text> getSplits() {
+    SortedSet<Text> splits = new TreeSet<>();
+    for (byte i = 0; i < NUM_SPLITS; i++) {
+      splits.add(new Text(new byte[] {0, 0, i}));
+    }
+    return splits;
+  }
+
+  private Runnable getSplitTask(final String tableName, final AccumuloClient client,
+      final SortedSet<Text> splits) {
+    return () -> {
+      try {
+        client.tableOperations().addSplits(tableName, splits);
+      } catch (TableNotFoundException ex) {
+        // Expected when the deletion wins the race; ignore.
+      } catch (Exception e) {
+        throw new RuntimeException(tableName, e);
+      }
+    };
+  }
+
+  private Runnable getDeletionTask(final String tableName, final AccumuloClient client,
+      final LongAdder timeAdder, final AtomicInteger deletedTables) {
+    return () -> {
+      try {
+        // Give the split task a head start so the deletion interrupts it.
+        Thread.sleep(HALF_SECOND);
+        long start = System.currentTimeMillis();
+        client.tableOperations().delete(tableName);
+        long time = System.currentTimeMillis() - start;
+        timeAdder.add(time);
+        deletedTables.getAndIncrement();
+      } catch (Exception e) {
+        throw new RuntimeException(tableName, e);
+      }
+    };
+  }
+
+  private int countRemaining(final Iterator<?> i) {
+    int count = 0;
+    while (i.hasNext()) {
+      i.next();
+      count++;
+    }
+    return count;
+  }
+}
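The essential pattern TableDeletionDuringSplitPT exercises is a two-thread race between addSplits() and delete() on the same table. For readers adapting this pattern, a minimal standalone sketch of a single race, mirroring the tasks above (the method name and arguments are hypothetical; it assumes an existing AccumuloClient and a pre-created table):

    static void raceSplitAgainstDelete(AccumuloClient client, String tableName,
        SortedSet<Text> splits) throws Exception {
      ExecutorService pool = Executors.newFixedThreadPool(2);
      Future<?> split = pool.submit(() -> {
        try {
          client.tableOperations().addSplits(tableName, splits);
        } catch (TableNotFoundException e) {
          // The concurrent delete won the race; this is the case under test.
        } catch (Exception e) {
          throw new RuntimeException(tableName, e);
        }
      });
      Future<?> delete = pool.submit(() -> {
        try {
          client.tableOperations().delete(tableName);
        } catch (Exception e) {
          throw new RuntimeException(tableName, e);
        }
      });
      split.get();   // propagate failures from either task
      delete.get();
      pool.shutdown();
    }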