Skip to content
Permalink
Browse files
Allow BAD jobs to update their specifications to use new indexes
- storage format changes: new field for Channel body

This change uses the Asterix upsertDeployedJobSpec to
recompile and update the channel job when new indexes are
created.

Added test case
Moved methods from Asterix DeployedJobService to BADJobService

Change-Id: If0a4d37a5b91063fcb1673dbfd008c140ed54ae6
  • Loading branch information
sjaco002 committed May 7, 2018
1 parent 345b0f5 commit 0da2d001ad18cc888a53b26a5c2867f8a90c9969
Showing 26 changed files with 1,343 additions and 307 deletions.
@@ -50,6 +50,7 @@ public interface BADConstants {
// Metadata record field names used when persisting BAD entities.
String FIELD_NAME_RETURN_TYPE = "ReturnType";
String FIELD_NAME_DEFINITION = "Definition";
String FIELD_NAME_LANGUAGE = "Language";
// New in this change: stores the channel body so its job can be recompiled later.
String FIELD_NAME_BODY = "Body";
//To enable new Asterix TxnId for separate deployed job spec invocations
byte[] TRANSACTION_ID_PARAMETER_NAME = "TxnIdParameter".getBytes();
// Timeout in seconds (see millisecond conversion in BADJobService).
int EXECUTOR_TIMEOUT = 20;
@@ -0,0 +1,277 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.bad;

import java.io.StringReader;
import java.time.Instant;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.api.http.server.ResultUtil;
import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.lang.BADParserFactory;
import org.apache.asterix.bad.lang.BADStatementExecutor;
import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.statement.Query;
import org.apache.asterix.lang.common.statement.SetStatement;
import org.apache.asterix.lang.sqlpp.visitor.SqlppDeleteRewriteVisitor;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;

/**
 * Provides functionality for channel jobs: periodic execution of deployed job
 * specifications, compilation of channel/procedure bodies, and redeployment of
 * a job specification (e.g. after new indexes are created).
 */
public class BADJobService {

    private static final Logger LOGGER = Logger.getLogger(BADJobService.class.getName());

    // Pool size one: only one scheduled execution thread runs at a time.
    private static final int POOL_SIZE = 1;

    // EXECUTOR_TIMEOUT is in seconds; this is the same timeout in milliseconds.
    // NOTE(review): currently unreferenced within this class — confirm intended use.
    private static final long MILLISECOND_TIMEOUT = BADConstants.EXECUTOR_TIMEOUT * 1000;

    /**
     * Starts running a deployed job specification periodically with an interval of
     * {@code period} <em>milliseconds</em> (the original comment said seconds, but
     * the schedule uses {@link TimeUnit#MILLISECONDS}).
     * The scheduler shuts itself down when an execution overruns the period.
     *
     * @return the executor service driving the repetitive executions; callers may
     *         use it to cancel the channel
     */
    public static ScheduledExecutorService startRepetitiveDeployedJobSpec(DeployedJobSpecId distributedId,
            IHyracksClientConnection hcc, long period, Map<byte[], byte[]> jobParameters, EntityId entityId,
            ITxnIdFactory txnIdFactory, DeployedJobSpecEventListener listener) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(POOL_SIZE);
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                // A false return means the job overran its period: stop rescheduling.
                if (!runDeployedJobSpecCheckPeriod(distributedId, hcc, jobParameters, period, entityId, txnIdFactory,
                        listener)) {
                    scheduledExecutorService.shutdown();
                }
            } catch (Exception e) {
                LOGGER.log(Level.SEVERE, "Job Failed to run for " + entityId.getExtensionName() + " "
                        + entityId.getDataverse() + "." + entityId.getEntityName() + ".", e);
            }
        }, period, period, TimeUnit.MILLISECONDS);
        return scheduledExecutorService;
    }

    /**
     * Runs the deployed job once and checks that its execution time fits within the
     * configured period.
     *
     * @return {@code true} if the execution completed within {@code period}
     *         milliseconds, {@code false} if it overran (caller should shut down)
     */
    public static boolean runDeployedJobSpecCheckPeriod(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
            Map<byte[], byte[]> jobParameters, long period, EntityId entityId, ITxnIdFactory txnIdFactory,
            DeployedJobSpecEventListener listener) throws Exception {
        long executionMilliseconds =
                runDeployedJobSpec(distributedId, hcc, jobParameters, entityId, txnIdFactory, null, listener, null);
        if (executionMilliseconds > period) {
            // Fix: added the missing space before the date so the message is readable.
            LOGGER.log(Level.SEVERE,
                    "Periodic job for " + entityId.getExtensionName() + " " + entityId.getDataverse() + "."
                            + entityId.getEntityName() + " was unable to meet the required period of " + period
                            + " milliseconds. Actually took " + executionMilliseconds + " execution will shutdown "
                            + new Date());
            return false;
        }
        return true;
    }

    /**
     * Runs a deployed job specification once, blocking until completion.
     *
     * @param appCtx             may be {@code null} unless the listener type is
     *                           QUERY (only used to print results)
     * @param statementExecutor  may be {@code null} unless the listener type is
     *                           QUERY (only used for the session output)
     * @return the wall-clock execution time in milliseconds
     */
    public static long runDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
            Map<byte[], byte[]> jobParameters, EntityId entityId, ITxnIdFactory txnIdFactory,
            ICcApplicationContext appCtx, DeployedJobSpecEventListener listener, QueryTranslator statementExecutor)
            throws Exception {
        // Do not start while the entity is suspended (e.g. mid-redeploy).
        listener.waitWhileAtState(ActivityState.SUSPENDED);

        // Add the Asterix transaction id to the map so each invocation gets a fresh TxnId.
        jobParameters.put(BADConstants.TRANSACTION_ID_PARAMETER_NAME,
                String.valueOf(txnIdFactory.create().getId()).getBytes());

        long startTime = Instant.now().toEpochMilli();
        JobId jobId = hcc.startJob(distributedId, jobParameters);

        hcc.waitForCompletion(jobId);
        long executionMilliseconds = Instant.now().toEpochMilli() - startTime;

        if (listener.getType() == DeployedJobSpecEventListener.PrecompiledType.QUERY) {
            ResultReader resultReader = new ResultReader(listener.getResultDataset(), jobId, listener.getResultId());

            ResultUtil.printResults(appCtx, resultReader, statementExecutor.getSessionOutput(),
                    new IStatementExecutor.Stats(), null);
        }

        // NOTE(review): SEVERE is used for a success message here — consider INFO.
        LOGGER.log(Level.SEVERE,
                "Deployed Job execution completed for " + entityId.getExtensionName() + " " + entityId.getDataverse()
                        + "." + entityId.getEntityName() + ". Took " + executionMilliseconds + " milliseconds ");

        return executionMilliseconds;
    }

    /**
     * Converts the time portion of an ISO-8601 duration string (e.g. "PT1H30M15S")
     * to milliseconds. Returns 0 if the string has no 'T' section.
     */
    public static long findPeriod(String duration) {
        //TODO: Allow Repetitive Channels to use YMD durations
        String hoursMinutesSeconds = "";
        if (duration.indexOf('T') != -1) {
            hoursMinutesSeconds = duration.substring(duration.indexOf('T') + 1);
        }
        double seconds = 0;
        // Fix: compare string content, not references ("!=" only worked via interning).
        if (!hoursMinutesSeconds.isEmpty()) {
            int pos = 0;
            if (hoursMinutesSeconds.indexOf('H') != -1) {
                // Primitive double instead of boxed Double (no behavior change).
                double hours =
                        Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('H')));
                seconds += hours * 60 * 60;
                pos = hoursMinutesSeconds.indexOf('H') + 1;
            }
            if (hoursMinutesSeconds.indexOf('M') != -1) {
                double minutes =
                        Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('M')));
                seconds += minutes * 60;
                pos = hoursMinutesSeconds.indexOf('M') + 1;
            }
            if (hoursMinutesSeconds.indexOf('S') != -1) {
                double s = Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('S')));
                seconds += s;
            }
        }
        return (long) (seconds * 1000);
    }

    /**
     * Compiles the query body of a push channel inside its own metadata transaction.
     * Commits on success; aborts the transaction and rethrows on failure. Locks are
     * always released.
     */
    public static JobSpecification compilePushChannel(IStatementExecutor statementExecutor,
            MetadataProvider metadataProvider, IHyracksClientConnection hcc, Query q) throws Exception {
        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
        boolean bActiveTxn = true;
        metadataProvider.setMetadataTxnContext(mdTxnCtx);
        JobSpecification jobSpec = null;
        try {
            jobSpec = ((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null);
            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
            bActiveTxn = false;
        } catch (Exception e) {
            LOGGER.log(Level.INFO, e.getMessage(), e);
            if (bActiveTxn) {
                ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
            }
            throw e;
        } finally {
            metadataProvider.getLocks().unlock();
        }
        return jobSpec;
    }

    /**
     * Recompiles the stored body of a channel or procedure and upserts the result
     * into the cluster controller under the existing deployed job spec id, so the
     * running entity picks up new physical plans (e.g. new indexes).
     *
     * Does nothing (logs SEVERE) if no listener is registered for the entity.
     */
    public static void redeployJobSpec(EntityId entityId, String queryBodyString, MetadataProvider metadataProvider,
            BADStatementExecutor badStatementExecutor, IHyracksClientConnection hcc,
            IRequestParameters requestParameters) throws Exception {

        ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
        ActiveNotificationHandler activeEventHandler =
                (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
        DeployedJobSpecEventListener listener =
                (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
        if (listener == null) {
            LOGGER.severe("Tried to redeploy the job for " + entityId + " but no listener exists.");
            return;
        }

        // Re-parse the stored body: statement 0 is a SET, statement 1 the actual job body.
        BADParserFactory factory = new BADParserFactory();
        List<Statement> fStatements = factory.createParser(new StringReader(queryBodyString)).parse();
        JobSpecification jobSpec = null;
        if (listener.getType().equals(DeployedJobSpecEventListener.PrecompiledType.PUSH_CHANNEL)
                || listener.getType().equals(DeployedJobSpecEventListener.PrecompiledType.CHANNEL)) {
            //Channels
            SetStatement ss = (SetStatement) fStatements.get(0);
            metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue());
            if (listener.getType().equals(DeployedJobSpecEventListener.PrecompiledType.PUSH_CHANNEL)) {
                jobSpec = compilePushChannel(badStatementExecutor, metadataProvider, hcc, (Query) fStatements.get(1));
            } else {
                jobSpec = badStatementExecutor.handleInsertUpsertStatement(metadataProvider, fStatements.get(1), hcc,
                        null, null, null, null, true, null);
            }
        } else {
            //Procedures
            metadataProvider.setResultSetId(listener.getResultId());
            final IStatementExecutor.ResultDelivery resultDelivery =
                    requestParameters.getResultProperties().getDelivery();
            final IHyracksDataset hdc = requestParameters.getHyracksDataset();
            final IStatementExecutor.Stats stats = requestParameters.getStats();
            boolean resultsAsync = resultDelivery == IStatementExecutor.ResultDelivery.ASYNC
                    || resultDelivery == IStatementExecutor.ResultDelivery.DEFERRED;
            metadataProvider.setResultAsyncMode(resultsAsync);
            metadataProvider.setMaxResultReads(1);

            jobSpec = compileProcedureJob(badStatementExecutor, metadataProvider, hcc, hdc, stats, fStatements.get(1));
        }
        // Swap in the freshly compiled plan under the same deployed job spec id.
        hcc.upsertDeployedJobSpec(listener.getDeployedJobSpecId(), jobSpec);

        listener.resume();
    }

    /**
     * Compiles a query inside its own metadata transaction. Commits on success;
     * aborts and rethrows on failure.
     *
     * NOTE(review): unlike {@link #compilePushChannel}, this does not release
     * {@code metadataProvider.getLocks()} in a finally block — confirm whether the
     * caller is responsible for unlocking here.
     */
    public static JobSpecification compileQueryJob(IStatementExecutor statementExecutor,
            MetadataProvider metadataProvider, IHyracksClientConnection hcc, Query q) throws Exception {
        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
        metadataProvider.setMetadataTxnContext(mdTxnCtx);
        JobSpecification jobSpec = null;
        try {
            jobSpec = statementExecutor.rewriteCompileQuery(hcc, metadataProvider, q, null);
            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
        } catch (Exception e) {
            // The catch is only reachable while the txn is still active (commit is the
            // last statement in the try), so aborting unconditionally is safe. The
            // original tracked this with an unused bActiveTxn flag, removed here.
            ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
            throw e;
        }
        return jobSpec;
    }

    /**
     * Compiles a procedure body, dispatching on the statement kind:
     * INSERT → insert/upsert pipeline, QUERY → query job, otherwise DELETE
     * (rewritten first by {@link SqlppDeleteRewriteVisitor}).
     */
    private static JobSpecification compileProcedureJob(IStatementExecutor statementExecutor,
            MetadataProvider metadataProvider, IHyracksClientConnection hcc, IHyracksDataset hdc,
            IStatementExecutor.Stats stats, Statement procedureStatement) throws Exception {
        if (procedureStatement.getKind() == Statement.Kind.INSERT) {
            return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
                    procedureStatement, hcc, hdc, IStatementExecutor.ResultDelivery.ASYNC, null, stats, true, null);
        } else if (procedureStatement.getKind() == Statement.Kind.QUERY) {
            return compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) procedureStatement);
        } else {
            SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
            procedureStatement.accept(visitor, null);
            return ((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, procedureStatement,
                    hcc, true);
        }
    }

    @Override
    public String toString() {
        return "BADJobService";
    }

}

This file was deleted.

0 comments on commit 0da2d00

Please sign in to comment.