Remove HdfsZooInstance & formatting
milleruntime committed Aug 30, 2018
1 parent 8188eed commit d741f36cbc78a7e8349ed8f5df57ce0c49efc888
Showing 2 changed files with 12 additions and 39 deletions.
@@ -16,18 +16,12 @@
*/
package org.apache.accumulo.testing.core.ingest;

- import static java.nio.charset.StandardCharsets.UTF_8;
-
import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.TableNotFoundException;
- import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.server.cli.ClientOnRequiredTable;
- import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -40,29 +34,15 @@ static class Opts extends ClientOnRequiredTable {
String source = null;
@Parameter(names = {"-f", "--failures"}, description = "directory to copy failures into: will be deleted before the bulk import")
String failures = null;
- @Parameter(description = "<username> <password> <tablename> <sourcedir> <failuredir>")
- List<String> args = new ArrayList<>();
}

public static void main(String[] args) throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
final FileSystem fs = FileSystem.get(new Configuration());
Opts opts = new Opts();
- if (args.length == 5) {
- System.err.println("Deprecated syntax for BulkImportDirectory, please use the new style (see --help)");
- final String user = args[0];
- final byte[] pass = args[1].getBytes(UTF_8);
- final String tableName = args[2];
- final String dir = args[3];
- final String failureDir = args[4];
- final Path failureDirPath = new Path(failureDir);
- fs.delete(failureDirPath, true);
- fs.mkdirs(failureDirPath);
- HdfsZooInstance.getInstance().getConnector(user, new PasswordToken(pass)).tableOperations().importDirectory(tableName, dir, failureDir, false);
- } else {
- opts.parseArgs(BulkImportDirectory.class.getName(), args);
- fs.delete(new Path(opts.failures), true);
- fs.mkdirs(new Path(opts.failures));
- opts.getConnector().tableOperations().importDirectory(opts.getTableName(), opts.source, opts.failures, false);
- }
+ System.err.println("Deprecated syntax for BulkImportDirectory, please use the new style (see --help)");
+ opts.parseArgs(BulkImportDirectory.class.getName(), args);
+ fs.delete(new Path(opts.failures), true);
+ fs.mkdirs(new Path(opts.failures));
+ opts.getConnector().tableOperations().importDirectory(opts.getTableName(), opts.source, opts.failures, false);
}
}
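Note on the change above: with HdfsZooInstance removed, the Connector is obtained solely through the ClientOnRequiredTable options (opts.getConnector()), so the class no longer reaches into server-side code. Below is a minimal stand-alone sketch of the equivalent client-only path; the instance name, ZooKeeper hosts, credentials, table and directories are placeholders, not values from this commit.

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;

public class BulkImportSketch {
  public static void main(String[] args) throws Exception {
    // Sketch only: instance name, ZooKeeper hosts and credentials are placeholders.
    ZooKeeperInstance instance = new ZooKeeperInstance("accumulo", "zkhost1:2181,zkhost2:2181");
    Connector conn = instance.getConnector("testuser", new PasswordToken("testpass"));
    // Same bulk import call the new code issues through opts.getConnector();
    // the failure directory must exist and be empty, hence the delete/mkdirs in the diff.
    conn.tableOperations().importDirectory("testtable", "/bulk/files", "/bulk/failures", false);
  }
}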
@@ -61,23 +61,18 @@ public class ScanExecutorPT implements PerformanceTest {
private static final String TEST_DESC = "Scan Executor Test. Test running lots of short scans "
+ "while long scans are running in the background. Each short scan reads a random row and "
+ "family. Using execution hints, short scans are randomly either given a high priority or "
- + "a dedicated executor. If the scan prioritizer or dispatcher is not working properly, "
- + "then the short scans will be orders of magnitude slower.";
+ + "a dedicated executor. If the scan prioritizer or dispatcher is not working properly, " + "then the short scans will be orders of magnitude slower.";

@Override
public SystemConfiguration getConfiguration() {
Map<String,String> siteCfg = new HashMap<>();

siteCfg.put(Property.TSERV_SCAN_MAX_OPENFILES.getKey(), "200");
siteCfg.put(Property.TSERV_MINTHREADS.getKey(), "200");
- siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se1.threads",
- SCAN_EXECUTOR_THREADS);
- siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se1.prioritizer",
- SCAN_PRIORITIZER);
- siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se2.threads",
- SCAN_EXECUTOR_THREADS);
- siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se2.prioritizer",
- SCAN_PRIORITIZER);
+ siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se1.threads", SCAN_EXECUTOR_THREADS);
+ siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se1.prioritizer", SCAN_PRIORITIZER);
+ siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se2.threads", SCAN_EXECUTOR_THREADS);
+ siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se2.prioritizer", SCAN_PRIORITIZER);

return new SystemConfiguration().setAccumuloConfig(siteCfg);
}
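For context on the configuration above: each siteCfg entry defines part of a named tablet-server scan executor (se1 and se2), which the test uses to keep short scans isolated from the long background scans. A rough sketch of the expanded property keys follows; it assumes TSERV_SCAN_EXECUTORS_PREFIX resolves to "tserver.scan.executors." and uses placeholder values in place of SCAN_EXECUTOR_THREADS and SCAN_PRIORITIZER.

import java.util.HashMap;
import java.util.Map;

public class ScanExecutorCfgSketch {
  public static void main(String[] args) {
    // Assumption: the prefix constant expands to "tserver.scan.executors.".
    // "2" and the prioritizer class name are placeholders, not the test's real values.
    Map<String,String> siteCfg = new HashMap<>();
    siteCfg.put("tserver.scan.executors.se1.threads", "2");
    siteCfg.put("tserver.scan.executors.se1.prioritizer", "com.example.SomePrioritizer");
    siteCfg.put("tserver.scan.executors.se2.threads", "2");
    siteCfg.put("tserver.scan.executors.se2.prioritizer", "com.example.SomePrioritizer");
    siteCfg.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}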
@@ -139,8 +134,7 @@ public Report runTest(Environment env) throws Exception {
return builder.build();
}

- private static long scan(String tableName, Connector c, byte[] row, byte[] fam,
- Map<String,String> hints) throws TableNotFoundException {
+ private static long scan(String tableName, Connector c, byte[] row, byte[] fam, Map<String,String> hints) throws TableNotFoundException {
long t1 = System.currentTimeMillis();
int count = 0;
try (Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY)) {
@@ -154,8 +148,7 @@ private static long scan(String tableName, Connector c, byte[] row, byte[] fam,
return System.currentTimeMillis() - t1;
}

- private long scan(String tableName, Connector c, AtomicBoolean stop, Map<String,String> hints)
- throws TableNotFoundException {
+ private long scan(String tableName, Connector c, AtomicBoolean stop, Map<String,String> hints) throws TableNotFoundException {
long count = 0;
while (!stop.get()) {
try (Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY)) {

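Both scan methods above take a hint map and are where the execution hints described in TEST_DESC come into play. A minimal sketch of a hinted short scan is shown below; the Connector, table name and hint keys ("priority", "executor") are illustrative assumptions, since the real keys depend on the prioritizer and dispatcher classes the test configures.

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;

public class HintedScanSketch {
  // Counts the entries seen by one short scan that is tagged with execution hints.
  static long hintedScan(Connector conn, String tableName) throws TableNotFoundException {
    Map<String,String> hints = new HashMap<>();
    hints.put("priority", "1");   // illustrative hint key/value, not taken from the test
    hints.put("executor", "se1"); // illustrative; "se1" matches an executor name configured above
    long count = 0;
    try (Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY)) {
      scanner.setExecutionHints(hints);
      for (Entry<Key,Value> entry : scanner) {
        count++;
      }
    }
    return count;
  }
}

If the configured prioritizer or dispatcher ignores such hints, short scans queue behind the long background scans, which is exactly the slowdown this performance test is designed to expose.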