[SUPPORT] Support for Schema evolution. Facing an error #1845

Closed

sbernauer opened this issue Jul 17, 2020 · 23 comments

Comments

@sbernauer
Contributor

Hi Hudi team,

In https://cwiki.apache.org/confluence/display/HUDI/FAQ#FAQ-What'sHudi'sschemaevolutionstory you describe that "As long as the schema passed to Hudi [...] is backwards compatible [...] Hudi will seamlessly handle read/write of old and new data and also keep the Hive schema up-to date."
We need this mechanism and are trying to use it, but we are failing.
Steps to reproduce:

  1. We have some old schema and events.
  2. We update the schema with a new, optional union field and restart the DeltaStreamer
  3. We ingest new events
  4. We ingest old events again (there are some upserts). In production we would have both event versions until all producers have switched to the new version. At this step the DeltaStreamer crashes with:
210545 [pool-28-thread-1] ERROR org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  - Shutting down delta-sync due to exception
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 22.0 failed 4 times, most recent failure: Lost task 0.3 in stage 22.0 (TID 1162, 100.65.150.166, executor 1): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
        at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.handleUpsertPartition(BaseCommitActionExecutor.java:253)
        at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.lambda$execute$caffe4c4$1(BaseCommitActionExecutor.java:102)
        at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
        at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:889)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:889)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:362)
        at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1388)
        at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1298)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1362)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1186)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:360)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:311)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to close UpdateHandle
        at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:302)
        at org.apache.hudi.table.action.commit.CommitActionExecutor.handleUpdateInternal(CommitActionExecutor.java:101)
        at org.apache.hudi.table.action.commit.CommitActionExecutor.handleUpdate(CommitActionExecutor.java:74)
        at org.apache.hudi.table.action.deltacommit.DeltaCommitActionExecutor.handleUpdate(DeltaCommitActionExecutor.java:73)
        at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.handleUpsertPartition(BaseCommitActionExecutor.java:246)
        ... 28 more
Caused by: java.io.EOFException
        at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
        at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
        at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:423)
        at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
        at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
        at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
        at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:108)
        at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:80)
        at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:271)
        ... 32 more

I've created a test to reproduce this behaviour here: #1844. I hope the test is helpful.
The test causes the following exception, but I think the two are closely related.

Job aborted due to stage failure: Task 0 in stage 16.0 failed 1 times, most recent failure: Lost task 0.0 in stage 16.0 (TID 23, localhost, executor driver): java.lang.ArrayIndexOutOfBoundsException: 19
        at org.apache.avro.generic.GenericData$Record.get(GenericData.java:212)
        at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToRow$9(AvroConversionHelper.scala:170)
        at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:66)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator.isEmpty(Iterator.scala:385)
        at scala.collection.Iterator.isEmpty$(Iterator.scala:385)
        at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1429)
        at org.apache.hudi.AvroConversionUtils$.$anonfun$createRdd$2(AvroConversionUtils.scala:44)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:801)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:801)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
        at org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer.testSchemaEvolution(TestHoodieDeltaStreamer.java:491)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 19

I think this entry from the Avro mailing list could help a lot: http://apache-avro.679487.n3.nabble.com/AVRO-schema-evolution-adding-optional-column-with-default-fails-deserialization-td4043025.html
The corresponding place in the Hudi code is here:

GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
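
For illustration, here is a minimal sketch of that failing pattern outside of Hudi (the Event schema and field names below are hypothetical, not taken from our actual schemas): bytes serialized with the old writer schema are decoded by a GenericDatumReader that is given only the new schema, which is essentially what bytesToAvro does. Raw binary Avro carries no schema, so no resolution happens and the decoder misreads the stream, failing with an EOFException just like the one above.

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SingleSchemaDecodeRepro {

  // Hypothetical V1/V2 schemas: V2 appends one optional union field with a null default.
  static final Schema V1 = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
          + "{\"name\":\"rider\",\"type\":\"string\"}]}");
  static final Schema V2 = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
          + "{\"name\":\"rider\",\"type\":\"string\"},"
          + "{\"name\":\"agedOptionalField\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

  public static void main(String[] args) throws Exception {
    // Serialize a record with the old (V1) schema.
    GenericRecord record = new GenericData.Record(V1);
    record.put("rider", "myRider0");
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(V1).write(record, encoder);
    encoder.flush();
    byte[] bytes = out.toByteArray();

    // Decode with only the new (V2) schema. The decoder expects a union index for
    // agedOptionalField that was never written, reads past the end of the buffer and
    // throws java.io.EOFException from BinaryDecoder.ensureBounds.
    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(V2);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
    reader.read(null, decoder);
  }
}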

Thanks a lot in advance!

Cheers,
Sebastian

Expected behavior
As stated in the FAQ, schema evolution should not crash the DeltaStreamer.

Environment Description

  • Hudi version : current master branch

  • Spark version : 3.0.0

  • Hive version : 3.1.2

  • Hadoop version : 3.2.1

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : yes, running on Kubernetes

@bvaradar
Contributor

Thanks, Sebastian, for the detailed information. From looking at the code related to the exception you originally saw (not the test), it comes down to plain Avro schema compatibility rules. Would you mind providing both versions of the schema? In the test code in the PR there are 4 different fields (various combinations of nullable vs. default types); I would like to see which field specifically is contributing to the issue.

@sbernauer
Contributor Author

Thanks for your fast reply!

The PR adds a new test and improves 2 existing tests. The 4 new columns mentioned in TestHoodieAvroUtils increase the number of tested cases; those tests still pass.
The new test is in TestHoodieDeltaStreamer and starts the DeltaStreamer with different schemas and transformers to reproduce the actual problem.

Regarding the two versions of my schema, I can only provide a diff, but it should be sufficient. The new field has the same type (a union of null and string, with a default of null) as reproduced in the test here: https://github.com/apache/hudi/pull/1844/files#diff-07dd5ed6077721a382c35b2700da0883R130.

diff Event.json_schema Event-aged.json_schema

},
{
  "name": "agedOptionalField",
  "type": [
    "null",
    {
      "type": "string",
      "avro.java.string": "String"
    }
  ],
  "doc": "New aged optional field",
  "default": null

@bvaradar
Contributor

@sbernauer: Are you appending this field to the end of the schema? Otherwise it looks OK. Although, honestly, I have not seen "avro.java.string": "String" used before.

@prashantwason @n3nash : Any ideas what is wrong here ?

@sbernauer
Contributor Author

@bvaradar, yes, I am appending the field to the end of the schema (as reproduced in the test). The definition of the event is outside my scope; we just consume these events ;)

@n3nash
Contributor

n3nash commented Jul 22, 2020

@sbernauer I briefly looked at the test case you added and I see what you are trying to reproduce. The issue seems to be as follows:

  1. Generate data with schema A, pass schema A
  2. Generate data with schema B, pass schema B
  3. Generate data with schema A, pass schema B <fails in AvroConversionHelper>

Here schema A is the smaller schema and schema B is a larger schema with one added field.
@bvaradar Have we tested such a scenario before with the AvroConversionHelper class, where we convert the RDD to a DataFrame with schema A but pass schema B as the writer schema?

@bvaradar
Contributor

Thanks @n3nash
@sbernauer: I think the exception you are seeing in production could have different causes than the test failure. I would like to decouple them and focus on the exception you are seeing in your prod setup. To localize the problem (pure Avro issue vs. Hudi's usage of Avro), could you write an Avro-only test (without any Hudi/Spark) that writes a record with your old schema and reads it with the new schema, and check whether it works fine?

Also, I see that you are using Spark and Hadoop 3.x. Please make sure you are using the Avro version that we support at runtime - 1.8.2. That could also be an issue.
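
For illustration, a rough sketch of such an Avro-only check (the Event schema, field names, and file path are made up for the example, not taken from the real setup): write one record with the old schema into an Avro data file and read it back with the new schema as the expected reader schema. Because the data file stores the writer schema in its header, Avro can resolve the two schemas and fill the added field with its default.

import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AvroOnlyEvolutionCheck {

  public static void main(String[] args) throws Exception {
    // Hypothetical old/new schemas; the new one appends an optional field with a null default.
    Schema oldSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
            + "{\"name\":\"rider\",\"type\":\"string\"}]}");
    Schema newSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
            + "{\"name\":\"rider\",\"type\":\"string\"},"
            + "{\"name\":\"agedOptionalField\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

    // Write one record with the old schema; DataFileWriter embeds that schema in the file.
    File file = new File("/tmp/event-old.avro");
    GenericRecord oldRecord = new GenericData.Record(oldSchema);
    oldRecord.put("rider", "myRider0");
    try (DataFileWriter<GenericRecord> writer =
        new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(oldSchema))) {
      writer.create(oldSchema, file);
      writer.append(oldRecord);
    }

    // Read it back with the new schema as the expected (reader) schema. DataFileReader
    // picks up the writer schema from the file header and resolves it against newSchema.
    try (DataFileReader<GenericRecord> reader =
        new DataFileReader<>(file, new GenericDatumReader<GenericRecord>(newSchema))) {
      GenericRecord evolved = reader.next();
      System.out.println(evolved); // agedOptionalField should come back as null (the default)
    }
  }
}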

@sbernauer
Contributor Author

sbernauer commented Jul 23, 2020

Hi @bvaradar,

I've created a test using the pure Avro DataFileWriter to reproduce my DeltaStreamer test, and it kind of works.
You can find it here: https://github.com/sbernauer/avro-schema-evolution-test/blob/master/src/test/java/main/AvroSchemaEvolutionTest.java
There is no exception, but the evolved optional field is missing. We can also check the generated Avro file:

 java -jar avro-tools-1.8.2.jar tojson output.avro
{"timestamp":0.0,"rider":"myRider0","driver":"myDriver0"}
{"timestamp":1.0,"rider":"myRider1","driver":"myDriver1"}
{"timestamp":2.0,"rider":"myRider2","driver":"myDriver2"}

The second test does the following:

1.) Write a record of B using schema B
2.) Write a record of A using schema B
It fails with an ArrayIndexOutOfBoundsException.

I found another post, https://stackoverflow.com/questions/34733604/avro-schema-doesnt-honor-backward-compatibilty, which provides a test case for schema evolution using a BinaryDecoder and reproduces the EOFException.
The mailing list entry http://apache-avro.679487.n3.nabble.com/AVRO-schema-evolution-adding-optional-column-with-default-fails-deserialization-td4043025.html describes the same problem but results in the ArrayIndexOutOfBoundsException.

My setup is using Avro 1.8.2.

@prashantwason
Member

The javadoc for DataFileWriter says:

"Stores in a file a sequence of data conforming to a schema. The schema is stored in the file with the data. Each datum in a file is of the same schema."

Your test case has records with different schemas being written to the same file.

Please try using the GenericDatumReader constructor which specifies both the reader and writer schema:
GenericDatumReader(Schema writer, Schema reader)
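
As a rough sketch (reusing the hypothetical oldSchema/newSchema and bytes from the example further up, so these names are assumptions): with the two-schema constructor, the ResolvingDecoder maps data written with the old schema onto the new schema and fills the added field with its declared default.

// 'bytes' were produced by a GenericDatumWriter using oldSchema (see the sketch above).
GenericDatumReader<GenericRecord> resolvingReader =
    new GenericDatumReader<>(oldSchema /* writer */, newSchema /* reader */);
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
GenericRecord resolved = resolvingReader.read(null, decoder);
// resolved.get("agedOptionalField") is null, i.e. the declared default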

@sbernauer
Contributor Author

sbernauer commented Jul 23, 2020

Yes, to be correct you have to specify both the reader and writer schema to GenericDatumReader. Hudi currently only passes one schema here

GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
and here
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);

I think that's a problem.

I would have used GenericDatumReader with both a reader and writer schema, but first I wanted to append new evolved events to an existing file. Given "Each datum in a file is of the same schema", it seems this is impossible by design.

"Each datum in a file is of the same schema" - what is Hudi's strategy when there are upserts for an evolved event? The new events can't simply be written to an existing file.
Does Hudi, every time a file must be updated, read it correctly with the reader and writer schema and then create a new file with the new schema?

@prashantwason
Member

I would have used GenericDatumReader with both a reader and writer schema, but first I wanted to append new evolved events to an existing file

In Hudi log files, records are added to an AvroDataBlock, which has a header containing the writer schema. Different AvroDataBlocks can have different schemas, but all records within the same block should have the same schema.

Log blocks are read using both the reader and writer schema:

https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java#L143

@prashantwason
Member

The issue is probably related to using the wrong schema for the upsert. Let me expand your original steps to reproduce, and you can correct me:

Steps to reproduce:

  1. We have some old schema (SCHEMA_V1) and events.
  2. We update the schema with a new (SCHEMA_V2), optional union field and restart the DeltaStreamer
  3. We ingest new events (Events with SCHEMA_V2)
  4. We ingest old events again (there are some upserts). ?????? What schema is being used here?

If in step 4 you are using SCHEMA_V1, then it won't be able to read the records ingested at step 3, as those were written with a newer schema.

@sbernauer
Contributor Author

  4. We ingest old events again (there are some upserts). ?????? What schema is being used here?

At this step I used SCHEMA_V2.
We use the DeltaStreamer in continuous mode and only restart it in step 2, where we provide the new SCHEMA_V2 to the DeltaStreamer.
I tried to reproduce everything as closely as possible in my DeltaStreamer test.

@sbernauer
Contributor Author

Is there anything further we can do to resolve this issue?

@bvaradar
Contributor

@sbernauer: Do you mean using SCHEMA_V1 at step 4? Otherwise, the reader and writer schema are the same, right?

There are a couple of issues here:

  1. Regarding your production setup: from the looks of it, you are not passing any transformers. In this case the source Avro record is converted to a HoodieRecord, and in that process we serialize the record to bytes using the schema referenced by the generic record. We do this to reduce the memory footprint while shuffling records, as Spark's native serialization of Avro records also serializes the schema for each record. I am a little surprised this issue was not seen earlier in the community. It made me wonder whether the schema was indeed backwards compatible, but from your snippet it does not look to be the case. Let me open a JIRA to track this issue; we are in any case going to tackle schema evolution in general in the next release. For your problem, though, you can implement a new Payload class which does not serialize to bytes upon construction (example: https://github.com/apache/hudi/blob/hoodie-0.4.0/hoodie-spark/src/main/java/com/uber/hoodie/OverwriteWithLatestAvroPayload.java). Note that once you do this, you will be able to write the records to Parquet files successfully, but we would need to test reading from them. Hudi uses the parquet-avro library and passes the latest schema to it. If parquet-avro is unable to read, then you can set ("hoodie.avro.schema.externalTransformation=true") after [HUDI-242] Support for RFC-12/Bootstrapping of external datasets #1876 gets landed (ETA: sometime this week).

  2. Regarding your unit-test failure: this happens in AvroConversionUtils, which is used when a transformer is configured. The code itself is based on spark-avro. I will open a JIRA for it. Would you be interested in looking at the code in AvroConversionUtils to see how this can be fixed, since you already wrote the tests for it?

@bvaradar
Contributor

@sbernauer
Contributor Author

Thanks @bvaradar for opening the tickets!
I will continue commenting in those tickets.

@sbernauer
Contributor Author

Do you mean using SCHEMA_V1 at step 4? Otherwise, the reader and writer schema are the same, right?

I could have been more precise. At step 4 we consume events without the new optional field (we could call them events produced with SCHEMA_V1). The DeltaStreamer is passed SCHEMA_V2.

@vinothchandar
Member

cc @nsivabalan as well. This is related to the schema evolution PR.

@vinothchandar
Member

@n3nash The concrete thing to do here is to reproduce this scenario with the master branch and see if it's still an issue.

@sbernauer
Contributor Author

sbernauer commented Jun 5, 2021

Hi @vinothchandar, @nsivabalan and I are working together on Slack, trying to get our schema evolution use case running. Currently it covers only inserts, no upserts, for CoW, and more complicated Avro schema evolution than adding a field at the end of the schema. I'm trying out his patch in #2927.
It seems to work fine, but only if I put in a dummy transformation and specify both the source and the target schema. With the target schema set to null, the schema evolution part fails; setting no transformer also causes problems, if I recall correctly.

@n3nash n3nash moved this from Blocked On User to Repro Needed in GI Tracker Board Jun 9, 2021
@n3nash
Contributor

n3nash commented Jun 16, 2021

@nsivabalan Can you please reply above?

@nsivabalan
Contributor

Yes, I am in sync w/ @sbernauer via Slack. He confirmed that the PR we have put up works for him (older records can be upserted to Hudi after the schema has evolved with the Hudi table). He is doing more testing for now.

@nsivabalan
Contributor

@sbernauer: closing this issue out as the fixes have already landed. Feel free to open a new issue if you have more requirements. Thanks!

GI Tracker Board automation moved this from Repro Needed to Done Sep 20, 2021