From 13ca0830c15ce3d45a98fc6546aab0f003ba0879 Mon Sep 17 00:00:00 2001 From: Srinivasulu Punuru Date: Wed, 10 Jul 2019 14:01:39 -0700 Subject: [PATCH 1/3] Support for complex schema in Samza SQL shell for describe command --- .../apache/samza/sql/client/cli/CliShell.java | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java index a726cf1aa1..6212d9b5f1 100755 --- a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java +++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java @@ -19,9 +19,12 @@ package org.apache.samza.sql.client.cli; +import java.util.stream.Collectors; import org.apache.samza.sql.client.interfaces.*; import org.apache.samza.sql.client.util.CliException; import org.apache.samza.sql.client.util.CliUtil; +import org.apache.samza.sql.schema.SamzaSqlFieldType; +import org.apache.samza.sql.schema.SqlFieldSchema; import org.apache.samza.sql.schema.SqlSchema; import org.jline.reader.EndOfFileException; import org.jline.reader.LineReader; @@ -736,7 +739,7 @@ private List formatSchema4Display(SqlSchema schema) { for (int i = 0; i < rowCount; ++i) { SqlSchema.SqlField sqlField = schema.getFields().get(i); String field = sqlField.getFieldName(); - String type = sqlField.getFieldSchema().getFieldType().toString(); + String type = getFieldDisplayValue(sqlField.getFieldSchema()); int fieldLen = field.length(); int typeLen = type.length(); int fieldStartIdx = 0, typeStartIdx = 0; @@ -777,6 +780,34 @@ private List formatSchema4Display(SqlSchema schema) { return lines; } + private String getFieldDisplayValue(SqlFieldSchema fieldSchema) { + if (!isComplexField(fieldSchema.getFieldType())) { + return fieldSchema.getFieldType().toString(); + } + SamzaSqlFieldType fieldType = fieldSchema.getFieldType(); + switch (fieldType) { + case ARRAY: + return String.format("ARRAY(%s)", getFieldDisplayValue(fieldSchema.getElementSchema())); + case MAP: + return String.format("MAP(%s, %s)", SamzaSqlFieldType.STRING.toString(), + getFieldDisplayValue(fieldSchema.getValueScehma())); + case ROW: + String rowDisplayValue = fieldSchema.getRowSchema() + .getFields() + .stream() + .map(f -> getFieldDisplayValue(f.getFieldSchema())) + .collect(Collectors.joining(",")); + return String.format("ROW(%s)", rowDisplayValue); + default: + throw new UnsupportedOperationException("Unknown field type " + fieldType); + } + } + + private boolean isComplexField(SamzaSqlFieldType fieldtype) { + return fieldtype == SamzaSqlFieldType.ARRAY || fieldtype == SamzaSqlFieldType.MAP + || fieldtype == SamzaSqlFieldType.ROW; + } + // Trims: leading spaces; trailing spaces and ";"s private String trimCommand(String command) { if (CliUtil.isNullOrEmpty(command)) From eaf1ca280886e5d9b7342579b61ab6c7abe144ac Mon Sep 17 00:00:00 2001 From: Srinivasulu Punuru Date: Wed, 10 Jul 2019 14:19:27 -0700 Subject: [PATCH 2/3] Adding Udf description to UdfMetadata --- .../samza/sql/impl/ConfigBasedUdfResolver.java | 2 +- .../org/apache/samza/sql/interfaces/UdfMetadata.java | 12 +++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java index 1319a8561b..1142438ff7 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java @@ -105,7 +105,7 @@ public ConfigBasedUdfResolver(Properties config, Config udfConfig) { String udfName = sqlUdf.name(); for (Map.Entry udfMethod : udfMethods.entrySet()) { List params = Arrays.asList(udfMethod.getKey().params()); - udfs.add(new UdfMetadata(udfName, udfMethod.getValue(), udfConfig.subset(udfName + "."), params, + udfs.add(new UdfMetadata(udfName, sqlUdf.description(), udfMethod.getValue(), udfConfig.subset(udfName + "."), params, udfMethod.getKey().disableArgumentCheck())); } } diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java index 4adb5ea69c..cd35a8c80e 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java @@ -32,14 +32,17 @@ public class UdfMetadata { private final String name; + + private final String description; private final Method udfMethod; private final Config udfConfig; private final boolean disableArgCheck; private final List arguments; - public UdfMetadata(String name, Method udfMethod, Config udfConfig, List arguments, + public UdfMetadata(String name, String description, Method udfMethod, Config udfConfig, List arguments, boolean disableArgCheck) { this.name = name; + this.description = description; this.udfMethod = udfMethod; this.udfConfig = udfConfig; this.arguments = arguments; @@ -64,6 +67,13 @@ public String getName() { return name; } + /** + * @return Returns the description of the udf. + */ + public String getDescription() { + return description; + } + /** * @return Returns the list of arguments that the udf should take. */ From c512c8679f6d3ffcd82ebb42f4778d733b7cdf47 Mon Sep 17 00:00:00 2001 From: Srinivasulu Punuru Date: Wed, 10 Jul 2019 15:34:21 -0700 Subject: [PATCH 3/3] Adding the returnType to the UdfMetadata --- .../samza/sql/impl/ConfigBasedUdfResolver.java | 2 +- .../org/apache/samza/sql/interfaces/UdfMetadata.java | 12 +++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java index 1142438ff7..2b83b60888 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java @@ -106,7 +106,7 @@ public ConfigBasedUdfResolver(Properties config, Config udfConfig) { for (Map.Entry udfMethod : udfMethods.entrySet()) { List params = Arrays.asList(udfMethod.getKey().params()); udfs.add(new UdfMetadata(udfName, sqlUdf.description(), udfMethod.getValue(), udfConfig.subset(udfName + "."), params, - udfMethod.getKey().disableArgumentCheck())); + udfMethod.getKey().returns(), udfMethod.getKey().disableArgumentCheck())); } } } diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java index cd35a8c80e..9288ce7e63 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java @@ -39,16 +39,26 @@ public class UdfMetadata { private final boolean disableArgCheck; private final List arguments; + private final SamzaSqlFieldType returnType; + public UdfMetadata(String name, String description, Method udfMethod, Config udfConfig, List arguments, - boolean disableArgCheck) { + SamzaSqlFieldType returnType, boolean disableArgCheck) { this.name = name; this.description = description; this.udfMethod = udfMethod; this.udfConfig = udfConfig; this.arguments = arguments; + this.returnType = returnType; this.disableArgCheck = disableArgCheck; } + /** + * @return returns the returnType of the Samza SQL UDF. + */ + public SamzaSqlFieldType getReturnType() { + return returnType; + } + public Config getUdfConfig() { return udfConfig; }