Skip to content

Commit

Permalink
DRILL-2341: Get type information from view def when view's field list …
Browse files Browse the repository at this point in the history
…is specified.
  • Loading branch information
vkorukanti committed Apr 15, 2015
1 parent 314e5a2 commit 09e752e
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 62 deletions.
Expand Up @@ -17,15 +17,20 @@
*/ */
package org.apache.drill.exec.planner.common; package org.apache.drill.exec.planner.common;


import java.util.AbstractList;
import java.util.List; import java.util.List;


import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types; import org.apache.drill.common.types.Types;
import org.apache.drill.exec.planner.logical.DrillOptiq; import org.apache.drill.exec.planner.logical.DrillOptiq;
import org.apache.drill.exec.resolver.TypeCastRules; import org.apache.drill.exec.resolver.TypeCastRules;
import org.eigenbase.rel.CalcRel;
import org.eigenbase.rel.RelNode;
import org.eigenbase.reltype.RelDataType; import org.eigenbase.reltype.RelDataType;
import org.eigenbase.reltype.RelDataTypeField; import org.eigenbase.reltype.RelDataTypeField;
import org.eigenbase.rex.RexInputRef;
import org.eigenbase.rex.RexNode;
import org.eigenbase.sql.type.SqlTypeName; import org.eigenbase.sql.type.SqlTypeName;
import org.eigenbase.util.Pair; import org.eigenbase.util.Pair;


Expand Down Expand Up @@ -82,4 +87,35 @@ public static boolean areRowTypesCompatible(
} }
return true; return true;
} }

/**
 * Wraps the given relational expression in a projection that exposes exactly the same fields (same order,
 * same types) under new names.
 *
 * <p>Each output column is a {@link RexInputRef} to the input field at the same ordinal, paired with the
 * name found at that ordinal in {@code fieldNames}.
 *
 * <p>Note: This method is copied from {@link org.eigenbase.rel.CalcRel#createRename(RelNode, List)} because
 * that version has a bug which doesn't rename the exprs. The bug is fixed in a later version of Apache
 * Calcite (1.2).
 *
 * @param rel Relational expression whose fields are to be renamed
 * @param fieldNames New field names, one per field of {@code rel}, in field order
 * @return Renamed relational expression
 */
public static RelNode createRename(
    RelNode rel,
    final List<String> fieldNames) {
  final List<RelDataTypeField> inputFields = rel.getRowType().getFieldList();
  assert fieldNames.size() == inputFields.size();

  // Lazy, read-only view: element i is (reference to input field i, new name i).
  final List<Pair<RexNode, String>> renamedRefs =
      new AbstractList<Pair<RexNode, String>>() {
        @Override
        public int size() {
          return inputFields.size();
        }

        @Override
        public Pair<RexNode, String> get(int ordinal) {
          final RexNode inputRef = new RexInputRef(ordinal, inputFields.get(ordinal).getType());
          return Pair.of(inputRef, fieldNames.get(ordinal));
        }
      };

  return CalcRel.createProject(rel, renamedRefs, true);
}
} }
Expand Up @@ -56,38 +56,8 @@ public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConv
SqlCreateTable sqlCreateTable = unwrap(sqlNode, SqlCreateTable.class); SqlCreateTable sqlCreateTable = unwrap(sqlNode, SqlCreateTable.class);


try { try {
// Convert the query in CTAS statement into a RelNode final RelNode newTblRelNode =
SqlNode validatedQuery = validateNode(sqlCreateTable.getQuery()); SqlHandlerUtil.resolveNewTableRel(false, planner, sqlCreateTable.getFieldNames(), sqlCreateTable.getQuery());
RelNode relQuery = convertToRel(validatedQuery);

List<String> tblFiledNames = sqlCreateTable.getFieldNames();
RelDataType queryRowType = relQuery.getRowType();

if (tblFiledNames.size() > 0) {
// Field count should match.
if (tblFiledNames.size() != queryRowType.getFieldCount()) {
return DirectPlan.createDirectPlan(context, false,
"Table's field list and the table's query field list have different counts.");
}

// CTAS's query field list shouldn't have "*" when table's field list is specified.
for (String field : queryRowType.getFieldNames()) {
if (field.equals("*")) {
return DirectPlan.createDirectPlan(context, false,
"Table's query field list has a '*', which is invalid when table's field list is specified.");
}
}
}

// if the CTAS statement has table fields lists (ex. below), add a project rel to rename the query fields.
// Ex. CREATE TABLE tblname(col1, medianOfCol2, avgOfCol3) AS
// SELECT col1, median(col2), avg(col3) FROM sourcetbl GROUP BY col1 ;
if (tblFiledNames.size() > 0) {
// create rowtype to which the select rel needs to be casted.
RelDataType rowType = new DrillFixedRelDataTypeImpl(planner.getTypeFactory(), tblFiledNames);

relQuery = RelOptUtil.createCastRel(relQuery, rowType, true);
}


SchemaPlus schema = findSchema(context.getRootSchema(), context.getNewDefaultSchema(), SchemaPlus schema = findSchema(context.getRootSchema(), context.getNewDefaultSchema(),
sqlCreateTable.getSchemaPath()); sqlCreateTable.getSchemaPath());
Expand All @@ -104,10 +74,10 @@ public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConv
return DirectPlan.createDirectPlan(context, false, String.format("Table '%s' already exists.", newTblName)); return DirectPlan.createDirectPlan(context, false, String.format("Table '%s' already exists.", newTblName));
} }


log("Optiq Logical", relQuery); log("Optiq Logical", newTblRelNode);


// Convert the query to Drill Logical plan and insert a writer operator on top. // Convert the query to Drill Logical plan and insert a writer operator on top.
DrillRel drel = convertToDrel(relQuery, drillSchema, newTblName); DrillRel drel = convertToDrel(newTblRelNode, drillSchema, newTblName);
log("Drill Logical", drel); log("Drill Logical", drel);
Prel prel = convertToPrel(drel); Prel prel = convertToPrel(drel);
log("Drill Physical", prel); log("Drill Physical", prel);
Expand Down
@@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.sql.handlers;

import net.hydromatic.optiq.tools.Planner;
import net.hydromatic.optiq.tools.RelConversionException;
import net.hydromatic.optiq.tools.ValidationException;
import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.sql.DirectPlan;
import org.apache.drill.exec.planner.types.DrillFixedRelDataTypeImpl;
import org.eigenbase.rel.RelNode;
import org.eigenbase.relopt.RelOptUtil;
import org.eigenbase.reltype.RelDataType;
import org.eigenbase.sql.SqlNode;

import java.util.List;

public class SqlHandlerUtil {

  /**
   * Validates the defining query of a new table (CTAS) or view, converts it to a RelNode and, when an explicit
   * field list was given, checks that list against the query and renames the query's output fields to match.
   *
   * @param isNewTableView Whether the new object is a view ({@code true}) or a table ({@code false}). Only used
   *                       to word the validation error messages.
   * @param planner Planner instance used to validate and convert the query.
   * @param tableFieldNames Field names given right after the new table/view name; empty when none were given.
   *                        Ex. CREATE TABLE newTblName(col1, medianOfCol2, avgOfCol3) AS
   *                        SELECT col1, median(col2), avg(col3) FROM sourcetbl GROUP BY col1;
   * @param newTableQueryDef Sql tree of the table/view definition (the query after the AS keyword). Per this
   *                         method's contract the tree is modified, so callers that still need the original
   *                         must copy it (or capture its text) beforehand.
   * @return The converted query as-is when no field list was given; otherwise a rename on top of the converted
   *         query that exposes the given field names.
   * @throws ValidationException If the field list size differs from the query's field count, or the query's
   *                             field list contains a '*' while a field list is specified.
   * @throws RelConversionException If failed to convert the table definition into a RelNode.
   */
  public static RelNode resolveNewTableRel(boolean isNewTableView, Planner planner, List<String> tableFieldNames,
      SqlNode newTableQueryDef) throws ValidationException, RelConversionException {

    final RelNode queryRel = planner.convert(planner.validate(newTableQueryDef));

    // Without an explicit field list there is nothing to check or rename.
    if (tableFieldNames.isEmpty()) {
      return queryRel;
    }

    final String tblType = isNewTableView ? "view" : "table";
    final RelDataType queryRowType = queryRel.getRowType();

    // Field count should match.
    if (queryRowType.getFieldCount() != tableFieldNames.size()) {
      throw new ValidationException(
          String.format("%s's field list and the %s's query field list have different counts.", tblType, tblType));
    }

    // The query's field list shouldn't have "*" when the table's/view's field list is specified.
    for (String queryField : queryRowType.getFieldNames()) {
      if (queryField.equals("*")) {
        throw new ValidationException(
            String.format("%s's query field list has a '*', which is invalid when %s's field list is specified.",
                tblType, tblType));
      }
    }

    // The statement has a field list (ex. below), so add a project rel to rename the query fields.
    // Ex. CREATE TABLE tblname(col1, medianOfCol2, avgOfCol3) AS
    //     SELECT col1, median(col2), avg(col3) FROM sourcetbl GROUP BY col1 ;
    // Similarly for CREATE VIEW.
    return DrillRelOptUtil.createRename(queryRel, tableFieldNames);
  }
}
Expand Up @@ -28,14 +28,17 @@
import org.apache.drill.exec.dotdrill.View; import org.apache.drill.exec.dotdrill.View;
import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.sql.DirectPlan; import org.apache.drill.exec.planner.sql.DirectPlan;
import org.apache.drill.exec.planner.sql.parser.SqlCreateView; import org.apache.drill.exec.planner.sql.parser.SqlCreateView;
import org.apache.drill.exec.planner.sql.parser.SqlDropView; import org.apache.drill.exec.planner.sql.parser.SqlDropView;
import org.apache.drill.exec.planner.types.DrillFixedRelDataTypeImpl; import org.apache.drill.exec.planner.types.DrillFixedRelDataTypeImpl;
import org.apache.drill.exec.store.AbstractSchema; import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.WorkspaceSchema; import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.WorkspaceSchema;
import org.apache.drill.exec.work.foreman.ForemanSetupException; import org.apache.drill.exec.work.foreman.ForemanSetupException;
import org.eigenbase.rel.CalcRel;
import org.eigenbase.rel.RelNode; import org.eigenbase.rel.RelNode;
import org.eigenbase.relopt.RelOptUtil;
import org.eigenbase.reltype.RelDataType; import org.eigenbase.reltype.RelDataType;
import org.eigenbase.sql.SqlNode; import org.eigenbase.sql.SqlNode;


Expand Down Expand Up @@ -64,6 +67,12 @@ public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConv
SqlCreateView createView = unwrap(sqlNode, SqlCreateView.class); SqlCreateView createView = unwrap(sqlNode, SqlCreateView.class);


try { try {
// Store the viewSql as view def SqlNode is modified as part of the resolving the new table definition below.
final String viewSql = createView.getQuery().toString();

final RelNode newViewRelNode =
SqlHandlerUtil.resolveNewTableRel(true, planner, createView.getFieldNames(), createView.getQuery());

SchemaPlus defaultSchema = context.getNewDefaultSchema(); SchemaPlus defaultSchema = context.getNewDefaultSchema();
SchemaPlus schema = findSchema(context.getRootSchema(), defaultSchema, createView.getSchemaPath()); SchemaPlus schema = findSchema(context.getRootSchema(), defaultSchema, createView.getSchemaPath());
AbstractSchema drillSchema = getDrillSchema(schema); AbstractSchema drillSchema = getDrillSchema(schema);
Expand All @@ -80,34 +89,7 @@ public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConv
workspaceSchemaPath = getDrillSchema(defaultSchema).getSchemaPath(); workspaceSchemaPath = getDrillSchema(defaultSchema).getSchemaPath();
} }


String viewSql = createView.getQuery().toString(); View view = new View(createView.getName(), viewSql, newViewRelNode.getRowType(), workspaceSchemaPath);

SqlNode validatedQuery = planner.validate(createView.getQuery());
RelNode validatedRelNode = planner.convert(validatedQuery);

// If view's field list is specified then its size should match view's query field list size.
RelDataType queryRowType = validatedRelNode.getRowType();

List<String> viewFieldNames = createView.getFieldNames();
if (viewFieldNames.size() > 0) {
// number of fields match.
if (viewFieldNames.size() != queryRowType.getFieldCount()) {
return DirectPlan.createDirectPlan(context, false,
"View's field list and View's query field list have different counts.");
}

// make sure View's query field list has no "*"
for (String field : queryRowType.getFieldNames()) {
if (field.equals("*")) {
return DirectPlan.createDirectPlan(context, false,
"View's query field list has a '*', which is invalid when View's field list is specified.");
}
}

queryRowType = new DrillFixedRelDataTypeImpl(planner.getTypeFactory(), viewFieldNames);
}

View view = new View(createView.getName(), viewSql, queryRowType, workspaceSchemaPath);


boolean replaced; boolean replaced;
if (drillSchema instanceof WorkspaceSchema) { if (drillSchema instanceof WorkspaceSchema) {
Expand Down
Expand Up @@ -19,12 +19,14 @@


import java.io.File; import java.io.File;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.sql.Date;


import org.apache.drill.BaseTestQuery; import org.apache.drill.BaseTestQuery;
import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ExecConstants;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.joda.time.DateTime;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Ignore; import org.junit.Ignore;
Expand Down Expand Up @@ -421,6 +423,30 @@ public void testWriteEmptyFile() throws Exception {
} }
} }


@Test // DRILL-2341
public void tableSchemaWhenSelectFieldsInDef_SelectFieldsInView() throws Exception {
  final String tableName = "testTableOutputSchema";

  // CTAS with an explicit field list whose query casts every selected column to a concrete type.
  final String createTableSql = String.format("CREATE TABLE dfs_test.tmp.%s(id, name, bday) AS SELECT " +
      "cast(`employee_id` as integer), " +
      "cast(`full_name` as varchar(100)), " +
      "cast(`birth_date` as date) " +
      "FROM cp.`employee.json` ORDER BY `employee_id` LIMIT 1", tableName);
  final String readBackSql = String.format("SELECT * FROM dfs_test.tmp.`%s`", tableName);
  final DateTime expectedBirthDay = new DateTime(Date.valueOf("1961-08-26").getTime());

  try {
    test(createTableSql);

    // Reading the table back must expose the renamed columns with the values from the cast expressions.
    testBuilder()
        .unOrdered()
        .sqlQuery(readBackSql)
        .baselineColumns("id", "name", "bday")
        .baselineValues(1, "Sheri Nowmer", expectedBirthDay)
        .go();
  } finally {
    // Always clean up so the table does not linger after this test.
    deleteTableIfExists(tableName);
  }
}

@Test // see DRILL-2408 @Test // see DRILL-2408
public void testWriteEmptyFileAfterFlush() throws Exception { public void testWriteEmptyFileAfterFlush() throws Exception {
String outputFile = "testparquetwriter_test_write_empty_file_after_flush"; String outputFile = "testparquetwriter_test_write_empty_file_after_flush";
Expand Down
Expand Up @@ -413,4 +413,33 @@ public void viewResolvingTablesInWorkspaceSchema() throws Exception {
dropViewHelper(TEMP_SCHEMA, viewName, TEMP_SCHEMA); dropViewHelper(TEMP_SCHEMA, viewName, TEMP_SCHEMA);
} }
} }

// DRILL-2341. Schema verification for a view created WITHOUT a field list is already covered by
// TestViewSupport.infoSchemaWithView; this test covers the case where the view's field list is specified.
@Test
public void viewSchemaWhenSelectFieldsInDef_SelectFieldsInView() throws Exception {
  final String viewName = generateViewName();

  final String viewFieldList = "(id, name, bday)";
  final String viewDefinition =
      "SELECT " +
      "cast(`region_id` as integer), " +
      "cast(`full_name` as varchar(100)), " +
      "cast(`birth_date` as date) " +
      "FROM cp.`employee.json`";
  final String describeSql = String.format("DESCRIBE `%s`", viewName);

  try {
    test("USE " + TEMP_SCHEMA);
    createViewHelper(null, viewName, TEMP_SCHEMA, viewFieldList, viewDefinition);

    // DESCRIBE must report the names from the view's field list with the types of the cast expressions.
    testBuilder()
        .sqlQuery(describeSql)
        .unOrdered()
        .baselineColumns("COLUMN_NAME", "DATA_TYPE", "IS_NULLABLE")
        .baselineValues("id", "INTEGER", "YES")
        .baselineValues("name", "VARCHAR", "YES")
        .baselineValues("bday", "DATE", "YES")
        .go();
  } finally {
    dropViewHelper(TEMP_SCHEMA, viewName, TEMP_SCHEMA);
  }
}
} }

0 comments on commit 09e752e

Please sign in to comment.