From 97afd3fa902c5fa9919fb3f930531eca84642269 Mon Sep 17 00:00:00 2001 From: Aditya Toomula Date: Tue, 3 Sep 2019 15:23:18 -0700 Subject: [PATCH] SAMZA-2313: Adding validation for Samza Sql statements. (#1148) * Adding validation for Samza Sql statements. * Adding validation for Samza Sql statements. * Adding validation for Samza Sql statements. * Adding validation for Samza Sql statements. * Adding validation for Samza Sql statements. * Adding validation for Samza Sql statements. --- .../samza/sql/dsl/SamzaSqlDslConverter.java | 49 ++- .../samza/sql/planner/QueryPlanner.java | 98 +++--- .../samza/sql/planner/SamzaSqlValidator.java | 279 ++++++++++++++++++ .../planner/SamzaSqlValidatorException.java | 40 +++ .../sql/runner/SamzaSqlApplicationConfig.java | 4 + .../samza/sql/util/SamzaSqlQueryParser.java | 6 +- .../sql/planner/TestSamzaSqlValidator.java | 181 ++++++++++++ .../test/samzasql/TestSamzaSqlEndToEnd.java | 255 ++++++++++++---- .../samzasql/TestSamzaSqlRemoteTable.java | 67 ++++- 9 files changed, 853 insertions(+), 126 deletions(-) create mode 100644 samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java create mode 100644 samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidatorException.java create mode 100644 samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java diff --git a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java index ea0ebfa98b..b09d3d6321 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java @@ -52,31 +52,58 @@ public class SamzaSqlDslConverter implements DslConverter { public Collection convertDsl(String dsl) { // TODO: Introduce an API to parse a dsl string and return one or more sql statements List sqlStmts = fetchSqlFromConfig(config); - List queryInfo = fetchQueryInfo(sqlStmts); - SamzaSqlApplicationConfig sqlConfig = new SamzaSqlApplicationConfig(config, - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toList()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList())); - - QueryPlanner planner = - new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(), - sqlConfig.getUdfMetadata()); - + QueryPlanner planner = getQueryPlanner(getSqlConfig(sqlStmts, config)); List relRoots = new LinkedList<>(); for (String sql: sqlStmts) { // we always pass only select query to the planner for samza sql. The reason is that samza sql supports // schema evolution where source and destination could up to an extent have independent schema evolution while // calcite expects strict comformance of the destination schema with that of the fields in the select query. SamzaSqlQueryParser.QueryInfo qinfo = SamzaSqlQueryParser.parseQuery(sql); - relRoots.add(planner.plan(qinfo.getSelectQuery())); + RelRoot relRoot = planner.plan(qinfo.getSelectQuery()); + relRoots.add(relRoot); } return relRoots; } + /** + * Get {@link SamzaSqlApplicationConfig} given sql statements and samza config. + * @param sqlStmts List of sql statements + * @param config Samza config + * @return {@link SamzaSqlApplicationConfig} + */ + public static SamzaSqlApplicationConfig getSqlConfig(List sqlStmts, Config config) { + List queryInfo = fetchQueryInfo(sqlStmts); + return new SamzaSqlApplicationConfig(config, + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) + .collect(Collectors.toList()), + queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink) + .collect(Collectors.toList())); + } + + /** + * Get {@link QueryPlanner} given {@link SamzaSqlApplicationConfig} + * @param sqlConfig {@link SamzaSqlApplicationConfig} + * @return {@link QueryPlanner} + */ + public static QueryPlanner getQueryPlanner(SamzaSqlApplicationConfig sqlConfig) { + return new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(), + sqlConfig.getUdfMetadata()); + } + + /** + * Get list of {@link org.apache.samza.sql.util.SamzaSqlQueryParser.QueryInfo} given list of sql statements. + * @param sqlStmts list of sql statements + * @return list of {@link org.apache.samza.sql.util.SamzaSqlQueryParser.QueryInfo} + */ public static List fetchQueryInfo(List sqlStmts) { return sqlStmts.stream().map(SamzaSqlQueryParser::parseQuery).collect(Collectors.toList()); } + /** + * Get list of sql statements based on the property set in the config. + * @param config config + * @return list of Sql statements + */ public static List fetchSqlFromConfig(Map config) { List sql; if (config.containsKey(SamzaSqlApplicationConfig.CFG_SQL_STMT) && diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java index bbf17703db..bdf03f7e61 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java @@ -36,9 +36,6 @@ import org.apache.calcite.rel.RelRoot; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.rel.type.RelDataTypeFieldImpl; -import org.apache.calcite.rel.type.RelRecordType; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.Table; import org.apache.calcite.schema.impl.AbstractSchema; @@ -46,9 +43,7 @@ import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlOperatorTable; import org.apache.calcite.sql.parser.SqlParser; -import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.util.ChainedSqlOperatorTable; -import org.apache.calcite.sql.validate.SqlConformance; import org.apache.calcite.sql.validate.SqlConformanceEnum; import org.apache.calcite.sql2rel.SqlToRelConverter; import org.apache.calcite.tools.FrameworkConfig; @@ -87,49 +82,38 @@ public QueryPlanner(Map relSchemaProviders, this.udfMetadata = udfMetadata; } + private void registerSourceSchemas(SchemaPlus rootSchema) { + RelSchemaConverter relSchemaConverter = new RelSchemaConverter(); + + for (SqlIOConfig ssc : systemStreamConfigBySource.values()) { + SchemaPlus previousLevelSchema = rootSchema; + List sourceParts = ssc.getSourceParts(); + RelSchemaProvider relSchemaProvider = relSchemaProviders.get(ssc.getSource()); + + for (int sourcePartIndex = 0; sourcePartIndex < sourceParts.size(); sourcePartIndex++) { + String sourcePart = sourceParts.get(sourcePartIndex); + if (sourcePartIndex < sourceParts.size() - 1) { + SchemaPlus sourcePartSchema = previousLevelSchema.getSubSchema(sourcePart); + if (sourcePartSchema == null) { + sourcePartSchema = previousLevelSchema.add(sourcePart, new AbstractSchema()); + } + previousLevelSchema = sourcePartSchema; + } else { + // If the source part is the last one, then fetch the schema corresponding to the stream and register. + RelDataType relationalSchema = getSourceRelSchema(relSchemaProvider, relSchemaConverter); + previousLevelSchema.add(sourcePart, createTableFromRelSchema(relationalSchema)); + break; + } + } + } + } + public RelRoot plan(String query) { try { Connection connection = DriverManager.getConnection("jdbc:calcite:"); CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); SchemaPlus rootSchema = calciteConnection.getRootSchema(); - RelSchemaConverter relSchemaConverter = new RelSchemaConverter(); - - for (SqlIOConfig ssc : systemStreamConfigBySource.values()) { - SchemaPlus previousLevelSchema = rootSchema; - List sourceParts = ssc.getSourceParts(); - RelSchemaProvider relSchemaProvider = relSchemaProviders.get(ssc.getSource()); - - for (int sourcePartIndex = 0; sourcePartIndex < sourceParts.size(); sourcePartIndex++) { - String sourcePart = sourceParts.get(sourcePartIndex); - if (sourcePartIndex < sourceParts.size() - 1) { - SchemaPlus sourcePartSchema = previousLevelSchema.getSubSchema(sourcePart); - if (sourcePartSchema == null) { - sourcePartSchema = previousLevelSchema.add(sourcePart, new AbstractSchema()); - } - previousLevelSchema = sourcePartSchema; - } else { - // If the source part is the last one, then fetch the schema corresponding to the stream and register. - SqlSchema sqlSchema = relSchemaProvider.getSqlSchema(); - - List fieldNames = new ArrayList<>(); - List fieldTypes = new ArrayList<>(); - if (!sqlSchema.containsField(SamzaSqlRelMessage.KEY_NAME)) { - fieldNames.add(SamzaSqlRelMessage.KEY_NAME); - fieldTypes.add(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY)); - } - - fieldNames.addAll( - sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldName).collect(Collectors.toList())); - fieldTypes.addAll( - sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldSchema).collect(Collectors.toList())); - - SqlSchema newSchema = new SqlSchema(fieldNames, fieldTypes); - RelDataType relationalSchema = relSchemaConverter.convertToRelSchema(newSchema); - previousLevelSchema.add(sourcePart, createTableFromRelSchema(relationalSchema)); - break; - } - } - } + registerSourceSchemas(rootSchema); List samzaSqlFunctions = udfMetadata.stream() .map(x -> new SamzaSqlScalarFunctionImpl(x)) @@ -162,12 +146,34 @@ public RelRoot plan(String query) { LOG.info("query plan:\n" + RelOptUtil.toString(relRoot.rel)); return relRoot; } catch (Exception e) { - LOG.error("Query planner failed with exception.", e); - throw new SamzaException(e); + String errorMsg = SamzaSqlValidator.formatErrorString(query, e); + LOG.error(errorMsg, e); + throw new SamzaException(errorMsg, e); + } + } + + public static RelDataType getSourceRelSchema(RelSchemaProvider relSchemaProvider, + RelSchemaConverter relSchemaConverter) { + // If the source part is the last one, then fetch the schema corresponding to the stream and register. + SqlSchema sqlSchema = relSchemaProvider.getSqlSchema(); + + List fieldNames = new ArrayList<>(); + List fieldTypes = new ArrayList<>(); + if (!sqlSchema.containsField(SamzaSqlRelMessage.KEY_NAME)) { + fieldNames.add(SamzaSqlRelMessage.KEY_NAME); + fieldTypes.add(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY)); } + + fieldNames.addAll( + sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldName).collect(Collectors.toList())); + fieldTypes.addAll( + sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldSchema).collect(Collectors.toList())); + + SqlSchema newSchema = new SqlSchema(fieldNames, fieldTypes); + return relSchemaConverter.convertToRelSchema(newSchema); } - private Table createTableFromRelSchema(RelDataType relationalSchema) { + private static Table createTableFromRelSchema(RelDataType relationalSchema) { return new AbstractTable() { public RelDataType getRowType(RelDataTypeFactory typeFactory) { return relationalSchema; diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java new file mode 100644 index 0000000000..08d4497403 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java @@ -0,0 +1,279 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.sql.planner; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Scanner; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactoryImpl; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelRecordType; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.sql.data.SamzaSqlRelMessage; +import org.apache.samza.sql.dsl.SamzaSqlDslConverter; +import org.apache.samza.sql.interfaces.RelSchemaProvider; +import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl; +import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; +import org.apache.samza.sql.util.SamzaSqlQueryParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * SamzaSqlValidator that uses calcite engine to convert the sql query to relational graph and validates the query + * including the output. + */ +public class SamzaSqlValidator { + private static final Logger LOG = LoggerFactory.getLogger(SamzaSqlValidator.class); + + private final Config config; + + public SamzaSqlValidator(Config config) { + this.config = config; + } + + /** + * Validate a list of sql statements + * @param sqlStmts list of sql statements + * @throws SamzaSqlValidatorException + */ + public void validate(List sqlStmts) throws SamzaSqlValidatorException { + SamzaSqlApplicationConfig sqlConfig = SamzaSqlDslConverter.getSqlConfig(sqlStmts, config); + QueryPlanner planner = SamzaSqlDslConverter.getQueryPlanner(sqlConfig); + + for (String sql: sqlStmts) { + // we always pass only select query to the planner for samza sql. The reason is that samza sql supports + // schema evolution where source and destination could up to an extent have independent schema evolution while + // calcite expects strict conformance of the destination schema with that of the fields in the select query. + SamzaSqlQueryParser.QueryInfo qinfo = SamzaSqlQueryParser.parseQuery(sql); + RelRoot relRoot; + try { + relRoot = planner.plan(qinfo.getSelectQuery()); + } catch (SamzaException e) { + throw new SamzaSqlValidatorException("Calcite planning for sql failed.", e); + } + + // Now that we have logical plan, validate different aspects. + validate(relRoot, qinfo, sqlConfig); + } + } + + protected void validate(RelRoot relRoot, SamzaSqlQueryParser.QueryInfo qinfo, SamzaSqlApplicationConfig sqlConfig) + throws SamzaSqlValidatorException { + // Validate select fields (including Udf return types) with output schema + validateOutput(relRoot, sqlConfig.getRelSchemaProviders().get(qinfo.getSink())); + + // TODO: + // 1. SAMZA-2314: Validate Udf arguments. + // 2. SAMZA-2315: Validate operators. These are the operators that are supported by Calcite but not by Samza Sql. + // Eg: LogicalAggregate with sum function is not supported by Samza Sql. + } + + protected void validateOutput(RelRoot relRoot, RelSchemaProvider relSchemaProvider) throws SamzaSqlValidatorException { + RelRecordType outputRecord = (RelRecordType) QueryPlanner.getSourceRelSchema(relSchemaProvider, + new RelSchemaConverter()); + LogicalProject project = (LogicalProject) relRoot.rel; + RelRecordType projetRecord = (RelRecordType) project.getRowType(); + validateOutputRecords(outputRecord, projetRecord); + } + + protected void validateOutputRecords(RelRecordType outputRecord, RelRecordType projectRecord) + throws SamzaSqlValidatorException { + Map outputRecordMap = outputRecord.getFieldList().stream().collect( + Collectors.toMap(RelDataTypeField::getName, RelDataTypeField::getType)); + Map projectRecordMap = projectRecord.getFieldList().stream().collect( + Collectors.toMap(RelDataTypeField::getName, RelDataTypeField::getType)); + + // There could be default values for the output schema and hence fields in project schema could be a subset of + // fields in output schema. + // TODO: SAMZA-2316: Validate that all non-default value fields in output schema are set in the projected fields. + for (Map.Entry entry : projectRecordMap.entrySet()) { + RelDataType outputFieldType = outputRecordMap.get(entry.getKey()); + if (outputFieldType == null) { + if (entry.getKey().equals(SamzaSqlRelMessage.OP_NAME)) { + continue; + } + String errMsg = String.format("Field '%s' in select query does not match any field in output schema.", + entry.getKey()); + LOG.error(errMsg); + throw new SamzaSqlValidatorException(errMsg); + } else if (!compareFieldTypes(outputFieldType, entry.getValue())) { + String errMsg = String.format("Field '%s' with type '%s' in select query does not match the field type '%s' in" + + " output schema.", entry.getKey(), entry.getValue(), outputFieldType); + LOG.error(errMsg); + throw new SamzaSqlValidatorException(errMsg); + } + } + } + + protected boolean compareFieldTypes(RelDataType outputFieldType, RelDataType selectQueryFieldType) { + RelDataType projectFieldType; + + // JavaTypes are relevant for Udf argument and return types + // TODO: Support UDF argument validation. Currently, only return types are validated and argument types are + // validated during run-time. + if (selectQueryFieldType instanceof RelDataTypeFactoryImpl.JavaType) { + projectFieldType = new SamzaSqlJavaTypeFactoryImpl().toSql(selectQueryFieldType); + } else { + projectFieldType = selectQueryFieldType; + } + + SqlTypeName outputSqlType = outputFieldType.getSqlTypeName(); + SqlTypeName projectSqlType = projectFieldType.getSqlTypeName(); + + if (projectSqlType == SqlTypeName.ANY || outputSqlType == SqlTypeName.ANY) { + return true; + } else if (outputSqlType != SqlTypeName.ROW && outputSqlType == projectSqlType) { + return true; + } + + switch (outputSqlType) { + case CHAR: + return projectSqlType == SqlTypeName.VARCHAR; + case VARCHAR: + return projectSqlType == SqlTypeName.CHAR; + case BIGINT: + return projectSqlType == SqlTypeName.INTEGER; + case INTEGER: + return projectSqlType == SqlTypeName.BIGINT; + case FLOAT: + return projectSqlType == SqlTypeName.DOUBLE; + case DOUBLE: + return projectSqlType == SqlTypeName.FLOAT; + case ROW: + try { + validateOutputRecords((RelRecordType) outputFieldType, (RelRecordType) projectFieldType); + } catch (SamzaSqlValidatorException e) { + LOG.error("A field in select query does not match with the output schema.", e); + return false; + } + return true; + default: + return false; + } + } + + // -- All Static Methods below -- + + /** + * Format the Calcite exception to a more readable form. + * + * As an example, consider the below sql query which fails calcite validation due to a non existing field : + * "Insert into testavro.outputTopic(id) select non_existing_name, name as string_value" + * + " from testavro.level1.level2.SIMPLE1 as s where s.id = 1" + * + * This function takes in the above multi-line sql query and the below sample exception as input: + * "org.apache.calcite.runtime.CalciteContextException: From line 1, column 8 to line 1, column 26: Column + * 'non_existing_name' not found in any table" + * + * And returns the following string: + * 2019-08-30 09:05:08 ERROR QueryPlanner:174 - Failed with exception for the following sql statement: + * + * Sql syntax error: + * + * SELECT `non_existing_name`, `name` AS `string_value` + * -------^^^^^^^^^^^^^^^^^^^-------------------------- + * FROM `testavro`.`level1`.`level2`.`SIMPLE1` AS `s` + * WHERE `s`.`id` = 1 + * + * @param query sql query + * @param e Exception returned by Calcite + * @return formatted error string + */ + public static String formatErrorString(String query, Exception e) { + Pattern pattern = Pattern.compile("line [0-9]+, column [0-9]+"); + Matcher matcher = pattern.matcher(e.getMessage()); + String[] queryLines = query.split("\\n"); + StringBuilder result = new StringBuilder(); + int startColIdx, endColIdx, startLineIdx, endLineIdx; + + try { + if (matcher.find()) { + String match = matcher.group(); + LOG.info(match); + startLineIdx = getIdxFromString(match, "line "); + startColIdx = getIdxFromString(match, "column "); + if (matcher.find()) { + match = matcher.group(); + LOG.info(match); + endLineIdx = getIdxFromString(match, "line "); + endColIdx = getIdxFromString(match, "column "); + } else { + endColIdx = startColIdx; + endLineIdx = startLineIdx; + } + int lineLen = endLineIdx - startLineIdx; + int colLen = endColIdx - startColIdx + 1; + + // Error spanning across multiple lines is not supported yet. + if (lineLen > 0) { + throw new SamzaException("lineLen formatting validation error: error cannot span across multiple lines."); + } + + int lineIdx = 0; + for (String line : queryLines) { + result.append(line) + .append("\n"); + if (lineIdx == startLineIdx) { + String lineStr = getStringWithRepeatedChars('-', line.length() - 1); + String pointerStr = getStringWithRepeatedChars('^', colLen); + String errorMarkerStr = + new StringBuilder(lineStr).replace(startColIdx, endColIdx, pointerStr).toString(); + result.append(errorMarkerStr) + .append("\n"); + } + lineIdx++; + } + } + + String[] errorMsgParts = e.getMessage().split("Exception:"); + result.append("\n") + .append(errorMsgParts[errorMsgParts.length - 1].trim()); + return String.format("Sql syntax error:\n\n%s\n", + result); + } catch (Exception ex) { + // Ignore any formatting errors. + LOG.error("Formatting error (Not the actual error. Look for the logs for actual error)", ex); + return String.format("Failed with formatting exception (not the actual error) for the following sql" + + " statement:\n\"%s\"\n\n%s", query, e.getMessage()); + } + } + + private static int getIdxFromString(String inputString, String delimiterStr) { + String[] splitStr = inputString.split(delimiterStr); + Scanner in = new Scanner(splitStr[1]).useDelimiter("[^0-9]+"); + return in.nextInt() - 1; + } + + private static String getStringWithRepeatedChars(char ch, int len) { + char[] chars = new char[len]; + Arrays.fill(chars, ch); + return new String(chars); + } +} diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidatorException.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidatorException.java new file mode 100644 index 0000000000..812508fece --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidatorException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.planner; + +/** + * Checked Exception thrown while validating SQL statement. + */ +public class SamzaSqlValidatorException extends Exception { + public SamzaSqlValidatorException() { + } + + public SamzaSqlValidatorException(String message) { + super(message); + } + + public SamzaSqlValidatorException(String message, Throwable cause) { + super(message, cause); + } + + public SamzaSqlValidatorException(Throwable cause) { + super(cause); + } +} diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java index 07aa6bb71f..c49438237c 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java @@ -308,6 +308,10 @@ public Map getOutputSystemStreamConfigsBySource() { return outputSystemStreamConfigsBySource; } + public SqlIOConfig getOutputSqlIOConfig(String source) { + return outputSystemStreamConfigsBySource.get(source); + } + public Map getSamzaRelConverters() { return samzaRelConvertersBySource; } diff --git a/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java b/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java index d2ed991dab..630d3f38ee 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java @@ -53,6 +53,8 @@ import org.apache.samza.SamzaException; import org.apache.samza.sql.interfaces.SamzaSqlDriver; import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl; +import org.apache.samza.sql.planner.QueryPlanner; +import org.apache.samza.sql.planner.SamzaSqlValidator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,7 +104,9 @@ public static QueryInfo parseQuery(String sql) { try { sqlNode = planner.parse(sql); } catch (SqlParseException e) { - throw new SamzaException(e); + String errorMsg = SamzaSqlValidator.formatErrorString(sql, e); + LOG.error(errorMsg, e); + throw new SamzaException(errorMsg, e); } String sink; diff --git a/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java new file mode 100644 index 0000000000..b2ce6f6f4a --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestSamzaSqlValidator.java @@ -0,0 +1,181 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.sql.planner; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.calcite.rel.RelRoot; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.sql.dsl.SamzaSqlDslConverter; +import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; +import org.apache.samza.sql.runner.SamzaSqlApplicationRunner; +import org.apache.samza.sql.util.SamzaSqlQueryParser; +import org.apache.samza.sql.util.SamzaSqlTestConfig; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.samza.sql.dsl.SamzaSqlDslConverter.*; + + +public class TestSamzaSqlValidator { + + private final Map configs = new HashMap<>(); + private static final Logger LOG = LoggerFactory.getLogger(TestSamzaSqlValidator.class); + + @Before + public void setUp() { + configs.put("job.default.system", "kafka"); + } + + @Test + public void testBasicValidation() throws SamzaSqlValidatorException { + Map config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1); + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, + "Insert into testavro.outputTopic(id) select id, name as string_value" + + " from testavro.level1.level2.SIMPLE1 as s where s.id = 1"); + Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + + List sqlStmts = fetchSqlFromConfig(config); + new SamzaSqlValidator(samzaConfig).validate(sqlStmts); + } + + @Test (expected = SamzaSqlValidatorException.class) + public void testNonExistingOutputField() throws SamzaSqlValidatorException { + Map config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1); + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, + "Insert into testavro.outputTopic(id) select id, name as strings_value" + + " from testavro.level1.level2.SIMPLE1 as s where s.id = 1"); + Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + + List sqlStmts = fetchSqlFromConfig(config); + new SamzaSqlValidator(samzaConfig).validate(sqlStmts); + } + + @Test(expected = SamzaException.class) + public void testNonExistingSelectField() throws SamzaSqlValidatorException { + Map config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1); + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, + "Insert into testavro.outputTopic(id) select non_existing_field, name as string_value" + + " from testavro.level1.level2.SIMPLE1 as s where s.id = 1"); + SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + } + + @Test(expected = SamzaSqlValidatorException.class) + public void testCalciteErrorString() throws SamzaSqlValidatorException { + Map config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1); + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, + "Insert into testavro.outputTopic(id) select non_existing_field, name as string_value" + + " from testavro.level1.level2.SIMPLE1 as s where s.id = 1"); + + try { + SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + } catch (SamzaException e) { + Assert.assertTrue(e.getMessage().contains("line 1, column 8 to line 1, column 27: Column 'non_existing_field' not found")); + throw new SamzaSqlValidatorException("Calcite planning for sql failed.", e); + } + } + + @Test (expected = SamzaException.class) + public void testNonExistingUdf() throws SamzaSqlValidatorException { + Map config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1); + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, + "Insert into testavro.outputTopic(id) select NonExistingUdf(name) as string_value" + + " from testavro.level1.level2.SIMPLE1 as s where s.id = 1"); + Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + + List sqlStmts = fetchSqlFromConfig(config); + new SamzaSqlValidator(samzaConfig).validate(sqlStmts); + } + + @Test (expected = SamzaSqlValidatorException.class) + public void testSelectAndOutputValidationFailure() throws SamzaSqlValidatorException { + Map config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1); + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, + "Insert into testavro.outputTopic(id) select name as long_value" + + " from testavro.level1.level2.SIMPLE1 as s where s.id = 1"); + Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + + List sqlStmts = fetchSqlFromConfig(config); + new SamzaSqlValidator(samzaConfig).validate(sqlStmts); + } + + @Test (expected = SamzaException.class) + public void testValidationStreamTableLeftJoinWithWhere() throws SamzaSqlValidatorException { + Map config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); + String sql = + "Insert into testavro.enrichedPageViewTopic(profileName, pageKey) select p.name as profileName, pv.pageKey" + + " from testavro.PAGEVIEW as pv left join testavro.PROFILE.`$table` as p where p.id = pv.profileId"; + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); + Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + + List sqlStmts = fetchSqlFromConfig(config); + new SamzaSqlValidator(samzaConfig).validate(sqlStmts); + } + + @Test (expected = SamzaException.class) + public void testUnsupportedOperator() throws SamzaSqlValidatorException { + Map config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); + String sql = + "Insert into testavro.pageViewCountTopic(jobName, pageKey, `count`)" + + " select 'SampleJob' as jobName, pv.pageKey, count(*) as `count`" + + " from testavro.PAGEVIEW as pv" + + " where pv.pageKey = 'job' or pv.pageKey = 'inbox'" + + " group bys (pv.pageKey)"; + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); + Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + + List sqlStmts = fetchSqlFromConfig(config); + new SamzaSqlValidator(samzaConfig).validate(sqlStmts); + } + + @Test + public void testFormatErrorString() { + String sql = + "select 'SampleJob' as jobName, pv.pageKey, count(*) as `count`\n" + + "from testavro.PAGEVIEW as pv\n" + + "where pv.pageKey = 'job' or pv.pageKey = 'inbox'\n" + + "group bys (pv.pageKey)"; + String errorStr = + "org.apache.calcite.tools.ValidationException: org.apache.calcite.runtime.CalciteContextException: " + + "From line 3, column 7 to line 3, column 16: Column 'pv.pageKey' not found in any table"; + String formattedErrStr = SamzaSqlValidator.formatErrorString(sql, new Exception(errorStr)); + LOG.info(formattedErrStr); + } + + @Test + public void testExceptionInFormatErrorString() { + String sql = + "select 'SampleJob' as jobName, pv.pageKey, count(*) as `count`\n" + + "from testavro.PAGEVIEW as pv\n" + + "where pv.pageKey = 'job' or pv.pageKey = 'inbox'\n" + + "group bys (pv.pageKey)"; + String errorStr = + "org.apache.calcite.tools.ValidationException: org.apache.calcite.runtime.CalciteContextException: " + + "From line 3, column 7 to line 3, column 16: Column 'pv.pageKey' not found in any table"; + String formattedErrStr = SamzaSqlValidator.formatErrorString(sql, new Exception(errorStr)); + LOG.info(formattedErrStr); + } +} diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java index ac84fe8e24..d81cb3c04c 100644 --- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java +++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java @@ -32,7 +32,10 @@ import java.util.stream.IntStream; import org.apache.avro.generic.GenericRecord; import org.apache.calcite.plan.RelOptUtil; +import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; +import org.apache.samza.sql.planner.SamzaSqlValidator; +import org.apache.samza.sql.planner.SamzaSqlValidatorException; import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; import org.apache.samza.sql.system.TestAvroSystemFactory; import org.apache.samza.sql.util.JsonUtil; @@ -51,7 +54,7 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness { private static final Logger LOG = LoggerFactory.getLogger(TestSamzaSqlEndToEnd.class); @Test - public void testEndToEnd() { + public void testEndToEnd() throws SamzaSqlValidatorException { int numMessages = 20; TestAvroSystemFactory.messages.clear(); @@ -59,7 +62,11 @@ public void testEndToEnd() { String sql = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1"; List sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) @@ -70,7 +77,7 @@ public void testEndToEnd() { } @Test - public void testEndToEndWithSystemMessages() { + public void testEndToEndWithSystemMessages() throws SamzaSqlValidatorException { int numMessages = 20; TestAvroSystemFactory.messages.clear(); @@ -82,7 +89,11 @@ public void testEndToEndWithSystemMessages() { String sql = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1"; List sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) @@ -92,7 +103,7 @@ public void testEndToEndWithSystemMessages() { } @Test - public void testEndToEndDisableSystemMessages() { + public void testEndToEndDisableSystemMessages() throws SamzaSqlValidatorException { int numMessages = 20; TestAvroSystemFactory.messages.clear(); @@ -105,7 +116,11 @@ public void testEndToEndDisableSystemMessages() { List sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_PROCESS_SYSTEM_EVENTS, "false"); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) @@ -115,7 +130,7 @@ public void testEndToEndDisableSystemMessages() { } @Test - public void testEndToEndWithNullRecords() { + public void testEndToEndWithNullRecords() throws SamzaSqlValidatorException { int numMessages = 20; TestAvroSystemFactory.messages.clear(); @@ -124,7 +139,11 @@ public void testEndToEndWithNullRecords() { String sql = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1"; List sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> x.getMessage() == null || ((GenericRecord) x.getMessage()).get("id") == null ? null @@ -141,7 +160,7 @@ public void testEndToEndWithNullRecords() { } @Test - public void testEndToEndWithDifferentSystemSameStream() { + public void testEndToEndWithDifferentSystemSameStream() throws SamzaSqlValidatorException { int numMessages = 20; TestAvroSystemFactory.messages.clear(); @@ -149,7 +168,11 @@ public void testEndToEndWithDifferentSystemSameStream() { String sql = "Insert into testavro2.SIMPLE1 select * from testavro.SIMPLE1"; List sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) @@ -160,7 +183,7 @@ public void testEndToEndWithDifferentSystemSameStream() { } @Test - public void testEndToEndMultiSqlStmts() { + public void testEndToEndMultiSqlStmts() throws SamzaSqlValidatorException { int numMessages = 20; TestAvroSystemFactory.messages.clear(); Map staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages); @@ -168,7 +191,12 @@ public void testEndToEndMultiSqlStmts() { String sql2 = "Insert into testavro.SIMPLE3 select * from testavro.SIMPLE2"; List sqlStmts = Arrays.asList(sql1, sql2); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); + List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) .sorted() @@ -180,7 +208,7 @@ public void testEndToEndMultiSqlStmts() { } @Test - public void testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput() { + public void testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput() throws SamzaSqlValidatorException { int numMessages = 20; TestAvroSystemFactory.messages.clear(); Map staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages); @@ -189,7 +217,12 @@ public void testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput() { List sqlStmts = Arrays.asList(sql1, sql2); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); + List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) .sorted() @@ -201,7 +234,7 @@ public void testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput() { } @Test - public void testEndToEndFanIn() { + public void testEndToEndFanIn() throws SamzaSqlValidatorException { int numMessages = 20; TestAvroSystemFactory.messages.clear(); Map staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages); @@ -209,7 +242,12 @@ public void testEndToEndFanIn() { String sql2 = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1"; List sqlStmts = Arrays.asList(sql1, sql2); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); + List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) .sorted() @@ -221,7 +259,7 @@ public void testEndToEndFanIn() { } @Test - public void testEndToEndFanOut() { + public void testEndToEndFanOut() throws SamzaSqlValidatorException { int numMessages = 20; TestAvroSystemFactory.messages.clear(); Map staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages); @@ -229,7 +267,12 @@ public void testEndToEndFanOut() { String sql2 = "Insert into testavro.SIMPLE3 select * from testavro.SIMPLE1"; List sqlStmts = Arrays.asList(sql1, sql2); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); + List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) .sorted() @@ -250,7 +293,11 @@ public void testEndToEndWithProjection() throws Exception { + " select id, TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP(), LOCALTIMESTAMP()) + MONTH(CURRENT_DATE()) as long_value from testavro.SIMPLE1"; List sqlStmts = Arrays.asList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) @@ -270,14 +317,18 @@ public void testEndToEndWithBooleanCheck() throws Exception { + " select * from testavro.COMPLEX1 where bool_value IS TRUE"; List sqlStmts = Arrays.asList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = new ArrayList<>(TestAvroSystemFactory.messages); Assert.assertEquals(numMessages / 2, outMessages.size()); } @Test - public void testEndToEndCompoundBooleanCheck() { + public void testEndToEndCompoundBooleanCheck() throws SamzaSqlValidatorException { int numMessages = 20; @@ -287,14 +338,18 @@ public void testEndToEndCompoundBooleanCheck() { + " select * from testavro.COMPLEX1 where id >= 0 and bool_value IS TRUE"; List sqlStmts = Arrays.asList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = new ArrayList<>(TestAvroSystemFactory.messages); Assert.assertEquals(numMessages / 2, outMessages.size()); } @Test - public void testEndToEndCompoundBooleanCheckWorkaround() { + public void testEndToEndCompoundBooleanCheckWorkaround() throws SamzaSqlValidatorException { int numMessages = 20; @@ -305,7 +360,11 @@ public void testEndToEndCompoundBooleanCheckWorkaround() { + " select * from testavro.COMPLEX1 where id >= 0 and CAST(bool_value AS VARCHAR) = 'TRUE'"; List sqlStmts = Arrays.asList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = new ArrayList<>(TestAvroSystemFactory.messages); @@ -322,7 +381,11 @@ public void testEndToEndWithProjectionWithCase() throws Exception { + " select id, NOT(id = 5) as bool_value, CASE WHEN id IN (5, 6, 7) THEN CAST('foo' AS VARCHAR) WHEN id < 5 THEN CAST('bars' AS VARCHAR) ELSE NULL END as string_value from testavro.SIMPLE1"; List sqlStmts = Arrays.asList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) @@ -342,7 +405,11 @@ public void testEndToEndWithLike() throws Exception { + " select id, name as string_value from testavro.SIMPLE1 where name like 'Name%'"; List sqlStmts = Arrays.asList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) @@ -365,7 +432,11 @@ public void testEndToEndFlatten() throws Exception { + " from testavro.COMPLEX1"; List sqlStmts = Collections.singletonList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = new ArrayList<>(TestAvroSystemFactory.messages); @@ -379,18 +450,22 @@ public void testEndToEndFlatten() throws Exception { @Test - public void testEndToEndComplexRecord() { + public void testEndToEndComplexRecord() throws SamzaSqlValidatorException { int numMessages = 10; TestAvroSystemFactory.messages.clear(); Map staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages); String sql1 = "Insert into testavro.outputTopic" - + " select map_values['key0'] as string_value, union_value, array_values[0] as string_value, map_values, id, bytes_value, fixed_value, float_value " - + " from testavro.COMPLEX1"; + + " select map_values['key0'] as string_value, union_value, array_values, map_values, id, bytes_value," + + " fixed_value, float_value from testavro.COMPLEX1"; List sqlStmts = Collections.singletonList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = new ArrayList<>(TestAvroSystemFactory.messages); @@ -399,7 +474,7 @@ public void testEndToEndComplexRecord() { @Ignore @Test - public void testEndToEndNestedRecord() { + public void testEndToEndNestedRecord() throws SamzaSqlValidatorException { int numMessages = 10; TestAvroSystemFactory.messages.clear(); Map staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages); @@ -410,7 +485,11 @@ public void testEndToEndNestedRecord() { + " from testavro.PROFILE as p"; List sqlStmts = Collections.singletonList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = new ArrayList<>(TestAvroSystemFactory.messages); @@ -426,7 +505,11 @@ public void testEndToEndFlattenWithUdf() throws Exception { "Insert into testavro.outputTopic(id) select Flatten(MyTestArray(id)) as id from testavro.SIMPLE1"; List sqlStmts = Collections.singletonList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = new ArrayList<>(TestAvroSystemFactory.messages); @@ -447,7 +530,11 @@ public void testEndToEndSubQuery() throws Exception { "Insert into testavro.outputTopic(id) select Flatten(a) as id from (select MyTestArray(id) a from testavro.SIMPLE1)"; List sqlStmts = Collections.singletonList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = new ArrayList<>(TestAvroSystemFactory.messages); @@ -460,7 +547,7 @@ public void testEndToEndSubQuery() throws Exception { } @Test - public void testUdfUnTypedArgumentToTypedUdf() { + public void testUdfUnTypedArgumentToTypedUdf() throws SamzaSqlValidatorException { int numMessages = 20; TestAvroSystemFactory.messages.clear(); Map staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages); @@ -468,7 +555,11 @@ public void testUdfUnTypedArgumentToTypedUdf() { + "select id, MyTest(MyTestObj(id)) as long_value from testavro.SIMPLE1"; List sqlStmts = Collections.singletonList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); LOG.info("output Messages " + TestAvroSystemFactory.messages); @@ -488,7 +579,11 @@ public void testEndToEndUdf() throws Exception { + "select id, MYTest(id) as long_value from testavro.SIMPLE1"; List sqlStmts = Collections.singletonList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); LOG.info("output Messages " + TestAvroSystemFactory.messages); @@ -532,7 +627,11 @@ public void testEndToEndUdfPolymorphism() throws Exception { + "select MyTestPoly(id) as long_value, MyTestPoly(name) as id from testavro.SIMPLE1"; List sqlStmts = Collections.singletonList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); LOG.info("output Messages " + TestAvroSystemFactory.messages); @@ -559,7 +658,11 @@ public void testRegexMatchUdfInWhereClause() throws Exception { + "where RegexMatch('.*4', name)"; List sqlStmts = Collections.singletonList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); LOG.info("output Messages " + TestAvroSystemFactory.messages); // There should be two messages that contain "4" @@ -582,7 +685,11 @@ public void testEndToEndStreamTableInnerJoin() throws Exception { List sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," @@ -610,7 +717,11 @@ public void testEndToEndStreamTableInnerJoinWithPrimaryKey() throws Exception { List sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," @@ -638,7 +749,11 @@ public void testEndToEndStreamTableInnerJoinWithUdf() throws Exception { List sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," @@ -666,7 +781,11 @@ public void testEndToEndStreamTableInnerJoinWithNestedRecord() throws Exception List sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> { @@ -700,7 +819,11 @@ public void testEndToEndStreamTableInnerJoinWithFilter() throws Exception { List sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," @@ -733,7 +856,11 @@ public void testEndToEndStreamTableInnerJoinWithNullForeignKeys() throws Excepti List sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," @@ -763,7 +890,11 @@ public void testEndToEndStreamTableLeftJoin() throws Exception { List sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," @@ -793,7 +924,11 @@ public void testEndToEndStreamTableRightJoin() throws Exception { List sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," @@ -825,7 +960,11 @@ public void testEndToEndStreamTableTableJoin() throws Exception { List sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," @@ -855,7 +994,11 @@ public void testEndToEndStreamTableTableJoinWithPrimaryKeys() throws Exception { List sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," @@ -885,7 +1028,11 @@ public void testEndToEndStreamTableTableJoinWithCompositeKey() throws Exception List sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," @@ -918,7 +1065,11 @@ public void testEndToEndGroupBy() throws Exception { List sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); // Let's capture the list of windows/counts per key. HashMap> pageKeyCountListMap = new HashMap<>(); diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java index c2219e85ef..7eafd48624 100644 --- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java +++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java @@ -25,7 +25,10 @@ import java.util.Map; import java.util.stream.Collectors; import org.apache.avro.generic.GenericRecord; +import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; +import org.apache.samza.sql.planner.SamzaSqlValidator; +import org.apache.samza.sql.planner.SamzaSqlValidatorException; import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; import org.apache.samza.sql.system.TestAvroSystemFactory; import org.apache.samza.sql.util.JsonUtil; @@ -38,7 +41,7 @@ public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness { @Test - public void testSinkEndToEndWithKey() { + public void testSinkEndToEndWithKey() throws SamzaSqlValidatorException { int numMessages = 20; RemoteStoreIOResolverTestFactory.records.clear(); @@ -48,14 +51,18 @@ public void testSinkEndToEndWithKey() { String sql = "Insert into testRemoteStore.testTable.`$table` select __key__, id, name from testavro.SIMPLE1"; List sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); Assert.assertEquals(numMessages, RemoteStoreIOResolverTestFactory.records.size()); } @Test @Ignore("Disabled due to flakiness related to data generation; Refer Pull Request #905 for details") - public void testSinkEndToEndWithKeyWithNullRecords() { + public void testSinkEndToEndWithKeyWithNullRecords() throws SamzaSqlValidatorException { int numMessages = 20; RemoteStoreIOResolverTestFactory.records.clear(); @@ -68,13 +75,17 @@ public void testSinkEndToEndWithKeyWithNullRecords() { List sqlStmts = Arrays.asList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); Assert.assertEquals(numMessages, RemoteStoreIOResolverTestFactory.records.size()); } @Test (expected = AssertionError.class) - public void testSinkEndToEndWithoutKey() { + public void testSinkEndToEndWithoutKey() throws SamzaSqlValidatorException { int numMessages = 20; RemoteStoreIOResolverTestFactory.records.clear(); @@ -83,13 +94,17 @@ public void testSinkEndToEndWithoutKey() { String sql = "Insert into testRemoteStore.testTable.`$table`(id,name) select id, name from testavro.SIMPLE1"; List sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); Assert.assertEquals(numMessages, RemoteStoreIOResolverTestFactory.records.size()); } @Test - public void testSourceEndToEndWithKey() { + public void testSourceEndToEndWithKey() throws SamzaSqlValidatorException { int numMessages = 20; TestAvroSystemFactory.messages.clear(); @@ -107,7 +122,11 @@ public void testSourceEndToEndWithKey() { List sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," @@ -120,7 +139,7 @@ public void testSourceEndToEndWithKey() { } @Test - public void testSourceEndToEndWithKeyAndUdf() { + public void testSourceEndToEndWithKeyAndUdf() throws SamzaSqlValidatorException { int numMessages = 20; TestAvroSystemFactory.messages.clear(); @@ -138,7 +157,11 @@ public void testSourceEndToEndWithKeyAndUdf() { List sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," @@ -151,7 +174,7 @@ public void testSourceEndToEndWithKeyAndUdf() { } @Test - public void testSourceEndToEndWithKeyWithNullForeignKeys() { + public void testSourceEndToEndWithKeyWithNullForeignKeys() throws SamzaSqlValidatorException { int numMessages = 20; TestAvroSystemFactory.messages.clear(); @@ -170,7 +193,11 @@ public void testSourceEndToEndWithKeyWithNullForeignKeys() { List sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," @@ -183,7 +210,7 @@ public void testSourceEndToEndWithKeyWithNullForeignKeys() { } @Test - public void testSourceEndToEndWithKeyWithNullForeignKeysRightOuterJoin() { + public void testSourceEndToEndWithKeyWithNullForeignKeysRightOuterJoin() throws SamzaSqlValidatorException { int numMessages = 20; TestAvroSystemFactory.messages.clear(); @@ -202,7 +229,11 @@ public void testSourceEndToEndWithKeyWithNullForeignKeysRightOuterJoin() { List sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); List outMessages = TestAvroSystemFactory.messages.stream() .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," @@ -215,7 +246,7 @@ public void testSourceEndToEndWithKeyWithNullForeignKeysRightOuterJoin() { } @Test - public void testSameJoinTargetSinkEndToEndRightOuterJoin() { + public void testSameJoinTargetSinkEndToEndRightOuterJoin() throws SamzaSqlValidatorException { int numMessages = 21; TestAvroSystemFactory.messages.clear(); @@ -237,7 +268,11 @@ public void testSameJoinTargetSinkEndToEndRightOuterJoin() { List sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - runApplication(new MapConfig(staticConfigs)); + + Config config = new MapConfig(staticConfigs); + new SamzaSqlValidator(config).validate(sqlStmts); + + runApplication(config); Assert.assertEquals((numMessages + 1) / 2, RemoteStoreIOResolverTestFactory.records.size()); }