Skip to content

Commit

Permalink
Pass executor context and client id to extention statements
Browse files Browse the repository at this point in the history
Change-Id: I1395817df00917bd9b4d5676d4f2cea3888b7221
  • Loading branch information
amoudi87 committed Jul 18, 2018
1 parent 8c940db commit 40b70a1
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 43 deletions.
Expand Up @@ -250,7 +250,6 @@ public void handleCreateIndexStatement(MetadataProvider metadataProvider, Statem
metadataProvider.getLocks().unlock();
}


}

@Override
Expand Down Expand Up @@ -343,7 +342,7 @@ protected void handleDataverseDropStatement(MetadataProvider metadataProvider, S
tempMdProvider.getLocks().reset();
ChannelDropStatement drop =
new ChannelDropStatement(dvId, new Identifier(channel.getChannelId().getEntityName()), false);
drop.handle(hcc, this, requestParameters, tempMdProvider, 0);
drop.handle(hcc, this, requestParameters, tempMdProvider, 0, null);
}
for (Procedure procedure : procedures) {
if (!procedure.getEntityId().getDataverse().equals(dvId.getValue())) {
Expand All @@ -352,13 +351,13 @@ protected void handleDataverseDropStatement(MetadataProvider metadataProvider, S
tempMdProvider.getLocks().reset();
ProcedureDropStatement drop = new ProcedureDropStatement(new FunctionSignature(dvId.getValue(),
procedure.getEntityId().getEntityName(), procedure.getArity()), false);
drop.handle(hcc, this, requestParameters, tempMdProvider, 0);
drop.handle(hcc, this, requestParameters, tempMdProvider, 0, null);
}
List<Broker> brokers = BADLangExtension.getBrokers(mdTxnCtx, dvId.getValue());
for (Broker broker : brokers) {
tempMdProvider.getLocks().reset();
BrokerDropStatement drop = new BrokerDropStatement(dvId, new Identifier(broker.getBrokerName()), false);
drop.handle(hcc, this, requestParameters, tempMdProvider, 0);
drop.handle(hcc, this, requestParameters, tempMdProvider, 0, null);
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
super.handleDataverseDropStatement(metadataProvider, stmt, hcc);
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
Expand Down Expand Up @@ -70,8 +71,8 @@ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationExce

@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
throws HyracksDataException, AlgebricksException {
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId,
IStatementExecutorContext executorCtx) throws HyracksDataException, AlgebricksException {
//TODO: dont drop a broker that's being used
String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
MetadataTransactionContext mdTxnCtx = null;
Expand Down
Expand Up @@ -40,6 +40,7 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
Expand Down Expand Up @@ -82,8 +83,8 @@ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationExce

@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
throws HyracksDataException, AlgebricksException {
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId,
IStatementExecutorContext executorCtx) throws HyracksDataException, AlgebricksException {
String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
boolean txnActive = false;
EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
Expand Down Expand Up @@ -111,9 +112,8 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe
if (listener == null) {
//TODO: Channels need to better handle cluster failures
LOGGER.log(Level.SEVERE,
"Tried to drop a Deployed Job whose listener no longer exists: "
+ entityId.getExtensionName() + " " + entityId.getDataverse() + "."
+ entityId.getEntityName() + ".");
"Tried to drop a Deployed Job whose listener no longer exists: " + entityId.getExtensionName()
+ " " + entityId.getDataverse() + "." + entityId.getEntityName() + ".");

} else {
listener.getExecutorService().shutdown();
Expand Down
Expand Up @@ -52,6 +52,7 @@
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.api.client.IHyracksClientConnection;
Expand Down Expand Up @@ -120,8 +121,8 @@ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationExce

@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
throws HyracksDataException, AlgebricksException {
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId,
IStatementExecutorContext executorCtx) throws HyracksDataException, AlgebricksException {
String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
String brokerDataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(brokerDataverseName);

Expand Down
Expand Up @@ -46,6 +46,7 @@
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.api.client.IHyracksClientConnection;
Expand Down Expand Up @@ -100,8 +101,8 @@ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationExce

@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
throws HyracksDataException, AlgebricksException {
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId,
IStatementExecutorContext executorCtx) throws HyracksDataException, AlgebricksException {
String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);

MetadataTransactionContext mdTxnCtx = null;
Expand Down Expand Up @@ -139,8 +140,7 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
metadataProvider.getDefaultDataverse());
tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
((QueryTranslator) statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false, null,
null);
((QueryTranslator) statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false, null, null);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
QueryTranslator.abort(e, e, mdTxnCtx);
Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
Expand Down Expand Up @@ -74,8 +75,8 @@ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationExce

@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
throws HyracksDataException, AlgebricksException {
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId,
IStatementExecutorContext executorCtx) throws HyracksDataException, AlgebricksException {
String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
MetadataTransactionContext mdTxnCtx = null;
try {
Expand Down
Expand Up @@ -68,6 +68,7 @@
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.client.IHyracksClientConnection;
Expand Down Expand Up @@ -138,8 +139,7 @@ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationExce
return null;
}

public void initialize(MetadataTransactionContext mdTxnCtx)
throws AlgebricksException, HyracksDataException {
public void initialize(MetadataTransactionContext mdTxnCtx) throws AlgebricksException, HyracksDataException {
Function lookup = MetadataManager.INSTANCE.getFunction(mdTxnCtx, function);
if (lookup == null) {
throw new MetadataException(" Unknown function " + function.getName());
Expand Down Expand Up @@ -258,8 +258,8 @@ private JobSpecification createChannelJob(IStatementExecutor statementExecutor,

@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestContext, MetadataProvider metadataProvider, int resultSetId)
throws HyracksDataException, AlgebricksException {
IRequestParameters requestContext, MetadataProvider metadataProvider, int resultSetId,
IStatementExecutorContext executorCtx) throws HyracksDataException, AlgebricksException {
//This function performs three tasks:
//1. Create datasets for the Channel
//2. Create and run the Channel Job
Expand Down
Expand Up @@ -69,6 +69,7 @@
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.client.IHyracksClientConnection;
Expand Down Expand Up @@ -175,8 +176,7 @@ private void initialize() throws CompilationException, HyracksDataException {
}

private Pair<JobSpecification, PrecompiledType> createProcedureJob(IStatementExecutor statementExecutor,
MetadataProvider metadataProvider, IHyracksClientConnection hcc, Stats stats)
throws Exception {
MetadataProvider metadataProvider, IHyracksClientConnection hcc, Stats stats) throws Exception {
if (getProcedureBodyStatement().getKind() == Statement.Kind.INSERT) {
if (!varList.isEmpty()) {
throw new CompilationException("Insert procedures cannot have parameters");
Expand All @@ -193,8 +193,7 @@ private Pair<JobSpecification, PrecompiledType> createProcedureJob(IStatementExe
dependencies.get(1).addAll(FunctionUtil.getFunctionDependencies(fact.createQueryRewriter(),
((Query) getProcedureBodyStatement()).getBody(), metadataProvider).get(1));
Pair<JobSpecification, PrecompiledType> pair = new Pair<>(BADJobService.compileQueryJob(statementExecutor,
metadataProvider, hcc, (Query) getProcedureBodyStatement()),
PrecompiledType.QUERY);
metadataProvider, hcc, (Query) getProcedureBodyStatement()), PrecompiledType.QUERY);
dependencies.get(0).addAll(FunctionUtil.getFunctionDependencies(fact.createQueryRewriter(),
((Query) getProcedureBodyStatement()).getBody(), metadataProvider).get(0));
return pair;
Expand All @@ -216,17 +215,16 @@ private Pair<JobSpecification, PrecompiledType> createProcedureJob(IStatementExe
}

private void setupDeployedJobSpec(EntityId entityId, JobSpecification jobSpec, IHyracksClientConnection hcc,
DeployedJobSpecEventListener listener, ResultSetId resultSetId, Stats stats)
throws Exception {
DeployedJobSpecEventListener listener, ResultSetId resultSetId, Stats stats) throws Exception {
jobSpec.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
DeployedJobSpecId deployedJobSpecId = hcc.deployJobSpec(jobSpec);
listener.setDeployedJobSpecId(deployedJobSpecId);
}

@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
throws HyracksDataException, AlgebricksException {
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId,
IStatementExecutorContext executorCtx) throws HyracksDataException, AlgebricksException {
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
Expand Down
Expand Up @@ -50,6 +50,7 @@
import org.apache.asterix.translator.ConstantHelper;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
Expand Down Expand Up @@ -96,8 +97,8 @@ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationExce

@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
throws HyracksDataException, AlgebricksException {
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId,
IStatementExecutorContext executorCtx) throws HyracksDataException, AlgebricksException {
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
Expand All @@ -119,9 +120,8 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe
DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId();
if (procedure.getDuration().equals("")) {
BADJobService.runDeployedJobSpec(deployedJobSpecId, hcc, requestParameters.getHyracksDataset(),
contextRuntimeVarMap, entityId,
metadataProvider.getTxnIdFactory(), appCtx, listener, (QueryTranslator) statementExecutor);

contextRuntimeVarMap, entityId, metadataProvider.getTxnIdFactory(), appCtx, listener,
(QueryTranslator) statementExecutor);

} else {
ScheduledExecutorService ses = BADJobService.startRepetitiveDeployedJobSpec(deployedJobSpecId, hcc,
Expand All @@ -142,8 +142,7 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe
}
}

private Map<byte[], byte[]> createParameterMap(Procedure procedure)
throws AsterixException, HyracksDataException {
private Map<byte[], byte[]> createParameterMap(Procedure procedure) throws AsterixException, HyracksDataException {
Map<byte[], byte[]> map = new HashMap<>();
if (procedure.getParams().size() != argList.size()) {
throw AsterixException.create(ErrorCode.COMPILATION_INVALID_PARAMETER_NUMBER,
Expand Down
Expand Up @@ -40,6 +40,7 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
Expand Down Expand Up @@ -76,8 +77,8 @@ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationExce

@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
throws HyracksDataException, AlgebricksException {
IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId,
IStatementExecutorContext executorCtx) throws HyracksDataException, AlgebricksException {
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
Expand Down Expand Up @@ -115,9 +116,8 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe
if (listener == null) {
//TODO: Channels need to better handle cluster failures
LOGGER.log(Level.SEVERE,
"Tried to drop a Deployed Job whose listener no longer exists: "
+ entityId.getExtensionName() + " " + entityId.getDataverse() + "."
+ entityId.getEntityName() + ".");
"Tried to drop a Deployed Job whose listener no longer exists: " + entityId.getExtensionName()
+ " " + entityId.getDataverse() + "." + entityId.getEntityName() + ".");
} else {
if (listener.getExecutorService() != null) {
listener.getExecutorService().shutdown();
Expand Down

0 comments on commit 40b70a1

Please sign in to comment.