Skip to content
Permalink
Browse files
Separate Predistributed Jobs from other Active Jobs
Change-Id: I18de53d55bcd2b423b9c8f4521fc8c3db1f432e1
  • Loading branch information
westmann committed Jul 26, 2017
1 parent 4345dff commit 83f6d53b025a81d8deba750f74b8ce9d03607961
Showing 14 changed files with 209 additions and 100 deletions.
@@ -33,7 +33,6 @@ public class BADQueryTranslatorFactory extends DefaultStatementExecutorFactory {
@Override
public QueryTranslator create(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output,
ILangCompilationProvider compilationProvider, IStorageComponentProvider storageComponentProvider) {
return new BADStatementExecutor(appCtx, statements, output, compilationProvider, storageComponentProvider,
executorService);
return new BADStatementExecutor(appCtx, statements, output, compilationProvider, executorService);
}
}
@@ -28,7 +28,6 @@
import org.apache.asterix.bad.metadata.Broker;
import org.apache.asterix.bad.metadata.Channel;
import org.apache.asterix.bad.metadata.Procedure;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
@@ -44,9 +43,8 @@
public class BADStatementExecutor extends QueryTranslator {

public BADStatementExecutor(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output,
ILangCompilationProvider compliationProvider, IStorageComponentProvider storageComponentProvider,
ExecutorService executorService) {
super(appCtx, statements, output, compliationProvider, storageComponentProvider, executorService);
ILangCompilationProvider compliationProvider, ExecutorService executorService) {
super(appCtx, statements, output, compliationProvider, executorService);
}

@Override
@@ -59,8 +57,7 @@ protected void handleDataverseDropStatement(MetadataProvider metadataProvider, S
metadataProvider.setMetadataTxnContext(mdTxnCtx);
Identifier dvId = ((DataverseDropStatement) stmt).getDataverseName();
List<Broker> brokers = BADLangExtension.getBrokers(mdTxnCtx, dvId.getValue());
MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse(),
metadataProvider.getStorageComponentProvider());
MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse());
tempMdProvider.setConfig(metadataProvider.getConfig());
for (Broker broker : brokers) {
tempMdProvider.getLocks().reset();
@@ -18,10 +18,9 @@
*/
package org.apache.asterix.bad.lang.statement;

import org.apache.asterix.active.ActiveJobNotificationHandler;
import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.BADConstants;
import org.apache.asterix.bad.lang.BADLangExtension;
@@ -91,10 +90,9 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada
boolean txnActive = false;
EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
PrecompiledJobEventListener listener =
(PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
Channel channel = null;

MetadataTransactionContext mdTxnCtx = null;
@@ -115,14 +113,13 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada
listener.getExecutorService().shutdownNow();
JobId hyracksJobId = listener.getJobId();
listener.deActivate();
activeEventHandler.removeListener(listener);
activeEventHandler.unregisterListener(listener);
if (hyracksJobId != null) {
hcc.destroyJob(hyracksJobId);
}

//Create a metadata provider to use in nested jobs.
MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse(),
metadataProvider.getStorageComponentProvider());
MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse());
tempMdProvider.setConfig(metadataProvider.getConfig());
//Drop the Channel Datasets
//TODO: Need to find some way to handle if this fails.
@@ -186,7 +186,7 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada
subscriptionTuple.setVarCounter(varCounter);

MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider());
metadataProvider.getDefaultDataverse());
tempMdProvider.setConfig(metadataProvider.getConfig());

if (subscriptionId == null) {
@@ -144,7 +144,7 @@ public void handle(IStatementExecutor statementExecutor, MetadataProvider metada
SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
delete.accept(visitor, null);
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider());
metadataProvider.getDefaultDataverse());
tempMdProvider.setConfig(metadataProvider.getConfig());
((QueryTranslator) statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -21,15 +21,17 @@
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.StringReader;
import java.util.*;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.asterix.active.ActiveJobNotificationHandler;
import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.BADConstants;
import org.apache.asterix.bad.ChannelJobService;
@@ -42,6 +44,7 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.metadata.IDataset;
import org.apache.asterix.lang.common.base.Expression;
@@ -56,7 +59,6 @@
import org.apache.asterix.lang.common.statement.SetStatement;
import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -67,7 +69,6 @@
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.constraints.Constraint;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobFlag;
@@ -131,11 +132,13 @@ public InsertStatement getChannelResultsInsertQuery() {
return channelResultsInsertQuery;
}

@Override public byte getCategory() {
@Override
public byte getCategory() {
return Category.DDL;
}

@Override public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
@Override
public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
return null;
}

@@ -160,7 +163,8 @@ public void initialize(MetadataTransactionContext mdTxnCtx, String subscriptions

}

@Override public byte getKind() {
@Override
public byte getKind() {
return Kind.EXTENSION;
}

@@ -188,14 +192,13 @@ private void createDatasets(IStatementExecutor statementExecutor, Identifier sub
fieldNames.add(BADConstants.ResultId);
partitionFields.add(fieldNames);
idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, false);
DatasetDecl createResultsDataset =
new DatasetDecl(new Identifier(dataverse), resultsName, new Identifier(BADConstants.BAD_DATAVERSE_NAME),
resultsTypeName, null, null, null, null, new HashMap<String, String>(),
new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);
DatasetDecl createResultsDataset = new DatasetDecl(new Identifier(dataverse), resultsName,
new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, null,
new HashMap<String, String>(), new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);

//Run both statements to create datasets
((QueryTranslator) statementExecutor)
.handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset, hcc);
((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
hcc);
metadataProvider.getLocks().reset();
((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createResultsDataset, hcc);

@@ -231,9 +234,8 @@ private JobSpecification createChannelJob(IStatementExecutor statementExecutor,
SetStatement ss = (SetStatement) fStatements.get(0);
metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue());

return ((QueryTranslator) statementExecutor)
.handleInsertUpsertStatement(metadataProvider, fStatements.get(1), hcc, hdc, ResultDelivery.ASYNC, null,
stats, true, null, null);
return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(1),
hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null, null);
}

private void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec, IHyracksClientConnection hcc,
@@ -244,15 +246,15 @@ private void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec
if (predistributed) {
jobId = hcc.distributeJob(channeljobSpec);
}
ScheduledExecutorService ses = ChannelJobService
.startJob(channeljobSpec, EnumSet.noneOf(JobFlag.class), jobId, hcc,
ChannelJobService.findPeriod(duration));
ScheduledExecutorService ses = ChannelJobService.startJob(channeljobSpec, EnumSet.noneOf(JobFlag.class),
jobId, hcc, ChannelJobService.findPeriod(duration));
listener.storeDistributedInfo(jobId, ses, null);
}

}

@Override public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
@Override
public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {

@@ -271,10 +273,9 @@ private void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec
Identifier resultsName = new Identifier(channelName + BADConstants.resultsEnding);
EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
PrecompiledJobEventListener listener =
(PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
boolean alreadyActive = false;
Channel channel = null;

@@ -287,7 +288,7 @@ private void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec
throw new AlgebricksException("A channel with this name " + channelName + " already exists.");
}
if (listener != null) {
alreadyActive = listener.isEntityActive();
alreadyActive = listener.isActive();
}
if (alreadyActive) {
throw new AsterixException("Channel " + channelName + " is already running");
@@ -304,15 +305,14 @@ private void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec
throw new AsterixException("The channel name:" + channelName + " is not available.");
}
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider());
metadataProvider.getDefaultDataverse());
tempMdProvider.setConfig(metadataProvider.getConfig());
//Create Channel Datasets
createDatasets(statementExecutor, subscriptionsName, resultsName, tempMdProvider, hcc, hdc, dataverse);
tempMdProvider.getLocks().reset();
//Create Channel Internal Job
JobSpecification channeljobSpec =
createChannelJob(statementExecutor, subscriptionsName, resultsName, tempMdProvider, hcc, hdc, stats,
dataverse);
JobSpecification channeljobSpec = createChannelJob(statementExecutor, subscriptionsName, resultsName,
tempMdProvider, hcc, hdc, stats, dataverse);

// Now we subscribe
if (listener == null) {

0 comments on commit 83f6d53

Please sign in to comment.