From 8ce6fe18c97e8437e1eac1b174236eae0cca2ecf Mon Sep 17 00:00:00 2001
From: Juntao Zhang
Date: Wed, 25 Feb 2026 10:11:57 +0800
Subject: [PATCH] [core] Fix ChainSplit NPE after branch table cache
 invalidation

---
 .../apache/paimon/catalog/CachingCatalog.java |  7 ++++
 .../paimon/spark/SparkChainTableITCase.java   | 35 +++++++++++++++++++
 2 files changed, 42 insertions(+)

diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index 9dfb32e88827..aad5d16a11e0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -324,6 +324,13 @@ public void invalidateTable(Identifier identifier) {
         if (partitionCache != null) {
             partitionCache.invalidate(identifier);
         }
+        // clear all branches of this table
+        for (Identifier i : tableCache.asMap().keySet()) {
+            if (identifier.getTableName().equals(i.getTableName())
+                    && identifier.getDatabaseName().equals(i.getDatabaseName())) {
+                tableCache.invalidate(i);
+            }
+        }
     }
 
     // ================================== Cache Public API
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
index 5d14f0a3cb64..2465a925d4d8 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
@@ -754,4 +754,39 @@ public void testDropSnapshotPartition(@TempDir java.nio.file.Path tempDir) throw
 
         spark.close();
     }
+
+    @Test
+    public void testChainTableCacheInvalidation(@TempDir java.nio.file.Path tempDir)
+            throws IOException {
+        Path warehousePath = new Path("file:" + tempDir.toString());
+        SparkSession.Builder builder = createSparkSessionBuilder(warehousePath);
+        SparkSession spark = builder.getOrCreate();
+        spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
+        spark.sql("USE spark_catalog.my_db1");
+        spark.sql(
+                "CREATE TABLE chain_test_t ("
+                        + " `t1` string ,"
+                        + " `t2` string ,"
+                        + " `t3` string"
+                        + ") PARTITIONED BY (`date` string)"
+                        + "TBLPROPERTIES ("
+                        + " 'chain-table.enabled' = 'true'"
+                        + " ,'primary-key' = 'date,t1'"
+                        + " ,'sequence.field' = 't2'"
+                        + " ,'bucket-key' = 't1'"
+                        + " ,'bucket' = '1'"
+                        + " ,'partition.timestamp-pattern' = '$date'"
+                        + " ,'partition.timestamp-formatter' = 'yyyyMMdd'"
+                        + ")");
+        setupChainTableBranches(spark, "chain_test_t");
+        spark.sql(
+                "insert overwrite `chain_test_t$branch_delta` partition (date = '20260224') values ('1', '1', '1');");
+        assertThat(
+                        spark.sql("SELECT * FROM `chain_test_t`").collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder("[1,1,1,20260224]");
+        spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test_t`;");
+        spark.close();
+    }
 }