Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-27705] Prevent num-sorted-run.compaction-trigger from interfering num-levels #132

Merged
merged 3 commits into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.junit.Test;

import java.util.Collections;
import java.util.Arrays;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -30,12 +30,17 @@ public class ForceCompactionITCase extends FileStoreTableITCase {

@Override
protected List<String> ddl() {
return Collections.singletonList(
return Arrays.asList(
"CREATE TABLE IF NOT EXISTS T (\n"
+ " f0 INT\n, "
+ " f1 STRING\n, "
+ " f2 STRING\n"
+ ") PARTITIONED BY (f1)");
+ ") PARTITIONED BY (f1)",
"CREATE TABLE IF NOT EXISTS T1 (\n"
+ " f0 INT\n, "
+ " f1 STRING\n, "
+ " f2 STRING\n"
+ ")");
}

@Test
Expand Down Expand Up @@ -74,4 +79,37 @@ public void testDynamicPartition() {

assertThat(batchSql("SELECT * FROM T")).hasSize(21);
}

@Test
public void testNoDefaultNumOfLevels() throws Exception {
bEnv.executeSql("ALTER TABLE T1 SET ('commit.force-compact' = 'true')");
bEnv.executeSql(
"INSERT INTO T1 VALUES(1, 'Winter', 'Winter is Coming'),"
+ "(2, 'Winter', 'The First Snowflake'), "
+ "(2, 'Spring', 'The First Rose in Spring'), "
+ "(7, 'Summer', 'Summertime Sadness')")
.await();
bEnv.executeSql("INSERT INTO T1 VALUES(12, 'Winter', 'Last Christmas')").await();
bEnv.executeSql("INSERT INTO T1 VALUES(11, 'Winter', 'Winter is Coming')").await();
bEnv.executeSql("INSERT INTO T1 VALUES(10, 'Autumn', 'Refrain')").await();
bEnv.executeSql(
"INSERT INTO T1 VALUES(6, 'Summer', 'Watermelon Sugar'), "
+ "(4, 'Spring', 'Spring Water')")
.await();
bEnv.executeSql(
"INSERT INTO T1 VALUES(66, 'Summer', 'Summer Vibe'), "
+ "(9, 'Autumn', 'Wake Me Up When September Ends')")
.await();
bEnv.executeSql(
"INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'), "
+ "(9, 'Autumn', 'Wake Me Up When September Ends')")
.await();
bEnv.executeSql("ALTER TABLE T1 SET ('num-sorted-run.compaction-trigger' = '2')");
bEnv.executeSql(
"INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'), "
+ "(9, 'Autumn', 'Wake Me Up When September Ends')")
.await();

assertThat(batchSql("SELECT * FROM T1")).hasSize(15);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ public class Levels {

public Levels(Comparator<RowData> keyComparator, List<DataFileMeta> inputFiles, int numLevels) {
this.keyComparator = keyComparator;

// in case the num of levels is not specified explicitly
numLevels =
Math.max(
numLevels,
inputFiles.stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: We can extract a field here: int restoreMaxLevel = .... The format of the java stream in the method parameters doesn't look very good.

.map(DataFileMeta::level)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: .mapToInt().max()

.max(Comparator.naturalOrder())
.orElse(-1)
+ 1);
checkArgument(numLevels > 1, "levels must be at least 2.");
this.level0 =
new TreeSet<>(Comparator.comparing(DataFileMeta::maxSequenceNumber).reversed());
Expand Down