Skip to content

Commit

Permalink
[#2273] Prepare base classes for implementing Kafka based command router
Browse files Browse the repository at this point in the history
Signed-off-by: Kartheeswaran Kalidass <kartheeswaran.kalidass@bosch.io>
  • Loading branch information
kaniyan committed Jan 27, 2021
1 parent 7561be5 commit 18ba130
Show file tree
Hide file tree
Showing 4 changed files with 383 additions and 197 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020 Contributors to the Eclipse Foundation
* Copyright (c) 2020, 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand All @@ -13,66 +13,35 @@

package org.eclipse.hono.adapter.client.telemetry.kafka;

import java.net.HttpURLConnection;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.tracing.KafkaTracingHelper;
import org.eclipse.hono.client.kafka.producer.AbstractKafkaBasedMessageSender;
import org.eclipse.hono.config.ProtocolAdapterProperties;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Lifecycle;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.QoS;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.TenantObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.opentracing.References;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.EncodeException;
import io.vertx.core.json.Json;
import io.vertx.kafka.client.producer.KafkaHeader;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.RecordMetadata;
import io.vertx.kafka.client.producer.impl.KafkaHeaderImpl;

/**
* A client for publishing messages to a Kafka cluster.
* A client for publishing downstream messages to a Kafka cluster.
*/
public abstract class AbstractKafkaBasedDownstreamSender implements Lifecycle {
public abstract class AbstractKafkaBasedDownstreamSender extends AbstractKafkaBasedMessageSender {

/**
* A logger to be shared with subclasses.
*/
protected final Logger log = LoggerFactory.getLogger(getClass());

private final KafkaProducerFactory<String, Buffer> producerFactory;
private final String producerName;
private final Map<String, String> config;
private final ProtocolAdapterProperties adapterConfig;
private final Tracer tracer;

/**
* Creates a new Kafka-based telemetry sender.
* Creates a new Kafka-based downstream sender.
*
* @param producerFactory The factory to use for creating Kafka producers.
* @param producerName The producer name to use.
Expand All @@ -85,18 +54,9 @@ public abstract class AbstractKafkaBasedDownstreamSender implements Lifecycle {
public AbstractKafkaBasedDownstreamSender(final KafkaProducerFactory<String, Buffer> producerFactory,
final String producerName, final Map<String, String> config, final ProtocolAdapterProperties adapterConfig,
final Tracer tracer) {
super(producerFactory, producerName, config, tracer);

Objects.requireNonNull(producerFactory);
Objects.requireNonNull(producerName);
Objects.requireNonNull(config);
Objects.requireNonNull(adapterConfig);
Objects.requireNonNull(tracer);

this.producerFactory = producerFactory;
this.producerName = producerName;
this.config = config;
this.adapterConfig = adapterConfig;
this.tracer = tracer;
this.adapterConfig = Objects.requireNonNull(adapterConfig);
}

/**
Expand Down Expand Up @@ -142,62 +102,16 @@ protected Future<Void> send(final HonoTopic topic, final TenantObject tenant, fi

final String tenantId = tenant.getTenantId();
final String deviceId = device.getDeviceId();

log.trace("sending to Kafka [topic: {}, tenantId: {}, deviceId: {}, qos: {}, contentType: {}, properties: {}]",
topic, tenantId, deviceId, qos, contentType, properties);
final Span span = startSpan(topic, tenantId, deviceId, qos, contentType, context);

final KafkaProducerRecord<String, Buffer> record = KafkaProducerRecord.create(topic.toString(), deviceId,
payload);

final Map<String, Object> propsWithDefaults = addDefaults(tenant, device, qos, contentType, properties);
record.addHeaders(encodePropertiesAsKafkaHeaders(propsWithDefaults, span));

KafkaTracingHelper.injectSpanContext(tracer, record, span.context());
logProducerRecord(span, record);

final Promise<RecordMetadata> promise = Promise.promise();
getOrCreateProducer().send(record, promise);

final Future<Void> producerFuture = promise.future()
.recover(t -> {
logError(span, topic, tenantId, deviceId, qos, t);
span.finish();
return Future.failedFuture(new ServerErrorException(getErrorCode(t), t));
})
.map(recordMetadata -> {
logRecordMetadata(span, deviceId, recordMetadata);
span.finish();
return null;
});

return qos.equals(QoS.AT_MOST_ONCE) ? Future.succeededFuture() : producerFuture;
}

/**
* {@inheritDoc}
* <p>
* Starts the producer.
*/
@Override
public Future<Void> start() {
configureUniqueClientId();
getOrCreateProducer();
return Future.succeededFuture();
}

/**
* {@inheritDoc}
* <p>
* Closes the producer.
*/
@Override
public Future<Void> stop() {
return producerFactory.closeProducer(producerName);
}

private KafkaProducer<String, Buffer> getOrCreateProducer() {
return producerFactory.getOrCreateProducer(producerName, config);
if (QoS.AT_LEAST_ONCE.equals(qos)) {
return sendAndWaitForOutcome(topic.toString(), tenantId, deviceId, payload, propsWithDefaults, context);
} else {
send(topic.toString(), tenantId, deviceId, payload, propsWithDefaults, context);
return Future.succeededFuture();
}
}

private Map<String, Object> addDefaults(final TenantObject tenant, final RegistrationAssertion device,
Expand Down Expand Up @@ -234,96 +148,4 @@ private Map<String, Object> addDefaults(final TenantObject tenant, final Registr

return headerProperties;
}

private List<KafkaHeader> encodePropertiesAsKafkaHeaders(final Map<String, Object> properties, final Span span) {
final List<KafkaHeader> headers = new ArrayList<>();
properties.forEach((k, v) -> {
try {
final Buffer headerValue = (v instanceof String)
? Buffer.buffer((String) v)
: Buffer.buffer(Json.encode(v));

headers.add(new KafkaHeaderImpl(k, headerValue));
} catch (final EncodeException e) {
log.info("failed to serialize property with key [{}] to Kafka header", k);
span.log("failed to create Kafka header from property: " + k);
}
});

return headers;
}

private Span startSpan(final HonoTopic topic, final String tenantId, final String deviceId, final QoS qos,
final String contentType, final SpanContext context) {

final String referenceType = QoS.AT_MOST_ONCE.equals(qos) ? References.FOLLOWS_FROM : References.CHILD_OF;
return KafkaTracingHelper.newProducerSpan(tracer, topic, referenceType, context)
.setTag(TracingHelper.TAG_TENANT_ID.getKey(), tenantId)
.setTag(TracingHelper.TAG_DEVICE_ID.getKey(), deviceId)
.setTag(TracingHelper.TAG_QOS.getKey(), qos.name())
.setTag(MessageHelper.SYS_PROPERTY_CONTENT_TYPE, contentType);
}

private void logProducerRecord(final Span span, final KafkaProducerRecord<String, Buffer> record) {
final String headersAsString = record.headers()
.stream()
.map(header -> header.key() + "=" + header.value())
.collect(Collectors.joining(",", "{", "}"));

log.trace("producing message [topic: {}, key: {}, partition: {}, timestamp: {}, headers: {}]",
record.topic(), record.key(), record.partition(), record.timestamp(), headersAsString);

span.log("producing message with headers: " + headersAsString);
}

private void logRecordMetadata(final Span span, final String recordKey, final RecordMetadata metadata) {

log.trace("message produced to Kafka [topic: {}, key: {}, partition: {}, offset: {}, timestamp: {}]",
metadata.getTopic(), recordKey, metadata.getPartition(), metadata.getOffset(), metadata.getTimestamp());

span.log("message produced to Kafka");
KafkaTracingHelper.setRecordMetadataTags(span, metadata);
Tags.HTTP_STATUS.set(span, HttpURLConnection.HTTP_ACCEPTED);

}

private void logError(final Span span, final HonoTopic topic, final String tenantId, final String deviceId,
final QoS qos, final Throwable cause) {
log.debug("sending message failed [topic: {}, key: {}, qos: {}, tenantId: {}, deviceId: {}]",
topic, deviceId, qos, tenantId, deviceId, cause);

Tags.HTTP_STATUS.set(span, getErrorCode(cause));
TracingHelper.logError(span, cause);
}

private int getErrorCode(final Throwable t) {
/*
* TODO set error code depending on exception?
*
* Possible thrown exceptions include:
*
* Non-Retriable exceptions (fatal, the message will never be sent):
*
* InvalidTopicException OffsetMetadataTooLargeException RecordBatchTooLargeException RecordTooLargeException
* UnknownServerException UnknownProducerIdException
*
* Retriable exceptions (transient, may be covered by increasing #.retries):
*
* CorruptRecordException InvalidMetadataException NotEnoughReplicasAfterAppendException
* NotEnoughReplicasException OffsetOutOfRangeException TimeoutException UnknownTopicOrPartitionException
*/

return HttpURLConnection.HTTP_UNAVAILABLE;
}

private void configureUniqueClientId() {
final UUID uuid = UUID.randomUUID();

final String clientId = config.get(ProducerConfig.CLIENT_ID_CONFIG);
if (clientId == null) {
config.put(ProducerConfig.CLIENT_ID_CONFIG, String.format("%s-%s", producerName, uuid));
} else {
config.put(ProducerConfig.CLIENT_ID_CONFIG, String.format("%s-%s-%s", clientId, producerName, uuid));
}
}
}
34 changes: 34 additions & 0 deletions clients/kafka-common/pom.xml
Expand Up @@ -63,6 +63,40 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.eclipse.hono</groupId>
<artifactId>hono-client</artifactId>
<exclusions>
<exclusion>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
</exclusion>
<exclusion>
<groupId>io.vertx</groupId>
<artifactId>vertx-proton</artifactId>
</exclusion>
<exclusion>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-jackson</artifactId>
</exclusion>
<exclusion>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</exclusion>
<exclusion>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-impl</artifactId>
</exclusion>
<exclusion>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-crypto</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-kafka-client</artifactId>
Expand Down

0 comments on commit 18ba130

Please sign in to comment.