
[SUPPORT] Hudi Deltastreamer CSV ingestion issue  #6404

@ankitchandnani

Description


Tips before filing an issue

Have you gone through our FAQs? Yes

Describe the problem you faced

Below is a sample chunk from a CSV file being ingested through Hudi Deltastreamer 0.9.
00037BAC6|00037BAF9|100|91.8886010561736|66.1986127558789|4|99.9443005280868|2022-06-21
00037BAC6|00077TAA2|32.3719958216579|67.2034832818589|0|0|49.7877395517584|2022-06-21
00037BAC6|00080QAF2|63.7767687043239|96.1682614803625|38.2990550305725|2|81.9725150923432|2022-06-21
00037BAC6|00081TAK4|54.0624638691505|71.8352439553422|8.21984435797665|1|63.9488539122463|2022-06-21
00037BAC6|00084DAL4|64.8087299031953|91.2979645415028|56.1237724661849|4|82.053347222349|2022-06-21


The CSV file does not have headers in it; instead, I am providing the column names through an Avro schema supplied separately. The headers are declared in the following format in both the source and target schema files:

{
  "type": "record",
  "name": "data",
  "fields": [
    {"name": "field1", "type": ["string", "null"]},
    {"name": "field2", "type": ["string", "null"]},
    {"name": "field3", "type": ["string", "null"]},
    {"name": "field4", "type": ["string", "null"]},
    {"name": "field5", "type": ["string", "null"]},
    {"name": "field6", "type": ["string", "null"]},
    {"name": "field7", "type": ["string", "null"]},
    {"name": "date",   "type": ["string", "null"]}
  ]
}
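To make the setup concrete, here is a minimal sketch (plain Python, not Hudi internals) of how a separately supplied Avro schema can name the columns of a headerless, pipe-delimited file. The field names mirror the schema above; the sample row is the first data line from the issue.

```python
import csv
import io
import json

# Avro schema matching the one supplied to Deltastreamer (source/target schema files).
avro_schema = json.loads("""
{"type": "record", "name": "data", "fields": [
  {"name": "field1", "type": ["string", "null"]},
  {"name": "field2", "type": ["string", "null"]},
  {"name": "field3", "type": ["string", "null"]},
  {"name": "field4", "type": ["string", "null"]},
  {"name": "field5", "type": ["string", "null"]},
  {"name": "field6", "type": ["string", "null"]},
  {"name": "field7", "type": ["string", "null"]},
  {"name": "date",   "type": ["string", "null"]}
]}
""")
column_names = [f["name"] for f in avro_schema["fields"]]

# First sample row from the issue; headerless, pipe-delimited.
sample = "00037BAC6|00037BAF9|100|91.8886010561736|66.1986127558789|4|99.9443005280868|2022-06-21\n"
reader = csv.reader(io.StringIO(sample), delimiter="|")

# With the schema supplying the names, field1/field2 resolve to real values.
row = dict(zip(column_names, next(reader)))
print(row["field1"], row["field2"], row["date"])  # 00037BAC6 00037BAF9 2022-06-21
```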

Below are the properties passed to Hudi Deltastreamer:

hoodie.datasource.write.recordkey.field=field1,field2
hoodie.datasource.hive_sync.partition_fields=date
hoodie.datasource.write.partitionpath.field=date
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.datasource.hive_sync.table=TABLE1
hoodie.datasource.hive_sync.enable=true
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.parquet.small.file.limit=134217728
hoodie.parquet.max.file.size=268435456
hoodie.cleaner.commits.retained=10
hoodie.deltastreamer.transformer.sql=select 1==2 AS _hoodie_is_deleted, 'I' as Op, * from
hoodie.datasource.hive_sync.support_timestamp=false
hoodie.bloom.index.filter.type=DYNAMIC_V0
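For reference, the sample rows are pipe-delimited rather than comma-delimited, so (assuming the `hoodie.deltastreamer.csv.*` passthrough convention, where each suffix is forwarded to Spark's CSV DataFrameReader options by CsvDFSSource) the CSV source options would look roughly like the fragment below. This is a sketch of what such a configuration could look like, not the poster's actual properties:

```properties
# Hypothetical CSV source options; hoodie.deltastreamer.csv.<option> is
# forwarded to Spark's CSV reader options by the CSV DFS source.
hoodie.deltastreamer.csv.sep=|
hoodie.deltastreamer.csv.header=false
```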

When I try ingesting the CSV (without headers) using --hoodie-conf hoodie.deltastreamer.csv.header=false, I receive the error below in the stacktrace. But if hoodie.deltastreamer.csv.header=true and I add the headers manually at the top of the CSV file, the ingestion works successfully.

Stacktrace:

22/08/15 18:27:06 INFO Client: 
	 client token: N/A
	 diagnostics: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 6, ip-10-72-3-64.ec2.internal, executor 1): org.apache.hudi.exception.HoodieKeyException: recordKey values: "field1:__null__,field2:__null__" for fields: [field1, field2] cannot be entirely null or empty.
	at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:109)
	at org.apache.hudi.keygen.ComplexAvroKeyGenerator.getRecordKey(ComplexAvroKeyGenerator.java:43)
	at org.apache.hudi.keygen.ComplexKeyGenerator.getRecordKey(ComplexKeyGenerator.java:49)
	at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:62)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$readFromSource$d62e16$1(DeltaSync.java:448)
	at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:394)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at scala.collection.AbstractIterator.to(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$31.apply(RDD.scala:1409)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$31.apply(RDD.scala:1409)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2151)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2151)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2171)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2159)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2158)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2158)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1011)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1011)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1011)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2419)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2368)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2357)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:822)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2111)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2132)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2151)
	at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1409)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1382)
	at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1517)
	at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1517)
	at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1517)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1516)
	at org.apache.spark.api.java.JavaRDDLike$class.isEmpty(JavaRDDLike.scala:544)
	at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:472)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:303)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:186)
	at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:184)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:513)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:688)
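The exception message ("field1:__null__,field2:__null__ ... cannot be entirely null or empty") indicates that both record-key fields resolved to null for every row. Below is a minimal illustration of that failure mode in plain Python (not Hudi code; the function and names are hypothetical): if the CSV reader falls back to positional column names such as _c0, _c1 instead of the schema's field names, lookups of field1/field2 find nothing, and a ComplexKeyGenerator-style key built from them is entirely null.

```python
# Illustration only: why a name mismatch between the reader's columns and the
# configured record-key fields yields an "entirely null" key. The __null__
# placeholder mirrors the marker seen in the HoodieKeyException message.
NULL_MARKER = "__null__"

def build_complex_key(row: dict, key_fields: list) -> str:
    """Compose a ComplexKeyGenerator-style key; fail if every part is null."""
    parts = [f"{f}:{row.get(f) or NULL_MARKER}" for f in key_fields]
    if all(p.endswith(NULL_MARKER) for p in parts):
        raise ValueError(
            f'recordKey values: "{",".join(parts)}" for fields: {key_fields} '
            "cannot be entirely null or empty."
        )
    return ",".join(parts)

# Columns named from the Avro schema: the key builds fine.
named_row = {"field1": "00037BAC6", "field2": "00037BAF9"}
print(build_complex_key(named_row, ["field1", "field2"]))
# field1:00037BAC6,field2:00037BAF9

# Columns left at positional defaults (_c0, _c1): field1/field2 are missing,
# so every key part is null and the build fails, matching the stacktrace.
positional_row = {"_c0": "00037BAC6", "_c1": "00037BAF9"}
try:
    build_complex_key(positional_row, ["field1", "field2"])
except ValueError as e:
    print(e)
```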

Expected behavior

The CSV file should be ingested by Deltastreamer when the Avro schema is provided separately.

Environment Description

Hudi version : 0.9
Spark version : Spark 2.4.8
Hive version : Hive 2.3.9
Hadoop version : AMZ 2.10.1
Storage (HDFS/S3/GCS..) : S3
Running on Docker? (yes/no) : no

Thanks for the help in advance!

Metadata

Labels: area:ingest (Ingestion into Hudi), priority:high (Significant impact; potential bugs)

Status: ✅ Done
