
[HUDI-1129] Deltastreamer Add support for schema evolution #2012

Closed · wants to merge 1 commit

Conversation

@sathyaprakashg (Contributor) commented Aug 23, 2020:

What is the purpose of the pull request

When the schema has evolved but the producer is still publishing events with an older version of the schema, the Hudi delta streamer fails. This fix ensures the delta streamer works correctly with schema evolution.

Related issues #1845 #1971 #1972

Brief change log

  • Update the Avro-to-Spark conversion method AvroConversionHelper.createConverterToRow to handle the scenario where the provided schema has more fields than the data (i.e., the producer is still sending events with the old schema).
  • Introduce a new payload class called BaseAvroPayloadWithSchema, used to store the writer schema as part of the payload. Currently, HoodieAvroUtils.avroToBytes uses the schema of the data to convert to bytes, while HoodieAvroUtils.bytesToAvro uses the provided schema. Since the two may not always match, this results in an error. By carrying the data's schema in the payload, we can ensure the same schema is used for converting Avro to bytes and bytes back to Avro (see the sketch below).
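
For illustration, a minimal sketch (not the exact code in this PR) of how Avro can decode bytes written with the producer's older (writer) schema against the evolved (reader) schema; the two schema arguments are assumed to come from the payload and the schema registry respectively:

```java
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class SchemaEvolutionSketch {

  /**
   * Decode Avro-encoded bytes that were written with writerSchema (the producer's older schema)
   * into a record shaped by readerSchema (the evolved schema). Avro's resolving decoder fills in
   * defaults for fields that are missing from the writer schema.
   */
  public static GenericRecord bytesToAvro(byte[] bytes, Schema writerSchema, Schema readerSchema)
      throws IOException {
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
    return reader.read(null, decoder);
  }
}
```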

Verify this pull request

This change added tests and can be verified as follows:

  • Added a unit test to verify schema evolution. Thanks @sbernauer for the unit test.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@sathyaprakashg (Contributor, Author):

@bvaradar @sbernauer

@sbernauer (Contributor):

Hi @sathyaprakashg, thanks for your work!

When I move the new field evoluted_optional_union_field to a place that is not at the end of the schema (somewhere in the middle), I get the following exception:
java.lang.ClassCastException: java.lang.Boolean cannot be cast to java.lang.Iterable.
This makes sense, since the ids of the fields don't line up anymore. So reducing the length to the minimum is not sufficient here.

I suggest using the field names instead of the ids (which don't match up anymore after a new field is added in the middle). See sbernauer@1adbc7b#diff-3c046573a91f36ba0f12dad0e3395dc9R139

Should I open a PR for this change, or do you want to modify yours?

Cheers,
Sebastian
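
For context, a rough illustration of the name-based matching idea suggested above (hypothetical Java, not the actual Scala converter change): fields are looked up by name on the old record, so a new field inserted in the middle of the schema no longer shifts positions.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class NameBasedProjection {

  /**
   * Copy values from a record written with an older schema into a record of the evolved schema,
   * matching fields by name. Field positions are not reliable once a new field is inserted in the
   * middle of the schema.
   */
  public static GenericRecord projectByName(GenericRecord oldRecord, Schema evolvedSchema) {
    GenericRecord projected = new GenericData.Record(evolvedSchema);
    for (Schema.Field field : evolvedSchema.getFields()) {
      if (oldRecord.getSchema().getField(field.name()) != null) {
        // Field exists in the old record: copy its value by name.
        projected.put(field.name(), oldRecord.get(field.name()));
      }
      // Otherwise leave the field null; an optional union field can stay null, while a
      // required new field would need its default supplied by the caller.
    }
    return projected;
  }
}
```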

@sathyaprakashg (Contributor, Author):

Thanks @sbernauer for the code example. I have fixed it now.

@vinothchandar changed the title from "HUDI-1129 Deltastreamer Add support for schema evaluation" to "HUDI-1129 Deltastreamer Add support for schema evolution" on Aug 31, 2020
@vinothchandar (Member) left a comment:

public static GenericRecord bytesToAvro(byte[] bytes, Schema writerSchema, Schema readerSchema) throws IOException {

@sathyaprakashg does this method help? I don't think we need to introduce any new changes for this. Using the reader and writer schema, as you mention, is the way to go: writerSchema = the data's schema and readerSchema = the one we have from the registry etc. Can we rework based on this?

If you have specific questions, please let me know.

@n3nash @pratyakshsharma this is an important issue to fix. Please chime in and let's get this landed soon.

@sathyaprakashg (Contributor, Author):

@vinothchandar Sure, I will start looking into using the writer and reader schemas. Once that looks good, I will remove the new schema provider class I added.

@sathyaprakashg (Contributor, Author):

@vinothchandar I updated the code to use reader and writer schemas for the Avro payload. Please review it.

@n3nash (Contributor) commented Sep 2, 2020:

@sathyaprakashg Looks good to me. Can you please see why the build is failing?

@n3nash self-requested a review September 2, 2020 03:33
@n3nash (Contributor) left a comment:

Will land after build succeeds.

@sathyaprakashg (Contributor, Author):

@n3nash I am working on fixing the build issue and will have that fix pushed soon. I would like to point out that with this new approach, we are storing the writer schema as part of the payload, which means the size of the dataframe will increase because the same schema information is stored with each record. Any suggestions on optimizing this?

@sathyaprakashg (Contributor, Author):

The issue below was causing the build failure, and I have fixed it by implementing a method, HoodieAvroUtils.addNamespaceToFixedField. I am not sure whether this is the best way to fix it, so let me know if you have any suggestions.

Below is the height field definition in the triprec schema:

  {
      "name": "height",
      "type": {
        "type": "fixed",
        "name": "abc",
        "size": 5,
        "logicalType": "decimal",
        "precision": 10,
        "scale": 6
      }
    }

When we use the org.apache.spark.sql.avro.SchemaConverters.toAvroType method in AvroConversionHelper to convert a Spark data type to an Avro type, spark-avro sets the name to fixed and also adds a namespace to it, as per the code here:

  {
      "name": "height",
      "type": {
        "type": "fixed",
        "name": "fixed",
        "namespace": "hoodie.source.hoodie_source.height",
        "size": 5,
        "logicalType": "decimal",
        "precision": 10,
        "scale": 6
      }
    }

The first schema is used as the reader schema and the second one as the writer schema in HoodieAvroUtils.bytesToAvro. This results in the error below.

ERROR org.apache.hudi.io.HoodieWriteHandle  - Error writing record HoodieRecord{key=HoodieKey { recordKey=c597b0c2-6f07-4c8e-b239-2752b60e0449 partitionPath=default}, currentLocation='null', newLocation='null'}
org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting abc
  at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
  at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
  at org.apache.avro.io.ValidatingDecoder.checkFixed(ValidatingDecoder.java:135)
  at org.apache.avro.io.ValidatingDecoder.readFixed(ValidatingDecoder.java:146)
  at org.apache.avro.generic.GenericDatumReader.readFixed(GenericDatumReader.java:342)
  at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
  at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
  at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
  at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
  at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
  at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
  at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
  at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:142)
  at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:70)
  at org.apache.hudi.execution.LazyInsertIterable$HoodieInsertValueGenResult.<init>(LazyInsertIterable.java:92)
  at org.apache.hudi.execution.LazyInsertIterable.lambda$getTransformFunction$0(LazyInsertIterable.java:105)
  at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:170)
  at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
  at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

To fix this issue, the approach I took is to change the field name from abc to fixed in all the schemas used in the test, and also to add the namespace to the reader schema if it exists in the writer schema for a fixed field.

If we just rename the field from abc to fixed but don't add the namespace, then we get the error below:
org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed

@vinothchandar changed the title from "HUDI-1129 Deltastreamer Add support for schema evolution" to "[HUDI-1129] Deltastreamer Add support for schema evolution" on Sep 8, 2020
@n3nash (Contributor) commented Sep 8, 2020:

@sathyaprakashg Thanks for looking into this. I see that org.apache.spark.sql.avro.SchemaConverters uses the fixed name, so it's difficult to work around it. Your approach sounds fine to me as long as it does not break any existing tests. Left a couple of comments for changes.

@@ -127,12 +128,59 @@ public static GenericRecord bytesToAvro(byte[] bytes, Schema schema) throws IOEx
* Convert serialized bytes back into avro record.
*/
public static GenericRecord bytesToAvro(byte[] bytes, Schema writerSchema, Schema readerSchema) throws IOException {
@n3nash (Contributor) Sep 8, 2020:

How many places is this method being used? Is adding a namespace to the reader schema backwards compatible for older data/schemas written without the namespace?

@sathyaprakashg (Contributor, Author):

@n3nash Yes, that is a very valid point. Adding a namespace to the reader schema might make it incompatible with data already written without the namespace.

I am thinking we should probably remove the namespace from the writer schema fixed fields instead of adding a namespace to the reader schema fixed fields. Since we are not altering the reader schema in this approach, there shouldn't be an incompatibility issue with data already written.

If you also agree with this approach, I will update the AvroConversionHelper.createConverterToAvro method to remove the namespace from writer schema fixed fields and revert the changes made in HoodieAvroUtils.
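
For illustration only, a rough sketch of what such a namespace-stripping helper could look like (hypothetical code, not the PR's actual change; it assumes an Avro version that provides the Object-based Schema.Field constructor, and it only handles top-level fixed fields):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;

public class FixedFieldNamespaceSketch {

  /** Rebuild a record schema so that any top-level FIXED field loses its namespace. */
  public static Schema removeNamespaceFromFixedFields(Schema recordSchema) {
    List<Schema.Field> newFields = new ArrayList<>();
    for (Schema.Field f : recordSchema.getFields()) {
      Schema fieldSchema = f.schema();
      if (fieldSchema.getType() == Schema.Type.FIXED && fieldSchema.getNamespace() != null) {
        // Recreate the fixed type with the same name and size but without a namespace.
        Schema stripped = Schema.createFixed(fieldSchema.getName(), fieldSchema.getDoc(), null,
            fieldSchema.getFixedSize());
        if (fieldSchema.getLogicalType() != null) {
          // Keep logical types such as decimal(precision, scale).
          fieldSchema.getLogicalType().addToSchema(stripped);
        }
        fieldSchema = stripped;
      }
      newFields.add(new Schema.Field(f.name(), fieldSchema, f.doc(), f.defaultVal()));
    }
    Schema rebuilt = Schema.createRecord(recordSchema.getName(), recordSchema.getDoc(),
        recordSchema.getNamespace(), recordSchema.isError());
    rebuilt.setFields(newFields);
    return rebuilt;
  }
}
```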

@n3nash (Contributor) Sep 11, 2020:

@sathyaprakashg But like you mentioned, if you don't add the namespace, you see the type mismatch exception, right? How are you planning to avoid adding the namespace but still not run into the type exception?

@sathyaprakashg (Contributor, Author):

@n3nash This is the exception we get when we don't add the namespace to the reader schema: org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed.

This exception is raised because the namespace is present in the writer schema but not in the reader schema. With my proposed approach above, if we remove the namespace from the writer schema, we wouldn't get the type exception, because the namespace would then be absent from the fixed fields in both the writer and the reader schema.

@pratyakshsharma (Contributor):

Lagging a bit, will circle back on this.

@n3nash (Contributor) commented Sep 16, 2020:

@sathyaprakashg thanks for the explanation, please update the PR accordingly, rebase and squash the commits.

@sathyaprakashg (Contributor, Author):

@n3nash The changes are done. Please review them when you get time.

* @param schema Schema from which namespace needs to be removed for fixed fields
* @return input schema with namespace removed for fixed fields, if any
*/
def removeNamespaceFromFixedFields(schema: Schema): Schema ={
Contributor commented:

@sathyaprakashg: What happens to existing records in a Hudi dataset which have the namespace for fixed fields?
https://github.com/apache/hudi/blob/master/hudi-client/src/main/java/org/apache/hudi/table/action/commit/MergeHelper.java#L70

@sathyaprakashg (Contributor, Author):

@bvaradar

In the delta streamer, we currently have the three flows below:

  1. No transformation
  2. Transformation with userProvidedSchema
  3. Transformation without userProvidedSchema

Only a schema converted from a Spark data type to an Avro schema has this namespace added to fixed fields. In the delta streamer, we currently use the user-provided schema (userProvidedSchemaProvider.targetSchema) to convert bytes to Avro, except for the third flow (transformation without userProvidedSchema), in which case we derive the schema from the Spark data type. So the backward compatibility issue arises only when we use a transformer and no user-provided schema.

Below is an example of an Avro fixed field with and without a namespace.

{"name":"height","type":{"type":"fixed","name":"fixed","size":5,"logicalType":"decimal","precision":10,"scale":6}}

{"name":"height","type":{"type":"fixed","name":"fixed","namespace":"hoodie.source.hoodie_source.height","size":5,"logicalType":"decimal","precision":10,"scale":6}}

Both of these result in the same Parquet schema:
required fixed_len_byte_array(5) height (DECIMAL(10,6));

As we can see here, the namespace in the fixed field does not seem to have any impact on the Parquet schema. So maybe the HoodieFileReader in the MergeHelper file you referred to shouldn't have any issue?

In general, it looks like Parquet files in an existing Hudi dataset would not have an issue. I tested with the COPY_ON_WRITE table type and couldn't see any issue. But in the case of a MERGE_ON_READ table, I could see an issue for the third flow (transformation without userProvidedSchema). Below is the stack trace.

51511 [Executor task launch worker for task 502] ERROR org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner  - Got exception when reading log file
org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
	at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
	at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
	at org.apache.avro.io.ValidatingDecoder.checkFixed(ValidatingDecoder.java:135)
	at org.apache.avro.io.ValidatingDecoder.readFixed(ValidatingDecoder.java:146)
	at org.apache.avro.generic.GenericDatumReader.readFixed(GenericDatumReader.java:342)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
	at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock.deserializeRecords(HoodieAvroDataBlock.java:157)
	at org.apache.hudi.common.table.log.block.HoodieDataBlock.createRecordsFromContentBytes(HoodieDataBlock.java:128)
	at org.apache.hudi.common.table.log.block.HoodieDataBlock.getRecords(HoodieDataBlock.java:106)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processDataBlock(AbstractHoodieLogRecordScanner.java:275)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processQueuedBlocksForInstant(AbstractHoodieLogRecordScanner.java:308)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:241)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
	at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:259)
	at org.apache.hudi.HoodieMergeOnReadRDD$$anon$2.<init>(HoodieMergeOnReadRDD.scala:164)
	at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:154)
	at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

In summary, it looks like the third flow (transformation without userProvidedSchema) produces a different output schema in the log file compared to the other two flows if there are fixed fields. This means that if we want to switch from the third flow to, say, the first flow (by removing the transformation), we already have a problem, since log files in a MERGE_ON_READ table will have a different schema if there are fixed fields. This PR may cause a backward compatibility issue for the third flow, but it would ensure we produce the same schema regardless of which flow we use.

In case you have a better suggestion to make this work without causing issues in existing datasets for the third flow, please let me know; I am happy to update the PR.

@sathyaprakashg (Contributor, Author):

@bvaradar @n3nash @pratyakshsharma Just checking to see if you had a chance to review my last comment.

Contributor commented:

@sathyaprakashg: Thanks for the detailed write-up. Sorry for missing this part. Regarding the 3rd flow (transformation without a user-provided schema), the exception points to https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java#L157, but per your observation, HoodieAvroUtils.bytesToAvro() works without issue. Can you see if you can use HoodieAvroUtils.bytesToAvro() in HoodieAvroDataBlock? Does this solve the issue w.r.t. schema evolution handling?

@sathyaprakashg (Contributor, Author):

@bvaradar HoodieAvroDataBlock.deserializeRecords and HoodieAvroUtils.bytesToAvro both use similar code (the Avro deserializer) to convert bytes to Avro, so using HoodieAvroUtils.bytesToAvro() in HoodieAvroDataBlock wouldn't resolve this error.

The reason bytesToAvro works fine is that it handles only records that were converted to bytes using HoodieAvroUtils.avroToBytes in the same run, so the same schema is used for both avroToBytes and bytesToAvro, and hence there is no error.

But HoodieAvroDataBlock.deserializeRecords converts records from the Avro delta (log) file, and since the 3rd flow (transformation without a user-provided schema) introduces a backward-incompatible change for fixed fields as discussed earlier, it throws an error.

Contributor commented:

@n3nash: This might require a holistic look at how schema evolution is handled.

As a last option before I let @n3nash decide how best to take in this change, @sathyaprakashg: since this is not a backwards-compatible change in the true sense (the underlying type is the same), can you try adding an additional step where we do a variant of https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java#L73

In HoodieAvroDataBlock:

  1. Use a genericReader with only the old schema. This will avoid schema evolution handling.
  2. Create a genericWriter and write the record back to bytes, but written with the new (updated) schema.
  3. Then use a genericReader (as in 1) to read, but with the updated schema (see the sketch below).

Can you see if this works around the issue? If it does, then this needs to be a configuration-controlled feature when reading records from the log files.
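
A minimal sketch of this read/re-encode/re-read idea in plain Avro (illustrative only; oldSchema and newSchema stand for the log block's original schema and the evolved schema):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class ReencodeSketch {

  public static GenericRecord upgrade(byte[] bytes, Schema oldSchema, Schema newSchema) throws IOException {
    // 1) Decode with the old schema on both sides, so no schema resolution is attempted.
    GenericDatumReader<GenericRecord> oldReader = new GenericDatumReader<>(oldSchema);
    GenericRecord oldRecord = oldReader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));

    // 2) Re-encode the record with the new (updated) schema. As discussed in the follow-up
    //    comments, this only works when both schemas contain the same set of fields
    //    (differing, for example, only in the fixed-field namespace).
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(newSchema).write(oldRecord, encoder);
    encoder.flush();

    // 3) Read the re-encoded bytes back with the updated schema.
    GenericDatumReader<GenericRecord> newReader = new GenericDatumReader<>(newSchema);
    return newReader.read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
  }
}
```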

@sathyaprakashg (Contributor, Author):

@n3nash @bvaradar I checked the three steps you mentioned, and it works fine when the reader and writer schemas have the same set of fields (and the writer schema has the namespace in the fixed field).

If the reader schema has an extra field, this approach does not work. Here is an example where the reader schema has an extra field and gives an error. When the schema evolves, the table schema (reader schema) may have more or fewer fields than the writer schema (the MOR log file schema). So if we implement this approach, it would work only when the schemas are the same (except for the extra namespace information in the writer schema). Please let me know how to handle this, or correct me if the approach I took is wrong.

Just to recap, the issue we are trying to solve is that, in the existing code, when we write a fixed Avro field to a MOR log file, it gets written with extra namespace information in one of the flows (transformation without userProvidedSchema) but not in the other two flows. With this PR, the extra namespace information will no longer be written.

Since this extra namespace information is written only to the MOR log file and not to the Parquet file, one possible solution for users is to run compaction before running the job with this upgraded version of Hudi. Compaction is not mandatory for upgrading to this version; it only needs to be done if they have a fixed field in the schema and they were using the transformation-without-userProvidedSchema flow.

Contributor commented:

@n3nash @sathyaprakashg: The fundamental problem is that this would break existing MOR tables that have log records written with the old namespace, right? So this would be unsafe. @n3nash: Thoughts?

@sathyaprakashg (Contributor, Author):

@bvaradar @n3nash Yes, it will break existing MOR tables that have log records written with the old namespace, and one way this can be avoided is by doing a one-time compaction before running the job with the version of Hudi this change ships in.

There are three different flows in the delta streamer, as I explained in one of my previous comments, and I would like to reiterate that this will affect only those using the third flow (transformation without userProvidedSchema), and only when the schema has fixed fields. Even without this change, if a user wants to switch from the third flow to any of the other flows, they will still face this issue. So by implementing this change, we make all three flows produce the same output.

@giaosudau (Contributor) commented Dec 15, 2020:

It didn't work with a table that has two or more fixed-type fields.

      "name": "fixed_type_field",
      "type": {
        "type": "fixed",
        "name": "fixed",
        "namespace": "hoodie.source.hoodie_source.fixed_type_field",
        "size": 7,
        "logicalType": "decimal",
        "precision": 15,
        "scale": 8
      }
    },
  "name": "fixed_type_field2",
      "type": {
        "type": "fixed",
        "name": "fixed",
        "namespace": "hoodie.source.hoodie_source.fixed_type_field2",
        "size": 7,
        "logicalType": "decimal",
        "precision": 15,
        "scale": 8
      }
    }
Exception in thread "main" org.apache.avro.SchemaParseException: Can't redefine: fixed
	at org.apache.avro.Schema$Names.put(Schema.java:1128)
	at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
	at org.apache.avro.Schema$FixedSchema.toJson(Schema.java:907)
	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
	at org.apache.avro.Schema.toString(Schema.java:324)
	at org.apache.hudi.utilities.schema.DebeziumSchemaRegistryProvider.getTargetSchema(DebeziumSchemaRegistryProvider.java:77)

With the namespace, it has another error:

Caused by: java.lang.ClassCastException: java.nio.HeapByteBuffer cannot be cast to org.apache.avro.generic.GenericFixed
	at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToRow$5(AvroConversionHelper.scala:98)
	at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToRow$9(AvroConversionHelper.scala:173)
	at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToRow$9(AvroConversionHelper.scala:173)
	at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:43)

Fixed by converting the decimal from the raw bytes instead of a GenericFixed, i.e. using the same conversion as the `case (d: DecimalType, BYTES) =>` branch:

case (d: DecimalType, FIXED) =>
  (item: AnyRef) =>
    if (item == null) {
      null
    } else {
      val decimalConversion = new DecimalConversion
      // Previously: decimalConversion.fromFixed(item.asInstanceOf[GenericFixed], avroSchema,
      //   LogicalTypes.decimal(d.precision, d.scale))
      val bigDecimal = decimalConversion.fromBytes(item.asInstanceOf[ByteBuffer], avroSchema,
        LogicalTypes.decimal(d.precision, d.scale))
      createDecimal(bigDecimal, d.precision, d.scale)
    }

@nsivabalan (Contributor):

I spent some time understanding this PR. Thanks for putting it up, @sathyaprakashg. I have a few clarifications.

  1. Can you fix the description w.r.t. the latest status? I don't see SchemaBasedSchemaProvider etc.

  2. FYI, we landed a fix w.r.t. default values and null in unions. If the schema post-processing is not required at all with this fix, it would simplify things. I guess the namespace fix in this PR may not be required if the post-processing step is not required. @bvaradar @n3nash: can you folks chime in here please? There is another related fixed-datatype JIRA. The backwards incompatibility may not be an issue if we go this route.

  3. Also, I pulled the test locally and was trying to verify things. It looks like the test is not generating records as intended in the 3rd step. Here is what is happening:

    • TestDataSource generates data with the intended (old) schema.
    • But in SourceFormatAdapter, when we do AvroConversionUtils.createDataFrame(...), the evolved schema is passed in, and so the InputBatch returned from there has the new column set to null for all records.
    • I also verified this from within the IdentityTransformer, which was showing the evolved schema and records having the new column as well.
      So, essentially, the test also needs to be fixed.

@vinothchandar: We need to iron out the perf issue, but these were my comments earlier. It could simplify the backwards compatibility issue that was being discussed.

@nsivabalan (Contributor):

@sathyaprakashg and others: I am trying to understand the use case here. I understand it's related to the deltastreamer receiving events in the old schema after Hudi's dataset schema has evolved. What is the schema from the schema provider when the source is producing events in the old schema (after the schema has evolved with the Hudi dataset)? If the schema provider's schema is updated, I guess there is no need to store the writer schema with the payload.
AvroConversionUtils.createDataFrame() will convert the JavaRDD with the old schema to a Dataset with the new schema if schemaProvider.SourceSchema() has the evolved schema.

@tandonraghavs:

@sathyaprakashg @vinothchandar Can this PR handle this issue as well?

@sathyaprakashg (Contributor, Author):

> @sathyaprakashg and others: trying to understand the use-case here. I understand its related to deltastreamer receiving events in old schema after Hudi's dataset schema got evolved. what's the schema from schema provider when source is producing events in old schema (after schema got evolved w/ hudi dataset)? if the schema provider's schema is updated, I guess there is no need to store the writer schema w/ payload.
> AvroConversionUtils.createDataFrame() will ensure to convert the JavaRDD w/ old schema to Dataset w/ new schema if schemaProvider.SourceSchema() has the evolved schema.

PR Tracker Board automation moved this from Opened PRs to Done May 8, 2021
@sathyaprakashg sathyaprakashg reopened this May 8, 2021
PR Tracker Board automation moved this from Done to Opened PRs May 8, 2021
@sathyaprakashg (Contributor, Author) commented May 8, 2021:

> @sathyaprakashg and others: trying to understand the use-case here. I understand its related to deltastreamer receiving events in old schema after Hudi's dataset schema got evolved. what's the schema from schema provider when source is producing events in old schema (after schema got evolved w/ hudi dataset)? if the schema provider's schema is updated, I guess there is no need to store the writer schema w/ payload.
> AvroConversionUtils.createDataFrame() will ensure to convert the JavaRDD w/ old schema to Dataset w/ new schema if schemaProvider.SourceSchema() has the evolved schema.

The issue with schema evolution happens in HoodieAvroUtils.avroToBytes and HoodieAvroUtils.bytesToAvro. Let us consider a scenario where there are two versions of the schema in the schema registry, and in the 2nd (latest) version a new field has been added, but data is still coming in with schema version 1.

HoodieAvroUtils.avroToBytes uses the schema that comes with the data (i.e., the v1 schema) to convert Avro to bytes. HoodieAvroUtils.bytesToAvro uses the latest schema (v2) from the schema registry to convert the bytes back to Avro. This fails because the v1 schema was used to convert to bytes, but the v2 schema is being used to convert the bytes back to Avro.

To solve this, we need both v1 (the writer schema) and v2 (the reader schema) to convert the bytes back to Avro. We can get the v2 schema from the schema registry, but to get the v1 schema, we were trying to store the writer schema as part of the payload itself.

Please let me know if it is still not very clear

@nsivabalan (Contributor) commented May 8, 2021:

Yes, thanks for clarifying. I guess embedding the schema in every payload might be detrimental, as you have experienced. So I have thought of a different approach: regenerate records with the new schema at the Spark datasource layer. Only a batch that is ingested with the old schema after the table's schema has evolved will take a hit from this conversion.

#2927

Also, as I have mentioned earlier, if others (@n3nash, @bvaradar) confirm that the schema post-processor is not required as a mandatory step with this fix for default values, we don't need any changes in the delta streamer as such; just #2927 would suffice.

@n3nash is doing more testing around this as well, so I will wait for him to comment on the patch too.

@n3nash (Contributor) commented May 8, 2021:

At a high level, my understanding is that there are 3 issues:

  1. Removing the namespace could cause backwards-compatibility issues -> validated with this PR that it should be OK to make this change.
  2. Records with an older schema received without a user-defined schema. This breaks the getInsertRecord API, since the writer schema is smaller than the reader schema and the bytes-to-GenericRecord conversion breaks; addressed by @nsivabalan's diff.
  3. For general schema evolution to work when converting from a DataFrame to Avro, we need to ensure that new elements added to the schema have a default value of "null", using a UNION type as follows: name: a, type ["null", <type>]. Spark's internal converter breaks the Avro convention for creating such a UNION by reversing the order of null, as follows: name: a, type [<type>, "null"]. @nsivabalan fixed this with this PR by relying on Hudi's internal ConversionHelper to ensure that "null" comes first (see the sketch below). This PR addressed some other issues as well.
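
For illustration, a small hypothetical example of the union-ordering rule described in (3), using plain Avro schema parsing:

```java
import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;

public class UnionOrderingSketch {

  public static void main(String[] args) {
    // Avro validates a field's default value against the FIRST branch of its union,
    // so an optional field with "default": null must declare "null" first.
    String nullFirst = "{\"type\":\"record\",\"name\":\"r\",\"fields\":["
        + "{\"name\":\"a\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
    Schema ok = new Schema.Parser().parse(nullFirst);
    System.out.println("Parsed: " + ok.getField("a").schema());

    // The reversed order [<type>, "null"] with a null default is rejected once
    // default-value validation is enabled.
    String typeFirst = "{\"type\":\"record\",\"name\":\"r\",\"fields\":["
        + "{\"name\":\"a\",\"type\":[\"string\",\"null\"],\"default\":null}]}";
    try {
      new Schema.Parser().setValidateDefaults(true).parse(typeFirst);
    } catch (AvroTypeException e) {
      System.out.println("Rejected: " + e.getMessage());
    }
  }
}
```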

With all 3 of these fixes, we should be able to land this PR. @sathyaprakashg, let us know if there were any other concerns that you came across.

@nsivabalan I can confirm that (3) should be good to land. I've approved the diff, and @bvaradar will be taking a look at this later today; once he confirms, we can land both (3) and then (2), with the assumption that (1) validates both.

@vinothchandar (Member) commented May 11, 2021:

@n3nash Is BaseAvroPayloadWithSchema, which keeps the schema in each record, still needed? What are the concrete next steps here?

@vinothchandar vinothchandar moved this from Opened PRs to Ready for Review in PR Tracker Board May 11, 2021
@nsivabalan (Contributor):

@vinothchandar: we have raised another PR, #2927, which fixes this use case. We don't need to store the schema with the payload. Feel free to review the fix.

@n3nash (Contributor) commented May 15, 2021:

@vinothchandar We have found a way to avoid BaseAvroPayloadWithSchema. We are validating some more corner cases with respect to schema evolution in the PR @nsivabalan pointed out, before landing that PR. ETA is May 17th.

@nsivabalan (Contributor):

@sathyaprakashg and others: Can you folks check out #2927 and see if it solves the schema evolution problem you are facing?

@nsivabalan (Contributor):

@sbernauer: We would appreciate it if you could try out our fix #2927 and let us know if it works for you. We can sync up via Slack if you are available, so that we can have a quick turnaround and clarify any doubts you have along the way. If you are open to it, let us know your Slack ID and we can create a group.

@sbernauer (Contributor):

Sure @nsivabalan, I will try out the fix in #2927 and give feedback.
Thanks for the invitation to Slack, I appreciate it! My member ID is U022VCGBVLM.

@nsivabalan (Contributor):

@sbernauer: sorry, I might need your email to invite you to Apache Hudi's Slack workspace.

@sbernauer (Contributor):

No problem. It is bernauerse@web.de

@nsivabalan (Contributor) commented May 25, 2021:

@sbernauer @giaosudau @dirksan28 @sathyaprakashg: There are quite a few flows or use cases in general w.r.t. schema evolution. Would you mind helping us by explaining your use case?

Let me call out a few of them:

  1. The existing Hudi table is in schema1 with 3 columns and you are trying to ingest a new batch with schema2 with 4 columns.
  2. The existing Hudi table is in schema2 with 4 columns (after the schema evolved from schema1). The new batch being ingested has records in the old schema (schema1).
    For both (1) and (2), there could be different flows in the deltastreamer:
    a. no transformer and no schema provider.
    b. no transformer and the user provides a schema provider with a non-null target schema.
    c. no transformer and the user provides a schema provider with a NULL target schema.
    d. has a transformer and no schema provider.
    e. has a transformer and the user provides a schema provider with a non-null target schema.
    f. has a transformer and the user provides a schema provider with a NULL target schema.

Can you call out whether your use case is 1a or 2e, etc.? The patch we have put up solves most of the above use cases, but we would like to better understand what exactly your use case is. Simple schema evolution cases such as 1a and 1b should already work in Hudi without any fix.
If your use case does not belong to any of the above categories, please help us understand it so that we can work towards a fix.

@sbernauer (Contributor) commented May 25, 2021:

Hi @nsivabalan,

We have multiple schema versions of the events we consume. We use Kafka and the Confluent Schema Registry. I think all the events in Kafka are written with schema version 9.
My test case is to read some events with schema version 8, switch to schema version 9, and consume some evolved events. We use a COW table and INSERTs only (with dropping of duplicates) and no transformation (for most of our applications).

With the patch in #2927, starting from an empty directory, the ingestion throws this exception in the executors. Reading with schema version 9 works fine.

schemaRegistryUrl: https://eventbus-schema-bs-qa.server.lan/subjects/MyEvent-v1/versions/8
# Sets
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
# and
curl --silent $SCHEMA_REGISTRY_URL | jq -r -c '.schema' | jq '.' > /tmp/schema_source.json
cp /tmp/schema_source.json /tmp/schema_target.json

21/05/25 14:45:55 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=sourceEventHeader.happenedTimestamp:1621953763077,sourceEventHeader.eventId:143d1259-01c2-4346-a3c4-85b2e3325ff3 partitionPath=2021/05/25}, currentLocation='null', newLocation='null'}
 java.lang.ArrayIndexOutOfBoundsException: 22
        at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
        at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
        at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
        at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
        at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:136)
        at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:126)
        at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:69)
        at org.apache.hudi.execution.HoodieLazyInsertIterable$HoodieInsertValueGenResult.<init>(HoodieLazyInsertIterable.java:88)
        at org.apache.hudi.execution.HoodieLazyInsertIterable.lambda$getTransformFunction$0(HoodieLazyInsertIterable.java:101)
        at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:190)
        at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
        at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

The schema difference (the field is nested multiple levels deep):

$ curl https://eventbus-schema-bs-qa.server.lan/subjects/MyEvent-v1/versions/8 | jq -r '.schema' | jq > 8
$ curl https://eventbus-schema-bs-qa.server.lan/subjects/MyEvent-v1/versions/9 | jq -r '.schema' | jq > 9

$ diff -U 5 8 9
--- 8   2021-05-25 16:51:21.416603077 +0200
+++ 9   2021-05-25 16:51:25.072629744 +0200
@@ -326,10 +326,22 @@
                 "type": "string",
                 "avro.java.string": "String"
               }
             },
             "doc": "* List of optional claim names"
+          },
+          {
+            "name": "voluntary",
+            "type": {
+              "type": "array",
+              "items": {
+                "type": "string",
+                "avro.java.string": "String"
+              }
+            },
+            "doc": "* List of voluntary claim names",
+            "default": []
           }
         ],
         "version": "1.0.0"
       },
       "doc": "* Info about the requested claims"

@nsivabalan (Contributor):

@sbernauer: Based on our offline sync-up, I guess you are explicitly setting the target schema to the old schema even though Hudi's table schema has evolved to the new schema. I am not sure Hudi can do much here. If you hadn't set any target schema, or had set it to RowBased, Hudi could try to evolve your old records to the latest table schema.

@nsivabalan (Contributor):

Just to update everyone in the thread:
Regular schema evolution (adding new columns) does not have any issues with the deltastreamer. This PR targets a less common use case where, once Hudi's schema has evolved, users wish to route records with the older schema and expect Hudi to handle them. The patch that has been put up addresses this particular issue. I wanted to clarify with everyone that basic schema evolution is working as expected with the deltastreamer.

@vinothchandar (Member):

@nsivabalan what's the next step for this PR?

@hudi-bot commented Jun 27, 2021:

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run travis re-run the last Travis build
  • @hudi-bot run azure re-run the last Azure build

@nsivabalan (Contributor):

From my sync-up with @sbernauer, #2927 and #3111 are required. #3111 is already merged, and #2927 is up for review.
We can close this one (#2012), as #2927 is a replacement.

@vinothchandar (Member):

Can we close this, given #2927 has landed?

PR Tracker Board automation moved this from Ready for Review to Done Sep 7, 2021
Labels: priority:critical (production down; pipelines stalled; need help asap), schema-and-data-types