Skip to content
Permalink
Browse files
Some minor code cleanup throughout accumulo-testing (#176)
  • Loading branch information
Manno15 committed Dec 7, 2021
1 parent 638db89 commit 0a50e1f4d93c135c9cbae90b20628bd48c52e583
Showing 37 changed files with 106 additions and 141 deletions.
@@ -16,7 +16,6 @@
# limitations under the License.

bin_dir=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
at_home=$( cd "$( dirname "$bin_dir" )" && pwd )
source "${bin_dir}/build"

function print_usage() {
@@ -55,7 +54,7 @@ export CLASSPATH="$TEST_JAR_PATH:$HADOOP_API_JAR:$HADOOP_RUNTIME_JAR:$CLASSPATH"

if [ -n "$HADOOP_HOME" ]; then
export HADOOP_USE_CLIENT_CLASSLOADER=true
"$HADOOP_HOME"/bin/yarn jar "$TEST_JAR_PATH" "$mr_main" "$TEST_PROPS" "$ACCUMULO_CLIENT_PROPS" "${@:2}"
"$HADOOP_HOME"/bin/yarn jar "$TEST_JAR_PATH" "$mr_main" "$TEST_PROPS" "$ACCUMULO_CLIENT_PROPS" "${@:2}"
else
echo "Hadoop must be installed and HADOOP_HOME must be set!"
exit 1
@@ -33,7 +33,7 @@
public class TestEnv implements AutoCloseable {

protected final Properties testProps;
private String clientPropsPath;
private final String clientPropsPath;
private final Properties clientProps;
private AccumuloClient client = null;
private Configuration hadoopConfig = null;
@@ -319,8 +319,8 @@ public static void main(String[] args) throws Exception {
if (ta == null)
logger.debug("{} {} {} {} {}", undefinedNode.undef, undefinedNode.ref, uuid, t1, t2);
else
logger.debug("{} {} {} {} {}", undefinedNode.undef, undefinedNode.ref, ta.tablet,
ta.server, uuid, t1, t2);
logger.debug("{} {} {} {} {} {} {}", undefinedNode.undef, undefinedNode.ref,
ta.tablet, ta.server, uuid, t1, t2);

}
} else {
@@ -23,7 +23,7 @@
public class Collector {

Persistence persistence;
private int batchSize;
private final int batchSize;

public Collector(GcsEnv gcsEnv) {
this.persistence = new Persistence(gcsEnv);
@@ -40,7 +40,7 @@ public class Generator {

Random rand = new Random();

private Persistence persistence;
private final Persistence persistence;

public Generator(GcsEnv gcsEnv) {
this.persistence = new Persistence(gcsEnv);
@@ -18,5 +18,5 @@
package org.apache.accumulo.testing.gcs;

public interface Mutator {
public void run(Persistence p);
void run(Persistence p);
}
@@ -48,9 +48,9 @@

public class Persistence {

private BatchWriter writer;
private String table;
private AccumuloClient client;
private final BatchWriter writer;
private final String table;
private final AccumuloClient client;

Persistence(GcsEnv env) {
this.client = env.getAccumuloClient();
@@ -28,7 +28,7 @@
public class Verifier {

Persistence persistence;
private int batchSize;
private final int batchSize;

public Verifier(GcsEnv gcsEnv) {
this.persistence = new Persistence(gcsEnv);
@@ -20,7 +20,6 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Random;
@@ -111,7 +110,6 @@ public static int consume(Iterable<Entry<Key,Value>> scanner, long numberOfRows)
int count = 0;

while (rowIter.hasNext()) {
Iterator<Entry<Key,Value>> itr = rowIter.next();
count++;
if (count >= numberOfRows) {
break;
@@ -125,20 +123,19 @@ public static TabletId pickTablet(TableOperations tops, String table, Random r)

if (cacheTablets) {
Locations locations = tops.locate(table, Collections.singleton(new Range()));
tablets = new ArrayList<TabletId>(locations.groupByTablet().keySet());
tablets = new ArrayList<>(locations.groupByTablet().keySet());
cacheTablets = false;
}
int index = r.nextInt(tablets.size());
TabletId tabletId = tablets.get(index);
return tabletId;
return tablets.get(index);
}

/*
* These interfaces + implementations are used to determine how many times the scanner should look
* up a random tablet and scan it.
*/
static interface LoopControl {
public boolean keepScanning();
interface LoopControl {
boolean keepScanning();
}

// Does a finite number of iterations
@@ -153,8 +153,8 @@ public static byte[][] generateValues(int dataSize) {
return bytevals;
}

private static byte[] ROW_PREFIX = "row_".getBytes(UTF_8);
private static byte[] COL_PREFIX = "col_".getBytes(UTF_8);
private static final byte[] ROW_PREFIX = "row_".getBytes(UTF_8);
private static final byte[] COL_PREFIX = "col_".getBytes(UTF_8);

public static Text generateRow(int rowid, int startRow) {
return new Text(FastFormat.toZeroPaddedString(rowid + startRow, 10, 10, ROW_PREFIX));
@@ -196,7 +196,7 @@ public static void main(String[] args) {

public static void ingest(AccumuloClient client, FileSystem fs, Opts opts, Configuration conf)
throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException,
MutationsRejectedException, TableExistsException {
TableExistsException {
long stopTime;

byte[][] bytevals = generateValues(opts.dataSize);
@@ -245,11 +245,7 @@ public static void ingest(AccumuloClient client, FileSystem fs, Opts opts, Confi
key.setTimestamp(startTime);
}

if (opts.delete) {
key.setDeleted(true);
} else {
key.setDeleted(false);
}
key.setDeleted(opts.delete);

bytesWritten += key.getSize();

@@ -334,9 +330,8 @@ public static void ingest(AccumuloClient client, FileSystem fs, Opts opts, Confi
elapsed);
}

public static void ingest(AccumuloClient c, Opts opts, Configuration conf)
throws MutationsRejectedException, IOException, AccumuloException, AccumuloSecurityException,
TableNotFoundException, TableExistsException {
public static void ingest(AccumuloClient c, Opts opts, Configuration conf) throws IOException,
AccumuloException, AccumuloSecurityException, TableNotFoundException, TableExistsException {
ingest(c, FileSystem.get(conf), opts, conf);
}
}
@@ -101,7 +101,7 @@ private static void verifyIngest(AccumuloClient client, Opts opts)
Range range = new Range(startKey, startKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL));
scanner.setRange(range);

byte[] val = null; // t.get(rowKey, column);
byte[] val = null;

Iterator<Entry<Key,Value>> iter = scanner.iterator();

@@ -48,14 +48,14 @@ public class GroupCommitPT implements PerformanceTest {
private static final int NUM_FLUSHES = 1024;

static Mutation createRandomMutation(Random rand) {
byte row[] = new byte[16];
byte[] row = new byte[16];

rand.nextBytes(row);

Mutation m = new Mutation(row);

byte cq[] = new byte[8];
byte val[] = new byte[16];
byte[] cq = new byte[8];
byte[] val = new byte[16];

for (int i = 0; i < 3; i++) {
rand.nextBytes(cq);
@@ -68,8 +68,8 @@ static Mutation createRandomMutation(Random rand) {

static class WriteTask implements Runnable {

private int batchSize;
private BatchWriter bw;
private final int batchSize;
private final BatchWriter bw;
private volatile int written = 0;

WriteTask(BatchWriter bw, int numMutations) throws Exception {
@@ -153,7 +153,7 @@ public Report runTest(Environment env) throws Exception {
"The number of times each thread will flush its batch writer. The flushes are spread evenly between mutations.");

// number of threads to run for each test
int tests[] = new int[] {1, 2, 4, 8, 16, 32, 64};
int[] tests = new int[] {1, 2, 4, 8, 16, 32, 64};

// run warm up test
for (int numThreads : tests) {
@@ -174,7 +174,7 @@ private void runTest(Report.Builder report, Environment env, int numThreads, boo
Preconditions.checkArgument(NUM_MUTATIONS % numThreads == 0);

// presplit tablet to allow more concurrency to tablet in memory map updates, so this does not
// impeded write ahead log appends.
// impede write ahead log appends.
NewTableConfiguration ntc = new NewTableConfiguration();
SortedSet<Text> splits = new TreeSet<>();
for (int s = 16; s < 256; s += 16) {
@@ -23,7 +23,6 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

@@ -187,7 +186,6 @@ private long scan(String tableName, AccumuloClient c, AtomicBoolean stop,

scanner.addScanIterator(is);

// scanner.setExecutionHints(hints);
for (Iterator<Entry<Key,Value>> iter = scanner.iterator(); iter.hasNext(); iter.next()) {
count++;
if (stop.get()) {
@@ -200,8 +198,7 @@ private long scan(String tableName, AccumuloClient c, AtomicBoolean stop,
return count;
}

private LongSummaryStatistics runShortScans(Environment env, String tableName, int numScans)
throws InterruptedException, ExecutionException {
private LongSummaryStatistics runShortScans(Environment env, String tableName, int numScans) {

try (TestExecutor<Long> executor = new TestExecutor<>(NUM_SHORT_SCANS_THREADS)) {
Random rand = new Random();
@@ -28,8 +28,8 @@
import java.util.stream.Stream;

public class TestExecutor<T> implements Iterable<T>, AutoCloseable {
private ExecutorService es;
private List<Future<T>> futures = new ArrayList<>();
private final ExecutorService es;
private final List<Future<T>> futures = new ArrayList<>();

public TestExecutor(int numThreads) {
es = Executors.newFixedThreadPool(numThreads);
@@ -25,7 +25,7 @@
public class Framework {

private static final Logger log = LoggerFactory.getLogger(Framework.class);
private HashMap<String,Node> nodes = new HashMap<>();
private final HashMap<String,Node> nodes = new HashMap<>();
private static final Framework INSTANCE = new Framework();

/**

0 comments on commit 0a50e1f

Please sign in to comment.