Skip to content

Commit

Permalink
SAMZA-2002: SamzaSQL Diagnostics: instrument rest of operators (excep…
Browse files Browse the repository at this point in the history
…t join & aggregate) and at Query level

Second phase of instrumenting SamzaSQL operators to add and maintain metrics. All operators, except join and aggregate, are instrumented to add Processing Time and Input Rate metrics. Whenever output rate could be different (e.g., filter operator) the output rate is also added. At query level, we have Query Latency, and input and output rates.

Author: Shenoda Guirguis <sguirguis@linkedin.com>

Reviewers: Srinivasulu Punuru <spunuru@linkedin.com>, Aditya Toomula <atoomula@linkedin.com>

Closes #831 from shenodaguirguis/addmetrics.3
  • Loading branch information
Shenoda Guirguis authored and srinipunuru committed Dec 6, 2018
1 parent b126683 commit bd9387b
Show file tree
Hide file tree
Showing 21 changed files with 541 additions and 139 deletions.
Expand Up @@ -89,10 +89,9 @@ public List<Object> getFieldValues() {
* @return returns the value of the field.
*/
public Optional<Object> getField(String name) {
for (int index = 0; index < fieldNames.size(); index++) {
if (fieldNames.get(index).equals(name)) {
return Optional.ofNullable(fieldValues.get(index));
}
int index = fieldNames.indexOf(name);
if (index != -1) {
return Optional.ofNullable(fieldValues.get(index));
}

return Optional.empty();
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.apache.samza.sql.avro;

import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -36,6 +37,7 @@
import org.apache.samza.operators.KV;
import org.apache.samza.sql.SamzaSqlRelRecord;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
import org.apache.samza.sql.interfaces.SamzaRelConverter;
import org.apache.samza.system.SystemStream;
import org.slf4j.Logger;
Expand Down Expand Up @@ -92,7 +94,8 @@ public SamzaSqlRelMessage convertToRelMessage(KV<Object, Object> samzaMessage) {
throw new SamzaException(msg);
}

return new SamzaSqlRelMessage(samzaMessage.getKey(), payloadFieldNames, payloadFieldValues);
return new SamzaSqlRelMessage(samzaMessage.getKey(), payloadFieldNames, payloadFieldValues,
new SamzaSqlRelMsgMetadata("", "", ""));
}

public void fetchFieldNamesAndValuesFromIndexedRecord(IndexedRecord record, List<String> fieldNames,
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.apache.samza.sql.data;

import java.io.Serializable;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
Expand All @@ -44,6 +45,12 @@ public class SamzaSqlRelMessage implements Serializable {
@JsonProperty("samzaSqlRelRecord")
private final SamzaSqlRelRecord samzaSqlRelRecord;

/**
* hold metadata about the message or event, e.g., the eventTime timestamp
*/
@JsonProperty("samzaSqlRelMsgMetadata")
private SamzaSqlRelMsgMetadata samzaSqlRelMsgMetadata;

/**
* Creates a {@link SamzaSqlRelMessage} from the list of relational fields and values.
* If the field list contains KEY, then it extracts the key out of the fields to create a
Expand All @@ -53,9 +60,11 @@ public class SamzaSqlRelMessage implements Serializable {
* @param fieldValues Ordered list of all the values in the row. Some of the fields can be null, This could be
* result of delete change capture event in the stream or because of the result of the outer join
* or the fields themselves are null in the original stream.
* @param metadata the message/event's metadata
*/
public SamzaSqlRelMessage(List<String> fieldNames, List<Object> fieldValues) {
public SamzaSqlRelMessage(List<String> fieldNames, List<Object> fieldValues, SamzaSqlRelMsgMetadata metadata) {
Validate.isTrue(fieldNames.size() == fieldValues.size(), "Field Names and values are not of same length.");
Validate.notNull(metadata, "Message metadata is NULL");

int keyIndex = fieldNames.indexOf(KEY_NAME);
Object key = null;
Expand All @@ -64,8 +73,10 @@ public SamzaSqlRelMessage(List<String> fieldNames, List<Object> fieldValues) {
}
this.key = key;
this.samzaSqlRelRecord = new SamzaSqlRelRecord(fieldNames, fieldValues);
this.samzaSqlRelMsgMetadata = metadata;
}


/**
* Create the SamzaSqlRelMessage, Each rel message represents a row in the table.
* So it can contain a key and a list of fields in the row.
Expand All @@ -74,9 +85,11 @@ public SamzaSqlRelMessage(List<String> fieldNames, List<Object> fieldValues) {
* @param fieldValues Ordered list of all the values in the row. Some of the fields can be null, This could be result of
* delete change capture event in the stream or because of the result of the outer join or the fields
* themselves are null in the original stream.
* @param metadata the message/event's metadata
*/
public SamzaSqlRelMessage(Object key, List<String> fieldNames, List<Object> fieldValues) {
public SamzaSqlRelMessage(Object key, List<String> fieldNames, List<Object> fieldValues, SamzaSqlRelMsgMetadata metadata) {
Validate.isTrue(fieldNames.size() == fieldValues.size(), "Field Names and values are not of same length.");
Validate.notNull(metadata, "Message metadata is NULL");

List<String> tmpFieldNames = new ArrayList<>();
List<Object> tmpFieldValues = new ArrayList<>();
Expand All @@ -89,21 +102,27 @@ public SamzaSqlRelMessage(Object key, List<String> fieldNames, List<Object> fiel
tmpFieldValues.addAll(fieldValues);

this.samzaSqlRelRecord = new SamzaSqlRelRecord(tmpFieldNames, tmpFieldValues);
this.samzaSqlRelMsgMetadata = metadata;
}

/**
* Creates the SamzaSqlRelMessage from {@link SamzaSqlRelRecord}.
* @param samzaSqlRelRecord represents the rel record.
* @param metadata the message/event's metadata
*/
public SamzaSqlRelMessage(@JsonProperty("samzaSqlRelRecord") SamzaSqlRelRecord samzaSqlRelRecord) {
this(samzaSqlRelRecord.getFieldNames(), samzaSqlRelRecord.getFieldValues());
public SamzaSqlRelMessage(@JsonProperty("samzaSqlRelRecord") SamzaSqlRelRecord samzaSqlRelRecord,
@JsonProperty("samzaSqlRelMsgMetadata") SamzaSqlRelMsgMetadata metadata) {
this(samzaSqlRelRecord.getFieldNames(), samzaSqlRelRecord.getFieldValues(), metadata);
}

@JsonProperty("samzaSqlRelRecord")
public SamzaSqlRelRecord getSamzaSqlRelRecord() {
return samzaSqlRelRecord;
}

@JsonProperty("samzaSqlRelMsgMetadata")
public SamzaSqlRelMsgMetadata getSamzaSqlRelMsgMetadata() { return samzaSqlRelMsgMetadata; }

public Object getKey() {
return key;
}
Expand All @@ -127,7 +146,7 @@ public boolean equals(Object obj) {

@Override
public String toString() {
return "RelMessage: {" + samzaSqlRelRecord + "}";
return "RelMessage: {" + samzaSqlRelRecord + " " + samzaSqlRelMsgMetadata + "}";
}

/**
Expand Down Expand Up @@ -172,4 +191,5 @@ public static List<String> getSamzaSqlCompositeKeyFieldNames(List<String> fieldN
}
return keyPartNames;
}

}
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.sql.data;

import java.io.Serializable;
import org.codehaus.jackson.annotate.JsonProperty;


/**
* Metadata of Samza Sql Rel Message. Contains metadata about the corresponding event or
* relational row of a table. Used as member of the {@link SamzaSqlRelMessage}.
*/
public class SamzaSqlRelMsgMetadata implements Serializable {
/**
* boolean to indicate whether this message comes from a new input message or not, in case of
* Project:flatten() is used, to be able to determine the number of original input messages
* default is true for the case when no flatten() is used
*/
public boolean isNewInputMessage = true;

/**
* The timestamp of when the events actually happened
* set by and copied from the event source
* TODO: copy eventTime through from source to RelMessage
*/
@JsonProperty("eventTime")
private String eventTime;

/**
* the timestamp of when Samza App received the event
* TODO: set arrivalTime during conversion from IME to SamzaMessage
*/
@JsonProperty("arrivalTime")
private String arrivalTime;

/**
* the timestamp when SamzaSQL query starts processing the event
* set by the SamzaSQL Scan operator
*/
@JsonProperty("scanTime")
private String scanTime;

public SamzaSqlRelMsgMetadata(@JsonProperty("eventTime") String eventTime, @JsonProperty("arrivalTime") String arrivalTime,
@JsonProperty("scanTime") String scanTime) {
this.eventTime = eventTime;
this.arrivalTime = arrivalTime;
this.scanTime = scanTime;
}

public SamzaSqlRelMsgMetadata(String eventTime, String arrivalTime, String scanTime, boolean isNewInputMessage) {
this(eventTime, arrivalTime, scanTime);
this.isNewInputMessage = isNewInputMessage;
}

@JsonProperty("eventTime")
public String getEventTime() { return eventTime;}

public void setEventTime(String eventTime) {
this.eventTime = eventTime;
}

public boolean hasEventTime() { return eventTime != null && !eventTime.isEmpty(); }

@JsonProperty("arrivalTime")
public String getarrivalTime() { return arrivalTime;}

public void setArrivalTime(String arrivalTime) {
this.arrivalTime = arrivalTime;
}

public boolean hasArrivalTime() { return arrivalTime != null && !arrivalTime.isEmpty(); }


@JsonProperty("scanTime")
public String getscanTime() { return scanTime;}

public void setScanTime(String scanTime) {
this.scanTime = scanTime;
}

public boolean hasScanTime() { return scanTime != null && !scanTime.isEmpty(); }

@Override
public String toString() {
return "[Metadata:{" + eventTime + " " + arrivalTime + " " + scanTime + "}]";
}

}
Expand Up @@ -19,10 +19,17 @@

package org.apache.samza.sql.translator;

import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.Context;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.SamzaHistogram;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.sql.data.Expression;
Expand All @@ -45,49 +52,87 @@ class FilterTranslator {
this.queryId = queryId;
}

/**
* FilterTranslatorFunction to process input events, apply the filter and produce output
* events accordingly
*/
private static class FilterTranslatorFunction implements FilterFunction<SamzaSqlRelMessage> {
private transient Expression expr;
private transient TranslatorContext context;
private transient TranslatorContext translatorContext;
private transient LogicalFilter filter;
private final int queryId;
private transient MetricsRegistry metricsRegistry;
private transient SamzaHistogram processingTime; // milli-seconds
private transient Counter inputEvents;
private transient Counter filteredOutEvents;
private transient Counter outputEvents;

private final int queryId;
private final int filterId;
private final String logicalOpId;

FilterTranslatorFunction(int filterId, int queryId) {
FilterTranslatorFunction(int filterId, int queryId, String logicalOpId) {
this.filterId = filterId;
this.queryId = queryId;
this.logicalOpId = logicalOpId;
}

@Override
public void init(Context context) {
this.context = ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
this.filter = (LogicalFilter) this.context.getRelNode(filterId);
this.expr = this.context.getExpressionCompiler().compile(filter.getInputs(), Collections.singletonList(filter.getCondition()));
this.translatorContext = ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
this.filter = (LogicalFilter) this.translatorContext.getRelNode(filterId);
this.expr = this.translatorContext.getExpressionCompiler().compile(filter.getInputs(), Collections.singletonList(filter.getCondition()));
ContainerContext containerContext = context.getContainerContext();
metricsRegistry = containerContext.getContainerMetricsRegistry();
processingTime = new SamzaHistogram(metricsRegistry, logicalOpId, TranslatorConstants.PROCESSING_TIME_NAME);
inputEvents = metricsRegistry.newCounter(logicalOpId, TranslatorConstants.INPUT_EVENTS_NAME);
inputEvents.clear();
filteredOutEvents = metricsRegistry.newCounter(logicalOpId, TranslatorConstants.FILTERED_EVENTS_NAME);
filteredOutEvents.clear();
outputEvents = metricsRegistry.newCounter(logicalOpId, TranslatorConstants.OUTPUT_EVENTS_NAME);
outputEvents.clear();
}

@Override
public boolean apply(SamzaSqlRelMessage message) {
Instant startProcessing = Instant.now();
Object[] result = new Object[1];
expr.execute(context.getExecutionContext(), context.getDataContext(),
expr.execute(translatorContext.getExecutionContext(), translatorContext.getDataContext(),
message.getSamzaSqlRelRecord().getFieldValues().toArray(), result);
if (result.length > 0 && result[0] instanceof Boolean) {
boolean retVal = (Boolean) result[0];
log.debug(
String.format("return value for input %s is %s",
Arrays.asList(message.getSamzaSqlRelRecord().getFieldValues()).toString(), retVal));
updateMetrics(startProcessing, retVal, Instant.now());
return retVal;
} else {
log.error("return value is not boolean");
return false;
}
}

/**
* Updates the MetricsRegistery of this operator
* @param startProcessing = begin processing of the message
* @param endProcessing = end of processing
*/
private void updateMetrics(Instant startProcessing, boolean isOutput, Instant endProcessing) {
inputEvents.inc();
if (isOutput) {
outputEvents.inc();
} else {
filteredOutEvents.inc();
}
processingTime.update(Duration.between(startProcessing, endProcessing).toMillis());
}

}

void translate(final LogicalFilter filter, final TranslatorContext context) {
void translate(final LogicalFilter filter, final String logicalOpId, final TranslatorContext context) {
MessageStream<SamzaSqlRelMessage> inputStream = context.getMessageStream(filter.getInput().getId());
final int filterId = filter.getId();

MessageStream<SamzaSqlRelMessage> outputStream = inputStream.filter(new FilterTranslatorFunction(filterId, queryId));
MessageStream<SamzaSqlRelMessage> outputStream = inputStream.filter(new FilterTranslatorFunction(filterId, queryId, logicalOpId));

context.registerMessageStream(filterId, outputStream);
context.registerRelNode(filterId, filter);
Expand Down
Expand Up @@ -84,9 +84,9 @@ class JoinTranslator {
this.queryId = queryId;
}

void translate(final LogicalJoin join, final TranslatorContext context) {
JoinInputNode.InputType inputTypeOnLeft = getInputType(join.getLeft(), context);
JoinInputNode.InputType inputTypeOnRight = getInputType(join.getRight(), context);
void translate(final LogicalJoin join, final TranslatorContext translatorContext) {
JoinInputNode.InputType inputTypeOnLeft = getInputType(join.getLeft(), translatorContext);
JoinInputNode.InputType inputTypeOnRight = getInputType(join.getRight(), translatorContext);

// Do the validation of join query
validateJoinQuery(join, inputTypeOnLeft, inputTypeOnRight);
Expand All @@ -109,13 +109,13 @@ void translate(final LogicalJoin join, final TranslatorContext context) {
JoinInputNode tableNode = new JoinInputNode(isTablePosOnRight ? join.getRight() : join.getLeft(), tableKeyIds,
isTablePosOnRight ? inputTypeOnRight : inputTypeOnLeft, isTablePosOnRight);

MessageStream<SamzaSqlRelMessage> inputStream = context.getMessageStream(streamNode.getRelNode().getId());
Table table = getTable(tableNode, context);
MessageStream<SamzaSqlRelMessage> inputStream = translatorContext.getMessageStream(streamNode.getRelNode().getId());
Table table = getTable(tableNode, translatorContext);

MessageStream<SamzaSqlRelMessage> outputStream =
joinStreamWithTable(inputStream, table, streamNode, tableNode, join, context);
joinStreamWithTable(inputStream, table, streamNode, tableNode, join, translatorContext);

context.registerMessageStream(join.getId(), outputStream);
translatorContext.registerMessageStream(join.getId(), outputStream);
}

private MessageStream<SamzaSqlRelMessage> joinStreamWithTable(MessageStream<SamzaSqlRelMessage> inputStream,
Expand Down

0 comments on commit bd9387b

Please sign in to comment.