Skip to content
Permalink
Browse files
Enable dependencies in the metadata for BAD entities
Allow Channels and Procedures to store dependencies on
Datasets and Functions

Prevent dropping of these dependencies

Add Error tests

Change-Id: Ic6ac2daad03844a042aded8e17bb231a06f59cbe
  • Loading branch information
sjaco002 committed Jan 20, 2018
1 parent c6c98ce commit 5b870653b3da64733246b1396d206fe84b8f3890
Showing 35 changed files with 1,317 additions and 103 deletions.
@@ -45,6 +45,7 @@ public interface BADConstants {
String Duration = "Duration";
String Function = "Function";
String FIELD_NAME_ARITY = "Arity";
String FIELD_NAME_DEPENDENCIES = "Dependencies";
String FIELD_NAME_PARAMS = "Params";
String FIELD_NAME_RETURN_TYPE = "ReturnType";
String FIELD_NAME_DEFINITION = "Definition";
@@ -21,6 +21,8 @@
import java.util.List;

import org.apache.asterix.algebra.base.ILangExtension;
import org.apache.asterix.bad.metadata.AllChannelsSearchKey;
import org.apache.asterix.bad.metadata.AllProceduresSearchKey;
import org.apache.asterix.bad.metadata.Broker;
import org.apache.asterix.bad.metadata.BrokerSearchKey;
import org.apache.asterix.bad.metadata.Channel;
@@ -111,6 +113,11 @@ public static List<Broker> getBrokers(MetadataTransactionContext mdTxnCtx, Strin
return MetadataManager.INSTANCE.getEntities(mdTxnCtx, brokerSearchKey);
}

public static List<Channel> getAllChannels(MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
AllChannelsSearchKey channelSearchKey = new AllChannelsSearchKey();
return MetadataManager.INSTANCE.getEntities(mdTxnCtx, channelSearchKey);
}

public static List<Channel> getChannels(MetadataTransactionContext mdTxnCtx, String dataverseName)
throws AlgebricksException {
DataverseChannelsSearchKey channelSearchKey = new DataverseChannelsSearchKey(dataverseName);
@@ -123,4 +130,9 @@ public static List<Procedure> getProcedures(MetadataTransactionContext mdTxnCtx,
return MetadataManager.INSTANCE.getEntities(mdTxnCtx, proceduresSearchKey);
}

public static List<Procedure> getAllProcedures(MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
AllProceduresSearchKey proceduresSearchKey = new AllProceduresSearchKey();
return MetadataManager.INSTANCE.getEntities(mdTxnCtx, proceduresSearchKey);
}

}
@@ -21,25 +21,30 @@
import java.util.List;
import java.util.concurrent.ExecutorService;

import org.apache.asterix.app.translator.RequestParameters;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.app.translator.RequestParameters;
import org.apache.asterix.bad.lang.statement.BrokerDropStatement;
import org.apache.asterix.bad.lang.statement.ChannelDropStatement;
import org.apache.asterix.bad.lang.statement.ProcedureDropStatement;
import org.apache.asterix.bad.metadata.Broker;
import org.apache.asterix.bad.metadata.Channel;
import org.apache.asterix.bad.metadata.Procedure;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.statement.DataverseDropStatement;
import org.apache.asterix.lang.common.statement.DropDatasetStatement;
import org.apache.asterix.lang.common.statement.FunctionDropStatement;
import org.apache.asterix.lang.common.statement.IndexDropStatement;
import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.SessionOutput;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;

public class BADStatementExecutor extends QueryTranslator {
@@ -49,39 +54,170 @@ public BADStatementExecutor(ICcApplicationContext appCtx, List<Statement> statem
super(appCtx, statements, output, compliationProvider, executorService);
}

//TODO: Most of this file could go away if we had metadata dependencies

private void checkIfDatasetIsInUse(MetadataTransactionContext mdTxnCtx, String dataverse, String dataset)
throws CompilationException, AlgebricksException {
List<Channel> channels = BADLangExtension.getAllChannels(mdTxnCtx);
for (Channel channel : channels) {
List<List<List<String>>> dependencies = channel.getDependencies();
List<List<String>> datasetDependencies = dependencies.get(0);
for (List<String> dependency : datasetDependencies) {
if (dependency.get(0).equals(dataverse) && dependency.get(1).equals(dataset)) {
throw new CompilationException("Cannot alter dataset " + dataverse + "." + dataset + ". "
+ channel.getChannelId() + " depends on it!");
}
}

}
List<Procedure> procedures = BADLangExtension.getAllProcedures(mdTxnCtx);
for (Procedure procedure : procedures) {
List<List<List<String>>> dependencies = procedure.getDependencies();
List<List<String>> datasetDependencies = dependencies.get(0);
for (List<String> dependency : datasetDependencies) {
if (dependency.get(0).equals(dataverse) && dependency.get(1).equals(dataset)) {
throw new CompilationException("Cannot alter dataset " + dataverse + "." + dataset + ". "
+ procedure.getEntityId() + " depends on it!");
}
}

}
}

@Override
public void handleDatasetDropStatement(MetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
String dvId = getActiveDataverse(((DropDatasetStatement) stmt).getDataverseName());
Identifier dsId = ((DropDatasetStatement) stmt).getDatasetName();

checkIfDatasetIsInUse(mdTxnCtx, dvId, dsId.getValue());

MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
super.handleDatasetDropStatement(metadataProvider, stmt, hcc, requestParameters);
}

@Override
protected void handleIndexDropStatement(MetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
String dvId = getActiveDataverse(((IndexDropStatement) stmt).getDataverseName());
Identifier dsId = ((IndexDropStatement) stmt).getDatasetName();

checkIfDatasetIsInUse(mdTxnCtx, dvId, dsId.getValue());

MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
super.handleIndexDropStatement(metadataProvider, stmt, hcc, requestParameters);
}

@Override
protected void handleFunctionDropStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
FunctionSignature sig = ((FunctionDropStatement) stmt).getFunctionSignature();

String dvId = getActiveDataverseName(sig.getNamespace());
String function = sig.getName();
String arity = Integer.toString(sig.getArity());

List<Channel> channels = BADLangExtension.getAllChannels(mdTxnCtx);
for (Channel channel : channels) {
List<List<List<String>>> dependencies = channel.getDependencies();
List<List<String>> datasetDependencies = dependencies.get(1);
for (List<String> dependency : datasetDependencies) {
if (dependency.get(0).equals(dvId) && dependency.get(1).equals(function)
&& dependency.get(2).equals(arity)) {
throw new CompilationException(
"Cannot drop function " + sig + ". " + channel.getChannelId() + " depends on it!");
}
}

}
List<Procedure> procedures = BADLangExtension.getAllProcedures(mdTxnCtx);
for (Procedure procedure : procedures) {
List<List<List<String>>> dependencies = procedure.getDependencies();
List<List<String>> datasetDependencies = dependencies.get(1);
for (List<String> dependency : datasetDependencies) {
if (dependency.get(0).equals(dvId) && dependency.get(1).equals(function)
&& dependency.get(2).equals(arity)) {
throw new CompilationException(
"Cannot drop function " + sig + ". " + procedure.getEntityId() + " depends on it!");
}
}

}

MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
super.handleFunctionDropStatement(metadataProvider, stmt);
}

@Override
protected void handleDataverseDropStatement(MetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
//TODO: Remove this when metadata dependencies are in place
//TODO: Stop dataset drop when dataset used by channel
super.handleDataverseDropStatement(metadataProvider, stmt, hcc);
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
Identifier dvId = ((DataverseDropStatement) stmt).getDataverseName();
List<Broker> brokers = BADLangExtension.getBrokers(mdTxnCtx, dvId.getValue());
MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse());
tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
final IRequestParameters requestParameters = new RequestParameters(null, null, null, null, null, null);
for (Broker broker : brokers) {
tempMdProvider.getLocks().reset();
BrokerDropStatement drop = new BrokerDropStatement(dvId, new Identifier(broker.getBrokerName()), false);
drop.handle(hcc, this, requestParameters, tempMdProvider, 0);
List<Channel> channels = BADLangExtension.getAllChannels(mdTxnCtx);
for (Channel channel : channels) {
if (channel.getChannelId().getDataverse().equals(dvId.getValue())) {
continue;
}
List<List<List<String>>> dependencies = channel.getDependencies();
for (List<List<String>> dependencyList : dependencies) {
for (List<String> dependency : dependencyList) {
if (dependency.get(0).equals(dvId.getValue())) {
throw new CompilationException("Cannot drop dataverse " + dvId.getValue() + ". "
+ channel.getChannelId() + " depends on it!");
}
}
}
}
List<Procedure> procedures = BADLangExtension.getAllProcedures(mdTxnCtx);
for (Procedure procedure : procedures) {
if (procedure.getEntityId().getDataverse().equals(dvId.getValue())) {
continue;
}
List<List<List<String>>> dependencies = procedure.getDependencies();
for (List<List<String>> dependencyList : dependencies) {
for (List<String> dependency : dependencyList) {
if (dependency.get(0).equals(dvId.getValue())) {
throw new CompilationException("Cannot drop dataverse " + dvId.getValue() + ". "
+ procedure.getEntityId() + " depends on it!");
}
}
}
}
List<Channel> channels = BADLangExtension.getChannels(mdTxnCtx, dvId.getValue());
final IRequestParameters requestParameters = new RequestParameters(null, null, null, null, null, null);
for (Channel channel : channels) {
if (!channel.getChannelId().getDataverse().equals(dvId.getValue())) {
continue;
}
tempMdProvider.getLocks().reset();
ChannelDropStatement drop =
new ChannelDropStatement(dvId, new Identifier(channel.getChannelId().getEntityName()), false);
drop.handle(hcc, this, requestParameters, tempMdProvider, 0);
}
List<Procedure> procedures = BADLangExtension.getProcedures(mdTxnCtx, dvId.getValue());
for (Procedure procedure : procedures) {
if (!procedure.getEntityId().getDataverse().equals(dvId.getValue())) {
continue;
}
tempMdProvider.getLocks().reset();
ProcedureDropStatement drop = new ProcedureDropStatement(new FunctionSignature(dvId.getValue(),
procedure.getEntityId().getEntityName(), procedure.getArity()), false);
drop.handle(hcc, this, requestParameters, tempMdProvider, 0);
}
List<Broker> brokers = BADLangExtension.getBrokers(mdTxnCtx, dvId.getValue());
for (Broker broker : brokers) {
tempMdProvider.getLocks().reset();
BrokerDropStatement drop = new BrokerDropStatement(dvId, new Identifier(broker.getBrokerName()), false);
drop.handle(hcc, this, requestParameters, tempMdProvider, 0);
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
super.handleDataverseDropStatement(metadataProvider, stmt, hcc);
}

}
@@ -139,18 +139,16 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe
tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
//Drop the Channel Datasets
//TODO: Need to find some way to handle if this fails.
//TODO: Prevent datasets for Channels from being dropped elsewhere

//Remove the Channel Metadata
MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, channel);
DropDatasetStatement dropStmt = new DropDatasetStatement(new Identifier(dataverse),
new Identifier(channel.getResultsDatasetName()), true);
((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);
tempMdProvider.getLocks().reset();
dropStmt = new DropDatasetStatement(new Identifier(dataverse),
new Identifier(channel.getSubscriptionsDataset()), true);
((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);

//Remove the Channel Metadata
MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, channel);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
e.printStackTrace();

0 comments on commit 5b87065

Please sign in to comment.