Skip to content

Commit

Permalink
[FLINK-34517][table]fix environment configs ignored when calling proc…
Browse files Browse the repository at this point in the history
…edure operation (#24656)
  • Loading branch information
JustinLeesin committed May 6, 2024
1 parent ac4aa35 commit fa426f1
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.planner.operations;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
Expand Down Expand Up @@ -126,9 +127,7 @@ private Object[] getConvertedArgumentValues(
TableConfig tableConfig, ClassLoader userClassLoader) {
// should be [ProcedureContext, arg1, arg2, ..]
Object[] argumentVal = new Object[1 + internalInputArguments.length];
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(tableConfig.getConfiguration());
argumentVal[0] = new DefaultProcedureContext(env);
argumentVal[0] = getProcedureContext(tableConfig);
for (int i = 0; i < internalInputArguments.length; i++) {
argumentVal[i + 1] =
(internalInputArguments[i] != null)
Expand All @@ -138,6 +137,15 @@ private Object[] getConvertedArgumentValues(
return argumentVal;
}

private ProcedureContext getProcedureContext(TableConfig tableConfig) {
Configuration configuration =
new Configuration((Configuration) tableConfig.getRootConfiguration());
configuration.addAll(tableConfig.getConfiguration());
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
return new DefaultProcedureContext(env);
}

/** Convert the value with internal representation to the value with external representation. */
private Object toExternal(Object internalValue, DataType inputType, ClassLoader classLoader) {
if (!(DataTypeUtils.isInternal(inputType))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
Expand All @@ -39,6 +40,7 @@

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -94,6 +96,8 @@ public static class CatalogWithBuiltInProcedure extends GenericInMemoryCatalog {
PROCEDURE_MAP.put(
ObjectPath.fromString("system.named_args_optional"),
new NamedArgumentsProcedureWithOptionalArguments());
PROCEDURE_MAP.put(
ObjectPath.fromString("system.get_env_conf"), new EnvironmentConfProcedure());
}

public CatalogWithBuiltInProcedure(String name) {
Expand Down Expand Up @@ -230,6 +234,22 @@ public String[] call(ProcedureContext procedureContext, String arg1, Integer arg
}
}

/** A procedure to get environment configs for testing purpose. */
@ProcedureHint(output = @DataTypeHint("ROW<k STRING, v STRING>"))
public static class EnvironmentConfProcedure implements Procedure {
public Row[] call(ProcedureContext procedureContext) throws Exception {
StreamExecutionEnvironment env = procedureContext.getExecutionEnvironment();
Configuration config = (Configuration) env.getConfiguration();
List<Row> rows = new ArrayList<>();
config.toMap()
.forEach(
(k, v) -> {
rows.add(Row.of(k, v));
});
return rows.toArray(new Row[0]);
}
}

/** A simple pojo class for testing purpose. */
public static class UserPojo {
private final String name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@
package org.apache.flink.table.planner.runtime.stream.sql;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.planner.factories.TestProcedureCatalogFactory;
import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
import org.apache.flink.types.Row;
Expand Down Expand Up @@ -88,7 +92,7 @@ void testShowProcedures() {
tEnv().executeSql("show procedures in `system`").collect());
assertThat(rows.toString())
.isEqualTo(
"[+I[generate_n], +I[generate_user], +I[get_year], +I[named_args], +I[named_args_optional], +I[named_args_overload], +I[sum_n]]");
"[+I[generate_n], +I[generate_user], +I[get_env_conf], +I[get_year], +I[named_args], +I[named_args_optional], +I[named_args_overload], +I[sum_n]]");

// show procedure with like
rows =
Expand Down Expand Up @@ -116,7 +120,7 @@ void testShowProcedures() {
.collect());
assertThat(rows.toString())
.isEqualTo(
"[+I[get_year], +I[named_args], +I[named_args_optional], +I[named_args_overload], +I[sum_n]]");
"[+I[get_env_conf], +I[get_year], +I[named_args], +I[named_args_optional], +I[named_args_overload], +I[sum_n]]");

// show procedure with not ilike
rows =
Expand All @@ -125,7 +129,7 @@ void testShowProcedures() {
.collect());
assertThat(rows.toString())
.isEqualTo(
"[+I[get_year], +I[named_args], +I[named_args_optional], +I[named_args_overload], +I[sum_n]]");
"[+I[get_env_conf], +I[get_year], +I[named_args], +I[named_args_optional], +I[named_args_overload], +I[sum_n]]");
}

@Test
Expand Down Expand Up @@ -210,6 +214,34 @@ void testNamedArgumentsWithOptionalArguments() {
ResolvedSchema.of(Column.physical("result", DataTypes.STRING())));
}

@Test
void testEnvironmentConf() throws DatabaseAlreadyExistException {
// root conf should work
Configuration configuration = new Configuration();
configuration.setString("key1", "value1");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.getConfig().set("key2", "value2");

TestProcedureCatalogFactory.CatalogWithBuiltInProcedure procedureCatalog =
new TestProcedureCatalogFactory.CatalogWithBuiltInProcedure("procedure_catalog");
procedureCatalog.createDatabase(
"system", new CatalogDatabaseImpl(Collections.emptyMap(), null), true);
tableEnv.registerCatalog("test_p", procedureCatalog);
tableEnv.useCatalog("test_p");
TableResult tableResult = tableEnv.executeSql("call `system`.get_env_conf()");
List<Row> environmentConf = CollectionUtil.iteratorToList(tableResult.collect());
assertThat(environmentConf.contains(Row.of("key1", "value1"))).isTrue();
assertThat(environmentConf.contains(Row.of("key2", "value2"))).isTrue();

// table conf should overwrite root conf
tableEnv.getConfig().set("key1", "value11");
tableResult = tableEnv.executeSql("call `system`.get_env_conf()");
environmentConf = CollectionUtil.iteratorToList(tableResult.collect());
assertThat(environmentConf.contains(Row.of("key1", "value11"))).isTrue();
}

private void verifyTableResult(
TableResult tableResult, List<Row> expectedResult, ResolvedSchema expectedSchema) {
assertThat(CollectionUtil.iteratorToList(tableResult.collect()).toString())
Expand Down

0 comments on commit fa426f1

Please sign in to comment.