Skip to content

Commit

Permalink
Allow to configure the consumer group ID for a kafka connection
Browse files Browse the repository at this point in the history
Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Jun 25, 2021
1 parent 3a6dec9 commit 85f0fd4
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import java.text.MessageFormat;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionConfigurationInvalidException;

/**
* Allows to configure a consumer group ID via the specific config of a connection.
*/
final class KafkaConsumerGroupSpecificConfig implements KafkaSpecificConfig {

private static final String GROUP_ID_ALLOWED_CHARACTERS = "[a-zA-Z0-9\\._\\-]";
private static final Pattern GROUP_ID_VALIDATION_PATTERN = Pattern.compile(GROUP_ID_ALLOWED_CHARACTERS + "+");
private static final String GROUP_ID_SPECIFIC_CONFIG_KEY = "groupId";

private static KafkaConsumerGroupSpecificConfig instance;

private KafkaConsumerGroupSpecificConfig() {
}

static KafkaConsumerGroupSpecificConfig getInstance() {
if (instance == null) {
instance = new KafkaConsumerGroupSpecificConfig();
}
return instance;
}

@Override
public boolean isApplicable(final Connection connection) {
return !connection.getSources().isEmpty();
}

@Override
public void validateOrThrow(final Connection connection, final DittoHeaders dittoHeaders) {
if (!isValid(connection)) {
final String groupId = getGroupId(connection).orElse("");
final String message = MessageFormat.format(
"The connection configuration contains an invalid value for the consumer group ID. " +
"Allowed Characters are: <{1}>", groupId, GROUP_ID_ALLOWED_CHARACTERS);
throw ConnectionConfigurationInvalidException.newBuilder(message)
.dittoHeaders(dittoHeaders)
.build();
}
}

@Override
public boolean isValid(final Connection connection) {
final Optional<String> optionalGroupId = getGroupId(connection);
if (optionalGroupId.isPresent()) {
final String groupId = optionalGroupId.get();
return GROUP_ID_VALIDATION_PATTERN.matcher(groupId).matches();
}
return true;
}

@Override
public Map<String, String> apply(final Connection connection) {
return getGroupId(connection)
.map(groupId -> Map.of(ConsumerConfig.GROUP_ID_CONFIG, groupId))
.orElseGet(Map::of);
}

private Optional<String> getGroupId(final Connection connection) {
return Optional.ofNullable(connection.getSpecificConfig().get(GROUP_ID_SPECIFIC_CONFIG_KEY));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand All @@ -36,9 +37,19 @@
*/
final class PropertiesFactory {

private static final Collection<KafkaSpecificConfig> SPECIFIC_CONFIGS =
private static final Collection<KafkaSpecificConfig> COMMON_SPECIFIC_CONFIGS =
List.of(KafkaAuthenticationSpecificConfig.getInstance(), KafkaBootstrapServerSpecificConfig.getInstance());

private static final Collection<KafkaSpecificConfig> CONSUMER_SPECIFIC_CONFIGS;
private static final Collection<KafkaSpecificConfig> PRODUCER_SPECIFIC_CONFIGS;

static {
final List<KafkaSpecificConfig> consumerSpecificConfigs = new ArrayList<>(COMMON_SPECIFIC_CONFIGS);
consumerSpecificConfigs.add(KafkaConsumerGroupSpecificConfig.getInstance());
CONSUMER_SPECIFIC_CONFIGS = List.copyOf(consumerSpecificConfigs);
PRODUCER_SPECIFIC_CONFIGS = List.copyOf(COMMON_SPECIFIC_CONFIGS);
}

private final Connection connection;
private final KafkaConfig config;
private final String clientId;
Expand Down Expand Up @@ -76,7 +87,7 @@ ConsumerSettings<String, String> getConsumerSettings(final boolean dryRun) {
.withBootstrapServers(bootstrapServers)
.withGroupId(connection.getId().toString())
.withClientId(clientId + "-consumer")
.withProperties(getSpecificConfigProperties())
.withProperties(getConsumerSpecificConfigProperties())
.withProperties(getSecurityProtocolProperties());

// disable auto commit in dry run mode
Expand All @@ -89,17 +100,25 @@ ProducerSettings<String, String> getProducerSettings() {
return ProducerSettings.apply(alpakkaConfig, new StringSerializer(), new StringSerializer())
.withBootstrapServers(bootstrapServers)
.withProperties(getClientIdProperties())
.withProperties(getSpecificConfigProperties())
.withProperties(getProducerSpecificConfigProperties())
.withProperties(getSecurityProtocolProperties());
}

private Map<String, String> getClientIdProperties() {
return Map.of(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer");
}

private Map<String, String> getSpecificConfigProperties() {
private Map<String, String> getConsumerSpecificConfigProperties() {
final Map<String, String> properties = new HashMap<>();
for (final KafkaSpecificConfig specificConfig : CONSUMER_SPECIFIC_CONFIGS) {
properties.putAll(specificConfig.apply(connection));
}
return properties;
}

private Map<String, String> getProducerSpecificConfigProperties() {
final Map<String, String> properties = new HashMap<>();
for (final KafkaSpecificConfig specificConfig : SPECIFIC_CONFIGS) {
for (final KafkaSpecificConfig specificConfig : PRODUCER_SPECIFIC_CONFIGS) {
properties.putAll(specificConfig.apply(connection));
}
return properties;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionConfigurationInvalidException;
import org.eclipse.ditto.connectivity.model.Source;
import org.junit.Test;

public final class KafkaConsumerGroupSpecificConfigTest {

private final KafkaConsumerGroupSpecificConfig underTest = KafkaConsumerGroupSpecificConfig.getInstance();

@Test
public void isNotApplicableToConnectionWithoutSources() {
final Connection connection = mock(Connection.class);
when(connection.getSources()).thenReturn(List.of());
assertThat(underTest.isApplicable(connection)).isFalse();
}

@Test
public void isApplicableToConnectionWithSources() {
final Connection connection = mock(Connection.class);
final Source source = mock(Source.class);
when(connection.getSources()).thenReturn(List.of(source));
assertThat(underTest.isApplicable(connection)).isTrue();
}

@Test
public void invalidCharactersCauseConnectionConfigurationInvalidException() {
final Map<String, String> specificConfig = Map.of("groupId", "invalidCharacterÜ");
final Connection connection = mock(Connection.class);
when(connection.getSpecificConfig()).thenReturn(specificConfig);
final DittoHeaders dittoHeaders = DittoHeaders.empty();
assertThatCode(() -> underTest.validateOrThrow(connection, dittoHeaders))
.isExactlyInstanceOf(ConnectionConfigurationInvalidException.class);
}

@Test
public void validCharactersCauseNoException() {
final Map<String, String> specificConfig = Map.of("groupId", "only-Valid-Characters-1234");
final Connection connection = mock(Connection.class);
when(connection.getSpecificConfig()).thenReturn(specificConfig);
final DittoHeaders dittoHeaders = DittoHeaders.empty();
assertThatCode(() -> underTest.validateOrThrow(connection, dittoHeaders))
.doesNotThrowAnyException();
}

@Test
public void invalidCharactersAreInvalid() {
final Map<String, String> specificConfig = Map.of("groupId", "invalidCharacterÜ");
final Connection connection = mock(Connection.class);
when(connection.getSpecificConfig()).thenReturn(specificConfig);
assertThat(underTest.isValid(connection)).isFalse();
}

@Test
public void validCharactersAreValid() {
final Map<String, String> specificConfig = Map.of("groupId", "only-Valid-Characters-1234");
final Connection connection = mock(Connection.class);
when(connection.getSpecificConfig()).thenReturn(specificConfig);
assertThat(underTest.isValid(connection)).isTrue();
}

@Test
public void emptyGroupIdIsInvalid() {
final Map<String, String> specificConfig = Map.of("groupId", "");
final Connection connection = mock(Connection.class);
when(connection.getSpecificConfig()).thenReturn(specificConfig);
assertThat(underTest.isValid(connection)).isFalse();
}

@Test
public void applyReturnsGroupIdForConsumerConfigKey() {
final Map<String, String> specificConfig = Map.of("groupId", "only-Valid-Characters-1234");
final Connection connection = mock(Connection.class);
when(connection.getSpecificConfig()).thenReturn(specificConfig);

final Map<String, String> expectedConfig = Map.of(ConsumerConfig.GROUP_ID_CONFIG, "only-Valid-Characters-1234");
assertThat(underTest.apply(connection)).isEqualTo(expectedConfig);
}

}

0 comments on commit 85f0fd4

Please sign in to comment.