Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRILL-5089: Dynamically load schema of storage plugin only when neede… #1032

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -230,7 +230,12 @@ public DrillbitContext getDrillbitContext() {
return context;
}

public SchemaPlus getRootSchema() {
/**
* This method is only used to construct InfoSchemaReader. The reader needs the full schema, so here we
* return a fully initialized schema tree.
* @return root schema's plus
*/
public SchemaPlus getFullRootSchema() {
if (queryContext == null) {
fail(new UnsupportedOperationException("Schema tree can only be created in root fragment. " +
"This is a non-root fragment."));
Expand All @@ -248,7 +253,7 @@ public SchemaPlus getRootSchema() {
.setIgnoreAuthErrors(isImpersonationEnabled)
.build();

return queryContext.getRootSchema(schemaConfig);
return queryContext.getFullRootSchema(schemaConfig);
}

/**
Expand Down
Expand Up @@ -163,14 +163,23 @@ public SchemaPlus getRootSchema(final String userName) {
}

/**
 * Builds the schema tree for the supplied <i>schemaConfig</i>. Some schemas (those coming from storage
 * plugins) may not be present yet and could be initialized later.
 *
 * @param schemaConfig configuration handed to the schema tree provider
 * @return root of the newly created schema tree
 */
public SchemaPlus getRootSchema(SchemaConfig schemaConfig) {
  final SchemaPlus lazyRoot = schemaTreeProvider.createRootSchema(schemaConfig);
  return lazyRoot;
}
/**
 * Builds a fully initialized schema tree for the supplied <i>schemaConfig</i>.
 *
 * @param schemaConfig configuration handed to the schema tree provider
 * @return root of the fully initialized schema tree
 */
public SchemaPlus getFullRootSchema(SchemaConfig schemaConfig) {
  final SchemaPlus fullRoot = schemaTreeProvider.createFullRootSchema(schemaConfig);
  return fullRoot;
}
/**
* Get the user name of the user who issued the query that is managed by this QueryContext.
* @return
Expand Down
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.sql;

import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.calcite.DataContext;
import org.apache.calcite.jdbc.CalciteRootSchema;
import org.apache.calcite.jdbc.CalciteSchema;

import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.util.BuiltInMethod;
import org.apache.calcite.util.Compatible;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePlugin;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.SubSchemaWrapper;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;

/**
* This class allows schemas from storage plugins to be loaded lazily, when
* {@link #getSubSchema(String, boolean)} is called.
*/
public class DynamicRootSchema extends DynamicSchema
implements CalciteRootSchema {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DynamicRootSchema.class);

protected SchemaConfig schemaConfig;
protected StoragePluginRegistry storages;

/**
 * Exposes the storage plugin registry this root schema was constructed with.
 *
 * @return the {@link StoragePluginRegistry} held by this schema
 */
public StoragePluginRegistry getSchemaFactories() {
  return this.storages;
}

/**
 * Creates a root schema: no parent, a fresh {@link RootSchema} as the backing schema and an empty name.
 *
 * @param storages registry that {@link #loadSchemaFactory(String, boolean)} queries for storage plugins
 * @param schemaConfig configuration passed to a plugin when it registers its schemas
 */
DynamicRootSchema(StoragePluginRegistry storages, SchemaConfig schemaConfig) {
  super(null, new RootSchema(), "");
  this.storages = storages;
  this.schemaConfig = schemaConfig;
}

/**
 * Looks up a sub-schema by name in the sub-schema map; on a miss, asks
 * {@link #loadSchemaFactory(String, boolean)} to register it and looks again.
 *
 * @param schemaName name of the requested sub-schema
 * @param caseSensitive forwarded unchanged to {@link #loadSchemaFactory(String, boolean)}
 * @return the sub-schema, or {@code null} when it is still absent after the load attempt
 */
@Override
public CalciteSchema getSubSchema(String schemaName, boolean caseSensitive) {
  CalciteSchema found = getSubSchemaMap().get(schemaName);
  if (found == null) {
    // Not registered yet: try to load it from the storage plugins, then consult the map once more.
    loadSchemaFactory(schemaName, caseSensitive);
    found = getSubSchemaMap().get(schemaName);
  }
  return found;
}

/**
 * Reports the table names of this root schema; the result is always built from an empty sorted set.
 *
 * @return an empty navigable set of table names
 */
@Override
public NavigableSet<String> getTableNames() {
  final ImmutableSortedSet<String> noTables = ImmutableSortedSet.of();
  return Compatible.INSTANCE.navigableSet(noTables);
}

/**
 * Loads the schema factory (storage plugin) responsible for {@code schemaName} and lets it register
 * its schemas under this root schema.
 *
 * <p>The name is first looked up as a plugin name. When no plugin matches and the name consists of exactly
 * two dot-separated parts (e.g. {@code dfs.tmp}), the first part is looked up as the plugin name instead;
 * that plugin's schemas are registered, and every second-level schema found under it is additionally added
 * to the root, wrapped in a {@link SubSchemaWrapper}.
 *
 * <p>{@link ExecutionSetupException} and {@link IOException} are logged as warnings and not propagated.
 *
 * @param schemaName name of the requested schema
 * @param caseSensitive not consulted by this implementation
 * @throws RuntimeException if a second-level schema cannot be unwrapped to Drill's {@code AbstractSchema}
 */
public void loadSchemaFactory(String schemaName, boolean caseSensitive) {
  try {
    SchemaPlus thisPlus = this.plus();
    StoragePlugin plugin = getSchemaFactories().getPlugin(schemaName);
    if (plugin != null) {
      plugin.registerSchemas(schemaConfig, thisPlus);
      return;
    }

    // Could not find a plugin named schemaName. The schemaName could be `dfs.tmp`, a 2nd level schema under 'dfs'
    String[] paths = schemaName.split("\\.");
    if (paths.length == 2) {
      plugin = getSchemaFactories().getPlugin(paths[0]);
      if (plugin == null) {
        return;
      }

      // Found the storage plugin for the first part (e.g. 'dfs') of schemaName (e.g. 'dfs.tmp');
      // register the schemas of this storage plugin to 'this'.
      plugin.registerSchemas(schemaConfig, thisPlus);

      // Load second level schemas for this storage plugin
      final SchemaPlus firstlevelSchema = thisPlus.getSubSchema(paths[0]);
      if (firstlevelSchema == null) {
        // The plugin registered nothing under its own name, so there are no second-level schemas to expose.
        // Treat it like the "plugin not found" case instead of failing with a NullPointerException.
        return;
      }

      final List<SchemaPlus> secondLevelSchemas = Lists.newArrayList();
      for (String secondLevelSchemaName : firstlevelSchema.getSubSchemaNames()) {
        secondLevelSchemas.add(firstlevelSchema.getSubSchema(secondLevelSchemaName));
      }

      for (SchemaPlus schema : secondLevelSchemas) {
        org.apache.drill.exec.store.AbstractSchema drillSchema;
        try {
          drillSchema = schema.unwrap(org.apache.drill.exec.store.AbstractSchema.class);
        } catch (ClassCastException e) {
          // Keep the original exception as the cause so the offending schema type is not lost.
          throw new RuntimeException(
              String.format("Schema '%s' is not expected under root schema", schema.getName()), e);
        }
        SubSchemaWrapper wrapper = new SubSchemaWrapper(drillSchema);
        thisPlus.add(wrapper.getName(), wrapper);
      }
    }
  } catch (ExecutionSetupException | IOException ex) {
    // Best effort: a schema that fails to load is reported but must not abort the lookup.
    logger.warn("Failed to load schema for \"{}\"!", schemaName, ex);
  }
}

static class RootSchema extends AbstractSchema {
@Override public Expression getExpression(SchemaPlus parentSchema,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please explain why we override getExpression method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is copied from the RootSchema used in SimpleCalciteSchema which class is not public. getExpression is used in Calcite code not in our code.

String name) {
return Expressions.call(
DataContext.ROOT,
BuiltInMethod.DATA_CONTEXT_GET_ROOT_SCHEMA.method);
}
}
}

@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.sql;

import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.SimpleCalciteSchema;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePluginRegistry;


/**
 * A {@link SimpleCalciteSchema} whose sub-schema map may be empty or only partially populated.
 * Sub-schemas are resolved on request: the wrapped {@link Schema} is asked first, and the sub-schema map
 * serves as the fallback.
 */
public class DynamicSchema extends SimpleCalciteSchema {

  /**
   * Creates a schema node.
   *
   * @param parent parent schema node, passed through to {@link SimpleCalciteSchema}
   * @param schema schema wrapped by this node
   * @param name name of this node
   */
  public DynamicSchema(CalciteSchema parent, Schema schema, String name) {
    super(parent, schema, name);
  }

  /**
   * Resolves a sub-schema by name.
   *
   * @param schemaName name of the requested sub-schema
   * @param caseSensitive not consulted by this implementation
   * @return a new {@link DynamicSchema} around the wrapped schema's sub-schema when it has one, otherwise
   *         whatever the sub-schema map holds for that name (possibly {@code null})
   */
  @Override
  public CalciteSchema getSubSchema(String schemaName, boolean caseSensitive) {
    final Schema implicit = schema.getSubSchema(schemaName);
    return implicit != null
        ? new DynamicSchema(this, implicit, schemaName)
        : getSubSchemaMap().get(schemaName);
  }

  /** Delegates to the superclass implementation. */
  @Override
  public SchemaPlus plus() {
    return super.plus();
  }

  /**
   * Creates a {@link DynamicRootSchema} and returns its {@link SchemaPlus} view.
   *
   * @param storages storage plugin registry handed to the root schema
   * @param schemaConfig schema configuration handed to the root schema
   * @return the {@link SchemaPlus} of the new root schema
   */
  public static SchemaPlus createRootSchema(StoragePluginRegistry storages, SchemaConfig schemaConfig) {
    return new DynamicRootSchema(storages, schemaConfig).plus();
  }

}
Expand Up @@ -24,7 +24,6 @@
import com.google.common.base.Strings;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.CalciteSchemaImpl;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelOptCluster;
Expand Down Expand Up @@ -117,9 +116,9 @@ public SqlConverter(QueryContext context) {
this.session = context.getSession();
this.drillConfig = context.getConfig();
this.catalog = new DrillCalciteCatalogReader(
this.rootSchema,
rootSchema,
parserConfig.caseSensitive(),
CalciteSchemaImpl.from(defaultSchema).path(null),
DynamicSchema.from(defaultSchema).path(null),
typeFactory,
drillConfig,
session);
Expand Down Expand Up @@ -297,7 +296,7 @@ public RelNode expandView(RelDataType rowType, String queryString, List<String>
@Override
public RelNode expandView(RelDataType rowType, String queryString, SchemaPlus rootSchema, List<String> schemaPath) {
final DrillCalciteCatalogReader catalogReader = new DrillCalciteCatalogReader(
rootSchema, // new root schema
rootSchema,
parserConfig.caseSensitive(),
schemaPath,
typeFactory,
Expand Down Expand Up @@ -443,7 +442,7 @@ private class DrillCalciteCatalogReader extends CalciteCatalogReader {
JavaTypeFactory typeFactory,
DrillConfig drillConfig,
UserSession session) {
super(CalciteSchemaImpl.from(rootSchema), caseSensitive, defaultSchema, typeFactory);
super(DynamicSchema.from(rootSchema), caseSensitive, defaultSchema, typeFactory);
this.drillConfig = drillConfig;
this.session = session;
this.allowTemporaryTables = true;
Expand Down
Expand Up @@ -20,12 +20,12 @@
import java.io.IOException;
import java.util.List;

import org.apache.calcite.jdbc.SimpleCalciteSchema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.ViewExpansionContext;
import org.apache.drill.exec.planner.sql.DynamicSchema;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.server.options.OptionValue;
Expand Down Expand Up @@ -105,12 +105,36 @@ public SchemaPlus createRootSchema(final String userName, final SchemaConfigInfo
* @return
*/
public SchemaPlus createRootSchema(SchemaConfig schemaConfig) {
  // Build the dynamic root schema from dContext's storage registry and the caller's schema config.
  final SchemaPlus dynamicRoot = DynamicSchema.createRootSchema(dContext.getStorage(), schemaConfig);
  // Record the tree in schemaTreesToClose before handing it out.
  schemaTreesToClose.add(dynamicRoot);
  return dynamicRoot;
}

/**
* Return full root schema with schema owner as the given user.
*
* @param userName Name of the user who is accessing the storage sources.
* @param provider {@link SchemaConfigInfoProvider} instance
* @return Root of the schema tree.
*/
public SchemaPlus createFullRootSchema(final String userName, final SchemaConfigInfoProvider provider) {
final String schemaUser = isImpersonationEnabled ? userName : ImpersonationUtil.getProcessUserName();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be factored out somewhere? Seems this magic stanza will be needed in many places: better to have one copy than several. Maybe a method in ImpersonationUtil?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not that many places, and we would need to pass in isImpersonationEnabled and userName if this line became a standalone method. Will keep it as is for now.

final SchemaConfig schemaConfig = SchemaConfig.newBuilder(schemaUser, provider).build();
return createFullRootSchema(schemaConfig);
}
/**
* Create and return a Full SchemaTree with given <i>schemaConfig</i>.
* @param schemaConfig
* @return
*/
public SchemaPlus createFullRootSchema(SchemaConfig schemaConfig) {
try {
final SchemaPlus rootSchema = SimpleCalciteSchema.createRootSchema(false);
final SchemaPlus rootSchema = DynamicSchema.createRootSchema(dContext.getStorage(), schemaConfig);
dContext.getSchemaFactory().registerSchemas(schemaConfig, rootSchema);
schemaTreesToClose.add(rootSchema);
return rootSchema;
} catch(IOException e) {
}
catch(IOException e) {
// We can't proceed further without a schema, throw a runtime exception.
// Improve the error message for client side.

Expand All @@ -124,6 +148,7 @@ public SchemaPlus createRootSchema(SchemaConfig schemaConfig) {
.addContext(contextString)
.build(logger);
}

}

@Override
Expand Down
Expand Up @@ -46,6 +46,7 @@
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.exception.StoreException;
import org.apache.drill.exec.planner.logical.StoragePlugins;
import org.apache.drill.exec.planner.sql.DynamicSchema;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.exec.store.dfs.FormatPlugin;
Expand Down Expand Up @@ -494,4 +495,6 @@ public static Map<Object, Constructor<? extends StoragePlugin>> findAvailablePlu
return availablePlugins;
}



}