Skip to content
Permalink
Browse files
Move BAD from AQL to SQL++
Change-Id: I563bf7b91b280eb65fbbbca0e95e8968f05ea591
  • Loading branch information
sjaco002 committed Apr 19, 2017
1 parent 3b20a63 commit 55514040b1b9cf7488f0138577eb2d35b0dc456d
Showing 77 changed files with 487 additions and 489 deletions.
@@ -36,10 +36,10 @@
<version>${asterix.version}</version>
<configuration>
<base>${project.basedir}</base>
<gbase>../../asterix-lang-aql/src/main/javacc/AQL.jj</gbase>
<gbase>../../asterix-lang-sqlpp/src/main/javacc/SQLPP.jj</gbase>
<gextension>src/main/resources/lang-extension/lang.txt</gextension>
<output>target/generated-resources/javacc/grammar.jj</output>
<parserClassName>BADAQLParser</parserClassName>
<parserClassName>BADParser</parserClassName>
<packageName>org.apache.asterix.bad.lang</packageName>
</configuration>
<executions>
@@ -240,7 +240,7 @@
</dependency>
<dependency>
<groupId>org.apache.asterix</groupId>
<artifactId>asterix-lang-aql</artifactId>
<artifactId>asterix-lang-sqlpp</artifactId>
<version>${asterix.version}</version>
</dependency>
<dependency>
@@ -21,12 +21,12 @@
import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslatorFactory;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.compiler.provider.IRuleSetFactory;
import org.apache.asterix.lang.aql.rewrites.AQLRewriterFactory;
import org.apache.asterix.lang.aql.visitor.AQLAstPrintVisitorFactory;
import org.apache.asterix.lang.common.base.IAstPrintVisitorFactory;
import org.apache.asterix.lang.common.base.IParserFactory;
import org.apache.asterix.lang.common.base.IRewriterFactory;
import org.apache.asterix.translator.AqlExpressionToPlanTranslatorFactory;
import org.apache.asterix.lang.sqlpp.rewrites.SqlppRewriterFactory;
import org.apache.asterix.lang.sqlpp.visitor.SqlppAstPrintVisitorFactory;
import org.apache.asterix.translator.SqlppExpressionToPlanTranslatorFactory;

public class BADCompilationProvider implements ILangCompilationProvider {

@@ -37,17 +37,17 @@ public IParserFactory getParserFactory() {

@Override
public IRewriterFactory getRewriterFactory() {
return new AQLRewriterFactory();
return new SqlppRewriterFactory();
}

@Override
public IAstPrintVisitorFactory getAstPrintVisitorFactory() {
return new AQLAstPrintVisitorFactory();
return new SqlppAstPrintVisitorFactory();
}

@Override
public ILangExpressionToPlanTranslatorFactory getExpressionToPlanTranslatorFactory() {
return new AqlExpressionToPlanTranslatorFactory();
return new SqlppExpressionToPlanTranslatorFactory();
}

@Override
@@ -32,7 +32,6 @@
import org.apache.asterix.bad.metadata.ProcedureSearchKey;
import org.apache.asterix.common.api.ExtensionId;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -54,10 +53,8 @@ public void configure(List<Pair<String, String>> args) {
@Override
public ILangCompilationProvider getLangCompilationProvider(Language lang) {
switch (lang) {
case AQL:
return new BADCompilationProvider();
case SQLPP:
return new SqlppCompilationProvider();
return new BADCompilationProvider();
default:
return null;
}
@@ -27,12 +27,12 @@ public class BADParserFactory implements IParserFactory {

@Override
public IParser createParser(String query) {
return new BADAQLParser(query);
return new BADParser(query);
}

@Override
public IParser createParser(Reader reader) {
return new BADAQLParser(reader);
return new BADParser(reader);
}

}
@@ -42,10 +42,10 @@

public class BADStatementExecutor extends QueryTranslator {

public BADStatementExecutor(List<Statement> aqlStatements, SessionConfig conf,
public BADStatementExecutor(List<Statement> statements, SessionConfig conf,
ILangCompilationProvider compliationProvider, IStorageComponentProvider storageComponentProvider,
ExecutorService executorService) {
super(aqlStatements, conf, compliationProvider, storageComponentProvider, executorService);
super(statements, conf, compliationProvider, storageComponentProvider, executorService);
}


@@ -30,10 +30,7 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.lang.aql.expression.FLWOGRExpression;
import org.apache.asterix.lang.common.base.Clause;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.clause.LetClause;
import org.apache.asterix.lang.common.expression.CallExpr;
import org.apache.asterix.lang.common.expression.FieldAccessor;
import org.apache.asterix.lang.common.expression.FieldBinding;
@@ -195,19 +192,12 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada

if (subscriptionId == null) {
//To create a new subscription
VariableExpr subscriptionVar = new VariableExpr(new VarIdentifier("$sub", 1));
VariableExpr useSubscriptionVar = new VariableExpr(new VarIdentifier("$sub", 1));
VariableExpr resultVar = new VariableExpr(new VarIdentifier("$result", 0));
VariableExpr useResultVar = new VariableExpr(new VarIdentifier("$result", 0));
useResultVar.setIsNewVar(false);
useSubscriptionVar.setIsNewVar(false);
List<Clause> clauseList = new ArrayList<>();
LetClause let = new LetClause(subscriptionVar,
new FieldAccessor(useResultVar, new Identifier(BADConstants.SubscriptionId)));
clauseList.add(let);
FLWOGRExpression body = new FLWOGRExpression(clauseList, useSubscriptionVar);

metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
FieldAccessor accessor = new FieldAccessor(useResultVar, new Identifier(BADConstants.SubscriptionId));

metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter));
boolean resultsAsync =
resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED;
metadataProvider.setResultAsyncMode(resultsAsync);
@@ -219,7 +209,7 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada
tempMdProvider.setOutputFile(metadataProvider.getOutputFile());

InsertStatement insert = new InsertStatement(new Identifier(dataverse),
new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, resultVar, body);
new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, resultVar, accessor);
((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, insert, hcc, hdc,
resultDelivery, stats, false, null, null);
} else {
@@ -29,7 +29,6 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.lang.aql.visitor.AqlDeleteRewriteVisitor;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.expression.CallExpr;
import org.apache.asterix.lang.common.expression.FieldAccessor;
@@ -40,6 +39,7 @@
import org.apache.asterix.lang.common.statement.DeleteStatement;
import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.lang.sqlpp.visitor.SqlppDeleteRewriteVisitor;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -141,7 +141,7 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada

DeleteStatement delete = new DeleteStatement(vars, new Identifier(dataverse),
new Identifier(subscriptionsDatasetName), condition, varCounter);
AqlDeleteRewriteVisitor visitor = new AqlDeleteRewriteVisitor();
SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
delete.accept(visitor, null);
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
metadataProvider.getStorageComponentProvider());
@@ -36,6 +36,7 @@
import org.apache.asterix.bad.BADConstants;
import org.apache.asterix.bad.ChannelJobService;
import org.apache.asterix.bad.lang.BADLangExtension;
import org.apache.asterix.bad.lang.BADParserFactory;
import org.apache.asterix.bad.metadata.Channel;
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
@@ -44,7 +45,6 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.metadata.IDataset;
import org.apache.asterix.lang.aql.parser.AQLParserFactory;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.expression.CallExpr;
@@ -54,6 +54,7 @@
import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl;
import org.apache.asterix.lang.common.statement.InsertStatement;
import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
import org.apache.asterix.lang.common.statement.SetStatement;
import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.metadata.MetadataException;
@@ -206,32 +207,33 @@ private JobSpecification createChannelJob(IStatementExecutor statementExecutor,
Identifier resultsName, MetadataProvider metadataProvider, IHyracksClientConnection hcc,
IHyracksDataset hdc, Stats stats, String dataverse) throws Exception {
StringBuilder builder = new StringBuilder();
builder.append("insert into dataset " + dataverse + "." + resultsName + " ");
builder.append(" as $a (" + " let $" + BADConstants.ChannelExecutionTime + " := current-datetime() \n");

builder.append("for $sub in dataset " + dataverse + "." + subscriptionsName + "\n");
builder.append(
"for $broker in dataset " + BADConstants.BAD_DATAVERSE_NAME + "." + BADConstants.BROKER_KEYWORD + "\n");
builder.append("where $broker." + BADConstants.BrokerName + "= $sub." + BADConstants.BrokerName + "\n");
builder.append("and $broker." + BADConstants.DataverseName + "= $sub." + BADConstants.DataverseName + "\n");
builder.append(" for $result in " + function.getNamespace() + "." + function.getName() + "(");
builder.append("SET inline_with \"false\"\n");
builder.append("insert into " + dataverse + "." + resultsName);
builder.append(" as a (\n" + "with " + BADConstants.ChannelExecutionTime + " as current_datetime() \n");
builder.append("select result, ");
builder.append(BADConstants.ChannelExecutionTime + ", ");
builder.append("sub." + BADConstants.SubscriptionId + " as " + BADConstants.SubscriptionId + ",");
builder.append("current_datetime() as " + BADConstants.DeliveryTime + "\n");
builder.append("from " + dataverse + "." + subscriptionsName + " sub,\n");
builder.append(BADConstants.BAD_DATAVERSE_NAME + "." + BADConstants.BROKER_KEYWORD + " b, \n");
builder.append(function.getNamespace() + "." + function.getName() + "(");
int i = 0;
for (; i < function.getArity() - 1; i++) {
builder.append("$sub.param" + i + ",");
builder.append("sub.param" + i + ",");
}
builder.append("$sub.param" + i + ")\n");
builder.append("return {\n");
builder.append("\"" + BADConstants.ChannelExecutionTime + "\":$" + BADConstants.ChannelExecutionTime + ",");
builder.append("\"" + BADConstants.SubscriptionId + "\":$sub." + BADConstants.SubscriptionId + ",");
builder.append("\"" + BADConstants.DeliveryTime + "\":current-datetime(),");
builder.append("\"result\":$result");
builder.append("}");
builder.append("sub.param" + i + ") result \n");
builder.append("where b." + BADConstants.BrokerName + " = sub." + BADConstants.BrokerName + "\n");
builder.append("and b." + BADConstants.DataverseName + " = sub." + BADConstants.DataverseName + "\n");
builder.append(")");
builder.append(" returning $a");
builder.append(" returning a");
builder.append(";");
AQLParserFactory aqlFact = new AQLParserFactory();
List<Statement> fStatements = aqlFact.createParser(new StringReader(builder.toString())).parse();
return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(0),
BADParserFactory factory = new BADParserFactory();
List<Statement> fStatements = factory.createParser(new StringReader(builder.toString())).parse();

SetStatement ss = (SetStatement) fStatements.get(0);
metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue());

return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(1),
hcc, hdc, ResultDelivery.ASYNC, stats, true, null, null);
}

@@ -33,14 +33,13 @@
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.BADConstants;
import org.apache.asterix.bad.lang.BADLangExtension;
import org.apache.asterix.bad.lang.BADParserFactory;
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
import org.apache.asterix.bad.metadata.Procedure;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.lang.aql.parser.AQLParserFactory;
import org.apache.asterix.lang.aql.visitor.AqlDeleteRewriteVisitor;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.expression.CallExpr;
@@ -50,6 +49,7 @@
import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.struct.VarIdentifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.lang.sqlpp.visitor.SqlppDeleteRewriteVisitor;
import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
@@ -162,8 +162,8 @@ private Pair<JobSpecification, PrecompiledType> createProcedureJob(String body,
StringBuilder builder = new StringBuilder();
builder.append(body);
builder.append(";");
AQLParserFactory aqlFact = new AQLParserFactory();
List<Statement> fStatements = aqlFact.createParser(new StringReader(builder.toString())).parse();
BADParserFactory factory = new BADParserFactory();
List<Statement> fStatements = factory.createParser(new StringReader(builder.toString())).parse();
if (fStatements.size() > 1) {
throw new CompilationException("Procedure can only execute a single statement");
}
@@ -178,7 +178,7 @@ private Pair<JobSpecification, PrecompiledType> createProcedureJob(String body,
metadataProvider.getLocks().unlock();
return pair;
} else if (fStatements.get(0).getKind() == Statement.Kind.DELETE) {
AqlDeleteRewriteVisitor visitor = new AqlDeleteRewriteVisitor();
SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
fStatements.get(0).accept(visitor, null);
return new Pair<>(((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider,
fStatements.get(0), hcc, true), PrecompiledType.DELETE);
@@ -27,7 +27,6 @@

public class Procedure implements IExtensionMetadataEntity {
private static final long serialVersionUID = 1L;
public static final String LANGUAGE_AQL = "AQL";
public static final String LANGUAGE_JAVA = "JAVA";

public static final String RETURNTYPE_VOID = "VOID";
@@ -26,6 +26,8 @@ import org.apache.asterix.bad.lang.statement.CreateChannelStatement;
import org.apache.asterix.bad.lang.statement.CreateProcedureStatement;
import org.apache.asterix.bad.lang.statement.ExecuteProcedureStatement;
import org.apache.asterix.bad.lang.statement.ProcedureDropStatement;
import org.apache.asterix.lang.sqlpp.parser.ParseException;
import org.apache.asterix.lang.sqlpp.parser.Token;


@merge
@@ -24,15 +24,14 @@

drop dataverse channels if exists;
create dataverse channels;
use dataverse channels;

use channels;

create type TweetMessageTypeuuid as closed {
tweetid: uuid,
sender-location: point,
send-time: datetime,
referred-topics: {{ string }},
message-text: string,
sender_location: point,
send_time: datetime,
referred_topics: {{ string }},
message_text: string,
countA: int32,
countB: int32
}
@@ -41,14 +40,13 @@ create type TweetMessageTypeuuid as closed {
create dataset TweetMessageuuids(TweetMessageTypeuuid)
primary key tweetid autogenerated;

create function NearbyTweetsContainingText($location, $text) {
for $tweet in dataset TweetMessageuuids
let $circle := create-circle($location,30.0)
where contains($tweet.message-text,$text)
and spatial-intersect($tweet.sender-location, $location)
return $tweet.message-text
create function NearbyTweetsContainingText(place, text) {
(select m.message_text
from TweetMessageuuids m
where contains(m.message_text,text)
and spatial_intersect(m.sender_location, place))
};

write output to nc1:"rttest/channel-create.adm";
write output to nc1:"rttest/channel-create.sqlpp";

create repetitive channel nearbyTweetChannel using NearbyTweetsContainingText@2 period duration("PT10M");
create repetitive channel nearbyTweetChannel using NearbyTweetsContainingText@2 period duration("PT10M");

0 comments on commit 5551404

Please sign in to comment.