Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sql/storm-sql-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
<maxAllowedViolations>59</maxAllowedViolations>
<maxAllowedViolations>0</maxAllowedViolations>
</configuration>
</plugin>
<plugin>
Expand Down
4 changes: 2 additions & 2 deletions sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
/**
* The StormSql class provides standalone, interactive interfaces to execute
* SQL statements over streaming data.
* <p>
* The StormSql class is stateless. The user needs to submit the data
*
* <p>The StormSql class is stateless. The user needs to submit the data
* definition language (DDL) statements and the query statements in the same
* batch.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,13 @@ public void submit(
@Override
public void explain(Iterable<String> statements) throws Exception {
for (String sql : statements) {
StormParser parser = new StormParser(sql);
SqlNode node = parser.impl().parseSqlStmtEof();

System.out.println("===========================================================");
System.out.println("query>");
System.out.println(sql);
System.out.println("-----------------------------------------------------------");

StormParser parser = new StormParser(sql);
SqlNode node = parser.impl().parseSqlStmtEof();
if (node instanceof SqlCreateTable) {
sqlContext.interpretCreateTable((SqlCreateTable) node);
System.out.println("No plan presented on DDL");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public static void main(String[] args) throws Exception {
SubmitOptions submitOptions = new SubmitOptions(TopologyInitialStatus.ACTIVE);
sql.submit(topoName, stmts, conf, submitOptions, null, null);
} else {
printUsageAndExit(options, "Either " + OPTION_SQL_TOPOLOGY_NAME_LONG + " or " + OPTION_SQL_EXPLAIN_LONG +
" must be presented");
printUsageAndExit(options, "Either " + OPTION_SQL_TOPOLOGY_NAME_LONG
+ " or " + OPTION_SQL_EXPLAIN_LONG + " must be presented");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,16 @@

package org.apache.storm.sql.compiler;

import static org.apache.calcite.rel.RelFieldCollation.Direction;
import static org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING;
import static org.apache.calcite.rel.RelFieldCollation.Direction.DESCENDING;
import static org.apache.calcite.rel.RelFieldCollation.NullDirection;
import static org.apache.calcite.sql.validate.SqlMonotonicity.INCREASING;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;

import java.util.ArrayList;

import org.apache.calcite.config.CalciteConnectionConfig;
Expand All @@ -33,17 +40,11 @@
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.validate.SqlMonotonicity;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Util;

import org.apache.storm.sql.calcite.ParallelStreamableTable;
import org.apache.storm.sql.calcite.ParallelTable;
import org.apache.storm.sql.parser.ColumnConstraint;

import static org.apache.calcite.sql.validate.SqlMonotonicity.INCREASING;
import static org.apache.calcite.rel.RelFieldCollation.Direction;
import static org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING;
import static org.apache.calcite.rel.RelFieldCollation.Direction.DESCENDING;
import static org.apache.calcite.rel.RelFieldCollation.NullDirection;

public class CompilerUtil {

public static class TableBuilderInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@

/**
* Compiles a scalar expression ({@link org.apache.calcite.rex.RexNode}) to Java source code String.
* <p/>
* This code is inspired by JaninoRexCompiler in Calcite, but while it is returning {@link org.apache.calcite.interpreter.Scalar} which is
* executable, we need to pass the source code to compile and serialize instance so that it can be executed on worker efficiently.
*
* <p>This code is inspired by JaninoRexCompiler in Calcite, but while it is returning
* {@link org.apache.calcite.interpreter.Scalar} which is executable, we need to pass the source code to compile and
* serialize instance so that it can be executed on worker efficiently.
*/
public class RexNodeToJavaCodeCompiler {
private final RexBuilder rexBuilder;
Expand All @@ -64,8 +65,8 @@ public RexNodeToJavaCodeCompiler(RexBuilder rexBuilder) {
}

/**
* Given a method that implements {@link ExecutableExpression#execute(Context, Object[])}, adds a bridge method that implements {@link
* ExecutableExpression#execute(Context)}, and compiles.
* Given a method that implements {@link ExecutableExpression#execute(Context, Object[])}, adds a bridge method that
* implements {@link ExecutableExpression#execute(Context)}, and compiles.
*/
static String baz(ParameterExpression context,
ParameterExpression outputValues, BlockStatement block, String className) {
Expand Down Expand Up @@ -121,41 +122,21 @@ public BlockStatement compileToBlock(final RexProgram program) {
return compileToBlock(program, context_, outputValues_).toBlock();
}

public String compile(List<RexNode> nodes, RelDataType inputRowType, String className) {
final RexProgramBuilder programBuilder =
new RexProgramBuilder(inputRowType, rexBuilder);
for (RexNode node : nodes) {
programBuilder.addProject(node, null);
}

return compile(programBuilder.getProgram(), className);
}

public String compile(final RexProgram program, String className) {
final ParameterExpression context_ =
Expressions.parameter(Context.class, "context");
final ParameterExpression outputValues_ =
Expressions.parameter(Object[].class, "outputValues");

BlockBuilder builder = compileToBlock(program, context_, outputValues_);
return baz(context_, outputValues_, builder.toBlock(), className);
}

private BlockBuilder compileToBlock(final RexProgram program, ParameterExpression context,
ParameterExpression outputValues) {
ParameterExpression outputValues) {
RelDataType inputRowType = program.getInputRowType();
final BlockBuilder builder = new BlockBuilder();
final JavaTypeFactoryImpl javaTypeFactory =
new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());

final RexToLixTranslator.InputGetter inputGetter =
new RexToLixTranslator.InputGetterImpl(
ImmutableList.of(
Pair.<Expression, PhysType>of(
Expressions.field(context,
BuiltInMethod.CONTEXT_VALUES.field),
PhysTypeImpl.of(javaTypeFactory, inputRowType,
JavaRowFormat.ARRAY, false))));
new RexToLixTranslator.InputGetterImpl(
ImmutableList.of(
Pair.<Expression, PhysType>of(
Expressions.field(context,
BuiltInMethod.CONTEXT_VALUES.field),
PhysTypeImpl.of(javaTypeFactory, inputRowType,
JavaRowFormat.ARRAY, false))));
final Function1<String, RexToLixTranslator.InputGetter> correlates =
new Function1<String, RexToLixTranslator.InputGetter>() {
@Override
Expand All @@ -164,22 +145,42 @@ public RexToLixTranslator.InputGetter apply(String a0) {
}
};
final Expression root =
Expressions.field(context, BuiltInMethod.CONTEXT_ROOT.field);
Expressions.field(context, BuiltInMethod.CONTEXT_ROOT.field);
final List<Expression> list =
RexToLixTranslator.translateProjects(program, javaTypeFactory, builder,
null, root, inputGetter, correlates);
RexToLixTranslator.translateProjects(program, javaTypeFactory, builder,
null, root, inputGetter, correlates);
for (int i = 0; i < list.size(); i++) {
builder.add(
Expressions.statement(
Expressions.assign(
Expressions.arrayIndex(outputValues,
Expressions.constant(i)),
list.get(i))));
Expressions.statement(
Expressions.assign(
Expressions.arrayIndex(outputValues,
Expressions.constant(i)),
list.get(i))));
}

return builder;
}

public String compile(List<RexNode> nodes, RelDataType inputRowType, String className) {
final RexProgramBuilder programBuilder =
new RexProgramBuilder(inputRowType, rexBuilder);
for (RexNode node : nodes) {
programBuilder.addProject(node, null);
}

return compile(programBuilder.getProgram(), className);
}

public String compile(final RexProgram program, String className) {
final ParameterExpression context_ =
Expressions.parameter(Context.class, "context");
final ParameterExpression outputValues_ =
Expressions.parameter(Object[].class, "outputValues");

BlockBuilder builder = compileToBlock(program, context_, outputValues_);
return baz(context_, outputValues_, builder.toBlock(), className);
}

enum StormBuiltInMethod {
EXPR_EXECUTE1(ExecutableExpression.class, "execute", Context.class),
EXPR_EXECUTE2(ExecutableExpression.class, "execute", Context.class, Object[].class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ public Class<?> findClass(String name) throws ClassNotFoundException {
}

/**
* @return Whether compilation was successful.
* Compiles source code to byte code.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wasn't this check disabled?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We removed the rule to add missing Javadoc on public methods and classes. The rules to validate already present Javadoc were present and remained untouched. I think this makes sense because if Javadoc is present it should be valid. It could also be removed but deciding that slows down the PR workflow in my opinion.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, thanks.

* @return indicates whether compilation was successful
*/
private boolean compileSourceCodeToByteCode(
String className, String sourceCode, DiagnosticListener<JavaFileObject> diagnosticListener) {
Expand Down Expand Up @@ -195,7 +196,7 @@ public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOExcept
/**
* Provides an in-memory representation of JavaFileManager abstraction, so we do not need to write any files to disk.
*
* When files are written to, rather than putting the bytes on disk, they are appended to buffers in byteCodeForClasses.
* <p>When files are written to, rather than putting the bytes on disk, they are appended to buffers in byteCodeForClasses.
*
* @see javax.tools.JavaFileManager
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ public class StormRelUtils {
private static final AtomicInteger classSequence = new AtomicInteger(0);

public static String getClassName(StreamsRel relNode) {
return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_" +
classSequence.getAndIncrement();
return "Generated_"
+ relNode.getClass().getSimpleName().toUpperCase()
+ "_"
+ relNode.getId()
+ "_"
+ classSequence.getAndIncrement();
}

public static StreamsRel getStormRelInput(RelNode input) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@

package org.apache.storm.sql.planner.streams;

import java.util.Iterator;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.util.Iterator;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.rel.rules.CalcMergeRule;
import org.apache.calcite.rel.rules.FilterCalcMergeRule;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@

package org.apache.storm.sql.planner.streams.rel;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;

import com.google.common.collect.Lists;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
Expand Down Expand Up @@ -54,13 +53,9 @@ public void streamsPlan(StreamsPlanCreator planCreator) throws Exception {
// SingleRel
RelNode input = getInput();
StormRelUtils.getStormRelInput(input).streamsPlan(planCreator);
Stream<Values> inputStream = planCreator.pop();

RelDataType inputRowType = getInput(0).getRowType();

List<String> outputFieldNames = getRowType().getFieldNames();
int outputCount = outputFieldNames.size();

// filter
ExecutableExpression filterInstance = null;
RexLocalRef condition = program.getCondition();
Expand Down Expand Up @@ -88,7 +83,10 @@ public void streamsPlan(StreamsPlanCreator planCreator) throws Exception {
throw new IllegalStateException("Either projection or condition, or both should be provided.");
}

List<String> outputFieldNames = getRowType().getFieldNames();
int outputCount = outputFieldNames.size();
EvaluationCalc evalCalc = new EvaluationCalc(filterInstance, projectionInstance, outputCount, planCreator.getDataContext());
final Stream<Values> inputStream = planCreator.pop();
final Stream finalStream = inputStream.flatMap(evalCalc);

planCreator.addStream(finalStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@

package org.apache.storm.sql.planner.streams.rel;

import java.util.List;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;

import java.util.List;

import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@

package org.apache.storm.sql.planner.streams.rel;

import com.google.common.base.Joiner;
import java.util.List;
import java.util.Map;

import com.google.common.base.Joiner;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
Expand Down