Skip to content

Commit

Permalink
ISSUE-2406 [Storm SQL] Change underlying API to Streams API
Browse files Browse the repository at this point in the history
* This will enable us to provide windowed aggregations, joins, etc.
  * Tuple-by-tuple processing makes more sense than micro-batching in these cases
* Tested with several SQL cases
* Also bumps the Calcite version to 1.14.0
  • Loading branch information
HeartSaVioR committed Dec 4, 2017
1 parent 8abdec8 commit e9f7ff6
Show file tree
Hide file tree
Showing 61 changed files with 1,419 additions and 1,674 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -291,7 +291,7 @@
<cassandra.version>2.1.7</cassandra.version>
<druid.version>0.8.2</druid.version>
<elasticsearch.version>5.2.2</elasticsearch.version>
<calcite.version>1.11.0</calcite.version>
<calcite.version>1.14.0</calcite.version>
<mongodb.version>3.2.0</mongodb.version>
<solr.version>5.2.1</solr.version>
<jpmml.version>1.0.22</jpmml.version>
Expand Down
24 changes: 24 additions & 0 deletions sql/storm-sql-core/pom.xml
Expand Up @@ -66,10 +66,34 @@
<artifactId>calcite-core</artifactId>
<version>${calcite.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica-metrics</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</exclusion>
<exclusion>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
Expand Down
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.storm.sql;

import java.util.List;

import org.apache.calcite.DataContext;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.sql.javac.CompilingClassLoader;
import org.apache.storm.streams.Stream;
import org.apache.storm.tuple.Values;

/**
 * Base class for processors that build a Storm topology from a SQL statement.
 *
 * <p>It carries three pieces of state: the resulting output stream, the Calcite
 * {@link DataContext} and the list of compiling class loaders. None of them is
 * assigned in this class; the fields are {@code protected}, so presumably
 * subclasses populate them (verify against the concrete implementations).
 */
public abstract class AbstractStreamsProcessor {
    protected Stream<Values> outputStream;
    protected DataContext dataContext;
    protected List<CompilingClassLoader> classLoaders;

    /**
     * Construct the Storm topology based on the SQL.
     *
     * @return the constructed topology
     */
    public abstract StormTopology build();

    /**
     * Returns the output stream of the SQL.
     *
     * @return the output stream of the SQL
     */
    public Stream<Values> outputStream() {
        return this.outputStream;
    }

    /**
     * Returns the data context used when executing the query.
     *
     * @return DataContext instance which is used with execution of query
     */
    public DataContext getDataContext() {
        return this.dataContext;
    }

    /**
     * Returns the class loaders used for compilation.
     *
     * @return class loaders to compile with; they are chained, so the last class loader
     *     can access all classes
     */
    public List<CompilingClassLoader> getClassLoaders() {
        return this.classLoaders;
    }
}

This file was deleted.

80 changes: 42 additions & 38 deletions sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
@@ -1,23 +1,45 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.storm.sql;

import static org.apache.storm.sql.compiler.CompilerUtil.TableBuilderInfo;

import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.jar.Attributes;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import java.util.zip.ZipEntry;

import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.config.CalciteConnectionConfigImpl;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.rel.RelNode;
Expand All @@ -37,6 +59,7 @@
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.SubmitOptions;
import org.apache.storm.sql.compiler.StormSqlTypeFactoryImpl;
import org.apache.storm.sql.compiler.backends.standalone.PlanCompiler;
Expand All @@ -47,33 +70,13 @@
import org.apache.storm.sql.parser.SqlCreateTable;
import org.apache.storm.sql.parser.StormParser;
import org.apache.storm.sql.planner.StormRelUtils;
import org.apache.storm.sql.planner.trident.QueryPlanner;
import org.apache.storm.sql.planner.streams.QueryPlanner;
import org.apache.storm.sql.runtime.AbstractValuesProcessor;
import org.apache.storm.sql.runtime.ChannelHandler;
import org.apache.storm.sql.runtime.DataSource;
import org.apache.storm.sql.runtime.DataSourcesRegistry;
import org.apache.storm.sql.runtime.FieldInfo;
import org.apache.storm.sql.runtime.ISqlTridentDataSource;
import org.apache.storm.trident.TridentTopology;

import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.jar.Attributes;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import java.util.zip.ZipEntry;

import static org.apache.storm.sql.compiler.CompilerUtil.TableBuilderInfo;
import org.apache.storm.sql.runtime.ISqlStreamsDataSource;

class StormSqlImpl extends StormSql {
private final JavaTypeFactory typeFactory = new StormSqlTypeFactoryImpl(
Expand Down Expand Up @@ -111,18 +114,18 @@ public void submit(
String name, Iterable<String> statements, Map<String, Object> topoConf, SubmitOptions opts,
StormSubmitter.ProgressListener progressListener, String asUser)
throws Exception {
Map<String, ISqlTridentDataSource> dataSources = new HashMap<>();
Map<String, ISqlStreamsDataSource> dataSources = new HashMap<>();
for (String sql : statements) {
StormParser parser = new StormParser(sql);
SqlNode node = parser.impl().parseSqlStmtEof();
if (node instanceof SqlCreateTable) {
handleCreateTableForTrident((SqlCreateTable) node, dataSources);
handleCreateTableForStreams((SqlCreateTable) node, dataSources);
} else if (node instanceof SqlCreateFunction) {
handleCreateFunction((SqlCreateFunction) node);
} else {
QueryPlanner planner = new QueryPlanner(schema);
AbstractTridentProcessor processor = planner.compile(dataSources, sql);
TridentTopology topo = processor.build();
AbstractStreamsProcessor processor = planner.compile(dataSources, sql);
StormTopology topo = processor.build();

Path jarPath = null;
try {
Expand All @@ -133,7 +136,7 @@ public void submit(
jarPath = Files.createTempFile("storm-sql", ".jar");
System.setProperty("storm.jar", jarPath.toString());
packageTopology(jarPath, processor);
StormSubmitter.submitTopologyAs(name, topoConf, topo.build(), opts, progressListener, asUser);
StormSubmitter.submitTopologyAs(name, topoConf, topo, opts, progressListener, asUser);
} finally {
if (jarPath != null) {
Files.delete(jarPath);
Expand All @@ -145,7 +148,7 @@ public void submit(

@Override
public void explain(Iterable<String> statements) throws Exception {
Map<String, ISqlTridentDataSource> dataSources = new HashMap<>();
Map<String, ISqlStreamsDataSource> dataSources = new HashMap<>();
for (String sql : statements) {
StormParser parser = new StormParser(sql);
SqlNode node = parser.impl().parseSqlStmtEof();
Expand All @@ -156,7 +159,7 @@ public void explain(Iterable<String> statements) throws Exception {
System.out.println("-----------------------------------------------------------");

if (node instanceof SqlCreateTable) {
handleCreateTableForTrident((SqlCreateTable) node, dataSources);
handleCreateTableForStreams((SqlCreateTable) node, dataSources);
System.out.println("No plan presented on DDL");
} else if (node instanceof SqlCreateFunction) {
handleCreateFunction((SqlCreateFunction) node);
Expand All @@ -177,7 +180,7 @@ public void explain(Iterable<String> statements) throws Exception {
}
}

private void packageTopology(Path jar, AbstractTridentProcessor processor) throws IOException {
private void packageTopology(Path jar, AbstractStreamsProcessor processor) throws IOException {
Manifest manifest = new Manifest();
Attributes attr = manifest.getMainAttributes();
attr.put(Attributes.Name.MANIFEST_VERSION, "1.0");
Expand Down Expand Up @@ -239,10 +242,10 @@ private Method findMethod(String clazzName, String methodName) throws ClassNotFo
return null;
}

private void handleCreateTableForTrident(
SqlCreateTable n, Map<String, ISqlTridentDataSource> dataSources) {
private void handleCreateTableForStreams(
SqlCreateTable n, Map<String, ISqlStreamsDataSource> dataSources) {
List<FieldInfo> fields = updateSchema(n);
ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(n.location(), n
ISqlStreamsDataSource ds = DataSourcesRegistry.constructStreamsDataSource(n.location(), n
.inputFormatClass(), n.outputFormatClass(), n.properties(), fields);
if (ds == null) {
throw new RuntimeException("Failed to find data source for " + n
Expand Down Expand Up @@ -279,8 +282,9 @@ private FrameworkConfig buildFrameWorkConfig() {
List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
sqlOperatorTables.add(SqlStdOperatorTable.instance());
sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema),
false,
Collections.<String>emptyList(), typeFactory));
Collections.emptyList(),
typeFactory,
new CalciteConnectionConfigImpl(new Properties())));
return Frameworks.newConfigBuilder().defaultSchema(schema)
.operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables)).build();
} else {
Expand Down
Expand Up @@ -17,6 +17,12 @@
*/
package org.apache.storm.sql;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
Expand All @@ -26,12 +32,6 @@
import org.apache.storm.generated.TopologyInitialStatus;
import org.apache.storm.utils.Utils;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;

public class StormSqlRunner {
private static final String OPTION_SQL_FILE_SHORT = "f";
private static final String OPTION_SQL_FILE_LONG = "file";
Expand Down Expand Up @@ -78,7 +78,8 @@ private static Options buildOptions() {
Options options = new Options();
options.addOption(OPTION_SQL_FILE_SHORT, OPTION_SQL_FILE_LONG, true, "REQUIRED SQL file which has sql statements");
options.addOption(OPTION_SQL_TOPOLOGY_NAME_SHORT, OPTION_SQL_TOPOLOGY_NAME_LONG, true, "Topology name to submit");
options.addOption(OPTION_SQL_EXPLAIN_SHORT, OPTION_SQL_EXPLAIN_LONG, false, "Activate explain mode (topology name will be ignored)");
options.addOption(OPTION_SQL_EXPLAIN_SHORT, OPTION_SQL_EXPLAIN_LONG, false,
"Activate explain mode (topology name will be ignored)");
return options;
}

Expand Down
Expand Up @@ -26,7 +26,7 @@
*
* @see Delta
*/
public interface ParallelStreamableTable extends StreamableTable {
public interface ParallelStreamableTable extends StormStreamableTable {

/**
* Returns parallelism hint of this table. Returns null if don't know.
Expand Down

0 comments on commit e9f7ff6

Please sign in to comment.