diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc index e7559ac3b79..5f469b750ab 100644 --- a/be/src/exec/hdfs-avro-scanner.cc +++ b/be/src/exec/hdfs-avro-scanner.cc @@ -57,6 +57,15 @@ HdfsAvroScanner::HdfsAvroScanner(HdfsScanNode* scan_node, RuntimeState* state) codegend_decode_avro_data_(NULL) { } +Status HdfsAvroScanner::Prepare(ScannerContext* context) { + RETURN_IF_ERROR(BaseSequenceScanner::Prepare(context)); + if (scan_node_->avro_schema().schema == NULL) { + return Status("Missing Avro schema in scan node. This could be due to stale " + "metadata. Running 'invalidate metadata ' may resolve the problem."); + } + return Status::OK(); +} + Function* HdfsAvroScanner::Codegen(HdfsScanNode* node, const vector& conjunct_ctxs) { if (!node->runtime_state()->codegen_enabled()) return NULL; @@ -728,6 +737,10 @@ Status HdfsAvroScanner::CodegenReadRecord( LlvmCodeGen* codegen, void* void_builder, Function* fn, BasicBlock* insert_before, BasicBlock* bail_out, Value* this_val, Value* pool_val, Value* tuple_val, Value* data_val) { + if (record.schema == NULL) { + return Status("Missing Avro schema in scan node. This could be due to stale " + "metadata. Running 'invalidate metadata ' may resolve the problem."); + } DCHECK_EQ(record.schema->type, AVRO_RECORD); LLVMContext& context = codegen->context(); LlvmCodeGen::LlvmBuilder* builder = diff --git a/be/src/exec/hdfs-avro-scanner.h b/be/src/exec/hdfs-avro-scanner.h index 682ba04b5c4..316f697042e 100644 --- a/be/src/exec/hdfs-avro-scanner.h +++ b/be/src/exec/hdfs-avro-scanner.h @@ -86,6 +86,8 @@ class HdfsAvroScanner : public BaseSequenceScanner { HdfsAvroScanner(HdfsScanNode* scan_node, RuntimeState* state); + virtual Status Prepare(ScannerContext* context); + /// Codegen parsing records, writing tuples and evaluating predicates. static llvm::Function* Codegen(HdfsScanNode*, const std::vector& conjunct_ctxs); diff --git a/fe/src/main/java/com/cloudera/impala/catalog/HdfsPartition.java b/fe/src/main/java/com/cloudera/impala/catalog/HdfsPartition.java index bd52682638e..78ac7802eba 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/HdfsPartition.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/HdfsPartition.java @@ -400,6 +400,10 @@ public void setFileFormat(HdfsFileFormat fileFormat) { fileFormatDescriptor_.getFileFormat().serializationLib()); } + public HdfsFileFormat getFileFormat() { + return fileFormatDescriptor_.getFileFormat(); + } + public void setLocation(String place) { location_ = table_.getPartitionLocationCompressor().new Location(place); } diff --git a/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java b/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java index 2c0bae75470..6b70a40a55b 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java @@ -134,6 +134,9 @@ public class HdfsTable extends Table { // Avro schema of this table if this is an Avro table, otherwise null. Set in load(). private String avroSchema_ = null; + // Set to true if any of the partitions have Avro data. + private boolean hasAvroData_ = false; + // True if this table's metadata is marked as cached. Does not necessarily mean the // data is cached or that all/any partitions are cached. private boolean isMarkedCached_ = false; @@ -1032,6 +1035,7 @@ public void load(boolean reuseMetadata, HiveMetaStoreClient client, updatePartitionsFromHms(client, partitionsToUpdate, loadFileMetadata); } } + if (loadTableSchema) setAvroSchema(client, msTbl); updateStatsFromHmsTable(msTbl); } catch (TableLoadingException e) { throw e; @@ -1245,25 +1249,17 @@ public static int parseSkipHeaderLineCount(Map tblProperties, } /** - * Loads table schema from Hive Metastore. It also loads column stats. + * Sets avroSchema_ if the table or any of the partitions in the table are stored + * as Avro. Additionally, this method also reconciles the schema if the column + * definitions from the metastore differ from the Avro schema. */ - private void loadSchema(HiveMetaStoreClient client, + private void setAvroSchema(HiveMetaStoreClient client, org.apache.hadoop.hive.metastore.api.Table msTbl) throws Exception { - nonPartFieldSchemas_.clear(); - // set nullPartitionKeyValue from the hive conf. - nullPartitionKeyValue_ = client.getConfigValue( - "hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__"); - - // set NULL indicator string from table properties - nullColumnValue_ = - msTbl.getParameters().get(serdeConstants.SERIALIZATION_NULL_FORMAT); - if (nullColumnValue_ == null) nullColumnValue_ = DEFAULT_NULL_COLUMN_VALUE; - - // Excludes partition columns. - List msColDefs = msTbl.getSd().getCols(); + Preconditions.checkState(!nonPartFieldSchemas_.isEmpty()); String inputFormat = msTbl.getSd().getInputFormat(); - if (HdfsFileFormat.fromJavaClassName(inputFormat) == HdfsFileFormat.AVRO) { - // Look for the schema in TBLPROPERTIES and in SERDEPROPERTIES, with the latter + if (HdfsFileFormat.fromJavaClassName(inputFormat) == HdfsFileFormat.AVRO + || hasAvroData_) { + // Look for Avro schema in TBLPROPERTIES and in SERDEPROPERTIES, with the latter // taking precedence. List> schemaSearchLocations = Lists.newArrayList(); schemaSearchLocations.add( @@ -1271,6 +1267,7 @@ private void loadSchema(HiveMetaStoreClient client, schemaSearchLocations.add(getMetaStoreTable().getParameters()); avroSchema_ = AvroSchemaUtils.getAvroSchema(schemaSearchLocations); + if (avroSchema_ == null) { // No Avro schema was explicitly set in the table metadata, so infer the Avro // schema from the column definitions. @@ -1285,7 +1282,7 @@ private void loadSchema(HiveMetaStoreClient client, // indicates there is an issue with the table metadata since Avro table need a // non-native serde. Instead of failing to load the table, fall back to // using the fields from the storage descriptor (same as Hive). - nonPartFieldSchemas_.addAll(msColDefs); + return; } else { // Generate new FieldSchemas from the Avro schema. This step reconciles // differences in the column definitions and the Avro schema. For @@ -1303,11 +1300,36 @@ private void loadSchema(HiveMetaStoreClient client, getFullName(), warning.toString())); } AvroSchemaUtils.setFromSerdeComment(reconciledColDefs); + // Reset and update nonPartFieldSchemas_ to the reconcicled colDefs. + nonPartFieldSchemas_.clear(); nonPartFieldSchemas_.addAll(ColumnDef.toFieldSchemas(reconciledColDefs)); + // Update the columns as per the reconciled colDefs and re-load stats. + clearColumns(); + addColumnsFromFieldSchemas(msTbl.getPartitionKeys()); + addColumnsFromFieldSchemas(nonPartFieldSchemas_); + loadAllColumnStats(client); } - } else { - nonPartFieldSchemas_.addAll(msColDefs); } + } + + /** + * Loads table schema and column stats from Hive Metastore. + */ + private void loadSchema(HiveMetaStoreClient client, + org.apache.hadoop.hive.metastore.api.Table msTbl) throws Exception { + nonPartFieldSchemas_.clear(); + // set nullPartitionKeyValue from the hive conf. + nullPartitionKeyValue_ = client.getConfigValue( + "hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__"); + + // set NULL indicator string from table properties + nullColumnValue_ = + msTbl.getParameters().get(serdeConstants.SERIALIZATION_NULL_FORMAT); + if (nullColumnValue_ == null) nullColumnValue_ = DEFAULT_NULL_COLUMN_VALUE; + + // Excludes partition columns. + nonPartFieldSchemas_.addAll(msTbl.getSd().getCols()); + // The number of clustering columns is the number of partition keys. numClusteringCols_ = msTbl.getPartitionKeys().size(); partitionLocationCompressor_.setClusteringColumns(numClusteringCols_); @@ -1358,6 +1380,7 @@ private void loadPartitionsFromMetastore(Set partitionNames, // If the partition is null, its HDFS path does not exist, and it was not added to // this table's partition list. Skip the partition. if (partition == null) continue; + if (partition.getFileFormat() == HdfsFileFormat.AVRO) hasAvroData_ = true; if (msPartition.getParameters() != null) { partition.setNumRows(getRowCount(msPartition.getParameters())); } diff --git a/testdata/workloads/functional-query/queries/QueryTest/avro-stale-schema.test b/testdata/workloads/functional-query/queries/QueryTest/avro-stale-schema.test new file mode 100644 index 00000000000..e21a98093d2 --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/avro-stale-schema.test @@ -0,0 +1,36 @@ +==== +---- QUERY +CREATE EXTERNAL TABLE alltypesagg_staleschema ( + id INT, + bool_col BOOLEAN, + tinyint_col INT, + smallint_col INT, + int_col INT, + bigint_col BIGINT, + float_col FLOAT, + double_col DOUBLE, + date_string_col STRING, + string_col STRING, + timestamp_col STRING +) +LOCATION 'hdfs://localhost:20500//test-warehouse/alltypesaggmultifilesnopart_avro_snap' +TBLPROPERTIES ('avro.schema.url'= '/test-warehouse/avro_schemas/functional/alltypesaggmultifilesnopart.json') +==== +---- QUERY +alter table alltypesagg_staleschema set fileformat avro +==== +---- QUERY +select count(*) from alltypesagg_staleschema +---- CATCH +Missing Avro schema in scan node. This could be due to stale metadata. +==== +---- QUERY +invalidate metadata alltypesagg_staleschema +==== +---- QUERY +select count(*) from alltypesagg_staleschema +---- RESULTS +11000 +---- TYPES +bigint +==== diff --git a/tests/query_test/test_avro_schema_resolution.py b/tests/query_test/test_avro_schema_resolution.py index 68e507221ef..21d65bdcad5 100644 --- a/tests/query_test/test_avro_schema_resolution.py +++ b/tests/query_test/test_avro_schema_resolution.py @@ -41,3 +41,13 @@ def test_avro_codegen_decoder(self, vector): doesn't match file schema. """ self.run_test_case('QueryTest/avro-schema-resolution', vector) + + def test_avro_stale_schema(self, vector, unique_database): + """Test for IMPALA-3314 and IMPALA-3513. Impalad shouldn't crash with stale avro + metadata. Instead, should provide a meaningful error message. + """ + # Create a table with default fileformat and later change it to avro using + # alter sql. The query runs with stale metadata and a warning should be raised. + # Invalidating metadata should cause the Avro schema to be properly set upon the + # next metadata load. + self.run_test_case('QueryTest/avro-stale-schema', vector, unique_database)