From a05382ec9fa90c5c1e75a28672fbd168c9fa6be3 Mon Sep 17 00:00:00 2001 From: divyamanohar-stripe <111152273+divyamanohar-stripe@users.noreply.github.com> Date: Mon, 29 Apr 2024 10:15:04 -0700 Subject: [PATCH] flagstore interface (#686) * create feature flags function api * pass to fetcher in javafetcher * make featureFlags nullable * pass in null * use FlagStore * fix imports * edits * add setter * Update FetcherBase.scala Signed-off-by: divyamanohar-stripe <111152273+divyamanohar-stripe@users.noreply.github.com> * Update JavaFetcher.java Signed-off-by: divyamanohar-stripe <111152273+divyamanohar-stripe@users.noreply.github.com> * Update JavaFetcher.java Signed-off-by: divyamanohar-stripe <111152273+divyamanohar-stripe@users.noreply.github.com> * Update Fetcher.scala Signed-off-by: divyamanohar-stripe <111152273+divyamanohar-stripe@users.noreply.github.com> * Update FetcherBase.scala Signed-off-by: divyamanohar-stripe <111152273+divyamanohar-stripe@users.noreply.github.com> * Update FlagStore.java Signed-off-by: divyamanohar-stripe <111152273+divyamanohar-stripe@users.noreply.github.com> * rerun test Signed-off-by: divyamanohar-stripe <111152273+divyamanohar-stripe@users.noreply.github.com> * re-run Signed-off-by: divyamanohar-stripe <111152273+divyamanohar-stripe@users.noreply.github.com> * Update FlagStore.java Signed-off-by: divyamanohar-stripe <111152273+divyamanohar-stripe@users.noreply.github.com> --------- Signed-off-by: divyamanohar-stripe <111152273+divyamanohar-stripe@users.noreply.github.com> --- online/src/main/java/ai/chronon/online/FlagStore.java | 9 +++++++++ .../src/main/java/ai/chronon/online/JavaFetcher.java | 8 ++++++-- online/src/main/scala/ai/chronon/online/Api.scala | 10 ++++++++-- online/src/main/scala/ai/chronon/online/Fetcher.scala | 5 +++-- .../src/main/scala/ai/chronon/online/FetcherBase.scala | 3 ++- 5 files changed, 28 insertions(+), 7 deletions(-) create mode 100644 online/src/main/java/ai/chronon/online/FlagStore.java diff --git a/online/src/main/java/ai/chronon/online/FlagStore.java b/online/src/main/java/ai/chronon/online/FlagStore.java new file mode 100644 index 000000000..cb39668b3 --- /dev/null +++ b/online/src/main/java/ai/chronon/online/FlagStore.java @@ -0,0 +1,9 @@ +package ai.chronon.online; + +import java.io.Serializable; +import java.util.Map; + +// Interface to allow rolling out features/infrastructure changes in a safe & controlled manner +public interface FlagStore extends Serializable { + Boolean isSet(String flagName, Map attributes); +} diff --git a/online/src/main/java/ai/chronon/online/JavaFetcher.java b/online/src/main/java/ai/chronon/online/JavaFetcher.java index 874f76ca5..ba715b006 100644 --- a/online/src/main/java/ai/chronon/online/JavaFetcher.java +++ b/online/src/main/java/ai/chronon/online/JavaFetcher.java @@ -35,11 +35,15 @@ public class JavaFetcher { Fetcher fetcher; public JavaFetcher(KVStore kvStore, String metaDataSet, Long timeoutMillis, Consumer logFunc, ExternalSourceRegistry registry, String callerName) { - this.fetcher = new Fetcher(kvStore, metaDataSet, timeoutMillis, logFunc, false, registry, callerName); + this.fetcher = new Fetcher(kvStore, metaDataSet, timeoutMillis, logFunc, false, registry, callerName, null); } public JavaFetcher(KVStore kvStore, String metaDataSet, Long timeoutMillis, Consumer logFunc, ExternalSourceRegistry registry) { - this.fetcher = new Fetcher(kvStore, metaDataSet, timeoutMillis, logFunc, false, registry, null); + this.fetcher = new Fetcher(kvStore, metaDataSet, timeoutMillis, logFunc, false, registry, null, null); + } + + public JavaFetcher(KVStore kvStore, String metaDataSet, Long timeoutMillis, Consumer logFunc, ExternalSourceRegistry registry, String callerName, FlagStore flagStore) { + this.fetcher = new Fetcher(kvStore, metaDataSet, timeoutMillis, logFunc, false, registry, callerName, flagStore); } diff --git a/online/src/main/scala/ai/chronon/online/Api.scala b/online/src/main/scala/ai/chronon/online/Api.scala index 4a2294b87..55f1c6a96 100644 --- a/online/src/main/scala/ai/chronon/online/Api.scala +++ b/online/src/main/scala/ai/chronon/online/Api.scala @@ -171,6 +171,10 @@ abstract class Api(userConf: Map[String, String]) extends Serializable { private var timeoutMillis: Long = 10000 + private var flagStore: FlagStore = null + + def setFlagStore(customFlagStore: FlagStore): Unit = { flagStore = customFlagStore } + def setTimeout(millis: Long): Unit = { timeoutMillis = millis } // kafka has built-in support - but one can add support to other types using this method. @@ -198,7 +202,8 @@ abstract class Api(userConf: Map[String, String]) extends Serializable { debug = debug, externalSourceRegistry = externalRegistry, timeoutMillis = timeoutMillis, - callerName = callerName) + callerName = callerName, + flagStore = flagStore) final def buildJavaFetcher(callerName: String = null): JavaFetcher = new JavaFetcher(genKvStore, @@ -206,7 +211,8 @@ abstract class Api(userConf: Map[String, String]) extends Serializable { timeoutMillis, responseConsumer, externalRegistry, - callerName) + callerName, + flagStore) final def buildJavaFetcher(): JavaFetcher = buildJavaFetcher(null) diff --git a/online/src/main/scala/ai/chronon/online/Fetcher.scala b/online/src/main/scala/ai/chronon/online/Fetcher.scala index 976853069..e7e490d0a 100644 --- a/online/src/main/scala/ai/chronon/online/Fetcher.scala +++ b/online/src/main/scala/ai/chronon/online/Fetcher.scala @@ -84,8 +84,9 @@ class Fetcher(val kvStore: KVStore, logFunc: Consumer[LoggableResponse] = null, debug: Boolean = false, val externalSourceRegistry: ExternalSourceRegistry = null, - callerName: String = null) - extends FetcherBase(kvStore, metaDataSet, timeoutMillis, debug) { + callerName: String = null, + flagStore: FlagStore = null) + extends FetcherBase(kvStore, metaDataSet, timeoutMillis, debug, flagStore) { private def reportCallerNameFetcherVersion(): Unit = { val message = s"CallerName: ${Option(callerName).getOrElse("N/A")}, FetcherVersion: ${BuildInfo.version}" diff --git a/online/src/main/scala/ai/chronon/online/FetcherBase.scala b/online/src/main/scala/ai/chronon/online/FetcherBase.scala index 8ed1bdd23..3f2051168 100644 --- a/online/src/main/scala/ai/chronon/online/FetcherBase.scala +++ b/online/src/main/scala/ai/chronon/online/FetcherBase.scala @@ -48,7 +48,8 @@ import ai.chronon.online.OnlineDerivationUtil.{ class FetcherBase(kvStore: KVStore, metaDataSet: String = ChrononMetadataKey, timeoutMillis: Long = 10000, - debug: Boolean = false) + debug: Boolean = false, + flagStore: FlagStore = null) extends MetadataStore(kvStore, metaDataSet, timeoutMillis) { private case class GroupByRequestMeta(