Skip to content
Permalink
Browse files
Add push-based channels and improve broker notifications
Change-Id: Ie3c7cae0f015d6bc01dd912499565bb12c15abc3
  • Loading branch information
sjaco002 committed Apr 12, 2018
1 parent 9e13d72 commit 345b0f5729d3a6ed0564707ec25b56750c5366ec
Showing 10 changed files with 505 additions and 227 deletions.
@@ -18,51 +18,16 @@
*/
package org.apache.asterix.bad;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.asterix.active.EntityId;
import org.apache.asterix.om.base.AOrderedList;
import org.apache.asterix.om.base.AUUID;
import org.apache.hyracks.api.exceptions.HyracksDataException;

/**
* Provides functionality for channel jobs and communicating with Brokers
* Provides functionality for channel jobs
*/
public class ChannelJobService {

private static final Logger LOGGER = Logger.getLogger(ChannelJobService.class.getName());

public static void sendBrokerNotificationsForChannel(EntityId activeJobId, String brokerEndpoint,
AOrderedList subscriptionIds, String channelExecutionTime) throws HyracksDataException {
String formattedString;
formattedString = formatJSON(activeJobId, subscriptionIds, channelExecutionTime);
sendMessage(brokerEndpoint, formattedString);
}

public static String formatJSON(EntityId activeJobId, AOrderedList subscriptionIds, String channelExecutionTime) {
String JSON = "{ \"dataverseName\":\"" + activeJobId.getDataverse() + "\", \"channelName\":\""
+ activeJobId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime + "\":\""
+ channelExecutionTime + "\", \"subscriptionIds\":[";
for (int i = 0; i < subscriptionIds.size(); i++) {
AUUID subId = (AUUID) subscriptionIds.getItem(i);
String subscriptionString = subId.toString();
//Broker code currently cannot handle the "uuid {}" part of the string, so we parse just the value
subscriptionString = subscriptionString.substring(8, subscriptionString.length() - 2);
JSON += "\"" + subscriptionString + "\"";
if (i < subscriptionIds.size() - 1) {
JSON += ",";
}
}
JSON += "]}";
return JSON;

}

public static long findPeriod(String duration) {
//TODO: Allow Repetitive Channels to use YMD durations
@@ -92,61 +57,6 @@ public static long findPeriod(String duration) {
return (long) (seconds * 1000);
}

public static void sendMessage(String targetURL, String urlParameters) {
HttpURLConnection connection = null;
try {
//Create connection
URL url = new URL(targetURL);
connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");

connection.setRequestProperty("Content-Length", Integer.toString(urlParameters.getBytes().length));
connection.setRequestProperty("Content-Language", "en-US");

connection.setUseCaches(false);
connection.setDoOutput(true);
connection.setConnectTimeout(500);

if (connection.getOutputStream() != null) {
//Send message
DataOutputStream wr = new DataOutputStream(connection.getOutputStream());
wr.writeBytes(urlParameters);
wr.close();
} else {
throw new Exception();
}

if (LOGGER.isLoggable(Level.INFO)) {
int responseCode = connection.getResponseCode();
LOGGER.info("\nSending 'POST' request to URL : " + url);
LOGGER.info("Post parameters : " + urlParameters);
LOGGER.info("Response Code : " + responseCode);
}

if (connection.getInputStream() != null) {
BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
String inputLine;
StringBuffer response = new StringBuffer();
while ((inputLine = in.readLine()) != null) {
response.append(inputLine);
}
in.close();
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.log(Level.INFO, response.toString());
}
} else {
LOGGER.log(Level.WARNING, "Channel Failed to get response from Broker.");
}

} catch (Exception e) {
LOGGER.log(Level.WARNING, "Channel Failed to connect to Broker.");
} finally {
if (connection != null) {
connection.disconnect();
}
}
}

@Override
public String toString() {
@@ -40,14 +40,14 @@
import org.apache.asterix.bad.metadata.Channel;
import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener.PrecompiledType;
import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.expression.CallExpr;
@@ -59,6 +59,7 @@
import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl;
import org.apache.asterix.lang.common.statement.InsertStatement;
import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
import org.apache.asterix.lang.common.statement.Query;
import org.apache.asterix.lang.common.statement.SetStatement;
import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
@@ -92,14 +93,16 @@ public class CreateChannelStatement extends ExtensionStatement {
private String subscriptionsTableName;
private String resultsTableName;
private String dataverse;
private final boolean push;

public CreateChannelStatement(Identifier dataverseName, Identifier channelName, FunctionSignature function,
Expression period) {
Expression period, boolean push) {
this.channelName = channelName;
this.dataverseName = dataverseName;
this.function = function;
this.period = (CallExpr) period;
this.duration = "";
this.push = push;
}

public Identifier getDataverseName() {
@@ -218,12 +221,37 @@ private void createDatasets(IStatementExecutor statementExecutor, MetadataProvid

}

private JobSpecification compilePushChannel(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, Query q) throws Exception {
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
JobSpecification jobSpec = null;
try {
jobSpec = ((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
} catch (Exception e) {
LOGGER.log(Level.INFO, e.getMessage(), e);
if (bActiveTxn) {
((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
}
throw e;
} finally {
metadataProvider.getLocks().unlock();
}
return jobSpec;
}

private JobSpecification createChannelJob(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats) throws Exception {
StringBuilder builder = new StringBuilder();
builder.append("SET inline_with \"false\";\n");
builder.append("insert into " + dataverse + "." + resultsTableName);
builder.append(" as a (\n" + "with " + BADConstants.ChannelExecutionTime + " as current_datetime() \n");
if (!push) {
builder.append("insert into " + dataverse + "." + resultsTableName);
builder.append(" as a (\n");
}
builder.append("with " + BADConstants.ChannelExecutionTime + " as current_datetime() \n");
builder.append("select result, ");
builder.append(BADConstants.ChannelExecutionTime + ", ");
builder.append("sub." + BADConstants.SubscriptionId + " as " + BADConstants.SubscriptionId + ",");
@@ -238,15 +266,19 @@ private JobSpecification createChannelJob(IStatementExecutor statementExecutor,
builder.append("sub.param" + i + ") result \n");
builder.append("where b." + BADConstants.BrokerName + " = sub." + BADConstants.BrokerName + "\n");
builder.append("and b." + BADConstants.DataverseName + " = sub." + BADConstants.DataverseName + "\n");
builder.append(")");
builder.append(" returning a");
if (!push) {
builder.append(")");
builder.append(" returning a");
}
builder.append(";");
BADParserFactory factory = new BADParserFactory();
List<Statement> fStatements = factory.createParser(new StringReader(builder.toString())).parse();

SetStatement ss = (SetStatement) fStatements.get(0);
metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue());

if (push) {
return compilePushChannel(statementExecutor, metadataProvider, hcc, (Query) fStatements.get(1));
}
return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(1),
hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null);
}

0 comments on commit 345b0f5

Please sign in to comment.