diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java index b95ca11..1a5fc34 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java @@ -250,7 +250,6 @@ public void handleCreateIndexStatement(MetadataProvider metadataProvider, Statem metadataProvider.getLocks().unlock(); } - } @Override @@ -343,7 +342,7 @@ protected void handleDataverseDropStatement(MetadataProvider metadataProvider, S tempMdProvider.getLocks().reset(); ChannelDropStatement drop = new ChannelDropStatement(dvId, new Identifier(channel.getChannelId().getEntityName()), false); - drop.handle(hcc, this, requestParameters, tempMdProvider, 0); + drop.handle(hcc, this, requestParameters, tempMdProvider, 0, null); } for (Procedure procedure : procedures) { if (!procedure.getEntityId().getDataverse().equals(dvId.getValue())) { @@ -352,13 +351,13 @@ protected void handleDataverseDropStatement(MetadataProvider metadataProvider, S tempMdProvider.getLocks().reset(); ProcedureDropStatement drop = new ProcedureDropStatement(new FunctionSignature(dvId.getValue(), procedure.getEntityId().getEntityName(), procedure.getArity()), false); - drop.handle(hcc, this, requestParameters, tempMdProvider, 0); + drop.handle(hcc, this, requestParameters, tempMdProvider, 0, null); } List brokers = BADLangExtension.getBrokers(mdTxnCtx, dvId.getValue()); for (Broker broker : brokers) { tempMdProvider.getLocks().reset(); BrokerDropStatement drop = new BrokerDropStatement(dvId, new Identifier(broker.getBrokerName()), false); - drop.handle(hcc, this, requestParameters, tempMdProvider, 0); + drop.handle(hcc, this, requestParameters, tempMdProvider, 0, null); } MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); super.handleDataverseDropStatement(metadataProvider, stmt, hcc); diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java index 4f68f40..b2b38ab 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/BrokerDropStatement.java @@ -30,6 +30,7 @@ import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.translator.IRequestParameters; import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -70,8 +71,8 @@ public R accept(ILangVisitor visitor, T arg) throws CompilationExce @Override public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor, - IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId) - throws HyracksDataException, AlgebricksException { + IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId, + IStatementExecutorContext executorCtx) throws HyracksDataException, AlgebricksException { //TODO: dont drop a broker that's being used String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName); MetadataTransactionContext mdTxnCtx = null; diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java index 8403fcc..d778215 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java @@ -40,6 +40,7 @@ import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.translator.IRequestParameters; import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -82,8 +83,8 @@ public R accept(ILangVisitor visitor, T arg) throws CompilationExce @Override public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor, - IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId) - throws HyracksDataException, AlgebricksException { + IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId, + IStatementExecutorContext executorCtx) throws HyracksDataException, AlgebricksException { String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName); boolean txnActive = false; EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue()); @@ -111,9 +112,8 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe if (listener == null) { //TODO: Channels need to better handle cluster failures LOGGER.log(Level.SEVERE, - "Tried to drop a Deployed Job whose listener no longer exists: " - + entityId.getExtensionName() + " " + entityId.getDataverse() + "." - + entityId.getEntityName() + "."); + "Tried to drop a Deployed Job whose listener no longer exists: " + entityId.getExtensionName() + + " " + entityId.getDataverse() + "." + entityId.getEntityName() + "."); } else { listener.getExecutorService().shutdown(); diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java index 7583f0b..cff1eaa 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java @@ -52,6 +52,7 @@ import org.apache.asterix.translator.IStatementExecutor; import org.apache.asterix.translator.IStatementExecutor.ResultDelivery; import org.apache.asterix.translator.IStatementExecutor.Stats; +import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; import org.apache.hyracks.api.client.IHyracksClientConnection; @@ -120,8 +121,8 @@ public R accept(ILangVisitor visitor, T arg) throws CompilationExce @Override public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor, - IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId) - throws HyracksDataException, AlgebricksException { + IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId, + IStatementExecutorContext executorCtx) throws HyracksDataException, AlgebricksException { String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName); String brokerDataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(brokerDataverseName); diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java index b23bf3b..774ca86 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java @@ -46,6 +46,7 @@ import org.apache.asterix.om.functions.BuiltinFunctions; import org.apache.asterix.translator.IRequestParameters; import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; import org.apache.hyracks.api.client.IHyracksClientConnection; @@ -100,8 +101,8 @@ public R accept(ILangVisitor visitor, T arg) throws CompilationExce @Override public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor, - IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId) - throws HyracksDataException, AlgebricksException { + IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId, + IStatementExecutorContext executorCtx) throws HyracksDataException, AlgebricksException { String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName); MetadataTransactionContext mdTxnCtx = null; @@ -139,8 +140,7 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(), metadataProvider.getDefaultDataverse()); tempMdProvider.getConfig().putAll(metadataProvider.getConfig()); - ((QueryTranslator) statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false, null, - null); + ((QueryTranslator) statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false, null, null); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); } catch (Exception e) { QueryTranslator.abort(e, e, mdTxnCtx); diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java index 367599b..0f01a66 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java @@ -33,6 +33,7 @@ import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.translator.IRequestParameters; import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -74,8 +75,8 @@ public R accept(ILangVisitor visitor, T arg) throws CompilationExce @Override public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor, - IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId) - throws HyracksDataException, AlgebricksException { + IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId, + IStatementExecutorContext executorCtx) throws HyracksDataException, AlgebricksException { String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName); MetadataTransactionContext mdTxnCtx = null; try { diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java index a28666a..edadeb6 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java @@ -68,6 +68,7 @@ import org.apache.asterix.translator.IStatementExecutor; import org.apache.asterix.translator.IStatementExecutor.ResultDelivery; import org.apache.asterix.translator.IStatementExecutor.Stats; +import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.client.IHyracksClientConnection; @@ -138,8 +139,7 @@ public R accept(ILangVisitor visitor, T arg) throws CompilationExce return null; } - public void initialize(MetadataTransactionContext mdTxnCtx) - throws AlgebricksException, HyracksDataException { + public void initialize(MetadataTransactionContext mdTxnCtx) throws AlgebricksException, HyracksDataException { Function lookup = MetadataManager.INSTANCE.getFunction(mdTxnCtx, function); if (lookup == null) { throw new MetadataException(" Unknown function " + function.getName()); @@ -258,8 +258,8 @@ private JobSpecification createChannelJob(IStatementExecutor statementExecutor, @Override public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor, - IRequestParameters requestContext, MetadataProvider metadataProvider, int resultSetId) - throws HyracksDataException, AlgebricksException { + IRequestParameters requestContext, MetadataProvider metadataProvider, int resultSetId, + IStatementExecutorContext executorCtx) throws HyracksDataException, AlgebricksException { //This function performs three tasks: //1. Create datasets for the Channel //2. Create and run the Channel Job diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java index aaf2bfa..6459b4c 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java @@ -69,6 +69,7 @@ import org.apache.asterix.translator.IStatementExecutor; import org.apache.asterix.translator.IStatementExecutor.ResultDelivery; import org.apache.asterix.translator.IStatementExecutor.Stats; +import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.client.IHyracksClientConnection; @@ -175,8 +176,7 @@ private void initialize() throws CompilationException, HyracksDataException { } private Pair createProcedureJob(IStatementExecutor statementExecutor, - MetadataProvider metadataProvider, IHyracksClientConnection hcc, Stats stats) - throws Exception { + MetadataProvider metadataProvider, IHyracksClientConnection hcc, Stats stats) throws Exception { if (getProcedureBodyStatement().getKind() == Statement.Kind.INSERT) { if (!varList.isEmpty()) { throw new CompilationException("Insert procedures cannot have parameters"); @@ -193,8 +193,7 @@ private Pair createProcedureJob(IStatementExe dependencies.get(1).addAll(FunctionUtil.getFunctionDependencies(fact.createQueryRewriter(), ((Query) getProcedureBodyStatement()).getBody(), metadataProvider).get(1)); Pair pair = new Pair<>(BADJobService.compileQueryJob(statementExecutor, - metadataProvider, hcc, (Query) getProcedureBodyStatement()), - PrecompiledType.QUERY); + metadataProvider, hcc, (Query) getProcedureBodyStatement()), PrecompiledType.QUERY); dependencies.get(0).addAll(FunctionUtil.getFunctionDependencies(fact.createQueryRewriter(), ((Query) getProcedureBodyStatement()).getBody(), metadataProvider).get(0)); return pair; @@ -216,8 +215,7 @@ private Pair createProcedureJob(IStatementExe } private void setupDeployedJobSpec(EntityId entityId, JobSpecification jobSpec, IHyracksClientConnection hcc, - DeployedJobSpecEventListener listener, ResultSetId resultSetId, Stats stats) - throws Exception { + DeployedJobSpecEventListener listener, ResultSetId resultSetId, Stats stats) throws Exception { jobSpec.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId); DeployedJobSpecId deployedJobSpecId = hcc.deployJobSpec(jobSpec); listener.setDeployedJobSpecId(deployedJobSpecId); @@ -225,8 +223,8 @@ private void setupDeployedJobSpec(EntityId entityId, JobSpecification jobSpec, I @Override public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor, - IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId) - throws HyracksDataException, AlgebricksException { + IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId, + IStatementExecutorContext executorCtx) throws HyracksDataException, AlgebricksException { ICcApplicationContext appCtx = metadataProvider.getApplicationContext(); ActiveNotificationHandler activeEventHandler = (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java index b794538..69f413e 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java @@ -50,6 +50,7 @@ import org.apache.asterix.translator.ConstantHelper; import org.apache.asterix.translator.IRequestParameters; import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -96,8 +97,8 @@ public R accept(ILangVisitor visitor, T arg) throws CompilationExce @Override public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor, - IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId) - throws HyracksDataException, AlgebricksException { + IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId, + IStatementExecutorContext executorCtx) throws HyracksDataException, AlgebricksException { ICcApplicationContext appCtx = metadataProvider.getApplicationContext(); ActiveNotificationHandler activeEventHandler = (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); @@ -119,9 +120,8 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId(); if (procedure.getDuration().equals("")) { BADJobService.runDeployedJobSpec(deployedJobSpecId, hcc, requestParameters.getHyracksDataset(), - contextRuntimeVarMap, entityId, - metadataProvider.getTxnIdFactory(), appCtx, listener, (QueryTranslator) statementExecutor); - + contextRuntimeVarMap, entityId, metadataProvider.getTxnIdFactory(), appCtx, listener, + (QueryTranslator) statementExecutor); } else { ScheduledExecutorService ses = BADJobService.startRepetitiveDeployedJobSpec(deployedJobSpecId, hcc, @@ -142,8 +142,7 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe } } - private Map createParameterMap(Procedure procedure) - throws AsterixException, HyracksDataException { + private Map createParameterMap(Procedure procedure) throws AsterixException, HyracksDataException { Map map = new HashMap<>(); if (procedure.getParams().size() != argList.size()) { throw AsterixException.create(ErrorCode.COMPILATION_INVALID_PARAMETER_NUMBER, diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java index 95531bd..7a1cfd0 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java @@ -40,6 +40,7 @@ import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.translator.IRequestParameters; import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -76,8 +77,8 @@ public R accept(ILangVisitor visitor, T arg) throws CompilationExce @Override public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor, - IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId) - throws HyracksDataException, AlgebricksException { + IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId, + IStatementExecutorContext executorCtx) throws HyracksDataException, AlgebricksException { ICcApplicationContext appCtx = metadataProvider.getApplicationContext(); ActiveNotificationHandler activeEventHandler = (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); @@ -115,9 +116,8 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe if (listener == null) { //TODO: Channels need to better handle cluster failures LOGGER.log(Level.SEVERE, - "Tried to drop a Deployed Job whose listener no longer exists: " - + entityId.getExtensionName() + " " + entityId.getDataverse() + "." - + entityId.getEntityName() + "."); + "Tried to drop a Deployed Job whose listener no longer exists: " + entityId.getExtensionName() + + " " + entityId.getDataverse() + "." + entityId.getEntityName() + "."); } else { if (listener.getExecutorService() != null) { listener.getExecutorService().shutdown();