Skip to content

Commit

Permalink
Add Feature Toggles
Browse files Browse the repository at this point in the history
Feature Toggles should allow teams to modify system behavior without changing
code. Feature Toggles are configured using google guice. Basic definition of
toggles are crated using FeatureToggleBinder. FeatureToggleBinder creates
FeatureToggle and additional configuration can be done using feature
configuration.
In current stage Feature Toggles supports:
- if / else based feature toggles
- Dependency Injection based
  Hot reloading implementation without restart require code refactoring to
  add an interface when injecting the new implementation/class
- using various toggle strategies along with simple on / off toggles

configuration:

to allow feature toggle configuration four lines are needed in
config.properties file
```
features.config-source-type=file
features.config-source=/etc/feature-config.properties
features.config-type=properties
features.refresh-period=30s
```

`configuration-source-type` is source type for Feature Toggles configuration
`features.config-source` is a source (file) of the configuration
`features.config-type` format in which configuration is stored (json or properties)
`features.refresh-period` configuration refresh period

Defining Feature Toggles

Feature toggle definition is done in google guice module using `FeatureToggleBinder`

simple feature toggle definition
```
    featureToggleBinder(binder)
                        .featureId("featureXX")
                        .bind()
```
This example creates bindings for @Inject
```
    @Inject
    public Runner(@FeatureToggle("featureXX") Supplier<Boolean> isFeatureXXEnabled)
    {
        this.isFeatureXXEnabled = isFeatureXXEnabled;
    }
```
`isFeatureXXEnabled` can be used to test if feature is enabled or disabled:
```
    boolean testFeatureXXEnabled()
    {
        return isFeatureXXEnabled.get();
    }
```

hot reloadable feature toggle definition
```
featureToggleBinder(binder, Feature01.class)
                        .featureId("feature01")
                        .baseClass(Feature01.class)
                        .defaultClass(Feature01Impl01.class)
                        .allOf(Feature01Impl01.class, Feature01Impl02.class)
                        .bind()
```
adding Feature Toggle switching strategy
```
featureToggleBinder(binder)
                        .featureId("feature04")
                        .toggleStrategy("AllowAll")
                        .toggleStrategyConfig(ImmutableMap.of("key", "value", "key2", "value2"))
```

feature-config.properties file example
```
# feature query-logger
feature.query-logger.enabled=true
feature.query-logger.strategy=OsToggle
feature.query-logger.strategy.os_name=.*Linux.*

#feature.query-rate-limiter
feature.query-rate-limiter.currentInstance=com.facebook.presto.server.protocol.QueryBlockingRateLimiter

# feature.query-cancel
feature.query-cancel.strategy=AllowList
feature.query-cancel.strategy.allow-list-source=.*IDEA.*
feature.query-cancel.strategy.allow-list-user=.*prestodb
```
in this example for first feature `query-logger`
changing value of feature.query-logger.enabled to `false` will 'disable' this feature.
Changes will be effective within refresh period.
Pass column delimiter info to reader (prestodb#6338)

Summary: Pull Request resolved: facebookincubator/velox#6338

Reviewed By: Yuhta

Differential Revision: D48457913

fbshipit-source-id: 57d76dfa229de3801bf3181f780a485b628427ad
  • Loading branch information
branimir-vujicic committed Sep 18, 2023
1 parent 759a30e commit d5af879
Show file tree
Hide file tree
Showing 22 changed files with 421 additions and 14 deletions.
6 changes: 6 additions & 0 deletions presto-main/etc/catalog/hive.properties
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@

connector.name=hive-hadoop2
hive.metastore.uri=thrift://localhost:9083

hive.allow-drop-table=true
hive.allow-rename-table=true
hive.allow-add-column=true
hive.allow-drop-column=true
hive.allow-rename-column=true
6 changes: 6 additions & 0 deletions presto-main/etc/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#

# sample nodeId to provide consistency across test runs

node.id=ffffffff-ffff-ffff-ffff-ffffffffffff
node.environment=test
http-server.http.port=8080
Expand Down Expand Up @@ -54,3 +55,8 @@ plugin.bundles=\

presto.version=testversion
node-scheduler.include-coordinator=true

# feature toggles
features.config-source-type=file
features.refresh-period=30s
features.configuration-directory=etc/feature-toggle/
3 changes: 3 additions & 0 deletions presto-main/etc/feature-toggle/file.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
features.config-source-type=file
features.config-source=/home/bane/java/etc/feature-config.properties
features.config-type=properties
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import com.facebook.presto.execution.scheduler.SectionExecutionFactory;
import com.facebook.presto.execution.scheduler.SplitSchedulerStats;
import com.facebook.presto.failureDetector.FailureDetectorModule;
import com.facebook.presto.features.strategy.AllowListToggleStrategy;
import com.facebook.presto.memory.ClusterMemoryManager;
import com.facebook.presto.memory.ForMemoryManager;
import com.facebook.presto.memory.LowMemoryKiller;
Expand All @@ -65,16 +66,20 @@
import com.facebook.presto.memory.NoneLowMemoryKiller;
import com.facebook.presto.memory.TotalReservationLowMemoryKiller;
import com.facebook.presto.memory.TotalReservationOnBlockedNodesLowMemoryKiller;
import com.facebook.presto.memory.context.ft.MemoryContextModule;
import com.facebook.presto.metadata.CatalogManager;
import com.facebook.presto.operator.ForScheduler;
import com.facebook.presto.operator.OperatorInfo;
import com.facebook.presto.resourcemanager.ForResourceManager;
import com.facebook.presto.resourcemanager.ResourceManagerProxy;
import com.facebook.presto.server.protocol.AnotherQueryBlockingRateLimiter;
import com.facebook.presto.server.protocol.ExecutingStatementResource;
import com.facebook.presto.server.protocol.LocalQueryProvider;
import com.facebook.presto.server.protocol.QueryBlockingRateLimiter;
import com.facebook.presto.server.protocol.QueryRateLimiter;
import com.facebook.presto.server.protocol.QueuedStatementResource;
import com.facebook.presto.server.protocol.RetryCircuitBreaker;
import com.facebook.presto.server.protocol.RetryCircuitBreakerInt;
import com.facebook.presto.server.remotetask.HttpRemoteTaskFactory;
import com.facebook.presto.server.remotetask.RemoteTaskStats;
import com.facebook.presto.spi.memory.ClusterMemoryPoolManager;
Expand All @@ -88,6 +93,7 @@
import com.facebook.presto.transaction.TransactionManagerConfig;
import com.facebook.presto.util.PrestoDataDefBindingHelper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Provides;
import com.google.inject.Scopes;
Expand Down Expand Up @@ -116,6 +122,7 @@
import static com.facebook.presto.execution.DDLDefinitionExecution.DDLDefinitionExecutionFactory;
import static com.facebook.presto.execution.SessionDefinitionExecution.SessionDefinitionExecutionFactory;
import static com.facebook.presto.execution.SqlQueryExecution.SqlQueryExecutionFactory;
import static com.facebook.presto.features.binder.FeatureToggleBinder.featureToggleBinder;
import static com.google.inject.multibindings.MapBinder.newMapBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static java.util.concurrent.Executors.newCachedThreadPool;
Expand All @@ -140,11 +147,37 @@ protected void setup(Binder binder)
// presto coordinator announcement
discoveryBinder(binder).bindHttpAnnouncement("presto-coordinator");

featureToggleBinder(binder, QueryRateLimiter.class)
.featureId("query-rate-limiter")
.baseClass(QueryRateLimiter.class)
.defaultClass(QueryBlockingRateLimiter.class)
.allOf(QueryBlockingRateLimiter.class, AnotherQueryBlockingRateLimiter.class)
.bind();

featureToggleBinder(binder)
.featureId("query-logger")
.enabled(true)
.bind();

featureToggleBinder(binder, RetryCircuitBreakerInt.class)
.featureId("circuit-breaker")
.defaultClass(RetryCircuitBreaker.class)
.enabled(true)
.bind();

featureToggleBinder(binder)
.featureId("query-cancel")
.enabled(true)
.toggleStrategy("AllowList")
.registerToggleStrategy("AllowList", AllowListToggleStrategy.class)
.toggleStrategyConfig(ImmutableMap.of("allow-list-source", ".*IDEA.*", "allow-list-user", ".*prestodb"))
.bind();

featureToggleBinder(binder)
.registerToggleStrategy("AllowList", AllowListToggleStrategy.class);

binder.install(new MemoryContextModule());

// statement resource
jsonCodecBinder(binder).bindJsonCodec(QueryInfo.class);
jsonCodecBinder(binder).bindJsonCodec(TaskInfo.class);
Expand Down Expand Up @@ -298,6 +331,10 @@ protected void setup(Binder binder)

// cleanup
binder.bind(ExecutorCleanup.class).in(Scopes.SINGLETON);
//
// binder.bind(PrestoFeatureToggle.class).in(Singleton.class);
//
// jaxrsBinder(binder).bind(FeatureToggleInfo.class);
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon

// Feature Toggle
configBinder(binder).bindConfig(FeatureToggleConfig.class);
install(new FeatureToggleModule());
binder.install(new FeatureToggleModule());

// Thrift RPC
binder.install(new DriftNettyServerModule());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.server.protocol;

import com.facebook.airlift.stats.CounterStat;
import com.facebook.airlift.stats.TimeStat;
import com.facebook.presto.execution.QueryManagerConfig;
import com.facebook.presto.spi.QueryId;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.RateLimiter;
import com.google.inject.Inject;
import io.airlift.units.Duration;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

import javax.annotation.PreDestroy;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static java.util.Objects.requireNonNull;

/*
* Rate Limiting per query with token bucket
* Rate = rateLimitBucketMaxSize/second
* When having sufficient tokens, Request will be responded immediately.
* When not having enough tokens available, it uses the delayed processing method.
*/
public class AnotherQueryBlockingRateLimiter
implements QueryRateLimiter
{
private final long rateLimiterBucketMaxSize;
private final ListeningExecutorService rateLimiterExecutorService;
private final LoadingCache<QueryId, RateLimiter> rateLimiterCache;
private final CounterStat rateLimiterTriggeredCounter = new CounterStat();
private final TimeStat rateLimiterBlockTime = new TimeStat();

@Inject
public AnotherQueryBlockingRateLimiter(QueryManagerConfig queryManagerConfig)
{
requireNonNull(queryManagerConfig, "queryManagerConfig is null");
this.rateLimiterBucketMaxSize = queryManagerConfig.getRateLimiterBucketMaxSize();
// Using a custom thread pool with size 1-10 to reduce initial thread resources
ExecutorService executorService = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), daemonThreadsNamed("rate-limiter-listener"));
rateLimiterExecutorService = listeningDecorator(executorService);
rateLimiterCache = CacheBuilder.newBuilder().maximumSize(queryManagerConfig.getRateLimiterCacheLimit()).expireAfterAccess(queryManagerConfig.getRateLimiterCacheWindowMinutes(), TimeUnit.MINUTES).build(CacheLoader.from(key -> RateLimiter.create(rateLimiterBucketMaxSize)));
}

/*
* For accidental bug-caused DoS, we will use delayed processing method to reduce the requests, even when user do not have back-off logic implemented
* Optimized to avoid blocking for normal usages with TryRequire first
* Fall back to delayed processing method to acquire a permit, in a separate thread pool
* Internal guava rate limiter returns time spent sleeping to enforce rate, in seconds; 0.0 if not rate-limited, we use a future to wrap around that.
*/
@Override
public ListenableFuture<Double> acquire(QueryId queryId)
{ // if rateLimitBucketMaxSize < 0, we disable rate limiting by returning immediately
if (rateLimiterBucketMaxSize < 0) {
return immediateFuture(0.0);
}
if (queryId == null) {
return immediateFailedFuture(new IllegalArgumentException("queryId should not be null"));
}
RateLimiter rateLimiter = rateLimiterCache.getUnchecked(queryId);
if (rateLimiter.tryAcquire()) {
return immediateFuture(0.0);
}
ListenableFuture<Double> asyncTask = rateLimiterExecutorService.submit(() -> rateLimiter.acquire());
rateLimiterTriggeredCounter.update(1);
return asyncTask;
}

@Managed
@Nested
public CounterStat getRateLimiterTriggeredCounter()
{
return rateLimiterTriggeredCounter;
}

@Override
public TimeStat getRateLimiterBlockTime()
{
return rateLimiterBlockTime;
}

@Override
public void addRateLimiterBlockTime(Duration duration)
{
rateLimiterBlockTime.add(duration);
}

@PreDestroy
public void destroy()
{
rateLimiterExecutorService.shutdownNow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@
package com.facebook.presto.server.protocol;

import com.facebook.airlift.concurrent.BoundedExecutor;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.TimeStat;
import com.facebook.presto.client.QueryResults;
import com.facebook.presto.features.annotations.FeatureToggle;
import com.facebook.presto.memory.context.ft.MemoryFeatureToggleInterface;
import com.facebook.presto.server.ForStatementResource;
import com.facebook.presto.server.ServerConfig;
import com.facebook.presto.spi.QueryId;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Provider;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.weakref.jmx.Managed;
Expand All @@ -42,6 +46,9 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;

import java.util.function.Function;
import java.util.function.Supplier;

import static com.facebook.airlift.http.server.AsyncResponseHandler.bindAsyncResponse;
import static com.facebook.presto.client.PrestoHeaders.PRESTO_PREFIX_URL;
import static com.facebook.presto.server.protocol.QueryResourceUtil.abortIfPrefixUrlInvalid;
Expand All @@ -60,6 +67,8 @@
@RolesAllowed(USER)
public class ExecutingStatementResource
{
private static final Logger log = Logger.get(ExecutingStatementResource.class);

private static final Duration MAX_WAIT_TIME = new Duration(1, SECONDS);
private static final Ordering<Comparable<Duration>> WAIT_ORDERING = Ordering.natural().nullsLast();
private static final DataSize DEFAULT_TARGET_RESULT_SIZE = new DataSize(1, MEGABYTE);
Expand All @@ -68,26 +77,39 @@ public class ExecutingStatementResource
private final BoundedExecutor responseExecutor;
private final LocalQueryProvider queryProvider;
private final boolean compressionEnabled;
private final QueryBlockingRateLimiter queryRateLimiter;
private final Provider<QueryRateLimiter> queryRateLimiter;
private final Supplier<Boolean> isQueryLoggerEnabled;
private final Function<Object, Boolean> isQueryCancelEnabled;
private final Supplier<Boolean> isMemoryFeatureEnabled;
private final Provider<MemoryFeatureToggleInterface> memoryFeatureToggleInterface;

@Inject
public ExecutingStatementResource(
@ForStatementResource BoundedExecutor responseExecutor,
LocalQueryProvider queryProvider,
ServerConfig serverConfig,
QueryBlockingRateLimiter queryRateLimiter)
@FeatureToggle("query-rate-limiter") Provider<QueryRateLimiter> queryRateLimiter,
QueryRateLimiter xqueryRateLimiter,
@FeatureToggle("query-logger") Supplier<Boolean> isQueryLoggerEnabled,
@FeatureToggle("query-cancel") Function<Object, Boolean> isQueryCancelEnabled,
@FeatureToggle("memory-feature") Supplier<Boolean> isMemoryFeatureEnabled,
@FeatureToggle("memory-feature") Provider<MemoryFeatureToggleInterface> memoryFeatureToggleInterface)
{
this.responseExecutor = requireNonNull(responseExecutor, "responseExecutor is null");
this.queryProvider = requireNonNull(queryProvider, "queryProvider is null");
this.compressionEnabled = requireNonNull(serverConfig, "serverConfig is null").isQueryResultsCompressionEnabled();
this.queryRateLimiter = requireNonNull(queryRateLimiter, "queryRateLimiter is null");
this.isQueryLoggerEnabled = isQueryLoggerEnabled;
this.isQueryCancelEnabled = isQueryCancelEnabled;
this.isMemoryFeatureEnabled = isMemoryFeatureEnabled;
this.memoryFeatureToggleInterface = memoryFeatureToggleInterface;
}

@Managed
@Nested
public TimeStat getRateLimiterBlockTime()
{
return queryRateLimiter.getRateLimiterBlockTime();
return queryRateLimiter.get().getRateLimiterBlockTime();
}

@GET
Expand Down Expand Up @@ -118,6 +140,26 @@ public void getQueryResults(
abortIfPrefixUrlInvalid(xPrestoPrefixUrl);

Query query = queryProvider.getQuery(queryId, slug);
QueryRateLimiter queryRateLimiter = this.queryRateLimiter.get();

if (isQueryLoggerEnabled.get()) {
log.info("query-logger is ENABLED");
log.info("DELETE localhost:8080/v1/statement/executing/%s/123456789?slug=%s", queryId.getId(), slug);
}
else {
log.info("query-logger is DISABLED");
}

if (isMemoryFeatureEnabled.get()) {
log.info("\"memory-feature\" is ENABLED");
log.info("MemoryFeatureToggleInterface class " + memoryFeatureToggleInterface.get().getClass().getName());
}
else {
log.info("\"memory-feature\" is DISABLED");
}

log.info("QueryRateLimiter class " + queryRateLimiter.getClass().getName());

ListenableFuture<Double> acquirePermitAsync = queryRateLimiter.acquire(queryId);
String effectiveFinalProto = proto;
DataSize effectiveFinalTargetResultSize = targetResultSize;
Expand All @@ -143,7 +185,13 @@ public Response cancelQuery(
@PathParam("token") long token,
@QueryParam("slug") String slug)
{
queryProvider.cancel(queryId, slug);
return Response.noContent().build();
boolean cancelEnabled = isQueryCancelEnabled.apply(queryId);
if (cancelEnabled) {
queryProvider.cancel(queryId, slug);
return Response.noContent().build();
}
else {
throw new RuntimeException("Cancel is not allowed!");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class LocalQueryProvider
private final BlockEncodingSerde blockEncodingSerde;
private final BoundedExecutor responseExecutor;
private final ScheduledExecutorService timeoutExecutor;
private final RetryCircuitBreaker retryCircuitBreaker;
private final RetryCircuitBreakerInt retryCircuitBreaker;

private final ConcurrentMap<QueryId, Query> queries = new ConcurrentHashMap<>();
private final ScheduledExecutorService queryPurger = newSingleThreadScheduledExecutor(threadsNamed("execution-query-purger"));
Expand All @@ -68,7 +68,7 @@ public LocalQueryProvider(
BlockEncodingSerde blockEncodingSerde,
@ForStatementResource BoundedExecutor responseExecutor,
@ForStatementResource ScheduledExecutorService timeoutExecutor,
RetryCircuitBreaker retryCircuitBreaker)
RetryCircuitBreakerInt retryCircuitBreaker)
{
this.queryManager = requireNonNull(queryManager, "queryManager is null");
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
Expand Down

0 comments on commit d5af879

Please sign in to comment.