Skip to content
Permalink
Browse files
Improve performance of NotifyBrokerRuntime code
Change-Id: Ia4ecd381d102c67f7c66cfa965312bfb885aa281
  • Loading branch information
sjaco002 committed Jun 20, 2018
1 parent 139909b commit 105ee6d1676e2f75c1c1b3d3996ad364ff6b414a
Showing 2 changed files with 77 additions and 55 deletions.
@@ -274,6 +274,16 @@
<artifactId>hyracks-client</artifactId>
<version>${hyracks.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-util</artifactId>
<version>${hyracks.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
<artifactId>algebricks-data</artifactId>
<version>${hyracks.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-control-common</artifactId>
@@ -19,34 +19,33 @@

package org.apache.asterix.bad.runtime;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.bad.BADConstants;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.AOrderedlistPrinterFactory;
import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.ARecordPrinterFactory;
import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
import org.apache.asterix.om.base.ADateTime;
import org.apache.asterix.om.base.AOrderedList;
import org.apache.asterix.om.base.ARecord;
import org.apache.asterix.om.base.AUUID;
import org.apache.asterix.om.types.AOrderedListType;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.algebricks.data.IPrinter;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
@@ -58,30 +57,34 @@
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.util.string.UTF8StringReader;
import org.apache.hyracks.util.string.UTF8StringWriter;

public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
// Class-wide java.util.logging logger.
private static final Logger LOGGER = Logger.getLogger(NotifyBrokerRuntime.class.getName());

// Reusable input stream pair: nextFrame re-points bbis at the current tuple's buffer and offset
// before each deserialize call, and di reads from it.
private final ByteBufferInputStream bbis = new ByteBufferInputStream();
private final DataInputStream di = new DataInputStream(bbis);
// Deserializer for an ordered list of AUUID values (the subscription-id payload).
private final AOrderedListSerializerDeserializer subSerDes =
new AOrderedListSerializerDeserializer(new AOrderedListType(BuiltinType.AUUID, null));
// Deserializer for the pushed result record; built in the constructor from recordType.
private final ARecordSerializerDeserializer recordSerDes;
// Deserializer used by nextFrame to read the broker endpoint string.
private static final AStringSerializerDeserializer stringSerDes =
new AStringSerializerDeserializer(new UTF8StringWriter(), new UTF8StringReader());

// Despite the "Factory" suffix these hold IPrinter instances (the constructor stores the result
// of createPrinter()); nextFrame uses them to print the payload bytes into the endpoint's stream.
private final IPrinter recordPrinterFactory;
private final IPrinter subscriptionIdListPrinterFactory;

// Pointables that receive the evaluated tuple fields; per the comment in nextFrame:
// arg0 = broker endpoint, arg1 = payload, arg2 = channel execution timestamp.
private IPointable inputArg0 = new VoidPointable();
private IPointable inputArg1 = new VoidPointable();
private IPointable inputArg2 = new VoidPointable();
// Evaluators created in the constructor from the broker, push-list and execution-time factories.
private IScalarEvaluator eval0;
private IScalarEvaluator eval1;
private IScalarEvaluator eval2;
private final ActiveManager activeManager;
// Identifies the channel; createData reads its dataverse and entity name.
private final EntityId entityId;
// true: the payload is the full result record; false: the payload is a list of subscription ids.
private final boolean push;
// Scratch holders for the most recently deserialized payload.
private AOrderedList pushList;
private ARecord pushRecord;
private final IAType recordType;
// NOTE(review): sendData is declared twice below. The HashSet-valued form matches
// addSubscriptions (which calls add on the value); the String-valued form matches close(),
// which stores the decoded buffer text. Only one of the two can remain for this to compile.
private final Map<String, HashSet<String>> sendData = new HashMap<>();
private final Map<String, String> sendData = new HashMap<>();
// Per-endpoint byte buffer and the UTF-8 PrintStream that writes into it (both created in nextFrame).
private final Map<String, ByteArrayOutputStream> sendbaos = new HashMap<>();
private final Map<String, PrintStream> sendStreams = new HashMap<>();
// Set to null by the constructor; nextFrame populates it only while it is still null.
private String executionTimeString;
// Controls the ',' separator written before each payload in nextFrame.
// NOTE(review): this flag is shared by all endpoints, so the first payload written for a second
// endpoint would be preceded by a comma - confirm whether a per-endpoint flag is needed.
private boolean firstResult = true;
// Endpoint of the tuple currently being processed; assigned in nextFrame.
String endpoint;

public NotifyBrokerRuntime(IHyracksTaskContext ctx, IScalarEvaluatorFactory brokerEvalFactory,
IScalarEvaluatorFactory pushListEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory,
@@ -90,14 +93,11 @@ public NotifyBrokerRuntime(IHyracksTaskContext ctx, IScalarEvaluatorFactory brok
eval0 = brokerEvalFactory.createScalarEvaluator(ctx);
eval1 = pushListEvalFactory.createScalarEvaluator(ctx);
eval2 = channelExecutionEvalFactory.createScalarEvaluator(ctx);
this.activeManager = (ActiveManager) ((INcApplicationContext) ctx.getJobletContext().getServiceContext()
.getApplicationContext()).getActiveManager();
this.entityId = activeJobId;
this.push = push;
this.pushList = null;
this.pushRecord = null;
this.recordType = recordType;
recordSerDes = new ARecordSerializerDeserializer((ARecordType) recordType);
recordPrinterFactory = new ARecordPrinterFactory((ARecordType) recordType).createPrinter();
subscriptionIdListPrinterFactory =
new AOrderedlistPrinterFactory(new AOrderedListType(BuiltinType.AUUID, null)).createPrinter();
executionTimeString = null;
}

@@ -106,28 +106,18 @@ public void open() throws HyracksDataException {
return;
}

/**
 * Adds every subscription id in {@code subscriptionIds} to the set of values queued for
 * {@code endpoint} in {@code sendData}.
 *
 * @param endpoint        key under which the ids are collected; must already have an entry in
 *                        {@code sendData}
 * @param subscriptionIds ordered list whose items are cast to {@link AUUID}
 */
private void addSubscriptions(String endpoint, AOrderedList subscriptionIds) {
    int idx = 0;
    while (idx < subscriptionIds.size()) {
        String rendered = ((AUUID) subscriptionIds.getItem(idx)).toString();
        // The broker cannot handle the "uuid {}" wrapper around the value, so keep only the
        // characters between the 8-character prefix and the 2-character suffix.
        String bareValue = rendered.substring(8, rendered.length() - 2);
        sendData.get(endpoint).add("\"" + bareValue + "\"");
        idx++;
    }
}

public String createData(String endpoint) {
String JSON = "{ \"dataverseName\":\"" + entityId.getDataverse() + "\", \"channelName\":\""
+ entityId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime + "\":\""
+ executionTimeString + "\", \"subscriptionIds\":[";
for (String value : sendData.get(endpoint)) {
JSON += value;
JSON += ",";
String resultTitle = "\"subscriptionIds";
if (push) {
resultTitle = "\"results\"";
}
JSON = JSON.substring(0, JSON.length() - 1);
JSON += "]}";
return JSON;
String jsonStr = "{ \"dataverseName\":\"" + entityId.getDataverse() + "\", \"channelName\":\""
+ entityId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime + "\":\""
+ executionTimeString + "\", " + resultTitle + ":[";
jsonStr += sendData.get(endpoint);
jsonStr = jsonStr.substring(0, jsonStr.length());
jsonStr += "]}";
return jsonStr;

}

@@ -172,6 +162,11 @@ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
eval1.evaluate(tRef, inputArg1);
eval2.evaluate(tRef, inputArg2);

/*The incoming tuples have three fields:
1. eval0 will get the serialized broker endpoint string
2. eval1 will get the payload (either the subscriptionIds or entire results)
3. eval2 will get the channel execution time stamp (the same for all tuples)
*/
if (executionTimeString == null) {
int resultSetOffset = inputArg2.getStartOffset();
bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), resultSetOffset + 1);
@@ -185,34 +180,51 @@ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {

int serBrokerOffset = inputArg0.getStartOffset();
bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serBrokerOffset + 1);
String endpoint = AStringSerializerDeserializer.INSTANCE.deserialize(di).getStringValue();
sendData.putIfAbsent(endpoint, new HashSet<>());
endpoint = stringSerDes.deserialize(di).getStringValue();
sendbaos.putIfAbsent(endpoint, new ByteArrayOutputStream());
try {
sendStreams.putIfAbsent(endpoint,
new PrintStream(sendbaos.get(endpoint), true, StandardCharsets.UTF_8.name()));
} catch (UnsupportedEncodingException e) {
throw new HyracksDataException(e.getMessage());
}

if (push) {
int pushOffset = inputArg1.getStartOffset();
bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), pushOffset + 1);
//TODO: Right now this creates an object per channel result. Need to find a better way to deserialize
pushRecord = recordSerDes.deserialize(di);
sendData.get(endpoint).add(pushRecord.toString());
if (!firstResult) {
sendStreams.get(endpoint).append(',');
}
recordPrinterFactory.print(inputArg1.getByteArray(), inputArg1.getStartOffset(), inputArg1.getLength(),
sendStreams.get(endpoint));

} else {
int serSubOffset = inputArg1.getStartOffset();
bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serSubOffset + 1);
pushList = subSerDes.deserialize(di);
addSubscriptions(endpoint, pushList);
if (!firstResult) {
sendStreams.get(endpoint).append(',');
}
subscriptionIdListPrinterFactory.print(inputArg1.getByteArray(), inputArg1.getStartOffset(),
inputArg1.getLength(),
sendStreams.get(endpoint));
}
firstResult = false;
}

}

@Override
public void close() throws HyracksDataException {
for (String endpoint : sendData.keySet()) {
if (sendData.get(endpoint).size() > 0) {
sendGroupOfResults(endpoint);
sendData.get(endpoint).clear();
for (String endpoint : sendStreams.keySet()) {
sendData.put(endpoint, new String(sendbaos.get(endpoint).toByteArray(), StandardCharsets.UTF_8));
sendGroupOfResults(endpoint);
sendStreams.get(endpoint).close();
try {
sendbaos.get(endpoint).close();
} catch (IOException e) {
throw new HyracksDataException(e.getMessage());
}

}

return;
}

0 comments on commit 105ee6d

Please sign in to comment.