diff --git a/README.md b/README.md index 9c35728..1a0fa95 100644 --- a/README.md +++ b/README.md @@ -31,14 +31,14 @@ JAVA Server Side SDK is based on Java SE 8 and is available on Maven Central. Yo co.featbit featbit-java-sdk - 1.2.0 + 1.3.0 ``` - Install the SDK using Gradle ``` -implementation 'co.featbit:featbit-java-sdk:1.2.0' +implementation 'co.featbit:featbit-java-sdk:1.3.0' ``` ### Prerequisite @@ -286,6 +286,23 @@ String value = states.getString("flag key", user, "Not Found"); > If evaluation called before Java SDK client initialized, you set the wrong flag key/user for the evaluation or the related feature flag is not found SDK will return the default value you set. `EvalDetail` will explain the details of the latest evaluation including error raison. +### Flag Tracking +You can register registers a listener to be notified of feature flag changes in general. + +Note that a flag value change listener is bound to a specific user and flag key. + +The flag value change listener will be notified whenever the SDK receives any change to any feature flag's configuration, or to a user segment that is referenced by a feature flag. +To register a flag value change listener, use 'FlagTracker#addFlagValueChangeListener' method. + +The flag value change listener just call the `onFlagValueChange` method **_only if_** the flag value changes. + +```java +client.getFlagTracker().addFlagValueChangeListener(flagKey, user, event -> { + // do something +}); +``` + + ### Offline Mode In some situations, you might want to stop making remote calls to FeatBit. Here is how: diff --git a/pom.xml b/pom.xml index f88b2d4..3edf0b8 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ co.featbit featbit-java-sdk - 1.2.0 + 1.3.0 featbit/featbit-java-sdk diff --git a/src/main/java/co/featbit/server/DataModel.java b/src/main/java/co/featbit/server/DataModel.java index 755a585..bcc7088 100644 --- a/src/main/java/co/featbit/server/DataModel.java +++ b/src/main/java/co/featbit/server/DataModel.java @@ -6,12 +6,15 @@ import com.google.common.collect.ImmutableMap; import com.google.gson.annotations.Expose; import com.google.gson.annotations.JsonAdapter; +import com.google.gson.reflect.TypeToken; +import org.apache.commons.lang3.StringUtils; import org.jetbrains.annotations.NotNull; import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public abstract class DataModel { @@ -346,6 +349,18 @@ public String getVariationType() { return variationType; } + Boolean containsSegment(String segmentId) { + return getRules().stream() + .flatMap(rule -> rule.getConditions().stream()) + .filter(cond -> StringUtils.isBlank(cond.getOp())) + .flatMap(cond -> { + List segments = JsonHelper + .deserialize(cond.getValue(), new TypeToken>() { + }.getType()); + return segments.stream(); + }).collect(Collectors.toList()).contains(segmentId); + } + @Override public void afterDeserialization() { this.timestamp = updatedAt.getTime(); diff --git a/src/main/java/co/featbit/server/EventBroadcaster.java b/src/main/java/co/featbit/server/EventBroadcaster.java new file mode 100644 index 0000000..df1672f --- /dev/null +++ b/src/main/java/co/featbit/server/EventBroadcaster.java @@ -0,0 +1,11 @@ +package co.featbit.server; + +public interface EventBroadcaster { + void addListener(Listener listener); + + void removeListener(Listener listener); + + boolean hasListeners(); + + void broadcast(Event event); +} diff --git a/src/main/java/co/featbit/server/EventBroadcasterImpl.java b/src/main/java/co/featbit/server/EventBroadcasterImpl.java new file mode 100644 index 0000000..26b555a --- /dev/null +++ b/src/main/java/co/featbit/server/EventBroadcasterImpl.java @@ -0,0 +1,67 @@ +package co.featbit.server; + +import org.slf4j.Logger; + +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.function.BiConsumer; + +class EventBroadcasterImpl implements EventBroadcaster { + private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); + private final BiConsumer broadcaster; + private final ExecutorService executor; + private final Logger logger; + + EventBroadcasterImpl( + BiConsumer broadcaster, + ExecutorService executor, + Logger logger + ) { + this.broadcaster = broadcaster; + this.executor = executor; + this.logger = logger; + } + + static EventBroadcasterImpl forFlagChangeEvents( + ExecutorService executor, Logger logger) { + return new EventBroadcasterImpl<>(FlagChange.FlagChangeListener::onFlagChange, executor, logger); + } + + static EventBroadcasterImpl forDataUpdateStates( + ExecutorService executor, Logger logger) { + return new EventBroadcasterImpl<>(Status.StateListener::onStateChange, executor, logger); + } + + @Override + public void addListener(Listener listener) { + listeners.add(listener); + } + + @Override + public void removeListener(Listener listener) { + listeners.remove(listener); + } + + @Override + public boolean hasListeners() { + return !listeners.isEmpty(); + } + + @Override + public void broadcast(Event event) { + if (executor == null) { + return; + } + for (Listener listener : listeners) { + executor.submit(() -> { + try { + broadcaster.accept(listener, event); + } catch (Exception e) { + logger.error("Unexpected exception in event listener", e); + } + }); + } + } + + +} diff --git a/src/main/java/co/featbit/server/FBClientImp.java b/src/main/java/co/featbit/server/FBClientImp.java index 3acf181..f77bde3 100644 --- a/src/main/java/co/featbit/server/FBClientImp.java +++ b/src/main/java/co/featbit/server/FBClientImp.java @@ -13,9 +13,7 @@ import java.io.IOException; import java.time.Duration; import java.util.Map; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.function.Consumer; import static co.featbit.server.Evaluator.*; @@ -36,8 +34,12 @@ public final class FBClientImp implements FBClient { private final Status.DataUpdater dataUpdater; private final InsightProcessor insightProcessor; + private final ExecutorService sharedExecutorService; + private final Consumer eventHandler; + private final FlagTracker flagTracker; + /** * Creates a new client to connect to feature flag center with a specified configuration. *

@@ -109,13 +111,18 @@ public FBClientImp(String envSecret, FBConfig config) { return item == null ? null : (DataModel.Segment) item; }; this.evaluator = new EvaluatorImp(flagGetter, segmentGetter); + + this.sharedExecutorService = new ScheduledThreadPoolExecutor(1, Utils.createThreadFactory("featbit-shared-worker-%d", true)); + EventBroadcasterImpl dataUpdateStateNotifier = EventBroadcasterImpl.forDataUpdateStates(this.sharedExecutorService, logger); + EventBroadcasterImpl flagChangeEventNotifier = EventBroadcasterImpl.forFlagChangeEvents(this.sharedExecutorService, logger); + this.flagTracker = new FlagTrackerImpl(flagChangeEventNotifier, (key, user) -> variation(key, user, null)); //data updator - Status.DataUpdaterImpl dataUpdatorImpl = new Status.DataUpdaterImpl(this.storage); + Status.DataUpdaterImpl dataUpdatorImpl = new Status.DataUpdaterImpl(this.storage, dataUpdateStateNotifier, flagChangeEventNotifier); this.dataUpdater = dataUpdatorImpl; //data processor this.dataSynchronizer = config.getDataSynchronizerFactory().createDataSynchronizer(context, dataUpdatorImpl); //data update status provider - this.dataUpdateStatusProvider = new Status.DataUpdateStatusProviderImpl(dataUpdatorImpl); + this.dataUpdateStatusProvider = new Status.DataUpdateStatusProviderImpl(dataUpdatorImpl, dataUpdateStateNotifier); // data sync Duration startWait = config.getStartWaitTime(); @@ -287,6 +294,12 @@ public void close() throws IOException { this.storage.close(); this.dataSynchronizer.close(); this.insightProcessor.close(); + this.sharedExecutorService.shutdownNow(); + } + + @Override + public FlagTracker getFlagTracker() { + return this.flagTracker; } @Override diff --git a/src/main/java/co/featbit/server/FlagChange.java b/src/main/java/co/featbit/server/FlagChange.java new file mode 100644 index 0000000..e9afcf5 --- /dev/null +++ b/src/main/java/co/featbit/server/FlagChange.java @@ -0,0 +1,36 @@ +package co.featbit.server; + +import java.util.Objects; + +public abstract class FlagChange { + + public static class FlagChangeEvent { + private final String key; + + public FlagChangeEvent(String key) { + this.key = key; + } + + public String getKey() { + return key; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FlagChangeEvent that = (FlagChangeEvent) o; + return Objects.equals(key, that.key); + } + + @Override + public int hashCode() { + return Objects.hash(key); + } + } + + public interface FlagChangeListener { + void onFlagChange(FlagChangeEvent event); + } + +} diff --git a/src/main/java/co/featbit/server/FlagTrackerImpl.java b/src/main/java/co/featbit/server/FlagTrackerImpl.java new file mode 100644 index 0000000..c943cd5 --- /dev/null +++ b/src/main/java/co/featbit/server/FlagTrackerImpl.java @@ -0,0 +1,65 @@ +package co.featbit.server; + +import co.featbit.commons.model.FBUser; +import co.featbit.server.exterior.FlagTracker; +import co.featbit.server.exterior.FlagValueChangeEvent; +import co.featbit.server.exterior.FlagValueChangeListener; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; + +class FlagTrackerImpl implements FlagTracker { + + private final EventBroadcaster flagChangeEventNotifier; + + private final BiFunction evaluateFn; + + FlagTrackerImpl(EventBroadcaster flagChangeEventNotifier, + BiFunction evaluateFn) { + this.flagChangeEventNotifier = flagChangeEventNotifier; + this.evaluateFn = evaluateFn; + } + + @Override + public FlagChange.FlagChangeListener addFlagValueChangeListener(String flagKey, FBUser user, FlagValueChangeListener listener) { + FlagChange.FlagChangeListener adapter = new FlagValueChangeAdapter(flagKey, user, listener); + addFlagChangeListener(adapter); + return adapter; + } + + @Override + public void removeFlagChangeListener(FlagChange.FlagChangeListener listener) { + flagChangeEventNotifier.removeListener(listener); + } + + @Override + public void addFlagChangeListener(FlagChange.FlagChangeListener listener) { + flagChangeEventNotifier.addListener(listener); + } + + private final class FlagValueChangeAdapter implements FlagChange.FlagChangeListener { + private final String flagKey; + private final FBUser user; + private final FlagValueChangeListener listener; + private final AtomicReference value; + + FlagValueChangeAdapter(String flagKey, FBUser user, FlagValueChangeListener listener) { + this.flagKey = flagKey; + this.user = user; + this.listener = listener; + this.value = new AtomicReference<>(evaluateFn.apply(flagKey, user)); + } + + @Override + public void onFlagChange(FlagChange.FlagChangeEvent event) { + if (event.getKey().equals(flagKey)) { + Object newValue = evaluateFn.apply(flagKey, user); + Object oldValue = value.getAndSet(newValue); + if (newValue != null && !newValue.equals(oldValue)) { + listener.onFlagValueChange(new FlagValueChangeEvent(flagKey, oldValue, newValue)); + } + } + } + + } +} diff --git a/src/main/java/co/featbit/server/Loggers.java b/src/main/java/co/featbit/server/Loggers.java index 55766d2..9c69ec8 100644 --- a/src/main/java/co/featbit/server/Loggers.java +++ b/src/main/java/co/featbit/server/Loggers.java @@ -17,6 +17,8 @@ abstract class Loggers { static final Logger EVENTS = LoggerFactory.getLogger(EVENTS_LOGGER_NAME); private static final String UTILS_LOGGER_NAME = BASE_LOGGER_NAME + ".Utils"; static final Logger UTILS = LoggerFactory.getLogger(UTILS_LOGGER_NAME); + private static final String TEST_LOGGER_NAME = BASE_LOGGER_NAME + ".Test"; + static final Logger TEST = LoggerFactory.getLogger(TEST_LOGGER_NAME); Loggers() { super(); diff --git a/src/main/java/co/featbit/server/Status.java b/src/main/java/co/featbit/server/Status.java index e0ea6a6..ab4e02c 100644 --- a/src/main/java/co/featbit/server/Status.java +++ b/src/main/java/co/featbit/server/Status.java @@ -4,6 +4,7 @@ import co.featbit.server.exterior.DataStorageTypes; import co.featbit.server.exterior.DataSynchronizer; import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableMap; import java.io.Serializable; import java.time.Duration; @@ -23,6 +24,8 @@ public abstract class Status { public static final String UNKNOWN_CLOSE_CODE = "Unknown close code"; public static final String WEBSOCKET_ERROR = "WebSocket error"; + static final String BROADCASTING_IGNORED_FLAG_KEYS = "broacasting_ingnored_flag_keys"; + /** * possible values for {@link DataSynchronizer} */ @@ -183,6 +186,17 @@ public int hashCode() { * and maintain the processor status in your own code, but note that the implementation of this interface is not public */ public interface DataUpdater { + + /** + * Retrieves all items from the specified collection. + *

+ * If the store contains placeholders for deleted items, it should filter them in the results. + * + * @param category specifies which collection to use + * @return a map of ids and their versioned items + */ + Map getAll(DataStorageTypes.Category category); + /** * Overwrites the storage with a set of items for each collection, if the new version > the old one *

@@ -245,6 +259,14 @@ public interface DataUpdater { */ boolean storageInitialized(); + /** + * get flag change event notifier {@link EventBroadcaster} + * + * @return EventBroadcaster + */ + EventBroadcaster getFlagChangeEventNotifier(); + + } /** @@ -255,20 +277,31 @@ public interface DataUpdater { * This component is thread safe and is basic component usd in bootstrapping. */ static final class DataUpdaterImpl implements DataUpdater { - private final DataStorage storage; private volatile State currentState; private final Object lockObject = new Object(); - public DataUpdaterImpl(DataStorage storage) { + private final EventBroadcasterImpl dataUpdateStateNotifier; + private final EventBroadcasterImpl flagChangeEventNotifier; + + public DataUpdaterImpl(DataStorage storage, + EventBroadcasterImpl dataUpdateStateNotifier, + EventBroadcasterImpl flagChangeEventNotifier) { this.storage = storage; + this.dataUpdateStateNotifier = dataUpdateStateNotifier; + this.flagChangeEventNotifier = flagChangeEventNotifier; this.currentState = State.initializingState(); } // just use for test - DataUpdaterImpl(DataStorage storage, State state) { + DataUpdaterImpl(DataStorage storage, + State state, + EventBroadcasterImpl dataUpdateStateNotifier, + EventBroadcasterImpl flagChangeEventNotifier) { this.storage = storage; this.currentState = state; + this.dataUpdateStateNotifier = dataUpdateStateNotifier; + this.flagChangeEventNotifier = flagChangeEventNotifier; } private void handleErrorFromStorage(Exception ex, ErrorTrack errorTrack) { @@ -276,6 +309,11 @@ private void handleErrorFromStorage(Exception ex, ErrorTrack errorTrack) { updateStatus(State.interruptedState(errorTrack)); } + @Override + public Map getAll(DataStorageTypes.Category category) { + return storage.getAll(category); + } + @Override public boolean init(Map> allData, Long version) { try { @@ -290,12 +328,14 @@ public boolean init(Map getFlagChangeEventNotifier() { + return this.flagChangeEventNotifier; + } + // blocking util you get the desired state, time out reaches or thread is interrupted boolean waitFor(StateType state, Duration timeout) throws InterruptedException { Duration timeout1 = timeout == null ? Duration.ZERO : timeout; @@ -429,14 +482,21 @@ public interface DataUpdateStatusProvider { */ boolean waitForOKState(Duration timeout) throws InterruptedException; + void addStateListener(StateListener listener); + + void removeStateListener(StateListener listener); + } static final class DataUpdateStatusProviderImpl implements DataUpdateStatusProvider { private final DataUpdaterImpl dataUpdater; - public DataUpdateStatusProviderImpl(DataUpdaterImpl dataUpdater) { + private final EventBroadcasterImpl dataUpdateStateNotifier; + + public DataUpdateStatusProviderImpl(DataUpdaterImpl dataUpdater, EventBroadcasterImpl dataUpdateStateNotifier) { this.dataUpdater = dataUpdater; + this.dataUpdateStateNotifier = dataUpdateStateNotifier; } @Override @@ -453,7 +513,28 @@ public boolean waitFor(StateType state, Duration timeout) throws InterruptedExce public boolean waitForOKState(Duration timeout) throws InterruptedException { return waitFor(StateType.OK, timeout); } + + @Override + public void addStateListener(StateListener listener) { + dataUpdateStateNotifier.addListener(listener); + } + + @Override + public void removeStateListener(StateListener listener) { + dataUpdateStateNotifier.removeListener(listener); + } } + /** + * Interface for receiving data updating state change notifications. + */ + public interface StateListener { + /** + * Called when the data updating state has changed. + * + * @param newState the new state + */ + void onStateChange(State newState); + } } diff --git a/src/main/java/co/featbit/server/Streaming.java b/src/main/java/co/featbit/server/Streaming.java index bc20c6c..95cfab9 100644 --- a/src/main/java/co/featbit/server/Streaming.java +++ b/src/main/java/co/featbit/server/Streaming.java @@ -2,19 +2,10 @@ import co.featbit.commons.json.JsonHelper; import co.featbit.commons.json.JsonParseException; -import co.featbit.server.exterior.BasicConfig; -import co.featbit.server.exterior.Context; -import co.featbit.server.exterior.DataStorageTypes; -import co.featbit.server.exterior.DataSynchronizer; -import co.featbit.server.exterior.HttpConfig; +import co.featbit.server.exterior.*; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import okhttp3.Headers; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.Response; -import okhttp3.WebSocket; -import okhttp3.WebSocketListener; +import okhttp3.*; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.jetbrains.annotations.NotNull; @@ -26,9 +17,7 @@ import java.net.SocketException; import java.net.SocketTimeoutException; import java.time.Duration; -import java.util.Comparator; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -39,9 +28,7 @@ import static co.featbit.server.Status.REQUEST_INVALID_ERROR; import static co.featbit.server.Status.UNKNOWN_CLOSE_CODE; -import static co.featbit.server.Streaming.StreamingOps.isReconnOnClose; -import static co.featbit.server.Streaming.StreamingOps.isReconnOnFailure; -import static co.featbit.server.Streaming.StreamingOps.processData; +import static co.featbit.server.Streaming.StreamingOps.*; final class Streaming implements DataSynchronizer { @@ -176,6 +163,34 @@ private OkHttpClient buildWebOkHttpClient() { } static final class StreamingOps { + private static void broadcast(Status.DataUpdater updater, Map> updatedData) { + Set flagKeySet = new HashSet<>(); + if (updater.getFlagChangeEventNotifier().hasListeners()) { + for (Map.Entry> entry : updatedData.entrySet()) { + if (DataStorageTypes.FEATURES.equals(entry.getKey())) { + for (String id : entry.getValue().keySet()) { + if (!flagKeySet.contains(id)) { + updater.getFlagChangeEventNotifier().broadcast(new FlagChange.FlagChangeEvent(id)); + flagKeySet.add(id); + } + } + } else if (DataStorageTypes.SEGMENTS.equals(entry.getKey())) { + List flags = updater.getAll(DataStorageTypes.FEATURES).values().stream() + .map(item -> (DataModel.FeatureFlag) item) + .collect(Collectors.toList()); + for (String sig : entry.getValue().keySet()) { + flags.stream() + .filter(flag -> flag.containsSegment(sig) && !flagKeySet.contains(flag.getId())) + .forEach(flag -> { + updater.getFlagChangeEventNotifier().broadcast(new FlagChange.FlagChangeEvent(flag.getId())); + flagKeySet.add(flag.getId()); + }); + } + } + } + } + } + static Boolean processData(Status.DataUpdater updater, DataModel.Data data, AtomicBoolean initialized, CompletableFuture initFuture) { boolean opOK = false; String eventType = data.getEventType(); @@ -198,6 +213,7 @@ static Boolean processData(Status.DataUpdater updater, DataModel.Data data, Atom } logger.debug("processing data is well done"); updater.updateStatus(Status.State.OKState()); + broadcast(updater, updatedData); } return opOK; } diff --git a/src/main/java/co/featbit/server/exterior/FBClient.java b/src/main/java/co/featbit/server/exterior/FBClient.java index f5a4dcd..99b0955 100644 --- a/src/main/java/co/featbit/server/exterior/FBClient.java +++ b/src/main/java/co/featbit/server/exterior/FBClient.java @@ -101,6 +101,13 @@ public interface FBClient extends Closeable { */ boolean isFlagKnown(String featureKey); + /** + * Returns an interface for registering listeners that will be notified of changes in feature flag configurations. + * + * @return a {@link FlagTracker} + */ + FlagTracker getFlagTracker(); + /** * Returns an interface for tracking the status of the update processor. *

diff --git a/src/main/java/co/featbit/server/exterior/FlagTracker.java b/src/main/java/co/featbit/server/exterior/FlagTracker.java new file mode 100644 index 0000000..679eeab --- /dev/null +++ b/src/main/java/co/featbit/server/exterior/FlagTracker.java @@ -0,0 +1,57 @@ +package co.featbit.server.exterior; + +import co.featbit.commons.model.FBUser; +import co.featbit.server.FlagChange; + +/** + * A registry to register the flag change listeners in order to track changes in feature flag configurations. + *

+ * The registered listeners only work if the SDK is actually connecting to FeatBit feature flag center. + * If the SDK is only in offline mode then it cannot know when there is a change, because flags are read on an as-needed basis. + *

+ * Application code never needs to initialize or extend this class directly. + */ +public interface FlagTracker { + /** + * Registers a listener to be notified of a change in a specific feature flag's value for a specific FeatBit user. + *

+ * When you call this method, it evaluates immediately the feature flag. It then registers a + * {@link co.featbit.server.FlagChange.FlagChangeListener} to start listening for feature flag configuration + * changes, and whenever the specified feature flag or user segment changes, it evaluates the flag for the {@link FBUser}. + * It then calls your {@link FlagValueChangeListener} if and only if the resulting value has changed. + *

+ * The returned {@link co.featbit.server.FlagChange.FlagChangeListener} represents the subscription that was created by this method + * call; to unsubscribe, pass that object (not your {@code FlagValueChangeListener}) to + * {@link #removeFlagChangeListener(co.featbit.server.FlagChange.FlagChangeListener)}. + * + * @param flagKey The key of the feature flag to track + * @param user The {@link FBUser} to evaluate the flag value + * @param listener The {@link FlagValueChangeListener} to be notified when the flag value changes + * @return The {@link co.featbit.server.FlagChange.FlagChangeListener} that was registered + */ + FlagChange.FlagChangeListener addFlagValueChangeListener(String flagKey, FBUser user, FlagValueChangeListener listener); + + /** + * Unregisters a listener so that it will no longer be notified of feature flag changes. + * + * @param listener the {@link FlagChange.FlagChangeListener} to unregister + */ + void removeFlagChangeListener(FlagChange.FlagChangeListener listener); + + /** + * Registers a listener to be notified of feature flag changes in general. + *

+ * The listener will be notified whenever the SDK receives any change to any feature flag's configuration, + * or to a user segment that is referenced by a feature flag. + *

+ * Note that this does not necessarily mean the flag's value has changed for any particular user, + * only that some part of the flag configuration was changed so that it may return a + * different value than it previously returned for some user. If you want to track flag value changes, + * use {@link #addFlagValueChangeListener(String, FBUser, FlagValueChangeListener)} instead. + *

+ * The registered listeners only work if the SDK is actually connecting to FeatBit feature flag center. + * + * @param listener the {@link FlagChange.FlagChangeListener} to register + */ + void addFlagChangeListener(FlagChange.FlagChangeListener listener); +} diff --git a/src/main/java/co/featbit/server/exterior/FlagValueChangeEvent.java b/src/main/java/co/featbit/server/exterior/FlagValueChangeEvent.java new file mode 100644 index 0000000..ecde03e --- /dev/null +++ b/src/main/java/co/featbit/server/exterior/FlagValueChangeEvent.java @@ -0,0 +1,48 @@ +package co.featbit.server.exterior; + +import co.featbit.server.FlagChange; + +import java.util.Objects; + +/** + * An event that is sent to {@link FlagValueChangeListener} when a feature flag's value has changed for a specific flag key and a {@link co.featbit.commons.model.FBUser}. + */ +public class FlagValueChangeEvent extends FlagChange.FlagChangeEvent { + private final Object oldValue; + private final Object newValue; + + /** + * Constructs a new instance. + * + * @param key the feature flag key + * @param oldValue the previous value of the feature flag + * @param newValue the new value of the feature flag + */ + public FlagValueChangeEvent(String key, Object oldValue, Object newValue) { + super(key); + this.oldValue = oldValue; + this.newValue = newValue; + } + + public Object getOldValue() { + return oldValue; + } + + public Object getNewValue() { + return newValue; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + FlagValueChangeEvent that = (FlagValueChangeEvent) o; + return Objects.equals(oldValue, that.oldValue) && Objects.equals(newValue, that.newValue); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), oldValue, newValue); + } +} diff --git a/src/main/java/co/featbit/server/exterior/FlagValueChangeListener.java b/src/main/java/co/featbit/server/exterior/FlagValueChangeListener.java new file mode 100644 index 0000000..eee97c5 --- /dev/null +++ b/src/main/java/co/featbit/server/exterior/FlagValueChangeListener.java @@ -0,0 +1,14 @@ +package co.featbit.server.exterior; + +/** + * An event listener that is notified when a feature flag's value has changed for a specific flag key and a {@link co.featbit.commons.model.FBUser}. + * @see FlagTracker + */ +public interface FlagValueChangeListener { + /** + * The SDK calls this method when a feature flag's value has changed for a specific flag key and a {@link co.featbit.commons.model.FBUser}. + * + * @param event The {@link FlagValueChangeEvent} that contains the flag key, the old value and the new value + */ + void onFlagValueChange(FlagValueChangeEvent event); +} diff --git a/src/test/java/co/featbit/server/ComponentBaseTest.java b/src/test/java/co/featbit/server/ComponentBaseTest.java index a5af07f..e21aee3 100644 --- a/src/test/java/co/featbit/server/ComponentBaseTest.java +++ b/src/test/java/co/featbit/server/ComponentBaseTest.java @@ -1,14 +1,57 @@ package co.featbit.server; import co.featbit.commons.json.JsonHelper; +import co.featbit.server.exterior.FlagValueChangeEvent; import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableSet; import com.google.common.io.Resources; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; + abstract class ComponentBaseTest { + protected static ExecutorService sharedExcutor = new ScheduledThreadPoolExecutor(1, Utils.createThreadFactory("featbit-test-shared-worker-%d", true)); + protected DataModel.Data loadData() throws Exception { - DataModel.All all = JsonHelper.deserialize(Resources.toString(Resources.getResource("fbclient_test_data.json"), Charsets.UTF_8), - DataModel.All.class); + DataModel.All all = JsonHelper.deserialize(Resources.toString(Resources.getResource("fbclient_test_data.json"), Charsets.UTF_8), DataModel.All.class); return (all.isProcessData()) ? all.data() : null; } + + protected static void expectFlagChangeEvents(BlockingQueue events, String... flagKeys) throws Exception { + Set expectedChangedFlagKeys = ImmutableSet.copyOf(flagKeys); + Set actualChangedFlagKeys = new HashSet<>(); + for (int i = 0; i < expectedChangedFlagKeys.size(); i++) { + T e = events.poll(1, TimeUnit.SECONDS); + actualChangedFlagKeys.add(e.getKey()); + } + assertEquals(expectedChangedFlagKeys, actualChangedFlagKeys); + } + + protected static void expectStateEvents(BlockingQueue events, Status.StateType... stateTypes) throws Exception { + Set expectedStateTypes = ImmutableSet.copyOf(stateTypes); + Set actualStateTypes = new HashSet<>(); + for (int i = 0; i < expectedStateTypes.size(); i++) { + S e = events.poll(1, TimeUnit.SECONDS); + actualStateTypes.add(e.getStateType()); + } + assertEquals(expectedStateTypes, actualStateTypes); + } + + protected static void expectFlagValueChangeEvents(BlockingQueue queue, FlagValueChangeEvent... events) throws Exception { + Set expectedEvents = ImmutableSet.copyOf(events); + Set actualEvents = new HashSet<>(); + for (int i = 0; i < expectedEvents.size(); i++) { + T e = queue.poll(1, TimeUnit.SECONDS); + actualEvents.add(e); + } + assertEquals(expectedEvents, expectedEvents); + } + } diff --git a/src/test/java/co/featbit/server/DataUpdaterTest.java b/src/test/java/co/featbit/server/DataUpdaterTest.java index bc9aaee..a6b46dd 100644 --- a/src/test/java/co/featbit/server/DataUpdaterTest.java +++ b/src/test/java/co/featbit/server/DataUpdaterTest.java @@ -11,6 +11,8 @@ import java.time.Duration; import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import static co.featbit.server.Status.DATA_STORAGE_INIT_ERROR; import static co.featbit.server.Status.DATA_STORAGE_UPDATE_ERROR; @@ -28,12 +30,22 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(EasyMockExtension.class) -class DataUpdaterTest { +class DataUpdaterTest extends ComponentBaseTest { private DataStorage dataStorage; private Status.DataUpdaterImpl dataUpdater; private Status.DataUpdateStatusProvider dataUpdateStatusProvider; private final EasyMockSupport support = new EasyMockSupport(); private final DataStorageTypes.Item item1 = new TestDataModel.TestItem(false, "test item 1"); + private final EventBroadcasterImpl dataUpdateStateNotifier = EventBroadcasterImpl.forDataUpdateStates(ComponentBaseTest.sharedExcutor, Loggers.TEST); + private final EventBroadcasterImpl flagChangeEventNotifier = EventBroadcasterImpl.forFlagChangeEvents(ComponentBaseTest.sharedExcutor, Loggers.TEST); + + private Status.DataUpdaterImpl makeInstance() { + return new Status.DataUpdaterImpl(dataStorage, dataUpdateStateNotifier, flagChangeEventNotifier); + } + + private Status.DataUpdaterImpl makeInstance(Status.State state) { + return new Status.DataUpdaterImpl(dataStorage, state, dataUpdateStateNotifier, flagChangeEventNotifier); + } @AfterEach void dispose() { @@ -45,7 +57,7 @@ void dispose() { @Test void testInitDataStorage() { dataStorage = new InMemoryDataStorage(); - dataUpdater = new Status.DataUpdaterImpl(dataStorage); + dataUpdater = makeInstance(); Map items = ImmutableMap.of(item1.getId(), item1); Map> allData = ImmutableMap.of(DATATESTS, items); @@ -58,7 +70,7 @@ void testInitDataStorage() { @Test void testInitDataStorageThrowException() { dataStorage = support.createNiceMock(DataStorage.class); - dataUpdater = new Status.DataUpdaterImpl(dataStorage); + dataUpdater = makeInstance(); dataStorage.init(anyObject(Map.class), anyLong()); expectLastCall().andThrow(new RuntimeException("test exception")); @@ -76,7 +88,7 @@ void testInitDataStorageThrowException() { @Test void testUpsertDataStorage() { dataStorage = new InMemoryDataStorage(); - dataUpdater = new Status.DataUpdaterImpl(dataStorage); + dataUpdater = makeInstance(); assertTrue(dataUpdater.upsert(DATATESTS, item1.getId(), item1, 1L)); assertTrue(dataUpdater.storageInitialized()); @@ -87,7 +99,7 @@ void testUpsertDataStorage() { @Test void testUpsertDataStorageThrowException() { dataStorage = support.createNiceMock(DataStorage.class); - dataUpdater = new Status.DataUpdaterImpl(dataStorage, Status.State.OKState()); + dataUpdater = makeInstance(Status.State.OKState()); expect(dataStorage.upsert(anyObject(DataStorageTypes.Category.class), anyString(), @@ -104,11 +116,11 @@ void testUpsertDataStorageThrowException() { @Test void testUpdateStatus() { dataStorage = new InMemoryDataStorage(); - dataUpdater = new Status.DataUpdaterImpl(dataStorage); + dataUpdater = makeInstance(); dataUpdater.updateStatus(Status.State.interruptedState("some type", "some reason")); assertEquals(INITIALIZING, dataUpdater.getCurrentState().getStateType()); dataUpdater = null; - dataUpdater = new Status.DataUpdaterImpl(dataStorage, Status.State.OKState()); + dataUpdater = makeInstance(Status.State.OKState()); dataUpdater.updateStatus(Status.State.interruptedState("some type", "some reason")); assertEquals(INTERRUPTED, dataUpdater.getCurrentState().getStateType()); } @@ -116,8 +128,8 @@ void testUpdateStatus() { @Test void waitForStatusIfStatusAlreadyCorrect() throws InterruptedException { dataStorage = new InMemoryDataStorage(); - dataUpdater = new Status.DataUpdaterImpl(dataStorage); - dataUpdateStatusProvider = new Status.DataUpdateStatusProviderImpl(dataUpdater); + dataUpdater = makeInstance(); + dataUpdateStatusProvider = new Status.DataUpdateStatusProviderImpl(dataUpdater, dataUpdateStateNotifier); dataUpdater.updateStatus(Status.State.OKState()); assertTrue(dataUpdateStatusProvider.waitForOKState(Duration.ofMillis(100))); @@ -129,8 +141,8 @@ void waitForStatusIfStatusAlreadyCorrect() throws InterruptedException { @Test void waitForStatusWhenStatusSucceeded() throws InterruptedException { dataStorage = new InMemoryDataStorage(); - dataUpdater = new Status.DataUpdaterImpl(dataStorage); - dataUpdateStatusProvider = new Status.DataUpdateStatusProviderImpl(dataUpdater); + dataUpdater = makeInstance(); + dataUpdateStatusProvider = new Status.DataUpdateStatusProviderImpl(dataUpdater, dataUpdateStateNotifier); new Thread(() -> { try { @@ -149,8 +161,8 @@ void waitForStatusWhenStatusSucceeded() throws InterruptedException { @Test void waitForStatusTimeOut() throws InterruptedException { dataStorage = new InMemoryDataStorage(); - dataUpdater = new Status.DataUpdaterImpl(dataStorage); - dataUpdateStatusProvider = new Status.DataUpdateStatusProviderImpl(dataUpdater); + dataUpdater = makeInstance(); + dataUpdateStatusProvider = new Status.DataUpdateStatusProviderImpl(dataUpdater, dataUpdateStateNotifier); long timeStart = System.currentTimeMillis(); assertFalse(dataUpdateStatusProvider.waitForOKState(Duration.ofMillis(10))); @@ -161,8 +173,8 @@ void waitForStatusTimeOut() throws InterruptedException { @Test void waitForStatusButStatusOff() throws InterruptedException { dataStorage = new InMemoryDataStorage(); - dataUpdater = new Status.DataUpdaterImpl(dataStorage); - dataUpdateStatusProvider = new Status.DataUpdateStatusProviderImpl(dataUpdater); + dataUpdater = makeInstance(); + dataUpdateStatusProvider = new Status.DataUpdateStatusProviderImpl(dataUpdater, dataUpdateStateNotifier); new Thread(() -> { try { diff --git a/src/test/java/co/featbit/server/FlagTrackerTest.java b/src/test/java/co/featbit/server/FlagTrackerTest.java new file mode 100644 index 0000000..2b4e010 --- /dev/null +++ b/src/test/java/co/featbit/server/FlagTrackerTest.java @@ -0,0 +1,65 @@ +package co.featbit.server; + +import co.featbit.commons.model.FBUser; +import co.featbit.server.exterior.FlagValueChangeEvent; +import org.junit.jupiter.api.Test; + +import java.util.AbstractMap; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +class FlagTrackerTest extends ComponentBaseTest { + @Test + void addflagChangeListeners() throws Exception { + String flagKey = "test-flag"; + EventBroadcasterImpl flagChangeEventNotifier = EventBroadcasterImpl.forFlagChangeEvents(ComponentBaseTest.sharedExcutor, Loggers.TEST); + FlagTrackerImpl flagTracker = new FlagTrackerImpl(flagChangeEventNotifier, null); + + BlockingQueue events1 = new LinkedBlockingQueue<>(); + FlagChange.FlagChangeListener listener1 = events1::add; + flagTracker.addFlagChangeListener(listener1); + + BlockingQueue events2 = new LinkedBlockingQueue<>(); + FlagChange.FlagChangeListener listener2 = events2::add; + flagTracker.addFlagChangeListener(listener2); + + flagChangeEventNotifier.broadcast(new FlagChange.FlagChangeEvent(flagKey)); + + expectFlagChangeEvents(events1, flagKey); + expectFlagChangeEvents(events2, flagKey); + + flagTracker.removeFlagChangeListener(listener1); + } + + @Test + void addFlagValueChangeListeners() throws Exception { + String flagKey = "test-flag"; + FBUser user1 = new FBUser.Builder("test-user-1").userName("test-user-1").build(); + FBUser user2 = new FBUser.Builder("test-user-2").userName("test-user-2").build(); + + Map, Boolean> resultMap = new HashMap<>(); + EventBroadcasterImpl flagChangeEventNotifier = EventBroadcasterImpl.forFlagChangeEvents(ComponentBaseTest.sharedExcutor, Loggers.TEST); + FlagTrackerImpl flagTracker = new FlagTrackerImpl(flagChangeEventNotifier, (k, u)->resultMap.get(new AbstractMap.SimpleEntry<>(k, u))); + + resultMap.put(new AbstractMap.SimpleEntry<>(flagKey, user1), false); + resultMap.put(new AbstractMap.SimpleEntry<>(flagKey, user2), false); + + BlockingQueue events1 = new LinkedBlockingQueue<>(); + BlockingQueue events2 = new LinkedBlockingQueue<>(); + FlagChange.FlagChangeListener listener1 = flagTracker.addFlagValueChangeListener(flagKey, user1, events1::add); + FlagChange.FlagChangeListener listener2 = flagTracker.addFlagValueChangeListener(flagKey, user2, events2::add); + + resultMap.put(new AbstractMap.SimpleEntry<>(flagKey, user1), true); + flagChangeEventNotifier.broadcast(new FlagChange.FlagChangeEvent(flagKey)); + + expectFlagValueChangeEvents(events1, new FlagValueChangeEvent(flagKey, false, true)); + assertTrue(events2.isEmpty()); + + flagTracker.removeFlagChangeListener(listener1); + flagTracker.removeFlagChangeListener(listener2); + } +} diff --git a/src/test/java/co/featbit/server/StreamingOpsTest.java b/src/test/java/co/featbit/server/StreamingOpsTest.java index cb4791b..746d862 100644 --- a/src/test/java/co/featbit/server/StreamingOpsTest.java +++ b/src/test/java/co/featbit/server/StreamingOpsTest.java @@ -12,47 +12,38 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; -import static co.featbit.server.Status.DATA_INVALID_ERROR; -import static co.featbit.server.Status.DATA_STORAGE_INIT_ERROR; -import static co.featbit.server.Status.NETWORK_ERROR; -import static co.featbit.server.Status.RUNTIME_ERROR; -import static co.featbit.server.Status.StateType.INITIALIZING; -import static co.featbit.server.Status.StateType.INTERRUPTED; -import static co.featbit.server.Status.StateType.OFF; -import static co.featbit.server.Status.StateType.OK; -import static co.featbit.server.Status.UNKNOWN_ERROR; -import static co.featbit.server.Status.WEBSOCKET_ERROR; -import static co.featbit.server.Streaming.StreamingOps.isReconnOnClose; -import static co.featbit.server.Streaming.StreamingOps.isReconnOnFailure; -import static co.featbit.server.Streaming.StreamingOps.processData; -import static org.easymock.EasyMock.anyLong; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.easymock.EasyMock.getCurrentArguments; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static co.featbit.server.Status.*; +import static co.featbit.server.Status.StateType.*; +import static co.featbit.server.Streaming.StreamingOps.*; +import static org.easymock.EasyMock.*; +import static org.junit.jupiter.api.Assertions.*; /** * this class just tests streaming operations, not covers the connection test */ class StreamingOpsTest extends ComponentBaseTest { private DataStorage dataStorage; - private Status.DataUpdaterImpl dataUpdaterImpl; - private final EasyMockSupport support = new EasyMockSupport(); - private Status.DataUpdater dataUpdaterMock; + private final EventBroadcasterImpl dataUpdateStateNotifier = EventBroadcasterImpl.forDataUpdateStates(ComponentBaseTest.sharedExcutor, Loggers.TEST); + private final EventBroadcasterImpl flagChangeEventNotifier = EventBroadcasterImpl.forFlagChangeEvents(ComponentBaseTest.sharedExcutor, Loggers.TEST); + + private Status.DataUpdaterImpl makeInstance() { + return new Status.DataUpdaterImpl(dataStorage, + dataUpdateStateNotifier, + flagChangeEventNotifier); + } @BeforeEach void init() { dataStorage = new InMemoryDataStorage(); - dataUpdaterImpl = new Status.DataUpdaterImpl(dataStorage); + dataUpdaterImpl = makeInstance(); dataUpdaterMock = support.createNiceMock(Status.DataUpdater.class); } @@ -61,6 +52,14 @@ void testProcessFullData() throws Exception { AtomicBoolean initialized = new AtomicBoolean(false); CompletableFuture initFuture = new CompletableFuture<>(); + BlockingQueue states = new LinkedBlockingQueue<>(); + Status.StateListener listener = states::add; + dataUpdateStateNotifier.addListener(listener); + + BlockingQueue events = new LinkedBlockingQueue<>(); + FlagChange.FlagChangeListener listener1 = events::add; + flagChangeEventNotifier.addListener(listener1); + DataModel.Data data = loadData(); assertTrue(processData(dataUpdaterImpl, data, initialized, initFuture)); assertTrue(initialized.get()); @@ -68,6 +67,13 @@ void testProcessFullData() throws Exception { assertTrue(dataUpdaterImpl.storageInitialized()); assertEquals(OK, dataUpdaterImpl.getCurrentState().getStateType()); assertEquals(data.getTimestamp(), dataUpdaterImpl.getVersion()); + expectStateEvents(states, OK); + expectFlagChangeEvents(events, "ff-test-seg", + "ff-test-bool", "ff-test-number", "ff-test-string", + "ff-test-json", "ff-test-off", "ff-evaluation-test"); + + dataUpdateStateNotifier.removeListener(listener); + flagChangeEventNotifier.removeListener(listener1); } @Test @@ -75,6 +81,14 @@ void testProcessPatchData() throws Exception { AtomicBoolean initialized = new AtomicBoolean(false); CompletableFuture initFuture = new CompletableFuture<>(); + BlockingQueue states = new LinkedBlockingQueue<>(); + Status.StateListener listener = states::add; + dataUpdateStateNotifier.addListener(listener); + + BlockingQueue events = new LinkedBlockingQueue<>(); + FlagChange.FlagChangeListener listener1 = events::add; + flagChangeEventNotifier.addListener(listener1); + DataModel.Data data = loadData(); data.eventType = "patch"; assertTrue(processData(dataUpdaterImpl, data, initialized, initFuture)); @@ -83,13 +97,20 @@ void testProcessPatchData() throws Exception { assertTrue(dataUpdaterImpl.storageInitialized()); assertEquals(OK, dataUpdaterImpl.getCurrentState().getStateType()); assertEquals(data.getTimestamp(), dataUpdaterImpl.getVersion()); + expectStateEvents(states, OK); + expectFlagChangeEvents(events, "ff-test-seg", + "ff-test-bool", "ff-test-number", "ff-test-string", + "ff-test-json", "ff-test-off", "ff-evaluation-test"); + + dataUpdateStateNotifier.removeListener(listener); + flagChangeEventNotifier.removeListener(listener1); } @Test @SuppressWarnings("unchecked") void testProcessDataThrowException() throws Exception { dataStorage = support.createNiceMock(DataStorage.class); - dataUpdaterImpl = new Status.DataUpdaterImpl(dataStorage); + dataUpdaterImpl = new Status.DataUpdaterImpl(dataStorage, dataUpdateStateNotifier, flagChangeEventNotifier); AtomicBoolean initialized = new AtomicBoolean(false); CompletableFuture initFuture = new CompletableFuture<>();