[ASTERIXDB-2314][HYR] Dataset in class names in Hyracks
Change-Id: I333fa410df5efe7da9d4f0e9b7143f9f6928b88b
westmann committed Jul 28, 2018
1 parent 40b70a1 commit 51819b77a6068d92eed284624ff6550530b93d05
Showing 6 changed files with 25 additions and 24 deletions.
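
In short, this change renames the Hyracks client result API used by the BAD extension: `IHyracksDataset` becomes `IResultSet`, `HyracksDataset` becomes `ResultSet`, and `ResultSetId` moves from `org.apache.hyracks.api.dataset` to `org.apache.hyracks.api.result`. The sketch below is illustrative only, not code from this commit: it wires the renamed classes together using the constructor shapes that appear in the diff, and it assumes `ResultReader` lives in `org.apache.asterix.app.result` (its import is not shown here).

```java
// Illustrative sketch of the renamed result API; not part of this commit.
import org.apache.asterix.app.result.ResultReader;             // assumed package, not shown in this diff
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.result.IResultSet;               // was org.apache.hyracks.api.dataset.IHyracksDataset
import org.apache.hyracks.api.result.ResultSetId;              // was org.apache.hyracks.api.dataset.ResultSetId
import org.apache.hyracks.client.result.ResultSet;             // was org.apache.hyracks.client.dataset.HyracksDataset

public class ResultApiRenameSketch {

    // Opens a reader for result set 0 of a job, mirroring the calls made in
    // BADJobService and BADGlobalRecoveryManager after the rename.
    public static ResultReader openFirstResultSet(IHyracksClientConnection hcc, int frameSize, JobId jobId)
            throws Exception {
        IResultSet resultSet = new ResultSet(hcc, frameSize, ResultReader.NUM_READERS);
        return new ResultReader(resultSet, jobId, new ResultSetId(0));
    }
}
```

The hunks below are the mechanical follow-through: parameters and locals previously typed `IHyracksDataset hdc` become `IResultSet resultSet`, and a few call sites are re-wrapped accordingly.
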
@@ -51,11 +51,11 @@
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.IResultSet;
+import org.apache.hyracks.api.result.ResultSetId;

/**
* Provides functionality for channel jobs
@@ -122,7 +122,7 @@ public static boolean runDeployedJobSpecCheckPeriod(DeployedJobSpecId distribute
}

public static long runDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
-IHyracksDataset hdc, Map<byte[], byte[]> jobParameters, EntityId entityId, ITxnIdFactory txnIdFactory,
+IResultSet resultSet, Map<byte[], byte[]> jobParameters, EntityId entityId, ITxnIdFactory txnIdFactory,
ICcApplicationContext appCtx, DeployedJobSpecEventListener listener, QueryTranslator statementExecutor)
throws Exception {
listener.waitWhileAtState(ActivityState.SUSPENDED);
@@ -138,7 +138,7 @@ public static long runDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksC
long executionMilliseconds = Instant.now().toEpochMilli() - startTime;

if (listener.getType() == DeployedJobSpecEventListener.PrecompiledType.QUERY) {
-ResultReader resultReader = new ResultReader(hdc, jobId, new ResultSetId(0));
+ResultReader resultReader = new ResultReader(resultSet, jobId, new ResultSetId(0));

ResultUtil.printResults(appCtx, resultReader, statementExecutor.getSessionOutput(),
new IStatementExecutor.Stats(), null);
@@ -235,7 +235,7 @@ public static void redeployJobSpec(EntityId entityId, String queryBodyString, Me
//Procedures
metadataProvider.setResultSetId(new ResultSetId(0));
IStatementExecutor.ResultDelivery resultDelivery = requestParameters.getResultProperties().getDelivery();
-IHyracksDataset hdc = requestParameters.getHyracksDataset();
+IResultSet hdc = requestParameters.getResultSet();
IStatementExecutor.Stats stats = requestParameters.getStats();
boolean resultsAsync = resultDelivery == IStatementExecutor.ResultDelivery.ASYNC
|| resultDelivery == IStatementExecutor.ResultDelivery.DEFERRED;
@@ -272,12 +272,12 @@ public static JobSpecification compileQueryJob(IStatementExecutor statementExecu
}

private static JobSpecification compileProcedureJob(IStatementExecutor statementExecutor,
-MetadataProvider metadataProvider, IHyracksClientConnection hcc, IHyracksDataset hdc,
+MetadataProvider metadataProvider, IHyracksClientConnection hcc, IResultSet resultSet,
IStatementExecutor.Stats stats, Statement procedureStatement) throws Exception {
if (procedureStatement.getKind() == Statement.Kind.INSERT) {
return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
-procedureStatement, hcc, hdc, IStatementExecutor.ResultDelivery.ASYNC, null, stats, true, null,
-null, null);
+procedureStatement, hcc, resultSet, IStatementExecutor.ResultDelivery.ASYNC, null, stats, true,
+null, null, null);
} else if (procedureStatement.getKind() == Statement.Kind.QUERY) {
return compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) procedureStatement);
} else {
@@ -56,9 +56,9 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.result.IResultSet;
+import org.apache.hyracks.api.result.ResultSetId;

public class ChannelSubscribeStatement extends ExtensionStatement {

@@ -184,7 +184,7 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe
tempMdProvider.getConfig().putAll(metadataProvider.getConfig());

final ResultDelivery resultDelivery = requestParameters.getResultProperties().getDelivery();
-final IHyracksDataset hdc = requestParameters.getHyracksDataset();
+final IResultSet resultSet = requestParameters.getResultSet();
final Stats stats = requestParameters.getStats();
if (subscriptionId == null) {
//To create a new subscription
@@ -207,14 +207,14 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe

InsertStatement insert = new InsertStatement(new Identifier(dataverse),
new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, resultVar, accessor);
-((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, insert, hcc, hdc,
-resultDelivery, null, stats, false, null, null, null);
+((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, insert, hcc,
+resultSet, resultDelivery, null, stats, false, null, null, null);
} else {
//To update an existing subscription
UpsertStatement upsert = new UpsertStatement(new Identifier(dataverse),
new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, null, null);
-((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, upsert, hcc, hdc,
-resultDelivery, null, stats, false, null, null, null);
+((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, upsert, hcc,
+resultSet, resultDelivery, null, stats, false, null, null, null);
}

MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -72,9 +72,9 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.IResultSet;
import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;

public class CreateChannelStatement extends ExtensionStatement {
@@ -215,7 +215,7 @@ private void createDatasets(IStatementExecutor statementExecutor, MetadataProvid
}

private JobSpecification createChannelJob(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
-IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats) throws Exception {
+IHyracksClientConnection hcc, IResultSet resultSet, Stats stats) throws Exception {
StringBuilder builder = new StringBuilder();
builder.append("SET inline_with \"false\";\n");
if (!push) {
@@ -253,7 +253,7 @@ private JobSpecification createChannelJob(IStatementExecutor statementExecutor,
(Query) fStatements.get(1));
}
return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(1),
-hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null, null, null);
+hcc, resultSet, ResultDelivery.ASYNC, null, stats, true, null, null, null);
}

@Override
@@ -306,13 +306,14 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
metadataProvider.getDefaultDataverse());
tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
-final IHyracksDataset hdc = requestContext.getHyracksDataset();
+final IResultSet resultSet = requestContext.getResultSet();
final Stats stats = requestContext.getStats();
//Create Channel Datasets
createDatasets(statementExecutor, tempMdProvider, hcc);
tempMdProvider.getLocks().reset();
//Create Channel Internal Job
-JobSpecification channeljobSpec = createChannelJob(statementExecutor, tempMdProvider, hcc, hdc, stats);
+JobSpecification channeljobSpec =
+createChannelJob(statementExecutor, tempMdProvider, hcc, resultSet, stats);

// Now we subscribe
if (listener == null) {
@@ -73,10 +73,10 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;

public class CreateProcedureStatement extends ExtensionStatement {
@@ -119,7 +119,7 @@ public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExe
Map<byte[], byte[]> contextRuntimeVarMap = createParameterMap(procedure);
DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId();
if (procedure.getDuration().equals("")) {
-BADJobService.runDeployedJobSpec(deployedJobSpecId, hcc, requestParameters.getHyracksDataset(),
+BADJobService.runDeployedJobSpec(deployedJobSpecId, hcc, requestParameters.getResultSet(),
contextRuntimeVarMap, entityId, metadataProvider.getTxnIdFactory(), appCtx, listener,
(QueryTranslator) statementExecutor);

@@ -56,7 +56,7 @@
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.client.dataset.HyracksDataset;
+import org.apache.hyracks.client.result.ResultSet;
import org.apache.hyracks.control.common.utils.HyracksThreadFactory;

public class BADGlobalRecoveryManager extends GlobalRecoveryManager {
@@ -146,7 +146,7 @@ private void deployJobs(ICcApplicationContext appCtx, List<Channel> channels, Li
activeEventHandler.registerListener(listener);
BADJobService.redeployJobSpec(entityId, procedure.getBody(), metadataProvider, badStatementExecutor, hcc,
new RequestParameters(
-new HyracksDataset(hcc, appCtx.getCompilerProperties().getFrameSize(),
+new ResultSet(hcc, appCtx.getCompilerProperties().getFrameSize(),
ResultReader.NUM_READERS),
new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE),
new IStatementExecutor.Stats(), null, null, null, null, true),
