Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;

import static java.util.Collections.singletonList;
import static org.apache.paimon.schema.SchemaEvolutionUtil.createIndexCastMapping;
Expand All @@ -44,7 +47,7 @@ public class SimpleStatsEvolutions {
private final long tableSchemaId;
private final List<DataField> tableDataFields;
private final AtomicReference<List<DataField>> tableFields;
private final ConcurrentMap<Long, SimpleStatsEvolution> evolutions;
private final ConcurrentMap<EvolutionKey, SimpleStatsEvolution> evolutions;

public SimpleStatsEvolutions(Function<Long, List<DataField>> schemaFields, long tableSchemaId) {
this.schemaFields = schemaFields;
Expand All @@ -55,20 +58,37 @@ public SimpleStatsEvolutions(Function<Long, List<DataField>> schemaFields, long
}

/**
 * Shortcut for {@code getOrCreate(dataSchemaId, null)}: returns the cached {@link
 * SimpleStatsEvolution} for the given data schema without any write-column projection.
 */
public SimpleStatsEvolution getOrCreate(long dataSchemaId) {
return getOrCreate(dataSchemaId, null);
}

public SimpleStatsEvolution getOrCreate(long dataSchemaId, @Nullable List<String> writeCols) {
EvolutionKey key = new EvolutionKey(dataSchemaId, writeCols);
return evolutions.computeIfAbsent(
dataSchemaId,
id -> {
if (tableSchemaId == id) {
key,
k -> {
if (tableSchemaId == k.schemaId && k.writeCols == null) {
return new SimpleStatsEvolution(
new RowType(schemaFields.apply(id)), null, null);
new RowType(schemaFields.apply(k.schemaId)), null, null);
}

// Get atomic schema fields.
List<DataField> schemaTableFields =
tableFields.updateAndGet(v -> v == null ? tableDataFields : v);
List<DataField> dataFields = schemaFields.apply(id);
List<DataField> dataFields = schemaFields.apply(k.schemaId);

// Project data fields to write cols for data evolution table
if (k.writeCols != null) {
RowType rowType = new RowType(dataFields);
// writeCols may contain some metadata fields i.e. row_id & max_seq
dataFields =
rowType.project(
k.writeCols.stream()
.filter(rowType::containsField)
.collect(Collectors.toList()))
.getFields();
}
IndexCastMapping indexCastMapping =
createIndexCastMapping(schemaTableFields, schemaFields.apply(id));
createIndexCastMapping(schemaTableFields, dataFields);
@Nullable int[] indexMapping = indexCastMapping.getIndexMapping();
// Create col stats array serializer with schema evolution
return new SimpleStatsEvolution(
Expand Down Expand Up @@ -127,4 +147,36 @@ public Predicate filterUnsafeFilter(
/**
 * Returns the data fields of the table schema this instance was created for.
 *
 * <p>NOTE(review): this exposes the internal list directly (no defensive copy); callers must
 * not mutate the returned list — confirm no caller relies on mutating it.
 */
public List<DataField> tableDataFields() {
return tableDataFields;
}

/**
 * Immutable cache key pairing a schema id with an optional write-column projection. Two keys
 * are equal iff both the schema id and the (order-sensitive) write-column lists match, so each
 * distinct projection of the same schema memoizes its own {@link SimpleStatsEvolution}.
 */
private static class EvolutionKey {

    private final long schemaId;

    // Unmodifiable defensive snapshot; null means "no write-column projection".
    @Nullable private final List<String> writeCols;

    private EvolutionKey(long schemaId, @Nullable List<String> writeCols) {
        this.schemaId = schemaId;
        if (writeCols == null) {
            this.writeCols = null;
        } else {
            this.writeCols = Collections.unmodifiableList(new ArrayList<>(writeCols));
        }
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        EvolutionKey other = (EvolutionKey) o;
        return this.schemaId == other.schemaId
                && Objects.equals(this.writeCols, other.writeCols);
    }

    @Override
    public int hashCode() {
        return Objects.hash(schemaId, writeCols);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,8 @@ private StatsLazyGetter(DataFileMeta file, SimpleStatsEvolutions simpleStatsEvol
}

private void initialize() {
SimpleStatsEvolution evolution = simpleStatsEvolutions.getOrCreate(file.schemaId());
SimpleStatsEvolution evolution =
simpleStatsEvolutions.getOrCreate(file.schemaId(), file.writeCols());
// Create value stats
SimpleStatsEvolution.Result result =
evolution.evolution(file.valueStats(), file.rowCount(), file.valueStatsCols());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,18 @@
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
Expand All @@ -54,6 +60,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.paimon.SnapshotTest.newSnapshotManager;
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
Expand Down Expand Up @@ -223,6 +230,92 @@ public void testReadFilesFromNotExistSnapshot() {
.satisfies(anyCauseMatches(IllegalArgumentException.class));
}

/**
 * Verifies that the files system table evolves per-file stats correctly when a data-evolution
 * table is written with partial column sets (writeCols): each commit writes only a projection
 * of the schema, so each data file carries stats for a subset of the logical columns.
 */
@Test
public void testReadStatsWithDataEvolutionWriteCols() throws Exception {
String tableName = "DataEvolutionFilesTable";
Identifier identifier = identifier(tableName);
// Table requires row tracking + data evolution for partial-column (writeCols) writes.
Schema schema =
Schema.newBuilder()
.column("f0", DataTypes.INT())
.column("f1", DataTypes.STRING())
.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true")
.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true")
.build();
catalog.createTable(identifier, schema, true);

// Write a data-evolution table.
// First commit: write only column f1 (a projection of the original schema).
FileStoreTable dataEvolutionTable = getTable(identifier);
BatchWriteBuilder writeBuilder = dataEvolutionTable.newBatchWriteBuilder();
try (BatchTableWrite write =
writeBuilder
.newWrite()
.withWriteType(
schema.rowType().project(Collections.singletonList("f1")));
BatchTableCommit commit = writeBuilder.newCommit()) {
write.write(GenericRow.of(BinaryString.fromString("a")));
commit.commit(write.prepareCommit());
}

// Evolve the schema, then write only the newly added column f2 in a second commit.
catalog.alterTable(identifier, SchemaChange.addColumn("f2", DataTypes.INT()), false);
dataEvolutionTable = getTable(identifier);
writeBuilder = dataEvolutionTable.newBatchWriteBuilder();
try (BatchTableWrite write =
writeBuilder
.newWrite()
.withWriteType(
dataEvolutionTable
.schema()
.logicalRowType()
.project(Collections.singletonList("f2")));
BatchTableCommit commit = writeBuilder.newCommit()) {
write.write(GenericRow.of(1));
List<CommitMessage> commitables = write.prepareCommit();
// Align the second file with the first row of the first file so both describe row 0.
setFirstRowId(commitables, 0L);
commit.commit(commitables);
}

// Read back the per-file metadata through the "files" system table.
FilesTable dataEvolutionFilesTable =
(FilesTable)
catalog.getTable(
identifier(tableName + SYSTEM_TABLE_SPLITTER + FilesTable.FILES));
List<InternalRow> result = read(dataEvolutionFilesTable);

// Each file only contains partial data columns.
assertThat(result).hasSize(2);
// NOTE(review): columns 10-12 are read positionally; the values suggest column 10 is
// null-value counts and columns 11/12 are min/max value stats — confirm against the
// FilesTable row type before relying on these indices elsewhere.
assertThat(
result.stream()
.map(row -> row.getString(10).toString())
.collect(Collectors.toList()))
.containsExactlyInAnyOrder("{f0=1, f1=0, f2=1}", "{f0=1, f1=1, f2=0}");
assertThat(
result.stream()
.map(row -> row.getString(11).toString())
.collect(Collectors.toList()))
.containsExactlyInAnyOrder("{f0=null, f1=a, f2=null}", "{f0=null, f1=null, f2=1}");
assertThat(
result.stream()
.map(row -> row.getString(12).toString())
.collect(Collectors.toList()))
.containsExactlyInAnyOrder("{f0=null, f1=a, f2=null}", "{f0=null, f1=null, f2=1}");
}

/**
 * Rewrites every new data file inside the given commit messages so that it carries the
 * supplied first row id. The file lists held by the messages are mutated in place.
 */
private void setFirstRowId(List<CommitMessage> messages, long firstRowId) {
    for (CommitMessage message : messages) {
        List<DataFileMeta> newFiles =
                ((CommitMessageImpl) message).newFilesIncrement().newFiles();
        // Swap each file meta for a copy with the first row id assigned.
        newFiles.replaceAll(file -> file.assignFirstRowId(firstRowId));
    }
}

private List<InternalRow> getExpectedResult(long snapshotId) {
if (!snapshotManager.snapshotExists(snapshotId)) {
return Collections.emptyList();
Expand Down