Skip to content
Permalink
Browse files
Prevent case where drop channel hangs indefinitely
Change-Id: I7ed8efea454c19b2d0b86f01b196bb361d35450f
  • Loading branch information
sjaco002 committed Dec 11, 2017
1 parent 6bac498 commit 92b9f760cc27ee91756a90ab47b576b99de990a6
Showing 3 changed files with 43 additions and 33 deletions.
@@ -19,38 +19,39 @@
package org.apache.asterix.bad;

public interface BADConstants {
public static final String SubscriptionId = "subscriptionId";
public static final String BrokerName = "BrokerName";
public static final String ChannelName = "ChannelName";
public static final String ProcedureName = "ProcedureName";
public static final String DataverseName = "DataverseName";
public static final String BrokerEndPoint = "BrokerEndPoint";
public static final String DeliveryTime = "deliveryTime";
public static final String ResultId = "resultId";
public static final String ChannelExecutionTime = "channelExecutionTime";
public static final String ChannelSubscriptionsType = "ChannelSubscriptionsType";
public static final String ChannelResultsType = "ChannelResultsType";
public static final String ResultsDatasetName = "ResultsDatasetName";
public static final String SubscriptionsDatasetName = "SubscriptionsDatasetName";
public static final String CHANNEL_EXTENSION_NAME = "Channel";
public static final String PROCEDURE_KEYWORD = "Procedure";
public static final String BROKER_KEYWORD = "Broker";
public static final String RECORD_TYPENAME_BROKER = "BrokerRecordType";
public static final String RECORD_TYPENAME_CHANNEL = "ChannelRecordType";
public static final String RECORD_TYPENAME_PROCEDURE = "ProcedureRecordType";
public static final String subscriptionEnding = "Subscriptions";
public static final String resultsEnding = "Results";
public static final String BAD_METADATA_EXTENSION_NAME = "BADMetadataExtension";
public static final String BAD_DATAVERSE_NAME = "Metadata";
public static final String Duration = "Duration";
public static final String Function = "Function";
public static final String FIELD_NAME_ARITY = "Arity";
public static final String FIELD_NAME_PARAMS = "Params";
public static final String FIELD_NAME_RETURN_TYPE = "ReturnType";
public static final String FIELD_NAME_DEFINITION = "Definition";
public static final String FIELD_NAME_LANGUAGE = "Language";
String SubscriptionId = "subscriptionId";
String BrokerName = "BrokerName";
String ChannelName = "ChannelName";
String ProcedureName = "ProcedureName";
String DataverseName = "DataverseName";
String BrokerEndPoint = "BrokerEndPoint";
String DeliveryTime = "deliveryTime";
String ResultId = "resultId";
String ChannelExecutionTime = "channelExecutionTime";
String ChannelSubscriptionsType = "ChannelSubscriptionsType";
String ChannelResultsType = "ChannelResultsType";
String ResultsDatasetName = "ResultsDatasetName";
String SubscriptionsDatasetName = "SubscriptionsDatasetName";
String CHANNEL_EXTENSION_NAME = "Channel";
String PROCEDURE_KEYWORD = "Procedure";
String BROKER_KEYWORD = "Broker";
String RECORD_TYPENAME_BROKER = "BrokerRecordType";
String RECORD_TYPENAME_CHANNEL = "ChannelRecordType";
String RECORD_TYPENAME_PROCEDURE = "ProcedureRecordType";
String subscriptionEnding = "Subscriptions";
String resultsEnding = "Results";
String BAD_METADATA_EXTENSION_NAME = "BADMetadataExtension";
String BAD_DATAVERSE_NAME = "Metadata";
String Duration = "Duration";
String Function = "Function";
String FIELD_NAME_ARITY = "Arity";
String FIELD_NAME_PARAMS = "Params";
String FIELD_NAME_RETURN_TYPE = "ReturnType";
String FIELD_NAME_DEFINITION = "Definition";
String FIELD_NAME_LANGUAGE = "Language";
//To enable new Asterix TxnId for separate deployed job spec invocations
public static final byte[] TRANSACTION_ID_PARAMETER_NAME = "TxnIdParameter".getBytes();
byte[] TRANSACTION_ID_PARAMETER_NAME = "TxnIdParameter".getBytes();
int EXECUTOR_TIMEOUT = 20;

public enum ChannelJobType {
REPETITIVE
@@ -122,7 +122,11 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe

} else {
listener.getExecutorService().shutdown();
listener.getExecutorService().awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
if (!listener.getExecutorService().awaitTermination(BADConstants.EXECUTOR_TIMEOUT, TimeUnit.SECONDS)) {
LOGGER.log(Level.SEVERE,
"Executor Service is terminating non-gracefully for: " + entityId.getExtensionName() + " "
+ entityId.getDataverse() + "." + entityId.getEntityName());
}
DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId();
listener.deActivate();
activeEventHandler.unregisterListener(listener);
@@ -120,7 +120,12 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe
} else {
if (listener.getExecutorService() != null) {
listener.getExecutorService().shutdown();
listener.getExecutorService().awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
if (!listener.getExecutorService().awaitTermination(BADConstants.EXECUTOR_TIMEOUT,
TimeUnit.SECONDS)) {
LOGGER.log(Level.SEVERE,
"Executor Service is terminating non-gracefully for: " + entityId.getExtensionName()
+ " " + entityId.getDataverse() + "." + entityId.getEntityName());
}
}
DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId();
listener.deActivate();

0 comments on commit 92b9f76

Please sign in to comment.