Updated ExportQ and CFM to use new ObserverProvider API #130

Merged
merged 1 commit into from May 3, 2017
5 changes: 3 additions & 2 deletions README.md
@@ -31,7 +31,8 @@ like Spark and Kryo.

Recipes are documented below and in the [Recipes API docs][recipes-api].

* [Collision Free Map][cfm] - A recipe for making many to many updates.
* [Combine Queue][combine-q] - A recipe for concurrently updating many keys while avoiding
collisions.
* [Export Queue][export-q] - A recipe for exporting data from Fluo to external systems.
* [Row Hash Prefix][row-hasher] - A recipe for spreading data evenly in a row prefix.
* [RecordingTransaction][recording-tx] - A wrapper for a Fluo transaction that records all transaction
@@ -102,7 +103,7 @@ Below is a sample Maven POM containing all possible Fluo Recipes dependencies:
[fluo]: https://fluo.apache.org/
[fluo-api]: https://fluo.apache.org/apidocs/fluo/
[recipes-api]: https://fluo.apache.org/apidocs/fluo-recipes/
[cfm]: docs/cfm.md
[combine-q]: docs/combine-queue.md
[export-q]: docs/export-queue.md
[recording-tx]: docs/recording-tx.md
[serialization]: docs/serialization.md
84 changes: 45 additions & 39 deletions docs/accumulo-export-queue.md
@@ -19,8 +19,8 @@ limitations under the License.
## Background

The [Export Queue Recipe][1] provides a generic foundation for building export mechanisms for any
external data store. The [AccumuloExporter] provides an implementation of this recipe for
Accumulo. The [AccumuloExporter] is located the `fluo-recipes-accumulo` module and provides the
external data store. The [AccumuloExporter] provides an [Exporter] for writing to
Accumulo. [AccumuloExporter] is located in the `fluo-recipes-accumulo` module and provides the
following functionality:

* Safely batches writes to Accumulo made by multiple transactions exporting data.
@@ -32,80 +32,86 @@ following functionality:

Exporting to Accumulo is easy. Follow the steps below:

1. Implement a class that extends [AccumuloExporter]. This class will process exported objects that
are placed on your export queue. For example, the `SimpleExporter` class below processes String
key/value exports and generates mutations for Accumulo.
1. First, implement [AccumuloTranslator]. Your implementation translates exported
objects to Accumulo Mutations. For example, the `SimpleTranslator` class below translates String
key/value pairs into mutations for Accumulo. This step is optional; a lambda could
be used in step 3 instead of creating a class.

```java
public class SimpleExporter extends AccumuloExporter<String, String> {
public class SimpleTranslator implements AccumuloTranslator<String,String> {

@Override
protected void translate(SequencedExport<String, String> export, Consumer<Mutation> consumer) {
public void translate(SequencedExport<String, String> export, Consumer<Mutation> consumer) {
Mutation m = new Mutation(export.getKey());
m.put("cf", "cq", export.getSequence(), export.getValue());
consumer.accept(m);
}
}

```

2. With a `SimpleExporter` created, configure an `ExportQueue` to use `SimpleExporter` and
give it information on how to connect to Accumulo.
2. Configure an `ExportQueue` and the export table prior to initializing Fluo.

```java

FluoConfiguration fluoConfig = ...;

// Set accumulo configuration
String instance = // Name of accumulo instance exporting to
String zookeepers = // Zookeepers used by Accumulo instance exporting to
String user = // Accumulo username, user that can write to exportTable
String password = // Accumulo user password
String exportTable = // Name of table to export to

// Set properties for table to export to in Fluo app configuration.
AccumuloExporter.configure(EXPORT_QID).instance(instance, zookeepers)
.credentials(user, password).table(exportTable).save(fluoConfig);

// Create config for export table.
AccumuloExporter.Configuration exportTableCfg =
new AccumuloExporter.Configuration(instance, zookeepers, user, password, exportTable);

// Create config for export queue.
ExportQueue.Options eqOpts = new ExportQueue.Options(EXPORT_QUEUE_ID, SimpleExporter.class,
String.class, String.class, numMapBuckets).setExporterConfiguration(exportTableCfg);

// Configure export queue. This will modify fluoConfig.
ExportQueue.configure(fluoConfig, eqOpts);
// Set properties for export queue in Fluo app configuration
ExportQueue.configure(EXPORT_QID).keyType(String.class).valueType(String.class)
.buckets(119).save(fluoConfig);

// Initialize Fluo using fluoConfig
```

3. Export queues can be retrieved in Fluo observers and objects can be added to them:
3. In the application's `ObserverProvider`, register an observer that will process exports and write
them to Accumulo using [AccumuloExporter]. Also, register observers that add to the export
queue.

```java
public class MyObserver extends AbstractObserver {

ExportQueue<String, String> exportQ;

@Override
public void init(Context context) throws Exception {
exportQ = ExportQueue.getInstance(EXPORT_QUEUE_ID, context.getAppConfiguration());
}
public class MyObserverProvider implements ObserverProvider {

@Override
public void process(TransactionBase tx, Bytes row, Column col) {

// Read some data and do some work

// Add results to export queue
String key = // key that identifies export
String value = // object to export
export.add(tx, key, value);
public void provide(Registry obsRegistry, Context ctx) {
SimpleConfiguration appCfg = ctx.getAppConfiguration();

ExportQueue<String, String> expQ = ExportQueue.getInstance(EXPORT_QID, appCfg);

// Register an observer that will process entries on the export queue and write them to the
// Accumulo table configured earlier. SimpleTranslator from step 1 is passed here; a lambda
// could have been used instead.
expQ.registerObserver(obsRegistry,
new AccumuloExporter<>(EXPORT_QID, appCfg, new SimpleTranslator()));

// An example observer created using a lambda that adds to the export queue.
obsRegistry.register(OBS_COL, WEAK, (tx,row,col) -> {
// Read some data and do some work

// Add results to export queue
String key = // key that identifies export
String value = // object to export
expQ.add(tx, key, value);
});
}
}
```
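
Step 1 notes that a lambda can replace the `SimpleTranslator` class. The sketch below illustrates that idea in isolation. So that it compiles without Fluo or Accumulo on the classpath, it uses hypothetical stand-in types (`Export`, `Mutation`, `Translator`) that only mimic the shape of `SequencedExport`, Accumulo's `Mutation`, and [AccumuloTranslator]. They are assumptions made for illustration, not the real classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Stand-in types for illustration only; these are NOT the Fluo or Accumulo classes.
final class TranslatorLambdaSketch {

  /** Hypothetical stand-in for an export carrying a key, a value and a sequence number. */
  static final class Export<K, V> {
    final K key;
    final V value;
    final long sequence;

    Export(K key, V value, long sequence) {
      this.key = key;
      this.value = value;
      this.sequence = sequence;
    }
  }

  /** Hypothetical stand-in for a mutation: one row with a single column update. */
  static final class Mutation {
    final String row;
    final String family;
    final String qualifier;
    final long timestamp;
    final String value;

    Mutation(String row, String family, String qualifier, long timestamp, String value) {
      this.row = row;
      this.family = family;
      this.qualifier = qualifier;
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  /** Single-method interface shaped like the translate(...) method shown in step 1. */
  @FunctionalInterface
  interface Translator<K, V> {
    void translate(Export<K, V> export, Consumer<Mutation> consumer);
  }

  /** Translates two sample exports with a lambda and returns the resulting mutations. */
  static List<Mutation> demo() {
    // Same logic as SimpleTranslator, written as a lambda instead of a named class.
    // The export's sequence number is used as the mutation timestamp.
    Translator<String, String> translator = (export, consumer) ->
        consumer.accept(new Mutation(export.key, "cf", "cq", export.sequence, export.value));

    List<Mutation> out = new ArrayList<>();
    translator.translate(new Export<>("row1", "v1", 7L), out::add);
    translator.translate(new Export<>("row2", "v2", 8L), out::add);
    return out;
  }

  public static void main(String[] args) {
    for (Mutation m : demo()) {
      System.out.println(m.row + " " + m.family + ":" + m.qualifier
          + " ts=" + m.timestamp + " value=" + m.value);
    }
  }
}
```

With the real API, a lambda of this shape would presumably be passed in step 3 in place of `new SimpleTranslator()`, as the comment in that step suggests. Whether the lambda needs explicit type hints there depends on how the `AccumuloExporter` constructor is declared, which this sketch does not cover.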

## Other use cases

[AccumuloReplicator] is a specialized [AccumuloExporter] that replicates a Fluo table to Accumulo.
The `getTranslator()` method in [AccumuloReplicator] creates a specialized [AccumuloTranslator] for replicating a Fluo table to Accumulo.

[1]: export-queue.md
[AccumuloExporter]: ../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
[Exporter]: ../modules/core/src/main/java/org/apache/fluo/recipes/core/export/function/Exporter.java
[AccumuloExporter]: ../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/function/AccumuloExporter.java
[AccumuloTranslator]: ../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/function/AccumuloTranslator.java
[AccumuloReplicator]: ../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java