Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.table.sink.BatchWriteBuilder;
Expand All @@ -64,6 +65,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -219,6 +221,10 @@ private static boolean buildTopology(
indexColumn,
maxIndexedRowId);

long minNonIndexableRowId =
findMinNonIndexableRowId(table.schemaManager(), entries, indexColumn);
entries = filterEntriesBefore(entries, minNonIndexableRowId);

RowType rowType = table.rowType();
DataField indexField = rowType.getField(indexColumn);
// Project indexColumn + _ROW_ID so we can read the actual row ID from data
Expand Down Expand Up @@ -292,6 +298,49 @@ private static boolean buildTopology(
return true;
}

/**
* Find the minimum firstRowId among files whose schema does not contain the index column. Files
* at or beyond this rowId cannot be indexed because the column was added later via ALTER TABLE.
*
* @return the boundary rowId, or {@link Long#MAX_VALUE} if all files contain the column
*/
static long findMinNonIndexableRowId(
SchemaManager schemaManager, List<ManifestEntry> entries, String indexColumn) {
Map<Long, Boolean> schemaContainsColumn = new HashMap<>();
long minRowId = Long.MAX_VALUE;
for (ManifestEntry entry : entries) {
long sid = entry.file().schemaId();
boolean contains =
schemaContainsColumn.computeIfAbsent(
sid, id -> schemaManager.schema(id).fieldNames().contains(indexColumn));
if (!contains && entry.file().firstRowId() != null) {
minRowId = Math.min(minRowId, entry.file().nonNullFirstRowId());
}
}
return minRowId;
}

/** Keep only entries whose firstRowId is strictly less than the given boundary. */
static List<ManifestEntry> filterEntriesBefore(
List<ManifestEntry> entries, long boundaryRowId) {
if (boundaryRowId == Long.MAX_VALUE) {
return entries;
}
List<ManifestEntry> result = new ArrayList<>();
for (ManifestEntry entry : entries) {
if (entry.file().firstRowId() != null
&& entry.file().nonNullFirstRowId() < boundaryRowId) {
result.add(entry);
}
}
LOG.info(
"Filtered {} files at or beyond rowId {}, {} files remain.",
entries.size() - result.size(),
boundaryRowId,
result.size());
return result;
}

/**
* Compute shard tasks for a full build (no rows to skip).
*
Expand Down Expand Up @@ -577,7 +626,16 @@ public void processElement(StreamRecord<ShardTask> element) throws Exception {
}
// Only write rows within this shard's range
if (currentRowId >= task.shardRange.from) {
indexWriter.write(indexFieldGetter.getFieldOrNull(row));
Object fieldData = indexFieldGetter.getFieldOrNull(row);
if (fieldData == null) {
LOG.info(
"Null vector at rowId={}, stopping shard [{}, {}].",
currentRowId,
task.shardRange.from,
task.shardRange.to);
break;
}
indexWriter.write(fieldData);
rowsWritten++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.paimon.io.PojoDataFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.FileStorePathFactory;
Expand All @@ -36,6 +38,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
Expand Down Expand Up @@ -450,8 +453,64 @@ void testIncrementalFileStartsAfterEffectiveStart() {
assertThat(tasks.get(1).shardRange).isEqualTo(new Range(400, 499));
}

@Test
void testAppendFilterOldFilesBeforeNewFiles() {
// Typical append: write file0[0,99](schema1), file1[100,199](schema1),
// then file2[200,299](schema0) arrives (old schema).
// Boundary = 200, keep files with firstRowId < 200.
SchemaManager schemaManager = mock(SchemaManager.class);
TableSchema oldSchema = mock(TableSchema.class);
TableSchema newSchema = mock(TableSchema.class);
when(schemaManager.schema(0L)).thenReturn(oldSchema);
when(schemaManager.schema(1L)).thenReturn(newSchema);
when(oldSchema.fieldNames()).thenReturn(Arrays.asList("id", "name"));
when(newSchema.fieldNames()).thenReturn(Arrays.asList("id", "name", "vec"));

List<ManifestEntry> entries = new ArrayList<>();
entries.add(createEntryWithSchemaId(BinaryRow.EMPTY_ROW, 0L, 100, 1L));
entries.add(createEntryWithSchemaId(BinaryRow.EMPTY_ROW, 100L, 100, 1L));
entries.add(createEntryWithSchemaId(BinaryRow.EMPTY_ROW, 200L, 100, 0L));

List<ManifestEntry> result =
GenericIndexTopoBuilder.filterEntriesBefore(
entries,
GenericIndexTopoBuilder.findMinNonIndexableRowId(
schemaManager, entries, "vec"));

assertThat(result).hasSize(2);
assertThat(result.get(0).file().nonNullFirstRowId()).isEqualTo(0L);
assertThat(result.get(1).file().nonNullFirstRowId()).isEqualTo(100L);
}

// -- Helpers --

private static ManifestEntry createEntryWithSchemaId(
BinaryRow partition, Long firstRowId, long rowCount, long schemaId) {
PojoDataFileMeta file =
new PojoDataFileMeta(
"test-file-" + UUID.randomUUID(),
1024L,
rowCount,
BinaryRow.EMPTY_ROW,
BinaryRow.EMPTY_ROW,
SimpleStats.EMPTY_STATS,
SimpleStats.EMPTY_STATS,
0L,
0L,
schemaId,
0,
Collections.emptyList(),
null,
null,
null,
null,
null,
null,
firstRowId,
null);
return ManifestEntry.create(FileKind.ADD, partition, 0, 1, file);
}

private static ManifestEntry createEntry(BinaryRow partition, Long firstRowId, long rowCount) {
PojoDataFileMeta file =
new PojoDataFileMeta(
Expand Down