Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
  • Loading branch information
chunhui-shi committed Nov 16, 2017
1 parent a381677 commit 6eeb933
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,11 @@ public DrillbitContext getDrillbitContext() {
return context;
}

/**
* This method is only used to construt InfoSchemaReader, it is for the reader to get full schema, so here we
* are going to return a fully initialized schema tree.
* @return root schema's plus
*/
public SchemaPlus getFullRootSchema() {
if (queryContext == null) {
fail(new UnsupportedOperationException("Schema tree can only be created in root fragment. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,19 @@ public SchemaPlus getRootSchema(final String userName) {
}

/**
* Create and return a SchemaTree with given <i>schemaConfig</i>.
* Create and return a SchemaTree with given <i>schemaConfig</i> but some schemas (from storage plugins)
* could be initialized later.
* @param schemaConfig
* @return
*/
public SchemaPlus getRootSchema(SchemaConfig schemaConfig) {
return schemaTreeProvider.createRootSchema(schemaConfig);
}
/**
* Create and return a fully initialized SchemaTree with given <i>schemaConfig</i>.
* @param schemaConfig
* @return
*/

public SchemaPlus getFullRootSchema(SchemaConfig schemaConfig) {
return schemaTreeProvider.createFullRootSchema(schemaConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,18 @@
import org.apache.drill.exec.store.SubSchemaWrapper;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;

/**
* This class is to allow us loading schemas from storage plugins later when {@link #getSubSchema(String, boolean)}
* is called.
*/
public class DynamicRootSchema extends DynamicSchema
implements CalciteRootSchema {

private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DynamicRootSchema.class);
/** Creates a root schema. */
DynamicRootSchema(StoragePluginRegistry storages, SchemaConfig schemaConfig) {
super(null, new RootSchema(), "");
Expand All @@ -54,11 +59,11 @@ public class DynamicRootSchema extends DynamicSchema
@Override
public CalciteSchema getSubSchema(String schemaName, boolean caseSensitive) {
CalciteSchema retSchema = getSubSchemaMap().get(schemaName);

if (retSchema == null) {
loadSchemaFactory(schemaName, caseSensitive);
if (retSchema != null) {
return retSchema;
}

loadSchemaFactory(schemaName, caseSensitive);
retSchema = getSubSchemaMap().get(schemaName);
return retSchema;
}
Expand All @@ -85,35 +90,45 @@ public void loadSchemaFactory(String schemaName, boolean caseSensitive) {
StoragePlugin plugin = getSchemaFactories().getPlugin(schemaName);
if (plugin != null) {
plugin.registerSchemas(schemaConfig, thisPlus);
return;
}
else {
//this schema name could be `dfs.tmp`, a 2nd level schema under 'dfs'
String[] paths = schemaName.split("\\.");
if (paths.length == 2) {
plugin = getSchemaFactories().getPlugin(paths[0]);
if (plugin != null) {
plugin.registerSchemas(schemaConfig, thisPlus);
}

final CalciteSchema secondLevelSchema = getSubSchemaMap().get(paths[0]).getSubSchema(paths[1], caseSensitive);
if (secondLevelSchema != null) {
SchemaPlus secondlevel = secondLevelSchema.plus();
org.apache.drill.exec.store.AbstractSchema drillSchema =
secondlevel.unwrap(org.apache.drill.exec.store.AbstractSchema.class);
SubSchemaWrapper wrapper = new SubSchemaWrapper(drillSchema);
thisPlus.add(wrapper.getName(), wrapper);
// we could not find the plugin, the schemaName could be `dfs.tmp`, a 2nd level schema under 'dfs'
String[] paths = schemaName.split("\\.");
if (paths.length == 2) {
plugin = getSchemaFactories().getPlugin(paths[0]);
if (plugin == null) {
return;
}

// we could find storage plugin for first part(e.g. 'dfs') of schemaName (e.g. 'dfs.tmp')
// register schema for this storage plugin to 'this'.
plugin.registerSchemas(schemaConfig, thisPlus);

// we load second level schemas for this storage plugin
final SchemaPlus firstlevelSchema = thisPlus.getSubSchema(paths[0]);
final List<SchemaPlus> secondLevelSchemas = Lists.newArrayList();
for (String secondLevelSchemaName : firstlevelSchema.getSubSchemaNames()) {
secondLevelSchemas.add(firstlevelSchema.getSubSchema(secondLevelSchemaName));
}

for (SchemaPlus schema : secondLevelSchemas) {
org.apache.drill.exec.store.AbstractSchema drillSchema;
try {
drillSchema = schema.unwrap(org.apache.drill.exec.store.AbstractSchema.class);
} catch (ClassCastException e) {
throw new RuntimeException(String.format("Schema '%s' is not expected under root schema", schema.getName()));
}
SubSchemaWrapper wrapper = new SubSchemaWrapper(drillSchema);
thisPlus.add(wrapper.getName(), wrapper);
}
}
} catch(ExecutionSetupException | IOException ex) {
logger.warn("Failed to load schema for \"" + schemaName + "\"!", ex);
}
}

static class RootSchema extends AbstractSchema {
RootSchema() {
super();
}

@Override public Expression getExpression(SchemaPlus parentSchema,
String name) {
return Expressions.call(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.io.IOException;
import java.util.List;

//import org.apache.calcite.jdbc.SimpleCalciteSchema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.UserException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ public class FileSystemSchemaFactory implements SchemaFactory{

public static final String DEFAULT_WS_NAME = "default";

public static final String LOCAL_FS_SCHEME = "file";

private List<WorkspaceSchemaFactory> factories;
private String schemaName;
protected FileSystemPlugin plugin;

public FileSystemSchemaFactory(String schemaName, List<WorkspaceSchemaFactory> factories) {
super();
// when the correspondent FileSystemPlugin is not passed in, we dig into ANY workspace factory to get it.
if (factories.size() > 0 ) {
this.plugin = factories.get(0).getPlugin();
}
Expand All @@ -67,7 +69,6 @@ public FileSystemSchemaFactory(String schemaName, List<WorkspaceSchemaFactory> f
}

public FileSystemSchemaFactory(FileSystemPlugin plugin, String schemaName, List<WorkspaceSchemaFactory> factories) {
super();
this.plugin = plugin;
this.schemaName = schemaName;
this.factories = factories;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public boolean accessible(DrillFileSystem fs) throws IOException {
* In this case, we will still use method listStatus.
* In other cases, we use access method since it is cheaper.
*/
if (SystemUtils.IS_OS_WINDOWS && fs.getUri().getScheme().equalsIgnoreCase("file")) {
if (SystemUtils.IS_OS_WINDOWS && fs.getUri().getScheme().equalsIgnoreCase(FileSystemSchemaFactory.LOCAL_FS_SCHEME)) {
fs.listStatus(wsPath);
}
else {
Expand Down Expand Up @@ -572,10 +572,10 @@ public boolean isMutable() {
}

public DrillFileSystem getFS() {
if (this.fs == null) {
this.fs = ImpersonationUtil.createFileSystem(schemaConfig.getUserName(), fsConf);
if (fs == null) {
fs = ImpersonationUtil.createFileSystem(schemaConfig.getUserName(), fsConf);
}
return this.fs;
return fs;
}

public String getDefaultLocation() {
Expand Down

0 comments on commit 6eeb933

Please sign in to comment.