diff --git a/conf/accumulo-testing.properties b/conf/accumulo-testing.properties index 93c32276..e871277f 100644 --- a/conf/accumulo-testing.properties +++ b/conf/accumulo-testing.properties @@ -89,6 +89,14 @@ test.ci.ingest.pause.duration.max=120 # The probability (between 0.0 and 1.0) that a set of entries will be deleted during continuous ingest # To disable deletes, set probability to 0.0 test.ci.ingest.delete.probability=0.1 +# Enables Zipfian distribution for value size. If set to true, the value will have random bytes inserted into it with a size generated based on a Zipfian distribution. +test.ci.ingest.zipfian.enabled=true +# Minimum size to insert into the value when Zipfian distribution is enabled +test.ci.ingest.zipfian.min.size=0 +# Maximum size to insert into the value when Zipfian distribution is enabled +test.ci.ingest.zipfian.max.size=10000 +# Exponent of the Zipfian distribution +test.ci.ingest.zipfian.exponent=1.5 # Batch walker # ------------ diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java index 1bb32a5c..bb93ca75 100644 --- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java +++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java @@ -38,6 +38,7 @@ import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.accumulo.testing.TestProps; import org.apache.accumulo.testing.util.FastFormat; +import org.apache.commons.math3.random.RandomDataGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +57,13 @@ public class ContinuousIngest { private static int pauseMin; private static int pauseMax; + private static boolean zipfianEnabled; + private static int minSize; + private static int maxSize; + private static double exponent; + + private static RandomDataGenerator rnd; + private static ColumnVisibility getVisibility(Random rand) { return visibilities.get(rand.nextInt(visibilities.size())); } @@ -173,6 +181,18 @@ protected static void doIngest(AccumuloClient client, long rowMin, long rowMax, log.info("DELETES will occur with a probability of {}", String.format("%.02f", deleteProbability)); + zipfianEnabled = Boolean.parseBoolean(testProps.getProperty("test.ci.ingest.zipfian.enabled")); + + if (zipfianEnabled) { + minSize = Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.min.size")); + maxSize = Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.max.size")); + exponent = Double.parseDouble(testProps.getProperty("test.ci.ingest.zipfian.exponent")); + rnd = new RandomDataGenerator(); + + log.info("Zipfian distribution enabled with min size: {}, max size: {}, exponent: {}", + minSize, maxSize, exponent); + } + try (BatchWriter bw = client.createBatchWriter(tableName)) { out: while (true) { ColumnVisibility cv = getVisibility(random); @@ -317,18 +337,37 @@ public static byte[] genRow(long rowLong) { public static byte[] createValue(byte[] ingestInstanceId, long entriesWritten, byte[] prevRow, Checksum cksum) { - int dataLen = ingestInstanceId.length + 16 + (prevRow == null ? 0 : prevRow.length) + 3; + final int numOfSeparators = 4; + int dataLen = + ingestInstanceId.length + 16 + (prevRow == null ? 0 : prevRow.length) + numOfSeparators; if (cksum != null) dataLen += 8; + + int zipfLength = 0; + if (zipfianEnabled) { + // add the length of the zipfian data to the value + int range = maxSize - minSize; + zipfLength = rnd.nextZipf(range, exponent) + minSize; + dataLen += zipfLength; + } + byte[] val = new byte[dataLen]; + + // add the ingest instance id to the value System.arraycopy(ingestInstanceId, 0, val, 0, ingestInstanceId.length); int index = ingestInstanceId.length; + val[index++] = ':'; + + // add the count of entries written to the value int added = FastFormat.toZeroPaddedString(val, index, entriesWritten, 16, 16, EMPTY_BYTES); if (added != 16) throw new RuntimeException(" " + added); index += 16; + val[index++] = ':'; + + // add the previous row to the value if (prevRow != null) { System.arraycopy(prevRow, 0, val, index, prevRow.length); index += prevRow.length; @@ -336,6 +375,16 @@ public static byte[] createValue(byte[] ingestInstanceId, long entriesWritten, b val[index++] = ':'; + if (zipfianEnabled) { + // add random data to the value of length zipfLength + for (int i = 0; i < zipfLength; i++) { + val[index++] = (byte) rnd.nextInt(0, 256); + } + + val[index++] = ':'; + } + + // add the checksum to the value if (cksum != null) { cksum.update(val, 0, index); cksum.getValue();