[SUPPORT] Implicit schema changes supported by Avro schema-resolution will not work properly if there are filegroups with old schema #7444

Closed
voonhous opened this issue Dec 13, 2022 · 10 comments · Fixed by #7480
Labels: priority:major, schema-and-data-types

@voonhous
Member

voonhous commented Dec 13, 2022

  • Have you gone through our FAQs? (Yes)

  • Join the mailing list to engage in conversations and get faster support at dev-subscribe@hudi.apache.org.

  • If you have triaged this as a bug, then file an issue directly.

Implicit schema changes that do not write to the .schema folder will cause read issues on Spark's end.

The current implementation of schema evolution is as follows:
If the schema change is supported by Avro's schema resolution, no ALTER TABLE DDL is required.

The column type changes supported by Avro's schema resolution are:

  • int → long, float, double
  • long → float, double
  • float → double
  • string → bytes
  • bytes → string

Caveat:
The current implementation is sufficient provided that ALL data is rewritten with the new schema. However, if certain filegroups/partitions are still in the old schema when they are read, errors will be thrown.

As such, the current support for implicit column changes is still a little buggy when it comes to column type changes.
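In essence, the failure can be condensed into the spark-shell style sketch below (the base path and trimmed-down option map are hypothetical; the full JUnit test further below exercises every supported promotion):

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode

// `spark` is an active SparkSession (e.g. in spark-shell)
import spark.implicits._

// Hypothetical, minimal option map mirroring the commonOpts of the test below
val opts = Map(
  HoodieWriteConfig.TBL_NAME.key -> "repro_tbl",
  DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
  DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "id",
  DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "name"
)
val basePath = "/tmp/repro_tbl" // hypothetical path

// 1. Partition "aaa" is written with userid as INT
Seq((1, 100, "aaa")).toDF("id", "userid", "name")
  .write.format("hudi").options(opts).mode(SaveMode.Overwrite).save(basePath)

// 2. Partition "bbb" is written with userid as LONG (an implicit change Avro resolution allows)
Seq((2, 200L, "bbb")).toDF("id", "userid", "name")
  .write.format("hudi").options(opts).mode(SaveMode.Append).save(basePath)

// 3. A full read fails: the filegroup in partition "aaa" still stores INT32,
//    while the latest table schema expects BIGINT
spark.read.format("hudi").load(basePath).show(false)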

To reproduce the issue, one can use the script below to test the schema evolution that is "allegedly" supported by Hudi's implicit schema change support.

The test writes a partition in the old schema, then inserts a row with a new schema into another partition.

Note: this mainly affects column type changes only.

Steps to reproduce the behavior:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package org.apache.hudi

import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}

class TestAvroSchemaResolutionSupportError extends HoodieClientTestBase {

  var spark: SparkSession = _
  val commonOpts: Map[String, String] = Map(
    HoodieWriteConfig.TBL_NAME.key -> "hoodie_avro_schema_resolution_support",
    "hoodie.insert.shuffle.parallelism" -> "1",
    "hoodie.upsert.shuffle.parallelism" -> "1",
    DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE",
    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "id",
    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "name",
    DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.SimpleKeyGenerator",
    HoodieMetadataConfig.ENABLE.key -> "false"
  )

  /**
   * Setup method running before each test.
   */
  @BeforeEach override def setUp(): Unit = {
    setTableName("hoodie_avro_schema_resolution_support")
    initPath()
    initSparkContexts()
    spark = sqlContext.sparkSession
  }

  @AfterEach override def tearDown(): Unit = {
    cleanupSparkContexts()
  }

  def castColToX(x: Int, colToCast: String, df: DataFrame): DataFrame = x match {
    case 0 => df.withColumn(colToCast, df.col(colToCast).cast("long"))
    case 1 => df.withColumn(colToCast, df.col(colToCast).cast("float"))
    case 2 => df.withColumn(colToCast, df.col(colToCast).cast("double"))
    case 3 => df.withColumn(colToCast, df.col(colToCast).cast("binary"))
    case 4 => df.withColumn(colToCast, df.col(colToCast).cast("string"))
  }

  def initialiseTable(df: DataFrame, saveDir: String): Unit = {
    df.write.format("hudi")
      .options(commonOpts)
      .mode("overwrite")
      .save(saveDir)
  }

  def upsertData(df: DataFrame, saveDir: String): Unit = {
    df.write.format("hudi")
      .options(commonOpts)
      .mode("append")
      .save(saveDir)
  }

  @Test def testDataTypePromotion(): Unit = {
    val _spark = spark
    import _spark.implicits._

    val colToCast = "userId"
    val df1 = Seq((1, 100, "aaa")).toDF("id", "userid", "name")
    val df2 = Seq((2, 200L, "bbb")).toDF("id", "userid", "name")
    val tempRecordPath = basePath + "/record_tbl/"

    def doTest(colInitType: String, start: Int, end: Int): Unit = {
      for (a <- Range(start, end)) {
        try {
          Console.println(s"Performing test: $a with $colInitType")

          // convert int to string first before conversion to binary
          val initDF = if (colInitType == "binary") {
            val castDf1 = df1.withColumn(colToCast, df1.col(colToCast).cast("string"))
            castDf1.withColumn(colToCast, castDf1.col(colToCast).cast(colInitType))
          } else {
            df1.withColumn(colToCast, df1.col(colToCast).cast(colInitType))
          }
          initDF.printSchema()
          initDF.show(false)

          // recreate table
          initialiseTable(initDF, tempRecordPath)

          // perform avro supported casting
          var upsertDf = df2
          upsertDf = castColToX(a, colToCast, upsertDf)
          upsertDf.printSchema()
          upsertDf.show(false)

          // upsert
          upsertData(upsertDf, tempRecordPath)

          // read out the table
          val readDf = spark.read.format("hudi").load(tempRecordPath)
          readDf.printSchema()
          readDf.show(false)
          readDf.foreach(_ => {})

          assert(true)
        } catch {
          case e: Exception => {
            // e.printStackTrace()
            // Console.println(s"Test $a failed with error: ${e.getMessage}")
            assert(false, e)
          }
        }
      }
    }

    // INT -> [Long, Float, Double]
    doTest("int", 0, 3)
    // Long -> [Float, Double]
    doTest("long", 1, 3)
    // Float -> [Double]
    doTest("float", 2, 3)
    // String -> [Bytes]
    doTest("string", 3, 4)
    // Bytes -> [String]
    doTest("binary", 4, 5)
  }
}
  1. Copy and paste the snippet into: ~/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupportError.scala
  2. Switch the build profile to Spark 3.
  3. Run testDataTypePromotion as a test case.

Expected behavior

A full table scan should succeed.

Environment Description

  • Hudi version : 0.10, 0.11, 0.12, 0.13

  • Spark version : 3.x

  • Hive version : NIL

  • Hadoop version : NIL

  • Storage (HDFS/S3/GCS..) : NIL

  • Running on Docker? (yes/no) : NO


Stacktrace

java.lang.AssertionError: assertion failed: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 46.0 failed 1 times, most recent failure: Lost task 0.0 in stage 46.0 (TID 53) (1.2.3.4 executor driver): org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file file:///var/folders/p_/09zfm5sx3v14w97hhk4vqrn8s817xt/T/junit5722563086978229716/dataset/record_tbl/aaa/bec9f5b7-09e6-40c3-9c53-de8bbaa2d656-0_0-14-19_20221213200558758.parquet. Column: [userId], Expected: bigint, Found: INT32
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:179)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:503)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.constructConvertNotSupportedException(VectorizedColumnReader.java:339)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readIntBatch(VectorizedColumnReader.java:571)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:294)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:283)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:181)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:37)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173)
	... 20 more
@voonhous changed the title to "[SUPPORT] Implicit schema changes supported by Avro schema-resolution will not work properly if there are filegroups with old schema" Dec 13, 2022
@codope added the schema-and-data-types and priority:major labels Dec 13, 2022
@codope
Member

codope commented Dec 13, 2022

@voonhous Thanks for sharing a test to reproduce the issue!
@xiarixiaoyao Can you please take a look?

@xiarixiaoyao
Contributor

@voonhous
Implicit schema changes that do not write to the .schema folder will cause read issues on Spark's end.
But we can enable implicit full schema evolution by enabling the following two parameters:

"hoodie.schema.on.read.enable"  ->  "true",
"hoodie.datasource.write.reconcile.schema" -> "true",

Tested on the master branch:

    // INT -> [Long, Float, Double]    pass
    doTest("int", 0, 3)
    // Long -> [Float, Double]         pass
    doTest("long", 1, 3)
    // Float -> [Double]               pass
    doTest("float", 2, 3)
    // String -> [Bytes]               failed: full schema evolution does not support string -> bytes
    // doTest("string", 3, 4)
    // Bytes -> [String]               failed: full schema evolution does not support bytes -> string
    // doTest("binary", 4, 5)

Full schema evolution currently only supports:

  • int => long / float / double / string
  • long => float / double / string
  • float => double / string
  • double => string / decimal
  • decimal => decimal / string
  • string => date / decimal
  • date => string

@xiarixiaoyao
Contributor

@codope @nsivabalan
At first, the schema evolution configuration was called hoodie.schema.evolution.enable; it was later renamed to hoodie.schema.on.read.enable.
However, hoodie.schema.on.read.enable actually validates data on write, not on read.
In other words, users must enable both hoodie.schema.on.read.enable and hoodie.datasource.write.reconcile.schema when writing data to Hudi.
If a user forgets to enable these two parameters and incompatible data types are written into the Hudi table, the table can no longer be read, which is not user-friendly.
I'll raise a PR to fix this so that the meaning of hoodie.schema.on.read.enable becomes correct.

@voonhous
Member Author

voonhous commented Dec 15, 2022

@xiarixiaoyao Thank you for the reply.

I tested your proposed fix and it works.

Changes that need to be made to the test I provided:

  val commonOpts: Map[String, String] = Map(
    HoodieWriteConfig.TBL_NAME.key -> "hoodie_avro_schema_resolution_support",
    "hoodie.insert.shuffle.parallelism" -> "1",
    "hoodie.upsert.shuffle.parallelism" -> "1",
    DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE",
    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "id",
    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "name",
    DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.SimpleKeyGenerator",
    HoodieMetadataConfig.ENABLE.key -> "false",
    "hoodie.schema.on.read.enable" -> "true",
    "hoodie.datasource.write.reconcile.schema" -> "true"
  )
val readDf = spark.read.format("hudi").options(commonOpts).load(tempRecordPath)

@codope
Member

codope commented Dec 15, 2022

@xiarixiaoyao Thanks for triaging and proposing the fix!
@voonhous Can this issue be closed now?

@voonhous
Member Author

voonhous commented Dec 15, 2022

@codope While this issue can be fixed with the 2 parameters provided above, implicit schema changes can still be made with the default parameter values (the 2 parameters set to false).

I believe this is not a "proper" fix for such cases. If such implicit schema changes have already been written to the table, users might have no recourse to "fix" the table.

I believe the proper way of fixing this issue is to:

  1. Enable these 2 parameters by default (requires [HUDI-4588][HUDI-4472] Addressing schema handling issues in the write path #6358 and its accompanying fixes)
  2. Should any implicit schema changes be detected, enable these 2 parameters (requires #6358 and its accompanying fixes)
  3. Prevent implicit changes if these 2 parameters are not enabled (requires #6358 and its accompanying fixes)
  4. Modify SparkXXParquetFileFormat.scala to handle these type changes when reading (does not need to rely on .schema/schema history)

I am currently using approach (4) and will raise a PR for review tomorrow; a rough sketch of the idea is included below.
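For illustration only, the rough idea behind approach (4) is to cast columns read from a filegroup with an older physical type up to the table's latest schema instead of failing. The sketch below expresses that intent at the DataFrame level with a hypothetical helper; the actual change in the PR is made inside the Spark Parquet file format/reader, not at this level:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.StructType

// Hypothetical helper: project a DataFrame read with an old file schema
// onto the table's latest schema by casting each column to its target type.
def alignToLatestSchema(df: DataFrame, latestSchema: StructType): DataFrame = {
  val projected = latestSchema.fields.map { field =>
    if (df.columns.contains(field.name)) {
      // e.g. an INT32 column read from an old filegroup is cast up to BIGINT
      col(field.name).cast(field.dataType).as(field.name)
    } else {
      // a column added later is filled with nulls of the target type
      lit(null).cast(field.dataType).as(field.name)
    }
  }
  df.select(projected: _*)
}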

@codope
Member

codope commented Dec 16, 2022

@voonhous #6358 is already in the master branch. Have you tested with the latest snapshot?

@voonhous
Member Author

voonhous commented Dec 16, 2022

@codope Yeap, the proposed fix works on the latest snapshot.

However, the fix will not work for filegroups that have already been written without the 2 parameters enabled (even on master/the latest snapshot).

i.e. If the table (path) does not have any .schema changes and there are multiple filegroups with differing schemas, IIUC, this error will still be thrown.

CMIIW, but given that users are allowed to perform implicit schema changes without enabling the 2 parameters, reading filegroups/base files with differing schemas should be supported without enabling the 2 parameters, no?

If it is not supported, the existing validation should be changed.

@voonhous
Member Author

@codope @xiarixiaoyao Raised a PR here: #7480

@yihua
Contributor

yihua commented Dec 22, 2022

Thanks @voonhous for putting up the fix.

Closing this ticket as the JIRA ticket HUDI-5400 has been created and the fix #7480 is close to landing.
