Skip to content

Commit

Permalink
Adding validation for Samza Sql statements.
Browse files Browse the repository at this point in the history
  • Loading branch information
atoomula committed Aug 30, 2019
1 parent e0b5a32 commit b1c2db5
Show file tree
Hide file tree
Showing 9 changed files with 765 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.samza.config.Config;
import org.apache.samza.sql.interfaces.DslConverter;
import org.apache.samza.sql.planner.QueryPlanner;
import org.apache.samza.sql.planner.SamzaSqlValidator;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.util.SamzaSqlQueryParser;
import org.apache.samza.sql.util.SqlFileParser;
Expand All @@ -52,27 +53,33 @@ public class SamzaSqlDslConverter implements DslConverter {
public Collection<RelRoot> convertDsl(String dsl) {
// TODO: Introduce an API to parse a dsl string and return one or more sql statements
List<String> sqlStmts = fetchSqlFromConfig(config);
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
SamzaSqlApplicationConfig sqlConfig = new SamzaSqlApplicationConfig(config,
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
.collect(Collectors.toList()),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));

QueryPlanner planner =
new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(),
sqlConfig.getUdfMetadata());

QueryPlanner planner = getQueryPlanner(getSqlConfig(sqlStmts, config));
List<RelRoot> relRoots = new LinkedList<>();
for (String sql: sqlStmts) {
// we always pass only select query to the planner for samza sql. The reason is that samza sql supports
// schema evolution where source and destination could up to an extent have independent schema evolution while
// calcite expects strict comformance of the destination schema with that of the fields in the select query.
SamzaSqlQueryParser.QueryInfo qinfo = SamzaSqlQueryParser.parseQuery(sql);
relRoots.add(planner.plan(qinfo.getSelectQuery()));
RelRoot relRoot = planner.plan(qinfo.getSelectQuery());
relRoots.add(relRoot);
}
return relRoots;
}

public static SamzaSqlApplicationConfig getSqlConfig(List<String> sqlStmts, Config config) {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
return new SamzaSqlApplicationConfig(config,
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
.collect(Collectors.toList()),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink)
.collect(Collectors.toList()));
}

public static QueryPlanner getQueryPlanner(SamzaSqlApplicationConfig sqlConfig) {
return new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(),
sqlConfig.getUdfMetadata());
}

public static List<SamzaSqlQueryParser.QueryInfo> fetchQueryInfo(List<String> sqlStmts) {
return sqlStmts.stream().map(SamzaSqlQueryParser::parseQuery).collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.stream.Collectors;
import org.apache.calcite.config.Lex;
import org.apache.calcite.jdbc.CalciteConnection;
Expand All @@ -36,17 +38,13 @@
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
import org.apache.calcite.sql.validate.SqlConformance;
import org.apache.calcite.sql.validate.SqlConformanceEnum;
Expand Down Expand Up @@ -87,49 +85,59 @@ public QueryPlanner(Map<String, RelSchemaProvider> relSchemaProviders,
this.udfMetadata = udfMetadata;
}

public static RelDataType getSourceRelSchema(RelSchemaProvider relSchemaProvider,
RelSchemaConverter relSchemaConverter) {
// If the source part is the last one, then fetch the schema corresponding to the stream and register.
SqlSchema sqlSchema = relSchemaProvider.getSqlSchema();

List<String> fieldNames = new ArrayList<>();
List<SqlFieldSchema> fieldTypes = new ArrayList<>();
if (!sqlSchema.containsField(SamzaSqlRelMessage.KEY_NAME)) {
fieldNames.add(SamzaSqlRelMessage.KEY_NAME);
fieldTypes.add(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY));
}

fieldNames.addAll(
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldName).collect(Collectors.toList()));
fieldTypes.addAll(
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldSchema).collect(Collectors.toList()));

SqlSchema newSchema = new SqlSchema(fieldNames, fieldTypes);
return relSchemaConverter.convertToRelSchema(newSchema);
}

private void fetchSourceSchema(SchemaPlus rootSchema) {
RelSchemaConverter relSchemaConverter = new RelSchemaConverter();

for (SqlIOConfig ssc : systemStreamConfigBySource.values()) {
SchemaPlus previousLevelSchema = rootSchema;
List<String> sourceParts = ssc.getSourceParts();
RelSchemaProvider relSchemaProvider = relSchemaProviders.get(ssc.getSource());

for (int sourcePartIndex = 0; sourcePartIndex < sourceParts.size(); sourcePartIndex++) {
String sourcePart = sourceParts.get(sourcePartIndex);
if (sourcePartIndex < sourceParts.size() - 1) {
SchemaPlus sourcePartSchema = previousLevelSchema.getSubSchema(sourcePart);
if (sourcePartSchema == null) {
sourcePartSchema = previousLevelSchema.add(sourcePart, new AbstractSchema());
}
previousLevelSchema = sourcePartSchema;
} else {
// If the source part is the last one, then fetch the schema corresponding to the stream and register.
RelDataType relationalSchema = getSourceRelSchema(relSchemaProvider, relSchemaConverter);
previousLevelSchema.add(sourcePart, createTableFromRelSchema(relationalSchema));
break;
}
}
}
}

public RelRoot plan(String query) {
try {
Connection connection = DriverManager.getConnection("jdbc:calcite:");
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
SchemaPlus rootSchema = calciteConnection.getRootSchema();
RelSchemaConverter relSchemaConverter = new RelSchemaConverter();

for (SqlIOConfig ssc : systemStreamConfigBySource.values()) {
SchemaPlus previousLevelSchema = rootSchema;
List<String> sourceParts = ssc.getSourceParts();
RelSchemaProvider relSchemaProvider = relSchemaProviders.get(ssc.getSource());

for (int sourcePartIndex = 0; sourcePartIndex < sourceParts.size(); sourcePartIndex++) {
String sourcePart = sourceParts.get(sourcePartIndex);
if (sourcePartIndex < sourceParts.size() - 1) {
SchemaPlus sourcePartSchema = previousLevelSchema.getSubSchema(sourcePart);
if (sourcePartSchema == null) {
sourcePartSchema = previousLevelSchema.add(sourcePart, new AbstractSchema());
}
previousLevelSchema = sourcePartSchema;
} else {
// If the source part is the last one, then fetch the schema corresponding to the stream and register.
SqlSchema sqlSchema = relSchemaProvider.getSqlSchema();

List<String> fieldNames = new ArrayList<>();
List<SqlFieldSchema> fieldTypes = new ArrayList<>();
if (!sqlSchema.containsField(SamzaSqlRelMessage.KEY_NAME)) {
fieldNames.add(SamzaSqlRelMessage.KEY_NAME);
fieldTypes.add(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY));
}

fieldNames.addAll(
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldName).collect(Collectors.toList()));
fieldTypes.addAll(
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldSchema).collect(Collectors.toList()));

SqlSchema newSchema = new SqlSchema(fieldNames, fieldTypes);
RelDataType relationalSchema = relSchemaConverter.convertToRelSchema(newSchema);
previousLevelSchema.add(sourcePart, createTableFromRelSchema(relationalSchema));
break;
}
}
}
fetchSourceSchema(rootSchema);

List<SamzaSqlScalarFunctionImpl> samzaSqlFunctions = udfMetadata.stream()
.map(x -> new SamzaSqlScalarFunctionImpl(x))
Expand Down Expand Up @@ -162,12 +170,13 @@ public RelRoot plan(String query) {
LOG.info("query plan:\n" + RelOptUtil.toString(relRoot.rel));
return relRoot;
} catch (Exception e) {
LOG.error("Query planner failed with exception.", e);
throw new SamzaException(e);
String errorMsg = SamzaSqlValidator.formatErrorString(query, e);
LOG.error(errorMsg, e);
throw new SamzaException(errorMsg, e);
}
}

private Table createTableFromRelSchema(RelDataType relationalSchema) {
private static Table createTableFromRelSchema(RelDataType relationalSchema) {
return new AbstractTable() {
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
return relationalSchema;
Expand Down
Loading

0 comments on commit b1c2db5

Please sign in to comment.