From 16a968e28cf681a41454716a1329c940e3c6ffca Mon Sep 17 00:00:00 2001
From: "shaoxiong.zhan" <31836510+microbearz@users.noreply.github.com>
Date: Wed, 24 Aug 2022 19:28:26 +0800
Subject: [PATCH] HUDI-4687 add show_invalid_parquet procedure (#6480)

Co-authored-by: zhanshaoxiong
---
 .../command/procedures/HoodieProcedures.scala |  1 +
 .../ShowInvalidParquetProcedure.scala         | 83 +++++++++++++++++++
 .../TestShowInvalidParquetProcedure.scala     | 71 ++++++++++++++++
 3 files changed, 155 insertions(+)
 create mode 100644 hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
 create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInvalidParquetProcedure.scala

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index b245b54f614bc..49c88e5cd6161 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -82,6 +82,7 @@ object HoodieProcedures {
     mapBuilder.put(RepairOverwriteHoodiePropsProcedure.NAME, RepairOverwriteHoodiePropsProcedure.builder)
     mapBuilder.put(RunCleanProcedure.NAME, RunCleanProcedure.builder)
     mapBuilder.put(ValidateHoodieSyncProcedure.NAME, ValidateHoodieSyncProcedure.builder)
+    mapBuilder.put(ShowInvalidParquetProcedure.NAME, ShowInvalidParquetProcedure.builder)
     mapBuilder.build
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
new file mode 100644
index 0000000000000..11d170bbed5ea
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.SerializableConfiguration
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.Supplier
+
+class ShowInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "path", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("path", DataTypes.StringType, nullable = true, Metadata.empty))
+  )
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val srcPath = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
+    val partitionPaths: java.util.List[String] = FSUtils.getAllPartitionPaths(new HoodieSparkEngineContext(jsc), srcPath, false, false)
+    val javaRdd: JavaRDD[String] = jsc.parallelize(partitionPaths, partitionPaths.size())
+    val serHadoopConf = new SerializableConfiguration(jsc.hadoopConfiguration())
+    javaRdd.rdd.map(part => {
+      val fs = FSUtils.getFs(new Path(srcPath), serHadoopConf.get())
+      FSUtils.getAllDataFilesInPartition(fs, FSUtils.getPartitionPath(srcPath, part))
+    }).flatMap(_.toList)
+      .filter(status => {
+        val filePath = status.getPath
+        var isInvalid = false
+        if (filePath.toString.endsWith(".parquet")) {
+          try ParquetFileReader.readFooter(serHadoopConf.get(), filePath, SKIP_ROW_GROUPS).getFileMetaData catch {
+            case e: Exception =>
+              isInvalid = e.getMessage.contains("is not a Parquet file")
+          }
+        }
+        isInvalid
+      })
+      .map(status => Row(status.getPath.toString))
+      .collect()
+  }
+
+  override def build = new ShowInvalidParquetProcedure()
+}
+
+object ShowInvalidParquetProcedure {
+  val NAME = "show_invalid_parquet"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get(): ProcedureBuilder = new ShowInvalidParquetProcedure()
+  }
+}
+
+
+
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInvalidParquetProcedure.scala
new file mode 100644
index 0000000000000..4d0c9c7b34614
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInvalidParquetProcedure.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.fs.FSUtils
+
+class TestShowInvalidParquetProcedure extends HoodieSparkProcedureTestBase {
+  test("Test Call show_invalid_parquet Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | partitioned by (ts)
+           | location '$basePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data to table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+      // Check required fields
+      checkExceptionContain(s"""call show_invalid_parquet(limit => 10)""")(
+        s"Argument: path is required")
+
+      val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
+      val invalidPath1 = new Path(basePath, "ts=1000/1.parquet")
+      val out1 = fs.create(invalidPath1)
+      out1.write(1)
+      out1.close()
+
+      val invalidPath2 = new Path(basePath, "ts=1500/2.parquet")
+      val out2 = fs.create(invalidPath2)
+      out2.write(1)
+      out2.close()
+
+      // collect result for table
+      val result = spark.sql(
+        s"""call show_invalid_parquet(path => '$basePath')""".stripMargin).collect()
+      assertResult(2) {
+        result.length
+      }
+    }
+  }
+}
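
With the builder registered in HoodieProcedures (the one-line change in the
first hunk), the procedure becomes callable from Spark SQL by name; the patch
touches no parser code because procedures are resolved through that map. A
minimal usage sketch, assuming a Spark session with the Hudi SQL extensions
enabled; the base path below is hypothetical:

    // Each returned row carries the path of one unreadable .parquet file.
    val invalid = spark.sql("call show_invalid_parquet(path => '/tmp/hudi/trips')")
    invalid.show(false)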
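The validity check itself is "can Parquet read the footer": readFooter with
SKIP_ROW_GROUPS fetches only file-level metadata, so each candidate costs a
footer read rather than a full scan. A standalone sketch of the same check;
the helper name is ours, not the patch's, and unlike the procedure above it
guards against a null exception message:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
    import org.apache.parquet.hadoop.ParquetFileReader

    // True when the file's footer cannot be parsed as Parquet.
    def isInvalidParquet(conf: Configuration, file: Path): Boolean =
      try {
        ParquetFileReader.readFooter(conf, file, SKIP_ROW_GROUPS)
        false
      } catch {
        case e: Exception =>
          Option(e.getMessage).exists(_.contains("is not a Parquet file"))
      }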
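Two scope caveats fall out of the filter: only files whose names end in
.parquet are examined, and a file is reported only when the read fails with a
message containing "is not a Parquet file"; a footer read that fails for any
other reason (permissions, missing blocks) leaves the file unreported.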