DRILL-3992: Add/fix support for JDBC schemas (tested against oracle and derby)

This closes #225
jacques-n committed Nov 2, 2015
1 parent 7f55051 commit 22e5316
Showing 3 changed files with 96 additions and 27 deletions.
@@ -234,6 +234,7 @@ public RelNode convert(RelNode in) {
 
   private class CapitalizingJdbcSchema extends AbstractSchema {
 
+    final Map<String, CapitalizingJdbcSchema> schemaMap = Maps.newHashMap();
     private final JdbcSchema inner;
 
     public CapitalizingJdbcSchema(List<String> parentSchemaPath, String name, DataSource dataSource,
@@ -258,13 +259,21 @@ public Set<String> getFunctionNames() {
     }
 
     @Override
-    public Schema getSubSchema(String name) {
-      return inner.getSubSchema(name);
+    public CapitalizingJdbcSchema getSubSchema(String name) {
+      return schemaMap.get(name);
+    }
+
+    void setHolder(SchemaPlus plusOfThis) {
+      for (String s : getSubSchemaNames()) {
+        CapitalizingJdbcSchema inner = getSubSchema(s);
+        SchemaPlus holder = plusOfThis.add(s, inner);
+        inner.setHolder(holder);
+      }
     }
 
     @Override
     public Set<String> getSubSchemaNames() {
-      return inner.getSubSchemaNames();
+      return schemaMap.keySet();
     }
 
     @Override
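The rewritten getSubSchema serves children from the plugin's own schemaMap, and the new setHolder walk republishes each child on the SchemaPlus node that add(...) returns. Below is a minimal sketch of that Calcite wiring pattern, using Calcite's bundled AbstractSchema rather than Drill's; the class and schema names are illustrative only:

    import org.apache.calcite.schema.SchemaPlus;
    import org.apache.calcite.schema.impl.AbstractSchema;
    import org.apache.calcite.tools.Frameworks;

    public class SchemaTreeSketch {
      public static void main(String[] args) {
        SchemaPlus root = Frameworks.createRootSchema(true);
        // add(...) returns the SchemaPlus node now holding the child schema;
        // registering each nested level against its parent's node is exactly
        // what setHolder() does for the catalog -> schema hierarchy above.
        SchemaPlus catalog = root.add("TESTDB", new AbstractSchema());
        catalog.add("APP", new AbstractSchema());
        System.out.println(root.getSubSchema("TESTDB").getSubSchema("APP").getName());
      }
    }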
@@ -295,25 +304,74 @@ public JdbcCatalogSchema(String name) {
       try (Connection con = source.getConnection(); ResultSet set = con.getMetaData().getCatalogs()) {
         while (set.next()) {
           final String catalogName = set.getString(1);
-          CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema(getSchemaPath(), catalogName, source, dialect,
-              convention, catalogName, null);
+          CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema(
+              getSchemaPath(), catalogName, source, dialect, convention, catalogName, null);
           schemaMap.put(catalogName, schema);
         }
       } catch (SQLException e) {
         logger.warn("Failure while attempting to load JDBC schema.", e);
       }
 
-      // unable to read general catalog
+      // unable to read catalog list.
       if (schemaMap.isEmpty()) {
-        schemaMap.put("default", new CapitalizingJdbcSchema(ImmutableList.<String> of(), name, source, dialect,
-            convention, null, null));
+        // try to add a list of schemas to the schema map.
+        boolean schemasAdded = addSchemas();
+
+        if (!schemasAdded) {
+          // there were no schemas; just create a default one (the jdbc system doesn't support catalogs/schemas).
+          schemaMap.put("default", new CapitalizingJdbcSchema(ImmutableList.<String> of(), name, source, dialect,
+              convention, null, null));
+        }
+      } else {
+        // We already have catalogs. Add schemas in the context of their catalogs.
+        addSchemas();
       }
 
       defaultSchema = schemaMap.values().iterator().next();
     }
 
+    void setHolder(SchemaPlus plusOfThis) {
+      for (String s : getSubSchemaNames()) {
+        CapitalizingJdbcSchema inner = getSubSchema(s);
+        SchemaPlus holder = plusOfThis.add(s, inner);
+        inner.setHolder(holder);
+      }
+    }
+
+    private boolean addSchemas() {
+      boolean added = false;
+      try (Connection con = source.getConnection(); ResultSet set = con.getMetaData().getSchemas()) {
+        while (set.next()) {
+          final String schemaName = set.getString(1);
+          final String catalogName = set.getString(2);
+
+          CapitalizingJdbcSchema parentSchema = schemaMap.get(catalogName);
+          if (parentSchema == null) {
+            CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema(getSchemaPath(), schemaName, source, dialect,
+                convention, catalogName, schemaName);
+
+            // if a catalog schema doesn't exist, we'll add this one at the top level.
+            schemaMap.put(schemaName, schema);
+          } else {
+            CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema(parentSchema.getSchemaPath(), schemaName,
+                source, dialect, convention, catalogName, schemaName);
+            parentSchema.schemaMap.put(schemaName, schema);
+          }
+          added = true;
+        }
+      } catch (SQLException e) {
+        logger.warn("Failure while attempting to load JDBC schema.", e);
+      }
+
+      return added;
+    }
+
     @Override
     public String getTypeName() {
       return JdbcStorageConfig.NAME;
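The new addSchemas() leans on the JDBC metadata contract: DatabaseMetaData.getSchemas() returns TABLE_SCHEM in column 1 and TABLE_CATALOG in column 2, which is why getString(1) above is the schema name and getString(2) the catalog. A standalone sketch against an in-memory Derby database (assumes the embedded Derby driver on the classpath; the database name "demo" is arbitrary):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    public class ListDerbySchemas {
      public static void main(String[] args) throws Exception {
        try (Connection con = DriverManager.getConnection("jdbc:derby:memory:demo;create=true");
             ResultSet set = con.getMetaData().getSchemas()) {
          while (set.next()) {
            // Derby exposes schemas such as APP, SYS and SYSIBM; it has no
            // catalogs, so TABLE_CATALOG comes back null and such schemas land
            // at the top level of schemaMap in the code above.
            System.out.println(set.getString("TABLE_SCHEM") + " / " + set.getString("TABLE_CATALOG"));
          }
        }
      }
    }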
@@ -325,7 +383,7 @@ public Schema getDefaultSchema() {
     }
 
     @Override
-    public Schema getSubSchema(String name) {
+    public CapitalizingJdbcSchema getSubSchema(String name) {
       return schemaMap.get(name);
     }
 
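A side note on the signature change in this hunk: Java permits an overriding method to narrow its return type (covariant returns), so getSubSchema can return CapitalizingJdbcSchema while still overriding the Schema-returning method it inherits. A tiny illustration (names are illustrative):

    public class CovariantDemo {
      static class Base {
        Object get() { return new Object(); }
      }
      static class Derived extends Base {
        @Override
        String get() { return "narrower return type, still a valid override"; }
      }
      public static void main(String[] args) {
        Base b = new Derived();
        System.out.println(b.get()); // dispatches to the narrowed override
      }
    }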
@@ -358,9 +416,11 @@ public Set<String> getTableNames() {
   @Override
   public void registerSchemas(SchemaConfig config, SchemaPlus parent) {
     JdbcCatalogSchema schema = new JdbcCatalogSchema(name);
-    parent.add(name, schema);
+    SchemaPlus holder = parent.add(name, schema);
+    schema.setHolder(holder);
   }
 
   @Override
   public JdbcStorageConfig getConfig() {
     return config;
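This registerSchemas change ties the rest together: it captures the SchemaPlus holder returned by parent.add(...) and hands it to setHolder, so the catalog/schema tree assembled in JdbcCatalogSchema's constructor is published level by level into Calcite's schema tree (see the SchemaPlus sketch after the earlier hunk) rather than staying private to the plugin.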
@@ -88,7 +88,7 @@ public void validateResult() throws Exception {
     // we'll test data except for date, time and timestamps. Derby mangles these due to improper timezone support.
     testBuilder()
         .sqlQuery(
-            "select PERSONID, LASTNAME, FIRSTNAME, ADDRESS, CITY, CODE, DBL, FLT, REL, NUM, SM, BI, BOOL from testdb.`default`.PERSON")
+            "select PERSONID, LASTNAME, FIRSTNAME, ADDRESS, CITY, CODE, DBL, FLT, REL, NUM, SM, BI, BOOL from testdb.PERSON")
         .ordered()
         .baselineColumns("PERSONID", "LASTNAME", "FIRSTNAME", "ADDRESS", "CITY", "CODE", "DBL", "FLT", "REL",
             "NUM", "SM", "BI", "BOOL")
@@ -122,9 +122,9 @@ public void pushdownJoin() throws Exception {
   @Test
   public void pushdownJoinAndFilterPushDown() throws Exception {
     final String query = "select * from \n" +
-        "testdb.`default`.PERSON e\n" +
+        "testdb.PERSON e\n" +
         "INNER JOIN \n" +
-        "testdb.`default`.PERSON s\n" +
+        "testdb.PERSON s\n" +
         "ON e.FirstName = s.FirstName\n" +
         "WHERE e.LastName > 'hello'";
 
@@ -134,20 +134,20 @@ public void pushdownJoinAndFilterPushDown() throws Exception {
   @Test
   public void pushdownAggregation() throws Exception {
     final String query = "select count(*) from \n" +
-        "testdb.`default`.PERSON";
+        "testdb.PERSON";
 
     testPlanMatchingPatterns(query, new String[] {}, new String[] { "Aggregate" });
   }
 
   @Test
   public void pushdownDoubleJoinAndFilter() throws Exception {
     final String query = "select * from \n" +
-        "testdb.`default`.PERSON e\n" +
+        "testdb.PERSON e\n" +
         "INNER JOIN \n" +
-        "testdb.`default`.PERSON s\n" +
+        "testdb.PERSON s\n" +
         "ON e.PersonId = s.PersonId\n" +
         "INNER JOIN \n" +
-        "testdb.`default`.PERSON ed\n" +
+        "testdb.PERSON ed\n" +
         "ON e.PersonId = ed.PersonId\n" +
         "WHERE s.FirstName > 'abc' and ed.FirstName > 'efg'";
     testPlanMatchingPatterns(query, new String[] {}, new String[] { "Join", "Filter" });
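With real schemas now registered, the tests drop the synthetic `default` qualifier and address the Derby table as testdb.PERSON directly. As an end-to-end illustration only — this assumes a running Drill instance reachable through a local ZooKeeper and a JDBC storage plugin registered under the name testdb, neither of which is part of this commit:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class QuerySketch {
      public static void main(String[] args) throws Exception {
        try (Connection con = DriverManager.getConnection("jdbc:drill:zk=local");
             Statement st = con.createStatement();
             // resolved through the catalog/schema tree built by JdbcCatalogSchema
             ResultSet rs = st.executeQuery("select FIRSTNAME from testdb.PERSON")) {
          while (rs.next()) {
            System.out.println(rs.getString(1));
          }
        }
      }
    }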
@@ -17,26 +17,29 @@
  */
 package org.apache.drill.exec.store.ischema;
 
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_SCHEMA_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA;
+
 import java.util.List;
 import java.util.Map;
 
-import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.schema.Schema.TableType;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.*;
 import org.apache.drill.exec.planner.logical.DrillViewInfoProvider;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.ischema.InfoSchemaFilter.Result;
 import org.apache.drill.exec.store.pojo.PojoRecordReader;
 
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 
 /**
@@ -180,8 +183,14 @@ public RecordReader getRecordReader() {
     @Override
     public boolean visitTable(String schemaName, String tableName, Table table) {
-      records.add(new Records.Table(IS_CATALOG_NAME, schemaName, tableName,
-          table.getJdbcTableType().toString()));
+      Preconditions.checkNotNull(table, "Error. Table %s.%s provided is null.", schemaName, tableName);
+
+      // skip over unknown table types
+      if (table.getJdbcTableType() != null) {
+        records.add(new Records.Table(IS_CATALOG_NAME, schemaName, tableName,
+            table.getJdbcTableType().toString()));
+      }
+
       return false;
     }
   }
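The new guard in visitTable has two parts: a Preconditions.checkNotNull with a %s-style message template (a Guava feature), and a null check on getJdbcTableType() so tables of unknown type are skipped instead of triggering a NullPointerException on toString(). A small self-contained demo of the same shape — FakeTable is a made-up stand-in for Calcite's Table:

    import com.google.common.base.Preconditions;

    public class GuardDemo {
      // stand-in for a Calcite Table whose getJdbcTableType() may return null
      static class FakeTable {
        final String jdbcTableType;
        FakeTable(String jdbcTableType) { this.jdbcTableType = jdbcTableType; }
      }

      static void visit(String schemaName, String tableName, FakeTable table) {
        Preconditions.checkNotNull(table, "Error. Table %s.%s provided is null.", schemaName, tableName);
        // skip tables of unknown type rather than failing on toString()
        if (table.jdbcTableType != null) {
          System.out.println(schemaName + "." + tableName + " -> " + table.jdbcTableType);
        }
      }

      public static void main(String[] args) {
        visit("APP", "PERSON", new FakeTable("TABLE"));
        visit("APP", "MYSTERY", new FakeTable(null)); // silently skipped
      }
    }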
