Skip to content
Permalink
Browse files
[NO ISSUE][BAD] Bring the BAD Branch to master
1. Updated paramList in lang.txt to use TypeExpression
2. Updated RequestParameters
3. Updated test results

Change-Id: I6f71319d7c9761266325e3641e0f796dd68408f4
  • Loading branch information
idleft committed Jul 30, 2020
1 parent b5ef45a commit 1ef044918ac590e643c7579adcdece3334a7511e
Showing 12 changed files with 64 additions and 13 deletions.
@@ -69,6 +69,11 @@ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationExce
return null;
}

@Override
public String getName() {
return BrokerDropStatement.class.getName();
}

@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
@@ -81,6 +81,11 @@ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationExce
return null;
}

@Override
public String getName() {
return ChannelDropStatement.class.getName();
}

@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
@@ -119,6 +119,11 @@ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationExce
return null;
}

@Override
public String getName() {
return ChannelSubscribeStatement.class.getName();
}

@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
@@ -99,6 +99,11 @@ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationExce
return null;
}

@Override
public String getName() {
return ChannelUnsubscribeStatement.class.getName();
}

@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
@@ -73,6 +73,11 @@ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationExce
return null;
}

@Override
public String getName() {
return CreateBrokerStatement.class.getName();
}

@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
@@ -51,6 +51,8 @@
import org.apache.asterix.lang.common.expression.CallExpr;
import org.apache.asterix.lang.common.expression.IndexedTypeExpression;
import org.apache.asterix.lang.common.expression.LiteralExpr;
import org.apache.asterix.lang.common.expression.TypeExpression;
import org.apache.asterix.lang.common.expression.TypeReferenceExpression;
import org.apache.asterix.lang.common.literal.StringLiteral;
import org.apache.asterix.lang.common.statement.CreateIndexStatement;
import org.apache.asterix.lang.common.statement.DatasetDecl;
@@ -170,9 +172,10 @@ private void createDatasets(IStatementExecutor statementExecutor, MetadataProvid
fieldNames.add(BADConstants.SubscriptionId);
partitionFields.add(fieldNames);
IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null);
TypeExpression subItemType = new TypeReferenceExpression(
new Pair<>(MetadataConstants.METADATA_DATAVERSE_NAME, subscriptionsTypeName));
DatasetDecl createSubscriptionsDataset = new DatasetDecl(dataverseName, new Identifier(subscriptionsTableName),
MetadataConstants.METADATA_DATAVERSE_NAME, subscriptionsTypeName, null, null, null,
new HashMap<String, String>(), DatasetType.INTERNAL, idd, null, true);
subItemType, null, null, new HashMap<>(), DatasetType.INTERNAL, idd, null, true);

((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
hcc, null);
@@ -184,9 +187,10 @@ private void createDatasets(IStatementExecutor statementExecutor, MetadataProvid
fieldNames.add(BADConstants.ResultId);
partitionFields.add(fieldNames);
idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null);
TypeExpression resultItemType =
new TypeReferenceExpression(new Pair<>(MetadataConstants.METADATA_DATAVERSE_NAME, resultsTypeName));
DatasetDecl createResultsDataset = new DatasetDecl(dataverseName, new Identifier(resultsTableName),
MetadataConstants.METADATA_DATAVERSE_NAME, resultsTypeName, null, null, null, new HashMap<>(),
DatasetType.INTERNAL, idd, null, true);
resultItemType, null, null, new HashMap<>(), DatasetType.INTERNAL, idd, null, true);

//Create an index on timestamp for results
CreateIndexStatement createTimeIndex = new CreateIndexStatement();
@@ -259,6 +263,11 @@ private JobSpecification createChannelJob(IStatementExecutor statementExecutor,
hcc, resultSet, ResultDelivery.ASYNC, null, stats, true, null, null, null);
}

@Override
public String getName() {
return CreateChannelStatement.class.getName();
}

@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestContext, MetadataProvider metadataProvider, int resultSetId)
@@ -226,6 +226,11 @@ private void setupDeployedJobSpec(EntityId entityId, JobSpecification jobSpec, I
listener.setDeployedJobSpecId(deployedJobSpecId);
}

@Override
public String getName() {
return CreateProcedureStatement.class.getName();
}

@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
@@ -95,6 +95,11 @@ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationExce
return null;
}

@Override
public String getName() {
return ExecuteProcedureStatement.class.getName();
}

@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
@@ -74,6 +74,11 @@ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationExce
return null;
}

@Override
public String getName() {
return ProcedureDropStatement.class.getName();
}

@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
@@ -129,7 +129,8 @@ private void deployJobs(ICcApplicationContext appCtx, List<Channel> channels, Li
RequestReference requestReference =
RequestReference.of(UUID.randomUUID().toString(), "CC", System.currentTimeMillis());
BADJobService.redeployJobSpec(entityId, channel.getChannelBody(), metadataProvider, badStatementExecutor,
hcc, new RequestParameters(requestReference, null, null, null, null, null, null, null, null, true),
hcc,
new RequestParameters(requestReference, null, null, null, null, null, null, null, null, null, true),
true);

ScheduledExecutorService ses = BADJobService.startRepetitiveDeployedJobSpec(listener.getDeployedJobSpecId(),
@@ -160,7 +161,8 @@ hcc, new RequestParameters(requestReference, null, null, null, null, null, null,
.getSocketChannelFactory(),
appCtx.getCompilerProperties().getFrameSize(), ResultReader.NUM_READERS),
new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE),
new IStatementExecutor.Stats(), null, null, null, null, true),
new IStatementExecutor.Stats(), new IStatementExecutor.StatementProperties(), null, null,
null, null, true),
true);
metadataProvider.getLocks().unlock();
//Log that the procedure stopped by cluster restart. Procedure is available again now.
@@ -120,7 +120,7 @@ CreateProcedureStatement CreateProcedureStatement() throws ParseException:
{
FunctionName fctName = null;
FunctionSignature signature;
List<Pair<VarIdentifier,IndexedTypeExpression>> paramList = new ArrayList<Pair<VarIdentifier,IndexedTypeExpression>>();
List<Pair<VarIdentifier,TypeExpression>> paramList = new ArrayList<Pair<VarIdentifier,TypeExpression>>();
List<Integer> paramIds = new ArrayList<Integer>();
String functionBody;
Token beginPos;
@@ -138,7 +138,7 @@ CreateProcedureStatement CreateProcedureStatement() throws ParseException:
paramList = FunctionParameters()
<LEFTBRACE>
{
for (Pair<VarIdentifier,IndexedTypeExpression> param : paramList)
for (Pair<VarIdentifier,TypeExpression> param : paramList)
{
VarIdentifier v = new VarIdentifier(param.getFirst().toString());
getCurrentScope().addNewVarSymbolToScope(v);
@@ -157,7 +157,7 @@ CreateProcedureStatement CreateProcedureStatement() throws ParseException:
("period" period = FunctionCallExpr())?
{
List<VarIdentifier> paramListVariablesOnly = new ArrayList<VarIdentifier>();
for(Pair<VarIdentifier,IndexedTypeExpression> p: paramList){
for(Pair<VarIdentifier,TypeExpression> p: paramList){
paramListVariablesOnly.add(p.first);
}
return new CreateProcedureStatement(signature, paramListVariablesOnly, paramIds, functionBody, period);
@@ -1,6 +1,6 @@
{ "DataverseName": "two", "ProcedureName": "addMe", "Arity": "0", "Params": [ ], "Type": "INSERT", "Definition": "use `two`;\ninsert into channels.UserLocations([\n {\"timeStamp\":current_datetime(), \"roomNumber\":222}]\n );", "Language": "SQLPP", "Duration": "", "Dependencies": [ [ [ "channels", "UserLocations" ] ], [ ] ] }
{ "DataverseName": "two", "ProcedureName": "deleteSome", "Arity": "2", "Params": [ "r", "otherRoom" ], "Type": "DELETE", "Definition": "use `two`;\ndelete from channels.UserLocations\nwhere roomNumber = get_job_param(\"r\")\nor roomNumber = get_job_param(\"otherRoom\")\nand channels.really_contains(roomNumber,\"l\");", "Language": "SQLPP", "Duration": "", "Dependencies": [ [ [ "channels", "UserLocations" ] ], [ [ "two", "get_job_param", "1" ], [ "channels", "really_contains", "2" ], [ "two", "get_job_param", "1" ] ], [ ] ] }
{ "DataverseName": "two", "ProcedureName": "deleteSome", "Arity": "2", "Params": [ "r", "otherRoom" ], "Type": "DELETE", "Definition": "use `two`;\ndelete from channels.UserLocations\nwhere roomNumber = get_job_param(\"r\")\nor roomNumber = get_job_param(\"otherRoom\")\nand channels.really_contains(roomNumber,\"l\");", "Language": "SQLPP", "Duration": "", "Dependencies": [ [ [ "channels", "UserLocations" ] ], [ [ "two", "get_job_param", "1" ], [ "two", "get_job_param", "1" ], [ "channels", "really_contains", "2" ] ], [ ] ] }
{ "DataverseName": "two", "ProcedureName": "localAddMe", "Arity": "0", "Params": [ ], "Type": "INSERT", "Definition": "use `two`;\ninsert into UserLocations([\n {\"timeStamp\":current_datetime(), \"roomNumber\":222}]\n );", "Language": "SQLPP", "Duration": "", "Dependencies": [ [ [ "two", "UserLocations" ] ], [ ] ] }
{ "DataverseName": "two", "ProcedureName": "localDeleteSome", "Arity": "2", "Params": [ "r", "otherRoom" ], "Type": "DELETE", "Definition": "use `two`;\ndelete from UserLocations\nwhere roomNumber = get_job_param(\"r\")\nor roomNumber = get_job_param(\"otherRoom\")\nand really_contains(roomNumber,\"l\");", "Language": "SQLPP", "Duration": "", "Dependencies": [ [ [ "two", "UserLocations" ] ], [ [ "two", "get_job_param", "1" ], [ "two", "really_contains", "2" ], [ "two", "get_job_param", "1" ] ], [ ] ] }
{ "DataverseName": "two", "ProcedureName": "localSelectSome", "Arity": "2", "Params": [ "r", "otherRoom" ], "Type": "QUERY", "Definition": "use `two`;\nselect roomNumber from UserLocations\nwhere roomNumber = get_job_param(\"r\")\nor roomNumber = get_job_param(\"otherRoom\")\nand really_contains(roomNumber,\"l\")\norder by id;", "Language": "SQLPP", "Duration": "", "Dependencies": [ [ [ "two", "UserLocations" ] ], [ [ "two", "get_job_param", "1" ], [ "two", "really_contains", "2" ], [ "two", "get_job_param", "1" ] ] ] }
{ "DataverseName": "two", "ProcedureName": "selectSome", "Arity": "2", "Params": [ "r", "otherRoom" ], "Type": "QUERY", "Definition": "use `two`;\nselect roomNumber from channels.UserLocations\nwhere roomNumber = get_job_param(\"r\")\nor roomNumber = get_job_param(\"otherRoom\")\nand channels.really_contains(roomNumber,\"l\")\norder by id;", "Language": "SQLPP", "Duration": "", "Dependencies": [ [ [ "channels", "UserLocations" ] ], [ [ "two", "get_job_param", "1" ], [ "channels", "really_contains", "2" ], [ "two", "get_job_param", "1" ] ] ] }
{ "DataverseName": "two", "ProcedureName": "localDeleteSome", "Arity": "2", "Params": [ "r", "otherRoom" ], "Type": "DELETE", "Definition": "use `two`;\ndelete from UserLocations\nwhere roomNumber = get_job_param(\"r\")\nor roomNumber = get_job_param(\"otherRoom\")\nand really_contains(roomNumber,\"l\");", "Language": "SQLPP", "Duration": "", "Dependencies": [ [ [ "two", "UserLocations" ] ], [ [ "two", "get_job_param", "1" ], [ "two", "get_job_param", "1" ], [ "two", "really_contains", "2" ] ], [ ] ] }
{ "DataverseName": "two", "ProcedureName": "localSelectSome", "Arity": "2", "Params": [ "r", "otherRoom" ], "Type": "QUERY", "Definition": "use `two`;\nselect roomNumber from UserLocations\nwhere roomNumber = get_job_param(\"r\")\nor roomNumber = get_job_param(\"otherRoom\")\nand really_contains(roomNumber,\"l\")\norder by id;", "Language": "SQLPP", "Duration": "", "Dependencies": [ [ [ "two", "UserLocations" ] ], [ [ "two", "get_job_param", "1" ], [ "two", "get_job_param", "1" ], [ "two", "really_contains", "2" ] ] ] }
{ "DataverseName": "two", "ProcedureName": "selectSome", "Arity": "2", "Params": [ "r", "otherRoom" ], "Type": "QUERY", "Definition": "use `two`;\nselect roomNumber from channels.UserLocations\nwhere roomNumber = get_job_param(\"r\")\nor roomNumber = get_job_param(\"otherRoom\")\nand channels.really_contains(roomNumber,\"l\")\norder by id;", "Language": "SQLPP", "Duration": "", "Dependencies": [ [ [ "channels", "UserLocations" ] ], [ [ "two", "get_job_param", "1" ], [ "two", "get_job_param", "1" ], [ "channels", "really_contains", "2" ] ] ] }

0 comments on commit 1ef0449

Please sign in to comment.