From 8f6c24cbc1153515b76703a0395eadff80bc620f Mon Sep 17 00:00:00 2001
From: Mike Miller
Date: Thu, 7 Mar 2019 17:54:51 -0500
Subject: [PATCH] Creates continuous BulkIngest (#63)

* A new bulk import MapReduce job, run using "cingest bulk"
* Will create a million random keys in RFiles in the same format as CI
* The keys are in a linked list, with the value being the prev row
* Works with CI createTable and verify
---
 bin/cingest                                   |   4 +
 .../apache/accumulo/testing/TestProps.java    |   5 +
 .../testing/continuous/BulkIngest.java        | 259 ++++++++++++++++++
 .../testing/continuous/ContinuousEnv.java     |  14 +-
 .../testing/continuous/ContinuousIngest.java  |  12 +-
 5 files changed, 283 insertions(+), 11 deletions(-)
 create mode 100644 src/main/java/org/apache/accumulo/testing/continuous/BulkIngest.java

diff --git a/bin/cingest b/bin/cingest
index 39e6d337..a004b3b6 100755
--- a/bin/cingest
+++ b/bin/cingest
@@ -33,6 +33,7 @@ Available applications:
   verify      Verifies continous ingest test. Stop ingest before running.
   moru        Streses Accumulo by reading and writing to the ingest table. Stop ingest before running.
+  bulk        Creates RFiles in a MapReduce job and calls importDirectory if successful
 EOF
 }
 
@@ -71,6 +72,9 @@ case "$1" in
   moru)
     ci_main="${ci_package}.ContinuousMoru"
     ;;
+  bulk)
+    ci_main="${ci_package}.BulkIngest"
+    ;;
   *)
     echo "Unknown application: $1"
     print_usage
diff --git a/src/main/java/org/apache/accumulo/testing/TestProps.java b/src/main/java/org/apache/accumulo/testing/TestProps.java
index c82154f0..7d7808d1 100644
--- a/src/main/java/org/apache/accumulo/testing/TestProps.java
+++ b/src/main/java/org/apache/accumulo/testing/TestProps.java
@@ -32,6 +32,7 @@ public class TestProps {
   private static final String CI_BW = CI + "batch.walker.";
   private static final String CI_SCANNER = CI + "scanner.";
   private static final String CI_VERIFY = CI + "verify.";
+  private static final String CI_BULK = CI + "bulk.";
 
   /** Common properties **/
   // HDFS root path. Should match 'fs.defaultFS' property in Hadoop's core-site.xml
@@ -116,6 +117,10 @@ public class TestProps {
   // Location in HDFS to store output
   public static final String CI_VERIFY_OUTPUT_DIR = CI_VERIFY + "output.dir";
 
+  /** Bulk **/
+  // Bulk ingest Job instance uuid
+  public static final String CI_BULK_UUID = CI_BULK + "uuid";
+
   public static Properties loadFromFile(String propsFilePath) {
     try {
       return loadFromStream(new FileInputStream(propsFilePath));
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/BulkIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/BulkIngest.java
new file mode 100644
index 00000000..35ff6881
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/continuous/BulkIngest.java
@@ -0,0 +1,259 @@
+package org.apache.accumulo.testing.continuous;
+
+import static org.apache.accumulo.testing.TestProps.CI_BULK_UUID;
+import static org.apache.accumulo.testing.continuous.ContinuousIngest.genCol;
+import static org.apache.accumulo.testing.continuous.ContinuousIngest.genLong;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloFileOutputFormat;
+import org.apache.accumulo.hadoop.mapreduce.partition.KeyRangePartitioner;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Bulk import a million random key value pairs. Same format as ContinuousIngest and can be
+ * verified by running ContinuousVerify.
+ */
+public class BulkIngest extends Configured implements Tool {
+  public static final int NUM_KEYS = 1_000_000;
+  public static final String BULK_CI_DIR = "ci-bulk";
+
+  public static final Logger log = LoggerFactory.getLogger(BulkIngest.class);
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Job job = Job.getInstance(getConf());
+    job.setJarByClass(BulkIngest.class);
+    // very important to prevent guava conflicts
+    job.getConfiguration().set("mapreduce.job.classloader", "true");
+    FileSystem fs = FileSystem.get(job.getConfiguration());
+
+    final String JOB_DIR = BULK_CI_DIR + "/" + getCurrentJobNumber(fs);
+    final String RFILE_DIR = JOB_DIR + "/rfiles";
+    log.info("Creating new job at {}", JOB_DIR);
+
+    String ingestInstanceId = UUID.randomUUID().toString();
+    job.getConfiguration().set(CI_BULK_UUID, ingestInstanceId);
+
+    log.info(String.format("UUID %d %s", System.currentTimeMillis(), ingestInstanceId));
+
+    Path outputDir = new Path(RFILE_DIR);
+
+    job.setInputFormatClass(RandomInputFormat.class);
+
+    // map the generated random longs to key values
+    job.setMapperClass(RandomMapper.class);
+    job.setMapOutputKeyClass(Key.class);
+    job.setMapOutputValueClass(Value.class);
+
+    // output RFiles for the import
+    job.setOutputFormatClass(AccumuloFileOutputFormat.class);
+    AccumuloFileOutputFormat.configure().outputPath(outputDir).store(job);
+
+    try (ContinuousEnv env = new ContinuousEnv(args)) {
+      String tableName = env.getAccumuloTableName();
+
+      // create splits file for KeyRangePartitioner
+      String splitsFile = JOB_DIR + "/splits.txt";
+      try (AccumuloClient client = env.getAccumuloClient()) {
+
+        // make sure splits file is closed before continuing
+        try (PrintStream out = new PrintStream(new BufferedOutputStream(fs.create(new Path(
+            splitsFile))))) {
+          Collection<Text> splits = client.tableOperations().listSplits(tableName, 100);
+          for (Text split : splits) {
+            out.println(Base64.getEncoder().encodeToString(split.copyBytes()));
+          }
+          job.setNumReduceTasks(splits.size() + 1);
+        }
+
+        job.setPartitionerClass(KeyRangePartitioner.class);
+        KeyRangePartitioner.setSplitFile(job, splitsFile);
+
+        job.waitForCompletion(true);
+        boolean success = job.isSuccessful();
+
+        // bulk import completed files
+        if (success) {
+          log.info("Sort and create job successful. Bulk importing {} to {}", RFILE_DIR, tableName);
+          client.tableOperations().importDirectory(RFILE_DIR).to(tableName).load();
+        } else {
+          log.error("Job failed, not calling bulk import");
+        }
+        return success ? 0 : 1;
+      }
+    }
+  }
+
+  private int getCurrentJobNumber(FileSystem fs) throws Exception {
+    Path jobPath = new Path(BULK_CI_DIR);
+    FileStatus jobDir = fs.getFileStatus(jobPath);
+    if (jobDir.isDirectory()) {
+      FileStatus[] jobs = fs.listStatus(jobPath);
+      return jobs.length;
+    } else {
+      log.info("{} directory doesn't exist yet, first job running will create it.", BULK_CI_DIR);
+      return 0;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    int ret = ToolRunner.run(new BulkIngest(), args);
+    System.exit(ret);
+  }
+
+  /**
+   * Mapper that takes the longs from RandomInputFormat and outputs Key/Value pairs
+   */
+  public static class RandomMapper extends Mapper<LongWritable,LongWritable,Key,Value> {
+
+    private String uuid;
+    private Text currentRow;
+    private Text currentValue;
+    private Text emptyCfCq;
+
+    @Override
+    protected void setup(Context context) {
+      uuid = context.getConfiguration().get(CI_BULK_UUID);
+      currentRow = new Text();
+      currentValue = new Text();
+      emptyCfCq = new Text(genCol(0));
+    }
+
+    @Override
+    protected void map(LongWritable key, LongWritable value, Context context) throws IOException,
+        InterruptedException {
+      currentRow.set(ContinuousIngest.genRow(key.get()));
+
+      // hack since we can't pass null - don't set first val (prevRow), we want it to be null
+      long longVal = value.get();
+      if (longVal != 1L) {
+        currentValue.set(ContinuousIngest.genRow(longVal));
+      }
+
+      Key outputKey = new Key(currentRow, emptyCfCq, emptyCfCq);
+      Value outputValue = ContinuousIngest.createValue(uuid.getBytes(), 0,
+          currentValue.copyBytes(), null);
+
+      context.write(outputKey, outputValue);
+    }
+  }
+
+  /**
+   * Generates a million LongWritable keys. The LongWritable value points to the previous key.
+   * The first key value pair has a value of 1L. This is translated to null in RandomMapper.
+   */
+  public static class RandomInputFormat extends InputFormat<LongWritable,LongWritable> {
+
+    public static class RandomSplit extends InputSplit implements Writable {
+      @Override
+      public void write(DataOutput dataOutput) {}
+
+      @Override
+      public void readFields(DataInput dataInput) {}
+
+      @Override
+      public long getLength() {
+        return 0;
+      }
+
+      @Override
+      public String[] getLocations() {
+        return new String[0];
+      }
+    }
+
+    @Override
+    public List<InputSplit> getSplits(JobContext jobContext) {
+      List<InputSplit> splits = new ArrayList<>();
+      for (int i = 0; i < 10; i++) {
+        splits.add(new RandomSplit());
+      }
+      return splits;
+    }
+
+    @Override
+    public RecordReader<LongWritable,LongWritable> createRecordReader(InputSplit inputSplit,
+        TaskAttemptContext taskAttemptContext) {
+      return new RecordReader<LongWritable,LongWritable>() {
+        int number;
+        int currentNumber;
+        LongWritable currentKey;
+        LongWritable prevRow;
+        private Random random;
+
+        @Override
+        public void initialize(InputSplit inputSplit, TaskAttemptContext job) {
+          // number = Integer.parseInt(job.getConfiguration().get(NUM_KEYS));
+          number = NUM_KEYS;
+          currentKey = new LongWritable(1);
+          prevRow = new LongWritable(1);
+          random = new Random();
+          currentNumber = 0;
+        }
+
+        @Override
+        public boolean nextKeyValue() {
+          if (currentNumber < number) {
+            prevRow.set(currentKey.get());
+            currentKey.set(genLong(0, Long.MAX_VALUE, random));
+            currentNumber++;
+            return true;
+          } else {
+            return false;
+          }
+        }
+
+        @Override
+        public LongWritable getCurrentKey() {
+          return currentKey;
+        }
+
+        @Override
+        public LongWritable getCurrentValue() {
+          return prevRow;
+        }
+
+        @Override
+        public float getProgress() {
+          return currentNumber * 1.0f / number;
+        }
+
+        @Override
+        public void close() throws IOException {
+
+        }
+      };
+    }
+  }
+}
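
(Reviewer aid, not part of the patch: a minimal, self-contained sketch of the linked-list key/value layout that RandomMapper emits. The class and method names below are made up, String.format stands in for FastFormat.toZeroPaddedString, and the three row longs are arbitrary; in the real job the prevRow string is additionally wrapped by ContinuousIngest.createValue() together with the ingest instance UUID and a count.)

public class BulkKeyLayoutSketch {

  // mirrors genRow(): 16-char zero-padded lowercase hex of the row long
  static String row(long r) {
    return String.format("%016x", r);
  }

  // mirrors genCol(0): 4-char zero-padded hex, used for both column family and qualifier
  static String col(int c) {
    return String.format("%04x", c);
  }

  public static void main(String[] args) {
    long[] rows = {0x1111L, 0x2222L, 0x3333L}; // stand-ins for genLong() output
    long prev = 1L;                            // 1L is the sentinel meaning "no previous row"
    for (long r : rows) {
      String prevRow = (prev == 1L) ? "null" : row(prev);
      System.out.printf("row=%s cf=%s cq=%s prevRow=%s%n", row(r), col(0), col(0), prevRow);
      prev = r;
    }
  }
}

Running the sketch prints three rows whose prevRow values chain back to the preceding row, which is the property ContinuousVerify later checks.
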
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java
index 7d9f89c6..51ae3d5e 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java
@@ -9,11 +9,11 @@ import org.apache.accumulo.testing.TestEnv;
 import org.apache.accumulo.testing.TestProps;
 
-class ContinuousEnv extends TestEnv {
+public class ContinuousEnv extends TestEnv {
 
   private List<Authorizations> authList;
 
-  ContinuousEnv(String[] args) {
+  public ContinuousEnv(String[] args) {
     super(args);
   }
@@ -43,23 +43,23 @@ Authorizations getRandomAuthorizations() {
     return getAuthList().get(r.nextInt(getAuthList().size()));
   }
 
-  long getRowMin() {
+  public long getRowMin() {
     return Long.parseLong(testProps.getProperty(TestProps.CI_INGEST_ROW_MIN));
   }
 
-  long getRowMax() {
+  public long getRowMax() {
     return Long.parseLong(testProps.getProperty(TestProps.CI_INGEST_ROW_MAX));
   }
 
-  int getMaxColF() {
+  public int getMaxColF() {
    return Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_MAX_CF));
   }
 
-  int getMaxColQ() {
+  public int getMaxColQ() {
     return Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_MAX_CQ));
   }
 
-  String getAccumuloTableName() {
+  public String getAccumuloTableName() {
     return testProps.getProperty(TestProps.CI_COMMON_ACCUMULO_TABLE);
   }
 }
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
index e4a5935f..957ce3e4 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
@@ -249,8 +249,8 @@ public static Mutation genMutation(long rowLong, int cfInt, int cqInt, ColumnVis
     byte[] rowString = genRow(rowLong);
 
-    byte[] cfString = FastFormat.toZeroPaddedString(cfInt, 4, 16, EMPTY_BYTES);
-    byte[] cqString = FastFormat.toZeroPaddedString(cqInt, 4, 16, EMPTY_BYTES);
+    byte[] cfString = genCol(cfInt);
+    byte[] cqString = genCol(cqInt);
 
     if (checksum) {
       cksum = new CRC32();
@@ -267,6 +267,10 @@ public static Mutation genMutation(long rowLong, int cfInt, int cqInt, ColumnVis
     return m;
   }
 
+  public static byte[] genCol(int cfInt) {
+    return FastFormat.toZeroPaddedString(cfInt, 4, 16, EMPTY_BYTES);
+  }
+
   public static long genLong(long min, long max, Random r) {
     return ((r.nextLong() & 0x7fffffffffffffffL) % (max - min)) + min;
   }
@@ -275,11 +279,11 @@ static byte[] genRow(long min, long max, Random r) {
     return genRow(genLong(min, max, r));
   }
 
-  static byte[] genRow(long rowLong) {
+  public static byte[] genRow(long rowLong) {
     return FastFormat.toZeroPaddedString(rowLong, 16, 16, EMPTY_BYTES);
   }
 
-  private static Value createValue(byte[] ingestInstanceId, long count, byte[] prevRow,
+  public static Value createValue(byte[] ingestInstanceId, long count, byte[] prevRow,
       Checksum cksum) {
     int dataLen = ingestInstanceId.length + 16 + (prevRow == null ? 0 : prevRow.length) + 3;
     if (cksum != null)
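
(Reviewer aid, not part of the patch: assumed end-to-end workflow, pieced together from the commit message and the bin/cingest usage text rather than stated explicitly here. The continuous-ingest table is created first with the existing CI createTable application, the new job is then launched with "./bin/cingest bulk"; if the MapReduce job succeeds, importDirectory loads the generated RFiles, and the table can afterwards be checked with "./bin/cingest verify" with ingest stopped.)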