From e305f1155861da0e80c40ebba1df7e44e22bb64c Mon Sep 17 00:00:00 2001 From: Xikui Wang Date: Sat, 3 Feb 2018 09:26:24 -0800 Subject: [PATCH] [NO ISSUE][BAD] DeployedJobEventListener and test case fix 1. The concurrent execution test case sometimes failed at result short. The reason is the deployed job is removed before all invocations finished. Added a sleep to the test case, also added running instance check when dropping the procedure. 2. The DeployedJobEventListner was not registered with ActiveNotificationHandler. Now it's registered so we can bind multiple jobs with the event listener in the future. 3. Test cases refactored to make the overall test time shorter. 4. Add `wait-for-completion-procedure` for several test cases to make sure the result is consistent. Change-Id: I12ecf5c3c8f5a5c58fefa80673565c0ae3d1c9e6 --- asterix-bad/pom.xml | 3 +- .../statement/CreateProcedureStatement.java | 1 + .../statement/ExecuteProcedureStatement.java | 28 ++++++++----- .../statement/ProcedureDropStatement.java | 8 +++- .../DeployedJobSpecEventListener.java | 39 ++++++++----------- .../asterix/bad/test/BADExecutionTest.java | 3 +- .../ten_minute_channel.1.ddl.sqlpp | 2 +- .../ten_minute_channel.4.sleep.sqlpp | 2 +- .../ten_minute_channel.5.query.sqlpp | 2 +- .../concurrent_procedure.3.sleep.sqlpp | 25 ++++++++++++ ...sqlpp => concurrent_procedure.4.ddl.sqlpp} | 0 ...lpp => concurrent_procedure.5.query.sqlpp} | 0 .../delete_procedure.3.update.sqlpp | 1 + ...e_procedure_with_parameters.3.update.sqlpp | 1 + .../insert_procedure.2.update.sqlpp | 1 + .../insert_procedure.3.update.sqlpp | 1 + .../insert_procedure.4.update.sqlpp | 1 + 17 files changed, 79 insertions(+), 39 deletions(-) create mode 100644 asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.3.sleep.sqlpp rename asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/{concurrent_procedure.3.ddl.sqlpp => concurrent_procedure.4.ddl.sqlpp} (100%) rename asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/{concurrent_procedure.4.query.sqlpp => concurrent_procedure.5.query.sqlpp} (100%) diff --git a/asterix-bad/pom.xml b/asterix-bad/pom.xml index e94b912..291697d 100644 --- a/asterix-bad/pom.xml +++ b/asterix-bad/pom.xml @@ -28,7 +28,8 @@ 0.9.3-SNAPSHOT 0.3.3-SNAPSHOT true - + ${root.dir}/../../asterix-app/src/test/resources/log4j2-test.xml + diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java index cd60b1a..0908edb 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java @@ -246,6 +246,7 @@ private Pair createProcedureJob(IStatementExe private void setupDeployedJobSpec(EntityId entityId, JobSpecification jobSpec, IHyracksClientConnection hcc, DeployedJobSpecEventListener listener, ResultSetId resultSetId, IHyracksDataset hdc, Stats stats) throws Exception { + jobSpec.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId); DeployedJobSpecId deployedJobSpecId = hcc.deployJobSpec(jobSpec); listener.storeDistributedInfo(deployedJobSpecId, null, hdc, resultSetId); } diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java index b6c66dc..7ab7f95 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java @@ -18,12 +18,6 @@ */ package org.apache.asterix.bad.lang.statement; -import java.io.DataOutput; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; - import org.apache.asterix.active.DeployedJobService; import org.apache.asterix.active.EntityId; import org.apache.asterix.algebra.extension.IExtensionStatement; @@ -63,8 +57,16 @@ import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import java.io.DataOutput; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; + public class ExecuteProcedureStatement implements IExtensionStatement { + public static final String WAIT_FOR_COMPLETION = "wait-for-completion-procedure"; + private final String dataverseName; private final String procedureName; private final int arity; @@ -118,6 +120,7 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe Procedure procedure = null; MetadataTransactionContext mdTxnCtx = null; + JobId jobId; try { mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); txnActive = true; @@ -130,12 +133,18 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe if (procedure.getDuration().equals("")) { //Add the Asterix Transaction Id to the map + long newTxId = TxnIdFactory.create().getId(); contextRuntimeVarMap.put(BADConstants.TRANSACTION_ID_PARAMETER_NAME, - String.valueOf(TxnIdFactory.create().getId()).getBytes()); - JobId jobId = hcc.startJob(deployedJobSpecId, contextRuntimeVarMap); + String.valueOf(newTxId).getBytes()); + jobId = hcc.startJob(deployedJobSpecId, contextRuntimeVarMap); - if (listener.getType() == PrecompiledType.QUERY) { + boolean wait = Boolean.parseBoolean(metadataProvider.getConfig().get( + ExecuteProcedureStatement.WAIT_FOR_COMPLETION)); + if (wait || listener.getType() == PrecompiledType.QUERY) { hcc.waitForCompletion(jobId); + } + + if (listener.getType() == PrecompiledType.QUERY) { ResultReader resultReader = new ResultReader(listener.getResultDataset(), jobId, listener.getResultId()); @@ -150,7 +159,6 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe listener.storeDistributedInfo(deployedJobSpecId, ses, listener.getResultDataset(), listener.getResultId()); } - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); txnActive = false; } catch (Exception e) { diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java index 1555bea..f0eaced 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java @@ -93,7 +93,13 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe boolean txnActive = false; EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName()); DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId); - Procedure procedure = null; + + if (listener.isActive()) { + throw new AlgebricksException("Cannot drop running procedure. There are " + listener.getRunningInstance() + + " running instances."); + } + + Procedure procedure; MetadataTransactionContext mdTxnCtx = null; try { diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java index 13f9e0d..070c148 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java @@ -18,19 +18,12 @@ */ package org.apache.asterix.bad.metadata; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.ScheduledExecutorService; - import org.apache.asterix.active.ActiveEvent; import org.apache.asterix.active.ActiveEvent.Kind; import org.apache.asterix.active.ActivityState; import org.apache.asterix.active.EntityId; import org.apache.asterix.active.IActiveEntityEventSubscriber; import org.apache.asterix.active.IActiveEntityEventsListener; -import org.apache.asterix.active.message.ActivePartitionMessage; -import org.apache.asterix.app.result.ResultReader; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.metadata.IDataset; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; @@ -42,6 +35,11 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; + public class DeployedJobSpecEventListener implements IActiveEntityEventsListener { private static final Logger LOGGER = Logger.getLogger(DeployedJobSpecEventListener.class); @@ -62,11 +60,11 @@ enum RequestState { private DeployedJobSpecId deployedJobSpecId; private ScheduledExecutorService executorService = null; - private ResultReader resultReader; private final PrecompiledType type; private IHyracksDataset hdc; private ResultSetId resultSetId; + // members protected volatile ActivityState state; protected JobId jobId; @@ -79,7 +77,7 @@ enum RequestState { protected RequestState statsRequestState; protected final String runtimeName; protected final AlgebricksAbsolutePartitionConstraint locations; - protected int numRegistered; + private int runningInstance; public DeployedJobSpecEventListener(ICcApplicationContext appCtx, EntityId entityId, PrecompiledType type, AlgebricksAbsolutePartitionConstraint locations, String runtimeName) { @@ -92,7 +90,6 @@ public DeployedJobSpecEventListener(ICcApplicationContext appCtx, EntityId entit this.stats = "{\"Stats\":\"N/A\"}"; this.runtimeName = runtimeName; this.locations = locations; - this.numRegistered = 0; state = ActivityState.STOPPED; this.type = type; } @@ -110,15 +107,6 @@ public DeployedJobSpecId getDeployedJobSpecId() { return deployedJobSpecId; } - protected synchronized void handle(ActivePartitionMessage message) { - if (message.getEvent() == ActivePartitionMessage.Event.RUNTIME_REGISTERED) { - numRegistered++; - if (numRegistered == locations.getLocations().length) { - state = ActivityState.RUNNING; - } - } - } - @Override public EntityId getEntityId() { return entityId; @@ -182,10 +170,6 @@ public AlgebricksAbsolutePartitionConstraint getLocations() { return locations; } - public ResultReader getResultReader() { - return resultReader; - } - public PrecompiledType getType() { return type; } @@ -234,6 +218,7 @@ private synchronized void handleJobStartEvent(ActiveEvent message) throws Except if (LOGGER.isInfoEnabled()) { LOGGER.info("Channel Job started for " + entityId); } + runningInstance++; state = ActivityState.RUNNING; } @@ -241,6 +226,10 @@ private synchronized void handleJobFinishEvent(ActiveEvent message) throws Excep if (LOGGER.isInfoEnabled()) { LOGGER.info("Channel Job finished for " + entityId); } + runningInstance--; + if (runningInstance == 0) { + state = ActivityState.STOPPED; + } } @Override @@ -266,4 +255,8 @@ public Exception getJobFailure() { public String getDisplayName() throws HyracksDataException { return this.entityId.toString(); } + + public int getRunningInstance() { + return runningInstance; + } } diff --git a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java index bc24e9f..9701b2b 100644 --- a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java +++ b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.logging.Logger; +import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil; import org.apache.asterix.common.config.TransactionProperties; import org.apache.asterix.test.common.TestExecutor; import org.apache.asterix.test.runtime.ExecutionTestUtil; @@ -61,7 +62,7 @@ public class BADExecutionTest { public static void setUp() throws Exception { File outdir = new File(PATH_ACTUAL); outdir.mkdirs(); - ExecutionTestUtil.setUp(cleanupOnStart, TEST_CONFIG_FILE_NAME); + ExecutionTestUtil.setUp(cleanupOnStart, TEST_CONFIG_FILE_NAME, new AsterixHyracksIntegrationUtil(), false, null); } @AfterClass diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.sqlpp index 11b7b33..a21a4be 100644 --- a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.sqlpp +++ b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.sqlpp @@ -43,4 +43,4 @@ create function RoomOccupants(room) { create broker brokerA at "http://www.notifyA.com"; -create repetitive channel roomRecords using RoomOccupants@1 period duration("PT30S"); +create repetitive channel roomRecords using RoomOccupants@1 period duration("PT10S"); diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.sqlpp index d5f4290..c750707 100644 --- a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.sqlpp +++ b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.sqlpp @@ -22,4 +22,4 @@ * Date : Sep 2016 * Author : Steven Jacobs */ -630000 +110000 diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.sqlpp index cfe92c9..b9282fe 100644 --- a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.sqlpp +++ b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.sqlpp @@ -25,4 +25,4 @@ use channels; -(select value count(result) from roomRecordsResults)[0] > 19; \ No newline at end of file +(select value count(result) from roomRecordsResults)[0] > 9; \ No newline at end of file diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.3.sleep.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.3.sleep.sqlpp new file mode 100644 index 0000000..c938d6c --- /dev/null +++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.3.sleep.sqlpp @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* +* Description : Simple Insert Procedure +* Expected Res : 3 +* Date : Jan 2017 +* Author : Steven Jacobs +*/ +3000 diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.3.ddl.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.4.ddl.sqlpp similarity index 100% rename from asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.3.ddl.sqlpp rename to asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.4.ddl.sqlpp diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.4.query.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.5.query.sqlpp similarity index 100% rename from asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.4.query.sqlpp rename to asterix-bad/src/test/resources/runtimets/queries/procedure/concurrent_procedure/concurrent_procedure.5.query.sqlpp diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.3.update.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.3.update.sqlpp index dd9e350..f72c921 100644 --- a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.3.update.sqlpp +++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.3.update.sqlpp @@ -24,4 +24,5 @@ */ use channels; +set `wait-for-completion-procedure` "true"; execute deleteAll(); diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.3.update.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.3.update.sqlpp index 8d03794..7dc0661 100644 --- a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.3.update.sqlpp +++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.3.update.sqlpp @@ -24,4 +24,5 @@ */ use channels; +set `wait-for-completion-procedure` "true"; execute deleteSome(108,"jacob"); \ No newline at end of file diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.2.update.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.2.update.sqlpp index 8610395..8d55ccd 100644 --- a/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.2.update.sqlpp +++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.2.update.sqlpp @@ -24,4 +24,5 @@ */ use channels; +set `wait-for-completion-procedure` "true"; execute addMe(); diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.3.update.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.3.update.sqlpp index 8610395..8d55ccd 100644 --- a/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.3.update.sqlpp +++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.3.update.sqlpp @@ -24,4 +24,5 @@ */ use channels; +set `wait-for-completion-procedure` "true"; execute addMe(); diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.4.update.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.4.update.sqlpp index 8610395..8d55ccd 100644 --- a/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.4.update.sqlpp +++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.4.update.sqlpp @@ -24,4 +24,5 @@ */ use channels; +set `wait-for-completion-procedure` "true"; execute addMe();