
[spark] support merge-read between kv snapshot and log for primary-key table#2523

Merged
wuchong merged 4 commits into apache:main from YannByron:main-spark
Feb 1, 2026
Conversation

@YannByron
Contributor

Purpose

Linked issue: close #2427

Brief change log

Tests

API and Format

Documentation

@YannByron
Contributor Author

@wuchong @Yohahaha please review this.

Member

@wuchong wuchong left a comment


@YannByron thanks for the contribution.

I rebased the branch and appended a commit to address my minor comments. Will merge it if you don't have concerns.


private def createSortMergeReader(): SortMergeReader = {
// Create key encoder for primary keys
val keyEncoder = encode.KeyEncoder.of(rowType, tableInfo.getPhysicalPrimaryKeys, null)
Member


In the latest main branch, we’ve refactored KeyEncoder.
Could you please rebase onto the latest main and use the KeyEncoder.ofPrimaryKey(...) method? Otherwise, the key encoding won’t align with the keys stored in RocksDB, leading to incorrect query results.
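To illustrate why the reader's key encoding must match the writer's, here is a minimal, self-contained sketch. The `encode` helper below is hypothetical and not Fluss's actual `KeyEncoder` API; it only demonstrates that two encoders built over different key columns produce different byte keys, so lookups against a RocksDB-style key-value store silently miss:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class KeyEncodingMismatch {
    // Hypothetical compact key encoder: concatenates the selected fields in order.
    public static byte[] encode(Object[] row, int[] keyIndexes) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        for (int i : keyIndexes) {
            Object v = row[i];
            if (v instanceof Integer) {
                buf.putInt((Integer) v);
            } else {
                buf.put(v.toString().getBytes(StandardCharsets.UTF_8));
            }
        }
        return Arrays.copyOf(buf.array(), buf.position());
    }

    public static void main(String[] args) {
        Object[] row = {1, "a", 10};                        // schema: id, name, pk
        byte[] writerKey = encode(row, new int[] {2});      // writer keys on `pk`
        byte[] wrongReaderKey = encode(row, new int[] {0}); // reader mistakenly keys on `id`

        Map<String, Object[]> store = new HashMap<>();      // stands in for RocksDB
        store.put(Arrays.toString(writerKey), row);

        // The mismatched key never finds the stored row.
        System.out.println(store.containsKey(Arrays.toString(writerKey)));      // true
        System.out.println(store.containsKey(Arrays.toString(wrongReaderKey))); // false
    }
}
```

This is the failure mode described above: nothing throws, the query simply returns wrong results.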

3a803df

public SortMergeReader(
@Nullable int[] projectedFields,
int[] pkIndexes,
@Nullable CloseableIterator<LogRecord> lakeRecordIterator,
Member


Could you rename this parameter and the member variable lakeRecordIterator to snapshotRecordIterator? That better reflects how Spark uses it.

@wuchong wuchong merged commit d3a935f into apache:main Feb 1, 2026
6 checks passed
}

// Collect all log records until logStoppingOffset
val allLogRecords = mutable.ArrayBuffer[ScanRecord]()
Contributor


We need to fetch by size to avoid OOM when the log store has huge records.

Member


@Yohahaha However, we need to sort the changelog, which requires buffering the entire changelog. Therefore, fetching by size doesn't help much in this context. If the changelog is truly huge, we may need to consider spilling the changelog buffer to local disk.
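The merge logic under discussion can be sketched in a simplified, self-contained form (hypothetical types, not the actual SortMergeReader): the changelog must be fully buffered and deduplicated by key before it can be merged, in key order, with the sorted snapshot, with the log side overriding the snapshot side and deletes dropping rows:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;

public class SortMergeSketch {
    // A record is (key, value); value == null marks a delete in the changelog.
    public record Rec(int key, String value) {}

    // Merge a key-sorted snapshot with buffered log records. The whole log is
    // buffered into the TreeMap before merging, which is why fetch-by-size
    // alone does not bound memory here.
    public static List<Rec> mergeRead(List<Rec> snapshot, List<Rec> log) {
        TreeMap<Integer, Rec> latest = new TreeMap<>();
        for (Rec r : log) latest.put(r.key(), r); // last change per key wins

        List<Rec> out = new ArrayList<>();
        Iterator<Rec> snap = snapshot.iterator();
        Iterator<Rec> chg = latest.values().iterator();
        Rec s = next(snap), c = next(chg);
        while (s != null || c != null) {
            if (c == null || (s != null && s.key() < c.key())) {
                out.add(s);                        // snapshot-only key
                s = next(snap);
            } else if (s == null || c.key() < s.key()) {
                if (c.value() != null) out.add(c); // log-only key (skip deletes)
                c = next(chg);
            } else {
                if (c.value() != null) out.add(c); // same key: log overrides snapshot
                s = next(snap);
                c = next(chg);
            }
        }
        return out;
    }

    private static Rec next(Iterator<Rec> it) {
        return it.hasNext() ? it.next() : null;
    }
}
```

Spilling the `latest` buffer to local disk, as suggested above, would replace the in-memory TreeMap with an external sort.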

@Yohahaha
Contributor

Yohahaha commented Feb 2, 2026

@YannByron I found a bug while testing reads of primary-key tables: it fails when the last column is used as a primary key. The current cases in SparkPrimaryKeyTableReadTest all use the first column and the partition column as primary keys.

test("Spark Read: primary key table with last pk") {
  withTable("t") {
    sql("CREATE TABLE t (id int, name string, pk int, pk2 string) TBLPROPERTIES('primary.key'='pk,pk2')")
    checkAnswer(sql("SELECT * FROM t"), Nil)
    sql("INSERT INTO t VALUES (1, 'a', 10, 'x'), (2, 'b', 20, 'y')")
    checkAnswer(sql("SELECT * FROM t ORDER BY id"), Row(1, "a", 10, "x") :: Row(2, "b", 20, "y") :: Nil)
  }
}

The above case fails with:

Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 4) (192.168.0.116 executor driver): java.lang.ArrayIndexOutOfBoundsException: Index 2 out of bounds for length 2
	at org.apache.fluss.row.ProjectedRow.getInt(ProjectedRow.java:90)
	at org.apache.fluss.row.InternalRow.lambda$createFieldGetter$ff31e09f$6(InternalRow.java:198)
	at org.apache.fluss.row.encode.CompactedKeyEncoder.encodeKey(CompactedKeyEncoder.java:83)
	at org.apache.fluss.spark.read.FlussUpsertPartitionReader$$anon$1.compare(FlussUpsertPartitionReader.scala:113)
	at org.apache.fluss.spark.read.FlussUpsertPartitionReader$$anon$1.compare(FlussUpsertPartitionReader.scala:111)
	at org.apache.fluss.spark.utils.LogChangesIterator.hasSamePrimaryKey(LogChangesIterator.scala:117)
	at org.apache.fluss.spark.utils.LogChangesIterator.hasNext(LogChangesIterator.scala:85)
	at org.apache.fluss.client.table.scanner.SortMergeReader.readBatch(SortMergeReader.java:90)
	at org.apache.fluss.spark.read.FlussUpsertPartitionReader.initialize(FlussUpsertPartitionReader.scala:217)
	at org.apache.fluss.spark.read.FlussUpsertPartitionReader.<init>(FlussUpsertPartitionReader.scala:86)
	at org.apache.fluss.spark.read.FlussUpsertPartitionReaderFactory.createReader(FlussPartitionReaderFactory.scala:61)
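The failure pattern behind this stack trace can be reproduced in a minimal, standalone form (hypothetical code, not Fluss's actual ProjectedRow or CompactedKeyEncoder): a projected row exposes only the mapped fields, so addressing a key column by its physical position in the full schema instead of its position within the projection walks off the end of the mapping array:

```java
public class ProjectedKeyBug {
    // A projected view exposes only the mapped fields of the underlying row;
    // `pos` must be an index into the projection, not the physical schema.
    public static int getInt(Object[] row, int[] projection, int pos) {
        return (Integer) row[projection[pos]];
    }

    public static void main(String[] args) {
        Object[] row = {1, "a", 10, "x"}; // schema: id, name, pk, pk2
        int[] projection = {2, 3};        // projected row exposes only (pk, pk2)

        // Correct: address pk by its index within the projection (0).
        System.out.println(getInt(row, projection, 0)); // 10

        // Bug pattern: address pk by its physical index (2) on the projected
        // row -> projection[2] is out of bounds (length 2).
        try {
            getInt(row, projection, 2);
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("out of bounds, as in the reported stack trace");
        }
    }
}
```

This also explains why the existing tests pass: with the primary key in the first column, the physical index and the projected index coincide, masking the bug.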

@wuchong
Member

wuchong commented Feb 4, 2026

> @YannByron I found a bug while testing reading PK tables, it fails when using the last column as the primary key [...]

Thank you @Yohahaha , could you open a pull request to add and fix the test case?



Development

Successfully merging this pull request may close these issues.

[Spark] Support union read that can combine snapshot data and change-log data
