Skip to content

Commit

Permalink
[FLINK-8863] [sql-client] Make user-defined functions more robust
Browse files Browse the repository at this point in the history
- Simplify code and fix various bugs
- Add more tests
- Refactor various names for descriptors and variables
- Make 'from' property mandatory
- Make LiteralValue public API
  • Loading branch information
twalthr authored and AlexanderKoltsov committed Jul 10, 2018
1 parent fbe3afd commit 67caa93
Show file tree
Hide file tree
Showing 36 changed files with 1,342 additions and 893 deletions.
18 changes: 17 additions & 1 deletion flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
Expand Up @@ -21,11 +21,14 @@
# Defaults might be overwritten by a session specific environment.


# See the Table API & SQL documentation for details about supported properties.


#==============================================================================
# Table Sources
#==============================================================================

# Define table sources and sinks here. See the Table API & SQL documentation for details.
# Define table sources and sinks here.

tables: [] # empty list
# A typical table source definition looks like:
Expand All @@ -35,6 +38,19 @@ tables: [] # empty list
# format: ...
# schema: ...

#==============================================================================
# User-defined functions
#==============================================================================

# Define scalar, aggregate, or table functions here.

functions: [] # empty list
# A typical function definition looks like:
# - name: ...
# from: class
# class: ...
# constructor: ...

#==============================================================================
# Execution properties
#==============================================================================
Expand Down
Expand Up @@ -201,6 +201,9 @@ public void open() {
case SHOW_TABLES:
callShowTables(cmdCall);
break;
case SHOW_FUNCTIONS:
callShowFunctions(cmdCall);
break;
case DESCRIBE:
callDescribe(cmdCall);
break;
Expand Down Expand Up @@ -284,6 +287,22 @@ private void callShowTables(SqlCommandCall cmdCall) {
terminal.flush();
}

private void callShowFunctions(SqlCommandCall cmdCall) {
final List<String> functions;
try {
functions = executor.listUserDefinedFunctions(context);
} catch (SqlExecutionException e) {
printException(e);
return;
}
if (functions.isEmpty()) {
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_EMPTY).toAnsi());
} else {
functions.forEach((v) -> terminal.writer().println(v));
}
terminal.flush();
}

private void callDescribe(SqlCommandCall cmdCall) {
final TableSchema schema;
try {
Expand Down
Expand Up @@ -45,6 +45,7 @@ private CliStrings() {
.append(formatCommand(SqlCommand.CLEAR, "Clears the current terminal."))
.append(formatCommand(SqlCommand.HELP, "Prints the available commands."))
.append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables."))
.append(formatCommand(SqlCommand.SHOW_FUNCTIONS, "Shows all registered user-defined functions."))
.append(formatCommand(SqlCommand.DESCRIBE, "Describes the schema of a table with the given name."))
.append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name."))
.append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
Expand Down
Expand Up @@ -86,6 +86,7 @@ enum SqlCommand {
CLEAR("clear"),
HELP("help"),
SHOW_TABLES("show tables"),
SHOW_FUNCTIONS("show functions"),
DESCRIBE("describe"),
EXPLAIN("explain"),
SELECT("select"),
Expand Down
Expand Up @@ -42,17 +42,17 @@ public class Environment {

private Map<String, TableDescriptor> tables;

private Map<String, UserDefinedFunction> functions;

private Execution execution;

private Deployment deployment;

private Map<String, UserDefinedFunction> functions;

public Environment() {
this.tables = Collections.emptyMap();
this.functions = Collections.emptyMap();
this.execution = new Execution();
this.deployment = new Deployment();
this.functions = Collections.emptyMap();
}

public Map<String, TableDescriptor> getTables() {
Expand All @@ -69,7 +69,7 @@ public void setTables(List<Map<String, Object>> tables) {
config.remove(TableDescriptorValidator.TABLE_TYPE());
final Source s = Source.create(config);
if (this.tables.containsKey(s.getName())) {
throw new SqlClientException("Duplicate source name '" + s + "'.");
throw new SqlClientException("Duplicate source name '" + s.getName() + "'.");
}
this.tables.put(s.getName(), s);
} else {
Expand All @@ -79,11 +79,18 @@ public void setTables(List<Map<String, Object>> tables) {
});
}

public Map<String, UserDefinedFunction> getFunctions() {
return functions;
}

public void setFunctions(List<Map<String, Object>> functions) {
this.functions = new HashMap<>(functions.size());
functions.forEach(config -> {
final UserDefinedFunction f = UserDefinedFunction.create(config);
this.functions.put(f.name(), f);
if (this.tables.containsKey(f.getName())) {
throw new SqlClientException("Duplicate function name '" + f.getName() + "'.");
}
this.functions.put(f.getName(), f);
});
}

Expand All @@ -103,10 +110,6 @@ public Deployment getDeployment() {
return deployment;
}

public Map<String, UserDefinedFunction> getFunctions() {
return functions;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
Expand All @@ -117,6 +120,13 @@ public String toString() {
table.addProperties(props);
props.asMap().forEach((k, v) -> sb.append(" ").append(k).append(": ").append(v).append('\n'));
});
sb.append("=================== Functions ====================\n");
functions.forEach((name, function) -> {
sb.append("- name: ").append(name).append("\n");
final DescriptorProperties props = new DescriptorProperties(true);
function.addProperties(props);
props.asMap().forEach((k, v) -> sb.append(" ").append(k).append(": ").append(v).append('\n'));
});
sb.append("=================== Execution ====================\n");
execution.toProperties().forEach((k, v) -> sb.append(k).append(": ").append(v).append('\n'));
sb.append("=================== Deployment ===================\n");
Expand Down Expand Up @@ -153,7 +163,7 @@ public static Environment merge(Environment env1, Environment env2) {

// merge functions
final Map<String, UserDefinedFunction> functions = new HashMap<>(env1.getFunctions());
mergedEnv.getFunctions().putAll(env2.getFunctions());
functions.putAll(env2.getFunctions());
mergedEnv.functions = functions;

// merge execution properties
Expand All @@ -165,12 +175,18 @@ public static Environment merge(Environment env1, Environment env2) {
return mergedEnv;
}

/**
* Enriches an environment with new/modified properties and returns the new instance.
*/
public static Environment enrich(Environment env, Map<String, String> properties) {
final Environment enrichedEnv = new Environment();

// merge tables
enrichedEnv.tables = new HashMap<>(env.getTables());

// merge functions
enrichedEnv.functions = new HashMap<>(env.getFunctions());

// enrich execution properties
enrichedEnv.execution = Execution.enrich(env.execution, properties);

Expand Down
Expand Up @@ -26,7 +26,7 @@
import java.util.Map;

/**
* Configuration of a table source. Parses an entry in the `sources` list of an environment
* Configuration of a table source. Parses an entry in the `tables` list of an environment
* file and translates to table descriptor properties.
*/
public class Source extends TableSourceDescriptor {
Expand All @@ -49,6 +49,9 @@ public Map<String, String> getProperties() {
return properties;
}

/**
* Creates a table source descriptor with the given config.
*/
public static Source create(Map<String, Object> config) {
if (!config.containsKey(NAME)) {
throw new SqlClientException("The 'name' attribute of a table source is missing.");
Expand Down
Expand Up @@ -21,68 +21,47 @@
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.FunctionDescriptor;
import org.apache.flink.table.descriptors.FunctionValidator;

import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.table.client.config.UserDefinedFunction.From.CLASS;

/**
* Descriptor for user-defined functions.
*/
public class UserDefinedFunction extends FunctionDescriptor {

private static final String FROM = "from";

private From from;

private String name;
private Map<String, String> properties;

private UserDefinedFunction(String name, From from, Map<String, String> properties) {
super(name);
this.from = from;
private static final String NAME = "name";

private UserDefinedFunction(String name, Map<String, String> properties) {
this.name = name;
this.properties = properties;
}

/**
* Gets where the user-defined function should be created from.
*/
public From getFrom() {
return from;
public String getName() {
return name;
}

public Map<String, String> getProperties() {
return properties;
}

/**
* Creates a UDF descriptor with the given config.
* Creates a user-defined function descriptor with the given config.
*/
public static UserDefinedFunction create(Map<String, Object> config) {
Map<String, String> udfConfig = ConfigUtil.normalizeYaml(config);
if (!udfConfig.containsKey(FunctionValidator.FUNCTION_NAME())) {
if (!config.containsKey(NAME)) {
throw new SqlClientException("The 'name' attribute of a function is missing.");
}

final String name = udfConfig.get(FunctionValidator.FUNCTION_NAME());
if (name.trim().length() <= 0) {
final Object name = config.get(NAME);
if (name == null || !(name instanceof String) || ((String) name).trim().length() <= 0) {
throw new SqlClientException("Invalid function name '" + name + "'.");
}

// the default value is "CLASS"
From fromValue = CLASS;

if (udfConfig.containsKey(FROM)) {
final String from = udfConfig.get(FROM);
try {
fromValue = From.valueOf(from.toUpperCase());
} catch (IllegalArgumentException ex) {
throw new SqlClientException("Unknown 'from' value '" + from + "'.");
}
}

switch (fromValue) {
case CLASS:
return new UserDefinedFunction(name, fromValue, udfConfig);
default:
throw new SqlClientException("The from attribute can only be \"class\" now.");
}
final Map<String, Object> properties = new HashMap<>(config);
properties.remove(NAME);
return new UserDefinedFunction((String) name, ConfigUtil.normalizeYaml(properties));
}

// --------------------------------------------------------------------------------------------
Expand All @@ -91,8 +70,4 @@ public static UserDefinedFunction create(Map<String, Object> config) {
public void addProperties(DescriptorProperties properties) {
this.properties.forEach(properties::putString);
}

enum From {
CLASS
}
}
Expand Up @@ -45,6 +45,11 @@ public interface Executor {
*/
List<String> listTables(SessionContext session) throws SqlExecutionException;

/**
* Lists all user-defined functions known to the executor.
*/
List<String> listUserDefinedFunctions(SessionContext session) throws SqlExecutionException;

/**
* Returns the schema of a table. Throws an exception if the table could not be found. The
* schema might contain time attribute types for helping the user during debugging a query.
Expand Down
Expand Up @@ -48,10 +48,9 @@
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.TableSourceDescriptor;
import org.apache.flink.table.descriptors.service.FunctionService;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.FunctionService;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
Expand Down Expand Up @@ -105,20 +104,18 @@ public ExecutionContext(Environment defaultEnvironment, SessionContext sessionCo
tableSources = new HashMap<>();
mergedEnv.getTables().forEach((name, descriptor) -> {
if (descriptor instanceof TableSourceDescriptor) {
TableSource<?> tableSource = TableSourceFactoryService.findAndCreateTableSource(
(TableSourceDescriptor) descriptor, classLoader);
final TableSource<?> tableSource = TableSourceFactoryService.findAndCreateTableSource(
(TableSourceDescriptor) descriptor,
classLoader);
tableSources.put(name, tableSource);
}
});

// generate user-defined functions
// create user-defined functions
functions = new HashMap<>();
mergedEnv.getFunctions().forEach((name, descriptor) -> {
DescriptorProperties properties = new DescriptorProperties(true);
descriptor.addProperties(properties);
functions.put(
name,
FunctionService.generateUserDefinedFunction(properties, classLoader));
final UserDefinedFunction function = FunctionService.createFunction(descriptor, classLoader);
functions.put(name, function);
});

// convert deployment options into command line options that describe a cluster
Expand Down Expand Up @@ -227,7 +224,7 @@ private EnvironmentInstance() {
// register table sources
tableSources.forEach(tableEnv::registerTableSource);

// register UDFs
// register user-defined functions
if (tableEnv instanceof StreamTableEnvironment) {
StreamTableEnvironment streamTableEnvironment = (StreamTableEnvironment) tableEnv;
functions.forEach((k, v) -> {
Expand All @@ -237,6 +234,8 @@ private EnvironmentInstance() {
streamTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v);
} else if (v instanceof TableFunction) {
streamTableEnvironment.registerFunction(k, (TableFunction<?>) v);
} else {
throw new SqlExecutionException("Unsupported function type: " + v.getClass().getName());
}
});
} else {
Expand All @@ -248,6 +247,8 @@ private EnvironmentInstance() {
batchTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v);
} else if (v instanceof TableFunction) {
batchTableEnvironment.registerFunction(k, (TableFunction<?>) v);
} else {
throw new SqlExecutionException("Unsupported function type: " + v.getClass().getName());
}
});
}
Expand Down
Expand Up @@ -190,6 +190,14 @@ public List<String> listTables(SessionContext session) throws SqlExecutionExcept
return Arrays.asList(tableEnv.listTables());
}

@Override
public List<String> listUserDefinedFunctions(SessionContext session) throws SqlExecutionException {
final TableEnvironment tableEnv = getOrCreateExecutionContext(session)
.createEnvironmentInstance()
.getTableEnvironment();
return Arrays.asList(tableEnv.listUserDefinedFunctions());
}

@Override
public TableSchema getTableSchema(SessionContext session, String name) throws SqlExecutionException {
final TableEnvironment tableEnv = getOrCreateExecutionContext(session)
Expand Down

0 comments on commit 67caa93

Please sign in to comment.