IMPALA-3314/IMPALA-3513: Fix querying tables/partitions altered to Avro format

Bug: Impalads crash if we query an Avro table with stale metadata

Cause: avroSchema_ is not set in HdfsTable, so no schema is propagated to
the Avro scanner, which lacks the checks needed to handle a null schema.

The patch fixes the following:

1. The Avro scanner should gracefully handle the case where the Avro schema
   is not set. Appropriate null checks and a meaningful error message have
   been added.

2. This handles a special case with multi-fileformat partitioned tables:
   avroSchema_ should be set in HdfsTable if any subset of the partitions
   is backed by Avro. Without this patch, it is only set when the base
   table's file format is Avro (see the condensed reproduction below).
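
   A condensed reproduction of the scenario, following the new
   avro-stale-schema test added below (the table name and column list here
   are illustrative):

     -- Create the table with the default (text) file format, pointing at
     -- Avro data and an Avro schema via TBLPROPERTIES (elided here).
     CREATE EXTERNAL TABLE t (id INT, s STRING)
     LOCATION '...' TBLPROPERTIES ('avro.schema.url'='...');
     -- Switch the file format to Avro; avroSchema_ in the cached HdfsTable
     -- is still unset at this point.
     ALTER TABLE t SET FILEFORMAT AVRO;
     -- Without the patch this could crash the impalad; with it, the scan
     -- fails with the "Missing Avro schema in scan node ..." error.
     SELECT count(*) FROM t;
     -- Reloading the metadata sets the Avro schema and the query succeeds.
     INVALIDATE METADATA t;
     SELECT count(*) FROM t;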

Change-Id: I09262d3a7b85a2263c721f3beafd0cab2a1bdf4b
Reviewed-on: http://gerrit.cloudera.org:8080/3136
Reviewed-by: Bharath Vissapragada <bharathv@cloudera.com>
Tested-by: Internal Jenkins
Bharath Vissapragada authored and Internal Jenkins committed May 21, 2016
1 parent a8ead31 commit 6d90f47
Showing 6 changed files with 107 additions and 19 deletions.
13 changes: 13 additions & 0 deletions be/src/exec/hdfs-avro-scanner.cc
@@ -57,6 +57,15 @@ HdfsAvroScanner::HdfsAvroScanner(HdfsScanNode* scan_node, RuntimeState* state)
codegend_decode_avro_data_(NULL) {
}

Status HdfsAvroScanner::Prepare(ScannerContext* context) {
RETURN_IF_ERROR(BaseSequenceScanner::Prepare(context));
if (scan_node_->avro_schema().schema == NULL) {
return Status("Missing Avro schema in scan node. This could be due to stale "
"metadata. Running 'invalidate metadata <tablename>' may resolve the problem.");
}
return Status::OK();
}

Function* HdfsAvroScanner::Codegen(HdfsScanNode* node,
const vector<ExprContext*>& conjunct_ctxs) {
if (!node->runtime_state()->codegen_enabled()) return NULL;
@@ -728,6 +737,10 @@ Status HdfsAvroScanner::CodegenReadRecord(
LlvmCodeGen* codegen, void* void_builder, Function* fn, BasicBlock* insert_before,
BasicBlock* bail_out, Value* this_val, Value* pool_val, Value* tuple_val,
Value* data_val) {
if (record.schema == NULL) {
return Status("Missing Avro schema in scan node. This could be due to stale "
"metadata. Running 'invalidate metadata <tablename>' may resolve the problem.");
}
DCHECK_EQ(record.schema->type, AVRO_RECORD);
LLVMContext& context = codegen->context();
LlvmCodeGen::LlvmBuilder* builder =
2 changes: 2 additions & 0 deletions be/src/exec/hdfs-avro-scanner.h
@@ -86,6 +86,8 @@ class HdfsAvroScanner : public BaseSequenceScanner {

HdfsAvroScanner(HdfsScanNode* scan_node, RuntimeState* state);

virtual Status Prepare(ScannerContext* context);

/// Codegen parsing records, writing tuples and evaluating predicates.
static llvm::Function* Codegen(HdfsScanNode*,
const std::vector<ExprContext*>& conjunct_ctxs);
@@ -400,6 +400,10 @@ public void setFileFormat(HdfsFileFormat fileFormat) {
fileFormatDescriptor_.getFileFormat().serializationLib());
}

public HdfsFileFormat getFileFormat() {
return fileFormatDescriptor_.getFileFormat();
}

public void setLocation(String place) {
location_ = table_.getPartitionLocationCompressor().new Location(place);
}
61 changes: 42 additions & 19 deletions fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java
@@ -134,6 +134,9 @@ public class HdfsTable extends Table {
// Avro schema of this table if this is an Avro table, otherwise null. Set in load().
private String avroSchema_ = null;

// Set to true if any of the partitions have Avro data.
private boolean hasAvroData_ = false;

// True if this table's metadata is marked as cached. Does not necessarily mean the
// data is cached or that all/any partitions are cached.
private boolean isMarkedCached_ = false;
@@ -1032,6 +1035,7 @@ public void load(boolean reuseMetadata, HiveMetaStoreClient client,
updatePartitionsFromHms(client, partitionsToUpdate, loadFileMetadata);
}
}
if (loadTableSchema) setAvroSchema(client, msTbl);
updateStatsFromHmsTable(msTbl);
} catch (TableLoadingException e) {
throw e;
@@ -1245,32 +1249,25 @@ public static int parseSkipHeaderLineCount(Map<String, String> tblProperties,
}

/**
* Loads table schema from Hive Metastore. It also loads column stats.
* Sets avroSchema_ if the table or any of the partitions in the table are stored
* as Avro. Additionally, this method also reconciles the schema if the column
* definitions from the metastore differ from the Avro schema.
*/
private void loadSchema(HiveMetaStoreClient client,
private void setAvroSchema(HiveMetaStoreClient client,
org.apache.hadoop.hive.metastore.api.Table msTbl) throws Exception {
nonPartFieldSchemas_.clear();
// set nullPartitionKeyValue from the hive conf.
nullPartitionKeyValue_ = client.getConfigValue(
"hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__");

// set NULL indicator string from table properties
nullColumnValue_ =
msTbl.getParameters().get(serdeConstants.SERIALIZATION_NULL_FORMAT);
if (nullColumnValue_ == null) nullColumnValue_ = DEFAULT_NULL_COLUMN_VALUE;

// Excludes partition columns.
List<FieldSchema> msColDefs = msTbl.getSd().getCols();
Preconditions.checkState(!nonPartFieldSchemas_.isEmpty());
String inputFormat = msTbl.getSd().getInputFormat();
if (HdfsFileFormat.fromJavaClassName(inputFormat) == HdfsFileFormat.AVRO) {
// Look for the schema in TBLPROPERTIES and in SERDEPROPERTIES, with the latter
if (HdfsFileFormat.fromJavaClassName(inputFormat) == HdfsFileFormat.AVRO
|| hasAvroData_) {
// Look for Avro schema in TBLPROPERTIES and in SERDEPROPERTIES, with the latter
// taking precedence.
List<Map<String, String>> schemaSearchLocations = Lists.newArrayList();
schemaSearchLocations.add(
getMetaStoreTable().getSd().getSerdeInfo().getParameters());
schemaSearchLocations.add(getMetaStoreTable().getParameters());

avroSchema_ = AvroSchemaUtils.getAvroSchema(schemaSearchLocations);

if (avroSchema_ == null) {
// No Avro schema was explicitly set in the table metadata, so infer the Avro
// schema from the column definitions.
@@ -1285,7 +1282,7 @@ private void loadSchema(HiveMetaStoreClient client,
// indicates there is an issue with the table metadata since Avro table need a
// non-native serde. Instead of failing to load the table, fall back to
// using the fields from the storage descriptor (same as Hive).
nonPartFieldSchemas_.addAll(msColDefs);
return;
} else {
// Generate new FieldSchemas from the Avro schema. This step reconciles
// differences in the column definitions and the Avro schema. For
@@ -1303,11 +1300,36 @@ private void loadSchema(HiveMetaStoreClient client,
getFullName(), warning.toString()));
}
AvroSchemaUtils.setFromSerdeComment(reconciledColDefs);
// Reset and update nonPartFieldSchemas_ to the reconciled colDefs.
nonPartFieldSchemas_.clear();
nonPartFieldSchemas_.addAll(ColumnDef.toFieldSchemas(reconciledColDefs));
// Update the columns as per the reconciled colDefs and re-load stats.
clearColumns();
addColumnsFromFieldSchemas(msTbl.getPartitionKeys());
addColumnsFromFieldSchemas(nonPartFieldSchemas_);
loadAllColumnStats(client);
}
} else {
nonPartFieldSchemas_.addAll(msColDefs);
}
}

/**
* Loads table schema and column stats from Hive Metastore.
*/
private void loadSchema(HiveMetaStoreClient client,
org.apache.hadoop.hive.metastore.api.Table msTbl) throws Exception {
nonPartFieldSchemas_.clear();
// set nullPartitionKeyValue from the hive conf.
nullPartitionKeyValue_ = client.getConfigValue(
"hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__");

// set NULL indicator string from table properties
nullColumnValue_ =
msTbl.getParameters().get(serdeConstants.SERIALIZATION_NULL_FORMAT);
if (nullColumnValue_ == null) nullColumnValue_ = DEFAULT_NULL_COLUMN_VALUE;

// Excludes partition columns.
nonPartFieldSchemas_.addAll(msTbl.getSd().getCols());

// The number of clustering columns is the number of partition keys.
numClusteringCols_ = msTbl.getPartitionKeys().size();
partitionLocationCompressor_.setClusteringColumns(numClusteringCols_);
@@ -1358,6 +1380,7 @@ private void loadPartitionsFromMetastore(Set<String> partitionNames,
// If the partition is null, its HDFS path does not exist, and it was not added to
// this table's partition list. Skip the partition.
if (partition == null) continue;
if (partition.getFileFormat() == HdfsFileFormat.AVRO) hasAvroData_ = true;
if (msPartition.getParameters() != null) {
partition.setNumRows(getRowCount(msPartition.getParameters()));
}
@@ -0,0 +1,36 @@
====
---- QUERY
CREATE EXTERNAL TABLE alltypesagg_staleschema (
id INT,
bool_col BOOLEAN,
tinyint_col INT,
smallint_col INT,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
date_string_col STRING,
string_col STRING,
timestamp_col STRING
)
LOCATION 'hdfs://localhost:20500//test-warehouse/alltypesaggmultifilesnopart_avro_snap'
TBLPROPERTIES ('avro.schema.url'= '/test-warehouse/avro_schemas/functional/alltypesaggmultifilesnopart.json')
====
---- QUERY
alter table alltypesagg_staleschema set fileformat avro
====
---- QUERY
select count(*) from alltypesagg_staleschema
---- CATCH
Missing Avro schema in scan node. This could be due to stale metadata.
====
---- QUERY
invalidate metadata alltypesagg_staleschema
====
---- QUERY
select count(*) from alltypesagg_staleschema
---- RESULTS
11000
---- TYPES
bigint
====
10 changes: 10 additions & 0 deletions tests/query_test/test_avro_schema_resolution.py
@@ -41,3 +41,13 @@ def test_avro_codegen_decoder(self, vector):
doesn't match file schema.
"""
self.run_test_case('QueryTest/avro-schema-resolution', vector)

def test_avro_stale_schema(self, vector, unique_database):
"""Test for IMPALA-3314 and IMPALA-3513. Impalad shouldn't crash with stale avro
metadata. Instead, should provide a meaningful error message.
"""
# Create a table with default fileformat and later change it to avro using
# alter sql. The query runs with stale metadata and a warning should be raised.
# Invalidating metadata should cause the Avro schema to be properly set upon the
# next metadata load.
self.run_test_case('QueryTest/avro-stale-schema', vector, unique_database)
