Skip to content

Commit

Permalink
fixes #816 introduced better way to setup observers and deprecated old
Browse files Browse the repository at this point in the history
  • Loading branch information
keith-turner committed Mar 29, 2017
1 parent bf13386 commit d6af386
Show file tree
Hide file tree
Showing 48 changed files with 1,398 additions and 463 deletions.
45 changes: 33 additions & 12 deletions docs/applications.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ public class AppCommand {

To create an observer, follow these steps:

1. Create a class that extends [AbstractObserver] like the example below. Please use [slf4j] for
1. Create one or more classes that extend [Observer] like the example below. Please use [slf4j] for
any logging in observers as [slf4j] supports multiple logging implementations. This is
necessary as Fluo applications have a hard requirement on [logback] when running in YARN.

```java
public class InvertObserver extends AbstractObserver {
public class InvertObserver implements Observer {

@Override
public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
Expand All @@ -114,25 +114,45 @@ To create an observer, follow these steps:
// invert row and value
tx.set(value, new Column("inv", "data"), row);
}
}
```
2. Create a class that implements [ObserverProvider] like the example below. The purpose of this
class is associate a set Observers with columns that trigger the observers. The class can
create multiple observers.

```java
class AppObserverProvider implements ObserverProvider {
@Override
public ObservedColumn getObservedColumn() {
return new ObservedColumn(new Column("obs", "data"), NotificationType.STRONG);
public void provide(Registry or, Context ctx) {
//setup InvertObserver to be triggered when the column obs:data is modified
or.register(new Column("obs", "data"),
NotificationType.STRONG,
new InvertObserver());

//Observer is a Functional interface. So Obsevers can be written as lambdas.
or.register(new Column("new","data"),
NotificationType.WEAK,
(tx,row,col) -> {
Bytes combined = combineNewAndOld(tx,row);
tx.set(row, new Column("current","data"), combined);
});
}
}
```
2. Build a jar containing this class and include this jar in the `lib/` directory of your Fluo

3. Build a jar containing thses classes and include this jar in the `lib/` directory of your Fluo
application.
3. Configure your Fluo instance to use this observer by modifying the Observer section of
4. Configure your Fluo instance to use this observer provider by modifying the Observer section of
[fluo.properties].
4. Restart your Fluo instance so that your Fluo workers load the new observer.
5. Initialize Fluo. During initialization Fluo will obtain the observed columns from the
ObserverProvider and persist the columns in Zookeeper. These columns persisted in Zookeeper
are used by transactions to know when to trigger observers.
6. Start your Fluo instance so that your Fluo workers load the new observer.

## Application Configuration

Each observer can have its own configuration. This is useful for the case of using the same
observer code w/ different parameters. However for the case of sharing the same configuration
across observers, fluo provides a simple mechanism to set and access application specific
configuration. See the javadoc on [FluoClient].getAppConfiguration() for more details.
For configuring observers, fluo provides a simple mechanism to set and access application specific
configuration. See the javadoc on [FluoClient].getAppConfiguration() for more details.

## Debugging Applications

Expand Down Expand Up @@ -195,7 +215,8 @@ where D is a hex digit. Also the `\` character is escaped to make the output una
[FluoFactory]: ../modules/api/src/main/java/org/apache/fluo/api/client/FluoFactory.java
[FluoClient]: ../modules/api/src/main/java/org/apache/fluo/api/client/FluoClient.java
[FluoConfiguration]: ../modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
[AbstractObserver]: ../modules/api/src/main/java/org/apache/fluo/api/observer/AbstractObserver.java
[Observer]: ../modules/api/src/main/java/org/apache/fluo/api/observer/Observer.java
[ObserverProvider]: ../modules/api/src/main/java/org/apache/fluo/api/observer/ObserverProvider.java
[fluo.properties]: ../modules/distribution/src/main/config/fluo.properties
[API]: https://fluo.apache.org/apidocs/
[metrics]: metrics.md
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ public class ZookeeperPath {
public static final String CONFIG_ACCUMULO_INSTANCE_NAME = CONFIG + "/accumulo.instance.name";
public static final String CONFIG_ACCUMULO_INSTANCE_ID = CONFIG + "/accumulo.instance.id";
public static final String CONFIG_FLUO_APPLICATION_ID = CONFIG + "/fluo.application.id";
public static final String CONFIG_FLUO_OBSERVERS = CONFIG + "/fluo.observers";
@Deprecated
public static final String CONFIG_FLUO_OBSERVERS1 = CONFIG + "/fluo.observers";
public static final String CONFIG_FLUO_OBSERVERS2 = CONFIG + "/fluo.observers2";
public static final String CONFIG_SHARED = CONFIG + "/shared.config";

public static final String ORACLE = "/oracle";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

package org.apache.fluo.api.client;

import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.metrics.MetricsReporter;
import org.apache.fluo.api.observer.ObserverProvider.Context;

/**
* Client interface for Fluo. Fluo clients will have shared resources used by all objects created by
Expand Down Expand Up @@ -63,6 +65,8 @@ public interface FluoClient extends AutoCloseable {
* keeping config files consistent across a cluster. To update this configuration, use
* {@link FluoAdmin#updateSharedConfig()}. Changes made to the returned Configuration will
* not update Zookeeper.
* @see FluoConfiguration#getAppConfiguration()
* @see Context#getAppConfiguration()
*/
SimpleConfiguration getAppConfiguration();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

import com.google.common.base.Preconditions;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.observer.ObserverProvider;
import org.apache.fluo.api.observer.ObserverProvider.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -86,8 +88,19 @@ public class FluoConfiguration extends SimpleConfiguration {

/** The properties below get loaded into/from Zookeeper */
// Observer
@Deprecated
public static final String OBSERVER_PREFIX = FLUO_PREFIX + ".observer.";

/**
* @since 1.1.0
*/
public static final String OBSERVER_PROVIDER = FLUO_PREFIX + ".observer.provider";

/**
* @since 1.1.0
*/
public static final String OBSERVER_PROVIDER_DEFAULT = "";

// Transaction
public static final String TRANSACTION_PREFIX = FLUO_PREFIX + ".tx";
public static final String TRANSACTION_ROLLBACK_TIME_PROP = TRANSACTION_PREFIX + ".rollback.time";
Expand Down Expand Up @@ -281,6 +294,11 @@ public int getWorkerThreads() {
return getPositiveInt(WORKER_NUM_THREADS_PROP, WORKER_NUM_THREADS_DEFAULT);
}

/**
* @deprecated since 1.1.0. Replaced by {@link #setObserverProvider(String)} and
* {@link #getObserverProvider()}
*/
@Deprecated
public List<ObserverSpecification> getObserverSpecifications() {

List<ObserverSpecification> configList = new ArrayList<>();
Expand Down Expand Up @@ -344,6 +362,37 @@ private int getNextObserverId() {
return max + 1;
}

/**
* Configure the observer provider that Fluo workers will use.
*
* @since 1.1.0
*
* @param className Name of a class that implements {@link ObserverProvider}. Must be non-null and
* non-empty.
*/
public void setObserverProvider(String className) {
setNonEmptyString(OBSERVER_PROVIDER, className);
}

/**
* Calls {@link #setObserverProvider(String)} with the class name.
*
* @since 1.1.0
*/
public void setObserverProvider(Class<? extends ObserverProvider> clazz) {
setObserverProvider(clazz.getName());
}

/**
* @return The configured {@link ObserverProvider} class name. If one was not configured, returns
* {@value #OBSERVER_PROVIDER_DEFAULT}
* @since 1.1.0
*/
public String getObserverProvider() {
return getString(OBSERVER_PROVIDER, OBSERVER_PROVIDER_DEFAULT);
}

@Deprecated
private void addObserver(ObserverSpecification oconf, int next) {
Map<String, String> params = oconf.getConfiguration().toMap();
StringBuilder paramString = new StringBuilder();
Expand All @@ -359,7 +408,11 @@ private void addObserver(ObserverSpecification oconf, int next) {
/**
* Adds an {@link ObserverSpecification} to the configuration using a unique integer prefix thats
* not currently in use.
*
* @deprecated since 1.1.0. Replaced by {@link #setObserverProvider(String)} and
* {@link #getObserverProvider()}
*/
@Deprecated
public FluoConfiguration addObserver(ObserverSpecification oconf) {
int next = getNextObserverId();
addObserver(oconf, next);
Expand All @@ -368,7 +421,11 @@ public FluoConfiguration addObserver(ObserverSpecification oconf) {

/**
* Adds multiple observers using unique integer prefixes for each.
*
* @deprecated since 1.1.0. Replaced by {@link #setObserverProvider(String)} and
* {@link #getObserverProvider()}
*/
@Deprecated
public FluoConfiguration addObservers(Iterable<ObserverSpecification> observers) {
int next = getNextObserverId();
for (ObserverSpecification oconf : observers) {
Expand All @@ -379,7 +436,11 @@ public FluoConfiguration addObservers(Iterable<ObserverSpecification> observers)

/**
* Removes any configured observers.
*
* @deprecated since 1.1.0. Replaced by {@link #setObserverProvider(String)} and
* {@link #getObserverProvider()}
*/
@Deprecated
public FluoConfiguration clearObservers() {
Iterator<String> iter1 = getKeys(OBSERVER_PREFIX.substring(0, OBSERVER_PREFIX.length() - 1));
while (iter1.hasNext()) {
Expand Down Expand Up @@ -429,7 +490,7 @@ public SimpleConfiguration getReporterConfiguration(String reporter) {
* to subset will be reflected in this configuration, but with the prefix added. This
* method is useful for setting application configuration before initialization. For
* reading application configuration after initialization, see
* {@link FluoClient#getAppConfiguration()}
* {@link FluoClient#getAppConfiguration()} and {@link Context#getAppConfiguration()}
*/
public SimpleConfiguration getAppConfiguration() {
return subset(APP_PREFIX);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
* {@link FluoConfiguration#addObserver(ObserverSpecification)}.
*
* @since 1.0.0
* @deprecated since 1.1.0. The methods that used this class in {@link FluoConfiguration} were
* deprecated.
*/
@Deprecated
public class ObserverSpecification {
private final String className;
private final Map<String, String> configMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public final class Bytes implements Comparable<Bytes>, Serializable {
private final int offset;
private final int length;

private WeakReference<String> utf8String;
private transient WeakReference<String> utf8String;

public static final Bytes EMPTY = new Bytes(new byte[0]);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@
* user.
*
* @since 1.0.0
* @deprecated since 1.1.0. This class was deprecated for two reasons. First the methods its
* overrides were deprecated. Second, the methods it overrides were made into Java 8
* default methods.
*/
@Deprecated
public abstract class AbstractObserver implements Observer {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,23 @@

import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.metrics.MetricsReporter;

/**
* Implemented by users to a watch a {@link Column} and be notified of changes to the Column via the
* {@link #process(TransactionBase, Bytes, Column)} method. An observer is created for each worker
* thread and reused for the lifetime of a worker thread. Consider extending
* {@link AbstractObserver} as it will let you optionally implement {@link #init(Context)} and
* {@link #close()}. The abstract class will also shield you from the addition of interface methods.
* {@link #process(TransactionBase, Bytes, Column)} method.
*
* <p>
* In Fluo version 1.1.0 this was converted to a functional interface. This change along with the
* introduction of {@link ObserverProvider} allows Observers to be written as lambdas.
*
* @since 1.0.0
*/
@FunctionalInterface
public interface Observer {

/**
Expand All @@ -44,7 +47,9 @@ enum NotificationType {
* A {@link Column} and {@link NotificationType} pair
*
* @since 1.0.0
* @deprecated since 1.1.0. The method that used this class was deprecated.
*/
@Deprecated
class ObservedColumn {
private final Column col;
private final NotificationType notificationType;
Expand All @@ -61,11 +66,22 @@ public Column getColumn() {
public NotificationType getType() {
return notificationType;
}

/**
* @since 1.1.0
*/
@Override
public String toString() {
return col + " " + notificationType;
}
}

/**
* @since 1.0.0
*
* @deprecated since 1.1.0. The method that used this interface was deprecated.
*/
@Deprecated
interface Context {
/**
* @return A configuration object with application configuration like that returned by
Expand All @@ -88,8 +104,14 @@ interface Context {
* Implemented by user to initialize Observer.
*
* @param context Observer context
*
* @deprecated since 1.1.0. Fluo will no longer call this method when observers are configured by
* {@link FluoConfiguration#setObserverProvider(String)}. Its only called when
* observers are configured the old way by
* {@link FluoConfiguration#addObserver(org.apache.fluo.api.config.ObserverSpecification)}
*/
void init(Context context) throws Exception;
@Deprecated
default void init(Context context) throws Exception {}

/**
* Implemented by users to process notifications on a {@link ObservedColumn}. If a notification
Expand All @@ -107,11 +129,25 @@ interface Context {
* then an exception will be thrown. It is safe to assume that {@link #init(Context)} will be
* called before this method. If the return value of the method is derived from what is passed to
* {@link #init(Context)}, then the derivation process should be deterministic.
*
* @deprecated since 1.1.0 Fluo will no longer call this method when observers are configured by
* {@link FluoConfiguration#setObserverProvider(String)}. Its only called when
* observers are configured the old way by
* {@link FluoConfiguration#addObserver(org.apache.fluo.api.config.ObserverSpecification)}
*/
ObservedColumn getObservedColumn();
@Deprecated
default ObservedColumn getObservedColumn() {
throw new UnsupportedOperationException();
}

/**
* Implemented by user to close resources used by Observer
*
* @deprecated since 1.1.0. Fluo will no longer call this method when observers are configured by
* {@link FluoConfiguration#setObserverProvider(String)}. Its only called when
* observers are configured the old way by
* {@link FluoConfiguration#addObserver(org.apache.fluo.api.config.ObserverSpecification)}
*/
void close();
@Deprecated
default void close() {}
}
Loading

0 comments on commit d6af386

Please sign in to comment.