Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

package org.apache.samza.sql.client.cli;

import java.util.stream.Collectors;
import org.apache.samza.sql.client.interfaces.*;
import org.apache.samza.sql.client.util.CliException;
import org.apache.samza.sql.client.util.CliUtil;
import org.apache.samza.sql.schema.SamzaSqlFieldType;
import org.apache.samza.sql.schema.SqlFieldSchema;
import org.apache.samza.sql.schema.SqlSchema;
import org.jline.reader.EndOfFileException;
import org.jline.reader.LineReader;
Expand Down Expand Up @@ -736,7 +739,7 @@ private List<String> formatSchema4Display(SqlSchema schema) {
for (int i = 0; i < rowCount; ++i) {
SqlSchema.SqlField sqlField = schema.getFields().get(i);
String field = sqlField.getFieldName();
String type = sqlField.getFieldSchema().getFieldType().toString();
String type = getFieldDisplayValue(sqlField.getFieldSchema());
int fieldLen = field.length();
int typeLen = type.length();
int fieldStartIdx = 0, typeStartIdx = 0;
Expand Down Expand Up @@ -777,6 +780,34 @@ private List<String> formatSchema4Display(SqlSchema schema) {
return lines;
}

private String getFieldDisplayValue(SqlFieldSchema fieldSchema) {
if (!isComplexField(fieldSchema.getFieldType())) {
return fieldSchema.getFieldType().toString();
}
SamzaSqlFieldType fieldType = fieldSchema.getFieldType();
switch (fieldType) {
case ARRAY:
return String.format("ARRAY(%s)", getFieldDisplayValue(fieldSchema.getElementSchema()));
case MAP:
return String.format("MAP(%s, %s)", SamzaSqlFieldType.STRING.toString(),
getFieldDisplayValue(fieldSchema.getValueScehma()));
case ROW:
String rowDisplayValue = fieldSchema.getRowSchema()
.getFields()
.stream()
.map(f -> getFieldDisplayValue(f.getFieldSchema()))
.collect(Collectors.joining(","));
return String.format("ROW(%s)", rowDisplayValue);
default:
throw new UnsupportedOperationException("Unknown field type " + fieldType);
}
}

private boolean isComplexField(SamzaSqlFieldType fieldtype) {
return fieldtype == SamzaSqlFieldType.ARRAY || fieldtype == SamzaSqlFieldType.MAP
|| fieldtype == SamzaSqlFieldType.ROW;
}

// Trims: leading spaces; trailing spaces and ";"s
private String trimCommand(String command) {
if (CliUtil.isNullOrEmpty(command))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ public ConfigBasedUdfResolver(Properties config, Config udfConfig) {
String udfName = sqlUdf.name();
for (Map.Entry<SamzaSqlUdfMethod, Method> udfMethod : udfMethods.entrySet()) {
List<SamzaSqlFieldType> params = Arrays.asList(udfMethod.getKey().params());
udfs.add(new UdfMetadata(udfName, udfMethod.getValue(), udfConfig.subset(udfName + "."), params,
udfMethod.getKey().disableArgumentCheck()));
udfs.add(new UdfMetadata(udfName, sqlUdf.description(), udfMethod.getValue(), udfConfig.subset(udfName + "."), params,
udfMethod.getKey().returns(), udfMethod.getKey().disableArgumentCheck()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,33 @@
public class UdfMetadata {

private final String name;

private final String description;
private final Method udfMethod;
private final Config udfConfig;
private final boolean disableArgCheck;
private final List<SamzaSqlFieldType> arguments;

public UdfMetadata(String name, Method udfMethod, Config udfConfig, List<SamzaSqlFieldType> arguments,
boolean disableArgCheck) {
private final SamzaSqlFieldType returnType;

public UdfMetadata(String name, String description, Method udfMethod, Config udfConfig, List<SamzaSqlFieldType> arguments,
SamzaSqlFieldType returnType, boolean disableArgCheck) {
this.name = name;
this.description = description;
this.udfMethod = udfMethod;
this.udfConfig = udfConfig;
this.arguments = arguments;
this.returnType = returnType;
this.disableArgCheck = disableArgCheck;
}

/**
* @return returns the returnType of the Samza SQL UDF.
*/
public SamzaSqlFieldType getReturnType() {
return returnType;
}

public Config getUdfConfig() {
return udfConfig;
}
Expand All @@ -64,6 +77,13 @@ public String getName() {
return name;
}

/**
* @return Returns the description of the udf.
*/
public String getDescription() {
return description;
}

/**
* @return Returns the list of arguments that the udf should take.
*/
Expand Down