Skip to content

Commit

Permalink
MINOR: Improve usage of LogCaptureAppender (#8508)
Browse files Browse the repository at this point in the history
Reviewers: Ismael Juma <ismael@confluent.io>, John Roesler <john@confluent.io>
  • Loading branch information
mjsax committed Apr 21, 2020
1 parent e406d9d commit 11d8ef7
Show file tree
Hide file tree
Showing 42 changed files with 999 additions and 711 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;

class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> {
private static final Logger LOG = LoggerFactory.getLogger(KStreamKTableJoinProcessor.class);
private static final Logger LOG = LoggerFactory.getLogger(KStreamKTableJoin.class);

private final KTableValueGetter<K2, V2> valueGetter;
private final KeyValueMapper<? super K1, ? super V1, ? extends K2> keyMapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -801,9 +801,9 @@ public void remove(final Collection<TopicPartition> revokedChangelogs) {
}
changelogMetadata.clear();
} else {
log.debug("Changelog partition {} could not be found, " +
"it could be already cleaned up during the handling" +
"of task corruption and never restore again", partition);
log.debug("Changelog partition {} could not be found," +
" it could be already cleaned up during the handling" +
" of task corruption and never restore again", partition);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,15 +800,15 @@ public void setConfig(final String storeName,
public void shouldWarnAboutRocksDBConfigSetterIsNotGuaranteedToBeBackwardsCompatible() {
props.setProperty(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, TestRocksDbConfigSetter.class.getName());

final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
LogCaptureAppender.unregister(appender);

assertThat(appender.getMessages(), hasItem("stream-client [" + CLIENT_ID + "] "
+ "RocksDB's version will be bumped to version 6+ via KAFKA-8897 in a future release. "
+ "If you use `org.rocksdb.CompactionOptionsFIFO#setTtl(long)` or `#ttl()` you will need to rewrite "
+ "your code after KAFKA-8897 is resolved and set TTL via `org.rocksdb.Options` "
+ "(or `org.rocksdb.ColumnFamilyOptions`)."));
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);

assertThat(appender.getMessages(), hasItem("stream-client [" + CLIENT_ID + "] "
+ "RocksDB's version will be bumped to version 6+ via KAFKA-8897 in a future release. "
+ "If you use `org.rocksdb.CompactionOptionsFIFO#setTtl(long)` or `#ttl()` you will need to rewrite "
+ "your code after KAFKA-8897 is resolved and set TTL via `org.rocksdb.Options` "
+ "(or `org.rocksdb.ColumnFamilyOptions`)."));
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,10 @@ public void testGetGroupInstanceIdConfigs() {
final StreamsConfig streamsConfig = new StreamsConfig(props);

Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
assertThat(returnedProps.get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), equalTo("group-instance-id-1-" + threadIdx));
assertThat(
returnedProps.get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG),
equalTo("group-instance-id-1-" + threadIdx)
);

returnedProps = streamsConfig.getRestoreConsumerConfigs(clientId);
assertNull(returnedProps.get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG));
Expand All @@ -170,7 +173,10 @@ public void consumerConfigMustContainStreamPartitionAssignorConfig() {

assertEquals(42, returnedProps.get(StreamsConfig.REPLICATION_FACTOR_CONFIG));
assertEquals(1, returnedProps.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG));
assertEquals(StreamsPartitionAssignor.class.getName(), returnedProps.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
assertEquals(
StreamsPartitionAssignor.class.getName(),
returnedProps.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)
);
assertEquals(7L, returnedProps.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
assertEquals("dummy:host", returnedProps.get(StreamsConfig.APPLICATION_SERVER_CONFIG));
assertNull(returnedProps.get(StreamsConfig.RETRIES_CONFIG));
Expand Down Expand Up @@ -220,12 +226,18 @@ public void defaultSerdeShouldBeConfigured() {
final String topic = "my topic";

serializer.configure(serializerConfigs, true);
assertEquals("Should get the original string after serialization and deserialization with the configured encoding",
str, streamsConfig.defaultKeySerde().deserializer().deserialize(topic, serializer.serialize(topic, str)));
assertEquals(
"Should get the original string after serialization and deserialization with the configured encoding",
str,
streamsConfig.defaultKeySerde().deserializer().deserialize(topic, serializer.serialize(topic, str))
);

serializer.configure(serializerConfigs, false);
assertEquals("Should get the original string after serialization and deserialization with the configured encoding",
str, streamsConfig.defaultValueSerde().deserializer().deserialize(topic, serializer.serialize(topic, str)));
assertEquals(
"Should get the original string after serialization and deserialization with the configured encoding",
str,
streamsConfig.defaultValueSerde().deserializer().deserialize(topic, serializer.serialize(topic, str))
);
}

@Test
Expand Down Expand Up @@ -598,15 +610,21 @@ private void shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosEnable
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "anyValue");
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
assertThat(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
assertThat(
consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG),
equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT))
);
}

@Test
public void shouldAllowSettingConsumerIsolationLevelIfEosDisabled() {
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
assertThat(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT)));
assertThat(
consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG),
equalTo(READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT))
);
}

@Test
Expand Down Expand Up @@ -654,7 +672,10 @@ private void shouldSetDifferentDefaultsIfEosEnabled() {
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs(clientId);

assertThat(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
assertThat(
consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG),
equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT))
);
assertTrue((Boolean) producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
assertThat(producerConfigs.get(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), equalTo(Integer.MAX_VALUE));
assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), equalTo(100L));
Expand Down Expand Up @@ -731,7 +752,10 @@ public void shouldThrowExceptionIfCommitIntervalMsIsNegative() {
new StreamsConfig(props);
fail("Should throw ConfigException when commitIntervalMs is set to a negative value");
} catch (final ConfigException e) {
assertEquals("Invalid value -1 for configuration commit.interval.ms: Value must be at least 0", e.getMessage());
assertEquals(
"Invalid value -1 for configuration commit.interval.ms: Value must be at least 0",
e.getMessage()
);
}
}

Expand Down Expand Up @@ -765,7 +789,10 @@ public void shouldSpecifyCorrectKeySerdeClassOnError() {
config.defaultKeySerde();
fail("Test should throw a StreamsException");
} catch (final StreamsException e) {
assertEquals("Failed to configure key serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage());
assertEquals(
"Failed to configure key serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde",
e.getMessage()
);
}
}

Expand All @@ -778,7 +805,10 @@ public void shouldSpecifyCorrectValueSerdeClassOnError() {
config.defaultValueSerde();
fail("Test should throw a StreamsException");
} catch (final StreamsException e) {
assertEquals("Failed to configure value serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage());
assertEquals(
"Failed to configure value serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde",
e.getMessage()
);
}
}

Expand All @@ -801,7 +831,11 @@ private void shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosEnable
streamsConfig.getProducerConfigs(clientId);
fail("Should throw ConfigException when ESO is enabled and maxInFlight requests exceeds 5");
} catch (final ConfigException e) {
assertEquals("Invalid value 7 for configuration max.in.flight.requests.per.connection: Can't exceed 5 when exactly-once processing is enabled", e.getMessage());
assertEquals(
"Invalid value 7 for configuration max.in.flight.requests.per.connection:" +
" Can't exceed 5 when exactly-once processing is enabled",
e.getMessage()
);
}
}

Expand Down Expand Up @@ -842,7 +876,11 @@ private void shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInval
new StreamsConfig(props).getProducerConfigs(clientId);
fail("Should throw ConfigException when EOS is enabled and maxInFlight cannot be paresed into an integer");
} catch (final ConfigException e) {
assertEquals("Invalid value not-a-number for configuration max.in.flight.requests.per.connection: String value could not be parsed as 32-bit integer", e.getMessage());
assertEquals(
"Invalid value not-a-number for configuration max.in.flight.requests.per.connection:" +
" String value could not be parsed as 32-bit integer",
e.getMessage()
);
}
}

Expand Down Expand Up @@ -871,18 +909,21 @@ public void shouldThrowConfigExceptionWhenOptimizationConfigNotValueInRange() {
@SuppressWarnings("deprecation")
@Test
public void shouldLogWarningWhenPartitionGrouperIsUsed() {
props.put(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, org.apache.kafka.streams.processor.DefaultPartitionGrouper.class);
props.put(
StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG,
org.apache.kafka.streams.processor.DefaultPartitionGrouper.class
);

LogCaptureAppender.setClassLoggerToDebug(StreamsConfig.class);
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();

new StreamsConfig(props);

LogCaptureAppender.unregister(appender);
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
new StreamsConfig(props);

assertThat(
appender.getMessages(),
hasItem("Configuration parameter `" + StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG + "` is deprecated and will be removed in 3.0.0 release."));
assertThat(
appender.getMessages(),
hasItem("Configuration parameter `" + StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG +
"` is deprecated and will be removed in 3.0.0 release.")
);
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Consumed;
Expand All @@ -41,7 +41,6 @@
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockProcessorSupplier;
Expand All @@ -50,9 +49,7 @@
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;

Expand All @@ -65,7 +62,6 @@
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;

@SuppressWarnings("unchecked")
public class KGroupedStreamImplTest {

private static final String TOPIC = "topic";
Expand Down Expand Up @@ -105,7 +101,7 @@ public void shouldNotHaveNullReducerWithWindowedReduce() {

@Test(expected = NullPointerException.class)
public void shouldNotHaveNullWindowsWithWindowedReduce() {
groupedStream.windowedBy((Windows) null);
groupedStream.windowedBy((Windows<?>) null);
}

@Test(expected = TopologyException.class)
Expand Down Expand Up @@ -149,7 +145,7 @@ public void shouldNotHaveNullAdderOnWindowedAggregate() {

@Test(expected = NullPointerException.class)
public void shouldNotHaveNullWindowsOnWindowedAggregate() {
groupedStream.windowedBy((Windows) null);
groupedStream.windowedBy((Windows<?>) null);
}

@Test(expected = TopologyException.class)
Expand Down Expand Up @@ -397,22 +393,19 @@ public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() {
Materialized.as(INVALID_STORE_NAME));
}

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnReduceWhenMaterializedIsNull() {
groupedStream.reduce(MockReducer.STRING_ADDER, (Materialized) null);
groupedStream.reduce(MockReducer.STRING_ADDER, null);
}

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnAggregateWhenMaterializedIsNull() {
groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (Materialized) null);
groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null);
}

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnCountWhenMaterializedIsNull() {
groupedStream.count((Materialized) null);
groupedStream.count((Materialized<String, Long, KeyValueStore<Bytes, byte[]>>) null);
}

@Test
Expand Down Expand Up @@ -451,11 +444,12 @@ public void shouldLogAndMeasureSkipsInAggregateWithBuiltInMetricsVersionLatest()

private void shouldLogAndMeasureSkipsInAggregate(final String builtInMetricsVersion) {
groupedStream.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count").withKeySerde(Serdes.String()));
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {

try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamAggregate.class);
final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {

processData(driver);
LogCaptureAppender.unregister(appender);

if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
final Map<MetricName, ? extends Metric> metrics = driver.metrics();
Expand Down Expand Up @@ -521,12 +515,12 @@ private void shouldLogAndMeasureSkipsInReduce(final String builtInMetricsVersion
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String())
);

final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {

try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamReduce.class);
final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {

processData(driver);
LogCaptureAppender.unregister(appender);

if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
final Map<MetricName, ? extends Metric> metrics = driver.metrics();
Expand Down Expand Up @@ -659,7 +653,6 @@ public void shouldCountWindowed() {
@Test
public void shouldCountWindowedWithInternalStoreName() {
final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
final List<KeyValue<Windowed<String>, KeyValue<Long, Long>>> results = new ArrayList<>();
groupedStream
.windowedBy(TimeWindows.of(ofMillis(500L)))
.count()
Expand Down

0 comments on commit 11d8ef7

Please sign in to comment.