Skip to content
Permalink
Browse files
Updated to Asterix changes
Removed the word Asterix from class names
Made BAD ruleset more robust to Asterix rule changes

Change-Id: I371cf7f0dc2e4d904d8c6c1e5cca644283ef3626
  • Loading branch information
sjaco002 committed Dec 16, 2016
1 parent 3afba42 commit 7cc6a84c354de338adb41df843bad55439ca4e2f
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 29 deletions.
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.bad.lang;

import java.util.ArrayList;
import java.util.List;

import org.apache.asterix.bad.rules.InsertBrokerNotifierForChannelRule;
@@ -37,27 +38,44 @@ public class BADRuleSetFactory implements IRuleSetFactory {
public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getLogicalRewrites()
throws AlgebricksException {
List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRuleSet = DefaultRuleSetFactory.buildLogical();
if (logicalRuleSet.size() != 14) {
throw new AlgebricksException("Incorrect RuleSet");
}

List<IAlgebraicRewriteRule> normalizationCollection = RuleCollections.buildNormalizationRuleCollection();
List<IAlgebraicRewriteRule> alteredNormalizationCollection = new ArrayList<>();
alteredNormalizationCollection.addAll(normalizationCollection);

for (int i = 0; i < normalizationCollection.size(); i++) {
IAlgebraicRewriteRule rule = normalizationCollection.get(i);
//Create a normalization collection that includes the broker rule
for (int i = 0; i < alteredNormalizationCollection.size(); i++) {
IAlgebraicRewriteRule rule = alteredNormalizationCollection.get(i);
if (rule instanceof UnnestToDataScanRule) {
normalizationCollection.add(i + 1, new InsertBrokerNotifierForChannelRule());
alteredNormalizationCollection.add(i + 1, new InsertBrokerNotifierForChannelRule());
break;
}
}

//Find instances of the normalization collection and replace them with the new one
SequentialOnceRuleController seqOnceCtrl = new SequentialOnceRuleController(true);
logicalRuleSet.set(3, new Pair<>(seqOnceCtrl, normalizationCollection));
logicalRuleSet.set(7, new Pair<>(seqOnceCtrl, normalizationCollection));
for (int i =0; i < logicalRuleSet.size(); i++){
List<IAlgebraicRewriteRule> collection = logicalRuleSet.get(i).second;
if (collection.size() == normalizationCollection.size()) {
boolean isNormalizationCollection = true;
for (int j = 0; j < collection.size(); j++) {
//Make sure the set of rules is the same
if (!collection.get(j).getClass().equals(normalizationCollection.get(j).getClass())) {
isNormalizationCollection = false;
break;
}
}
if (isNormalizationCollection) {
//replace with the new collection
logicalRuleSet.set(i, new Pair<>(seqOnceCtrl, alteredNormalizationCollection));
}
}
}
return logicalRuleSet;
}

@Override
public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getPhysicalRewrites() {
return DefaultRuleSetFactory.buildPhysical();
}

}
@@ -44,7 +44,7 @@
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.runtime.util.AsterixAppContextInfo;
import org.apache.asterix.runtime.util.AppContextInfo;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
@@ -128,8 +128,8 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada
throw new AsterixException("Channel " + channelName + " is not running");
}

ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE.getCCApplicationContext()
.getMessageBroker();
ICCMessageBroker messageBroker =
(ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext().getMessageBroker();

ChannelJobInfo cInfo = listener.getJobInfo(channel.getChannelId());;
Set<String> ncs = new HashSet<>(cInfo.getLocations());
@@ -49,7 +49,7 @@
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
@@ -169,7 +169,7 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada

List<Expression> UUIDList = new ArrayList<Expression>();
UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId)));
FunctionIdentifier function = AsterixBuiltinFunctions.UUID_CONSTRUCTOR;
FunctionIdentifier function = BuiltinFunctions.UUID_CONSTRUCTOR;
FunctionSignature UUIDfunc = new FunctionSignature(function.getNamespace(), function.getName(),
function.getArity());
CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList);
@@ -42,7 +42,7 @@
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
@@ -143,7 +143,7 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada
List<Expression> UUIDList = new ArrayList<Expression>();
UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId)));

FunctionIdentifier function = AsterixBuiltinFunctions.UUID_CONSTRUCTOR;
FunctionIdentifier function = BuiltinFunctions.UUID_CONSTRUCTOR;
FunctionSignature UUIDfunc = new FunctionSignature(function.getNamespace(), function.getName(),
function.getArity());
CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList);
@@ -68,7 +68,7 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Function;
import org.apache.asterix.om.base.temporal.ADurationParserFactory;
import org.apache.asterix.runtime.util.AsterixAppContextInfo;
import org.apache.asterix.runtime.util.AppContextInfo;
import org.apache.asterix.runtime.util.ClusterStateManager;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
@@ -282,7 +282,7 @@ private JobSpecification createChannelJob(IStatementExecutor statementExecutor,

private void setupCompiledJob(MetadataProvider metadataProvider, String dataverse, EntityId entityId,
JobSpecification channeljobSpec, IHyracksClientConnection hcc) throws Exception {
ICCApplicationContext iCCApp = AsterixAppContextInfo.INSTANCE.getCCApplicationContext();
ICCApplicationContext iCCApp = AppContextInfo.INSTANCE.getCCApplicationContext();
ClusterControllerInfo ccInfo = iCCApp.getCCContext().getClusterControllerInfo();
String strIP = ccInfo.getClientNetAddress();
int port = ccInfo.getClientNetPort();
@@ -35,7 +35,7 @@
import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.runtime.util.AsterixAppContextInfo;
import org.apache.asterix.runtime.util.AppContextInfo;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
@@ -97,7 +97,7 @@ private synchronized void handleJobFinishEvent(ActiveEvent message) throws Excep
private synchronized void handleJobFinishMessage(ChannelJobInfo cInfo) throws Exception {
EntityId channelJobId = cInfo.getEntityId();

IHyracksClientConnection hcc = AsterixAppContextInfo.INSTANCE.getHcc();
IHyracksClientConnection hcc = AppContextInfo.INSTANCE.getHcc();
JobInfo info = hcc.getJobInfo(cInfo.getJobId());
JobStatus status = info.getStatus();
boolean failure = status != null && status.equals(JobStatus.FAILURE);
@@ -128,7 +128,7 @@ private static synchronized void handleJobStartMessage(ChannelJobInfo cInfo) thr
}
}

IHyracksClientConnection hcc = AsterixAppContextInfo.INSTANCE.getHcc();
IHyracksClientConnection hcc = AppContextInfo.INSTANCE.getHcc();
JobInfo info = hcc.getJobInfo(cInfo.getJobId());
List<String> locations = new ArrayList<>();
for (OperatorDescriptorId channelOperatorId : channelOperatorIds) {
@@ -30,7 +30,7 @@
import org.apache.asterix.metadata.declared.DatasetDataSource;
import org.apache.asterix.om.base.AString;
import org.apache.asterix.om.constants.AsterixConstantValue;
import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -174,8 +174,8 @@ private DelegateOperator createNotifyBrokerPlan(LogicalVariable brokerEndpointVa
LogicalVariable subscriptionListVar = context.newVar();
List<LogicalVariable> aggVars = new ArrayList<LogicalVariable>();
aggVars.add(subscriptionListVar);
AggregateFunctionCallExpression funAgg = AsterixBuiltinFunctions.makeAggregateFunctionExpression(
AsterixBuiltinFunctions.LISTIFY, new ArrayList<Mutable<ILogicalExpression>>());
AggregateFunctionCallExpression funAgg = BuiltinFunctions.makeAggregateFunctionExpression(
BuiltinFunctions.LISTIFY, new ArrayList<Mutable<ILogicalExpression>>());
funAgg.getArguments()
.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(subscriptionIdVar)));
List<Mutable<ILogicalExpression>> aggExpressions = new ArrayList<Mutable<ILogicalExpression>>();
@@ -229,7 +229,7 @@ private AssignOperator createbrokerEndPointAssignOperator(LogicalVariable broker
new VariableReferenceExpression(brokerScan.getVariables().get(2)));

ScalarFunctionCallExpression fieldAccessByName = new ScalarFunctionCallExpression(
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME), varRef, fieldRef);
FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_NAME), varRef, fieldRef);
ArrayList<LogicalVariable> varArray = new ArrayList<LogicalVariable>(1);
varArray.add(brokerEndpointVar);
ArrayList<Mutable<ILogicalExpression>> exprArray = new ArrayList<Mutable<ILogicalExpression>>(1);
@@ -26,7 +26,7 @@
import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.bad.ChannelJobService;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
@@ -70,7 +70,7 @@ public NotifyBrokerRuntime(IHyracksTaskContext ctx, IScalarEvaluatorFactory brok
eval0 = brokerEvalFactory.createScalarEvaluator(ctx);
eval1 = subEvalFactory.createScalarEvaluator(ctx);
eval2 = channelExecutionEvalFactory.createScalarEvaluator(ctx);
this.activeManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
this.activeManager = (ActiveManager) ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
.getApplicationObject()).getActiveManager();
this.entityId = activeJobId;
}
@@ -23,7 +23,7 @@
import java.util.Collection;
import java.util.logging.Logger;

import org.apache.asterix.common.config.AsterixTransactionProperties;
import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.test.aql.TestExecutor;
import org.apache.asterix.test.runtime.ExecutionTestUtil;
import org.apache.asterix.testframework.context.TestCaseContext;
@@ -50,7 +50,7 @@ public class BADExecutionTest {

protected static final String TEST_CONFIG_FILE_NAME = "src/test/resources/conf/asterix-build-configuration.xml";

protected static AsterixTransactionProperties txnProperties;
protected static TransactionProperties txnProperties;
private static final TestExecutor testExecutor = new TestExecutor();
private static final boolean cleanupOnStart = true;
private static final boolean cleanupOnStop = true;

0 comments on commit 7cc6a84

Please sign in to comment.