
[HUDI-1720] when query incr view of mor table which has many delete records use sparksql/hive-beeline, StackOverflowError #2721

Merged
merged 1 commit into from Apr 13, 2021

Conversation

xiarixiaoyao
Contributor


What is the purpose of the pull request

Fix the StackOverflowError reported in [HUDI-1720].

Currently RealtimeCompactedRecordReader.next handles delete records by recursion: when the merged record for the current key is a delete, next calls itself to fetch the following record.

When the log file contains many delete records, this recursion adds one stack frame per skipped record, and the chain of RealtimeCompactedRecordReader.next calls eventually leads to a StackOverflowError.

We can use a loop instead of recursion, as sketched below.
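
For illustration only, a minimal, self-contained Scala sketch of the idea. The real fix lives in the Java method RealtimeCompactedRecordReader.next; Rec, Reader, and LoopVsRecursionSketch below are hypothetical stand-ins for the merged-record handling, not Hudi code.

object LoopVsRecursionSketch {

  // Hypothetical merged record; `deleted` marks a key whose latest log entry is a delete.
  final case class Rec(key: Long, deleted: Boolean)

  class Reader(records: Iterator[Rec]) {

    // Recursive shape (before the patch, in spirit): every consecutive delete
    // record adds one stack frame, so a long run of deletes blows the stack.
    // (Neither Java nor this overridable Scala method gets tail-call elimination.)
    def nextRecursive(): Option[Rec] =
      if (!records.hasNext) None
      else {
        val r = records.next()
        if (r.deleted) nextRecursive() else Some(r)
      }

    // Loop shape (the idea of the patch): skip delete records with a while
    // loop, so the stack depth stays constant no matter how many deletes
    // appear in a row.
    def nextLoop(): Option[Rec] = {
      var result: Option[Rec] = None
      while (result.isEmpty && records.hasNext) {
        val r = records.next()
        if (!r.deleted) result = Some(r)
      }
      result
    }
  }

  def main(args: Array[String]): Unit = {
    // A long run of deletes followed by a single live record, similar to the
    // test scenario in the comment below.
    val recs = (0L until 900000L).map(k => Rec(k, deleted = true)) :+ Rec(900000L, deleted = false)
    println(new Reader(recs.iterator).nextLoop()) // prints Some(Rec(900000,false))
  }
}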

Brief change log

  • Rewrite the delete-record handling in RealtimeCompactedRecordReader.next to use a while loop instead of recursion

Verify this pull request

Manually verified the change by running a job locally (see the test steps in the comment below)

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

…ecords use sparksql/hive-beeline, StackOverflowError
@xiarixiaoyao
Copy link
Contributor Author

xiarixiaoyao commented Mar 25, 2021

Test steps:

Before the patch:

step1:

val df = spark.range(0, 1000000).toDF("keyid")
.withColumn("col3", expr("keyid + 10000000"))
.withColumn("p", lit(0))
.withColumn("p1", lit(0))
.withColumn("p2", lit(7))
.withColumn("a1", lit(Array[String] ("sb1", "rz")))
.withColumn("a2", lit(Array[String] ("sb1", "rz")))

// bulk_insert 1,000,000 rows (keyid 0 to 999999)

merge(df, 4, "default", "hive_9b", DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "bulk_insert")

step2:

val df = spark.range(0, 900000).toDF("keyid")
.withColumn("col3", expr("keyid + 10000000"))
.withColumn("p", lit(0))
.withColumn("p1", lit(0))
.withColumn("p2", lit(7))
.withColumn("a1", lit(Array[String] ("sb1", "rz")))
.withColumn("a2", lit(Array[String] ("sb1", "rz")))

// delete 900,000 rows (keyid 0 to 899999)

delete(df, 4, "default", "hive_9b")

step3:

query on beeline/spark-sql: select count(col3) from hive_9b_rt

2021-03-25 15:33:29,029 | INFO  | main | RECORDS_OUT_OPERATOR_RS_3:1, RECORDS_OUT_INTERMEDIATE:1, | Operator.java:1038
2021-03-25 15:33:29,029 | ERROR | main | Error running child : java.lang.StackOverflowError
    at org.apache.parquet.bytes.BytesUtils.readIntLittleEndian(BytesUtils.java:83)
    at org.apache.parquet.column.values.plain.BinaryPlainValuesReader.readBytes(BinaryPlainValuesReader.java:39)
    at org.apache.parquet.column.impl.ColumnReaderBase$2$6.read(ColumnReaderBase.java:344)
    at org.apache.parquet.column.impl.ColumnReaderBase.readValue(ColumnReaderBase.java:503)
    at org.apache.parquet.column.impl.ColumnReaderImpl.readValue(ColumnReaderImpl.java:30)
    at org.apache.parquet.column.impl.ColumnReaderBase.writeCurrentValueToConverter(ColumnReaderBase.java:409)
    at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:30)
    at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:226)
    at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
    at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.next(ParquetRecordReaderWrapper.java:159)
    at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.next(ParquetRecordReaderWrapper.java:41)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:84)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    ... (the RealtimeCompactedRecordReader.next frame at line 106 repeats for every consecutive delete record until the stack overflows)

After the patch:
select count(col3) from hive_9b_rt
+---------+
| _c0 |
+---------+
| 100000 |
+---------+
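(Expected: 1,000,000 bulk-inserted rows minus 900,000 deleted rows leaves 100,000.)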
step4:
val df = spark.range(900000, 1000000).toDF("keyid")
.withColumn("col3", expr("keyid + 10000000"))
.withColumn("p", lit(0))
.withColumn("p1", lit(0))
.withColumn("p2", lit(7))
.withColumn("a1", lit(Array[String] ("sb1", "rz")))
.withColumn("a2", lit(Array[String] ("sb1", "rz")))
// delete the remaining 100,000 rows (keyid 900000 to 999999)
delete(df, 4, "default", "hive_9b")
sparksql/hive-beeline:
select count(col3) from hive_9b_rt;
+------+
| _c0 |
+------+
| 0 |
+------+
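
(Expected: the remaining 100,000 rows are now deleted as well, so the count is 0.)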

delete function:
def delete(df: DataFrame, par: Int, db: String, tableName: String,
           tableType: String = DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL): Unit = {
  df.write.format("hudi").
    option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType).
    option(HoodieCompactionConfig.INLINE_COMPACT_PROP, false).
    option(PRECOMBINE_FIELD_OPT_KEY, "col3").
    option(RECORDKEY_FIELD_OPT_KEY, "keyid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "p,p1,p2").
    option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[ComplexKeyGenerator].getName).
    option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete").
    option("hoodie.bulkinsert.shuffle.parallelism", par.toString).
    option("hoodie.insert.shuffle.parallelism", par.toString).
    option("hoodie.upsert.shuffle.parallelism", par.toString).
    option("hoodie.delete.shuffle.parallelism", par.toString).
    option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
    option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "p,p1,p2").
    option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor").
    option("hoodie.datasource.hive_sync.support_timestamp", "true").
    option(HIVE_STYLE_PARTITIONING_OPT_KEY, "true").
    option(HIVE_USE_JDBC_OPT_KEY, "false").
    option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, db).
    option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName).
    option(TABLE_NAME, tableName).mode(Append).save(s"/tmp/${db}/${tableName}")
}

merge function:
def merge(df: org.apache.spark.sql.DataFrame, par: Int, db: String, tableName: String,
          tableType: String = DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
          hivePartitionExtract: String = "org.apache.hudi.hive.MultiPartKeysValueExtractor",
          op: String = "upsert"): Unit = {
  val mode = if (op.equals("bulk_insert")) {
    Overwrite
  } else {
    Append
  }
  df.write.format("hudi").
    option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType).
    option(HoodieCompactionConfig.INLINE_COMPACT_PROP, false).
    option(PRECOMBINE_FIELD_OPT_KEY, "col3").
    option(RECORDKEY_FIELD_OPT_KEY, "keyid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "p,p1,p2").
    option(DataSourceWriteOptions.OPERATION_OPT_KEY, op).
    option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[ComplexKeyGenerator].getName).
    option("hoodie.bulkinsert.shuffle.parallelism", par.toString).
    option("hoodie.metadata.enable", "false").
    option("hoodie.insert.shuffle.parallelism", par.toString).
    option("hoodie.upsert.shuffle.parallelism", par.toString).
    option("hoodie.delete.shuffle.parallelism", par.toString).
    option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
    option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "p,p1,p2").
    option("hoodie.datasource.hive_sync.support_timestamp", "true").
    option(HIVE_STYLE_PARTITIONING_OPT_KEY, "true").
    option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor").
    option(HIVE_USE_JDBC_OPT_KEY, "false").
    option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, db).
    option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName).
    option(TABLE_NAME, tableName).mode(mode).save(s"/tmp/${db}/${tableName}")
}

@xiarixiaoyao
Contributor Author

@garyli1019 could you help me review this PR? Thanks.

@garyli1019 garyli1019 self-assigned this Mar 25, 2021
@codecov-io

Codecov Report

Merging #2721 (fb529db) into master (6e803e0) will decrease coverage by 42.32%.
The diff coverage is n/a.

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #2721       +/-   ##
============================================
- Coverage     51.72%   9.40%   -42.33%     
+ Complexity     3601      48     -3553     
============================================
  Files           476      54      -422     
  Lines         22595    1989    -20606     
  Branches       2409     236     -2173     
============================================
- Hits          11687     187    -11500     
+ Misses         9889    1789     -8100     
+ Partials       1019      13     -1006     
Flag                | Coverage Δ          | Complexity Δ
hudicli             | ?                   | ?
hudiclient          | ?                   | ?
hudicommon          | ?                   | ?
hudiflink           | ?                   | ?
hudihadoopmr        | ?                   | ?
hudisparkdatasource | ?                   | ?
hudisync            | ?                   | ?
huditimelineservice | ?                   | ?
hudiutilities       | 9.40% <ø> (-60.34%) | 0.00 <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files                                        | Coverage Δ               | Complexity Δ
...va/org/apache/hudi/utilities/IdentitySplitter.java | 0.00% <0.00%> (-100.00%) | 0.00% <0.00%> (-2.00%)
...va/org/apache/hudi/utilities/schema/SchemaSet.java | 0.00% <0.00%> (-100.00%) | 0.00% <0.00%> (-3.00%)
...a/org/apache/hudi/utilities/sources/RowSource.java | 0.00% <0.00%> (-100.00%) | 0.00% <0.00%> (-4.00%)
.../org/apache/hudi/utilities/sources/AvroSource.java | 0.00% <0.00%> (-100.00%) | 0.00% <0.00%> (-1.00%)
.../org/apache/hudi/utilities/sources/JsonSource.java | 0.00% <0.00%> (-100.00%) | 0.00% <0.00%> (-1.00%)
...rg/apache/hudi/utilities/sources/CsvDFSSource.java | 0.00% <0.00%> (-100.00%) | 0.00% <0.00%> (-10.00%)
...g/apache/hudi/utilities/sources/JsonDFSSource.java | 0.00% <0.00%> (-100.00%) | 0.00% <0.00%> (-4.00%)
...apache/hudi/utilities/sources/JsonKafkaSource.java | 0.00% <0.00%> (-100.00%) | 0.00% <0.00%> (-6.00%)
...pache/hudi/utilities/sources/ParquetDFSSource.java | 0.00% <0.00%> (-100.00%) | 0.00% <0.00%> (-5.00%)
...lities/schema/SchemaProviderWithPostProcessor.java | 0.00% <0.00%> (-100.00%) | 0.00% <0.00%> (-4.00%)
... and 452 more

@nsivabalan nsivabalan added the priority:critical production down; pipelines stalled; Need help asap. label Mar 30, 2021
@nsivabalan
Copy link
Contributor

@garyli1019 : fix the label as required. (sev:critical or sev:high). Also fix the corresponding jira if applicable.

@nsivabalan nsivabalan added this to In progress in PR Tracker Board Mar 31, 2021
@nsivabalan nsivabalan removed this from In progress in PR Tracker Board Mar 31, 2021
Member

@garyli1019 garyli1019 left a comment


LGTM

@garyli1019 garyli1019 requested a review from n3nash April 1, 2021 03:05
@garyli1019 garyli1019 merged commit 65844a8 into apache:master Apr 13, 2021
@xiarixiaoyao xiarixiaoyao deleted the hive_delete branch December 3, 2021 02:53