Skip to content

Commit

Permalink
[BP-1.16][FLINK-32374][table-planner] Fix the issue that ExecNodeGrap…
Browse files Browse the repository at this point in the history
…hInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting (apache#22820) (apache#22826)
  • Loading branch information
LadyForest authored and gpkc committed Mar 6, 2024
1 parent f7fab82 commit 8020b85
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public void writeToFile(File file, boolean ignoreIfExists, boolean failIfExists)
file.toPath(),
serializedPlan.getBytes(StandardCharsets.UTF_8),
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING,
StandardOpenOption.WRITE);
} catch (IOException e) {
throw new TableException("Cannot write the compiled plan to file '" + file + "'.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
Expand Down Expand Up @@ -258,7 +259,8 @@ public void testCompilePlanOverwrite() throws Exception {
TableResult tableResult =
tableEnv.executeSql(
String.format(
"COMPILE PLAN '%s' FOR INSERT INTO sink SELECT * FROM src",
"COMPILE PLAN '%s' FOR INSERT INTO sink "
+ "SELECT IF(a > b, a, b) AS a, b + 1 AS b, SUBSTR(c, 1, 4) AS c FROM src WHERE a > 10",
planPath));

assertThat(tableResult).isEqualTo(TableResultInternal.TABLE_RESULT_OK);
Expand All @@ -271,6 +273,11 @@ public void testCompilePlanOverwrite() throws Exception {
"COMPILE PLAN '%s' FOR INSERT INTO sink SELECT a + 1, b + 1, CONCAT(c, '-something') FROM src",
planPath)))
.isEqualTo(TableResultInternal.TABLE_RESULT_OK);
assertThat(
TableTestUtil.isValidJson(
FileUtils.readFileToString(
planPath.toFile(), StandardCharsets.UTF_8)))
.isTrue();

tableEnv.executeSql(String.format("EXECUTE PLAN '%s'", planPath)).await();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.configuration.ExecutionOptions
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode
import org.apache.flink.streaming.api.{environment, TimeCharacteristic}
import org.apache.flink.streaming.api.datastream.DataStream
Expand Down Expand Up @@ -1732,6 +1733,19 @@ object TableTestUtil {
jsonNode.toPrettyString
}

@throws[IOException]
def isValidJson(json: String): Boolean = {
try {
val parser = objectMapper.getFactory.createParser(json)
while (parser.nextToken() != null) {
// Do nothing, just parse the JSON string
}
true
} catch {
case _: JsonParseException => false
}
}

/**
* Stage {id} is ignored, because id keeps incrementing in test class while
* StreamExecutionEnvironment is up
Expand Down

0 comments on commit 8020b85

Please sign in to comment.