Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRILL-5481: Allow to persist profiles in-memory only with a max capacity #834

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -107,6 +107,8 @@ public interface ExecConstants {
String SYS_STORE_PROVIDER_CLASS = "drill.exec.sys.store.provider.class";
String SYS_STORE_PROVIDER_LOCAL_PATH = "drill.exec.sys.store.provider.local.path";
String SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE = "drill.exec.sys.store.provider.local.write";
String PROFILES_STORE_INMEMORY = "drill.exec.profiles.store.inmemory";
String PROFILES_STORE_CAPACITY = "drill.exec.profiles.store.capacity";
String IMPERSONATION_ENABLED = "drill.exec.impersonation.enabled";
String IMPERSONATION_MAX_CHAINED_USER_HOPS = "drill.exec.impersonation.max_chained_user_hops";
String AUTHENTICATION_MECHANISMS = "drill.exec.security.auth.mechanisms";
Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.QueryProfileStoreContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.server.options.OptionValue;
import org.apache.drill.exec.server.options.QueryOptionManager;
Expand Down Expand Up @@ -209,6 +210,10 @@ public DrillConfig getConfig() {
return drillbitContext.getConfig();
}

public QueryProfileStoreContext getProfileStoreContext() {
return drillbitContext.getProfileStoreContext();
}

@Override
public FunctionImplementationRegistry getFunctionRegistry() {
return drillbitContext.getFunctionImplementationRegistry();
Expand Down
Expand Up @@ -38,6 +38,7 @@
import org.apache.drill.exec.service.ServiceEngine;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.sys.store.provider.CachingPersistentStoreProvider;
import org.apache.drill.exec.store.sys.store.provider.InMemoryStoreProvider;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider;
Expand Down Expand Up @@ -76,6 +77,7 @@ public class Drillbit implements AutoCloseable {
private final WebServer webServer;
private RegistrationHandle registrationHandle;
private volatile StoragePluginRegistry storageRegistry;
private final PersistentStoreProvider profileStoreProvider;

@VisibleForTesting
public Drillbit(
Expand Down Expand Up @@ -105,6 +107,14 @@ public Drillbit(
isDistributedMode = true;
}

//Check if InMemory Profile Store, else use Default Store Provider
if (config.getBoolean(ExecConstants.PROFILES_STORE_INMEMORY)) {
profileStoreProvider = new InMemoryStoreProvider(config.getInt(ExecConstants.PROFILES_STORE_CAPACITY));
logger.info("Upto {} latest query profiles will be retained in-memory", config.getInt(ExecConstants.PROFILES_STORE_CAPACITY));
} else {
profileStoreProvider = storeProvider;
}

engine = new ServiceEngine(manager, context, allowPortHunting, isDistributedMode);

logger.info("Construction completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS));
Expand All @@ -115,8 +125,11 @@ public void run() throws Exception {
logger.debug("Startup begun.");
coord.start(10000);
storeProvider.start();
if (profileStoreProvider != storeProvider) {
profileStoreProvider.start();
}
final DrillbitEndpoint md = engine.start();
manager.start(md, engine.getController(), engine.getDataConnectionCreator(), coord, storeProvider);
manager.start(md, engine.getController(), engine.getDataConnectionCreator(), coord, storeProvider, profileStoreProvider);
final DrillbitContext drillbitContext = manager.getContext();
storageRegistry = drillbitContext.getStorage();
storageRegistry.init();
Expand Down Expand Up @@ -164,6 +177,11 @@ public synchronized void close() {
manager,
storageRegistry,
context);

//Closing the profile store provider if distinct
if (storeProvider != profileStoreProvider) {
AutoCloseables.close(profileStoreProvider);
}
} catch(Exception e) {
logger.warn("Failure on close()", e);
}
Expand Down
Expand Up @@ -65,7 +65,7 @@ public class DrillbitContext implements AutoCloseable {
private final LogicalPlanPersistence lpPersistence;
// operator table for standard SQL operators and functions, Drill built-in UDFs
private final DrillOperatorTable table;

private final QueryProfileStoreContext profileStoreContext;

public DrillbitContext(
DrillbitEndpoint endpoint,
Expand All @@ -75,6 +75,19 @@ public DrillbitContext(
DataConnectionCreator connectionsPool,
WorkEventBus workBus,
PersistentStoreProvider provider) {
//PersistentStoreProvider is re-used for providing Query Profile Store as well
this(endpoint, context, coord, controller, connectionsPool, workBus, provider, provider);
}

public DrillbitContext(
DrillbitEndpoint endpoint,
BootStrapContext context,
ClusterCoordinator coord,
Controller controller,
DataConnectionCreator connectionsPool,
WorkEventBus workBus,
PersistentStoreProvider provider,
PersistentStoreProvider profileStoreProvider) {
this.classpathScan = context.getClasspathScan();
this.workBus = workBus;
this.controller = checkNotNull(controller);
Expand All @@ -97,6 +110,13 @@ public DrillbitContext(

// This operator table is built once and used for all queries which do not need dynamic UDF support.
this.table = new DrillOperatorTable(functionRegistry, systemOptions);

//This profile store context is built from the profileStoreProvider
this.profileStoreContext = new QueryProfileStoreContext(context.getConfig(), profileStoreProvider, coord);
}

public QueryProfileStoreContext getProfileStoreContext() {
return profileStoreContext;
}

public FunctionImplementationRegistry getFunctionImplementationRegistry() {
Expand Down
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.server;

import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.store.TransientStore;
import org.apache.drill.exec.coord.store.TransientStoreConfig;
import org.apache.drill.exec.proto.SchemaUserBitShared;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryInfo;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
import org.apache.drill.exec.store.sys.PersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreConfig;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import org.apache.drill.exec.store.sys.PersistentStoreConfig.StoreConfigBuilder;

public class QueryProfileStoreContext {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryProfileStoreContext.class);

private static final String PROFILES = "profiles";

private static final String RUNNING = "running";

private final PersistentStore<UserBitShared.QueryProfile> completedProfiles;

private final TransientStore<UserBitShared.QueryInfo> runningProfiles;

private final PersistentStoreConfig<QueryProfile> profileStoreConfig;

public QueryProfileStoreContext(DrillConfig config, PersistentStoreProvider storeProvider,
ClusterCoordinator coordinator) {
profileStoreConfig = PersistentStoreConfig.newProtoBuilder(SchemaUserBitShared.QueryProfile.WRITE,
SchemaUserBitShared.QueryProfile.MERGE)
.name(PROFILES)
.blob()
.build();

try {
completedProfiles = storeProvider.getOrCreateStore(profileStoreConfig);
} catch (final Exception e) {
throw new DrillRuntimeException(e);
}

runningProfiles = coordinator.getOrCreateTransientStore(TransientStoreConfig
.newProtoBuilder(SchemaUserBitShared.QueryInfo.WRITE, SchemaUserBitShared.QueryInfo.MERGE)
.name(RUNNING)
.build());
}

public PersistentStoreConfig<QueryProfile> getProfileStoreConfig() {
return profileStoreConfig;
}

public PersistentStore<QueryProfile> getCompletedProfileStore() {
return completedProfiles;
}

public TransientStore<QueryInfo> getRunningProfileStore() {
return runningProfiles;
}
}
Expand Up @@ -48,6 +48,7 @@
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.server.rest.DrillRestServer.UserAuthEnabled;
import org.apache.drill.exec.server.QueryProfileStoreContext;
import org.apache.drill.exec.server.rest.ViewableWithPermissions;
import org.apache.drill.exec.server.rest.auth.DrillUserPrincipal;
import org.apache.drill.exec.store.sys.PersistentStore;
Expand Down Expand Up @@ -180,8 +181,9 @@ public List<ProfileInfo> getFinishedQueries() {
@Produces(MediaType.APPLICATION_JSON)
public QProfiles getProfilesJSON(@Context UriInfo uriInfo) {
try {
final PersistentStore<QueryProfile> completed = getProvider().getOrCreateStore(QueryManager.QUERY_PROFILE);
final TransientStore<QueryInfo> running = getCoordinator().getOrCreateTransientStore(QueryManager.RUNNING_QUERY_INFO);
final QueryProfileStoreContext profileStoreContext = work.getContext().getProfileStoreContext();
final PersistentStore<QueryProfile> completed = profileStoreContext.getCompletedProfileStore();
final TransientStore<QueryInfo> running = profileStoreContext.getRunningProfileStore();

final List<String> errors = Lists.newArrayList();

Expand Down Expand Up @@ -258,7 +260,7 @@ private QueryProfile getQueryProfile(String queryId) {

// then check remote running
try {
final TransientStore<QueryInfo> running = getCoordinator().getOrCreateTransientStore(QueryManager.RUNNING_QUERY_INFO);
final TransientStore<QueryInfo> running = work.getContext().getProfileStoreContext().getRunningProfileStore();
final QueryInfo info = running.get(queryId);
if (info != null) {
QueryProfile queryProfile = work.getContext()
Expand All @@ -275,7 +277,7 @@ private QueryProfile getQueryProfile(String queryId) {

// then check blob store
try {
final PersistentStore<QueryProfile> profiles = getProvider().getOrCreateStore(QueryManager.QUERY_PROFILE);
final PersistentStore<QueryProfile> profiles = work.getContext().getProfileStoreContext().getCompletedProfileStore();
final QueryProfile queryProfile = profiles.get(queryId);
if (queryProfile != null) {
checkOrThrowProfileViewAuthorization(queryProfile);
Expand All @@ -296,7 +298,7 @@ private QueryProfile getQueryProfile(String queryId) {
@Produces(MediaType.APPLICATION_JSON)
public String getProfileJSON(@PathParam("queryid") String queryId) {
try {
return new String(QueryManager.QUERY_PROFILE.getSerializer().serialize(getQueryProfile(queryId)));
return new String(work.getContext().getProfileStoreContext().getProfileStoreConfig().getSerializer().serialize(getQueryProfile(queryId)));
} catch (Exception e) {
logger.debug("Failed to serialize profile for: " + queryId);
return ("{ 'message' : 'error (unable to serialize profile)' }");
Expand Down Expand Up @@ -329,7 +331,7 @@ public String cancelQuery(@PathParam("queryid") String queryId) {

// then check remote running
try {
final TransientStore<QueryInfo> running = getCoordinator().getOrCreateTransientStore(QueryManager.RUNNING_QUERY_INFO);
final TransientStore<QueryInfo> running = work.getContext().getProfileStoreContext().getRunningProfileStore();
final QueryInfo info = running.get(queryId);
checkOrThrowQueryCancelAuthorization(info.getUser(), queryId);
Ack a = work.getContext().getController().getTunnel(info.getForeman()).requestCancelQuery(id).checkedGet(2, TimeUnit.SECONDS);
Expand Down
Expand Up @@ -38,11 +38,17 @@ public class PersistentStoreConfig<V> {
private final String name;
private final InstanceSerializer<V> valueSerializer;
private final PersistentStoreMode mode;
private final int capacity;

protected PersistentStoreConfig(String name, InstanceSerializer<V> valueSerializer, PersistentStoreMode mode) {
protected PersistentStoreConfig(String name, InstanceSerializer<V> valueSerializer, PersistentStoreMode mode, int capacity) {
this.name = name;
this.valueSerializer = valueSerializer;
this.mode = mode;
this.capacity = capacity;
}

public int getCapacity() {
return capacity;
}

public PersistentStoreMode getMode() {
Expand Down Expand Up @@ -85,6 +91,7 @@ public static class StoreConfigBuilder<V> {
private String name;
private InstanceSerializer<V> serializer;
private PersistentStoreMode mode = PersistentStoreMode.PERSISTENT;
private int capacity;

protected StoreConfigBuilder(InstanceSerializer<V> serializer) {
super();
Expand All @@ -106,9 +113,14 @@ public StoreConfigBuilder<V> blob(){
return this;
}

public StoreConfigBuilder<V> setCapacity(int capacity) {
this.capacity = capacity;
return this;
}

public PersistentStoreConfig<V> build(){
Preconditions.checkNotNull(name);
return new PersistentStoreConfig<>(name, serializer, mode);
return new PersistentStoreConfig<>(name, serializer, mode, capacity);
}
}

Expand Down