Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-22832][sql-client] Drop usages of legacy planner in SQL Client #16052

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 1 addition & 4 deletions docs/content.zh/docs/dev/table/sqlClient.md
Expand Up @@ -339,7 +339,6 @@ CREATE FUNCTION foo.bar.AggregateUDF AS myUDF;

-- Properties that change the fundamental execution behavior of a table program.

SET 'table.planner' = 'blink'; -- planner: either 'blink' (default) or 'old'
SET 'execution.runtime-mode' = 'streaming'; -- execution mode either 'batch' or 'streaming'
SET 'sql-client.execution.result-mode' = 'table'; -- available values: 'table', 'changelog' and 'tableau'
SET 'sql-client.execution.max-table-result.rows' = '10000'; -- optional: maximum number of maintained rows
Expand All @@ -362,7 +361,7 @@ This configuration:
- defines a table `MyTableSource` that can read data from a CSV file,
- defines a view `MyCustomView` that declares a virtual table using a SQL query,
- defines a user-defined function `myUDF` that can be instantiated using the class name,
- uses the blink planner in streaming mode for running statements and a parallelism of 1,
- uses streaming mode for running statements and a parallelism of 1,
- runs exploratory queries in the `table` result mode,
- and makes some planner adjustments around join reordering and spilling via configuration options.

Expand Down Expand Up @@ -688,8 +687,6 @@ To distinguish the deprecated key, the sql client use the '[DEPRECATED]' as the
Flink SQL>SET;
execution.runtime-mode=batch
sql-client.execution.result-mode=table
table.planner=blink
[DEPRECATED] execution.planner=blink
[DEPRECATED] execution.result-mode=table
[DEPRECATED] execution.type=batch
```
Expand Down
5 changes: 1 addition & 4 deletions docs/content/docs/dev/table/sqlClient.md
Expand Up @@ -345,7 +345,6 @@ CREATE FUNCTION foo.bar.AggregateUDF AS myUDF;

-- Properties that change the fundamental execution behavior of a table program.

SET 'table.planner' = 'blink'; -- planner: either 'blink' (default) or 'old'
SET 'execution.runtime-mode' = 'streaming'; -- execution mode either 'batch' or 'streaming'
SET 'sql-client.execution.result-mode' = 'table'; -- available values: 'table', 'changelog' and 'tableau'
SET 'sql-client.execution.max-table-result.rows' = '10000'; -- optional: maximum number of maintained rows
Expand All @@ -368,7 +367,7 @@ This configuration:
- defines a table `MyTableSource` that can read data from a CSV file,
- defines a view `MyCustomView` that declares a virtual table using a SQL query,
- defines a user-defined function `myUDF` that can be instantiated using the class name,
- uses the blink planner in streaming mode for running statements and a parallelism of 1,
- uses streaming mode for running statements and a parallelism of 1,
- runs exploratory queries in the `table` result mode,
- and makes some planner adjustments around join reordering and spilling via configuration options.

Expand Down Expand Up @@ -694,8 +693,6 @@ To distinguish the deprecated key, the sql client use the '[DEPRECATED]' as the
Flink SQL>SET;
execution.runtime-mode=batch
sql-client.execution.result-mode=table
table.planner=blink
[DEPRECATED] execution.planner=blink
[DEPRECATED] execution.result-mode=table
[DEPRECATED] execution.type=batch
```
Expand Down
3 changes: 1 addition & 2 deletions flink-end-to-end-tests/run-nightly-tests.sh
Expand Up @@ -216,8 +216,7 @@ fi
run_test "State TTL Heap backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh hashmap" "skip_check_exceptions"
run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh rocks" "skip_check_exceptions"

run_test "SQL Client end-to-end test (Old planner) Elasticsearch (v7.5.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old"
run_test "SQL Client end-to-end test (Blink planner) Elasticsearch (v7.5.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink"
run_test "SQL Client end-to-end test" "$END_TO_END_DIR/test-scripts/test_sql_client.sh"

run_test "TPC-H end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_tpch.sh"
run_test "TPC-DS end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_tpcds.sh"
Expand Down
5 changes: 0 additions & 5 deletions flink-end-to-end-tests/test-scripts/test_sql_client.sh
Expand Up @@ -19,8 +19,6 @@

set -Eeuo pipefail

PLANNER="${1:-old}"

KAFKA_VERSION="2.2.2"
CONFLUENT_VERSION="5.0.0"
CONFLUENT_MAJOR_VERSION="5.0"
Expand Down Expand Up @@ -213,9 +211,6 @@ functions:
- name: RegReplace
from: class
class: org.apache.flink.table.toolbox.StringRegexReplaceFunction

execution:
planner: "$PLANNER"
EOF

# submit SQL statements
Expand Down
6 changes: 0 additions & 6 deletions flink-table/flink-sql-client/pom.xml
Expand Up @@ -78,12 +78,6 @@ under the License.
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand All @@ -54,8 +55,6 @@ public class ExecutionEntry extends ConfigEntry {

public static final String EXECUTION_PLANNER = "planner";

public static final String EXECUTION_PLANNER_VALUE_OLD = "old";

public static final String EXECUTION_PLANNER_VALUE_BLINK = "blink";

public static final String EXECUTION_TYPE = "type";
Expand Down Expand Up @@ -117,9 +116,7 @@ private ExecutionEntry(DescriptorProperties properties) {
@Override
protected void validate(DescriptorProperties properties) {
properties.validateEnumValues(
EXECUTION_PLANNER,
true,
Arrays.asList(EXECUTION_PLANNER_VALUE_OLD, EXECUTION_PLANNER_VALUE_BLINK));
EXECUTION_PLANNER, true, Collections.singletonList(EXECUTION_PLANNER_VALUE_BLINK));
properties.validateEnumValues(
EXECUTION_TYPE,
true,
Expand Down Expand Up @@ -166,35 +163,6 @@ public boolean inBatchMode() {
.orElse(false);
}

public boolean isStreamingPlanner() {
final String planner =
properties
.getOptionalString(EXECUTION_PLANNER)
.orElse(EXECUTION_PLANNER_VALUE_BLINK);

// Blink planner is a streaming planner
if (planner.equals(EXECUTION_PLANNER_VALUE_BLINK)) {
return true;
}
// Old planner can be a streaming or batch planner
else if (planner.equals(EXECUTION_PLANNER_VALUE_OLD)) {
return inStreamingMode();
}

return false;
}

public boolean isBlinkPlanner() {
final String planner =
properties
.getOptionalString(EXECUTION_PLANNER)
.orElse(EXECUTION_PLANNER_VALUE_BLINK);
if (planner.equals(EXECUTION_PLANNER_VALUE_OLD)) {
return false;
}
return true;
}

public Optional<Integer> getParallelism() {
return properties.getOptionalInt(EXECUTION_PARALLELISM);
}
Expand Down
Expand Up @@ -18,14 +18,13 @@

package org.apache.flink.table.client.gateway.context;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
Expand Down Expand Up @@ -57,7 +56,7 @@ public class ExecutionContext {
private final SessionState sessionState;
private final URLClassLoader classLoader;

private final TableEnvironment tableEnv;
private final StreamTableEnvironment tableEnv;

public ExecutionContext(
Configuration flinkConfig, URLClassLoader classLoader, SessionState sessionState) {
Expand All @@ -78,7 +77,7 @@ public ExecutionContext(ExecutionContext context) {
this.flinkConfig = context.flinkConfig;
this.sessionState = context.sessionState;
this.classLoader = context.classLoader;
// create a new table env

this.tableEnv = createTableEnvironment();
}

Expand All @@ -91,41 +90,42 @@ public <R> R wrapClassLoader(Supplier<R> supplier) {
}
}

public TableEnvironment getTableEnvironment() {
public StreamTableEnvironment getTableEnvironment() {
return tableEnv;
}

// ------------------------------------------------------------------------------------------------------------------
// Helper to create Table Environment
// ------------------------------------------------------------------------------------------------------------------

private TableEnvironment createTableEnvironment() {
// check the value of TABLE_PLANNER and RUNTIME_MODE
private StreamTableEnvironment createTableEnvironment() {
// checks the value of RUNTIME_MODE
EnvironmentSettings settings = EnvironmentSettings.fromConfiguration(flinkConfig);

if (!settings.isBlinkPlanner()) {
throw new TableException(
"The old planner is not supported anymore. Please update to new default planner.");
}

TableConfig config = new TableConfig();
config.addConfiguration(flinkConfig);
if (!settings.isStreamingMode() && !settings.isBlinkPlanner()) {
ExecutionEnvironment execEnv = createExecutionEnvironment();
return new BatchTableEnvironmentImpl(
execEnv, config, sessionState.catalogManager, sessionState.moduleManager);
} else {
StreamExecutionEnvironment streamExecEnv = createStreamExecutionEnvironment();

final Map<String, String> executorProperties = settings.toExecutorProperties();
Executor executor = lookupExecutor(executorProperties, streamExecEnv);
return createStreamTableEnvironment(
streamExecEnv,
settings,
config,
executor,
sessionState.catalogManager,
sessionState.moduleManager,
sessionState.functionCatalog,
classLoader);
}

StreamExecutionEnvironment streamExecEnv = createStreamExecutionEnvironment();

final Map<String, String> executorProperties = settings.toExecutorProperties();
Executor executor = lookupExecutor(executorProperties, streamExecEnv);
return createStreamTableEnvironment(
streamExecEnv,
settings,
config,
executor,
sessionState.catalogManager,
sessionState.moduleManager,
sessionState.functionCatalog,
classLoader);
}

private TableEnvironment createStreamTableEnvironment(
private StreamTableEnvironment createStreamTableEnvironment(
StreamExecutionEnvironment env,
EnvironmentSettings settings,
TableConfig config,
Expand Down Expand Up @@ -184,10 +184,4 @@ private StreamExecutionEnvironment createStreamExecutionEnvironment() {
// This requires StreamExecutionEnvironment to have a full flink configuration.
return new StreamExecutionEnvironment(new Configuration(flinkConfig), classLoader);
}

private ExecutionEnvironment createExecutionEnvironment() {
ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
execEnv.getConfiguration().addAll(flinkConfig);
return execEnv;
}
}