Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAMZA-2313: Adding validation for Samza Sql statements. #1148

Merged
merged 6 commits into from
Sep 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,31 +52,58 @@ public class SamzaSqlDslConverter implements DslConverter {
public Collection<RelRoot> convertDsl(String dsl) {
// TODO: Introduce an API to parse a dsl string and return one or more sql statements
List<String> sqlStmts = fetchSqlFromConfig(config);
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
SamzaSqlApplicationConfig sqlConfig = new SamzaSqlApplicationConfig(config,
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
.collect(Collectors.toList()),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));

QueryPlanner planner =
new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(),
sqlConfig.getUdfMetadata());

QueryPlanner planner = getQueryPlanner(getSqlConfig(sqlStmts, config));
List<RelRoot> relRoots = new LinkedList<>();
for (String sql: sqlStmts) {
// we always pass only select query to the planner for samza sql. The reason is that samza sql supports
// schema evolution where source and destination could up to an extent have independent schema evolution while
// calcite expects strict comformance of the destination schema with that of the fields in the select query.
SamzaSqlQueryParser.QueryInfo qinfo = SamzaSqlQueryParser.parseQuery(sql);
relRoots.add(planner.plan(qinfo.getSelectQuery()));
RelRoot relRoot = planner.plan(qinfo.getSelectQuery());
relRoots.add(relRoot);
}
return relRoots;
}

/**
* Get {@link SamzaSqlApplicationConfig} given sql statements and samza config.
* @param sqlStmts List of sql statements
* @param config Samza config
* @return {@link SamzaSqlApplicationConfig}
*/
public static SamzaSqlApplicationConfig getSqlConfig(List<String> sqlStmts, Config config) {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
return new SamzaSqlApplicationConfig(config,
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
.collect(Collectors.toList()),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink)
.collect(Collectors.toList()));
}

/**
* Get {@link QueryPlanner} given {@link SamzaSqlApplicationConfig}
* @param sqlConfig {@link SamzaSqlApplicationConfig}
* @return {@link QueryPlanner}
*/
public static QueryPlanner getQueryPlanner(SamzaSqlApplicationConfig sqlConfig) {
return new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(),
sqlConfig.getUdfMetadata());
}

/**
* Get list of {@link org.apache.samza.sql.util.SamzaSqlQueryParser.QueryInfo} given list of sql statements.
* @param sqlStmts list of sql statements
* @return list of {@link org.apache.samza.sql.util.SamzaSqlQueryParser.QueryInfo}
*/
public static List<SamzaSqlQueryParser.QueryInfo> fetchQueryInfo(List<String> sqlStmts) {
return sqlStmts.stream().map(SamzaSqlQueryParser::parseQuery).collect(Collectors.toList());
}

/**
* Get list of sql statements based on the property set in the config.
* @param config config
* @return list of Sql statements
*/
public static List<String> fetchSqlFromConfig(Map<String, String> config) {
List<String> sql;
if (config.containsKey(SamzaSqlApplicationConfig.CFG_SQL_STMT) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,14 @@
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
import org.apache.calcite.sql.validate.SqlConformance;
import org.apache.calcite.sql.validate.SqlConformanceEnum;
import org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.calcite.tools.FrameworkConfig;
Expand Down Expand Up @@ -87,49 +82,38 @@ public QueryPlanner(Map<String, RelSchemaProvider> relSchemaProviders,
this.udfMetadata = udfMetadata;
}

private void registerSourceSchemas(SchemaPlus rootSchema) {
RelSchemaConverter relSchemaConverter = new RelSchemaConverter();

for (SqlIOConfig ssc : systemStreamConfigBySource.values()) {
SchemaPlus previousLevelSchema = rootSchema;
List<String> sourceParts = ssc.getSourceParts();
RelSchemaProvider relSchemaProvider = relSchemaProviders.get(ssc.getSource());

for (int sourcePartIndex = 0; sourcePartIndex < sourceParts.size(); sourcePartIndex++) {
String sourcePart = sourceParts.get(sourcePartIndex);
if (sourcePartIndex < sourceParts.size() - 1) {
SchemaPlus sourcePartSchema = previousLevelSchema.getSubSchema(sourcePart);
if (sourcePartSchema == null) {
sourcePartSchema = previousLevelSchema.add(sourcePart, new AbstractSchema());
}
previousLevelSchema = sourcePartSchema;
} else {
// If the source part is the last one, then fetch the schema corresponding to the stream and register.
RelDataType relationalSchema = getSourceRelSchema(relSchemaProvider, relSchemaConverter);
previousLevelSchema.add(sourcePart, createTableFromRelSchema(relationalSchema));
break;
}
}
}
}

public RelRoot plan(String query) {
try {
Connection connection = DriverManager.getConnection("jdbc:calcite:");
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
SchemaPlus rootSchema = calciteConnection.getRootSchema();
RelSchemaConverter relSchemaConverter = new RelSchemaConverter();

for (SqlIOConfig ssc : systemStreamConfigBySource.values()) {
SchemaPlus previousLevelSchema = rootSchema;
List<String> sourceParts = ssc.getSourceParts();
RelSchemaProvider relSchemaProvider = relSchemaProviders.get(ssc.getSource());

for (int sourcePartIndex = 0; sourcePartIndex < sourceParts.size(); sourcePartIndex++) {
String sourcePart = sourceParts.get(sourcePartIndex);
if (sourcePartIndex < sourceParts.size() - 1) {
SchemaPlus sourcePartSchema = previousLevelSchema.getSubSchema(sourcePart);
if (sourcePartSchema == null) {
sourcePartSchema = previousLevelSchema.add(sourcePart, new AbstractSchema());
}
previousLevelSchema = sourcePartSchema;
} else {
// If the source part is the last one, then fetch the schema corresponding to the stream and register.
SqlSchema sqlSchema = relSchemaProvider.getSqlSchema();

List<String> fieldNames = new ArrayList<>();
List<SqlFieldSchema> fieldTypes = new ArrayList<>();
if (!sqlSchema.containsField(SamzaSqlRelMessage.KEY_NAME)) {
fieldNames.add(SamzaSqlRelMessage.KEY_NAME);
fieldTypes.add(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY));
}

fieldNames.addAll(
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldName).collect(Collectors.toList()));
fieldTypes.addAll(
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldSchema).collect(Collectors.toList()));

SqlSchema newSchema = new SqlSchema(fieldNames, fieldTypes);
RelDataType relationalSchema = relSchemaConverter.convertToRelSchema(newSchema);
previousLevelSchema.add(sourcePart, createTableFromRelSchema(relationalSchema));
break;
}
}
}
registerSourceSchemas(rootSchema);

List<SamzaSqlScalarFunctionImpl> samzaSqlFunctions = udfMetadata.stream()
.map(x -> new SamzaSqlScalarFunctionImpl(x))
Expand Down Expand Up @@ -162,12 +146,34 @@ public RelRoot plan(String query) {
LOG.info("query plan:\n" + RelOptUtil.toString(relRoot.rel));
return relRoot;
} catch (Exception e) {
LOG.error("Query planner failed with exception.", e);
throw new SamzaException(e);
String errorMsg = SamzaSqlValidator.formatErrorString(query, e);
LOG.error(errorMsg, e);
throw new SamzaException(errorMsg, e);
}
}

public static RelDataType getSourceRelSchema(RelSchemaProvider relSchemaProvider,
RelSchemaConverter relSchemaConverter) {
// If the source part is the last one, then fetch the schema corresponding to the stream and register.
SqlSchema sqlSchema = relSchemaProvider.getSqlSchema();

List<String> fieldNames = new ArrayList<>();
List<SqlFieldSchema> fieldTypes = new ArrayList<>();
if (!sqlSchema.containsField(SamzaSqlRelMessage.KEY_NAME)) {
fieldNames.add(SamzaSqlRelMessage.KEY_NAME);
fieldTypes.add(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY));
}

fieldNames.addAll(
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldName).collect(Collectors.toList()));
fieldTypes.addAll(
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldSchema).collect(Collectors.toList()));

SqlSchema newSchema = new SqlSchema(fieldNames, fieldTypes);
return relSchemaConverter.convertToRelSchema(newSchema);
}

private Table createTableFromRelSchema(RelDataType relationalSchema) {
private static Table createTableFromRelSchema(RelDataType relationalSchema) {
return new AbstractTable() {
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
return relationalSchema;
Expand Down
Loading