
Flink: sync 1.15 with 1.17 for backports missed or not ported identically #7402

Merged 1 commit into apache:master on Apr 24, 2023

Conversation

stevenzwu (Contributor) commented Apr 22, 2023

It seems we have some divergences from the backport effort. Keeping the versions in sync makes future backports easier when comparing git diff results.

Here are the remaining diffs after this sync; they are due to changes in 1.15 or 1.17.
git diff --no-index flink/v1.15/flink/src/ flink/v1.17/flink/src

diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java
index 26201c7a5..d72f57dce 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java
@@ -97,7 +97,7 @@ public class TestFlinkTableSource extends FlinkTestBase {
   public void clean() {
     sql("DROP TABLE IF EXISTS %s.%s", DATABASE_NAME, TABLE_NAME);
     sql("DROP DATABASE IF EXISTS %s", DATABASE_NAME);
-    sql("DROP CATALOG IF EXISTS %s", CATALOG_NAME);
+    dropCatalog(CATALOG_NAME, true);
   }
 
   @Test
@@ -422,8 +422,12 @@ public class TestFlinkTableSource extends FlinkTestBase {
     Assert.assertEquals("Should have 1 record", 1, result.size());
     Assert.assertEquals(
         "Should produce the expected record", Row.of(1, "iceberg", 10.0), result.get(0));
+
+    // In SQL, null check can only be done as IS NULL or IS NOT NULL, so it's correct to ignore it
+    // and push the rest down.
+    String expectedScan = "ref(name=\"data\") == \"iceberg\"";
     Assert.assertEquals(
-        "Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
+        "Should contain the push down filter", expectedScan, lastScanEvent.filter().toString());
   }
 
   @Test
@@ -445,8 +449,9 @@ public class TestFlinkTableSource extends FlinkTestBase {
     String sqlNotInNull = String.format("SELECT * FROM %s WHERE id NOT IN (1,2,NULL) ", TABLE_NAME);
     List<Row> resultGT = sql(sqlNotInNull);
     Assert.assertEquals("Should have 0 record", 0, resultGT.size());
-    Assert.assertEquals(
-        "Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
+    Assert.assertNull(
+        "As the predicate pushdown filter out all rows, Flink did not create scan plan, so it doesn't publish any ScanEvent.",
+        lastScanEvent);
   }
 
   @Test
@@ -542,6 +547,17 @@ public class TestFlinkTableSource extends FlinkTestBase {
     Assert.assertEquals("Should create only one scan", 1, scanEventCount);
     Assert.assertEquals(
         "Should contain the push down filter", expectedFilter, lastScanEvent.filter().toString());
+
+    // %% won't match the row with null value
+    sqlLike = "SELECT * FROM  " + TABLE_NAME + "  WHERE data LIKE '%%' ";
+    resultLike = sql(sqlLike);
+    Assert.assertEquals("Should have 2 records", 2, resultLike.size());
+    List<Row> expectedRecords =
+        Lists.newArrayList(Row.of(1, "iceberg", 10.0), Row.of(2, "b", 20.0));
+    assertSameElements(expectedRecords, resultLike);
+    String expectedScan = "not_null(ref(name=\"data\"))";
+    Assert.assertEquals(
+        "Should contain the push down filter", expectedScan, lastScanEvent.filter().toString());
   }
 
   @Test
@@ -549,7 +565,7 @@ public class TestFlinkTableSource extends FlinkTestBase {
     Row expectRecord = Row.of(1, "iceberg", 10.0);
     String sqlNoPushDown = "SELECT * FROM " + TABLE_NAME + " WHERE data LIKE '%%i' ";
     List<Row> resultLike = sql(sqlNoPushDown);
-    Assert.assertEquals("Should have 1 record", 0, resultLike.size());
+    Assert.assertEquals("Should have 0 record", 0, resultLike.size());
     Assert.assertEquals(
         "Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
 
@@ -567,15 +583,6 @@ public class TestFlinkTableSource extends FlinkTestBase {
     Assert.assertEquals(
         "Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
 
-    sqlNoPushDown = "SELECT * FROM  " + TABLE_NAME + "  WHERE data LIKE '%%' ";
-    resultLike = sql(sqlNoPushDown);
-    Assert.assertEquals("Should have 3 records", 3, resultLike.size());
-    List<Row> expectedRecords =
-        Lists.newArrayList(Row.of(1, "iceberg", 10.0), Row.of(2, "b", 20.0), Row.of(3, null, 30.0));
-    assertSameElements(expectedRecords, resultLike);
-    Assert.assertEquals(
-        "Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
-
     sqlNoPushDown = "SELECT * FROM  " + TABLE_NAME + "  WHERE data LIKE 'iceber_' ";
     resultLike = sql(sqlNoPushDown);
     Assert.assertEquals("Should have 1 record", 1, resultLike.size());
@@ -602,53 +609,8 @@ public class TestFlinkTableSource extends FlinkTestBase {
         "Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter());
   }
 
-  /**
-   * NaN is not supported by flink now, so we add the test case to assert the parse error, when we
-   * upgrade the flink that supports NaN, we will delele the method, and add some test case to test
-   * NaN.
-   */
   @Test
-  public void testSqlParseError() {
-    String sqlParseErrorEqual =
-        String.format("SELECT * FROM %s WHERE d = CAST('NaN' AS DOUBLE) ", TABLE_NAME);
-    AssertHelpers.assertThrows(
-        "The NaN is not supported by flink now. ",
-        NumberFormatException.class,
-        () -> sql(sqlParseErrorEqual));
-
-    String sqlParseErrorNotEqual =
-        String.format("SELECT * FROM %s WHERE d <> CAST('NaN' AS DOUBLE) ", TABLE_NAME);
-    AssertHelpers.assertThrows(
-        "The NaN is not supported by flink now. ",
-        NumberFormatException.class,
-        () -> sql(sqlParseErrorNotEqual));
-
-    String sqlParseErrorGT =
-        String.format("SELECT * FROM %s WHERE d > CAST('NaN' AS DOUBLE) ", TABLE_NAME);
-    AssertHelpers.assertThrows(
-        "The NaN is not supported by flink now. ",
-        NumberFormatException.class,
-        () -> sql(sqlParseErrorGT));
-
-    String sqlParseErrorLT =
-        String.format("SELECT * FROM %s WHERE d < CAST('NaN' AS DOUBLE) ", TABLE_NAME);
-    AssertHelpers.assertThrows(
-        "The NaN is not supported by flink now. ",
-        NumberFormatException.class,
-        () -> sql(sqlParseErrorLT));
-
-    String sqlParseErrorGTE =
-        String.format("SELECT * FROM %s WHERE d >= CAST('NaN' AS DOUBLE) ", TABLE_NAME);
-    AssertHelpers.assertThrows(
-        "The NaN is not supported by flink now. ",
-        NumberFormatException.class,
-        () -> sql(sqlParseErrorGTE));
-
-    String sqlParseErrorLTE =
-        String.format("SELECT * FROM %s WHERE d <= CAST('NaN' AS DOUBLE) ", TABLE_NAME);
-    AssertHelpers.assertThrows(
-        "The NaN is not supported by flink now. ",
-        NumberFormatException.class,
-        () -> sql(sqlParseErrorLTE));
+  public void testSqlParseNaN() {
+    // todo add some test case to test NaN
   }
 }
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java
index caacbd4b5..08cccbbc8 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java
@@ -26,6 +26,6 @@ public class TestFlinkPackage {
   /** This unit test would need to be adjusted as new Flink version is supported. */
   @Test
   public void testVersion() {
-    Assert.assertEquals("1.15.0", FlinkPackage.version());
+    Assert.assertEquals("1.17.0", FlinkPackage.version());
   }
 }
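To spot any remaining drift after a sync like this, the two source trees can be compared mechanically. A minimal sketch, assuming the repository layout shown above (the default paths are illustrative; pass other directories as arguments):

```shell
#!/bin/sh
# Sketch: list files that differ between two Flink source trees.
set -eu
old=${1:-flink/v1.15/flink/src}
new=${2:-flink/v1.17/flink/src}

# --no-index lets git diff compare arbitrary directories (no repo needed);
# --name-only prints one differing path per line. git diff exits non-zero
# when differences exist, so mask that exit code for scripting.
git diff --no-index --name-only "$old" "$new" || true
```

An empty output means the two trees are fully in sync; anything listed is a candidate for the next backport pass.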

The github-actions bot added the flink label on Apr 22, 2023.

stevenzwu changed the title from "Flink: sync 1.15 with 1.17 for missed backports previously" to "Flink: sync 1.15 with 1.17 for backports missed or not ported identically" on Apr 22, 2023.
@@ -233,6 +233,7 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
// the files,
// Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
if (checkpointId > maxCommittedCheckpointId) {
LOG.info("Checkpoint {} completed. Attempting commit.", checkpointId);
A reviewer (Contributor) commented:
I remember that we merged this one: 5a4761c

What happened with it?

stevenzwu (Author) replied:

Looks like a later backport unintentionally reverted it; I missed it during review. #6949
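One way to track down which commit reverted a line is git's pickaxe search (`-S`), which lists every commit where the number of occurrences of a string changed. A self-contained sketch on a throwaway repo (the file name Committer.java is hypothetical; on the real tree you would point it at the committer class under flink/v1.15):

```shell
#!/bin/sh
# Sketch: git pickaxe (-S) finds every commit that added or removed a given
# string -- useful for locating the backport that dropped the log line above.
set -eu
repo=$(mktemp -d)
cd "$repo"
git init -q
printf 'LOG.info("Attempting commit.");\n' > Committer.java
git add Committer.java
git -c user.email=ci@example.com -c user.name=ci commit -q -m 'add commit log line'
: > Committer.java            # a later backport accidentally drops the line
git add Committer.java
git -c user.email=ci@example.com -c user.name=ci commit -q -m 'backport (accidental revert)'
# Lists exactly the two commits in which the string appeared/disappeared:
git log --oneline -S 'Attempting commit.' -- Committer.java
```

Running the same `git log -S` against the real file pinpoints the reverting backport without reading every intermediate commit.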

@@ -65,7 +65,7 @@ public void before() {

@After
public void clean() {
sql("DROP CATALOG IF EXISTS %s", catalogName);
dropCatalog(catalogName, true);
A reviewer (Contributor) commented:
This one is not strictly needed for 1.15, since the change that made it necessary was introduced in Flink 1.16. But it is fine to have it here too, to keep the code less diverged.

stevenzwu (Author) replied:

Yeah, it is a little better to have less diverged code.

pvary (Contributor) left a review:

The changes are good.
It would be nice to understand what happened with 5a4761c, but otherwise it is good to go

stevenzwu merged commit 3a04faa into apache:master on Apr 24, 2023. 12 checks passed.
stevenzwu (Author) commented:

Thanks @pvary for the review.

coufon pushed a commit to coufon/iceberg that referenced this pull request Apr 25, 2023
manisin pushed a commit to Snowflake-Labs/iceberg that referenced this pull request May 9, 2023