Skip to content
Permalink
Browse files
Update ContinuousInputFormat to use col vis
  • Loading branch information
milleruntime committed Sep 19, 2019
1 parent 187df9a commit 24f489805f8232860078a55ce464033bd4783570
Showing 2 changed files with 22 additions and 11 deletions.
@@ -97,15 +97,7 @@ public static void main(String[] args) throws Exception {

try (ContinuousEnv env = new ContinuousEnv(args)) {

String vis = env.getTestProperty(TestProps.CI_INGEST_VISIBILITIES);
if (vis == null) {
visibilities = Collections.singletonList(new ColumnVisibility());
} else {
visibilities = new ArrayList<>();
for (String v : vis.split(",")) {
visibilities.add(new ColumnVisibility(v.trim()));
}
}
visibilities = parseVisibilities(env.getTestProperty(TestProps.CI_INGEST_VISIBILITIES));

long rowMin = env.getRowMin();
long rowMax = env.getRowMax();
@@ -215,6 +207,19 @@ public static void main(String[] args) throws Exception {
}
}

public static List<ColumnVisibility> parseVisibilities(String visString) {
List<ColumnVisibility> vis;
if (visString == null) {
vis = Collections.singletonList(new ColumnVisibility());
} else {
vis = new ArrayList<>();
for (String v : visString.split(",")) {
vis.add(new ColumnVisibility(v.trim()));
}
}
return vis;
}

private static long flush(BatchWriter bw, long count, final int flushInterval, long lastFlushTime)
throws MutationsRejectedException {
long t1 = System.currentTimeMillis();
@@ -34,6 +34,7 @@

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.testing.TestProps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
@@ -57,6 +58,7 @@ public class ContinuousInputFormat extends InputFormat<Key,Value> {
private static final String PROP_FAM_MAX = "mrbulk.fam.max";
private static final String PROP_QUAL_MAX = "mrbulk.qual.max";
private static final String PROP_CHECKSUM = "mrbulk.checksum";
private static final String PROP_VIS = "mrbulk.vis";

private static class RandomSplit extends InputSplit implements Writable {
@Override
@@ -96,6 +98,7 @@ public static void configure(Configuration conf, String uuid, ContinuousEnv env)
conf.setInt(PROP_QUAL_MAX, env.getMaxColQ());
conf.setBoolean(PROP_CHECKSUM,
Boolean.parseBoolean(env.getTestProperty(TestProps.CI_INGEST_CHECKSUM)));
conf.set(PROP_VIS, env.getTestProperty(TestProps.CI_INGEST_VISIBILITIES));
}

@Override
@@ -112,6 +115,7 @@ public RecordReader<Key,Value> createRecordReader(InputSplit inputSplit,
long maxRow;
int maxFam;
int maxQual;
List<ColumnVisibility> visibilities;
boolean checksum;

Key prevKey;
@@ -128,6 +132,7 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext job) {
maxFam = job.getConfiguration().getInt(PROP_FAM_MAX, Short.MAX_VALUE);
maxQual = job.getConfiguration().getInt(PROP_QUAL_MAX, Short.MAX_VALUE);
checksum = job.getConfiguration().getBoolean(PROP_CHECKSUM, false);
visibilities = ContinuousIngest.parseVisibilities(job.getConfiguration().get(PROP_VIS));

random = new Random(new SecureRandom().nextLong());

@@ -140,15 +145,16 @@ private Key genKey(CRC32 cksum) {

byte[] fam = genCol(random.nextInt(maxFam));
byte[] qual = genCol(random.nextInt(maxQual));
byte[] cv = visibilities.get(random.nextInt(visibilities.size())).flatten();

if (cksum != null) {
cksum.update(row);
cksum.update(fam);
cksum.update(qual);
cksum.update(new byte[0]); // TODO col vis
cksum.update(cv);
}

return new Key(row, fam, qual);
return new Key(row, fam, qual, cv);
}

private byte[] createValue(byte[] ingestInstanceId, byte[] prevRow, Checksum cksum) {

0 comments on commit 24f4898

Please sign in to comment.