STORM-2406 [Storm SQL] Change underlying API to Streams API
* This will enable us to provide windowed aggregation, join, etc.
  * Tuple-to-tuple makes more sense than micro-batch in these cases
* Tested with several SQL cases
* Also bump Calcite version to 1.14.0
HeartSaVioR committed Dec 15, 2017
1 parent e7fdd67 commit 8fe6beb
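
For context, the Streams API that Storm SQL now compiles to processes each tuple individually rather than in Trident-style micro-batches. A minimal sketch of that style, assuming the org.apache.storm.streams API this commit targets; StreamsApiSketch and the spout argument are illustrative, not part of the commit:

import org.apache.storm.generated.StormTopology;
import org.apache.storm.streams.Stream;
import org.apache.storm.streams.StreamBuilder;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.tuple.Values;

public class StreamsApiSketch {
    public static StormTopology buildTopology(IRichSpout spout) {
        StreamBuilder builder = new StreamBuilder();
        // Each tuple flows through the pipeline individually; there is no micro-batching.
        Stream<Values> words = builder.newStream(spout)
                .map(tuple -> new Values(tuple.getString(0)));
        // Operations apply per tuple; this is what makes windowed aggregation
        // and joins natural on the SQL layer.
        words.filter(v -> ((String) v.get(0)).length() > 3)
             .print();
        return builder.build();
    }
}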
Showing 63 changed files with 1,539 additions and 1,769 deletions.
2 changes: 1 addition & 1 deletion pom.xml
@@ -291,7 +291,7 @@
<cassandra.version>2.1.7</cassandra.version>
<druid.version>0.8.2</druid.version>
<elasticsearch.version>5.2.2</elasticsearch.version>
-<calcite.version>1.11.0</calcite.version>
+<calcite.version>1.14.0</calcite.version>
<mongodb.version>3.2.0</mongodb.version>
<solr.version>5.2.1</solr.version>
<jpmml.version>1.0.22</jpmml.version>
26 changes: 25 additions & 1 deletion sql/storm-sql-core/pom.xml
@@ -66,10 +66,34 @@
<artifactId>calcite-core</artifactId>
<version>${calcite.version}</version>
<exclusions>
+<exclusion>
+<groupId>org.apache.calcite.avatica</groupId>
+<artifactId>avatica-metrics</artifactId>
+</exclusion>
+<exclusion>
+<groupId>com.fasterxml.jackson.core</groupId>
+<artifactId>jackson-core</artifactId>
+</exclusion>
+<exclusion>
+<groupId>com.fasterxml.jackson.core</groupId>
+<artifactId>jackson-annotations</artifactId>
+</exclusion>
+<exclusion>
+<groupId>com.fasterxml.jackson.core</groupId>
+<artifactId>jackson-databind</artifactId>
+</exclusion>
+<exclusion>
+<groupId>org.apache.httpcomponents</groupId>
+<artifactId>httpclient</artifactId>
+</exclusion>
+<exclusion>
+<groupId>org.apache.httpcomponents</groupId>
+<artifactId>httpcore</artifactId>
+</exclusion>
<exclusion>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@@ -172,7 +196,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
-<maxAllowedViolations>1286</maxAllowedViolations>
+<maxAllowedViolations>1281</maxAllowedViolations>
</configuration>
</plugin>
<plugin>
60 changes: 60 additions & 0 deletions sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractStreamsProcessor.java
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.storm.sql;

import java.util.List;

import org.apache.calcite.DataContext;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.sql.javac.CompilingClassLoader;
import org.apache.storm.streams.Stream;
import org.apache.storm.tuple.Values;

public abstract class AbstractStreamsProcessor {
    protected Stream<Values> outputStream;
    protected DataContext dataContext;
    protected List<CompilingClassLoader> classLoaders;

    /**
     * @return the output stream of the SQL query
     */
    public Stream<Values> outputStream() {
        return outputStream;
    }

    /**
     * Construct the Storm topology based on the SQL.
     */
    public abstract StormTopology build();

    /**
     * @return the DataContext instance used during query execution
     */
    public DataContext getDataContext() {
        return dataContext;
    }

    /**
     * @return the classloaders to compile. They are chained, so the last classloader can access all classes.
     */
    public List<CompilingClassLoader> getClassLoaders() {
        return classLoaders;
    }
}
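
For orientation, the sketch below condenses how StormSqlImpl.submit(...) consumes this processor later in this commit (names taken from the diff; jar packaging details and cleanup elided):

AbstractStreamsProcessor processor = sqlContext.compileSql(sql);
StormTopology topo = processor.build();
// The CompilingClassLoader outputs are packaged into the submitted jar.
packageTopology(jarPath, processor);
StormSubmitter.submitTopologyAs(name, topoConf, topo, opts, progressListener, asUser);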

sql/storm-sql-core/src/jvm/org/apache/storm/sql/AbstractTridentProcessor.java
This file was deleted.

sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlContext.java
@@ -25,8 +25,10 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;

import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.CalciteConnectionConfigImpl;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.rel.RelNode;
@@ -55,17 +57,17 @@
import org.apache.storm.sql.parser.SqlCreateFunction;
import org.apache.storm.sql.parser.SqlCreateTable;
import org.apache.storm.sql.planner.StormRelUtils;
-import org.apache.storm.sql.planner.trident.QueryPlanner;
+import org.apache.storm.sql.planner.streams.QueryPlanner;
import org.apache.storm.sql.runtime.DataSourcesRegistry;
import org.apache.storm.sql.runtime.FieldInfo;
-import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.ISqlStreamsDataSource;

public class StormSqlContext {
private final JavaTypeFactory typeFactory = new StormSqlTypeFactoryImpl(
RelDataTypeSystem.DEFAULT);
private final SchemaPlus schema = Frameworks.createRootSchema(true);
private boolean hasUdf = false;
-private Map<String, ISqlTridentDataSource> dataSources = new HashMap<>();
+private Map<String, ISqlStreamsDataSource> dataSources = new HashMap<>();

public void interpretCreateTable(SqlCreateTable n) {
CompilerUtil.TableBuilderInfo builder = new CompilerUtil.TableBuilderInfo(typeFactory);
@@ -85,7 +87,7 @@ public void interpretCreateTable(SqlCreateTable n) {
Table table = builder.build();
schema.add(n.tableName(), table);

-ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(n.location(), n
+ISqlStreamsDataSource ds = DataSourcesRegistry.constructStreamsDataSource(n.location(), n
.inputFormatClass(), n.outputFormatClass(), n.properties(), fields);
if (ds == null) {
throw new RuntimeException("Failed to find data source for " + n
@@ -114,7 +116,7 @@ public void interpretCreateFunction(SqlCreateFunction sqlCreateFunction) throws
hasUdf = true;
}

-public AbstractTridentProcessor compileSql(String query) throws Exception {
+public AbstractStreamsProcessor compileSql(String query) throws Exception {
QueryPlanner planner = new QueryPlanner(schema);
return planner.compile(dataSources, query);
}
@@ -134,8 +136,9 @@ public FrameworkConfig buildFrameWorkConfig() {
List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
sqlOperatorTables.add(SqlStdOperatorTable.instance());
sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema),
-false,
-Collections.<String>emptyList(), typeFactory));
+Collections.emptyList(),
+typeFactory,
+new CalciteConnectionConfigImpl(new Properties())));
return Frameworks.newConfigBuilder().defaultSchema(schema)
.operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables)).build();
} else {
@@ -151,11 +154,7 @@ public SchemaPlus getSchema() {
return schema;
}

-public boolean isHasUdf() {
-return hasUdf;
-}
-
-public Map<String, ISqlTridentDataSource> getDataSources() {
+public Map<String, ISqlStreamsDataSource> getDataSources() {
return dataSources;
}

33 changes: 17 additions & 16 deletions sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
@@ -1,31 +1,23 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
-* <p>
+*
* http://www.apache.org/licenses/LICENSE-2.0
-* <p>
+*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.storm.sql;

-import org.apache.calcite.sql.SqlNode;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.SubmitOptions;
-import org.apache.storm.sql.javac.CompilingClassLoader;
-import org.apache.storm.sql.parser.SqlCreateFunction;
-import org.apache.storm.sql.parser.SqlCreateTable;
-import org.apache.storm.sql.parser.StormParser;
-import org.apache.storm.trident.TridentTopology;

import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
@@ -39,6 +31,15 @@
import java.util.jar.Manifest;
import java.util.zip.ZipEntry;

+import org.apache.calcite.sql.SqlNode;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.SubmitOptions;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.parser.SqlCreateFunction;
+import org.apache.storm.sql.parser.SqlCreateTable;
+import org.apache.storm.sql.parser.StormParser;

class StormSqlImpl extends StormSql {
private final StormSqlContext sqlContext;

@@ -59,8 +60,8 @@ public void submit(
} else if (node instanceof SqlCreateFunction) {
sqlContext.interpretCreateFunction((SqlCreateFunction) node);
} else {
-AbstractTridentProcessor processor = sqlContext.compileSql(sql);
-TridentTopology topo = processor.build();
+AbstractStreamsProcessor processor = sqlContext.compileSql(sql);
+StormTopology topo = processor.build();

Path jarPath = null;
try {
@@ -71,7 +72,7 @@ public void submit(
jarPath = Files.createTempFile("storm-sql", ".jar");
System.setProperty("storm.jar", jarPath.toString());
packageTopology(jarPath, processor);
-StormSubmitter.submitTopologyAs(name, topoConf, topo.build(), opts, progressListener, asUser);
+StormSubmitter.submitTopologyAs(name, topoConf, topo, opts, progressListener, asUser);
} finally {
if (jarPath != null) {
Files.delete(jarPath);
@@ -108,7 +109,7 @@ public void explain(Iterable<String> statements) throws Exception {
}
}

-private void packageTopology(Path jar, AbstractTridentProcessor processor) throws IOException {
+private void packageTopology(Path jar, AbstractStreamsProcessor processor) throws IOException {
Manifest manifest = new Manifest();
Attributes attr = manifest.getMainAttributes();
attr.put(Attributes.Name.MANIFEST_VERSION, "1.0");
sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlRunner.java
@@ -17,6 +17,12 @@
*/
package org.apache.storm.sql;

+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
@@ -26,12 +32,6 @@
import org.apache.storm.generated.TopologyInitialStatus;
import org.apache.storm.utils.Utils;

-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.Map;

public class StormSqlRunner {
private static final String OPTION_SQL_FILE_SHORT = "f";
private static final String OPTION_SQL_FILE_LONG = "file";
@@ -78,7 +78,8 @@ private static Options buildOptions() {
Options options = new Options();
options.addOption(OPTION_SQL_FILE_SHORT, OPTION_SQL_FILE_LONG, true, "REQUIRED SQL file which has sql statements");
options.addOption(OPTION_SQL_TOPOLOGY_NAME_SHORT, OPTION_SQL_TOPOLOGY_NAME_LONG, true, "Topology name to submit");
-options.addOption(OPTION_SQL_EXPLAIN_SHORT, OPTION_SQL_EXPLAIN_LONG, false, "Activate explain mode (topology name will be ignored)");
+options.addOption(OPTION_SQL_EXPLAIN_SHORT, OPTION_SQL_EXPLAIN_LONG, false,
+"Activate explain mode (topology name will be ignored)");
return options;
}

sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java
@@ -26,7 +26,7 @@
*
* @see Delta
*/
-public interface ParallelStreamableTable extends StreamableTable {
+public interface ParallelStreamableTable extends StormStreamableTable {

/**
* Returns the parallelism hint of this table, or null if unknown.
