Skip to content

Commit

Permalink
SAMZA-2313: Adding validation for Samza Sql statements. (#1148)
Browse files Browse the repository at this point in the history
* Adding validation for Samza Sql statements.

* Adding validation for Samza Sql statements.

* Adding validation for Samza Sql statements.

* Adding validation for Samza Sql statements.

* Adding validation for Samza Sql statements.

* Adding validation for Samza Sql statements.
  • Loading branch information
atoomula committed Sep 3, 2019
1 parent e0b5a32 commit 97afd3f
Show file tree
Hide file tree
Showing 9 changed files with 853 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,31 +52,58 @@ public class SamzaSqlDslConverter implements DslConverter {
public Collection<RelRoot> convertDsl(String dsl) {
  // Produces one Calcite plan per configured sql statement, in statement order.
  // NOTE: the dsl argument is not consulted yet; the statements are read from the config instead.
  // TODO: Introduce an API to parse a dsl string and return one or more sql statements
  List<String> sqlStmts = fetchSqlFromConfig(config);

  // A single planner, built from the sql config derived from all statements, is shared by every statement.
  QueryPlanner planner = getQueryPlanner(getSqlConfig(sqlStmts, config));
  List<RelRoot> relRoots = new LinkedList<>();
  for (String sql : sqlStmts) {
    // We always pass only the select query to the planner for samza sql. The reason is that samza sql supports
    // schema evolution where source and destination could, up to an extent, have independent schema evolution
    // while calcite expects strict conformance of the destination schema with that of the fields in the select
    // query.
    SamzaSqlQueryParser.QueryInfo qinfo = SamzaSqlQueryParser.parseQuery(sql);
    RelRoot relRoot = planner.plan(qinfo.getSelectQuery());
    relRoots.add(relRoot);
  }
  return relRoots;
}

/**
 * Builds the {@link SamzaSqlApplicationConfig} for a set of sql statements.
 * Every statement is parsed; the sources of all statements (flattened into one list) and the
 * sink of each statement are handed to the config constructor together with the samza config.
 *
 * @param sqlStmts List of sql statements
 * @param config Samza config
 * @return {@link SamzaSqlApplicationConfig}
 */
public static SamzaSqlApplicationConfig getSqlConfig(List<String> sqlStmts, Config config) {
  List<SamzaSqlQueryParser.QueryInfo> parsedQueries = fetchQueryInfo(sqlStmts);

  // All sources referenced by any statement, in statement order.
  List<String> allSources = parsedQueries.stream()
      .flatMap(info -> info.getSources().stream())
      .collect(Collectors.toList());

  // One sink per statement, in statement order.
  List<String> allSinks = parsedQueries.stream()
      .map(info -> info.getSink())
      .collect(Collectors.toList());

  return new SamzaSqlApplicationConfig(config, allSources, allSinks);
}

/**
 * Creates a {@link QueryPlanner} from the rel schema providers, the input system stream configs
 * (keyed by source) and the udf metadata exposed by the given {@link SamzaSqlApplicationConfig}.
 *
 * @param sqlConfig {@link SamzaSqlApplicationConfig}
 * @return {@link QueryPlanner}
 */
public static QueryPlanner getQueryPlanner(SamzaSqlApplicationConfig sqlConfig) {
  return new QueryPlanner(
      sqlConfig.getRelSchemaProviders(),
      sqlConfig.getInputSystemStreamConfigBySource(),
      sqlConfig.getUdfMetadata());
}

/**
 * Parses each sql statement with {@link SamzaSqlQueryParser#parseQuery} and collects the results
 * in the same order as the input statements.
 *
 * @param sqlStmts list of sql statements
 * @return list of {@link org.apache.samza.sql.util.SamzaSqlQueryParser.QueryInfo}, one per statement
 */
public static List<SamzaSqlQueryParser.QueryInfo> fetchQueryInfo(List<String> sqlStmts) {
  return sqlStmts.stream()
      .map(stmt -> SamzaSqlQueryParser.parseQuery(stmt))
      .collect(Collectors.toList());
}

/**
* Get list of sql statements based on the property set in the config.
* @param config config
* @return list of Sql statements
*/
public static List<String> fetchSqlFromConfig(Map<String, String> config) {
List<String> sql;
if (config.containsKey(SamzaSqlApplicationConfig.CFG_SQL_STMT) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,14 @@
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
import org.apache.calcite.sql.validate.SqlConformance;
import org.apache.calcite.sql.validate.SqlConformanceEnum;
import org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.calcite.tools.FrameworkConfig;
Expand Down Expand Up @@ -87,49 +82,38 @@ public QueryPlanner(Map<String, RelSchemaProvider> relSchemaProviders,
this.udfMetadata = udfMetadata;
}

/**
 * Registers every configured source under the given root schema.
 * For a source made of parts {@code a.b.c}, the leading parts ({@code a}, {@code b}) become nested
 * sub-schemas (created only when absent) and the final part ({@code c}) is added as a table whose
 * row type comes from the source's {@link RelSchemaProvider}.
 *
 * @param rootSchema schema under which the sources are registered
 */
private void registerSourceSchemas(SchemaPlus rootSchema) {
  RelSchemaConverter converter = new RelSchemaConverter();

  for (SqlIOConfig ioConfig : systemStreamConfigBySource.values()) {
    List<String> parts = ioConfig.getSourceParts();
    RelSchemaProvider schemaProvider = relSchemaProviders.get(ioConfig.getSource());
    if (parts.isEmpty()) {
      // Nothing to register for a source without any parts.
      continue;
    }

    // Walk (creating where needed) the chain of sub-schemas named by all parts except the last.
    int lastIndex = parts.size() - 1;
    SchemaPlus parentSchema = rootSchema;
    for (String schemaName : parts.subList(0, lastIndex)) {
      SchemaPlus childSchema = parentSchema.getSubSchema(schemaName);
      parentSchema = (childSchema != null) ? childSchema : parentSchema.add(schemaName, new AbstractSchema());
    }

    // The last part names the stream itself: fetch its schema and register it as a table.
    RelDataType rowType = getSourceRelSchema(schemaProvider, converter);
    parentSchema.add(parts.get(lastIndex), createTableFromRelSchema(rowType));
  }
}

public RelRoot plan(String query) {
try {
Connection connection = DriverManager.getConnection("jdbc:calcite:");
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
SchemaPlus rootSchema = calciteConnection.getRootSchema();
RelSchemaConverter relSchemaConverter = new RelSchemaConverter();

for (SqlIOConfig ssc : systemStreamConfigBySource.values()) {
SchemaPlus previousLevelSchema = rootSchema;
List<String> sourceParts = ssc.getSourceParts();
RelSchemaProvider relSchemaProvider = relSchemaProviders.get(ssc.getSource());

for (int sourcePartIndex = 0; sourcePartIndex < sourceParts.size(); sourcePartIndex++) {
String sourcePart = sourceParts.get(sourcePartIndex);
if (sourcePartIndex < sourceParts.size() - 1) {
SchemaPlus sourcePartSchema = previousLevelSchema.getSubSchema(sourcePart);
if (sourcePartSchema == null) {
sourcePartSchema = previousLevelSchema.add(sourcePart, new AbstractSchema());
}
previousLevelSchema = sourcePartSchema;
} else {
// If the source part is the last one, then fetch the schema corresponding to the stream and register.
SqlSchema sqlSchema = relSchemaProvider.getSqlSchema();

List<String> fieldNames = new ArrayList<>();
List<SqlFieldSchema> fieldTypes = new ArrayList<>();
if (!sqlSchema.containsField(SamzaSqlRelMessage.KEY_NAME)) {
fieldNames.add(SamzaSqlRelMessage.KEY_NAME);
fieldTypes.add(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY));
}

fieldNames.addAll(
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldName).collect(Collectors.toList()));
fieldTypes.addAll(
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldSchema).collect(Collectors.toList()));

SqlSchema newSchema = new SqlSchema(fieldNames, fieldTypes);
RelDataType relationalSchema = relSchemaConverter.convertToRelSchema(newSchema);
previousLevelSchema.add(sourcePart, createTableFromRelSchema(relationalSchema));
break;
}
}
}
registerSourceSchemas(rootSchema);

List<SamzaSqlScalarFunctionImpl> samzaSqlFunctions = udfMetadata.stream()
.map(x -> new SamzaSqlScalarFunctionImpl(x))
Expand Down Expand Up @@ -162,12 +146,34 @@ public RelRoot plan(String query) {
LOG.info("query plan:\n" + RelOptUtil.toString(relRoot.rel));
return relRoot;
} catch (Exception e) {
LOG.error("Query planner failed with exception.", e);
throw new SamzaException(e);
String errorMsg = SamzaSqlValidator.formatErrorString(query, e);
LOG.error(errorMsg, e);
throw new SamzaException(errorMsg, e);
}
}

/**
 * Computes the relational row type of a source.
 * The sql schema is obtained from the provider; when it has no field named
 * {@link SamzaSqlRelMessage#KEY_NAME}, such a field of type {@code ANY} is placed ahead of the
 * schema's own fields. The resulting schema is then converted with the given converter.
 *
 * @param relSchemaProvider provider of the source's sql schema
 * @param relSchemaConverter converter from sql schema to calcite relational schema
 * @return relational schema of the source, including the key field
 */
public static RelDataType getSourceRelSchema(RelSchemaProvider relSchemaProvider,
    RelSchemaConverter relSchemaConverter) {
  SqlSchema providedSchema = relSchemaProvider.getSqlSchema();

  List<String> names = new ArrayList<>();
  List<SqlFieldSchema> schemas = new ArrayList<>();

  // Make sure the key field is present; it goes first when the provider's schema lacks it.
  boolean hasKeyField = providedSchema.containsField(SamzaSqlRelMessage.KEY_NAME);
  if (!hasKeyField) {
    names.add(SamzaSqlRelMessage.KEY_NAME);
    schemas.add(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY));
  }

  // Append the provider's fields, keeping names and schemas index-aligned.
  for (SqlSchema.SqlField field : providedSchema.getFields()) {
    names.add(field.getFieldName());
    schemas.add(field.getFieldSchema());
  }

  SqlSchema schemaWithKey = new SqlSchema(names, schemas);
  return relSchemaConverter.convertToRelSchema(schemaWithKey);
}

private Table createTableFromRelSchema(RelDataType relationalSchema) {
private static Table createTableFromRelSchema(RelDataType relationalSchema) {
return new AbstractTable() {
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
return relationalSchema;
Expand Down
Loading

0 comments on commit 97afd3f

Please sign in to comment.