Skip to content

Commit

Permalink
Add kinesis-sink user metrics to sinkContext (#2169)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia committed Jul 16, 2018
1 parent 5f779b4 commit ba1ea66
Showing 1 changed file with 28 additions and 6 deletions.
Expand Up @@ -88,21 +88,32 @@ public class KinesisSink implements Sink<byte[]> {
private String streamName;
private static final String defaultPartitionedKey = "default";
private static final int maxPartitionedKeyLength = 256;
private SinkContext sinkContext;

public static final String ACCESS_KEY_NAME = "accessKey";
public static final String SECRET_KEY_NAME = "secretKey";

public static final String METRICS_TOTAL_INCOMING = "_kinesis_total_incoming_";
public static final String METRICS_TOTAL_INCOMING_BYTES = "_kinesis_total_incoming_bytes_";
public static final String METRICS_TOTAL_SUCCESS = "_kinesis_total_success_";
public static final String METRICS_TOTAL_FAILURE = "_kinesis_total_failure_";


@Override
public void write(Record<byte[]> record) throws Exception {
String partitionedKey = record.getKey().orElse(defaultPartitionedKey);
partitionedKey = partitionedKey.length() > maxPartitionedKeyLength
? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
: partitionedKey; // partitionedKey Length must be at least one, and at most 256
ByteBuffer data = createKinesisMessage(kinesisSinkConfig.getMessageFormat(), record);
ListenableFuture<UserRecordResult> addRecordResult = kinesisProducer.addUserRecord(this.streamName,
partitionedKey,
createKinesisMessage(kinesisSinkConfig.getMessageFormat(), record));
partitionedKey, data);
addCallback(addRecordResult,
ProducerSendCallback.create(this.streamName, record, System.nanoTime()), directExecutor());
ProducerSendCallback.create(this.streamName, record, System.nanoTime(), sinkContext), directExecutor());
if (sinkContext != null) {
sinkContext.recordMetric(METRICS_TOTAL_INCOMING, 1);
sinkContext.recordMetric(METRICS_TOTAL_INCOMING_BYTES, data.array().length);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Published message to kinesis stream {} with size {}", streamName, record.getValue().length);
}
Expand All @@ -120,6 +131,7 @@ public void close() throws IOException {
@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
kinesisSinkConfig = KinesisSinkConfig.load(config);
this.sinkContext = sinkContext;

checkArgument(isNotBlank(kinesisSinkConfig.getAwsKinesisStreamName()), "empty kinesis-stream name");
checkArgument(isNotBlank(kinesisSinkConfig.getAwsEndpoint()), "empty aws-end-point");
Expand Down Expand Up @@ -158,23 +170,27 @@ private static final class ProducerSendCallback implements FutureCallback<UserRe
private String streamName;
private long startTime = 0;
private final Handle<ProducerSendCallback> recyclerHandle;
private SinkContext sinkContext;

private ProducerSendCallback(Handle<ProducerSendCallback> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

static ProducerSendCallback create(String streamName, Record<byte[]> resultContext, long startTime) {
static ProducerSendCallback create(String streamName, Record<byte[]> resultContext, long startTime,
SinkContext sinkContext) {
ProducerSendCallback sendCallback = RECYCLER.get();
sendCallback.resultContext = resultContext;
sendCallback.streamName = streamName;
sendCallback.startTime = startTime;
sendCallback.sinkContext = sinkContext;
return sendCallback;
}

private void recycle() {
resultContext = null;
streamName = null;
startTime = 0;
sinkContext = null;
recyclerHandle.recycle(this);
}

Expand All @@ -188,10 +204,13 @@ protected ProducerSendCallback newObject(Handle<ProducerSendCallback> handle) {
@Override
public void onSuccess(UserRecordResult result) {
if (LOG.isDebugEnabled()) {
LOG.debug("Successfully published message for replicator of {}-{} with latency", this.streamName,
result.getShardId(), TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime)));
LOG.debug("Successfully published message for {}-{} with latency", this.streamName, result.getShardId(),
TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime)));
}
this.resultContext.ack();
if (sinkContext != null) {
sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, 1);
}
recycle();
}

Expand All @@ -200,6 +219,9 @@ public void onFailure(Throwable exception) {
LOG.error("[{}] Failed to published message for replicator of {}-{} ", streamName,
resultContext.getPartitionId(), resultContext.getRecordSequence());
this.resultContext.fail();
if (sinkContext != null) {
sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1);
}
recycle();
}
}
Expand Down

0 comments on commit ba1ea66

Please sign in to comment.