Skip to content

Commit

Permalink
Add zipfian distribution option to continuous ingest
Browse files Browse the repository at this point in the history
  • Loading branch information
DomGarguilo committed Apr 12, 2024
1 parent 898cded commit f0f6abb
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 1 deletion.
8 changes: 8 additions & 0 deletions conf/accumulo-testing.properties
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ test.ci.ingest.pause.duration.max=120
# The probability (between 0.0 and 1.0) that a set of entries will be deleted during continuous ingest
# To disable deletes, set probability to 0.0
test.ci.ingest.delete.probability=0.1
# Enables Zipfian distribution for value size. If set to true, the value will have random bytes inserted into it with a size generated based on a Zipfian distribution.
test.ci.ingest.zipfian.enabled=true
# Minimum size to insert into the value when Zipfian distribution is enabled
test.ci.ingest.zipfian.min.size=0
# Maximum size to insert into the value when Zipfian distribution is enabled
test.ci.ingest.zipfian.max.size=10000
# Exponent of the Zipfian distribution
test.ci.ingest.zipfian.exponent=1.5

# Batch walker
# ------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.testing.TestProps;
import org.apache.accumulo.testing.util.FastFormat;
import org.apache.commons.math3.random.RandomDataGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -56,6 +57,13 @@ public class ContinuousIngest {
private static int pauseMin;
private static int pauseMax;

private static boolean zipfianEnabled;
private static int minSize;
private static int maxSize;
private static double exponent;

private static RandomDataGenerator rnd;

private static ColumnVisibility getVisibility(Random rand) {
return visibilities.get(rand.nextInt(visibilities.size()));
}
Expand Down Expand Up @@ -173,6 +181,18 @@ protected static void doIngest(AccumuloClient client, long rowMin, long rowMax,
log.info("DELETES will occur with a probability of {}",
String.format("%.02f", deleteProbability));

zipfianEnabled = Boolean.parseBoolean(testProps.getProperty("test.ci.ingest.zipfian.enabled"));

if (zipfianEnabled) {
minSize = Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.min.size"));
maxSize = Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.max.size"));
exponent = Double.parseDouble(testProps.getProperty("test.ci.ingest.zipfian.exponent"));
rnd = new RandomDataGenerator();

log.info("Zipfian distribution enabled with min size: {}, max size: {}, exponent: {}",
minSize, maxSize, exponent);
}

try (BatchWriter bw = client.createBatchWriter(tableName)) {
out: while (true) {
ColumnVisibility cv = getVisibility(random);
Expand Down Expand Up @@ -317,25 +337,54 @@ public static byte[] genRow(long rowLong) {

public static byte[] createValue(byte[] ingestInstanceId, long entriesWritten, byte[] prevRow,
Checksum cksum) {
int dataLen = ingestInstanceId.length + 16 + (prevRow == null ? 0 : prevRow.length) + 3;
final int numOfSeparators = 4;
int dataLen =
ingestInstanceId.length + 16 + (prevRow == null ? 0 : prevRow.length) + numOfSeparators;
if (cksum != null)
dataLen += 8;

int zipfLength = 0;
if (zipfianEnabled) {
// add the length of the zipfian data to the value
int range = maxSize - minSize;
zipfLength = rnd.nextZipf(range, exponent) + minSize;
dataLen += zipfLength;
}

byte[] val = new byte[dataLen];

// add the ingest instance id to the value
System.arraycopy(ingestInstanceId, 0, val, 0, ingestInstanceId.length);
int index = ingestInstanceId.length;

val[index++] = ':';

// add the count of entries written to the value
int added = FastFormat.toZeroPaddedString(val, index, entriesWritten, 16, 16, EMPTY_BYTES);
if (added != 16)
throw new RuntimeException(" " + added);
index += 16;

val[index++] = ':';

// add the previous row to the value
if (prevRow != null) {
System.arraycopy(prevRow, 0, val, index, prevRow.length);
index += prevRow.length;
}

val[index++] = ':';

if (zipfianEnabled) {
// add random data to the value of length zipfLength
for (int i = 0; i < zipfLength; i++) {
val[index++] = (byte) rnd.nextInt(0, 256);
}

val[index++] = ':';
}

// add the checksum to the value
if (cksum != null) {
cksum.update(val, 0, index);
cksum.getValue();
Expand Down

0 comments on commit f0f6abb

Please sign in to comment.