From ae74440d866ee6b178a4f032e3ecf5a0a2ce7db5 Mon Sep 17 00:00:00 2001 From: James Xu Date: Sat, 17 Jun 2017 19:30:03 +0800 Subject: [PATCH 1/7] [BEAM-2528] BeamSql: DDL: create table --- sdks/java/extensions/sql/pom.xml | 145 ++++++++++++++++++ .../sql/src/main/codegen/config.fmpp | 23 +++ .../sql/src/main/codegen/data/Parser.tdd | 75 +++++++++ .../sql/src/main/codegen/includes/license.ftl | 17 ++ .../src/main/codegen/includes/parserImpls.ftl | 86 +++++++++++ .../beam/sdk/extensions/sql/BeamSqlCli.java | 98 ++++++++++++ .../beam/sdk/extensions/sql/BeamSqlEnv.java | 11 +- .../sql/impl/planner/BeamQueryPlanner.java | 19 ++- .../sql/impl/rel/BeamIOSinkRel.java | 5 +- .../sql/impl/rel/BeamIOSourceRel.java | 4 +- .../beam/sdk/extensions/sql/meta/Column.java | 33 ++++ .../beam/sdk/extensions/sql/meta/Table.java | 44 ++++++ .../sdk/extensions/sql/meta/package-info.java | 22 +++ .../sql/meta/provider/MetaUtils.java | 40 +++++ .../sql/meta/provider/TableProvider.java | 58 +++++++ .../provider}/kafka/BeamKafkaCSVTable.java | 10 +- .../provider}/kafka/BeamKafkaTable.java | 10 +- .../provider/kafka/KafkaTableProvider.java | 64 ++++++++ .../provider}/kafka/package-info.java | 2 +- .../sql/meta/provider/package-info.java | 22 +++ .../provider}/text/BeamTextCSVTable.java | 12 +- .../text/BeamTextCSVTableIOReader.java | 7 +- .../text/BeamTextCSVTableIOWriter.java | 7 +- .../provider}/text/BeamTextTable.java | 3 +- .../meta/provider/text/TextTableProvider.java | 82 ++++++++++ .../provider}/text/package-info.java | 2 +- .../sql/meta/store/InMemoryMetaStore.java | 114 ++++++++++++++ .../extensions/sql/meta/store/MetaStore.java | 49 ++++++ .../sql/meta/store/package-info.java | 22 +++ .../extensions/sql/parser/BeamSqlParser.java | 50 ++++++ .../sql/parser/ColumnConstraint.java | 42 +++++ .../sql/parser/ColumnDefinition.java | 56 +++++++ .../extensions/sql/parser/SqlCreateTable.java | 133 ++++++++++++++++ .../extensions/sql/parser/SqlDDLKeywords.java | 27 ++++ .../extensions/sql/parser/UnparseUtil.java | 59 +++++++ .../extensions/sql/parser/package-info.java | 22 +++ .../extensions/sql/BeamSqlApiSurfaceTest.java | 3 + .../kafka/BeamKafkaCSVTableTest.java | 2 +- .../provider}/text/BeamTextCSVTableTest.java | 2 +- 39 files changed, 1447 insertions(+), 35 deletions(-) create mode 100644 sdks/java/extensions/sql/src/main/codegen/config.fmpp create mode 100644 sdks/java/extensions/sql/src/main/codegen/data/Parser.tdd create mode 100644 sdks/java/extensions/sql/src/main/codegen/includes/license.ftl create mode 100644 sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl create mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Column.java create mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java create mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/package-info.java create mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/MetaUtils.java create mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java rename sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/{schema => meta/provider}/kafka/BeamKafkaCSVTable.java (90%) rename sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/{schema => meta/provider}/kafka/BeamKafkaTable.java (95%) create mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java rename sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/{schema => meta/provider}/kafka/package-info.java (92%) create mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/package-info.java rename sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/{schema => meta/provider}/text/BeamTextCSVTable.java (90%) rename sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/{schema => meta/provider}/text/BeamTextCSVTableIOReader.java (89%) rename sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/{schema => meta/provider}/text/BeamTextCSVTableIOWriter.java (90%) rename sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/{schema => meta/provider}/text/BeamTextTable.java (95%) create mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java rename sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/{schema => meta/provider}/text/package-info.java (93%) create mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java create mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/MetaStore.java create mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/package-info.java create mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/BeamSqlParser.java create mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/ColumnConstraint.java create mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/ColumnDefinition.java create mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/SqlCreateTable.java create mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/SqlDDLKeywords.java create mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/UnparseUtil.java create mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/package-info.java rename sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/{schema => meta/provider}/kafka/BeamKafkaCSVTableTest.java (98%) rename sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/{schema => meta/provider}/text/BeamTextCSVTableTest.java (99%) diff --git a/sdks/java/extensions/sql/pom.xml b/sdks/java/extensions/sql/pom.xml index b4aa223a08f2..5aec30a035db 100644 --- a/sdks/java/extensions/sql/pom.xml +++ b/sdks/java/extensions/sql/pom.xml @@ -77,12 +77,26 @@ ${project.basedir}/src/test/ + ${project.build.sourceDirectory} + + org.apache.maven.plugins + maven-compiler-plugin + + 1.7 + 1.7 + + + -Xlint:-deprecation + + + + org.apache.maven.plugins maven-surefire-plugin @@ -123,6 +137,128 @@ + + maven-resources-plugin + + + copy-fmpp-resources + initialize + + copy-resources + + + ${project.build.directory}/codegen + + + src/main/codegen + false + + + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + 2.8 + + + unpack-parser-template + initialize + + unpack + + + + + org.apache.calcite + calcite-core + jar + true + ${project.build.directory}/ + **/Parser.jj + + + + + + + + + com.googlecode.fmpp-maven-plugin + fmpp-maven-plugin + 1.0 + + + org.freemarker + freemarker + 2.3.25-incubating + + + + + generate-fmpp-sources + generate-sources + + generate + + + ${project.build.directory}/codegen/config.fmpp + target/generated-sources + ${project.build.directory}/codegen/templates + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.9 + + + add-generated-sources + process-sources + + add-source + + + + ${project.build.directory}/generated-sources + + + + + + + + org.codehaus.mojo + javacc-maven-plugin + 2.4 + + + generate-sources + javacc + + javacc + + + ${project.build.directory}/generated-sources/ + + **/Parser.jj + + 2 + false + ${project.build.directory}/generated-sources/ + + + + + org.jacoco @@ -222,5 +358,14 @@ hamcrest-all test + + com.alibaba + fastjson + 1.2.12 + + + com.google.code.findbugs + jsr305 + diff --git a/sdks/java/extensions/sql/src/main/codegen/config.fmpp b/sdks/java/extensions/sql/src/main/codegen/config.fmpp new file mode 100644 index 000000000000..be5a792ca0a1 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/codegen/config.fmpp @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http:# www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +data: { + parser: tdd(../data/Parser.tdd) +} + +freemarkerLinks: { + includes: includes/ +} diff --git a/sdks/java/extensions/sql/src/main/codegen/data/Parser.tdd b/sdks/java/extensions/sql/src/main/codegen/data/Parser.tdd new file mode 100644 index 000000000000..d4df64b93ea7 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/codegen/data/Parser.tdd @@ -0,0 +1,75 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http:# www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +{ + # Generated parser implementation class package and name + package: "org.apache.beam.sdk.extensions.sql.parser.impl", + class: "BeamSqlParserImpl", + + # List of import statements. + imports: [ + "org.apache.calcite.sql.validate.*", + "org.apache.calcite.util.*", + "org.apache.beam.sdk.extensions.sql.parser.*", + "java.util.*" + ] + + # List of keywords. + keywords: [ + "LOCATION", + "TBLPROPERTIES", + "COMMENT" + ] + + # List of methods for parsing custom SQL statements. + statementParserMethods: [ + "SqlCreateTable()" + ] + + # List of methods for parsing custom literals. + # Example: ParseJsonLiteral(). + literalParserMethods: [ + ] + + # List of methods for parsing custom data types. + dataTypeParserMethods: [ + ] + + nonReservedKeywords: [ + ] + + createStatementParserMethods: [ + ] + + alterStatementParserMethods: [ + ] + + dropStatementParserMethods: [ + ] + + # List of files in @includes directory that have parser method + # implementations for custom SQL statements, literals or types + # given as part of "statementParserMethods", "literalParserMethods" or + # "dataTypeParserMethods". + implementationFiles: [ + "parserImpls.ftl" + ] + + includeCompoundIdentifier: true, + includeBraces: true, + includeAdditionalDeclarations: false, + allowBangEqual: false +} diff --git a/sdks/java/extensions/sql/src/main/codegen/includes/license.ftl b/sdks/java/extensions/sql/src/main/codegen/includes/license.ftl new file mode 100644 index 000000000000..7e66353b7db7 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/codegen/includes/license.ftl @@ -0,0 +1,17 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ diff --git a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl new file mode 100644 index 000000000000..0635d8bca716 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl @@ -0,0 +1,86 @@ +<#-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> + + +private void ColumnDef(List list) : +{ + SqlParserPos pos; + SqlIdentifier name; + SqlDataTypeSpec type; + ColumnConstraint constraint = null; + SqlNode comment = null; +} +{ + name = SimpleIdentifier() { pos = getPos(); } + type = DataType() + [ + + { constraint = new ColumnConstraint.PrimaryKey(getPos()); } + ] + [ + comment = StringLiteral() + ] + { + list.add(new ColumnDefinition(name, type, constraint, comment, pos)); + } +} + +SqlNodeList ColumnDefinitionList() : +{ + SqlParserPos pos; + List list = Lists.newArrayList(); +} +{ + { pos = getPos(); } + ColumnDef(list) + ( ColumnDef(list) )* + { + return new SqlNodeList(list, pos.plus(getPos())); + } +} + +/** + * CREATE TABLE ( IF NOT EXISTS )? + * ( database_name '.' )? table_name ( '(' column_def ( ',' column_def )* ')' + * ( STORED AS INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname )? + * LOCATION location_uri + * ( TBLPROPERTIES tbl_properties )? + * ( AS select_stmt ) + */ +SqlNode SqlCreateTable() : +{ + SqlParserPos pos; + SqlIdentifier tblName; + SqlNodeList fieldList; + SqlNode comment = null; + SqlNode location = null; + SqlNode tbl_properties = null; + SqlNode select = null; +} +{ + { pos = getPos(); } + + tblName = CompoundIdentifier() + fieldList = ColumnDefinitionList() + [ + + comment = StringLiteral() + ] + [ + + location = StringLiteral() + ] + [ tbl_properties = StringLiteral() ] + [ select = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) ] { + return new SqlCreateTable(pos, tblName, fieldList, comment, + location, tbl_properties, select); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java index 3bea46ab5c23..13494ec8315e 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java @@ -17,20 +17,55 @@ */ package org.apache.beam.sdk.extensions.sql; +import java.util.ArrayList; +import java.util.List; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.extensions.sql.meta.Column; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore; +import org.apache.beam.sdk.extensions.sql.parser.BeamSqlParser; +import org.apache.beam.sdk.extensions.sql.parser.ColumnConstraint; +import org.apache.beam.sdk.extensions.sql.parser.ColumnDefinition; +import org.apache.beam.sdk.extensions.sql.parser.SqlCreateTable; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; + import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.sql.SqlNode; /** * {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client. */ @Experimental public class BeamSqlCli { + private BeamSqlEnv env; + private MetaStore metaStore; + /** + * The default type of table(if not specified when create table). + */ + private String defaultTableType; + + public BeamSqlCli(MetaStore metaStore) { + this.metaStore = metaStore; + this.env = new BeamSqlEnv(); + + // dump tables in metaStore into schema + List
tables = this.metaStore.queryAllTables(); + for (Table table : tables) { + env.registerTable(table.getName(), metaStore.buildBeamSqlTable(table.getName())); + } + } + + public MetaStore getMetaStore() { + return metaStore; + } + /** * Returns a human readable representation of the query execution plan. */ @@ -40,6 +75,69 @@ public static String explainQuery(String sqlString, BeamSqlEnv sqlEnv) throws Ex return beamPlan; } + /** + * Executes the given sql. + */ + public void execute(String sqlString) throws Exception { + BeamSqlParser parser = new BeamSqlParser(sqlString); + SqlNode sqlNode = parser.impl().parseSqlStmtEof(); + + if (sqlNode instanceof SqlCreateTable) { + handleCreateTable((SqlCreateTable) sqlNode, metaStore); + } else { + PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation() + .as(PipelineOptions.class); + options.setJobName("BeamPlanCreator"); + Pipeline pipeline = Pipeline.create(options); + compilePipeline(sqlString, pipeline, env); + pipeline.run(); + } + } + + private void handleCreateTable(SqlCreateTable sqlNode, MetaStore store) { + SqlCreateTable sqlCreateTable = sqlNode; + List columns = new ArrayList<>(sqlCreateTable.fieldList().size()); + for (ColumnDefinition columnDef : sqlCreateTable.fieldList()) { + Column column = Column.builder() + .name(columnDef.name()) + .type( + CalciteUtils.toJavaType( + columnDef.type().deriveType(BeamQueryPlanner.TYPE_FACTORY).getSqlTypeName() + ) + ) + .comment(columnDef.comment()) + .primaryKey(columnDef.constraint() instanceof ColumnConstraint.PrimaryKey) + .build(); + columns.add(column); + } + + String tableType = sqlCreateTable.location() == null + ? defaultTableType : sqlCreateTable.location().getScheme(); + + if (tableType == null) { + throw new IllegalStateException("Table type is not specified and BeamSqlCli#defaultTableType" + + "is not configured!"); + } + + Table table = Table.builder() + .type(tableType) + .name(sqlCreateTable.tableName()) + .columns(columns) + .comment(sqlCreateTable.comment()) + .location(sqlCreateTable.location()) + .properties(sqlCreateTable.properties()) + .build(); + store.createTable(table); + + // register the new table into the schema + env.registerTable(table.getName(), metaStore.buildBeamSqlTable(table.getName())); + } + + public BeamSqlCli defaultTableType(String type) { + this.defaultTableType = type; + return this; + } + /** * compile SQL, and return a {@link Pipeline}. */ diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java index be0b0afda912..5187a6896907 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java @@ -20,8 +20,8 @@ import java.io.Serializable; import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf; import org.apache.calcite.DataContext; @@ -60,6 +60,7 @@ public void registerUdf(String functionName, Class clazz) } /** +<<<<<<< HEAD * Register a UDAF function which can be used in GROUP-BY expression. * See {@link BeamSqlUdaf} on how to implement a UDAF. */ @@ -68,18 +69,18 @@ public void registerUdaf(String functionName, Class clazz } /** - * Registers a {@link BaseBeamTable} which can be used for all subsequent queries. + * Registers a {@link BeamSqlTable} which can be used for all subsequent queries. * */ - public void registerTable(String tableName, BaseBeamTable table) { + public void registerTable(String tableName, BeamSqlTable table) { schema.add(tableName, new BeamCalciteTable(table.getRowType())); planner.getSourceTables().put(tableName, table); } /** - * Find {@link BaseBeamTable} by table name. + * Find {@link BeamSqlTable} by table name. */ - public BaseBeamTable findTable(String tableName){ + public BeamSqlTable findTable(String tableName){ return planner.getSourceTables().get(tableName); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java index dd01a87984de..312d33901190 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java @@ -26,8 +26,8 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; -import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlTable; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.adapter.java.JavaTypeFactory; @@ -66,13 +66,15 @@ public class BeamQueryPlanner { private static final Logger LOG = LoggerFactory.getLogger(BeamQueryPlanner.class); + private SchemaPlus schema; protected final Planner planner; - private Map sourceTables = new HashMap<>(); + private Map sourceTables = new HashMap<>(); public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl( RelDataTypeSystem.DEFAULT); public BeamQueryPlanner(SchemaPlus schema) { + this.schema = schema; final List traitDefs = new ArrayList<>(); traitDefs.add(ConventionTraitDef.INSTANCE); traitDefs.add(RelCollationTraitDef.INSTANCE); @@ -89,10 +91,6 @@ public BeamQueryPlanner(SchemaPlus schema) { .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables)) .build(); this.planner = Frameworks.getPlanner(config); - - for (String t : schema.getTableNames()) { - sourceTables.put(t, (BaseBeamTable) schema.getTable(t)); - } } /** @@ -156,10 +154,17 @@ private SqlNode validateNode(SqlNode sqlNode) throws ValidationException { return planner.validate(sqlNode); } - public Map getSourceTables() { + public Map getSourceTables() { return sourceTables; } + /** + * TODO refactor to use this method in many other places. + */ + public void addTable(String name, BeamSqlTable table) { + sourceTables.put(name, table); + } + public Planner getPlanner() { return planner; } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java index d5eb210adf3c..4845f3c38b4a 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java @@ -20,8 +20,8 @@ import com.google.common.base.Joiner; import java.util.List; import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; -import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlTable; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.plan.RelOptCluster; @@ -65,8 +65,7 @@ public PCollection buildBeamPipeline(PCollectionTuple inputPCollecti String sourceName = Joiner.on('.').join(getTable().getQualifiedName()); - BaseBeamTable targetTable = sqlEnv.findTable(sourceName); - + BeamSqlTable targetTable = sqlEnv.findTable(sourceName); upstream.apply(stageName, targetTable.buildIOWriter()); return upstream; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java index 5179eba20c32..a709fa4e2e41 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java @@ -20,9 +20,9 @@ import com.google.common.base.Joiner; import org.apache.beam.sdk.extensions.sql.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlTable; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; @@ -54,7 +54,7 @@ public PCollection buildBeamPipeline(PCollectionTuple inputPCollecti return sourceStream; } else { //If not, the source PColection is provided with BaseBeamTable.buildIOReader(). - BaseBeamTable sourceTable = sqlEnv.findTable(sourceName); + BeamSqlTable sourceTable = sqlEnv.findTable(sourceName); return sourceTable.buildIOReader(inputPCollections.getPipeline()) .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType()))); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Column.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Column.java new file mode 100644 index 000000000000..c821b0589ca6 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Column.java @@ -0,0 +1,33 @@ +package org.apache.beam.sdk.extensions.sql.meta; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import javax.annotation.Nullable; + +/** + * Metadata class for a {@code BeamSqlTable} column. + */ +@AutoValue +public abstract class Column implements Serializable { + public abstract String getName(); + public abstract Integer getType(); + @Nullable + public abstract String getComment(); + public abstract boolean isPrimaryKey(); + + public static Builder builder() { + return new org.apache.beam.sdk.extensions.sql.meta.AutoValue_Column.Builder(); + } + + /** + * Builder class for {@link Column}. + */ + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder name(String name); + public abstract Builder type(Integer type); + public abstract Builder comment(String comment); + public abstract Builder primaryKey(boolean isPrimaryKey); + public abstract Column build(); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java new file mode 100644 index 000000000000..420312ddfce7 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java @@ -0,0 +1,44 @@ +package org.apache.beam.sdk.extensions.sql.meta; + +import com.alibaba.fastjson.JSONObject; +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.net.URI; +import java.util.List; +import javax.annotation.Nullable; + +/** + * Represents the metadata of a {@code BeamSqlTable}. + */ +@AutoValue +public abstract class Table implements Serializable { + /** type of the table. */ + public abstract String getType(); + public abstract String getName(); + public abstract List getColumns(); + @Nullable + public abstract String getComment(); + @Nullable + public abstract URI getLocation(); + @Nullable + public abstract JSONObject getProperties(); + + + public static Builder builder() { + return new org.apache.beam.sdk.extensions.sql.meta.AutoValue_Table.Builder(); + } + + /** + * Builder class for {@link Table}. + */ + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder type(String type); + public abstract Builder name(String name); + public abstract Builder columns(List columns); + public abstract Builder comment(String name); + public abstract Builder location(URI location); + public abstract Builder properties(JSONObject properties); + public abstract Table build(); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/package-info.java new file mode 100644 index 000000000000..a50ce5f6e131 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Metadata related classes. + */ +package org.apache.beam.sdk.extensions.sql.meta; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/MetaUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/MetaUtils.java new file mode 100644 index 000000000000..66f8a2b1499b --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/MetaUtils.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.meta.provider; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.meta.Column; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; + +/** + * Utility methods for metadata. + */ +public class MetaUtils { + public static BeamSqlRowType getBeamSqlRecordTypeFromTable(Table table) { + List columnNames = new ArrayList<>(table.getColumns().size()); + List columnTypes = new ArrayList<>(table.getColumns().size()); + for (Column column : table.getColumns()) { + columnNames.add(column.getName()); + columnTypes.add(column.getType()); + } + return BeamSqlRowType.create(columnNames, columnTypes); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java new file mode 100644 index 000000000000..ab97231c7537 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.meta.provider; + +import java.util.List; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlTable; + +/** + * A {@code TableProvider} handles the metadata CRUD of a specified kind of tables. + */ +public interface TableProvider { + /** + * Init the provider. + */ + void init(); + + /** + * Gets the table type this provider handles. + */ + String getTableType(); + + /** + * Creates a table. + */ + void createTable(Table table); + + /** + * Query all tables from this provider. + */ + List
queryAllTables(); + + /** + * Build a {@link BeamSqlTable} using the given table meta info. + */ + BeamSqlTable buildBeamSqlTable(Table table); + + /** + * Close the provider. + */ + void close(); +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTable.java similarity index 90% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTable.java index 2a509471105d..aaf8a732db53 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTable.java @@ -15,12 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql.schema.kafka; +package org.apache.beam.sdk.extensions.sql.meta.provider.kafka; + +import static org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils.beamSqlRow2CsvLine; +import static org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils.csvLine2BeamSqlRow; import java.util.List; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -76,7 +78,7 @@ public PCollection expand(PCollection> input) { @ProcessElement public void processElement(ProcessContext c) { String rowInString = new String(c.element().getValue()); - c.output(BeamTableUtils.csvLine2BeamSqlRow(format, rowInString, rowType)); + c.output(csvLine2BeamSqlRow(format, rowInString, rowType)); } })); } @@ -101,7 +103,7 @@ public PCollection> expand(PCollection input) { @ProcessElement public void processElement(ProcessContext c) { BeamSqlRow in = c.element(); - c.output(KV.of(new byte[] {}, BeamTableUtils.beamSqlRow2CsvLine(in, format).getBytes())); + c.output(KV.of(new byte[] {}, beamSqlRow2CsvLine(in, format).getBytes())); } })); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java similarity index 95% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java index 2cc664fd9d10..2d0064fbefcb 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql.schema.kafka; +package org.apache.beam.sdk.extensions.sql.meta.provider.kafka; import static com.google.common.base.Preconditions.checkArgument; @@ -43,7 +43,6 @@ * */ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable { - private String bootstrapServers; private List topics; private Map configUpdates; @@ -106,4 +105,11 @@ public PDone expand(PCollection input) { }; } + public String getBootstrapServers() { + return bootstrapServers; + } + + public List getTopics() { + return topics; + } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java new file mode 100644 index 000000000000..73ec98169449 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java @@ -0,0 +1,64 @@ +package org.apache.beam.sdk.extensions.sql.meta.provider.kafka; + +import static org.apache.beam.sdk.extensions.sql.meta.provider.MetaUtils.getBeamSqlRecordTypeFromTable; + +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlTable; + +/** + * Text table provider. + * + *

A sample of text table is: + * + *

{@code
+ * CREATE TABLE ORDERS(
+ *   ID INT PRIMARY KEY COMMENT 'this is the primary key',
+ *   NAME VARCHAR(127) COMMENT 'this is the name'
+ * )
+ * COMMENT 'this is the table orders'
+ * LOCATION 'kafka://localhost:2181/brokers?topic=test'
+ * TBLPROPERTIES '{"bootstrap.servers":"localhost:9092", "topics": ["topic1", "topic2"]}'
+ * }
+ */ +public class KafkaTableProvider implements TableProvider { + @Override public BeamSqlTable buildBeamSqlTable(Table table) { + BeamSqlRowType recordType = getBeamSqlRecordTypeFromTable(table); + + JSONObject properties = table.getProperties(); + String bootstrapServers = properties.getString("bootstrap.servers"); + JSONArray topicsArr = properties.getJSONArray("topics"); + List topics = new ArrayList<>(topicsArr.size()); + for (Object topic : topicsArr) { + topics.add(topic.toString()); + } + BeamKafkaCSVTable txtTable = new BeamKafkaCSVTable(recordType, bootstrapServers, topics); + return txtTable; + } + + @Override public String getTableType() { + return "kafka"; + } + + @Override public void createTable(Table table) { + // empty + } + + @Override public List
queryAllTables() { + return Collections.emptyList(); + } + + @Override public void init() { + // empty + } + + @Override public void close() { + // empty + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/package-info.java similarity index 92% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/package-info.java index f0ddeb638077..4101da775d5e 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/package-info.java @@ -19,4 +19,4 @@ /** * table schema for KafkaIO. */ -package org.apache.beam.sdk.extensions.sql.schema.kafka; +package org.apache.beam.sdk.extensions.sql.meta.provider.kafka; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/package-info.java new file mode 100644 index 000000000000..c27126131aca --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Table providers. + */ +package org.apache.beam.sdk.extensions.sql.meta.provider; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java similarity index 90% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java index c44faab79dbc..9791f22d859f 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTable.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql.schema.text; +package org.apache.beam.sdk.extensions.sql.meta.provider.text; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; @@ -41,6 +41,7 @@ public class BeamTextCSVTable extends BeamTextTable { private static final Logger LOG = LoggerFactory .getLogger(BeamTextCSVTable.class); + private String filePattern; private CSVFormat csvFormat; /** @@ -53,6 +54,7 @@ public BeamTextCSVTable(BeamSqlRowType beamSqlRowType, String filePattern) { public BeamTextCSVTable(BeamSqlRowType beamSqlRowType, String filePattern, CSVFormat csvFormat) { super(beamSqlRowType, filePattern); + this.filePattern = filePattern; this.csvFormat = csvFormat; } @@ -67,4 +69,12 @@ public PCollection buildIOReader(Pipeline pipeline) { public PTransform, PDone> buildIOWriter() { return new BeamTextCSVTableIOWriter(beamSqlRowType, filePattern, csvFormat); } + + public CSVFormat getCsvFormat() { + return csvFormat; + } + + public String getFilePattern() { + return filePattern; + } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOReader.java similarity index 89% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOReader.java index 06109c31301b..057df2d99d76 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOReader.java @@ -16,12 +16,13 @@ * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql.schema.text; +package org.apache.beam.sdk.extensions.sql.meta.provider.text; + +import static org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils.csvLine2BeamSqlRow; import java.io.Serializable; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -51,7 +52,7 @@ public PCollection expand(PCollection input) { @ProcessElement public void processElement(ProcessContext ctx) { String str = ctx.element(); - ctx.output(BeamTableUtils.csvLine2BeamSqlRow(csvFormat, str, beamSqlRowType)); + ctx.output(csvLine2BeamSqlRow(csvFormat, str, beamSqlRowType)); } })); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOWriter.java similarity index 90% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOWriter.java index 1684b3780a11..86ff99b09a94 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableIOWriter.java @@ -16,12 +16,13 @@ * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql.schema.text; +package org.apache.beam.sdk.extensions.sql.meta.provider.text; + +import static org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils.beamSqlRow2CsvLine; import java.io.Serializable; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -51,7 +52,7 @@ public BeamTextCSVTableIOWriter(BeamSqlRowType beamSqlRowType, String filePatter @ProcessElement public void processElement(ProcessContext ctx) { BeamSqlRow row = ctx.element(); - ctx.output(BeamTableUtils.beamSqlRow2CsvLine(row, csvFormat)); + ctx.output(beamSqlRow2CsvLine(row, csvFormat)); } })).apply(TextIO.write().to(filePattern)); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextTable.java similarity index 95% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextTable.java index e85608d76614..05b4a698d45a 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextTable.java @@ -16,9 +16,10 @@ * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql.schema.text; +package org.apache.beam.sdk.extensions.sql.meta.provider.text; import java.io.Serializable; + import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; import org.apache.beam.sdk.extensions.sql.schema.BeamIOType; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java new file mode 100644 index 000000000000..8bb62a4124ea --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.meta.provider.text; + +import static org.apache.beam.sdk.extensions.sql.meta.provider.MetaUtils.getBeamSqlRecordTypeFromTable; + +import com.alibaba.fastjson.JSONObject; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlTable; +import org.apache.commons.csv.CSVFormat; + +/** + * Text table provider. + * + *

A sample of text table is: + *

{@code
+ * CREATE TABLE ORDERS(
+ *   ID INT PRIMARY KEY COMMENT 'this is the primary key',
+ *   NAME VARCHAR(127) COMMENT 'this is the name'
+ * )
+ * COMMENT 'this is the table orders'
+ * LOCATION 'text://home/admin/orders'
+ * TBLPROPERTIES '{"format": "Excel"}' -- format of each text line(csv format)
+ * }
+ */ +public class TextTableProvider implements TableProvider { + + @Override public String getTableType() { + return "text"; + } + + @Override public BeamSqlTable buildBeamSqlTable(Table table) { + BeamSqlRowType recordType = getBeamSqlRecordTypeFromTable(table); + + String filePattern = table.getLocation().getPath(); + CSVFormat format = CSVFormat.DEFAULT; + JSONObject properties = table.getProperties(); + String csvFormatStr = properties.getString("format"); + if (csvFormatStr != null && !csvFormatStr.isEmpty()) { + format = CSVFormat.valueOf(csvFormatStr); + } + + BeamTextCSVTable txtTable = new BeamTextCSVTable(recordType, filePattern, format); + return txtTable; + } + + @Override public void createTable(Table table) { + // empty + } + + @Override public List
queryAllTables() { + return Collections.emptyList(); + } + + @Override public void init() { + // empty + } + + @Override public void close() { + // empty + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/package-info.java similarity index 93% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/package-info.java rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/package-info.java index f914e2e73373..2dd9e6e56cea 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/package-info.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/package-info.java @@ -19,4 +19,4 @@ /** * Table schema for text files. */ -package org.apache.beam.sdk.extensions.sql.schema.text; +package org.apache.beam.sdk.extensions.sql.meta.provider.text; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java new file mode 100644 index 000000000000..7ea31e77edd0 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.meta.store; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.ServiceLoader; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlTable; + +/** + * A {@link MetaStore} which stores the meta info in memory. + * + *

NOTE, because this implementation is memory based, the metadata is NOT persistent. + * for tables which created, you need to create again every you launch the + * {@link org.apache.beam.sdk.extensions.sql.BeamSqlCli}. + */ +public class InMemoryMetaStore implements MetaStore { + private Map tables = new HashMap<>(); + private Map providers = new HashMap<>(); + + public InMemoryMetaStore() { + // init the providers + ServiceLoader loader = ServiceLoader.load(TableProvider.class); + for (TableProvider provider : loader) { + provider.init(); + providers.put(provider.getTableType(), provider); + } + + // Init the tables, The tables from all providers should be unique. if there is any duplication, + // the init process will fail. + for (TableProvider provider : providers.values()) { + initTablesFromProvider(provider); + } + } + + private void initTablesFromProvider(TableProvider provider) { + List

tables = provider.queryAllTables(); + for (Table table : tables) { + if (this.tables.containsKey(table.getName())) { + throw new IllegalStateException( + "Duplicate table: " + table.getName() + " from provider: " + provider); + } + + this.tables.put(table.getName(), table); + } + } + + @Override public void createTable(Table table) { + validateTableType(table); + + // first assert the table name is unique + if (tables.containsKey(table.getName())) { + throw new IllegalArgumentException("Duplicate table name: " + table.getName()); + } + + // invoke the provider's create + providers.get(table.getType()).createTable(table); + + // store to the global metastore + tables.put(table.getName(), table); + } + + @Override public Table queryTable(String tableName) { + return tables.get(tableName); + } + + @Override public List
queryAllTables() { + return new ArrayList<>(tables.values()); + } + + @Override public BeamSqlTable buildBeamSqlTable(String tableName) { + Table table = queryTable(tableName); + + if (table == null) { + throw new IllegalArgumentException("The specified table: " + tableName + " does not exists!"); + } + + TableProvider provider = providers.get(table.getType()); + + return provider.buildBeamSqlTable(table); + } + + private void validateTableType(Table table) { + if (!providers.containsKey(table.getType())) { + throw new UnsupportedOperationException( + "Table type: " + table.getType() + " not supported!"); + } + } + + public void registerProvider(TableProvider provider) { + this.providers.put(provider.getTableType(), provider); + initTablesFromProvider(provider); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/MetaStore.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/MetaStore.java new file mode 100644 index 000000000000..f8f1013d201d --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/MetaStore.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.meta.store; + +import java.util.List; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlTable; + +/** + * The interface to handle CRUD of {@code BeamSql} table metadata. + */ +public interface MetaStore { + + /** + * create a table. + */ + void createTable(Table table); + + /** + * Query table with the specified name. + */ + Table queryTable(String tableName); + + /** + * Query all the tables. + */ + List
queryAllTables(); + + /** + * Build the {@code BeamSqlTable} for the specified table. + */ + BeamSqlTable buildBeamSqlTable(String tableName); +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/package-info.java new file mode 100644 index 000000000000..39f138585c60 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Meta stores. + */ +package org.apache.beam.sdk.extensions.sql.meta.store; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/BeamSqlParser.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/BeamSqlParser.java new file mode 100644 index 000000000000..4b4037695b6a --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/BeamSqlParser.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.parser; + +import com.google.common.annotations.VisibleForTesting; +import java.io.StringReader; +import org.apache.beam.sdk.extensions.sql.parser.impl.BeamSqlParserImpl; +import org.apache.calcite.config.Lex; + +/** + * + */ +public class BeamSqlParser { + public static final int DEFAULT_IDENTIFIER_MAX_LENGTH = 128; + private final BeamSqlParserImpl impl; + + public BeamSqlParser(String s) { + this.impl = new BeamSqlParserImpl(new StringReader(s)); + this.impl.setTabSize(1); + this.impl.setQuotedCasing(Lex.ORACLE.quotedCasing); + this.impl.setUnquotedCasing(Lex.ORACLE.unquotedCasing); + this.impl.setIdentifierMaxLength(DEFAULT_IDENTIFIER_MAX_LENGTH); + /* + * By default parser uses [ ] for quoting identifiers. Switching to + * DQID (double quoted identifiers) is needed for array and map access + * (m['x'] = 1 or arr[2] = 10 etc) to work. + */ + this.impl.switchTo("DQID"); + } + + @VisibleForTesting + public BeamSqlParserImpl impl() { + return impl; + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/ColumnConstraint.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/ColumnConstraint.java new file mode 100644 index 000000000000..7b8f053bd396 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/ColumnConstraint.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.parser; + +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * Column constraint such as primary key. + */ +public class ColumnConstraint extends SqlLiteral { + private ColumnConstraint( + Object value, SqlTypeName typeName, SqlParserPos pos) { + super(value, typeName, pos); + } + + /** + * A primary key constraint. + */ + public static class PrimaryKey extends ColumnConstraint { + public PrimaryKey(SqlParserPos pos) { + super(SqlDDLKeywords.PRIMARY, SqlTypeName.SYMBOL, pos); + } + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/ColumnDefinition.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/ColumnDefinition.java new file mode 100644 index 000000000000..ebaa0216d4d3 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/ColumnDefinition.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.parser; + +import java.util.Arrays; +import org.apache.calcite.sql.SqlDataTypeSpec; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.NlsString; + +/** + * Column definition used during sql parsing(mainly DDL which not supported by default). + */ +public class ColumnDefinition extends SqlNodeList { + public ColumnDefinition( + SqlIdentifier name, SqlDataTypeSpec type, ColumnConstraint constraint, + SqlNode comment, SqlParserPos pos) { + super(Arrays.asList(name, type, constraint, comment), pos); + } + + public String name() { + return get(0).toString(); + } + + public SqlDataTypeSpec type() { + return (SqlDataTypeSpec) get(1); + } + + public ColumnConstraint constraint() { + SqlNode constraintNode = get(2); + return constraintNode == null ? null : (ColumnConstraint) constraintNode; + } + + public String comment() { + SqlNode commentNode = get(3); + return commentNode == null ? null : ((NlsString) SqlLiteral.value(commentNode)).getValue(); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/SqlCreateTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/SqlCreateTable.java new file mode 100644 index 000000000000..f7a32c0fbb12 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/SqlCreateTable.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.parser; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.google.common.base.Strings; +import java.net.URI; +import java.util.List; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; +import org.apache.calcite.util.NlsString; + +/** + * A Calcite {@code SqlCall} which represents a create table statement. + */ +public class SqlCreateTable extends SqlCall { + public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator( + "CREATE_TABLE", SqlKind.OTHER) { + @Override + public SqlCall createCall( + SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... o) { + assert functionQualifier == null; + return new SqlCreateTable(pos, (SqlIdentifier) o[0], (SqlNodeList) o[1], + o[2], o[3], o[4], o[5]); + } + + @Override + public void unparse( + SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) { + SqlCreateTable t = (SqlCreateTable) call; + UnparseUtil u = new UnparseUtil(writer, leftPrec, rightPrec); + u.keyword("CREATE", "TABLE").node(t.tblName).nodeList( + t.fieldList); + u.keyword("LOCATION").node(t.location); + if (t.properties != null) { + u.keyword("TBLPROPERTIES").node(t.properties); + } + if (t.query != null) { + u.keyword("AS").node(t.query); + } + } + }; + + private final SqlIdentifier tblName; + private final SqlNodeList fieldList; + private final SqlNode comment; + private final SqlNode location; + private final SqlNode properties; + private final SqlNode query; + + public SqlCreateTable( + SqlParserPos pos, SqlIdentifier tblName, SqlNodeList fieldList, + SqlNode comment, SqlNode location, SqlNode properties, SqlNode query) { + super(pos); + this.tblName = tblName; + this.fieldList = fieldList; + this.comment = comment; + this.location = location; + this.properties = properties; + this.query = query; + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + getOperator().unparse(writer, this, leftPrec, rightPrec); + } + + @Override + public List getOperandList() { + return ImmutableNullableList.of(tblName, fieldList, location, properties, + query); + } + + public String tableName() { + return tblName.toString(); + } + + public URI location() { + return location == null ? null : URI.create(getString(location)); + } + + public String comment() { + return comment == null ? null : getString(comment); + } + + public JSONObject properties() { + String propertiesStr = getString(properties); + if (Strings.isNullOrEmpty(propertiesStr)) { + return new JSONObject(); + } else { + return JSON.parseObject(propertiesStr); + } + } + + private String getString(SqlNode n) { + return n == null ? null : ((NlsString) SqlLiteral.value(n)).getValue(); + } + + @SuppressWarnings("unchecked") + public List fieldList() { + return (List) ((List) fieldList.getList()); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/SqlDDLKeywords.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/SqlDDLKeywords.java new file mode 100644 index 000000000000..0e4e0a9745f4 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/SqlDDLKeywords.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.parser; + +import org.apache.calcite.sql.SqlLiteral; + +/** + * Define the keywords that can occur in a CREATE TABLE statement. + */ +public enum SqlDDLKeywords implements SqlLiteral.SqlSymbol { + PRIMARY +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/UnparseUtil.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/UnparseUtil.java new file mode 100644 index 000000000000..f482b0e08b3c --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/UnparseUtil.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.parser; + +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlWriter; + +class UnparseUtil { + private final SqlWriter writer; + private final int leftPrec; + private final int rightPrec; + + UnparseUtil(SqlWriter writer, int leftPrec, int rightPrec) { + this.writer = writer; + this.leftPrec = leftPrec; + this.rightPrec = rightPrec; + } + + UnparseUtil keyword(String... keywords) { + for (String k : keywords) { + writer.keyword(k); + } + return this; + } + + UnparseUtil node(SqlNode n) { + n.unparse(writer, leftPrec, rightPrec); + return this; + } + + UnparseUtil nodeList(SqlNodeList l) { + writer.keyword("("); + if (l.size() > 0) { + l.get(0).unparse(writer, leftPrec, rightPrec); + for (int i = 1; i < l.size(); ++i) { + writer.keyword(","); + l.get(i).unparse(writer, leftPrec, rightPrec); + } + } + writer.keyword(")"); + return this; + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/package-info.java new file mode 100644 index 000000000000..f7f39054a911 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Created by xumingmingv on 16/06/2017. + */ +package org.apache.beam.sdk.extensions.sql.parser; diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java index 08678d108f87..dcbeaa11c7af 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java @@ -41,6 +41,9 @@ public void testSdkApiSurface() throws Exception { ImmutableSet.of( "org.apache.beam", "org.joda.time", + "com.alibaba.fastjson", + // exposed through fastjson + "sun.reflect", "org.apache.commons.csv"); ApiSurface surface = ApiSurface diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTableTest.java similarity index 98% rename from sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java rename to sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTableTest.java index 05af36cb3b71..f3e2f72417df 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTableTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql.schema.kafka; +package org.apache.beam.sdk.extensions.sql.meta.provider.kafka; import java.io.Serializable; import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableTest.java similarity index 99% rename from sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java rename to sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableTest.java index 79e3d6d8049d..7fb9544e909f 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/BeamTextCSVTableTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql.schema.text; +package org.apache.beam.sdk.extensions.sql.meta.provider.text; import java.io.File; import java.io.FileOutputStream; From 749e45fa75819d08215a28d967e00318bc426047 Mon Sep 17 00:00:00 2001 From: James Xu Date: Thu, 27 Jul 2017 14:40:16 +0800 Subject: [PATCH 2/7] fix minor conflict --- .../main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java index 5187a6896907..ca82a1510e6f 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java @@ -60,7 +60,6 @@ public void registerUdf(String functionName, Class clazz) } /** -<<<<<<< HEAD * Register a UDAF function which can be used in GROUP-BY expression. * See {@link BeamSqlUdaf} on how to implement a UDAF. */ From 35c53128d5773b3ec0d9dae1cdced6c4cc25eb71 Mon Sep 17 00:00:00 2001 From: James Xu Date: Fri, 28 Jul 2017 00:08:42 +0800 Subject: [PATCH 3/7] disable findbugs for auto-generated classes in package: org.apache.beam.dsls.sql.parser.impl --- .../src/main/resources/beam/findbugs-filter.xml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml index 0c9080d408b7..2cd2600cd126 100644 --- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml +++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml @@ -421,4 +421,11 @@ exception in writeObject() --> + + + + + From 84cc3b0951cccac83accbdcaf3312ba1118b8697 Mon Sep 17 00:00:00 2001 From: James Xu Date: Fri, 28 Jul 2017 08:21:42 +0800 Subject: [PATCH 4/7] exclude auto-generated package org.apache.beam.dsls.sql.parser.impl from javadoc --- pom.xml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pom.xml b/pom.xml index 803d30cb937d..64d5adbe8e6e 100644 --- a/pom.xml +++ b/pom.xml @@ -1400,6 +1400,8 @@ maven-javadoc-plugin 2.10.4 + + org.apache.beam.dsls.sql.parser.impl ${beam.javadoc_opts} Apache Beam SDK for Java, version ${project.version} API Apache Beam SDK for Java, version ${project.version} From 184aa0fc44ecc0aa05eedf6ac5d9281826186ec9 Mon Sep 17 00:00:00 2001 From: James Xu Date: Wed, 2 Aug 2017 15:43:11 +0800 Subject: [PATCH 5/7] cleanup and include meta package into ApiSurfaceTest --- .../src/main/resources/beam/findbugs-filter.xml | 2 +- sdks/java/extensions/sql/src/main/codegen/data/Parser.tdd | 4 ++-- .../org/apache/beam/sdk/extensions/sql/BeamSqlCli.java | 8 ++++---- .../extensions/sql/{ => impl}/parser/BeamSqlParser.java | 4 ++-- .../sql/{ => impl}/parser/ColumnConstraint.java | 2 +- .../sql/{ => impl}/parser/ColumnDefinition.java | 2 +- .../extensions/sql/{ => impl}/parser/SqlCreateTable.java | 2 +- .../extensions/sql/{ => impl}/parser/SqlDDLKeywords.java | 2 +- .../sdk/extensions/sql/{ => impl}/parser/UnparseUtil.java | 2 +- .../extensions/sql/{ => impl}/parser/package-info.java | 2 +- .../beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java | 2 ++ 11 files changed, 17 insertions(+), 15 deletions(-) rename sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/{ => impl}/parser/BeamSqlParser.java (92%) rename sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/{ => impl}/parser/ColumnConstraint.java (96%) rename sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/{ => impl}/parser/ColumnDefinition.java (97%) rename sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/{ => impl}/parser/SqlCreateTable.java (98%) rename sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/{ => impl}/parser/SqlDDLKeywords.java (94%) rename sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/{ => impl}/parser/UnparseUtil.java (96%) rename sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/{ => impl}/parser/package-info.java (93%) diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml index 2cd2600cd126..d60dba8f4cd2 100644 --- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml +++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml @@ -426,6 +426,6 @@ - + diff --git a/sdks/java/extensions/sql/src/main/codegen/data/Parser.tdd b/sdks/java/extensions/sql/src/main/codegen/data/Parser.tdd index d4df64b93ea7..82e644ba858f 100644 --- a/sdks/java/extensions/sql/src/main/codegen/data/Parser.tdd +++ b/sdks/java/extensions/sql/src/main/codegen/data/Parser.tdd @@ -16,14 +16,14 @@ { # Generated parser implementation class package and name - package: "org.apache.beam.sdk.extensions.sql.parser.impl", + package: "org.apache.beam.sdk.extensions.sql.impl.parser.impl", class: "BeamSqlParserImpl", # List of import statements. imports: [ "org.apache.calcite.sql.validate.*", "org.apache.calcite.util.*", - "org.apache.beam.sdk.extensions.sql.parser.*", + "org.apache.beam.sdk.extensions.sql.impl.parser.*", "java.util.*" ] diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java index 13494ec8315e..0e5564df82a9 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java @@ -21,16 +21,16 @@ import java.util.List; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.extensions.sql.impl.parser.BeamSqlParser; +import org.apache.beam.sdk.extensions.sql.impl.parser.ColumnConstraint; +import org.apache.beam.sdk.extensions.sql.impl.parser.ColumnDefinition; +import org.apache.beam.sdk.extensions.sql.impl.parser.SqlCreateTable; import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.extensions.sql.meta.Column; import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore; -import org.apache.beam.sdk.extensions.sql.parser.BeamSqlParser; -import org.apache.beam.sdk.extensions.sql.parser.ColumnConstraint; -import org.apache.beam.sdk.extensions.sql.parser.ColumnDefinition; -import org.apache.beam.sdk.extensions.sql.parser.SqlCreateTable; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/BeamSqlParser.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParser.java similarity index 92% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/BeamSqlParser.java rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParser.java index 4b4037695b6a..03155575b25c 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/BeamSqlParser.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParser.java @@ -15,11 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql.parser; +package org.apache.beam.sdk.extensions.sql.impl.parser; import com.google.common.annotations.VisibleForTesting; import java.io.StringReader; -import org.apache.beam.sdk.extensions.sql.parser.impl.BeamSqlParserImpl; +import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl; import org.apache.calcite.config.Lex; /** diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/ColumnConstraint.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/ColumnConstraint.java similarity index 96% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/ColumnConstraint.java rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/ColumnConstraint.java index 7b8f053bd396..965daa2b1e7b 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/ColumnConstraint.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/ColumnConstraint.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql.parser; +package org.apache.beam.sdk.extensions.sql.impl.parser; import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.parser.SqlParserPos; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/ColumnDefinition.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/ColumnDefinition.java similarity index 97% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/ColumnDefinition.java rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/ColumnDefinition.java index ebaa0216d4d3..fce8d2ce66ea 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/ColumnDefinition.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/ColumnDefinition.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql.parser; +package org.apache.beam.sdk.extensions.sql.impl.parser; import java.util.Arrays; import org.apache.calcite.sql.SqlDataTypeSpec; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/SqlCreateTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java similarity index 98% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/SqlCreateTable.java rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java index f7a32c0fbb12..2d194e2425a4 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/SqlCreateTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql.parser; +package org.apache.beam.sdk.extensions.sql.impl.parser; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/SqlDDLKeywords.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDDLKeywords.java similarity index 94% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/SqlDDLKeywords.java rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDDLKeywords.java index 0e4e0a9745f4..14a1b122ce45 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/SqlDDLKeywords.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDDLKeywords.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql.parser; +package org.apache.beam.sdk.extensions.sql.impl.parser; import org.apache.calcite.sql.SqlLiteral; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/UnparseUtil.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/UnparseUtil.java similarity index 96% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/UnparseUtil.java rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/UnparseUtil.java index f482b0e08b3c..30e06b530b81 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/UnparseUtil.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/UnparseUtil.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql.parser; +package org.apache.beam.sdk.extensions.sql.impl.parser; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/package-info.java similarity index 93% rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/package-info.java rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/package-info.java index f7f39054a911..73ae2c526d7d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/parser/package-info.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/package-info.java @@ -19,4 +19,4 @@ /** * Created by xumingmingv on 16/06/2017. */ -package org.apache.beam.sdk.extensions.sql.parser; +package org.apache.beam.sdk.extensions.sql.impl.parser; diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java index dcbeaa11c7af..a2179c65816b 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java @@ -52,6 +52,8 @@ public void testSdkApiSurface() throws Exception { .includingClass(BeamSqlEnv.class) .includingPackage("org.apache.beam.sdk.extensions.sql.schema", getClass().getClassLoader()) + .includingPackage("org.apache.beam.sdk.extensions.sql.meta", + getClass().getClassLoader()) .pruningPrefix("java") .pruningPattern("org[.]apache[.]beam[.]sdk[.]extensions[.]sql[.].*Test") .pruningPattern("org[.]apache[.]beam[.]sdk[.]extensions[.]sql[.].*TestBase"); From 107369df045a72058ee296db7358638c6501b646 Mon Sep 17 00:00:00 2001 From: James Xu Date: Wed, 2 Aug 2017 15:51:39 +0800 Subject: [PATCH 6/7] fix licences --- .../sql/src/main/codegen/config.fmpp | 2 +- .../sql/src/main/codegen/data/Parser.tdd | 2 +- .../beam/sdk/extensions/sql/meta/Column.java | 18 ++++++++++++++++++ .../beam/sdk/extensions/sql/meta/Table.java | 18 ++++++++++++++++++ .../provider/kafka/KafkaTableProvider.java | 18 ++++++++++++++++++ 5 files changed, 56 insertions(+), 2 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/codegen/config.fmpp b/sdks/java/extensions/sql/src/main/codegen/config.fmpp index be5a792ca0a1..61645e29f55e 100644 --- a/sdks/java/extensions/sql/src/main/codegen/config.fmpp +++ b/sdks/java/extensions/sql/src/main/codegen/config.fmpp @@ -6,7 +6,7 @@ # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # -# http:# www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, diff --git a/sdks/java/extensions/sql/src/main/codegen/data/Parser.tdd b/sdks/java/extensions/sql/src/main/codegen/data/Parser.tdd index 82e644ba858f..09a53799042f 100644 --- a/sdks/java/extensions/sql/src/main/codegen/data/Parser.tdd +++ b/sdks/java/extensions/sql/src/main/codegen/data/Parser.tdd @@ -6,7 +6,7 @@ # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # -# http:# www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Column.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Column.java index c821b0589ca6..9bcc16af7f56 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Column.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Column.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.beam.sdk.extensions.sql.meta; import com.google.auto.value.AutoValue; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java index 420312ddfce7..42599c8fdfcc 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.beam.sdk.extensions.sql.meta; import com.alibaba.fastjson.JSONObject; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java index 73ec98169449..ed753ec2a3dc 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.beam.sdk.extensions.sql.meta.provider.kafka; import static org.apache.beam.sdk.extensions.sql.meta.provider.MetaUtils.getBeamSqlRecordTypeFromTable; From 3c73f5254e77a8dcfcfa998375cae155213a3a29 Mon Sep 17 00:00:00 2001 From: James Xu Date: Thu, 3 Aug 2017 10:58:29 +0800 Subject: [PATCH 7/7] ignore auto-generated parser code from javadoc plugin --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 64d5adbe8e6e..184b9719d654 100644 --- a/pom.xml +++ b/pom.xml @@ -1401,7 +1401,7 @@ 2.10.4 - org.apache.beam.dsls.sql.parser.impl + org.apache.beam.sdk.extensions.sql.impl.parser.impl ${beam.javadoc_opts} Apache Beam SDK for Java, version ${project.version} API Apache Beam SDK for Java, version ${project.version}