Skip to content
Permalink
Browse files
[ASTERIXDB-1973][BAD] Coordinated changes for IStatementExecutor
Change-Id: I392d7cda45e2bd39a85c037959ae5483eb48c9ee
  • Loading branch information
mhubail committed Sep 19, 2017
1 parent d5ef650 commit ed4aea2174ed67ab08340674e4036a6c77475d11
Showing 10 changed files with 53 additions and 51 deletions.
@@ -21,6 +21,7 @@
import java.util.List;
import java.util.concurrent.ExecutorService;

import org.apache.asterix.app.translator.RequestParameters;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.lang.statement.BrokerDropStatement;
import org.apache.asterix.bad.lang.statement.ChannelDropStatement;
@@ -37,6 +38,7 @@
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.SessionOutput;
import org.apache.hyracks.api.client.IHyracksClientConnection;

@@ -59,24 +61,25 @@ protected void handleDataverseDropStatement(MetadataProvider metadataProvider, S
List<Broker> brokers = BADLangExtension.getBrokers(mdTxnCtx, dvId.getValue());
MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse());
tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
final IRequestParameters requestParameters = new RequestParameters(null, null, null, null, null, null);
for (Broker broker : brokers) {
tempMdProvider.getLocks().reset();
BrokerDropStatement drop = new BrokerDropStatement(dvId, new Identifier(broker.getBrokerName()), false);
drop.handle(this, tempMdProvider, hcc, null, null, null, 0);
drop.handle(hcc, this, requestParameters, tempMdProvider, 0);
}
List<Channel> channels = BADLangExtension.getChannels(mdTxnCtx, dvId.getValue());
for (Channel channel : channels) {
tempMdProvider.getLocks().reset();
ChannelDropStatement drop =
new ChannelDropStatement(dvId, new Identifier(channel.getChannelId().getEntityName()), false);
drop.handle(this, tempMdProvider, hcc, null, null, null, 0);
drop.handle(hcc, this, requestParameters, tempMdProvider, 0);
}
List<Procedure> procedures = BADLangExtension.getProcedures(mdTxnCtx, dvId.getValue());
for (Procedure procedure : procedures) {
tempMdProvider.getLocks().reset();
ProcedureDropStatement drop = new ProcedureDropStatement(new FunctionSignature(dvId.getValue(),
procedure.getEntityId().getEntityName(), procedure.getArity()), false);
drop.handle(this, tempMdProvider, hcc, null, null, null, 0);
drop.handle(hcc, this, requestParameters, tempMdProvider, 0);
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
}
@@ -28,12 +28,10 @@
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;

public class BrokerDropStatement implements IExtensionStatement {
@@ -76,9 +74,9 @@ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationExce
}

@Override
public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
throws HyracksDataException, AlgebricksException {
//TODO: dont drop a broker that's being used
String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
MetadataTransactionContext mdTxnCtx = null;
@@ -34,12 +34,10 @@
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;

@@ -83,9 +81,9 @@ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationExce
}

@Override
public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
throws HyracksDataException, AlgebricksException {
String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
boolean txnActive = false;
EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
@@ -48,6 +48,7 @@
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
@@ -123,10 +124,9 @@ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationExce
}

@Override
public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {

public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
throws HyracksDataException, AlgebricksException {
String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
String brokerDataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(brokerDataverseName);

@@ -186,14 +186,18 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
metadataProvider.getDefaultDataverse());
tempMdProvider.getConfig().putAll(metadataProvider.getConfig());

final ResultDelivery resultDelivery = requestParameters.getResultDelivery();
final IHyracksDataset hdc = requestParameters.getHyracksDataset();
final Stats stats = requestParameters.getStats();
if (subscriptionId == null) {
//To create a new subscription
VariableExpr resultVar = new VariableExpr(new VarIdentifier("$result", 0));
VariableExpr useResultVar = new VariableExpr(new VarIdentifier("$result", 0));
useResultVar.setIsNewVar(false);
FieldAccessor accessor = new FieldAccessor(useResultVar, new Identifier(BADConstants.SubscriptionId));

metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter));
metadataProvider.setResultSetId(new ResultSetId(resultSetId));
boolean resultsAsync =
resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED;
metadataProvider.setResultAsyncMode(resultsAsync);
@@ -44,13 +44,11 @@
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;

public class ChannelUnsubscribeStatement implements IExtensionStatement {
@@ -106,9 +104,9 @@ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationExce
}

@Override
public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
throws HyracksDataException, AlgebricksException {
String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);

MetadataTransactionContext mdTxnCtx = null;
@@ -31,12 +31,10 @@
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;

public class CreateBrokerStatement implements IExtensionStatement {
@@ -80,9 +78,9 @@ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationExce
}

@Override
public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
throws HyracksDataException, AlgebricksException {
String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
MetadataTransactionContext mdTxnCtx = null;
try {
@@ -64,6 +64,7 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Function;
import org.apache.asterix.om.base.temporal.ADurationParserFactory;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
@@ -253,10 +254,9 @@ private void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec
}

@Override
public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {

public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestContext, MetadataProvider metadataProvider, int resultSetId)
throws HyracksDataException, AlgebricksException {
//This function performs three tasks:
//1. Create datasets for the Channel
//2. Create and run the Channel Job
@@ -306,6 +306,8 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
metadataProvider.getDefaultDataverse());
tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
final IHyracksDataset hdc = requestContext.getHyracksDataset();
final Stats stats = requestContext.getStats();
//Create Channel Datasets
createDatasets(statementExecutor, subscriptionsName, resultsName, tempMdProvider, hcc, hdc, dataverse);
tempMdProvider.getLocks().reset();
@@ -57,6 +57,7 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Function;
import org.apache.asterix.om.base.temporal.ADurationParserFactory;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
@@ -197,9 +198,9 @@ private void setupDistributedJob(EntityId entityId, JobSpecification jobSpec, IH
}

@Override
public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
throws HyracksDataException, AlgebricksException {
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
@@ -231,7 +232,10 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
metadataProvider.getDefaultDataverse());
tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
metadataProvider.setResultSetId(new ResultSetId(resultSetId++));
final ResultDelivery resultDelivery = requestParameters.getResultDelivery();
final IHyracksDataset hdc = requestParameters.getHyracksDataset();
final Stats stats = requestParameters.getStats();
boolean resultsAsync = resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED;
metadataProvider.setResultAsyncMode(resultsAsync);
tempMdProvider.setResultSetId(metadataProvider.getResultSetId());
@@ -40,12 +40,11 @@
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
@@ -90,9 +89,9 @@ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationExce
}

@Override
public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
throws HyracksDataException, AlgebricksException {
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
@@ -34,12 +34,10 @@
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;

@@ -77,9 +75,9 @@ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationExce
}

@Override
public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
throws HyracksDataException, AlgebricksException {
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();

0 comments on commit ed4aea2

Please sign in to comment.