Skip to content

Commit

Permalink
Improve SSL Expiry Metrics and Switch to Round-Robin Partition Assign…
Browse files Browse the repository at this point in the history
…er (#144)

* Change default partition strategy from Range based to Round-Robin

Signed-off-by: “Nithin <nithin.pankaj@walmartlabs.com>

* Improved metric details for SSL certs expiry

Signed-off-by: “Nithin <nithin.pankaj@walmartlabs.com>

* Improvement in Listerner error metrics Aspect

Signed-off-by: “Nithin <nithin.pankaj@walmartlabs.com>

---------

Signed-off-by: “Nithin <nithin.pankaj@walmartlabs.com>
Co-authored-by: “Nithin <nithin.pankaj@walmartlabs.com>
  • Loading branch information
nithin-pankaj and “Nithin committed Apr 8, 2024
1 parent 4d71c80 commit 1517381
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 3 deletions.
11 changes: 11 additions & 0 deletions src/main/java/hlf/java/rest/client/config/BaseKafkaConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Date;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.BooleanUtils;
Expand Down Expand Up @@ -57,18 +60,24 @@ protected void configureSSLProperties(
SSLAuthFilesHelper.getExpiryTimestampForKeyStore(
sslProperties.getSslTruststoreLocation(), sslProperties.getSslTruststorePassword());

DateFormat formatter = new SimpleDateFormat("dd/MM/yyyy");

String guagePrefix =
getConfigType().equals(ConfigType.CONSUMER) ? "consumer." : "producer.";

Gauge.builder(
guagePrefix + topicName + ".keystore.expiryTs",
keyStoreCertExpiryTimestamp::getTime)
.tag("topic-name", topicName)
.tag("expiry-date", formatter.format(new Date(keyStoreCertExpiryTimestamp.getTime())))
.strongReference(true)
.register(sslMetricsRegistry);

Gauge.builder(
guagePrefix + topicName + ".truststore.expiryTs",
trustStoreCertExpiryTimestamp::getTime)
.tag("topic-name", topicName)
.tag("expiry-date", formatter.format(new Date(trustStoreCertExpiryTimestamp.getTime())))
.strongReference(true)
.register(sslMetricsRegistry);

Expand All @@ -81,13 +90,15 @@ protected void configureSSLProperties(
guagePrefix + topicName + ".keystore.hasExpired",
hasKeyStoreCertExpired,
BooleanUtils::toInteger)
.tag("topic-name", topicName)
.strongReference(true)
.register(sslMetricsRegistry);

Gauge.builder(
guagePrefix + topicName + ".truststore.hasExpired",
hasTrustStoreCertExpired,
BooleanUtils::toInteger)
.tag("topic-name", topicName)
.strongReference(true)
.register(sslMetricsRegistry);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public DefaultKafkaConsumerFactory<String, String> consumerFactory(
props.put(
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, FabricClientConstants.KAFKA_INTG_MAX_POLL_RECORDS);

// Distribute available partitions evenly across all consumers (or consumer threads)
props.put(
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
FabricClientConstants.ROUND_ROBIN_CONSUMER_PARTITION_ASSIGNEMENT_STRATEGY);

// Azure event-hub config
configureSaslProperties(props, kafkaConsumerProperties.getSaslJaasConfig());

Expand Down
17 changes: 14 additions & 3 deletions src/main/java/hlf/java/rest/client/config/SSLAuthFilesHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;

@Slf4j
@UtilityClass
Expand Down Expand Up @@ -57,18 +61,25 @@ Timestamp getExpiryTimestampForKeyStore(String keyStorePath, String keyStorePass

KeyStore keyStore = loadKeyStore(keyStorePath, keyStorePassword);

List<Timestamp> certExpiryTimestamps = new ArrayList<>();

Enumeration<String> aliases = keyStore.aliases();
while (aliases.hasMoreElements()) {
String alias = aliases.nextElement();
Certificate cert = keyStore.getCertificate(alias);
if (cert instanceof X509Certificate) {
X509Certificate x509Cert = (X509Certificate) cert;
return new Timestamp(x509Cert.getNotAfter().getTime());
certExpiryTimestamps.add(new Timestamp(x509Cert.getNotAfter().getTime()));
}
}

throw new CertificateException(
"Couldn't extract an instance of X509Certificate for fetching expiry details");
if (CollectionUtils.isEmpty(certExpiryTimestamps)) {
throw new CertificateException(
"Couldn't extract an instance of X509Certificate for fetching expiry details");
}

// Return the earliest (minimum) timestamp from the list
return Collections.min(certExpiryTimestamps);
}

private static KeyStore loadKeyStore(String path, String password)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ public Object interceptedKafkaMetricsEmissionAdvice(ProceedingJoinPoint proceedi

if (e instanceof UnrecognizedTransactionPayloadException) {
invalidInboundTransactionMessageCounter.increment();
throw e;
}

if (e instanceof ContractException) {
inboundTxnContractExceptionCounter.increment();
throw e;
}

inboundTxnProcessingFailureCounter.increment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,6 @@ public class FabricClientConstants {
public static final String JSON_PATH_CERTIFICATE = "$..certificate";

public static final String VALUE_TAG_CAPABILITIES = "Capabilities";
public static final String ROUND_ROBIN_CONSUMER_PARTITION_ASSIGNEMENT_STRATEGY =
"org.apache.kafka.clients.consumer.RoundRobinAssignor";
}

0 comments on commit 1517381

Please sign in to comment.