Datasource Writer throws error on resolving struct fields #1034

Closed
alphairys opened this issue Nov 20, 2019 · 6 comments

Comments

alphairys commented Nov 20, 2019

Issue

I have a dataframe with the following schema that I would like to write out as a hudi table.

Schema

root
 |-- deviceId: string (nullable = true)
 |-- eventTimeMilli: long (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
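
(For reproduction, a minimal sketch that builds a DataFrame named simple with this shape in a spark-shell session; the row values and path are placeholders, and the table name is guessed from the Avro namespace in the error below. Nullability may differ slightly from the schema above.)

import org.apache.spark.sql.functions.struct
import spark.implicits._

// Placeholder row; the real data comes from elsewhere.
val simple = Seq(("device-1", 1574297893836L, 34.7027, 137.54862))
  .toDF("deviceId", "eventTimeMilli", "latitude", "longitude")
  .select($"deviceId", $"eventTimeMilli",
          struct($"latitude", $"longitude").as("location"))

val hudiTableName = "hudi_test"                // guessed from "hoodie.hudi_test.hudi_test_record"
val hudiTablePath = "s3://<bucket>/hudi_test"  // placeholder path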

Write out to hudi table:

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode

simple.write.format("org.apache.hudi")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "deviceId")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "eventTimeMilli")
      .option(HoodieWriteConfig.TABLE_NAME, hudiTableName)
      .mode(SaveMode.Append)
      .save(hudiTablePath)

Running this, I get the following error:

Caused by: org.apache.avro.UnresolvedUnionException: Not in union [{"type":"record","name":"location","namespace":"hoodie.hudi_test.hudi_test_record","fields":[{"name":"latitude","type":["double","null"]},{"name":"longitude","type":["double","null"]}]},"null"]: {"latitude": 34.7027, "longitude": 137.54862}
        at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:740)
        at ......

Full Error Log

TaskSetManager: Task 0 in stage 8.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 4 times, most recent failure: Lost task 0.3 in stage 8.0 (TID 243, ip-172-31-23-108.ec2.internal, executor 39): java.io.IOException: Could not create payload for class: org.apache.hudi.OverwriteWithLatestAvroPayload
        at org.apache.hudi.DataSourceUtils.createPayload(DataSourceUtils.java:129)
        at org.apache.hudi.DataSourceUtils.createHoodieRecord(DataSourceUtils.java:178)
        at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$1.apply(HoodieSparkSqlWriter.scala:91)
        at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$1.apply(HoodieSparkSqlWriter.scala:88)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:394)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at scala.collection.AbstractIterator.to(Iterator.scala:1334)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: Unable to instantiate class
        at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:75)
        at org.apache.hudi.DataSourceUtils.createPayload(DataSourceUtils.java:126)
        ... 28 more
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:73)
        ... 29 more
Caused by: org.apache.avro.UnresolvedUnionException: Not in union [{"type":"record","name":"location","namespace":"hoodie.hudi_test.hudi_test_record","fields":[{"name":"latitude","type":["double","null"]},{"name":"longitude","type":["double","null"]}]},"null"]: {"latitude": 34.7027, "longitude": 137.54862}
        at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:740)
        at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:205)
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:123)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
        at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
        at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
        at org.apache.hudi.common.util.HoodieAvroUtils.avroToBytes(HoodieAvroUtils.java:72)
        at org.apache.hudi.BaseAvroPayload.<init>(BaseAvroPayload.java:49)
        at org.apache.hudi.OverwriteWithLatestAvroPayload.<init>(OverwriteWithLatestAvroPayload.java:43)
        ... 34 more
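
(Aside on the root exception: Avro resolves which branch of a union a record matches by the record schema's full name, i.e. namespace plus name. A standalone sketch, using plain Avro APIs and made-up namespaces ns1/ns2, that raises the same exception class:)

import java.io.ByteArrayOutputStream
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.{GenericData, GenericDatumWriter}
import org.apache.avro.io.EncoderFactory

// Union whose record branch has full name "ns1.location".
val branch = SchemaBuilder.record("location").namespace("ns1")
  .fields().requiredDouble("latitude").requiredDouble("longitude").endRecord()
val union = Schema.createUnion(java.util.Arrays.asList(branch, Schema.create(Schema.Type.NULL)))

// A datum with identical fields but full name "ns2.location".
val other = SchemaBuilder.record("location").namespace("ns2")
  .fields().requiredDouble("latitude").requiredDouble("longitude").endRecord()
val datum = new GenericData.Record(other)
datum.put("latitude", 34.7027)
datum.put("longitude", 137.54862)

// Throws org.apache.avro.UnresolvedUnionException: Not in union [...]
val enc = EncoderFactory.get().binaryEncoder(new ByteArrayOutputStream(), null)
new GenericDatumWriter[AnyRef](union).write(datum, enc)

So an error like the one above is consistent with the writer handing Avro a nested record whose schema name does not line up with the union branch in the generated table schema.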


alphairys changed the title from "Datasource writer throws Working with nested fields" to "Datasource Writer throws error on resolving struct fields" on Nov 20, 2019
lamberken (Member) commented Nov 21, 2019

hi, @alphairys, from your description I cannot reproduce the error. My steps are below.

export SPARK_HOME=/work/BigData/install/spark/spark-2.3.3-bin-hadoop2.6
${SPARK_HOME}/bin/spark-shell --packages org.apache.hudi:hudi-spark-bundle:0.5.0-incubating --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "tmp"
val basePath = "file:///tmp/tmp"

var datas = List("{ \"deviceId\": \"aaaaa\", \"eventTimeMilli\": 1574297893836, \"location\": { \"latitude\": 2.5, \"longitude\": 3.5 }}");
val df = spark.read.json(spark.sparkContext.parallelize(datas, 2))
df.write.format("org.apache.hudi").
    option(RECORDKEY_FIELD_OPT_KEY, "deviceId").
    option(PRECOMBINE_FIELD_OPT_KEY, "eventTimeMilli").
    option(TABLE_NAME, tableName).
    mode(Append).
    save(basePath);

alphairys (Author) commented

@lamber-ken, thanks for looking into this. It turns out the Hudi package bundled with EMR is the source of this. I was following the instructions here: https://aws.amazon.com/blogs/aws/new-insert-update-delete-data-on-s3-with-amazon-emr-and-apache-hudi/

Launching spark-shell with the latest package as you did worked.
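
For reference, the full launch command (as in the steps above):

${SPARK_HOME}/bin/spark-shell \
  --packages org.apache.hudi:hudi-spark-bundle:0.5.0-incubating \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'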

rbhartia commented

@alphairys - Would you mind dropping an email to emr-hudi@amazon.com or rbhartia@amazon.com with the steps to reproduce this problem on EMR?

rbhartia commented

@alphairys - We were able to reproduce the problem and see that it is happening due to the struct type in there. Most likely it is a function of EMR using Spark 2.4 with the spark-avro library. We will update this thread once we have a fix. Thank you
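
(For anyone hitting this on EMR in the meantime, a quick way to check which jar actually supplies the Hudi classes in a session is plain JVM reflection, nothing Hudi-specific:)

// Prints the jar that provided the Hudi payload class, which shows whether
// the EMR-bundled jar or the --packages bundle won on the classpath.
val src = Class.forName("org.apache.hudi.OverwriteWithLatestAvroPayload")
  .getProtectionDomain.getCodeSource
println(if (src == null) "no code source (bootstrap classpath)" else src.getLocation.toString)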

alphairys (Author) commented

@rbhartia , thanks for the update. Will be on the lookout for the fix.

vinothchandar (Member) commented

This has been landed and verified!
