Address several shortcomings with respect to the usability of Avro tables.

Addressed JIRAs: IMPALA-1947 and IMPALA-1813

New Feature:
Adds support for creating an Avro table without an explicit
Avro schema, using the following syntax:

CREATE TABLE <table_name> column_defs STORED AS AVRO
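
For example, the following statement (hypothetical table and column names)
creates an Avro table whose Avro schema is inferred from the column
definitions:

CREATE TABLE avro_events (id INT, name STRING) STORED AS AVRO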

Fixes and Improvements:
This patch fixes and unifies the logic for reconciling differences between
an Avro table's Avro schema and its column definitions. This reconciliation
logic is executed during Impala's CREATE TABLE and when loading a table's
metadata. Impala generally performs the schema reconciliation during table
creation, but Hive does not. In many cases, Hive's CREATE TABLE stores the
original column definitions in the HMS (in the StorageDescriptor) instead
of the reconciled column definitions.

The reconciliation logic compares the Avro fields and the column definitions
by position, considering both names and types, and follows this conflict
resolution policy, which is similar to Hive's (a worked example follows the
list):

Mismatched number of columns -> Prefer Avro columns.
Mismatched name/type -> Prefer Avro column, except:
  A CHAR/VARCHAR column definition maps to an Avro STRING, and is preserved
  as a CHAR/VARCHAR in the reconciled schema.
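
For example (hypothetical names), suppose a table is created with column
definitions (c1 CHAR(5), c2 INT) against an Avro schema that declares c1 as
"string" and c2 as "long". The reconciled schema is (c1 CHAR(5), c2 BIGINT):
the CHAR(5) definition is preserved against the Avro "string", while the INT
definition gives way to the Avro "long".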

Behavior for TIMESTAMP:
A TIMESTAMP column definition maps to an Avro STRING and is presented as a STRING
in the reconciled schema, because Avro has no binary TIMESTAMP representation.
As a result, no Avro table may have a TIMESTAMP column (existing behavior).
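
For example (hypothetical name), after

CREATE TABLE t1 (ts TIMESTAMP) STORED AS AVRO

the table's ts column has type STRING, both in the reconciled schema and in
the inferred Avro schema.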

Change-Id: I8457354568b6049b2dd2794b65fadc06e619d648
Reviewed-on: http://gerrit.cloudera.org:8080/550
Reviewed-by: Alex Behm <alex.behm@cloudera.com>
Tested-by: Internal Jenkins
Alex Behm authored and Internal Jenkins committed Aug 25, 2015
1 parent 4cfe20c commit af46f26
Showing 11 changed files with 474 additions and 252 deletions.
45 changes: 43 additions & 2 deletions fe/src/main/java/com/cloudera/impala/analysis/ColumnDef.java
@@ -14,12 +14,17 @@

package com.cloudera.impala.analysis;

import java.util.List;

import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;

import com.cloudera.impala.catalog.Type;
import com.cloudera.impala.common.AnalysisException;
import com.cloudera.impala.thrift.TColumn;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

/**
* Represents a column definition in a CREATE/ALTER TABLE/VIEW statement.
@@ -31,23 +36,41 @@
*/
public class ColumnDef {
private final String colName_;
private final String comment_;
private String comment_;

// Required in CREATE/ALTER TABLE stmts. Set to NULL in CREATE/ALTER VIEW stmts,
// for which we setType() after analyzing the defining view definition stmt.
private final TypeDef typeDef_;
private Type type_;

public ColumnDef(String colName, TypeDef typeDef, String comment) {
colName_ = colName;
colName_ = colName.toLowerCase();
typeDef_ = typeDef;
comment_ = comment;
}

/**
* Creates an analyzed ColumnDef from a Hive FieldSchema. Throws if the FieldSchema's
* type is not supported.
*/
private ColumnDef(FieldSchema fs) throws AnalysisException {
Type type = Type.parseColumnType(fs.getType());
if (type == null) {
throw new AnalysisException(String.format(
"Unsupported type '%s' in Hive field schema '%s'",
fs.getType(), fs.getName()));
}
colName_ = fs.getName();
typeDef_ = new TypeDef(type);
comment_ = fs.getComment();
analyze();
}

public void setType(Type type) { type_ = type; }
public Type getType() { return type_; }
public TypeDef getTypeDef() { return typeDef_; }
public String getColName() { return colName_; }
public void setComment(String comment) { comment_ = comment; }
public String getComment() { return comment_; }

public void analyze() throws AnalysisException {
@@ -80,4 +103,22 @@ public TColumn toThrift() {
col.setComment(getComment());
return col;
}

public static List<ColumnDef> createFromFieldSchemas(List<FieldSchema> fieldSchemas)
throws AnalysisException {
List<ColumnDef> result = Lists.newArrayListWithCapacity(fieldSchemas.size());
for (FieldSchema fs: fieldSchemas) result.add(new ColumnDef(fs));
return result;
}

public static List<FieldSchema> toFieldSchemas(List<ColumnDef> colDefs) {
return Lists.transform(colDefs, new Function<ColumnDef, FieldSchema>() {
public FieldSchema apply(ColumnDef colDef) {
Preconditions.checkNotNull(colDef.getType());
return new FieldSchema(colDef.getColName(), colDef.getType().toSql(),
colDef.getComment());
}
});
}

}
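
A minimal usage sketch of the two new helpers (hypothetical calling code;
msTable stands for an org.apache.hadoop.hive.metastore.api.Table already
loaded from the HMS, and imports are as in ColumnDef.java above):

// Convert the HMS field schemas into analyzed ColumnDefs; this throws
// AnalysisException if a field has an unsupported type.
List<FieldSchema> fieldSchemas = msTable.getSd().getCols();
List<ColumnDef> colDefs = ColumnDef.createFromFieldSchemas(fieldSchemas);
// ... adjust the ColumnDefs, e.g., reconcile them against an Avro schema ...
// Write the (possibly modified) columns back as FieldSchemas.
msTable.getSd().setCols(ColumnDef.toFieldSchemas(colDefs));

Note that toFieldSchemas() returns a lazy Guava view (Lists.transform), so a
caller that needs a materialized, independently mutable list should copy it.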
132 changes: 41 additions & 91 deletions fe/src/main/java/com/cloudera/impala/analysis/CreateTableStmt.java
@@ -19,12 +19,12 @@
import java.util.Map;
import java.util.Set;

import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.hadoop.fs.permission.FsAction;

import com.cloudera.impala.authorization.Privilege;
import com.cloudera.impala.catalog.Column;
import com.cloudera.impala.catalog.HdfsStorageDescriptor;
import com.cloudera.impala.catalog.HdfsTable;
import com.cloudera.impala.catalog.RowFormat;
import com.cloudera.impala.catalog.TableLoadingException;
import com.cloudera.impala.common.AnalysisException;
@@ -33,7 +33,9 @@
import com.cloudera.impala.thrift.TCreateTableParams;
import com.cloudera.impala.thrift.THdfsFileFormat;
import com.cloudera.impala.thrift.TTableName;
import com.cloudera.impala.util.AvroSchemaConverter;
import com.cloudera.impala.util.AvroSchemaParser;
import com.cloudera.impala.util.AvroSchemaUtils;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@@ -43,21 +45,20 @@
* Represents a CREATE TABLE statement.
*/
public class CreateTableStmt extends StatementBase {
private final ArrayList<ColumnDef> columnDefs_;
private List<ColumnDef> columnDefs_;
private final String comment_;
private final boolean isExternal_;
private final boolean ifNotExists_;
private final THdfsFileFormat fileFormat_;
private final ArrayList<ColumnDef> partitionColDefs_;
private final RowFormat rowFormat_;
private final TableName tableName_;
private TableName tableName_;
private final Map<String, String> tblProperties_;
private final Map<String, String> serdeProperties_;
private final HdfsCachingOp cachingOp_;
private HdfsUri location_;

// Set during analysis
private String dbName_;
private String owner_;

/**
@@ -153,8 +154,8 @@ public String getOwner() {
* be created within.
*/
public String getDb() {
Preconditions.checkNotNull(dbName_);
return dbName_;
Preconditions.checkState(isAnalyzed());
return tableName_.getDb();
}

@Override
@@ -184,18 +185,18 @@ public TCreateTableParams toThrift() {

@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
super.analyze(analyzer);
Preconditions.checkState(tableName_ != null && !tableName_.isEmpty());
tableName_ = analyzer.getFqTableName(tableName_);
tableName_.analyze();
dbName_ = analyzer.getTargetDbName(tableName_);
owner_ = analyzer.getUser().getName();

if (analyzer.dbContainsTable(dbName_, tableName_.getTbl(), Privilege.CREATE) &&
!ifNotExists_) {
throw new AnalysisException(Analyzer.TBL_ALREADY_EXISTS_ERROR_MSG +
String.format("%s.%s", dbName_, getTbl()));
if (analyzer.dbContainsTable(tableName_.getDb(), tableName_.getTbl(), Privilege.CREATE)
&& !ifNotExists_) {
throw new AnalysisException(Analyzer.TBL_ALREADY_EXISTS_ERROR_MSG + tableName_);
}

analyzer.addAccessEvent(new TAccessEvent(dbName_ + "." + tableName_.getTbl(),
analyzer.addAccessEvent(new TAccessEvent(tableName_.toString(),
TCatalogObjectType.TABLE, Privilege.CREATE.toString()));

// Only Avro tables can have empty column defs because they can infer them from
@@ -216,13 +217,13 @@ public void analyze(Analyzer analyzer) throws AnalysisException {
analyzeColumnDefs(analyzer);

if (fileFormat_ == THdfsFileFormat.AVRO) {
List<ColumnDef> newColumnDefs = analyzeAvroSchema(analyzer);
if (newColumnDefs != columnDefs_) {
// Replace the old column defs with the new ones and analyze them.
columnDefs_.clear();
columnDefs_.addAll(newColumnDefs);
analyzeColumnDefs(analyzer);
columnDefs_ = analyzeAvroSchema(analyzer);
if (columnDefs_.isEmpty()) {
throw new AnalysisException(
"An Avro table requires column definitions or an Avro schema.");
}
AvroSchemaUtils.setFromSerdeComment(columnDefs_);
analyzeColumnDefs(analyzer);
}

if (cachingOp_ != null) cachingOp_.analyze(analyzer);
@@ -261,93 +262,42 @@ private void analyzeColumnDefs(Analyzer analyzer) throws AnalysisException {
private List<ColumnDef> analyzeAvroSchema(Analyzer analyzer)
throws AnalysisException {
Preconditions.checkState(fileFormat_ == THdfsFileFormat.AVRO);
// Look for the schema in TBLPROPERTIES and in SERDEPROPERTIES, with the latter
// taking precedence.
List<Map<String, String>> schemaSearchLocations = Lists.newArrayList();
String fullTblName = dbName_ + "." + tableName_.getTbl();
schemaSearchLocations.add(serdeProperties_);
schemaSearchLocations.add(tblProperties_);
String avroSchema = null;
List<ColumnDef> avroCols = null; // parsed from avroSchema
try {
avroSchema = HdfsTable.getAvroSchema(schemaSearchLocations);
// TODO: Allow creating Avro tables without an Avro schema, inferring the schema
// from the column definitions.
avroSchema = AvroSchemaUtils.getAvroSchema(schemaSearchLocations);
if (avroSchema == null) {
throw new AnalysisException(String.format("No Avro schema provided in " +
"SERDEPROPERTIES or TBLPROPERTIES for table: %s ",
dbName_ + "." + tableName_.getTbl()));
// No Avro schema was explicitly set in the serde or table properties, so infer
// the Avro schema from the column definitions.
Schema inferredSchema = AvroSchemaConverter.convertColumnDefs(
columnDefs_, tableName_.toString());
avroSchema = inferredSchema.toString();
}
} catch (TableLoadingException e) {
throw new AnalysisException(e.getMessage(), e);
}

if (Strings.isNullOrEmpty(avroSchema)) {
throw new AnalysisException("Avro schema is null or empty: " + fullTblName);
}

// List of columns parsed from the Avro schema.
List<Column> avroColumns = null;
try {
avroColumns = AvroSchemaParser.parse(avroSchema);
} catch (Exception e) {
if (Strings.isNullOrEmpty(avroSchema)) {
throw new AnalysisException("Avro schema is null or empty: " +
tableName_.toString());
}
avroCols = AvroSchemaParser.parse(avroSchema);
} catch (SchemaParseException e) {
throw new AnalysisException(String.format(
"Error parsing Avro schema for table '%s': %s", fullTblName,
"Error parsing Avro schema for table '%s': %s", tableName_.toString(),
e.getMessage()));
}
Preconditions.checkNotNull(avroColumns);
Preconditions.checkNotNull(avroCols);

// Analyze the Avro schema to detect inconsistencies with the columnDefs_.
// In case of inconsistencies, the column defs are ignored in favor of the Avro
// schema for simplicity and, in particular, to enable COMPUTE STATS (IMPALA-1104).
String warnStr = null; // set if inconsistency detected
if (avroColumns.size() != columnDefs_.size() && !columnDefs_.isEmpty()) {
warnStr = String.format(
"Ignoring column definitions in favor of Avro schema.\n" +
"The Avro schema has %s column(s) but %s column definition(s) were given.",
avroColumns.size(), columnDefs_.size());
} else {
// Determine whether the column names and the types match.
for (int i = 0; i < columnDefs_.size(); ++i) {
ColumnDef colDesc = columnDefs_.get(i);
Column avroCol = avroColumns.get(i);
String warnDetail = null;
if (!colDesc.getColName().equalsIgnoreCase(avroCol.getName())) {
warnDetail = "name";
}
if (colDesc.getType().isStringType() &&
avroCol.getType().isStringType()) {
// This is OK -- avro types for CHAR, VARCHAR, and STRING are "string"
} else if (!colDesc.getType().equals(avroCol.getType())) {
warnDetail = "type";
}
if (warnDetail != null) {
warnStr = String.format(
"Ignoring column definitions in favor of Avro schema due to a " +
"mismatched column %s at position %s.\n" +
"Column definition: %s\n" +
"Avro schema column: %s", warnDetail, i + 1,
colDesc.getColName() + " " + colDesc.getType().toSql(),
avroCol.getName() + " " + avroCol.getType().toSql());
break;
}
}
}

if (warnStr != null || columnDefs_.isEmpty()) {
analyzer.addWarning(warnStr);
// Create new columnDefs_ based on the Avro schema and return them.
List<ColumnDef> avroSchemaColDefs =
Lists.newArrayListWithCapacity(avroColumns.size());
for (Column avroCol: avroColumns) {
ColumnDef colDef =
new ColumnDef(avroCol.getName(), null, avroCol.getComment());
colDef.setType(avroCol.getType());
avroSchemaColDefs.add(colDef);
}
return avroSchemaColDefs;
}
// The existing col defs are consistent with the Avro schema.
return columnDefs_;
StringBuilder warning = new StringBuilder();
List<ColumnDef> reconciledColDefs =
AvroSchemaUtils.reconcileSchemas(columnDefs_, avroCols, warning);
if (warning.length() > 0) analyzer.addWarning(warning.toString());
return reconciledColDefs;
}

private void analyzeRowFormatValue(String value) throws AnalysisException {
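
For readers without the full patch: AvroSchemaUtils.reconcileSchemas lives in
one of the changed files not shown in this excerpt. The following is a minimal
sketch of the policy described in the commit message, with the signature taken
from the call site in analyzeAvroSchema(); the body is illustrative, not the
actual implementation:

public static List<ColumnDef> reconcileSchemas(List<ColumnDef> colDefs,
    List<ColumnDef> avroCols, StringBuilder warning) {
  if (colDefs.size() != avroCols.size()) {
    // Mismatched number of columns -> prefer the Avro columns.
    warning.append("Ignoring column definitions in favor of Avro schema.");
    return avroCols;
  }
  List<ColumnDef> result = Lists.newArrayListWithCapacity(colDefs.size());
  for (int i = 0; i < colDefs.size(); ++i) {
    ColumnDef colDef = colDefs.get(i);
    ColumnDef avroCol = avroCols.get(i);
    if (colDef.getType().isStringType() && avroCol.getType().isStringType()) {
      // Exception: a CHAR/VARCHAR column definition that maps to an Avro
      // STRING keeps its CHAR/VARCHAR type, but takes the Avro field's
      // name and comment.
      ColumnDef reconciled =
          new ColumnDef(avroCol.getColName(), null, avroCol.getComment());
      reconciled.setType(colDef.getType());
      result.add(reconciled);
    } else {
      // Any other name/type mismatch -> prefer the Avro column. (The real
      // helper presumably also appends a detailed warning here.)
      result.add(avroCol);
    }
  }
  return result;
}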
