Skip to content
Permalink
Browse files
Added Procedures to BAD
Change-Id: I03550a74e2c90179e72345103b3d2c4f98148631
  • Loading branch information
sjaco002 committed Feb 24, 2017
1 parent 3dcf57c commit 79226b5769ee0960ba0fd207315e4a25a17c1812
Showing 50 changed files with 946 additions and 616 deletions.
@@ -9,3 +9,4 @@ git.properties
build
*.iml
.idea/*
ClusterControllerService
@@ -246,11 +246,6 @@
<artifactId>algebricks-core</artifactId>
<version>${hyracks.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-dataflow-std</artifactId>
<version>${hyracks.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

This file was deleted.

@@ -70,7 +70,7 @@ public static void executeJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlag
if (jobId == null) {
hcc.startJob(jobSpec, jobFlags);
} else {
hcc.startJob(jobSpec, jobFlags, jobId);
hcc.startJob(jobId);
}
}

@@ -27,6 +27,7 @@
import org.apache.asterix.bad.metadata.ChannelSearchKey;
import org.apache.asterix.bad.metadata.DataverseBrokersSearchKey;
import org.apache.asterix.bad.metadata.DataverseChannelsSearchKey;
import org.apache.asterix.bad.metadata.DataverseProceduresSearchKey;
import org.apache.asterix.bad.metadata.Procedure;
import org.apache.asterix.bad.metadata.ProcedureSearchKey;
import org.apache.asterix.common.api.ExtensionId;
@@ -119,4 +120,10 @@ public static List<Channel> getChannels(MetadataTransactionContext mdTxnCtx, Str
return MetadataManager.INSTANCE.getEntities(mdTxnCtx, channelSearchKey);
}

public static List<Procedure> getProcedures(MetadataTransactionContext mdTxnCtx, String dataverseName)
throws AlgebricksException {
DataverseProceduresSearchKey proceduresSearchKey = new DataverseProceduresSearchKey(dataverseName);
return MetadataManager.INSTANCE.getEntities(mdTxnCtx, proceduresSearchKey);
}

}
@@ -32,6 +32,7 @@ public class BADQueryTranslatorFactory extends DefaultStatementExecutorFactory {
@Override
public QueryTranslator create(List<Statement> statements, SessionConfig conf,
ILangCompilationProvider compilationProvider, IStorageComponentProvider storageComponentProvider) {
return new BADStatementExecutor(statements, conf, compilationProvider, storageComponentProvider);
return new BADStatementExecutor(statements, conf, compilationProvider, storageComponentProvider,
executorService);
}
}
@@ -19,13 +19,17 @@
package org.apache.asterix.bad.lang;

import java.util.List;
import java.util.concurrent.ExecutorService;

import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.lang.statement.BrokerDropStatement;
import org.apache.asterix.bad.lang.statement.ChannelDropStatement;
import org.apache.asterix.bad.lang.statement.ProcedureDropStatement;
import org.apache.asterix.bad.metadata.Broker;
import org.apache.asterix.bad.metadata.Channel;
import org.apache.asterix.bad.metadata.Procedure;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.statement.DataverseDropStatement;
@@ -39,8 +43,9 @@
public class BADStatementExecutor extends QueryTranslator {

public BADStatementExecutor(List<Statement> aqlStatements, SessionConfig conf,
ILangCompilationProvider compliationProvider, IStorageComponentProvider storageComponentProvider) {
super(aqlStatements, conf, compliationProvider, storageComponentProvider);
ILangCompilationProvider compliationProvider, IStorageComponentProvider storageComponentProvider,
ExecutorService executorService) {
super(aqlStatements, conf, compliationProvider, storageComponentProvider, executorService);
}


@@ -64,6 +69,12 @@ protected void handleDataverseDropStatement(MetadataProvider metadataProvider, S
new Identifier(channel.getChannelId().getEntityName()), false);
drop.handle(this, metadataProvider, hcc, null, null, null, 0);
}
List<Procedure> procedures = BADLangExtension.getProcedures(mdTxnCtx, dvId.getValue());
for (Procedure procedure : procedures) {
ProcedureDropStatement drop = new ProcedureDropStatement(new FunctionSignature(dvId.getValue(),
procedure.getEntityId().getEntityName(), procedure.getArity()), false);
drop.handle(this, metadataProvider, hcc, null, null, null, 0);
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
}

@@ -18,42 +18,29 @@
*/
package org.apache.asterix.bad.lang.statement;

import java.util.HashSet;
import java.util.Set;

import org.apache.asterix.active.ActiveJobNotificationHandler;
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.algebra.extension.IExtensionStatement;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.BADConstants;
import org.apache.asterix.bad.ChannelJobInfo;
import org.apache.asterix.bad.lang.BADLangExtension;
import org.apache.asterix.bad.metadata.Channel;
import org.apache.asterix.bad.metadata.ChannelEventsListener;
import org.apache.asterix.bad.runtime.RepetitiveChannelOperatorNodePushable;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
import org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber;
import org.apache.asterix.lang.common.statement.DropDatasetStatement;
import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.runtime.utils.AppContextInfo;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;

public class ChannelDropStatement implements IExtensionStatement {

@@ -102,10 +89,8 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada
String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
boolean txnActive = false;
EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
ChannelEventsListener listener = (ChannelEventsListener) ActiveJobNotificationHandler.INSTANCE
PrecompiledJobEventListener listener = (PrecompiledJobEventListener) ActiveJobNotificationHandler.INSTANCE
.getActiveEntityListener(entityId);
IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
boolean subscriberRegistered = false;
Channel channel = null;

MetadataTransactionContext mdTxnCtx = null;
@@ -122,29 +107,14 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada
throw new AlgebricksException("There is no channel with this name " + channelName + ".");
}
}
if (listener != null) {
subscriberRegistered = listener.isChannelActive(entityId, eventSubscriber);
}
if (!subscriberRegistered) {
throw new AsterixException("Channel " + channelName + " is not running");
}

ICCMessageBroker messageBroker =
(ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext().getMessageBroker();

ChannelJobInfo cInfo = listener.getJobInfo(channel.getChannelId());;
Set<String> ncs = new HashSet<>(cInfo.getLocations());
AlgebricksAbsolutePartitionConstraint locations = new AlgebricksAbsolutePartitionConstraint(
ncs.toArray(new String[ncs.size()]));
int partition = 0;
for (String location : locations.getLocations()) {
messageBroker.sendApplicationMessageToNC(
new ActiveManagerMessage(ActiveManagerMessage.STOP_ACTIVITY, "cc",
new ActiveRuntimeId(channel.getChannelId(),
RepetitiveChannelOperatorNodePushable.class.getSimpleName(), partition++)),
location);
listener.getExecutorService().shutdownNow();
JobId hyracksJobId = listener.getJobId();
listener.deActivate();
ActiveJobNotificationHandler.INSTANCE.removeListener(listener);
if (hyracksJobId != null) {
hcc.destroyJob(hyracksJobId);
}
eventSubscriber.assertEvent(ActiveLifecycleEvent.ACTIVE_JOB_ENDED);

//Drop the Channel Datasets
//TODO: Need to find some way to handle if this fails.
@@ -157,9 +127,6 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada
new Identifier(channel.getSubscriptionsDataset()), true);
((QueryTranslator) statementExecutor).handleDatasetDropStatement(metadataProvider, dropStmt, hcc);

if (subscriberRegistered) {
listener.deregisterEventSubscriber(eventSubscriber);
}

//Remove the Channel Metadata
MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, channel);
@@ -156,7 +156,7 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada
AqlDeleteRewriteVisitor visitor = new AqlDeleteRewriteVisitor();
delete.accept(visitor, null);

((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, delete, hcc);
((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, delete, hcc, false);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
QueryTranslator.abort(e, e, mdTxnCtx);

0 comments on commit 79226b5

Please sign in to comment.