diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java index d6f073fd2cff..dddc8ca43996 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; import org.apache.paimon.fs.Path; +import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.options.Options; @@ -44,6 +45,9 @@ import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SegmentsCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -60,6 +64,8 @@ */ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable { + private static final Logger LOG = LoggerFactory.getLogger(FallbackReadFileStoreTable.class); + private final FileStoreTable fallback; public FallbackReadFileStoreTable(FileStoreTable wrapped, FileStoreTable fallback) { @@ -397,10 +403,17 @@ public RecordReader createReader(Split split) throws IOException { DataSplit dataSplit = (DataSplit) split; if (!dataSplit.dataFiles().isEmpty() && dataSplit.dataFiles().get(0).minKey().getFieldCount() > 0) { - return fallbackRead.createReader(split); - } else { - return mainRead.createReader(split); + try { + return fallbackRead.createReader(split); + } catch (Exception ignored) { + LOG.error( + "Reading from fallback branch has problems for files: {}", + dataSplit.dataFiles().stream() + .map(DataFileMeta::fileName) + .collect(Collectors.joining(", "))); + } } + return mainRead.createReader(split); } } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index 7369ee4b2aff..bc886eb83b21 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -311,11 +311,11 @@ public void testFallbackBranchBatchRead() throws Exception { public void testCrossPartitionFallbackBranchBatchRead() throws Exception { sql( "CREATE TABLE t ( pk INT PRIMARY KEY NOT ENFORCED, name STRING, dt STRING ) PARTITIONED BY (dt) WITH ( 'bucket' = '-1' )"); - sql( - "INSERT INTO t VALUES (1, 'Jack', '20250227'), (1, 'Jackson', '20250227'), (2, 'Sam', '20250228')"); sql("CALL sys.create_branch('default.t', 'stream')"); sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'stream' )"); + sql( + "INSERT INTO t VALUES (1, 'Jack', '20250227'), (1, 'Jackson', '20250227'), (2, 'Sam', '20250228')"); sql( "INSERT INTO `t$branch_stream` VALUES (1, 'John Stream', '20250228'), (3, 'Rick Stream', '20250301')"); assertThat(collectResult("SELECT pk, name, dt FROM t order by dt"))