Skip to content
Permalink
Browse files
Update batch example to use new Connector builder (#14)
* Also update README.md instructions telling users to set up
  accumulo-client.properties
  • Loading branch information
mikewalch committed Apr 9, 2018
1 parent ded48f4 commit 4094d0086a4f6841ecee14844d30fa47600b4cc0
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 205 deletions.
@@ -34,10 +34,11 @@ Before running any of the examples, the following steps must be performed.
git clone https://github.com/apache/accumulo-examples.git
mvn clean package

4. Specify Accumulo connection information. All examples read connection information from a
properties file. Copy the template and edit it.
4. Specify Accumulo connection information in `conf/accumulo-client.properties`. Some old examples
still read connection information from an examples.conf file so that should also be configured.

cd accumulo-examples
nano conf/accumulo-client.properties
cp examples.conf.template examples.conf
nano examples.conf

@@ -16,42 +16,40 @@ limitations under the License.
-->
# Apache Accumulo Batch Writing and Scanning Example

This tutorial uses the following Java classes:

* [SequentialBatchWriter.java] - writes mutations with sequential rows and random values
* [RandomBatchWriter.java] - used by SequentialBatchWriter to generate random values
* [RandomBatchScanner.java] - reads random rows and verifies their values

This is an example of how to use the BatchWriter and BatchScanner.

First, you must ensure that the user you are running with (i.e `myuser` below) has the
`exampleVis` authorization.

$ accumulo shell -u root -e "setauths -u myuser -s exampleVis"

Second, you must create the table, batchtest1, ahead of time.

$ accumulo shell -u root -e "createtable batchtest1"

The command below adds 10000 entries with random 50 bytes values to Accumulo.
This tutorial uses the following Java classes.

$ ./bin/runex client.SequentialBatchWriter -c ./examples.conf -t batchtest1 --start 0 --num 10000 --size 50 --batchMemory 20M --batchLatency 500 --batchThreads 20 --vis exampleVis

The command below will do 100 random queries.

$ ./bin/runex client.RandomBatchScanner -c ./examples.conf -t batchtest1 --num 100 --min 0 --max 10000 --size 50 --scanThreads 20 --auths exampleVis

07 11:33:11,103 [client.CountingVerifyingReceiver] INFO : Generating 100 random queries...
07 11:33:11,112 [client.CountingVerifyingReceiver] INFO : finished
07 11:33:11,260 [client.CountingVerifyingReceiver] INFO : 694.44 lookups/sec 0.14 secs

07 11:33:11,260 [client.CountingVerifyingReceiver] INFO : num results : 100

07 11:33:11,364 [client.CountingVerifyingReceiver] INFO : Generating 100 random queries...
07 11:33:11,370 [client.CountingVerifyingReceiver] INFO : finished
07 11:33:11,416 [client.CountingVerifyingReceiver] INFO : 2173.91 lookups/sec 0.05 secs
* [SequentialBatchWriter.java] - writes mutations with sequential rows and random values
* [RandomBatchScanner.java] - reads random rows and verifies their values

07 11:33:11,416 [client.CountingVerifyingReceiver] INFO : num results : 100
Run `SequentialBatchWriter` to add 10000 entries with random 50 bytes values to Accumulo.

$ ./bin/runex client.SequentialBatchWriter

Verify data was ingested by scanning the table using the Accumulo shell:

$ accumulo shell
root@instance> table batch
root@instance batch> scan

Run `RandomBatchScanner` to perform 1000 random queries and verify the results.

$ ./bin/runex client.RandomBatchScanner
16:04:05,950 [examples.client.RandomBatchScanner] INFO : Generating 1000 random ranges for BatchScanner to read
16:04:06,020 [examples.client.RandomBatchScanner] INFO : Reading ranges using BatchScanner
16:04:06,283 [examples.client.RandomBatchScanner] TRACE: 100 lookups
16:04:06,290 [examples.client.RandomBatchScanner] TRACE: 200 lookups
16:04:06,294 [examples.client.RandomBatchScanner] TRACE: 300 lookups
16:04:06,297 [examples.client.RandomBatchScanner] TRACE: 400 lookups
16:04:06,301 [examples.client.RandomBatchScanner] TRACE: 500 lookups
16:04:06,304 [examples.client.RandomBatchScanner] TRACE: 600 lookups
16:04:06,307 [examples.client.RandomBatchScanner] TRACE: 700 lookups
16:04:06,309 [examples.client.RandomBatchScanner] TRACE: 800 lookups
16:04:06,316 [examples.client.RandomBatchScanner] TRACE: 900 lookups
16:04:06,320 [examples.client.RandomBatchScanner] TRACE: 1000 lookups
16:04:06,330 [examples.client.RandomBatchScanner] INFO : Scan finished! 3246.75 lookups/sec, 0.31 secs, 1000 results
16:04:06,331 [examples.client.RandomBatchScanner] INFO : All expected rows were scanned

[SequentialBatchWriter.java]: ../src/main/java/org/apache/accumulo/examples/client/SequentialBatchWriter.java
[RandomBatchWriter.java]: ../src/main/java/org/apache/accumulo/examples/client/RandomBatchWriter.java
@@ -35,9 +35,9 @@ class CountingVerifyingReceiver {

long count = 0;
int expectedValueSize = 0;
HashMap<Text,Boolean> expectedRows;
HashMap<String,Boolean> expectedRows;

CountingVerifyingReceiver(HashMap<Text,Boolean> expectedRows, int expectedValueSize) {
CountingVerifyingReceiver(HashMap<String,Boolean> expectedRows, int expectedValueSize) {
this.expectedRows = expectedRows;
this.expectedValueSize = expectedValueSize;
}
@@ -56,7 +56,7 @@ public void receive(Key key, Value value) {
if (!expectedRows.containsKey(key.getRow())) {
log.error("Got unexpected key " + key);
} else {
expectedRows.put(key.getRow(), true);
expectedRows.put(key.getRow().toString(), true);
}

count++;
@@ -16,179 +16,102 @@
*/
package org.apache.accumulo.examples.client;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.examples.client.RandomBatchWriter.abs;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.examples.cli.BatchScannerOpts;
import org.apache.accumulo.examples.cli.ClientOnRequiredTable;
import org.apache.hadoop.io.Text;
import org.apache.accumulo.core.security.Authorizations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.beust.jcommander.Parameter;

/**
* Simple example for reading random batches of data from Accumulo.
*/
public class RandomBatchScanner {
private static final Logger log = LoggerFactory.getLogger(RandomBatchScanner.class);

/**
* Generate a number of ranges, each covering a single random row.
*
* @param num
* the number of ranges to generate
* @param min
* the minimum row that will be generated
* @param max
* the maximum row that will be generated
* @param r
* a random number generator
* @param ranges
* a set in which to store the generated ranges
* @param expectedRows
* a map in which to store the rows covered by the ranges (initially mapped to false)
*/
static void generateRandomQueries(int num, long min, long max, Random r, HashSet<Range> ranges, HashMap<Text,Boolean> expectedRows) {
log.info(String.format("Generating %,d random queries...", num));
while (ranges.size() < num) {
long rowid = (abs(r.nextLong()) % (max - min)) + min;

Text row1 = new Text(String.format("row_%010d", rowid));
private static final Logger log = LoggerFactory.getLogger(RandomBatchScanner.class);

Range range = new Range(new Text(row1));
ranges.add(range);
expectedRows.put(row1, false);
public static void main(String[] args) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
Connector connector = Connector.builder().usingProperties("conf/accumulo-client.properties").build();
try {
connector.tableOperations().create("batch");
} catch (TableExistsException e) {
// ignore
}

log.info("finished");
}

/**
* Prints a count of the number of rows mapped to false.
*
* @return boolean indicating "were all the rows found?"
*/
private static boolean checkAllRowsFound(HashMap<Text,Boolean> expectedRows) {
int count = 0;
boolean allFound = true;
for (Entry<Text,Boolean> entry : expectedRows.entrySet())
if (!entry.getValue())
count++;

if (count > 0) {
log.warn("Did not find " + count + " rows");
allFound = false;
int totalLookups = 1000;
int totalEntries = 10000;
Random r = new Random();
HashSet<Range> ranges = new HashSet<>();
HashMap<String,Boolean> expectedRows = new HashMap<>();
log.info("Generating {} random ranges for BatchScanner to read", totalLookups);
while (ranges.size() < totalLookups) {
long rowId = abs(r.nextLong()) % totalEntries;
String row = String.format("row_%010d", rowId);
ranges.add(new Range(row));
expectedRows.put(row, false);
}
return allFound;
}

/**
* Generates a number of random queries, verifies that the key/value pairs returned were in the queried ranges and that the values were generated by
* {@link RandomBatchWriter#createValue(long, int)}. Prints information about the results.
*
* @param num
* the number of queries to generate
* @param min
* the min row to query
* @param max
* the max row to query
* @param evs
* the expected size of the values
* @param r
* a random number generator
* @param tsbr
* a batch scanner
* @return boolean indicating "did the queries go fine?"
*/
static boolean doRandomQueries(int num, long min, long max, int evs, Random r, BatchScanner tsbr) {

HashSet<Range> ranges = new HashSet<>(num);
HashMap<Text,Boolean> expectedRows = new java.util.HashMap<>();

generateRandomQueries(num, min, max, r, ranges, expectedRows);

tsbr.setRanges(ranges);

CountingVerifyingReceiver receiver = new CountingVerifyingReceiver(expectedRows, evs);

long t1 = System.currentTimeMillis();

for (Entry<Key,Value> entry : tsbr) {
receiver.receive(entry.getKey(), entry.getValue());
long lookups = 0;

log.info("Reading ranges using BatchScanner");
try (BatchScanner scan = connector.createBatchScanner("batch", Authorizations.EMPTY, 20)) {
scan.setRanges(ranges);
for (Entry<Key, Value> entry : scan) {
Key key = entry.getKey();
Value value = entry.getValue();
String row = key.getRow().toString();
long rowId = Integer.parseInt(row.split("_")[1]);

Value expectedValue = SequentialBatchWriter.createValue(rowId);

if (!Arrays.equals(expectedValue.get(), value.get())) {
log.error("Unexpected value for key: {} expected: {} actual: {}", key,
new String(expectedValue.get(), UTF_8), new String(value.get(), UTF_8));
}

if (!expectedRows.containsKey(key.getRow().toString())) {
log.error("Encountered unexpected key: {} ", key);
} else {
expectedRows.put(key.getRow().toString(), true);
}

lookups++;
if (lookups % 100 == 0) {
log.trace("{} lookups", lookups);
}
}
}

long t2 = System.currentTimeMillis();
log.info(String.format("Scan finished! %6.2f lookups/sec, %.2f secs, %d results",
lookups / ((t2 - t1) / 1000.0), ((t2 - t1) / 1000.0), lookups));

log.info(String.format("%6.2f lookups/sec %6.2f secs%n", num / ((t2 - t1) / 1000.0), ((t2 - t1) / 1000.0)));
log.info(String.format("num results : %,d%n", receiver.count));

return checkAllRowsFound(expectedRows);
}

public static class Opts extends ClientOnRequiredTable {
@Parameter(names = "--min", description = "miniumum row that will be generated")
long min = 0;
@Parameter(names = "--max", description = "maximum ow that will be generated")
long max = 0;
@Parameter(names = "--num", required = true, description = "number of ranges to generate")
int num = 0;
@Parameter(names = "--size", required = true, description = "size of the value to write")
int size = 0;
@Parameter(names = "--seed", description = "seed for pseudo-random number generator")
Long seed = null;
}

/**
* Scans over a specified number of entries to Accumulo using a {@link BatchScanner}. Completes scans twice to compare times for a fresh query with those for
* a repeated query which has cached metadata and connections already established.
*/
public static void main(String[] args) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
Opts opts = new Opts();
BatchScannerOpts bsOpts = new BatchScannerOpts();
opts.parseArgs(RandomBatchScanner.class.getName(), args, bsOpts);

Connector connector = opts.getConnector();
BatchScanner batchReader = connector.createBatchScanner(opts.getTableName(), opts.auths, bsOpts.scanThreads);
batchReader.setTimeout(bsOpts.scanTimeout, TimeUnit.MILLISECONDS);

Random r;
if (opts.seed == null)
r = new Random();
else
r = new Random(opts.seed);

// do one cold
boolean status = doRandomQueries(opts.num, opts.min, opts.max, opts.size, r, batchReader);

System.gc();
System.gc();
System.gc();

if (opts.seed == null)
r = new Random();
else
r = new Random(opts.seed);

// do one hot (connections already established, metadata table cached)
status = status && doRandomQueries(opts.num, opts.min, opts.max, opts.size, r, batchReader);

batchReader.close();
if (!status) {
int count = 0;
for (Entry<String,Boolean> entry : expectedRows.entrySet()) {
if (!entry.getValue()) {
count++;
}
}
if (count > 0) {
log.warn("Did not find {} rows", count);
System.exit(1);
}
log.info("All expected rows were scanned");
}
}

0 comments on commit 4094d00

Please sign in to comment.