diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java index a726cf1aa1..6212d9b5f1 100755 --- a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java +++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java @@ -19,9 +19,12 @@ package org.apache.samza.sql.client.cli; +import java.util.stream.Collectors; import org.apache.samza.sql.client.interfaces.*; import org.apache.samza.sql.client.util.CliException; import org.apache.samza.sql.client.util.CliUtil; +import org.apache.samza.sql.schema.SamzaSqlFieldType; +import org.apache.samza.sql.schema.SqlFieldSchema; import org.apache.samza.sql.schema.SqlSchema; import org.jline.reader.EndOfFileException; import org.jline.reader.LineReader; @@ -736,7 +739,7 @@ private List formatSchema4Display(SqlSchema schema) { for (int i = 0; i < rowCount; ++i) { SqlSchema.SqlField sqlField = schema.getFields().get(i); String field = sqlField.getFieldName(); - String type = sqlField.getFieldSchema().getFieldType().toString(); + String type = getFieldDisplayValue(sqlField.getFieldSchema()); int fieldLen = field.length(); int typeLen = type.length(); int fieldStartIdx = 0, typeStartIdx = 0; @@ -777,6 +780,34 @@ private List formatSchema4Display(SqlSchema schema) { return lines; } + private String getFieldDisplayValue(SqlFieldSchema fieldSchema) { + if (!isComplexField(fieldSchema.getFieldType())) { + return fieldSchema.getFieldType().toString(); + } + SamzaSqlFieldType fieldType = fieldSchema.getFieldType(); + switch (fieldType) { + case ARRAY: + return String.format("ARRAY(%s)", getFieldDisplayValue(fieldSchema.getElementSchema())); + case MAP: + return String.format("MAP(%s, %s)", SamzaSqlFieldType.STRING.toString(), + getFieldDisplayValue(fieldSchema.getValueScehma())); + case ROW: + String rowDisplayValue = fieldSchema.getRowSchema() + .getFields() + .stream() + .map(f -> getFieldDisplayValue(f.getFieldSchema())) + .collect(Collectors.joining(",")); + return String.format("ROW(%s)", rowDisplayValue); + default: + throw new UnsupportedOperationException("Unknown field type " + fieldType); + } + } + + private boolean isComplexField(SamzaSqlFieldType fieldtype) { + return fieldtype == SamzaSqlFieldType.ARRAY || fieldtype == SamzaSqlFieldType.MAP + || fieldtype == SamzaSqlFieldType.ROW; + } + // Trims: leading spaces; trailing spaces and ";"s private String trimCommand(String command) { if (CliUtil.isNullOrEmpty(command)) diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java index 1319a8561b..2b83b60888 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java @@ -105,8 +105,8 @@ public ConfigBasedUdfResolver(Properties config, Config udfConfig) { String udfName = sqlUdf.name(); for (Map.Entry udfMethod : udfMethods.entrySet()) { List params = Arrays.asList(udfMethod.getKey().params()); - udfs.add(new UdfMetadata(udfName, udfMethod.getValue(), udfConfig.subset(udfName + "."), params, - udfMethod.getKey().disableArgumentCheck())); + udfs.add(new UdfMetadata(udfName, sqlUdf.description(), udfMethod.getValue(), udfConfig.subset(udfName + "."), params, + udfMethod.getKey().returns(), udfMethod.getKey().disableArgumentCheck())); } } } diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java index 4adb5ea69c..9288ce7e63 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java @@ -32,20 +32,33 @@ public class UdfMetadata { private final String name; + + private final String description; private final Method udfMethod; private final Config udfConfig; private final boolean disableArgCheck; private final List arguments; - public UdfMetadata(String name, Method udfMethod, Config udfConfig, List arguments, - boolean disableArgCheck) { + private final SamzaSqlFieldType returnType; + + public UdfMetadata(String name, String description, Method udfMethod, Config udfConfig, List arguments, + SamzaSqlFieldType returnType, boolean disableArgCheck) { this.name = name; + this.description = description; this.udfMethod = udfMethod; this.udfConfig = udfConfig; this.arguments = arguments; + this.returnType = returnType; this.disableArgCheck = disableArgCheck; } + /** + * @return returns the returnType of the Samza SQL UDF. + */ + public SamzaSqlFieldType getReturnType() { + return returnType; + } + public Config getUdfConfig() { return udfConfig; } @@ -64,6 +77,13 @@ public String getName() { return name; } + /** + * @return Returns the description of the udf. + */ + public String getDescription() { + return description; + } + /** * @return Returns the list of arguments that the udf should take. */