Skip to content

Commit 4b58b0a

Browse files
yanghuapan3793
authored andcommitted
[KYUUBI #1555][FLINK] Bump flink version to 1.14.0
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1573 from yanghua/KYUUBI-1555. Closes #1555 4f5ce74 [yanghua] [KYUUBI #1555] [SUB-TASK][KPIP-2] Bump flink version to 1.14.1 Authored-by: yanghua <yanghua1127@gmail.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent be3a964 commit 4b58b0a

File tree

3 files changed

+17
-28
lines changed

3 files changed

+17
-28
lines changed

externals/kyuubi-flink-sql-engine/pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454

5555
<dependency>
5656
<groupId>org.apache.flink</groupId>
57-
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
57+
<artifactId>flink-runtime</artifactId>
5858
<scope>provided</scope>
5959
</dependency>
6060

@@ -90,13 +90,13 @@
9090

9191
<dependency>
9292
<groupId>org.apache.flink</groupId>
93-
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
93+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
9494
<scope>provided</scope>
9595
</dependency>
9696

9797
<dependency>
9898
<groupId>org.apache.flink</groupId>
99-
<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
99+
<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
100100
<scope>provided</scope>
101101
</dependency>
102102

externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/context/ExecutionContext.java

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.net.URL;
2323
import java.util.Collections;
2424
import java.util.List;
25-
import java.util.Map;
2625
import java.util.Optional;
2726
import java.util.function.Supplier;
2827
import javax.annotation.Nullable;
@@ -43,8 +42,8 @@
4342
import org.apache.flink.table.delegation.Executor;
4443
import org.apache.flink.table.delegation.ExecutorFactory;
4544
import org.apache.flink.table.delegation.Planner;
46-
import org.apache.flink.table.delegation.PlannerFactory;
47-
import org.apache.flink.table.factories.ComponentFactoryService;
45+
import org.apache.flink.table.factories.FactoryUtil;
46+
import org.apache.flink.table.factories.PlannerFactoryUtil;
4847
import org.apache.flink.table.module.ModuleManager;
4948
import org.apache.flink.util.FlinkException;
5049
import org.apache.flink.util.TemporaryClassLoaderContext;
@@ -146,10 +145,9 @@ private TableEnvironmentInternal createStreamTableEnvironment(
146145
CatalogManager catalogManager,
147146
ModuleManager moduleManager,
148147
FunctionCatalog functionCatalog) {
149-
final Map<String, String> plannerProperties = settings.toPlannerProperties();
150148
final Planner planner =
151-
ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
152-
.create(plannerProperties, executor, config, functionCatalog, catalogManager);
149+
PlannerFactoryUtil.createPlanner(
150+
settings.getPlanner(), executor, config, catalogManager, functionCatalog);
153151

154152
return new StreamTableEnvironmentImpl(
155153
catalogManager,
@@ -163,18 +161,16 @@ private TableEnvironmentInternal createStreamTableEnvironment(
163161
classLoader);
164162
}
165163

166-
private static Executor lookupExecutor(
167-
Map<String, String> executorProperties, StreamExecutionEnvironment executionEnvironment) {
164+
private Executor lookupExecutor(
165+
EnvironmentSettings settings, StreamExecutionEnvironment executionEnvironment) {
168166
try {
169167
ExecutorFactory executorFactory =
170-
ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
168+
FactoryUtil.discoverFactory(classLoader, ExecutorFactory.class, settings.getExecutor());
169+
171170
Method createMethod =
172-
executorFactory
173-
.getClass()
174-
.getMethod("create", Map.class, StreamExecutionEnvironment.class);
171+
executorFactory.getClass().getMethod("create", StreamExecutionEnvironment.class);
175172

176-
return (Executor)
177-
createMethod.invoke(executorFactory, executorProperties, executionEnvironment);
173+
return (Executor) createMethod.invoke(executorFactory, executionEnvironment);
178174
} catch (Exception e) {
179175
throw new TableException(
180176
"Could not instantiate the executor. Make sure a planner module is on the classpath", e);
@@ -213,8 +209,7 @@ private void createTableEnvironment(
213209
if (engineEnvironment.getExecution().isStreamingPlanner()) {
214210
streamExecEnv = createStreamExecutionEnvironment();
215211

216-
final Map<String, String> executorProperties = settings.toExecutorProperties();
217-
executor = lookupExecutor(executorProperties, streamExecEnv);
212+
executor = lookupExecutor(settings, streamExecEnv);
218213
tableEnv =
219214
createStreamTableEnvironment(
220215
streamExecEnv,

pom.xml

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@
100100
<commons-lang3.version>3.10</commons-lang3.version>
101101
<curator.version>2.12.0</curator.version>
102102
<delta.version>1.0.0</delta.version>
103-
<flink.version>1.12.5</flink.version>
103+
<flink.version>1.14.0</flink.version>
104104
<flink.archive.name>flink-${flink.version}-bin-scala_${scala.binary.version}.tgz</flink.archive.name>
105105
<flink.archive.mirror>https://archive.apache.org/dist/flink/flink-${flink.version}</flink.archive.mirror>
106106
<flink.archive.download.skip>false</flink.archive.download.skip>
@@ -1072,7 +1072,7 @@
10721072

10731073
<dependency>
10741074
<groupId>org.apache.flink</groupId>
1075-
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
1075+
<artifactId>flink-runtime</artifactId>
10761076
<version>${flink.version}</version>
10771077
</dependency>
10781078

@@ -1114,13 +1114,7 @@
11141114

11151115
<dependency>
11161116
<groupId>org.apache.flink</groupId>
1117-
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
1118-
<version>${flink.version}</version>
1119-
</dependency>
1120-
1121-
<dependency>
1122-
<groupId>org.apache.flink</groupId>
1123-
<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
1117+
<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
11241118
<version>${flink.version}</version>
11251119
<scope>provided</scope>
11261120
</dependency>

0 commit comments

Comments
 (0)