Skip to content
Permalink
Browse files
[NO ISSUE][BAD] DeployedJobEventListener and test case fix
1. The concurrent execution test case sometimes failed at result short.
The reason is the deployed job is removed before all invocations
finished. Added a sleep to the test case, also added running instance
check when dropping the procedure.
2. The DeployedJobEventListner was not registered with
ActiveNotificationHandler. Now it's registered so we can bind multiple
jobs with the event listener in the future.
3. Test cases refactored to make the overall test time shorter.
4. Add `wait-for-completion-procedure` for several test cases to make
sure the result is consistent.

Change-Id: I12ecf5c3c8f5a5c58fefa80673565c0ae3d1c9e6
  • Loading branch information
idleft committed Feb 3, 2018
1 parent 81ee58e commit e305f1155861da0e80c40ebba1df7e44e22bb64c
Showing 17 changed files with 79 additions and 39 deletions.
@@ -28,7 +28,8 @@
<asterix.version>0.9.3-SNAPSHOT</asterix.version>
<hyracks.version>0.3.3-SNAPSHOT</hyracks.version>
<source-format.skip>true</source-format.skip>
</properties>
<testLog4jConfigFile>${root.dir}/../../asterix-app/src/test/resources/log4j2-test.xml</testLog4jConfigFile>
</properties>
<build>
<plugins>
<plugin>
@@ -246,6 +246,7 @@ private Pair<JobSpecification, PrecompiledType> createProcedureJob(IStatementExe
private void setupDeployedJobSpec(EntityId entityId, JobSpecification jobSpec, IHyracksClientConnection hcc,
DeployedJobSpecEventListener listener, ResultSetId resultSetId, IHyracksDataset hdc, Stats stats)
throws Exception {
jobSpec.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
DeployedJobSpecId deployedJobSpecId = hcc.deployJobSpec(jobSpec);
listener.storeDistributedInfo(deployedJobSpecId, null, hdc, resultSetId);
}
@@ -18,12 +18,6 @@
*/
package org.apache.asterix.bad.lang.statement;

import java.io.DataOutput;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.asterix.active.DeployedJobService;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
@@ -63,8 +57,16 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;

import java.io.DataOutput;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;

public class ExecuteProcedureStatement implements IExtensionStatement {

public static final String WAIT_FOR_COMPLETION = "wait-for-completion-procedure";

private final String dataverseName;
private final String procedureName;
private final int arity;
@@ -118,6 +120,7 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe
Procedure procedure = null;

MetadataTransactionContext mdTxnCtx = null;
JobId jobId;
try {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
txnActive = true;
@@ -130,12 +133,18 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe
if (procedure.getDuration().equals("")) {

//Add the Asterix Transaction Id to the map
long newTxId = TxnIdFactory.create().getId();
contextRuntimeVarMap.put(BADConstants.TRANSACTION_ID_PARAMETER_NAME,
String.valueOf(TxnIdFactory.create().getId()).getBytes());
JobId jobId = hcc.startJob(deployedJobSpecId, contextRuntimeVarMap);
String.valueOf(newTxId).getBytes());
jobId = hcc.startJob(deployedJobSpecId, contextRuntimeVarMap);

if (listener.getType() == PrecompiledType.QUERY) {
boolean wait = Boolean.parseBoolean(metadataProvider.getConfig().get(
ExecuteProcedureStatement.WAIT_FOR_COMPLETION));
if (wait || listener.getType() == PrecompiledType.QUERY) {
hcc.waitForCompletion(jobId);
}

if (listener.getType() == PrecompiledType.QUERY) {
ResultReader resultReader =
new ResultReader(listener.getResultDataset(), jobId, listener.getResultId());

@@ -150,7 +159,6 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe
listener.storeDistributedInfo(deployedJobSpecId, ses, listener.getResultDataset(),
listener.getResultId());
}

MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
txnActive = false;
} catch (Exception e) {
@@ -93,7 +93,13 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe
boolean txnActive = false;
EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName());
DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
Procedure procedure = null;

if (listener.isActive()) {
throw new AlgebricksException("Cannot drop running procedure. There are " + listener.getRunningInstance()
+ " running instances.");
}

Procedure procedure;

MetadataTransactionContext mdTxnCtx = null;
try {
@@ -18,19 +18,12 @@
*/
package org.apache.asterix.bad.metadata;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.asterix.active.ActiveEvent;
import org.apache.asterix.active.ActiveEvent.Kind;
import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.IActiveEntityEventSubscriber;
import org.apache.asterix.active.IActiveEntityEventsListener;
import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.metadata.IDataset;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -42,6 +35,11 @@
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;

public class DeployedJobSpecEventListener implements IActiveEntityEventsListener {

private static final Logger LOGGER = Logger.getLogger(DeployedJobSpecEventListener.class);
@@ -62,11 +60,11 @@ enum RequestState {

private DeployedJobSpecId deployedJobSpecId;
private ScheduledExecutorService executorService = null;
private ResultReader resultReader;
private final PrecompiledType type;

private IHyracksDataset hdc;
private ResultSetId resultSetId;

// members
protected volatile ActivityState state;
protected JobId jobId;
@@ -79,7 +77,7 @@ enum RequestState {
protected RequestState statsRequestState;
protected final String runtimeName;
protected final AlgebricksAbsolutePartitionConstraint locations;
protected int numRegistered;
private int runningInstance;

public DeployedJobSpecEventListener(ICcApplicationContext appCtx, EntityId entityId, PrecompiledType type,
AlgebricksAbsolutePartitionConstraint locations, String runtimeName) {
@@ -92,7 +90,6 @@ public DeployedJobSpecEventListener(ICcApplicationContext appCtx, EntityId entit
this.stats = "{\"Stats\":\"N/A\"}";
this.runtimeName = runtimeName;
this.locations = locations;
this.numRegistered = 0;
state = ActivityState.STOPPED;
this.type = type;
}
@@ -110,15 +107,6 @@ public DeployedJobSpecId getDeployedJobSpecId() {
return deployedJobSpecId;
}

protected synchronized void handle(ActivePartitionMessage message) {
if (message.getEvent() == ActivePartitionMessage.Event.RUNTIME_REGISTERED) {
numRegistered++;
if (numRegistered == locations.getLocations().length) {
state = ActivityState.RUNNING;
}
}
}

@Override
public EntityId getEntityId() {
return entityId;
@@ -182,10 +170,6 @@ public AlgebricksAbsolutePartitionConstraint getLocations() {
return locations;
}

public ResultReader getResultReader() {
return resultReader;
}

public PrecompiledType getType() {
return type;
}
@@ -234,13 +218,18 @@ private synchronized void handleJobStartEvent(ActiveEvent message) throws Except
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Channel Job started for " + entityId);
}
runningInstance++;
state = ActivityState.RUNNING;
}

private synchronized void handleJobFinishEvent(ActiveEvent message) throws Exception {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Channel Job finished for " + entityId);
}
runningInstance--;
if (runningInstance == 0) {
state = ActivityState.STOPPED;
}
}

@Override
@@ -266,4 +255,8 @@ public Exception getJobFailure() {
public String getDisplayName() throws HyracksDataException {
return this.entityId.toString();
}

public int getRunningInstance() {
return runningInstance;
}
}
@@ -23,6 +23,7 @@
import java.util.Collection;
import java.util.logging.Logger;

import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.test.common.TestExecutor;
import org.apache.asterix.test.runtime.ExecutionTestUtil;
@@ -61,7 +62,7 @@ public class BADExecutionTest {
public static void setUp() throws Exception {
File outdir = new File(PATH_ACTUAL);
outdir.mkdirs();
ExecutionTestUtil.setUp(cleanupOnStart, TEST_CONFIG_FILE_NAME);
ExecutionTestUtil.setUp(cleanupOnStart, TEST_CONFIG_FILE_NAME, new AsterixHyracksIntegrationUtil(), false, null);
}

@AfterClass
@@ -43,4 +43,4 @@ create function RoomOccupants(room) {

create broker brokerA at "http://www.notifyA.com";

create repetitive channel roomRecords using RoomOccupants@1 period duration("PT30S");
create repetitive channel roomRecords using RoomOccupants@1 period duration("PT10S");
@@ -22,4 +22,4 @@
* Date : Sep 2016
* Author : Steven Jacobs
*/
630000
110000
@@ -25,4 +25,4 @@

use channels;

(select value count(result) from roomRecordsResults)[0] > 19;
(select value count(result) from roomRecordsResults)[0] > 9;
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Description : Simple Insert Procedure
* Expected Res : 3
* Date : Jan 2017
* Author : Steven Jacobs
*/
3000
@@ -24,4 +24,5 @@
*/

use channels;
set `wait-for-completion-procedure` "true";
execute deleteAll();
@@ -24,4 +24,5 @@
*/

use channels;
set `wait-for-completion-procedure` "true";
execute deleteSome(108,"jacob");
@@ -24,4 +24,5 @@
*/

use channels;
set `wait-for-completion-procedure` "true";
execute addMe();
@@ -24,4 +24,5 @@
*/

use channels;
set `wait-for-completion-procedure` "true";
execute addMe();
@@ -24,4 +24,5 @@
*/

use channels;
set `wait-for-completion-procedure` "true";
execute addMe();

0 comments on commit e305f11

Please sign in to comment.