Skip to content
Permalink
Browse files
Continuous Ingest improvements (#168)
* consolidated for-loops
* improved logs
* avoided parsing properties multiple times
* move AccumuloClient and BatchWriter into try-with-resources
* created member variables to reuse values & simplify methods
* renamed variables and made some final
  • Loading branch information
DomGarguilo committed Dec 8, 2021
1 parent 0a50e1f commit 7a91acc7ce12afe907d6a01adc784dc3aa553e16
Showing 2 changed files with 107 additions and 117 deletions.
@@ -83,7 +83,7 @@ test.ci.ingest.pause.duration.min=60
# Maximum pause duration (in seconds)
test.ci.ingest.pause.duration.max=120
# The probability (between 0.0 and 1.0) that a set of entries will be deleted during continuous ingest
# To disable deletes, set probability to 0
# To disable deletes, set probability to 0.0
test.ci.ingest.delete.probability=0.1

# Batch walker
@@ -17,10 +17,6 @@
package org.apache.accumulo.testing.continuous;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.testing.TestProps.CI_INGEST_PAUSE_DURATION_MAX;
import static org.apache.accumulo.testing.TestProps.CI_INGEST_PAUSE_DURATION_MIN;
import static org.apache.accumulo.testing.TestProps.CI_INGEST_PAUSE_WAIT_MAX;
import static org.apache.accumulo.testing.TestProps.CI_INGEST_PAUSE_WAIT_MIN;

import java.util.ArrayList;
import java.util.Collections;
@@ -54,6 +50,9 @@ public class ContinuousIngest {
private static List<ColumnVisibility> visibilities;
private static long lastPauseNs;
private static long pauseWaitSec;
private static boolean pauseEnabled;
private static int pauseMin;
private static int pauseMax;

private static ColumnVisibility getVisibility(Random rand) {
return visibilities.get(rand.nextInt(visibilities.size()));
@@ -64,14 +63,11 @@ private static boolean pauseEnabled(Properties props) {
return Boolean.parseBoolean(value);
}

private static int getPause(Properties props, Random rand, String minProp, String maxProp) {
int min = Integer.parseInt(props.getProperty(minProp));
int max = Integer.parseInt(props.getProperty(maxProp));
Preconditions.checkState(max >= min && min > 0);
if (max == min) {
return min;
private static int getPause(Random rand) {
if (pauseMax == pauseMin) {
return pauseMin;
}
return (rand.nextInt(max - min) + min);
return (rand.nextInt(pauseMax - pauseMin) + pauseMin);
}

private static float getDeleteProbability(Properties props) {
@@ -86,51 +82,48 @@ private static int getFlushEntries(Properties props) {
return Integer.parseInt(props.getProperty(TestProps.CI_INGEST_FLUSH_ENTRIES, "1000000"));
}

private static void pauseCheck(Properties props, Random rand) throws InterruptedException {
if (pauseEnabled(props)) {
private static void pauseCheck(Random rand) throws InterruptedException {
if (pauseEnabled) {
long elapsedNano = System.nanoTime() - lastPauseNs;
if (elapsedNano > (TimeUnit.SECONDS.toNanos(pauseWaitSec))) {
long pauseDurationSec = getPause(props, rand, CI_INGEST_PAUSE_DURATION_MIN,
CI_INGEST_PAUSE_DURATION_MAX);
log.info("PAUSING for " + pauseDurationSec + "s");
long pauseDurationSec = getPause(rand);
log.info("PAUSING for {}s", pauseDurationSec);
Thread.sleep(TimeUnit.SECONDS.toMillis(pauseDurationSec));
lastPauseNs = System.nanoTime();
pauseWaitSec = getPause(props, rand, CI_INGEST_PAUSE_WAIT_MIN, CI_INGEST_PAUSE_WAIT_MAX);
log.info("INGESTING for " + pauseWaitSec + "s");
pauseWaitSec = getPause(rand);
log.info("INGESTING for {}s", pauseWaitSec);
}
}
}

public static void main(String[] args) throws Exception {

try (ContinuousEnv env = new ContinuousEnv(args)) {
try (ContinuousEnv env = new ContinuousEnv(args);
AccumuloClient client = env.getAccumuloClient()) {

visibilities = parseVisibilities(env.getTestProperty(TestProps.CI_INGEST_VISIBILITIES));
final long rowMin = env.getRowMin();
final long rowMax = env.getRowMax();
Preconditions.checkState(0 <= rowMin && rowMin <= rowMax,
"Bad rowMin/rowMax, must conform to: 0 <= rowMin <= rowMax");

long rowMin = env.getRowMin();
long rowMax = env.getRowMax();
if (rowMin < 0 || rowMax < 0 || rowMax <= rowMin) {
throw new IllegalArgumentException("bad min and max");
}

AccumuloClient client = env.getAccumuloClient();
String tableName = env.getAccumuloTableName();
if (!client.tableOperations().exists(tableName)) {
throw new TableNotFoundException(null, tableName,
"Consult the README and create the table before starting ingest.");
}

BatchWriter bw = client.createBatchWriter(tableName);

Random r = new Random();
Random rand = new Random();

byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8);
log.info("Ingest instance ID: {} current time: {}ms", new String(ingestInstanceId, UTF_8),
System.currentTimeMillis());

log.info(String.format("UUID %d %s", System.currentTimeMillis(),
new String(ingestInstanceId, UTF_8)));
Properties testProps = env.getTestProperties();

long count = 0;
final int flushInterval = getFlushEntries(env.getTestProperties());
long entriesWritten = 0L;
long entriesDeleted = 0L;
final int flushInterval = getFlushEntries(testProps);
log.info("A flush will occur after every {} entries written", flushInterval);
final int maxDepth = 25;

// always want to point back to flushed data. This way the previous item should
@@ -141,99 +134,97 @@ public static void main(String[] args) throws Exception {

long lastFlushTime = System.currentTimeMillis();

int maxColF = env.getMaxColF();
int maxColQ = env.getMaxColQ();
boolean checksum = Boolean.parseBoolean(env.getTestProperty(TestProps.CI_INGEST_CHECKSUM));
long numEntries = Long.parseLong(env.getTestProperty(TestProps.CI_INGEST_CLIENT_ENTRIES));
final int maxColF = env.getMaxColF();
final int maxColQ = env.getMaxColQ();
final boolean checksum = Boolean
.parseBoolean(testProps.getProperty(TestProps.CI_INGEST_CHECKSUM));
final long numEntries = Long
.parseLong(testProps.getProperty(TestProps.CI_INGEST_CLIENT_ENTRIES));
log.info("Total entries to be written: {}", numEntries);

Properties testProps = env.getTestProperties();
if (pauseEnabled(testProps)) {
visibilities = parseVisibilities(testProps.getProperty(TestProps.CI_INGEST_VISIBILITIES));

pauseEnabled = pauseEnabled(testProps);

pauseMin = Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MIN));
pauseMax = Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MAX));
Preconditions.checkState(0 < pauseMin && pauseMin <= pauseMax,
"Bad pause wait min/max, must conform to: 0 < min <= max");

if (pauseEnabled) {
lastPauseNs = System.nanoTime();
pauseWaitSec = getPause(testProps, r, CI_INGEST_PAUSE_WAIT_MIN, CI_INGEST_PAUSE_WAIT_MAX);
pauseWaitSec = getPause(rand);
log.info("PAUSING enabled");
log.info("INGESTING for " + pauseWaitSec + "s");
log.info("INGESTING for {}s", pauseWaitSec);
}

final float deleteProbability = getDeleteProbability(testProps);
log.info("DELETES will occur with a probability of {}",
String.format("%.02f", deleteProbability));

out: while (true) {
ColumnVisibility cv = getVisibility(r);
try (BatchWriter bw = client.createBatchWriter(tableName)) {
out: while (true) {
ColumnVisibility cv = getVisibility(rand);

// generate first set of nodes
for (int index = 0; index < flushInterval; index++) {
long rowLong = genLong(rowMin, rowMax, r);
// generate sets nodes that link to previous set of nodes
for (int depth = 0; depth < maxDepth; depth++) {
for (int index = 0; index < flushInterval; index++) {
long rowLong = genLong(rowMin, rowMax, rand);

int cf = r.nextInt(maxColF);
int cq = r.nextInt(maxColQ);
byte[] prevRow = depth == 0 ? null : genRow(nodeMap[depth - 1][index].row);

nodeMap[0][index] = new MutationInfo(rowLong, cf, cq);
int cfInt = rand.nextInt(maxColF);
int cqInt = rand.nextInt(maxColQ);

Mutation m = genMutation(rowLong, cf, cq, cv, ingestInstanceId, count, null, checksum);
count++;
bw.addMutation(m);
}
nodeMap[depth][index] = new MutationInfo(rowLong, cfInt, cqInt);
Mutation m = genMutation(rowLong, cfInt, cqInt, cv, ingestInstanceId, entriesWritten,
prevRow, checksum);
entriesWritten++;
bw.addMutation(m);
}

lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
if (count >= numEntries)
break out;

// generate subsequent sets of nodes that link to previous set of nodes
for (int depth = 1; depth < maxDepth; depth++) {
for (int index = 0; index < flushInterval; index++) {
long rowLong = genLong(rowMin, rowMax, r);
byte[] prevRow = genRow(nodeMap[depth - 1][index].row);
int cfInt = r.nextInt(maxColF);
int cqInt = r.nextInt(maxColQ);
nodeMap[depth][index] = new MutationInfo(rowLong, cfInt, cqInt);
Mutation m = genMutation(rowLong, cfInt, cqInt, cv, ingestInstanceId, count, prevRow,
checksum);
count++;
bw.addMutation(m);
lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
if (entriesWritten >= numEntries)
break out;
pauseCheck(rand);
}

lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
if (count >= numEntries)
break out;
pauseCheck(testProps, r);
}

// random chance that the entries will be deleted
boolean delete = r.nextFloat() < deleteProbability;

// if the previously written entries are scheduled to be deleted
if (delete) {
log.info("Deleting last portion of written entries");
// add delete mutations in the reverse order in which they were written
for (int depth = nodeMap.length - 1; depth >= 0; depth--) {
for (int index = nodeMap[depth].length - 1; index >= 0; index--) {
MutationInfo currentNode = nodeMap[depth][index];
Mutation m = new Mutation(genRow(currentNode.row));
m.putDelete(genCol(currentNode.cf), genCol(currentNode.cq));
// random chance that the entries will be deleted
final boolean delete = rand.nextFloat() < deleteProbability;

// if the previously written entries are scheduled to be deleted
if (delete) {
log.info("Deleting last portion of written entries");
// add delete mutations in the reverse order in which they were written
for (int depth = nodeMap.length - 1; depth >= 0; depth--) {
for (int index = nodeMap[depth].length - 1; index >= 0; index--) {
MutationInfo currentNode = nodeMap[depth][index];
Mutation m = new Mutation(genRow(currentNode.row));
m.putDelete(genCol(currentNode.cf), genCol(currentNode.cq));
entriesDeleted++;
bw.addMutation(m);
}
lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
pauseCheck(rand);
}
} else {
// create one big linked list, this makes all the first inserts point to something
for (int index = 0; index < flushInterval - 1; index++) {
MutationInfo firstEntry = nodeMap[0][index];
MutationInfo lastEntry = nodeMap[maxDepth - 1][index + 1];
Mutation m = genMutation(firstEntry.row, firstEntry.cf, firstEntry.cq, cv,
ingestInstanceId, entriesWritten, genRow(lastEntry.row), checksum);
entriesWritten++;
bw.addMutation(m);
}
lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
pauseCheck(testProps, r);
lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
}
} else {
// create one big linked list, this makes all the first inserts point to something
for (int index = 0; index < flushInterval - 1; index++) {
MutationInfo firstEntry = nodeMap[0][index];
MutationInfo lastEntry = nodeMap[maxDepth - 1][index + 1];
Mutation m = genMutation(firstEntry.row, firstEntry.cf, firstEntry.cq, cv,
ingestInstanceId, count, genRow(lastEntry.row), checksum);
count++;
bw.addMutation(m);
}
lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
}

if (count >= numEntries)
break out;
pauseCheck(testProps, r);
if (entriesWritten >= numEntries)
break out;
pauseCheck(rand);
}
}
bw.close();
}
}

@@ -263,19 +254,18 @@ public static List<ColumnVisibility> parseVisibilities(String visString) {
return vis;
}

private static long flush(BatchWriter bw, long count, final int flushInterval, long lastFlushTime)
throws MutationsRejectedException {
private static long flush(BatchWriter bw, long entriesWritten, long entriesDeleted,
long lastFlushTime) throws MutationsRejectedException {
long t1 = System.currentTimeMillis();
bw.flush();
long t2 = System.currentTimeMillis();
log.info(String.format("FLUSH %d %d %d %d %d", t2, (t2 - lastFlushTime), (t2 - t1), count,
flushInterval));
lastFlushTime = t2;
return lastFlushTime;
log.info("FLUSH - duration: {}ms, since last flush: {}ms, total written: {}, total deleted: {}",
(t2 - t1), (t2 - lastFlushTime), entriesWritten, entriesDeleted);
return t2;
}

public static Mutation genMutation(long rowLong, int cfInt, int cqInt, ColumnVisibility cv,
byte[] ingestInstanceId, long count, byte[] prevRow, boolean checksum) {
byte[] ingestInstanceId, long entriesWritten, byte[] prevRow, boolean checksum) {
// Adler32 is supposed to be faster, but according to wikipedia is not
// good for small data.... so used CRC32 instead
CRC32 cksum = null;
@@ -295,7 +285,7 @@ public static Mutation genMutation(long rowLong, int cfInt, int cqInt, ColumnVis

Mutation m = new Mutation(rowString);

m.put(cfString, cqString, cv, createValue(ingestInstanceId, count, prevRow, cksum));
m.put(cfString, cqString, cv, createValue(ingestInstanceId, entriesWritten, prevRow, cksum));
return m;
}

@@ -315,7 +305,7 @@ public static byte[] genRow(long rowLong) {
return FastFormat.toZeroPaddedString(rowLong, 16, 16, EMPTY_BYTES);
}

public static byte[] createValue(byte[] ingestInstanceId, long count, byte[] prevRow,
public static byte[] createValue(byte[] ingestInstanceId, long entriesWritten, byte[] prevRow,
Checksum cksum) {
int dataLen = ingestInstanceId.length + 16 + (prevRow == null ? 0 : prevRow.length) + 3;
if (cksum != null)
@@ -324,7 +314,7 @@ public static byte[] createValue(byte[] ingestInstanceId, long count, byte[] pre
System.arraycopy(ingestInstanceId, 0, val, 0, ingestInstanceId.length);
int index = ingestInstanceId.length;
val[index++] = ':';
int added = FastFormat.toZeroPaddedString(val, index, count, 16, 16, EMPTY_BYTES);
int added = FastFormat.toZeroPaddedString(val, index, entriesWritten, 16, 16, EMPTY_BYTES);
if (added != 16)
throw new RuntimeException(" " + added);
index += 16;

0 comments on commit 7a91acc

Please sign in to comment.