Introducing BAD-CQ
a. Active datasets
  Active datasets behave like regular datasets: they can be modified with
  insert/upsert/delete statements and fed directly by data feeds. Records
  stored in an active dataset carry an additional active timestamp field
  (implemented using meta-records); the active timestamp is assigned inside
  the storage layer, right before persistence. This patch introduced new
  syntactic components for creating active datasets and a BAD query
  translator so that all DML statements can operate on active datasets.
  (A hedged sketch of the timestamp assignment follows this list.)
b. Continuous channels
  Continuous channels are built on top of repetitive channels but provide
  continuous query semantics. To ensure this, the patch introduced an
  active timestamp manager on each node that manages channel execution
  times locally; each manager is a node-local class tied to that node's
  JVM (see the sketch after this list). An optimization rule ensures
  continuous query semantics in a distributed environment.
c. Active functions
  Active functions help users create continuous queries. They were added
  through the extension APIs.
d. BAD Islands
  As an application built on BAD-CQ, BAD Islands show how data channels
  can be connected to data feeds so that data is shared declaratively
  between different BAD systems.
e. Cleanups and fixes
  This patch also cleaned up the BAD codebase and introduced tests for the
  metadata, optimizer, and runtime. It fixed a type inference issue in
  InsertBrokerNotifierForChannelRule when the query contains a group-by,
  and it optimized broker notification delivery to run on separate threads
  (see the sketch after this list).

Change-Id: I77263c3fedd03205b83fe13978649b33fccda11c
idleft committed Aug 21, 2020
1 parent 1ef0449 commit 9d26e078d1d7c64c137d2e7752756efd40c49fad
Showing 230 changed files with 5,415 additions and 936 deletions.
@@ -0,0 +1,11 @@
{"tid" : 0, "area_name": "UCI", "suspicious_level": 9, "text": "Tweet 1", "location": point("68.41629186478751, 44.631424133284966"), "timestamp": datetime("2018-11-09T00:00:00.000Z"), "threatening_flag": true}
{"tid" : 1, "area_name": "SD", "suspicious_level": 2, "text": "Tweet 2", "location": point("2026.6769440649875, 2018.7964303566189"), "timestamp": datetime("2017-11-07T00:00:00.000Z"), "threatening_flag": false}
{"tid" : 2, "area_name": "LA", "suspicious_level": 10, "text": "Tweet 3", "location": point("4037.165975321966, 4020.6200896429373"), "timestamp": datetime("2018-08-14T00:00:00.000Z"), "threatening_flag": true}
{"tid" : 3, "area_name": "UCI", "suspicious_level": 5, "text": "Tweet 4", "location": point("6011.1101544678095, 6033.86438259922"), "timestamp": datetime("2018-05-18T00:00:00.000Z"), "threatening_flag": true}
{"tid" : 4, "area_name": "UCI", "suspicious_level": 3, "text": "Tweet 5", "location": point("8059.243121712048, 8070.611768137903"), "timestamp": datetime("2018-10-27T00:00:00.000Z"), "threatening_flag": false}
{"tid" : 5, "area_name": "LA", "suspicious_level": 8, "text": "Tweet 6", "location": point("10030.582330779562, 10035.23466920704"), "timestamp": datetime("2017-04-11T00:00:00.000Z"), "threatening_flag": false}
{"tid" : 6, "area_name": "LA", "suspicious_level": 9, "text": "Tweet 7", "location": point("12074.059527318483, 12008.549148129303"), "timestamp": datetime("2017-04-23T00:00:00.000Z"), "threatening_flag": true}
{"tid" : 7, "area_name": "UCI", "suspicious_level": 6, "text": "Tweet 8", "location": point("14047.406241253855, 14063.900105905997"), "timestamp": datetime("2018-06-06T00:00:00.000Z"), "threatening_flag": false}
{"tid" : 8, "area_name": "LA", "suspicious_level": 8, "text": "Tweet 9", "location": point("16051.9846352692, 16078.599078960015"), "timestamp": datetime("2018-03-27T00:00:00.000Z"), "threatening_flag": true}
{"tid" : 9, "area_name": "UCI", "suspicious_level": 1, "text": "Tweet 10", "location": point("18006.76288487177, 18014.17514030951"), "timestamp": datetime("2017-08-29T00:00:00.000Z"), "threatening_flag": false}
{"tid" : 10, "area_name": "SD", "suspicious_level": 0, "text": "Tweet 11", "location": point("20088.467372365518, 20051.89371416724"), "timestamp": datetime("2017-10-01T00:00:00.000Z"), "threatening_flag": true}
@@ -27,7 +27,7 @@
<properties>
<asterix.version>0.9.5-SNAPSHOT</asterix.version>
<hyracks.version>0.3.5-SNAPSHOT</hyracks.version>
<testLog4jConfigFile>${root.dir}/../../asterix-app/src/test/resources/log4j2-test.xml</testLog4jConfigFile>
<testLog4jConfigFile>src/main/resources/log4j2-bad.xml</testLog4jConfigFile>
</properties>
<build>
<plugins>
@@ -100,6 +100,7 @@
<configuration>
<excludes combine.children="append">
<exclude>src/test/resources/**/results/**</exclude>
<exclude>data/**</exclude>
</excludes>
</configuration>
</plugin>
@@ -18,28 +18,15 @@
*/
package org.apache.asterix.bad;

import org.apache.asterix.common.metadata.DataverseName;

public interface BADConstants {
String SubscriptionId = "subscriptionId";
String BrokerName = "BrokerName";
String ChannelName = "ChannelName";
String ProcedureName = "ProcedureName";
String DataverseName = "DataverseName";
String BrokerEndPoint = "BrokerEndPoint";
String DeliveryTime = "deliveryTime";
String ResultId = "resultId";
String ChannelExecutionTime = "channelExecutionTime";
String ChannelSubscriptionsType = "ChannelSubscriptionsType";
String ChannelResultsType = "ChannelResultsType";
String ResultsDatasetName = "ResultsDatasetName";
String SubscriptionsDatasetName = "SubscriptionsDatasetName";
String CHANNEL_EXTENSION_NAME = "Channel";
String PROCEDURE_KEYWORD = "Procedure";
String BROKER_KEYWORD = "Broker";
String RECORD_TYPENAME_BROKER = "BrokerRecordType";
String RECORD_TYPENAME_CHANNEL = "ChannelRecordType";
String RECORD_TYPENAME_PROCEDURE = "ProcedureRecordType";
String subscriptionEnding = "Subscriptions";
String resultsEnding = "Results";
String BAD_METADATA_EXTENSION_NAME = "BADMetadataExtension";
@@ -52,10 +39,54 @@ public interface BADConstants {
String FIELD_NAME_DEFINITION = "Definition";
String FIELD_NAME_LANGUAGE = "Language";
String FIELD_NAME_BODY = "Body";

/* --- Notification Fields --- */
String ChannelExecutionTime = "channelExecutionTime";
String CHANNEL_EXECUTION_EPOCH_TIME = "channelExecutionEpochTime";

// --- Active Dataset
String RECORD_TYPENAME_ACTIVE_RECORD = "ActiveRecordType";
String FIELD_NAME_ACTIVE_TS = "_active_timestamp";

//To enable new Asterix TxnId for separate deployed job spec invocations
byte[] TRANSACTION_ID_PARAMETER_NAME = "TxnIdParameter".getBytes();
int EXECUTOR_TIMEOUT = 20;

/* --- Metadata Common --- */
String METADATA_TYPE_NAME_DATAVERSENAME = "DataverseName";

String METADATA_DATASET_CHANNEL = "Channel";
String METADATA_DATASET_PROCEDURE = "Procedure";
String METADATA_DATASET_BROKER = "Broker";

/* --- Metadata Datatypes --- */
String METADATA_TYPENAME_SUBSCRIPTIONS = "ChannelSubscriptionsType";
String METADATA_TYPENAME_BROKER = "BrokerRecordType";
String METADATA_TYPENAME_CHANNEL = "ChannelRecordType";
String METADATA_TYPENAME_PROCEDURE = "ProcedureRecordType";

/* --- Broker Field Names --- */
String METADATA_TYPE_FIELD_NAME_BROKERNAME = "BrokerName";
String METADATA_TYPE_FIELD_NAME_BROKER_END_POINT = "BrokerEndPoint";
String METADATA_TYPE_FIELD_NAME_BROKER_TYPE = "BrokerType";

/* --- Runtime Entities --- */
String RUNTIME_ENTITY_PROCEDURE = "Procedure";
String RUNTIME_ENTITY_CHANNEL = "Channel";

/* --- Query Compilation --- */
String CONFIG_CHANNEL_NAME = "_internal_channelName";

/* --- BAD ISLANDS --- */
String GENERAL_BROKER_TYPE_NAME = "general";
String BAD_BROKER_TYPE_NAME = "bad";
String BAD_BROKER_FIELD_NAME_TYPE = "broker-type";
String BAD_FEED_FIELD_NAME_HOST = "bad-host";
String BAD_FEED_FIELD_NAME_CHANNEL = "bad-channel";
String BAD_FEED_FIELD_NAME_PARAMETERS = "bad-channel-parameters";
String BAD_FEED_FIELD_NAME_CHANNEL_DV = "bad-dataverse";
// String BAD_FEED_TYPE = "type"

public enum ChannelJobType {
REPETITIVE
}
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.bad;

import java.io.StringReader;
import java.time.Instant;
import java.util.Date;
import java.util.HashMap;
@@ -37,7 +36,7 @@
import org.apache.asterix.app.result.fields.ResultsPrinter;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.lang.BADParserFactory;
import org.apache.asterix.bad.lang.BADStatementExecutor;
import org.apache.asterix.bad.lang.BADQueryTranslator;
import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
import org.apache.asterix.common.api.IResponsePrinter;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -66,44 +65,44 @@ public class BADJobService {
private static final Logger LOGGER = Logger.getLogger(BADJobService.class.getName());

//pool size one (only running one thread at a time)
private static final int POOL_SIZE = 1;
public static final int POOL_SIZE = 1;

private static final long millisecondTimeout = BADConstants.EXECUTOR_TIMEOUT * 1000;

public static ScheduledExecutorService createExecutorServe() {
return Executors.newScheduledThreadPool(POOL_SIZE);
}

public static void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec,
IHyracksClientConnection hcc, DeployedJobSpecEventListener listener, ITxnIdFactory txnIdFactory,
String duration) throws Exception {
ScheduledExecutorService ses = createExecutorServe();
listener.setExecutorService(ses);
if (channeljobSpec != null) {
channeljobSpec.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
DeployedJobSpecId deployedId = hcc.deployJobSpec(channeljobSpec);
ScheduledExecutorService ses = startRepetitiveDeployedJobSpec(deployedId, hcc, findPeriod(duration),
new HashMap<>(), entityId, txnIdFactory, listener);
startRepetitiveDeployedJobSpec(ses, deployedId, hcc, findPeriod(duration), new HashMap<>(), entityId,
txnIdFactory, listener);
listener.setDeployedJobSpecId(deployedId);
listener.setExecutorService(ses);
}

}

//Starts running a deployed job specification periodically with an interval of "period" seconds
public static ScheduledExecutorService startRepetitiveDeployedJobSpec(DeployedJobSpecId distributedId,
IHyracksClientConnection hcc, long period, Map<byte[], byte[]> jobParameters, EntityId entityId,
ITxnIdFactory txnIdFactory, DeployedJobSpecEventListener listener) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(POOL_SIZE);
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
if (!runDeployedJobSpecCheckPeriod(distributedId, hcc, jobParameters, period, entityId,
txnIdFactory, listener)) {
scheduledExecutorService.shutdown();
}
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Job Failed to run for " + entityId.getExtensionName() + " "
+ entityId.getDataverseName() + "." + entityId.getEntityName() + ".", e);
public static void startRepetitiveDeployedJobSpec(ScheduledExecutorService scheduledExecutorService,
DeployedJobSpecId distributedId, IHyracksClientConnection hcc, long period,
Map<byte[], byte[]> jobParameters, EntityId entityId, ITxnIdFactory txnIdFactory,
DeployedJobSpecEventListener listener) {
scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
if (!runDeployedJobSpecCheckPeriod(distributedId, hcc, jobParameters, period, entityId, txnIdFactory,
listener)) {
scheduledExecutorService.shutdown();
}
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Job Failed to run for " + entityId.getExtensionName() + " "
+ entityId.getDataverseName() + "." + entityId.getEntityName() + ".", e);
}
}, period, period, TimeUnit.MILLISECONDS);
return scheduledExecutorService;
}

public static boolean runDeployedJobSpecCheckPeriod(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
@@ -207,8 +206,8 @@ public static JobSpecification compilePushChannel(IStatementExecutor statementEx
}

public static void redeployJobSpec(EntityId entityId, String queryBodyString, MetadataProvider metadataProvider,
BADStatementExecutor badStatementExecutor, IHyracksClientConnection hcc,
IRequestParameters requestParameters, boolean useNewId) throws Exception {
BADQueryTranslator badStatementExecutor, IHyracksClientConnection hcc, IRequestParameters requestParameters,
boolean useNewId) throws Exception {

ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
ActiveNotificationHandler activeEventHandler =
@@ -220,7 +219,7 @@ public static void redeployJobSpec(EntityId entityId, String queryBodyString, Me
}

BADParserFactory factory = new BADParserFactory();
List<Statement> fStatements = factory.createParser(new StringReader(queryBodyString)).parse();
List<Statement> fStatements = factory.createParser(queryBodyString).parse();
JobSpecification jobSpec = null;
if (listener.getType().equals(DeployedJobSpecEventListener.PrecompiledType.PUSH_CHANNEL)
|| listener.getType().equals(DeployedJobSpecEventListener.PrecompiledType.CHANNEL)) {
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file