Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public int[] projection(List<String> projectedFieldNames) {
}

public TableSchema project(@Nullable List<String> writeCols) {
if (writeCols == null || writeCols.isEmpty()) {
if (writeCols == null) {
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ private int offsetInRow(int pos) {

@Override
public boolean isNullAt(int pos) {
if (rowOffsets[pos] == -1) {
if (rowOffsets[pos] < 0) {
return true;
}
return chooseArray(pos).isNullAt(offsetInRow(pos));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void setRowKind(RowKind kind) {

@Override
public boolean isNullAt(int pos) {
if (rowOffsets[pos] == -1) {
if (rowOffsets[pos] < 0) {
return true;
}
return chooseRow(pos).isNullAt(offsetInRow(pos));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.operation;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryArray;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
Expand All @@ -30,14 +31,14 @@
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.stats.SimpleStatsEvolution;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.table.source.DataEvolutionSplitGenerator;
import org.apache.paimon.types.DataField;
import org.apache.paimon.utils.SnapshotManager;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

/** {@link FileStoreScan} for data-evolution enabled table. */
Expand Down Expand Up @@ -98,15 +99,20 @@ protected List<ManifestEntry> postFilter(List<ManifestEntry> entries) {

/**
 * Tests the statistics of a group of manifest entries against {@code inputFilter}.
 *
 * <p>The row count is read from the first entry's file; the min values, max values and null
 * counts are taken from the result of {@code evolutionStats} computed over {@code metas}.
 *
 * @param metas manifest entries to evaluate together; must be non-empty because the first
 *     element is read unconditionally
 * @return whatever {@code inputFilter.test} returns for these statistics
 */
private boolean filterByStats(List<ManifestEntry> metas) {
    // NOTE(review): only the first file's row count is used — presumably every entry in the
    // group covers the same rows; confirm against the caller.
    long rowCount = metas.get(0).file().rowCount();
    // Single declaration: the stale call to the former instance overload is dropped so the
    // local variable is no longer declared twice.
    SimpleStatsEvolution.Result evolutionResult =
            evolutionStats(schema, this::scanTableSchema, metas);
    return inputFilter.test(
            rowCount,
            evolutionResult.minValues(),
            evolutionResult.maxValues(),
            evolutionResult.nullCounts());
}

private SimpleStatsEvolution.Result evolutionStats(List<ManifestEntry> metas) {
@VisibleForTesting
static SimpleStatsEvolution.Result evolutionStats(
TableSchema schema,
Function<Long, TableSchema> scanTableSchema,
List<ManifestEntry> metas) {
int[] allFields = schema.fields().stream().mapToInt(DataField::id).toArray();
int fieldsCount = schema.fields().size();
int[] rowOffsets = new int[fieldsCount];
Expand All @@ -127,31 +133,43 @@ private SimpleStatsEvolution.Result evolutionStats(List<ManifestEntry> metas) {

for (int i = 0; i < metas.size(); i++) {
DataFileMeta fileMeta = metas.get(i).file();

TableSchema dataFileSchema =
scanTableSchema(fileMeta.schemaId())
.project(
fileMeta.valueStatsCols() == null
? fileMeta.writeCols()
: fileMeta.valueStatsCols());
scanTableSchema.apply(fileMeta.schemaId()).project(fileMeta.writeCols());

TableSchema dataFileSchemaWithStats = dataFileSchema.project(fileMeta.valueStatsCols());

int[] fieldIds =
SpecialFields.rowTypeWithRowTracking(dataFileSchema.logicalRowType())
.getFields().stream()
dataFileSchema.logicalRowType().getFields().stream()
.mapToInt(DataField::id)
.toArray();

int count = 0;
int[] fieldIdsWithStats =
dataFileSchemaWithStats.logicalRowType().getFields().stream()
.mapToInt(DataField::id)
.toArray();

loop1:
for (int j = 0; j < fieldsCount; j++) {
if (rowOffsets[j] != -1) {
continue;
}
int targetFieldId = allFields[j];
for (int fieldId : fieldIds) {
if (allFields[j] == fieldId) {
// TODO: If type not match (e.g. int -> string), we need to skip this, set
// rowOffsets[j] = -1 always. (may -2, after all, set it back to -1)
// Because schema evolution may happen to change int to string or something
// like that.
if (rowOffsets[j] == -1) {
rowOffsets[j] = i;
fieldOffsets[j] = count++;
if (targetFieldId == fieldId) {
for (int k = 0; k < fieldIdsWithStats.length; k++) {
if (fieldId == fieldIdsWithStats[k]) {
// TODO: If type not match (e.g. int -> string), we need to skip
// this, set rowOffsets[j] = -1 always. (may -2, after all, set it
// back to -1) Because schema evolution may happen to change int to
// string or something like that.
rowOffsets[j] = i;
fieldOffsets[j] = k;
continue loop1;
}
}
break;
rowOffsets[j] = -2;
continue loop1;
}
}
}
Expand Down
Loading