Skip to content
Permalink
Browse files
Remove use of static ctx
Change-Id: I758f50772823d7b1935e4d61a6cb2805ba0808ea
  • Loading branch information
sjaco002 committed Apr 21, 2017
1 parent 5551404 commit 3ada054e04b149001597173fd1d60bff21b4bff7
Showing 11 changed files with 92 additions and 73 deletions.
@@ -23,16 +23,17 @@
import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.translator.SessionConfig;

public class BADQueryTranslatorFactory extends DefaultStatementExecutorFactory {

@Override
public QueryTranslator create(List<Statement> statements, SessionConfig conf,
public QueryTranslator create(ICcApplicationContext appCtx, List<Statement> statements, SessionConfig conf,
ILangCompilationProvider compilationProvider, IStorageComponentProvider storageComponentProvider) {
return new BADStatementExecutor(statements, conf, compilationProvider, storageComponentProvider,
return new BADStatementExecutor(appCtx, statements, conf, compilationProvider, storageComponentProvider,
executorService);
}
}
@@ -22,6 +22,7 @@
import java.util.List;

import org.apache.asterix.bad.rules.InsertBrokerNotifierForChannelRule;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.compiler.provider.DefaultRuleSetFactory;
import org.apache.asterix.compiler.provider.IRuleSetFactory;
import org.apache.asterix.optimizer.base.RuleCollections;
@@ -35,11 +36,12 @@
public class BADRuleSetFactory implements IRuleSetFactory {

@Override
public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getLogicalRewrites()
throws AlgebricksException {
List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRuleSet = DefaultRuleSetFactory.buildLogical();
public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getLogicalRewrites(
ICcApplicationContext appCtx) throws AlgebricksException {
List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRuleSet =
DefaultRuleSetFactory.buildLogical(appCtx);

List<IAlgebraicRewriteRule> normalizationCollection = RuleCollections.buildNormalizationRuleCollection();
List<IAlgebraicRewriteRule> normalizationCollection = RuleCollections.buildNormalizationRuleCollection(appCtx);
List<IAlgebraicRewriteRule> alteredNormalizationCollection = new ArrayList<>();
alteredNormalizationCollection.addAll(normalizationCollection);

@@ -54,7 +56,7 @@ public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getLogica

//Find instances of the normalization collection and replace them with the new one
SequentialOnceRuleController seqOnceCtrl = new SequentialOnceRuleController(true);
for (int i =0; i < logicalRuleSet.size(); i++){
for (int i = 0; i < logicalRuleSet.size(); i++) {
List<IAlgebraicRewriteRule> collection = logicalRuleSet.get(i).second;
if (collection.size() == normalizationCollection.size()) {
boolean isNormalizationCollection = true;
@@ -75,7 +77,8 @@ public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getLogica
}

@Override
public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getPhysicalRewrites() {
return DefaultRuleSetFactory.buildPhysical();
public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getPhysicalRewrites(
ICcApplicationContext appCtx) {
return DefaultRuleSetFactory.buildPhysical(appCtx);
}
}
@@ -29,6 +29,7 @@
import org.apache.asterix.bad.metadata.Channel;
import org.apache.asterix.bad.metadata.Procedure;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.lang.common.base.Statement;
@@ -42,13 +43,12 @@

public class BADStatementExecutor extends QueryTranslator {

public BADStatementExecutor(List<Statement> statements, SessionConfig conf,
public BADStatementExecutor(ICcApplicationContext appCtx, List<Statement> statements, SessionConfig conf,
ILangCompilationProvider compliationProvider, IStorageComponentProvider storageComponentProvider,
ExecutorService executorService) {
super(statements, conf, compliationProvider, storageComponentProvider, executorService);
super(appCtx, statements, conf, compliationProvider, storageComponentProvider, executorService);
}


@Override
protected void handleDataverseDropStatement(MetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
@@ -59,7 +59,7 @@ protected void handleDataverseDropStatement(MetadataProvider metadataProvider, S
metadataProvider.setMetadataTxnContext(mdTxnCtx);
Identifier dvId = ((DataverseDropStatement) stmt).getDataverseName();
List<Broker> brokers = BADLangExtension.getBrokers(mdTxnCtx, dvId.getValue());
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse(),
metadataProvider.getStorageComponentProvider());
tempMdProvider.setConfig(metadataProvider.getConfig());
for (Broker broker : brokers) {
@@ -70,8 +70,8 @@ protected void handleDataverseDropStatement(MetadataProvider metadataProvider, S
List<Channel> channels = BADLangExtension.getChannels(mdTxnCtx, dvId.getValue());
for (Channel channel : channels) {
tempMdProvider.getLocks().reset();
ChannelDropStatement drop = new ChannelDropStatement(dvId,
new Identifier(channel.getChannelId().getEntityName()), false);
ChannelDropStatement drop =
new ChannelDropStatement(dvId, new Identifier(channel.getChannelId().getEntityName()), false);
drop.handle(this, tempMdProvider, hcc, null, null, null, 0);
}
List<Procedure> procedures = BADLangExtension.getProcedures(mdTxnCtx, dvId.getValue());
@@ -19,13 +19,15 @@
package org.apache.asterix.bad.lang.statement;

import org.apache.asterix.active.ActiveJobNotificationHandler;
import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.BADConstants;
import org.apache.asterix.bad.lang.BADLangExtension;
import org.apache.asterix.bad.metadata.Channel;
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.lang.common.statement.DropDatasetStatement;
import org.apache.asterix.lang.common.struct.Identifier;
@@ -85,12 +87,14 @@ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationExce
public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {

String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
boolean txnActive = false;
EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
PrecompiledJobEventListener listener = (PrecompiledJobEventListener) ActiveJobNotificationHandler.INSTANCE
.getActiveEntityListener(entityId);
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
PrecompiledJobEventListener listener =
(PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
Channel channel = null;

MetadataTransactionContext mdTxnCtx = null;
@@ -111,13 +115,13 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada
listener.getExecutorService().shutdownNow();
JobId hyracksJobId = listener.getJobId();
listener.deActivate();
ActiveJobNotificationHandler.INSTANCE.removeListener(listener);
activeEventHandler.removeListener(listener);
if (hyracksJobId != null) {
hcc.destroyJob(hyracksJobId);
}

//Create a metadata provider to use in nested jobs.
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse(),
metadataProvider.getStorageComponentProvider());
tempMdProvider.setConfig(metadataProvider.getConfig());
//Drop the Channel Datasets
@@ -131,7 +135,6 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada
new Identifier(channel.getSubscriptionsDataset()), true);
((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc);


//Remove the Channel Metadata
MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, channel);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -128,8 +128,7 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {

String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
String brokerDataverse = ((QueryTranslator) statementExecutor)
.getActiveDataverse(brokerDataverseName);
String brokerDataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(brokerDataverseName);

MetadataTransactionContext mdTxnCtx = null;
try {
@@ -153,7 +152,7 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada

Query subscriptionTuple = new Query(false);

List<FieldBinding> fb = new ArrayList<FieldBinding>();
List<FieldBinding> fb = new ArrayList<>();
LiteralExpr leftExpr = new LiteralExpr(new StringLiteral(BADConstants.DataverseName));
Expression rightExpr = new LiteralExpr(new StringLiteral(brokerDataverse));
fb.add(new FieldBinding(leftExpr, rightExpr));
@@ -165,11 +164,11 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada
if (subscriptionId != null) {
leftExpr = new LiteralExpr(new StringLiteral(BADConstants.SubscriptionId));

List<Expression> UUIDList = new ArrayList<Expression>();
List<Expression> UUIDList = new ArrayList<>();
UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId)));
FunctionIdentifier function = BuiltinFunctions.UUID_CONSTRUCTOR;
FunctionSignature UUIDfunc = new FunctionSignature(function.getNamespace(), function.getName(),
function.getArity());
FunctionSignature UUIDfunc =
new FunctionSignature(function.getNamespace(), function.getName(), function.getArity());
CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList);

rightExpr = UUIDCall;
@@ -186,8 +185,8 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada

subscriptionTuple.setVarCounter(varCounter);

MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
metadataProvider.getStorageComponentProvider());
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider());
tempMdProvider.setConfig(metadataProvider.getConfig());

if (subscriptionId == null) {
@@ -129,12 +129,12 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada
condition.setCurrentop(true);
condition.addOperator("=");

List<Expression> UUIDList = new ArrayList<Expression>();
List<Expression> UUIDList = new ArrayList<>();
UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId)));

FunctionIdentifier function = BuiltinFunctions.UUID_CONSTRUCTOR;
FunctionSignature UUIDfunc = new FunctionSignature(function.getNamespace(), function.getName(),
function.getArity());
FunctionSignature UUIDfunc =
new FunctionSignature(function.getNamespace(), function.getName(), function.getArity());
CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList);

condition.addOperand(UUIDCall);
@@ -143,8 +143,8 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada
new Identifier(subscriptionsDatasetName), condition, varCounter);
SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
delete.accept(visitor, null);
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
metadataProvider.getStorageComponentProvider());
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider());
tempMdProvider.setConfig(metadataProvider.getConfig());
((QueryTranslator) statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -30,6 +30,7 @@
import java.util.logging.Logger;

import org.apache.asterix.active.ActiveJobNotificationHandler;
import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
import org.apache.asterix.app.translator.QueryTranslator;
@@ -41,6 +42,7 @@
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.functions.FunctionSignature;
@@ -174,10 +176,10 @@ private void createDatasets(IStatementExecutor statementExecutor, Identifier sub
Identifier subscriptionsTypeName = new Identifier(BADConstants.ChannelSubscriptionsType);
Identifier resultsTypeName = new Identifier(BADConstants.ChannelResultsType);
//Setup the subscriptions dataset
List<List<String>> partitionFields = new ArrayList<List<String>>();
List<Integer> keyIndicators = new ArrayList<Integer>();
List<List<String>> partitionFields = new ArrayList<>();
List<Integer> keyIndicators = new ArrayList<>();
keyIndicators.add(0);
List<String> fieldNames = new ArrayList<String>();
List<String> fieldNames = new ArrayList<>();
fieldNames.add(BADConstants.SubscriptionId);
partitionFields.add(fieldNames);
IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, false);
@@ -186,8 +188,8 @@ private void createDatasets(IStatementExecutor statementExecutor, Identifier sub
new HashMap<String, String>(), new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);

//Setup the results dataset
partitionFields = new ArrayList<List<String>>();
fieldNames = new ArrayList<String>();
partitionFields = new ArrayList<>();
fieldNames = new ArrayList<>();
fieldNames.add(BADConstants.ResultId);
partitionFields.add(fieldNames);
idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, false);
@@ -238,8 +240,7 @@ private JobSpecification createChannelJob(IStatementExecutor statementExecutor,
}

private void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec, IHyracksClientConnection hcc,
PrecompiledJobEventListener listener, boolean predistributed)
throws Exception {
PrecompiledJobEventListener listener, boolean predistributed) throws Exception {
if (channeljobSpec != null) {
//TODO: Find a way to fix optimizer tests so we don't need this check
JobId jobId = null;
@@ -272,8 +273,11 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada
Identifier subscriptionsName = new Identifier(channelName + BADConstants.subscriptionEnding);
Identifier resultsName = new Identifier(channelName + BADConstants.resultsEnding);
EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
PrecompiledJobEventListener listener = (PrecompiledJobEventListener) ActiveJobNotificationHandler.INSTANCE
.getActiveEntityListener(entityId);
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
PrecompiledJobEventListener listener =
(PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
boolean alreadyActive = false;
Channel channel = null;

@@ -302,8 +306,8 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada
if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsName.getValue()) != null) {
throw new AsterixException("The channel name:" + channelName + " is not available.");
}
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
metadataProvider.getStorageComponentProvider());
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider());
tempMdProvider.setConfig(metadataProvider.getConfig());
//Create Channel Datasets
createDatasets(statementExecutor, subscriptionsName, resultsName, tempMdProvider, hcc, hdc, stats,
@@ -320,7 +324,7 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada
datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsName.getValue()));
//TODO: Add datasets used by channel function
listener = new PrecompiledJobEventListener(entityId, PrecompiledType.CHANNEL, datasets);
ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
activeEventHandler.registerListener(listener);
}

if (distributed) {

0 comments on commit 3ada054

Please sign in to comment.