Redeploy channels and procedures during recovery
Use the GlobalRecoveryManager extension to redeploy channels/procedures
Restart execution of channels during recovery
Clean up code
Add a recovery test

Change-Id: I6897ccf9cddb9ec8d10256e252ee893afe6db145
sjaco002 committed May 21, 2018
1 parent 846ac6c commit a9693631f7d47871c3b2e5f9c4e99e13c0e577c5
Showing 50 changed files with 1,006 additions and 148 deletions.
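
Note: this commit hooks the BAD extension into global recovery. After a cluster-controller restart, previously deployed channel and procedure job specifications no longer exist, so they are recompiled and deployed under fresh ids, and repetitive channel execution is restarted. A minimal sketch of the recovery loop, assuming hypothetical accessors getAllChannels/getAllProcedures and an available IRequestParameters (the actual GlobalRecoveryManager extension code may differ):

    // Hedged sketch: redeploy every channel and procedure during recovery.
    // getAllChannels and getAllProcedures are illustrative names, not the
    // commit's actual metadata API.
    for (Channel channel : getAllChannels(mdTxnCtx)) {
        MetadataProvider mp = new MetadataProvider(appCtx, activeDataverse);
        // useNewId = true: the old DeployedJobSpecId died with the previous
        // cluster controller, so deploy a fresh spec instead of redeploying.
        BADJobService.redeployJobSpec(channel.getChannelId(), channel.getChannelBody(),
                mp, badStatementExecutor, hcc, requestParameters, true);
        mp.getLocks().unlock();
    }
    for (Procedure procedure : getAllProcedures(mdTxnCtx)) {
        MetadataProvider mp = new MetadataProvider(appCtx, activeDataverse);
        BADJobService.redeployJobSpec(procedure.getEntityId(), procedure.getBody(),
                mp, badStatementExecutor, hcc, requestParameters, true);
        mp.getLocks().unlock();
    }

This mirrors the handleCreateIndexStatement loops further down, which pass useNewId = false because the existing deployed id is still valid there.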
@@ -269,6 +269,16 @@
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-data-std</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-client</artifactId>
<version>${hyracks.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-control-common</artifactId>
<version>${hyracks.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
<artifactId>algebricks-runtime</artifactId>
@@ -283,6 +293,11 @@
<artifactId>asterix-external-data</artifactId>
<version>${asterix.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.apache.asterix</groupId>
<artifactId>asterix-fuzzyjoin</artifactId>
@@ -47,7 +47,7 @@ public interface BADConstants {
String FIELD_NAME_ARITY = "Arity";
String FIELD_NAME_DEPENDENCIES = "Dependencies";
String FIELD_NAME_PARAMS = "Params";
String FIELD_NAME_RETURN_TYPE = "ReturnType";
String FIELD_NAME_TYPE = "Type";
String FIELD_NAME_DEFINITION = "Definition";
String FIELD_NAME_LANGUAGE = "Language";
String FIELD_NAME_BODY = "Body";
@@ -21,6 +21,7 @@
import java.io.StringReader;
import java.time.Instant;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
@@ -51,6 +52,7 @@
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
@@ -67,6 +69,20 @@ public class BADJobService {

private static final long millisecondTimeout = BADConstants.EXECUTOR_TIMEOUT * 1000;

public static void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec,
IHyracksClientConnection hcc, DeployedJobSpecEventListener listener, ITxnIdFactory txnIdFactory,
String duration) throws Exception {
if (channeljobSpec != null) {
channeljobSpec.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
DeployedJobSpecId deployedId = hcc.deployJobSpec(channeljobSpec);
ScheduledExecutorService ses = startRepetitiveDeployedJobSpec(deployedId, hcc, findPeriod(duration),
new HashMap<>(), entityId, txnIdFactory, listener);
listener.setDeployedJobSpecId(deployedId);
listener.setExecutorService(ses);
}

}

//Starts running a deployed job specification periodically with an interval of "period" seconds
public static ScheduledExecutorService startRepetitiveDeployedJobSpec(DeployedJobSpecId distributedId,
IHyracksClientConnection hcc, long period, Map<byte[], byte[]> jobParameters, EntityId entityId,
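
Note: setupExecutorJob moves from ChannelStatement (see the deleted private copy below) into BADJobService so the create-channel path and the recovery path can share it: deploy the channel job, schedule its repeated execution, and record both the deployed id and the executor on the listener. A hedged sketch of the scheduling pattern behind startRepetitiveDeployedJobSpec, assuming the period is in milliseconds (the source comment says seconds, but runDeployedJobSpecCheckPeriod compares the period against elapsed milliseconds):

    // Sketch only; requires java.util.concurrent.TimeUnit in addition to the
    // imports shown above, and the parameter order of
    // runDeployedJobSpecCheckPeriod is inferred from the truncated hunk.
    ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
    ses.scheduleAtFixedRate(() -> {
        try {
            // Run one execution of the deployed job and warn if it overran
            // its period (see runDeployedJobSpecCheckPeriod below).
            runDeployedJobSpecCheckPeriod(distributedId, hcc, jobParameters, period,
                    entityId, txnIdFactory, listener);
        } catch (Exception e) {
            LOGGER.log(Level.SEVERE, "Deployed job execution failed", e);
        }
    }, 0, period, TimeUnit.MILLISECONDS);
    return ses;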
@@ -93,7 +109,8 @@ public static boolean runDeployedJobSpecCheckPeriod(DeployedJobSpecId distribute
Map<byte[], byte[]> jobParameters, long period, EntityId entityId, ITxnIdFactory txnIdFactory,
DeployedJobSpecEventListener listener) throws Exception {
long executionMilliseconds =
runDeployedJobSpec(distributedId, hcc, jobParameters, entityId, txnIdFactory, null, listener, null);
runDeployedJobSpec(distributedId, hcc, null, jobParameters, entityId, txnIdFactory, null, listener,
null);
if (executionMilliseconds > period) {
LOGGER.log(Level.SEVERE,
"Periodic job for " + entityId.getExtensionName() + " " + entityId.getDataverse() + "."
@@ -106,7 +123,7 @@ public static boolean runDeployedJobSpecCheckPeriod(DeployedJobSpecId distribute
}

public static long runDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
Map<byte[], byte[]> jobParameters, EntityId entityId, ITxnIdFactory txnIdFactory,
IHyracksDataset hdc, Map<byte[], byte[]> jobParameters, EntityId entityId, ITxnIdFactory txnIdFactory,
ICcApplicationContext appCtx, DeployedJobSpecEventListener listener, QueryTranslator statementExecutor)
throws Exception {
listener.waitWhileAtState(ActivityState.SUSPENDED);
@@ -122,7 +139,7 @@ public static long runDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksC
long executionMilliseconds = Instant.now().toEpochMilli() - startTime;

if (listener.getType() == DeployedJobSpecEventListener.PrecompiledType.QUERY) {
ResultReader resultReader = new ResultReader(listener.getResultDataset(), jobId, listener.getResultId());
ResultReader resultReader = new ResultReader(hdc, jobId, new ResultSetId(0));

ResultUtil.printResults(appCtx, resultReader, statementExecutor.getSessionOutput(),
new IStatementExecutor.Stats(), null);
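
Note: runDeployedJobSpec previously pulled the result dataset and result-set id out of the listener; it now takes an IHyracksDataset from the caller and reads result set 0, so recovery-time callers that hold no listener-side result state can supply their own dataset handle. An illustrative call with the new signature (variable values are assumptions):

    // Illustrative call shape; all arguments besides the order are assumed.
    long elapsedMs = BADJobService.runDeployedJobSpec(deployedJobSpecId, hcc,
            hdc,             // result dataset, now supplied by the caller
            new HashMap<>(), // runtime job parameters
            entityId, txnIdFactory, appCtx, listener, statementExecutor);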
@@ -189,7 +206,7 @@ public static JobSpecification compilePushChannel(IStatementExecutor statementEx

public static void redeployJobSpec(EntityId entityId, String queryBodyString, MetadataProvider metadataProvider,
BADStatementExecutor badStatementExecutor, IHyracksClientConnection hcc,
IRequestParameters requestParameters) throws Exception {
IRequestParameters requestParameters, boolean useNewId) throws Exception {

ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
ActiveNotificationHandler activeEventHandler =
@@ -217,11 +234,10 @@ public static void redeployJobSpec(EntityId entityId, String queryBodyString, Me
}
} else {
//Procedures
metadataProvider.setResultSetId(listener.getResultId());
final IStatementExecutor.ResultDelivery resultDelivery =
requestParameters.getResultProperties().getDelivery();
final IHyracksDataset hdc = requestParameters.getHyracksDataset();
final IStatementExecutor.Stats stats = requestParameters.getStats();
metadataProvider.setResultSetId(new ResultSetId(0));
IStatementExecutor.ResultDelivery resultDelivery = requestParameters.getResultProperties().getDelivery();
IHyracksDataset hdc = requestParameters.getHyracksDataset();
IStatementExecutor.Stats stats = requestParameters.getStats();
boolean resultsAsync = resultDelivery == IStatementExecutor.ResultDelivery.ASYNC
|| resultDelivery == IStatementExecutor.ResultDelivery.DEFERRED;
metadataProvider.setResultAsyncMode(resultsAsync);
@@ -230,7 +246,12 @@ public static void redeployJobSpec(EntityId entityId, String queryBodyString, Me
jobSpec = compileProcedureJob(badStatementExecutor, metadataProvider, hcc, hdc, stats, fStatements.get(1));

}
hcc.redeployJobSpec(listener.getDeployedJobSpecId(), jobSpec);
if (useNewId) {
DeployedJobSpecId id = hcc.deployJobSpec(jobSpec);
listener.setDeployedJobSpecId(id);
} else {
hcc.redeployJobSpec(listener.getDeployedJobSpecId(), jobSpec);
}

listener.resume();
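
Note: the new useNewId flag separates the two redeploy scenarios. Index creation (below) redeploys over the existing DeployedJobSpecId, which the running cluster controller still holds; recovery passes true because deployed ids do not survive a restart, so a fresh spec must be deployed and its id stored on the listener:

    // From the index-creation path below: in-place redeploy, id unchanged.
    BADJobService.redeployJobSpec(channel.getChannelId(), channel.getChannelBody(),
            metadataProvider, this, hcc, requestParameters, false);
    // Recovery path (hedged): pass true so deployJobSpec issues a new id,
    // which redeployJobSpec records via listener.setDeployedJobSpecId.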

@@ -239,13 +260,11 @@ public static void redeployJobSpec(EntityId entityId, String queryBodyString, Me
public static JobSpecification compileQueryJob(IStatementExecutor statementExecutor,
MetadataProvider metadataProvider, IHyracksClientConnection hcc, Query q) throws Exception {
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
JobSpecification jobSpec = null;
JobSpecification jobSpec;
try {
jobSpec = statementExecutor.rewriteCompileQuery(hcc, metadataProvider, q, null);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
} catch (Exception e) {
((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
throw e;
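
Note: the cleanup here drops the write-only bActiveTxn flag; the transaction is committed on success and aborted in the catch block, so the flag carried no information. For reference, the metadata-transaction pattern this method follows (a sketch; the committed code declares jobSpec before the try and returns it afterward):

    MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
    metadataProvider.setMetadataTxnContext(mdTxnCtx);
    try {
        // Compile under the metadata transaction, then commit it.
        JobSpecification jobSpec = statementExecutor.rewriteCompileQuery(hcc, metadataProvider, q, null);
        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
        return jobSpec;
    } catch (Exception e) {
        // Roll the metadata transaction back before propagating the failure.
        ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
        throw e;
    }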
@@ -185,9 +185,6 @@ public void handleDatasetDropStatement(MetadataProvider metadataProvider, Statem
public void handleCreateIndexStatement(MetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {

//TODO: Check whether a delete or insert procedure using the index. If so, we will need to
// disallow the procedure until after the newly distributed version is ready

MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
//Allow channels to use the new index
@@ -243,13 +240,13 @@ public void handleCreateIndexStatement(MetadataProvider metadataProvider, Statem
for (Channel channel : usages.first) {
metadataProvider = new MetadataProvider(appCtx, activeDataverse);
BADJobService.redeployJobSpec(channel.getChannelId(), channel.getChannelBody(), metadataProvider, this, hcc,
requestParameters);
requestParameters, false);
metadataProvider.getLocks().unlock();
}
for (Procedure procedure : usages.second) {
metadataProvider = new MetadataProvider(appCtx, activeDataverse);
BADJobService.redeployJobSpec(procedure.getEntityId(), procedure.getBody(), metadataProvider, this, hcc,
requestParameters);
requestParameters, false);
metadataProvider.getLocks().unlock();
}

@@ -24,7 +24,6 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;

@@ -46,7 +45,6 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.expression.CallExpr;
@@ -75,7 +73,6 @@
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;

@@ -176,42 +173,44 @@ private void createDatasets(IStatementExecutor statementExecutor, MetadataProvid
new Identifier(BADConstants.BAD_DATAVERSE_NAME), subscriptionsTypeName, null, null, null,
new HashMap<String, String>(), DatasetType.INTERNAL, idd, null, true);

//Setup the results dataset
partitionFields = new ArrayList<>();
fieldNames = new ArrayList<>();
fieldNames.add(BADConstants.ResultId);
partitionFields.add(fieldNames);
idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null);
DatasetDecl createResultsDataset = new DatasetDecl(dataverseName, new Identifier(resultsTableName),
new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null,
new HashMap<String, String>(), DatasetType.INTERNAL, idd, null, true);

//Create an index on timestamp for results
CreateIndexStatement createTimeIndex = new CreateIndexStatement();
createTimeIndex.setDatasetName(new Identifier(resultsTableName));
createTimeIndex.setDataverseName(dataverseName);
createTimeIndex.setIndexName(new Identifier(resultsTableName + "TimeIndex"));
createTimeIndex.setIfNotExists(false);
createTimeIndex.setIndexType(IndexType.BTREE);
createTimeIndex.setEnforced(false);
createTimeIndex.setGramLength(0);
List<String> fNames = new ArrayList<>();
fNames.add(BADConstants.ChannelExecutionTime);
Pair<List<String>, IndexedTypeExpression> fields = new Pair<>(fNames, null);
createTimeIndex.addFieldExprPair(fields);
createTimeIndex.addFieldIndexIndicator(0);


//Run both statements to create datasets
((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
hcc, null);
metadataProvider.getLocks().reset();
((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createResultsDataset, hcc,
null);
metadataProvider.getLocks().reset();

//Create a time index for the results
((QueryTranslator) statementExecutor).handleCreateIndexStatement(metadataProvider, createTimeIndex, hcc, null);
if (!push) {
//Setup the results dataset
partitionFields = new ArrayList<>();
fieldNames = new ArrayList<>();
fieldNames.add(BADConstants.ResultId);
partitionFields.add(fieldNames);
idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null);
DatasetDecl createResultsDataset = new DatasetDecl(dataverseName, new Identifier(resultsTableName),
new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, new HashMap<>(),
DatasetType.INTERNAL, idd, null, true);

//Create an index on timestamp for results
CreateIndexStatement createTimeIndex = new CreateIndexStatement();
createTimeIndex.setDatasetName(new Identifier(resultsTableName));
createTimeIndex.setDataverseName(dataverseName);
createTimeIndex.setIndexName(new Identifier(resultsTableName + "TimeIndex"));
createTimeIndex.setIfNotExists(false);
createTimeIndex.setIndexType(IndexType.BTREE);
createTimeIndex.setEnforced(false);
createTimeIndex.setGramLength(0);
List<String> fNames = new ArrayList<>();
fNames.add(BADConstants.ChannelExecutionTime);
Pair<List<String>, IndexedTypeExpression> fields = new Pair<>(fNames, null);
createTimeIndex.addFieldExprPair(fields);
createTimeIndex.addFieldIndexIndicator(0);
metadataProvider.getLocks().reset();
((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createResultsDataset,
hcc, null);
metadataProvider.getLocks().reset();

//Create a time index for the results
((QueryTranslator) statementExecutor).handleCreateIndexStatement(metadataProvider, createTimeIndex, hcc,
null);

}

}
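
Note: a push channel streams results straight to its brokers instead of persisting them, so it has no results dataset to create or index. The hunk above wraps the results dataset and its ChannelExecutionTime BTREE index in an if (!push) guard; handle() below likewise leaves resultsTableName empty and skips the name-availability check for it. A condensed, hedged sketch of the resulting flow (createSubscriptionsDataset, createResultsDataset, and createTimeIndex name the statements built above):

    // Every channel gets a subscriptions dataset.
    ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider,
            createSubscriptionsDataset, hcc, null);
    metadataProvider.getLocks().reset();
    if (!push) {
        // Only non-push channels persist results and index them by time.
        ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider,
                createResultsDataset, hcc, null);
        metadataProvider.getLocks().reset();
        ((QueryTranslator) statementExecutor).handleCreateIndexStatement(metadataProvider,
                createTimeIndex, hcc, null);
    }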

@@ -257,18 +256,6 @@ private JobSpecification createChannelJob(IStatementExecutor statementExecutor,
hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null);
}

private void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec, IHyracksClientConnection hcc,
DeployedJobSpecEventListener listener, ITxnIdFactory txnIdFactory) throws Exception {
if (channeljobSpec != null) {
channeljobSpec.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
DeployedJobSpecId destributedId = hcc.deployJobSpec(channeljobSpec);
ScheduledExecutorService ses = BADJobService.startRepetitiveDeployedJobSpec(destributedId, hcc,
BADJobService.findPeriod(duration), new HashMap<>(), entityId, txnIdFactory, listener);
listener.storeDistributedInfo(destributedId, ses, null, null);
}

}

@Override
public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
IRequestParameters requestContext, MetadataProvider metadataProvider, int resultSetId)
@@ -283,15 +270,15 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe
dataverseName = new Identifier(((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName));
dataverse = dataverseName.getValue();
subscriptionsTableName = channelName + BADConstants.subscriptionEnding;
resultsTableName = channelName + BADConstants.resultsEnding;
resultsTableName = push ? "" : channelName + BADConstants.resultsEnding;

EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
boolean alreadyActive = false;
Channel channel = null;
Channel channel;

MetadataTransactionContext mdTxnCtx = null;
try {
@@ -313,7 +300,7 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe
if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, subscriptionsTableName) != null) {
throw new AsterixException("The channel name:" + channelName + " is not available.");
}
if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsTableName) != null) {
if (!push && MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsTableName) != null) {
throw new AsterixException("The channel name:" + channelName + " is not available.");
}
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
@@ -330,12 +317,12 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe
// Now we subscribe
if (listener == null) {
listener = new DeployedJobSpecEventListener(appCtx, entityId,
push ? PrecompiledType.PUSH_CHANNEL : PrecompiledType.CHANNEL, null,
"BadListener");
push ? PrecompiledType.PUSH_CHANNEL : PrecompiledType.CHANNEL);
activeEventHandler.registerListener(listener);
}

setupExecutorJob(entityId, channeljobSpec, hcc, listener, metadataProvider.getTxnIdFactory());
BADJobService.setupExecutorJob(entityId, channeljobSpec, hcc, listener, metadataProvider.getTxnIdFactory(),
duration);
channel = new Channel(dataverse, channelName.getValue(), subscriptionsTableName, resultsTableName, function,
duration, null, body);
