Move performance tests to accumulo-testing #1200 (#90)
* Move ManySplitIT as HighSplitCreationPT.
* Move BalanceFasterIT as SplitBalancingPT.
* Move DeleteTableDuringSplitIT as TableDeletionDuringSplitPT.
* Move RollWALPerformanceIT as RollWALPT.
* Use Report.Builder.result() instead of Report.Builder.parameter() when
reporting measured values so that they will be put in the appropriate
section when the JSON files are written.

Related: #1200
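
As a minimal sketch of that distinction (based only on the Report.Builder calls visible in the diff below; the id, names, and values here are illustrative):

// Inputs that configure a test go through parameter() and are written to the
// parameters section of the JSON report.
Report.Builder builder = Report.builder().id("example")
.description("Shows where reported values land in the JSON output.")
.parameter("num_splits", 10_000, "The number of splits to add.");

// Measured outcomes go through result() and are written to the results section.
builder.result("splits_per_second", 250.0, "The measured split creation rate.");
Report report = builder.build();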
lbschanno authored and milleruntime committed Sep 17, 2019
1 parent 32aba0e commit 3579908e459b957059d40447425b91887122c2b6
Showing 4 changed files with 493 additions and 0 deletions.
src/main/java/org/apache/accumulo/testing/performance/tests/HighSplitCreationPT.java
@@ -0,0 +1,69 @@
package org.apache.accumulo.testing.performance.tests;

import java.nio.charset.StandardCharsets;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.testing.performance.Environment;
import org.apache.accumulo.testing.performance.PerformanceTest;
import org.apache.accumulo.testing.performance.Report;
import org.apache.accumulo.testing.performance.SystemConfiguration;
import org.apache.hadoop.io.Text;

public class HighSplitCreationPT implements PerformanceTest {

private static final int NUM_SPLITS = 10_000;
private static final int MIN_REQUIRED_SPLITS_PER_SECOND = 100;
private static final int ONE_SECOND = 1000;
private static final String TABLE_NAME = "highSplitCreation";
private static final String METADATA_TABLE_SPLITS = "123456789abcde";

@Override
public SystemConfiguration getSystemConfig() {
return new SystemConfiguration();
}

@Override
public Report runTest(final Environment env) throws Exception {
Report.Builder reportBuilder = Report.builder().id("high_split_creation")
.description("Evaluate the speed of creating many splits.")
.parameter("table_name", TABLE_NAME, "The name of the test table")
.parameter("num_splits", NUM_SPLITS, "The high number of splits to add.")
.parameter("min_required_splits_per_second", MIN_REQUIRED_SPLITS_PER_SECOND,
"The minimum average number of splits that must be created per second before performance is considered too slow.");

AccumuloClient client = env.getClient();
client.tableOperations().create(TABLE_NAME);
client.tableOperations().addSplits(MetadataTable.NAME, getMetadataTableSplits());

SortedSet<Text> splits = getTestTableSplits();

long start = System.currentTimeMillis();
client.tableOperations().addSplits(TABLE_NAME, splits);
long totalTime = System.currentTimeMillis() - start;
// Divide as doubles; integer division truncates fractional seconds and can
// divide by zero when the splits are added in under a second.
double splitsPerSecond = NUM_SPLITS / (totalTime / (double) ONE_SECOND);

reportBuilder.result("splits_per_second", splitsPerSecond,
"The average number of splits created per second.");

return reportBuilder.build();
}

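// Metadata table rows have the form <tableId>;<endRow>, so these splits
// partition the metadata entries themselves, spreading the bookkeeping for
// the new tablets across several metadata tablets.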
private SortedSet<Text> getMetadataTableSplits() {
SortedSet<Text> splits = new TreeSet<>();
for (byte b : METADATA_TABLE_SPLITS.getBytes(StandardCharsets.UTF_8)) {
splits.add(new Text(new byte[] {'1', ';', b}));
}
return splits;
}

private SortedSet<Text> getTestTableSplits() {
SortedSet<Text> splits = new TreeSet<>();
for (int i = 0; i < NUM_SPLITS; i++) {
splits.add(new Text(Integer.toHexString(i)));
}
return splits;
}
}
src/main/java/org/apache/accumulo/testing/performance/tests/RollWALPT.java
@@ -0,0 +1,155 @@
package org.apache.accumulo.testing.performance.tests;

import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.testing.performance.Environment;
import org.apache.accumulo.testing.performance.PerformanceTest;
import org.apache.accumulo.testing.performance.Report;
import org.apache.accumulo.testing.performance.SystemConfiguration;
import org.apache.hadoop.io.Text;

public class RollWALPT implements PerformanceTest {

private static final String TABLE_SMALL_WAL = "SmallRollWAL";
private static final String TABLE_LARGE_WAL = "LargeRollWAL";
private static final String SIZE_SMALL_WAL = "5M";
private static final String SIZE_LARGE_WAL = "1G";
private static final long NUM_SPLITS = 100L;
private static final long SPLIT_DISTANCE = Long.MAX_VALUE / NUM_SPLITS;
private static final int NUM_ENTRIES = 50_000;

private final SecureRandom random = new SecureRandom();

@Override
public SystemConfiguration getSystemConfig() {
Map<String,String> config = new HashMap<>();

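// Single-replica WALs and short GC cycles keep WAL rollover and cleanup,
// rather than HDFS replication, the dominant cost during the test.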
config.put(Property.TSERV_WAL_REPLICATION.getKey(), "1");
config.put(Property.TSERV_WALOG_MAX_REFERENCED.getKey(), "100");
config.put(Property.GC_CYCLE_START.getKey(), "1s");
config.put(Property.GC_CYCLE_DELAY.getKey(), "1s");

return new SystemConfiguration().setAccumuloConfig(config);
}

@Override
public Report runTest(final Environment env) throws Exception {
Report.Builder reportBuilder = Report.builder().id("rollWAL").description(
"Evaluate the performance of ingesting a large number of entries across numerous splits given both a small and large maximum WAL size.")
.parameter("small_wal_table", TABLE_SMALL_WAL,
"The name of the table used for evaluating performance with a small WAL.")
.parameter("large_wal_table", TABLE_LARGE_WAL,
"The name of the table used for evaluating performance with a large WAL.")
.parameter("num_splits", NUM_SPLITS,
"The number of splits that will be added to the tables.")
.parameter("split_distance", SPLIT_DISTANCE, "The distance between each split.")
.parameter("num_entries", NUM_ENTRIES,
"The number of entries that will be written to the tables.")
.parameter("small_wal_size", SIZE_SMALL_WAL,
"The size of the small WAL used to force many rollovers.")
.parameter("large_wal_size", SIZE_LARGE_WAL,
"The size of the large WAL used to avoid many rollovers");

AccumuloClient client = env.getClient();
final long smallWALTime = evalSmallWAL(client);
reportBuilder.result("small_wal_write_time", smallWALTime,
"The time (in ns) it took to write entries to the table with a small WAL of "
+ SIZE_SMALL_WAL);

final long largeWALTime = evalLargeWAL(client);
reportBuilder.result("large_wal_write_time", largeWALTime,
"The time (in ns) it took to write entries to the table with a large WAL of "
+ SIZE_LARGE_WAL);
return reportBuilder.build();
}

private long evalSmallWAL(final AccumuloClient client) throws AccumuloSecurityException,
AccumuloException, TableExistsException, TableNotFoundException {
setMaxWALSize(SIZE_SMALL_WAL, client);
initTable(TABLE_SMALL_WAL, client);
return getTimeToWriteEntries(TABLE_SMALL_WAL, client);
}

private long evalLargeWAL(final AccumuloClient client) throws AccumuloSecurityException,
AccumuloException, TableExistsException, TableNotFoundException {
setMaxWALSize(SIZE_LARGE_WAL, client);
initTable(TABLE_LARGE_WAL, client);
return getTimeToWriteEntries(TABLE_LARGE_WAL, client);
}

private void setMaxWALSize(final String size, final AccumuloClient client)
throws AccumuloSecurityException, AccumuloException {
client.instanceOperations().setProperty(Property.TSERV_WALOG_MAX_SIZE.getKey(), size);
}

private void initTable(final String tableName, final AccumuloClient client)
throws AccumuloSecurityException, TableNotFoundException, AccumuloException,
TableExistsException {
client.tableOperations().create(tableName);
client.tableOperations().addSplits(tableName, getSplits());
client.instanceOperations().waitForBalance();
}

private SortedSet<Text> getSplits() {
SortedSet<Text> splits = new TreeSet<>();
// Multiply rather than add so the split points are spaced SPLIT_DISTANCE
// apart across the keyspace (starting at 1 avoids a degenerate "0" split).
for (long i = 1; i < NUM_SPLITS; i++) {
splits.add(new Text(String.format("%016x", i * SPLIT_DISTANCE)));
}
return splits;
}

private long getTimeToWriteEntries(final String tableName, final AccumuloClient client)
throws TableNotFoundException, MutationsRejectedException {
long start = System.nanoTime();
writeEntries(tableName, client);
return System.nanoTime() - start;
}

private void writeEntries(final String tableName, final AccumuloClient client)
throws TableNotFoundException, MutationsRejectedException {
String instanceId = UUID.randomUUID().toString();
final ColumnVisibility cv = new ColumnVisibility();

// try-with-resources flushes and closes the writer even if a mutation is rejected.
try (BatchWriter bw = client.createBatchWriter(tableName)) {
for (int i = 0; i < NUM_ENTRIES; i++) {
String value = instanceId + i;
bw.addMutation(genMutation(cv, value));
}
}
}

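// Rows are random zero-padded 16-digit hex strings matching the split format,
// so mutations scatter across all tablets and every tablet's WAL sees writes.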
private Mutation genMutation(final ColumnVisibility colVis, final String value) {
byte[] rowStr = toZeroPaddedString(getRandomLong(), 16);
byte[] colFamStr = toZeroPaddedString(random.nextInt(Short.MAX_VALUE), 4);
byte[] colQualStr = toZeroPaddedString(random.nextInt(Short.MAX_VALUE), 4);
Mutation mutation = new Mutation(new Text(rowStr));
mutation.put(new Text(colFamStr), new Text(colQualStr), colVis, new Value(value));
return mutation;
}

private long getRandomLong() {
return ((random.nextLong() & 0x7fffffffffffffffL) % (Long.MAX_VALUE));
}

private byte[] toZeroPaddedString(long num, int width) {
// Zero-pad the hex form of num to the requested width.
return String.format("%0" + width + "x", num).getBytes(StandardCharsets.UTF_8);
}
}
src/main/java/org/apache/accumulo/testing/performance/tests/SplitBalancingPT.java
@@ -0,0 +1,101 @@
package org.apache.accumulo.testing.performance.tests;

import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.testing.performance.Environment;
import org.apache.accumulo.testing.performance.PerformanceTest;
import org.apache.accumulo.testing.performance.Report;
import org.apache.accumulo.testing.performance.SystemConfiguration;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SplitBalancingPT implements PerformanceTest {

private static final Logger LOG = LoggerFactory.getLogger(SplitBalancingPT.class);

private static final String TABLE_NAME = "splitBalancing";
private static final String RESERVED_PREFIX = "~";
private static final int NUM_SPLITS = 1_000;
private static final int MARGIN = 3;
private static final Text TSERVER_ASSIGNED_TABLETS_COL_FAM = new Text("loc");

@Override
public SystemConfiguration getSystemConfig() {
return new SystemConfiguration();
}

@Override
public Report runTest(final Environment env) throws Exception {
AccumuloClient client = env.getClient();
client.tableOperations().create(TABLE_NAME);
client.tableOperations().addSplits(TABLE_NAME, getSplits());
client.instanceOperations().waitForBalance();

int totalTabletServers = client.instanceOperations().getTabletServers().size();
int expectedAllocation = NUM_SPLITS / totalTabletServers;
int min = expectedAllocation - MARGIN;
int max = expectedAllocation + MARGIN;

Report.Builder reportBuilder = Report.builder().id("split_balancing").description(
"Evaluate and verify that when a high number of splits are created, that the tablets are balanced equally among tablet servers.")
.parameter("num_splits", NUM_SPLITS, "The number of splits")
.parameter("num_tservers", totalTabletServers, "The number of tablet servers")
.parameter("tserver_min", min,
"The minimum number of tablets that should be assigned to a tablet server.")
.parameter("tserver_max", max,
"The maximum number of tablets that should be assigned to a tablet server.");

boolean allServersBalanced = true;
Map<String,Integer> tablets = getTablets(client);
for (String tabletServer : tablets.keySet()) {
int count = tablets.get(tabletServer);
boolean balanced = count >= min && count <= max;
allServersBalanced = allServersBalanced && balanced;

reportBuilder.result("size_tserver_" + tabletServer, count,
"Total tablets assigned to tablet server " + tabletServer);
}

// Surface the overall balance check alongside the per-server counts.
reportBuilder.result("balanced", allServersBalanced ? 1 : 0,
"1 if every tablet server holds between tserver_min and tserver_max tablets, otherwise 0.");

return reportBuilder.build();
}

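// The split points are the decimal strings "0".."999", which sort
// lexicographically ("0", "1", "10", "100", ...); only even distribution of
// tablets matters here, not numeric order.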
private SortedSet<Text> getSplits() {
SortedSet<Text> splits = new TreeSet<>();
for (int i = 0; i < NUM_SPLITS; i++) {
splits.add(new Text(String.valueOf(i)));
}
return splits;
}

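// Counts tablets per tablet server by scanning the metadata table's "loc"
// column family, stopping before the reserved "~" section of the table.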
private Map<String,Integer> getTablets(final AccumuloClient client) {
Map<String,Integer> tablets = new HashMap<>();
try (Scanner scanner = client.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
scanner.fetchColumnFamily(TSERVER_ASSIGNED_TABLETS_COL_FAM);
Range range = new Range(null, false, RESERVED_PREFIX, false);
scanner.setRange(range);

for (Map.Entry<Key,Value> entry : scanner) {
String host = entry.getValue().toString();
tablets.merge(host, 1, Integer::sum);
}
} catch (Exception e) {
LOG.error("Error occurred during scan:", e);
}
return tablets;
}
}
