Skip to content
Permalink
Browse files
Make MulitTable reuse tables (#191)
* This allows restarting rw tests and allow using of the same tables
across multiple threads
* Other various improvements to MultiTable
  • Loading branch information
milleruntime committed Feb 18, 2022
1 parent e8e69a0 commit d23c2fcbe47ae72df9f667235b6b7f851882a184
Showing 6 changed files with 95 additions and 32 deletions.
@@ -25,6 +25,7 @@
import java.util.concurrent.atomic.AtomicLong;

import org.apache.accumulo.core.client.IteratorSetting.Column;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.rfile.RFile;
import org.apache.accumulo.core.client.rfile.RFileWriter;
import org.apache.accumulo.core.data.Key;
@@ -39,7 +40,7 @@

public class BulkImport extends Test {

public static final int LOTS = 100000;
public static final int ROWS = 1_000_000;
public static final int COLS = 10;
public static final List<Column> COLNAMES = new ArrayList<>();
public static final Text CHECK_COLUMN_FAMILY = new Text("cf");
@@ -61,7 +62,7 @@ public void visit(final State state, final RandWalkEnv env, Properties props) th
List<String> tables = (List<String>) state.get("tableList");

if (tables.isEmpty()) {
log.debug("No tables to ingest into");
log.trace("No tables to ingest into");
return;
}

@@ -77,8 +78,8 @@ public void visit(final State state, final RandWalkEnv env, Properties props) th
final boolean useLegacyBulk = rand.nextBoolean();

TreeSet<String> rows = new TreeSet<>();
for (int i = 0; i < LOTS; i++)
rows.add(uuid + String.format("__%05d", i));
for (int i = 0; i < ROWS; i++)
rows.add(uuid + String.format("__%06d", i));

String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
log.debug("Preparing {} bulk import to {}", useLegacyBulk ? "legacy" : "new", tableName);
@@ -96,24 +97,30 @@ public void visit(final State state, final RandWalkEnv env, Properties props) th
}
f.close();
}
if (useLegacyBulk) {
env.getAccumuloClient().tableOperations().importDirectory(tableName, dir.toString(),
fail.toString(), true);
FileStatus[] failures = fs.listStatus(fail);
if (failures != null && failures.length > 0) {
state.set("bulkImportSuccess", "false");
throw new Exception(failures.length + " failure files found importing files from " + dir);
log.debug("Starting {} bulk import to {}", useLegacyBulk ? "legacy" : "new", tableName);
try {
if (useLegacyBulk) {
env.getAccumuloClient().tableOperations().importDirectory(tableName, dir.toString(),
fail.toString(), true);
FileStatus[] failures = fs.listStatus(fail);
if (failures != null && failures.length > 0) {
state.set("bulkImportSuccess", "false");
throw new Exception(failures.length + " failure files found importing files from " + dir);
}
} else {
env.getAccumuloClient().tableOperations().importDirectory(dir.toString()).to(tableName)
.tableTime(true).load();
}
} else {
env.getAccumuloClient().tableOperations().importDirectory(dir.toString()).to(tableName)
.tableTime(true).load();
}

fs.delete(dir, true);
fs.delete(fail, true);
log.debug("Finished {} bulk import to {} start: {} last: {} marker: {}",
useLegacyBulk ? "legacy" : "new", tableName, rows.first(), rows.last(),
markerColumnQualifier);
fs.delete(dir, true);
fs.delete(fail, true);
log.debug("Finished {} bulk import to {} start: {} last: {} marker: {}",
useLegacyBulk ? "legacy" : "new", tableName, rows.first(), rows.last(),
markerColumnQualifier);
} catch (TableNotFoundException tnfe) {
log.debug("Table {} was deleted", tableName);
tables.remove(tableName);
}
}

}
@@ -18,6 +18,9 @@

import java.util.Properties;

import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableDeletedException;
import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.testing.randomwalk.RandWalkEnv;
import org.apache.accumulo.testing.randomwalk.State;
import org.apache.accumulo.testing.randomwalk.Test;
@@ -26,7 +29,20 @@ public class Commit extends Test {

@Override
public void visit(State state, RandWalkEnv env, Properties props) throws Exception {
env.getMultiTableBatchWriter().flush();
try {
env.getMultiTableBatchWriter().flush();
} catch (TableOfflineException e) {
log.debug("Commit failed, table offline");
return;
} catch (MutationsRejectedException mre) {
if (mre.getCause() instanceof TableDeletedException)
log.debug("Commit failed, table deleted");
else if (mre.getCause() instanceof TableOfflineException)
log.debug("Commit failed, table offline");
else
throw mre;
return;
}

Long numWrites = state.getLong("numWrites");
Long totalWrites = state.getLong("totalWrites") + numWrites;
@@ -52,6 +52,13 @@ public void visit(State state, RandWalkEnv env, Properties props) throws Excepti
int nextId = ((Integer) state.get("nextId")).intValue();
String dstTableName = String.format("%s_%d", state.getString("tableNamePrefix"), nextId);

if (env.getAccumuloClient().tableOperations().exists(dstTableName)) {
log.debug(dstTableName + " already exists so don't copy.");
nextId++;
state.set("nextId", Integer.valueOf(nextId));
return;
}

String[] args = new String[3];
args[0] = env.getClientPropsPath();
args[1] = srcTableName;
@@ -16,8 +16,12 @@
*/
package org.apache.accumulo.testing.randomwalk.multitable;

import static java.util.stream.Collectors.toCollection;

import java.net.InetAddress;
import java.util.List;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.accumulo.core.client.AccumuloClient;
@@ -34,11 +38,27 @@ public class MultiTableFixture extends Fixture {

@Override
public void setUp(State state, RandWalkEnv env) throws Exception {

String hostname = InetAddress.getLocalHost().getHostName().replaceAll("[-.]", "_");
String prefix = String.format("multi_%s", hostname);

Set<String> all = env.getAccumuloClient().tableOperations().list();
List<String> tableList = all.stream().filter(s -> s.startsWith(prefix))
.collect(toCollection(CopyOnWriteArrayList::new));

state.set("tableNamePrefix",
String.format("multi_%s_%s_%d", hostname, env.getPid(), System.currentTimeMillis()));
log.debug("Existing MultiTables: {}", tableList);
// get the max of the last ID created
OptionalInt optionalInt = tableList.stream().mapToInt(s -> {
String[] strArr = s.split("_");
return Integer.parseInt(strArr[strArr.length - 1]);
}).max();
int nextId = optionalInt.orElse(-1) + 1;
log.debug("Next ID started at {}", nextId);

state.set("tableNamePrefix", prefix);
state.set("nextId", nextId);
state.set("numWrites", 0L);
state.set("totalWrites", 0L);
state.set("tableList", tableList);
state.set("nextId", 0);
state.set("numWrites", 0L);
state.set("totalWrites", 0L);
@@ -25,6 +25,8 @@
import java.util.UUID;

import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableDeletedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.data.Mutation;
@@ -43,7 +45,7 @@ public void visit(State state, RandWalkEnv env, Properties props) throws Excepti
List<String> tables = (List<String>) state.get("tableList");

if (tables.isEmpty()) {
log.debug("No tables to ingest into");
log.trace("No tables to ingest into");
return;
}

@@ -54,10 +56,11 @@ public void visit(State state, RandWalkEnv env, Properties props) throws Excepti
try {
bw = env.getMultiTableBatchWriter().getBatchWriter(tableName);
} catch (TableOfflineException e) {
log.error("Table " + tableName + " is offline!");
log.debug("Table " + tableName + " is offline");
return;
} catch (TableNotFoundException e) {
log.error("Table " + tableName + " not found!");
log.debug("Table " + tableName + " not found");
tables.remove(tableName);
return;
}

@@ -80,10 +83,21 @@ public void visit(State state, RandWalkEnv env, Properties props) throws Excepti
alg.update(payloadBytes);
m.put(meta, new Text("sha1"), new Value(alg.digest()));

// add mutation
bw.addMutation(m);

state.set("numWrites", state.getLong("numWrites") + 1);
try {
// add mutation
bw.addMutation(m);
state.set("numWrites", state.getLong("numWrites") + 1);
} catch (TableOfflineException e) {
log.debug("BatchWrite " + tableName + " failed, offline");
} catch (MutationsRejectedException mre) {
if (mre.getCause() instanceof TableDeletedException) {
log.debug("BatchWrite " + tableName + " failed, table deleted");
tables.remove(tableName);
} else if (mre.getCause() instanceof TableOfflineException)
log.debug("BatchWrite " + tableName + " failed, offline");
else
throw mre;
}
}

}
@@ -34,7 +34,6 @@
<edge id="mt.BulkImport" weight="100"/>
<edge id="mt.OfflineTable" weight="10"/>
<edge id="mt.DropTable" weight="20"/>
<edge id="END" weight="1"/>
</node>

<node id="mt.Write">

0 comments on commit d23c2fc

Please sign in to comment.