Add parameterized procedures

Add tests, including concurrent and parameterized execution.
Delete and query procedures can both take parameters, which are
assigned at runtime via Asterix job parameters.
Add a timestamp index to channel results.
Clean up the result code for query procedures.
Prevent repetitive jobs from executing multiple iterations
concurrently.

Change-Id: I999879b1cae0de179a1d3c232fa940228979f4fe
sjaco002 committed Nov 14, 2017
1 parent 1f36ec7 commit 8b53ce556eaac45c6698e448687f9e77eecc85e8
Showing 46 changed files with 1,280 additions and 315 deletions.
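
At a high level, this change set swaps the old distribute-and-start job flow for Hyracks deployed job specifications: a channel or procedure job is compiled and deployed to the cluster once, and each iteration is then started by id, optionally with a map of runtime job parameters. A minimal sketch of that lifecycle, assuming the deployed-job client calls that appear in the hunks below (the wrapper class and method names are illustrative):

import java.util.HashMap;

import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.JobSpecification;

public class DeployedJobLifecycleSketch {
    // Deploy once, invoke by id, undeploy when the entity is dropped.
    public static void lifecycle(IHyracksClientConnection hcc, JobSpecification channelJobSpec) throws Exception {
        DeployedJobSpecId deployedId = hcc.deployJobSpec(channelJobSpec); // compile and distribute a single time
        hcc.startJob(deployedId, new HashMap<>()); // one invocation; the map carries runtime job parameters
        hcc.undeployJobSpec(deployedId); // release cluster-side state on drop
    }
}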
@@ -243,6 +243,11 @@
<artifactId>asterix-lang-sqlpp</artifactId>
<version>${asterix.version}</version>
</dependency>
<dependency>
<groupId>org.apache.asterix</groupId>
<artifactId>asterix-lang-common</artifactId>
<version>${asterix.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
@@ -288,6 +293,11 @@
<artifactId>asterix-external-data</artifactId>
<version>${asterix.version}</version>
</dependency>
<dependency>
<groupId>org.apache.asterix</groupId>
<artifactId>asterix-transactions</artifactId>
<version>${asterix.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
@@ -49,6 +49,8 @@ public interface BADConstants {
public static final String FIELD_NAME_RETURN_TYPE = "ReturnType";
public static final String FIELD_NAME_DEFINITION = "Definition";
public static final String FIELD_NAME_LANGUAGE = "Language";
//To enable new Asterix TxnId for separate deployed job spec invocations
public static final byte[] TRANSACTION_ID_PARAMETER_NAME = "TxnIdParameter".getBytes();

public enum ChannelJobType {
REPETITIVE
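
The TRANSACTION_ID_PARAMETER_NAME constant above is the key under which, per its comment, a fresh Asterix TxnId is handed to each deployed job spec invocation. A hedged sketch of populating such a parameter map (the byte[] serialization and the surrounding class are assumptions, not code from this file):

import java.util.HashMap;
import java.util.Map;

public class TxnIdParameterSketch {
    // Build the job-parameter map for one invocation; newTxnId would come from
    // Asterix's transaction manager (hypothetical here).
    public static Map<byte[], byte[]> txnParams(long newTxnId) {
        Map<byte[], byte[]> jobParameters = new HashMap<>();
        // Key matches the constant above; the value is the fresh TxnId as text.
        jobParameters.put(BADConstants.TRANSACTION_ID_PARAMETER_NAME, String.valueOf(newTxnId).getBytes());
        // The map is then passed along when starting the deployed job spec.
        return jobParameters;
    }
}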
@@ -23,66 +23,25 @@
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.EnumSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.asterix.active.EntityId;
import org.apache.asterix.om.base.AOrderedList;
import org.apache.asterix.om.base.AUUID;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;

/**
* Provides functionality for running channel jobs and communicating with Brokers
* Provides functionality for channel jobs and communicating with Brokers
*/
public class ChannelJobService {

private static final Logger LOGGER = Logger.getLogger(ChannelJobService.class.getName());

public static ScheduledExecutorService startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, JobId jobId,
IHyracksClientConnection hcc, long duration)
throws Exception {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
executeJob(jobSpec, jobFlags, jobId, hcc);
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Channel Job Failed to run.", e);
}
}
}, duration, duration, TimeUnit.MILLISECONDS);
return scheduledExecutorService;
}

public static void executeJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, JobId jobId,
IHyracksClientConnection hcc)
throws Exception {
LOGGER.info("Executing Channel Job");
if (jobId == null) {
hcc.startJob(jobSpec, jobFlags);
} else {
hcc.startJob(jobId);
}
}

public static void runChannelJob(JobSpecification channeljobSpec, IHyracksClientConnection hcc) throws Exception {
JobId jobId = hcc.startJob(channeljobSpec);
hcc.waitForCompletion(jobId);
}

public static void sendBrokerNotificationsForChannel(EntityId activeJobId, String brokerEndpoint,
AOrderedList subscriptionIds, String channelExecutionTime) throws HyracksDataException {
String formattedString;
formattedString = formatJSON(activeJobId, subscriptionIds, channelExecutionTime);
sendMessage(brokerEndpoint, formattedString);
}

@@ -18,14 +18,18 @@
*/
package org.apache.asterix.bad.lang.statement;

import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.BADConstants;
import org.apache.asterix.bad.lang.BADLangExtension;
import org.apache.asterix.bad.metadata.Channel;
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.lang.common.statement.DropDatasetStatement;
@@ -38,11 +42,11 @@
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.DeployedJobSpecId;

public class ChannelDropStatement implements IExtensionStatement {
private static final Logger LOGGER = Logger.getLogger(ChannelDropStatement.class.getName());

private final Identifier dataverseName;
private final Identifier channelName;
@@ -91,7 +95,7 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
Channel channel = null;

MetadataTransactionContext mdTxnCtx = null;
@@ -109,22 +113,30 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe
}
}

listener.getExecutorService().shutdownNow();
JobId hyracksJobId = listener.getJobId();
listener.deActivate();
activeEventHandler.unregisterListener(listener);
if (hyracksJobId != null) {
hcc.destroyJob(hyracksJobId);
// wait for job completion to release any resources to be dropped
ensureJobDestroyed(hcc, hyracksJobId);
if (listener == null) {
//TODO: Channels need to better handle cluster failures
LOGGER.log(Level.SEVERE,
"Tried to drop a Deployed Job whose listener no longer exists: "
+ entityId.getExtensionName() + " " + entityId.getDataverse() + "."
+ entityId.getEntityName() + ".");

} else {
listener.getExecutorService().shutdown();
listener.getExecutorService().awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId();
listener.deActivate();
activeEventHandler.unregisterListener(listener);
if (deployedJobSpecId != null) {
hcc.undeployJobSpec(deployedJobSpecId);
}
}

//Create a metadata provider to use in nested jobs.
MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse());
tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
//Drop the Channel Datasets
//TODO: Need to find some way to handle if this fails.
//TODO: Prevent datasets for Channels from being dropped elsewhere

DropDatasetStatement dropStmt = new DropDatasetStatement(new Identifier(dataverse),
new Identifier(channel.getResultsDatasetName()), true);
((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);
@@ -147,19 +159,4 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe
}
}

private void ensureJobDestroyed(IHyracksClientConnection hcc, JobId hyracksJobId) throws Exception {
try {
hcc.waitForCompletion(hyracksJobId);
} catch (Exception e) {
// if the job has already been destroyed, it is safe to complete
if (e instanceof HyracksDataException) {
HyracksDataException hde = (HyracksDataException) e;
if (hde.getComponent().equals(ErrorCode.HYRACKS)
&& hde.getErrorCode() == ErrorCode.JOB_HAS_BEEN_CLEARED_FROM_HISTORY) {
return;
}
}
throw e;
}
}
}
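
One detail worth noting in the rewritten drop path above: the scheduler is shut down and drained before the listener is unregistered and the spec undeployed, so no pending iteration can fire against a spec that no longer exists. Condensed, with explanatory comments added (error handling omitted):

// Teardown order from the handler above.
listener.getExecutorService().shutdown();                // stop scheduling further iterations
listener.getExecutorService().awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); // let any in-flight run drain
DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId();
listener.deActivate();
activeEventHandler.unregisterListener(listener);
if (deployedJobSpecId != null) {
    hcc.undeployJobSpec(deployedJobSpecId);              // safe now: nothing can invoke it
}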
@@ -22,13 +22,13 @@
import java.io.DataOutputStream;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.asterix.active.DeployedJobService;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
import org.apache.asterix.app.active.ActiveNotificationHandler;
@@ -38,9 +38,10 @@
import org.apache.asterix.bad.lang.BADLangExtension;
import org.apache.asterix.bad.lang.BADParserFactory;
import org.apache.asterix.bad.metadata.Channel;
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener.PrecompiledType;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
@@ -50,8 +51,10 @@
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.expression.CallExpr;
import org.apache.asterix.lang.common.expression.IndexedTypeExpression;
import org.apache.asterix.lang.common.expression.LiteralExpr;
import org.apache.asterix.lang.common.literal.StringLiteral;
import org.apache.asterix.lang.common.statement.CreateIndexStatement;
import org.apache.asterix.lang.common.statement.DatasetDecl;
import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl;
import org.apache.asterix.lang.common.statement.InsertStatement;
@@ -69,11 +72,11 @@
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;

@@ -89,16 +92,14 @@ public class CreateChannelStatement implements IExtensionStatement {
private InsertStatement channelResultsInsertQuery;
private String subscriptionsTableName;
private String resultsTableName;
private boolean distributed;

public CreateChannelStatement(Identifier dataverseName, Identifier channelName, FunctionSignature function,
Expression period, boolean distributed) {
Expression period) {
this.channelName = channelName;
this.dataverseName = dataverseName;
this.function = function;
this.period = (CallExpr) period;
this.duration = "";
this.distributed = distributed;
}

public Identifier getDataverseName() {
@@ -196,12 +197,32 @@ private void createDatasets(IStatementExecutor statementExecutor, Identifier sub
new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, null,
new HashMap<String, String>(), new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);

//Create an index on timestamp for results
CreateIndexStatement createTimeIndex = new CreateIndexStatement();
createTimeIndex.setDatasetName(resultsName);
createTimeIndex.setDataverseName(new Identifier(dataverse));
createTimeIndex.setIndexName(new Identifier(resultsName + "TimeIndex"));
createTimeIndex.setIfNotExists(false);
createTimeIndex.setIndexType(IndexType.BTREE);
createTimeIndex.setEnforced(false);
createTimeIndex.setGramLength(0);
List<String> fNames = new ArrayList<>();
fNames.add(BADConstants.ChannelExecutionTime);
Pair<List<String>, IndexedTypeExpression> fields = new Pair<>(fNames, null);
createTimeIndex.addFieldExprPair(fields);
createTimeIndex.addFieldIndexIndicator(0);


//Run both statements to create datasets
((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
hcc, null);
metadataProvider.getLocks().reset();
((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createResultsDataset, hcc,
null);
metadataProvider.getLocks().reset();

//Create a time index for the results
((QueryTranslator) statementExecutor).handleCreateIndexStatement(metadataProvider, createTimeIndex, hcc, null);

}
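
For orientation, the CreateIndexStatement assembled above corresponds roughly to the following DDL, expressed here as a Java string so the example stays in the codebase's language (a hedged reconstruction; the exact SQL++ surface syntax may differ):

// Rough DDL equivalent of the programmatic index creation above (names are illustrative).
String timeIndexDdl = "create index " + resultsName + "TimeIndex on " + dataverse + "." + resultsName
        + " (" + BADConstants.ChannelExecutionTime + ") type btree;";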

@@ -240,16 +261,12 @@ private JobSpecification createChannelJob(IStatementExecutor statementExecutor,
}

private void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec, IHyracksClientConnection hcc,
PrecompiledJobEventListener listener, boolean predistributed) throws Exception {
DeployedJobSpecEventListener listener) throws Exception {
if (channeljobSpec != null) {
//TODO: Find a way to fix optimizer tests so we don't need this check
JobId jobId = null;
if (predistributed) {
jobId = hcc.distributeJob(channeljobSpec);
}
ScheduledExecutorService ses = ChannelJobService.startJob(channeljobSpec, EnumSet.noneOf(JobFlag.class),
jobId, hcc, ChannelJobService.findPeriod(duration));
listener.storeDistributedInfo(jobId, ses, null);
DeployedJobSpecId deployedJobSpecId = hcc.deployJobSpec(channeljobSpec);
ScheduledExecutorService ses = DeployedJobService.startRepetitiveDeployedJobSpec(deployedJobSpecId, hcc,
ChannelJobService.findPeriod(duration), new HashMap<>(), entityId);
listener.storeDistributedInfo(deployedJobSpecId, ses, null, null);
}

}
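
DeployedJobService.startRepetitiveDeployedJobSpec is new in this change set and its body is not shown in this diff. Based on the commit message, a plausible shape is a single-threaded fixed-rate schedule that waits for each invocation to complete; since scheduleAtFixedRate never overlaps executions of the same task, blocking inside run() is what prevents a repetitive job from executing multiple iterations concurrently. A rough sketch under those assumptions (not the actual implementation):

import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.JobId;

public class RepetitiveDeployedJobSketch {
    public static ScheduledExecutorService start(DeployedJobSpecId specId, IHyracksClientConnection hcc,
            long periodMillis, Map<byte[], byte[]> jobParameters) {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(1); // one thread per entity
        ses.scheduleAtFixedRate(() -> {
            try {
                JobId jobId = hcc.startJob(specId, jobParameters); // fresh invocation of the deployed spec
                hcc.waitForCompletion(jobId);                      // hold this slot until the job finishes
            } catch (Exception e) {
                // log and keep the schedule alive (an assumption about the real error policy)
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return ses;
    }
}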
@@ -275,7 +292,7 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
boolean alreadyActive = false;
Channel channel = null;

@@ -322,16 +339,12 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe
datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, subscriptionsName.getValue()));
datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsName.getValue()));
//TODO: Add datasets used by channel function
listener = new PrecompiledJobEventListener(appCtx, entityId, PrecompiledType.CHANNEL, datasets, null,
listener = new DeployedJobSpecEventListener(appCtx, entityId, PrecompiledType.CHANNEL, datasets, null,
"BadListener");
activeEventHandler.registerListener(listener);
}

if (distributed) {
setupExecutorJob(entityId, channeljobSpec, hcc, listener, true);
} else {
setupExecutorJob(entityId, channeljobSpec, hcc, listener, false);
}
setupExecutorJob(entityId, channeljobSpec, hcc, listener);

MetadataManager.INSTANCE.addEntity(mdTxnCtx, channel);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
