MINOR: Improve usage of LogCaptureAppender #8508

Merged: 2 commits, Apr 21, 2020
Changes from 1 commit
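The common thread in the diffs below: tests had been pairing LogCaptureAppender.createAndRegister() with a manual LogCaptureAppender.unregister(appender), which skips the unregistration whenever the code in between throws. The PR moves these call sites to try-with-resources. A minimal sketch of the resulting pattern, assuming (as the diffs imply) that LogCaptureAppender implements AutoCloseable and that close() unregisters the appender:

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItem;

// Sketch only -- "some expected log message" is a placeholder, and the
// AutoCloseable behavior of LogCaptureAppender is assumed from the diffs.
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
    new StreamsConfig(props); // code under test that is expected to log

    assertThat(appender.getMessages(), hasItem("some expected log message"));
} // the appender is unregistered here, even if the assertion above fails

Note that the assertions move inside the try block; unregistration now happens on the failure path too, which the manual pattern only guaranteed when nothing in between threw.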
@@ -30,7 +30,7 @@
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;

class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> {
-private static final Logger LOG = LoggerFactory.getLogger(KStreamKTableJoinProcessor.class);
+private static final Logger LOG = LoggerFactory.getLogger(KStreamKTableJoin.class);
Member Author comment:
This is an actual change -- in all other processors, we use the "top-level class" for the logger and not the Processor class -- changing this for alignment.
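For context, a minimal sketch of the convention the comment describes (shapes abbreviated and hypothetical, not the actual Kafka Streams source):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KStreamKTableJoin<K, V1, V2, R> {
    // the "top-level" operator class
}

class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> {
    // By convention the logger is keyed on the top-level operator class,
    // not on the Processor class itself, so log lines from the processor
    // group under the operator's name.
    private static final Logger LOG = LoggerFactory.getLogger(KStreamKTableJoin.class);
}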


private final KTableValueGetter<K2, V2> valueGetter;
private final KeyValueMapper<? super K1, ? super V1, ? extends K2> keyMapper;
@@ -801,9 +801,9 @@ public void remove(final Collection<TopicPartition> revokedChangelogs) {
}
changelogMetadata.clear();
} else {
log.debug("Changelog partition {} could not be found, " +
"it could be already cleaned up during the handling" +
"of task corruption and never restore again", partition);
log.debug("Changelog partition {} could not be found," +
" it could be already cleaned up during the handling" +
" of task corruption and never restore again", partition);
}
}
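The whitespace move above is a real fix, not just style: the old message concatenated "handling" directly onto "of", producing "...during the handlingof task corruption...". Putting the separating space at the start of each continuation line makes a dropped space easy to spot. A tiny illustration (sketch, not project code):

String broken = "during the handling" +
    "of task corruption";  // -> "during the handlingof task corruption"

String fixed = "during the handling" +
    " of task corruption"; // -> "during the handling of task corruption"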

@@ -800,15 +800,15 @@ public void setConfig(final String storeName,
public void shouldWarnAboutRocksDBConfigSetterIsNotGuaranteedToBeBackwardsCompatible() {
props.setProperty(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, TestRocksDbConfigSetter.class.getName());

-final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
-new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
-LogCaptureAppender.unregister(appender);
-
-assertThat(appender.getMessages(), hasItem("stream-client [" + CLIENT_ID + "] "
-+ "RocksDB's version will be bumped to version 6+ via KAFKA-8897 in a future release. "
-+ "If you use `org.rocksdb.CompactionOptionsFIFO#setTtl(long)` or `#ttl()` you will need to rewrite "
-+ "your code after KAFKA-8897 is resolved and set TTL via `org.rocksdb.Options` "
-+ "(or `org.rocksdb.ColumnFamilyOptions`)."));
+try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
+new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+
+assertThat(appender.getMessages(), hasItem("stream-client [" + CLIENT_ID + "] "
++ "RocksDB's version will be bumped to version 6+ via KAFKA-8897 in a future release. "
++ "If you use `org.rocksdb.CompactionOptionsFIFO#setTtl(long)` or `#ttl()` you will need to rewrite "
++ "your code after KAFKA-8897 is resolved and set TTL via `org.rocksdb.Options` "
++ "(or `org.rocksdb.ColumnFamilyOptions`)."));
+}
}

@Test
@@ -147,7 +147,10 @@ public void testGetGroupInstanceIdConfigs() {
final StreamsConfig streamsConfig = new StreamsConfig(props);

Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
-assertThat(returnedProps.get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), equalTo("group-instance-id-1-" + threadIdx));
+assertThat(
+returnedProps.get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG),
+equalTo("group-instance-id-1-" + threadIdx)
+);

returnedProps = streamsConfig.getRestoreConsumerConfigs(clientId);
assertNull(returnedProps.get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG));
@@ -170,7 +173,10 @@ public void consumerConfigMustContainStreamPartitionAssignorConfig() {

assertEquals(42, returnedProps.get(StreamsConfig.REPLICATION_FACTOR_CONFIG));
assertEquals(1, returnedProps.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG));
-assertEquals(StreamsPartitionAssignor.class.getName(), returnedProps.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
+assertEquals(
+StreamsPartitionAssignor.class.getName(),
+returnedProps.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)
+);
assertEquals(7L, returnedProps.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
assertEquals("dummy:host", returnedProps.get(StreamsConfig.APPLICATION_SERVER_CONFIG));
assertNull(returnedProps.get(StreamsConfig.RETRIES_CONFIG));
@@ -220,12 +226,18 @@ public void defaultSerdeShouldBeConfigured() {
final String topic = "my topic";

serializer.configure(serializerConfigs, true);
assertEquals("Should get the original string after serialization and deserialization with the configured encoding",
str, streamsConfig.defaultKeySerde().deserializer().deserialize(topic, serializer.serialize(topic, str)));
assertEquals(
"Should get the original string after serialization and deserialization with the configured encoding",
str,
streamsConfig.defaultKeySerde().deserializer().deserialize(topic, serializer.serialize(topic, str))
);

serializer.configure(serializerConfigs, false);
assertEquals("Should get the original string after serialization and deserialization with the configured encoding",
str, streamsConfig.defaultValueSerde().deserializer().deserialize(topic, serializer.serialize(topic, str)));
assertEquals(
"Should get the original string after serialization and deserialization with the configured encoding",
str,
streamsConfig.defaultValueSerde().deserializer().deserialize(topic, serializer.serialize(topic, str))
);
}

@Test
@@ -598,15 +610,21 @@ private void shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosEnable
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "anyValue");
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
-assertThat(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
+assertThat(
+consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG),
+equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT))
+);
}

@Test
public void shouldAllowSettingConsumerIsolationLevelIfEosDisabled() {
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
-assertThat(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT)));
+assertThat(
+consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG),
+equalTo(READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT))
+);
}

@Test
@@ -654,7 +672,10 @@ private void shouldSetDifferentDefaultsIfEosEnabled() {
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs(clientId);

-assertThat(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
+assertThat(
+consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG),
+equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT))
+);
assertTrue((Boolean) producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
assertThat(producerConfigs.get(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), equalTo(Integer.MAX_VALUE));
assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), equalTo(100L));
@@ -731,7 +752,10 @@ public void shouldThrowExceptionIfCommitIntervalMsIsNegative() {
new StreamsConfig(props);
fail("Should throw ConfigException when commitIntervalMs is set to a negative value");
} catch (final ConfigException e) {
assertEquals("Invalid value -1 for configuration commit.interval.ms: Value must be at least 0", e.getMessage());
assertEquals(
"Invalid value -1 for configuration commit.interval.ms: Value must be at least 0",
e.getMessage()
);
}
}

@@ -765,7 +789,10 @@ public void shouldSpecifyCorrectKeySerdeClassOnError() {
config.defaultKeySerde();
fail("Test should throw a StreamsException");
} catch (final StreamsException e) {
assertEquals("Failed to configure key serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage());
assertEquals(
"Failed to configure key serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde",
e.getMessage()
);
}
}

@@ -778,7 +805,10 @@ public void shouldSpecifyCorrectValueSerdeClassOnError() {
config.defaultValueSerde();
fail("Test should throw a StreamsException");
} catch (final StreamsException e) {
assertEquals("Failed to configure value serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage());
assertEquals(
"Failed to configure value serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde",
e.getMessage()
);
}
}

@@ -801,7 +831,11 @@ private void shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosEnable
streamsConfig.getProducerConfigs(clientId);
fail("Should throw ConfigException when ESO is enabled and maxInFlight requests exceeds 5");
} catch (final ConfigException e) {
assertEquals("Invalid value 7 for configuration max.in.flight.requests.per.connection: Can't exceed 5 when exactly-once processing is enabled", e.getMessage());
assertEquals(
"Invalid value 7 for configuration max.in.flight.requests.per.connection:" +
" Can't exceed 5 when exactly-once processing is enabled",
e.getMessage()
);
}
}

@@ -842,7 +876,11 @@ private void shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInval
new StreamsConfig(props).getProducerConfigs(clientId);
fail("Should throw ConfigException when EOS is enabled and maxInFlight cannot be paresed into an integer");
} catch (final ConfigException e) {
assertEquals("Invalid value not-a-number for configuration max.in.flight.requests.per.connection: String value could not be parsed as 32-bit integer", e.getMessage());
assertEquals(
"Invalid value not-a-number for configuration max.in.flight.requests.per.connection:" +
" String value could not be parsed as 32-bit integer",
e.getMessage()
);
}
}

@@ -871,18 +909,21 @@ public void shouldThrowConfigExceptionWhenOptimizationConfigNotValueInRange() {
@SuppressWarnings("deprecation")
@Test
public void shouldLogWarningWhenPartitionGrouperIsUsed() {
-props.put(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, org.apache.kafka.streams.processor.DefaultPartitionGrouper.class);
+props.put(
+StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG,
+org.apache.kafka.streams.processor.DefaultPartitionGrouper.class
+);

-LogCaptureAppender.setClassLoggerToDebug(StreamsConfig.class);
-final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
-
-new StreamsConfig(props);
-
-LogCaptureAppender.unregister(appender);
+try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+new StreamsConfig(props);

-assertThat(
-appender.getMessages(),
-hasItem("Configuration parameter `" + StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG + "` is deprecated and will be removed in 3.0.0 release."));
+assertThat(
+appender.getMessages(),
+hasItem("Configuration parameter `" + StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG +
+"` is deprecated and will be removed in 3.0.0 release.")
+);
+}
}
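A second simplification hides in this hunk: the class-scoped overload createAndRegister(StreamsConfig.class) evidently replaces the separate LogCaptureAppender.setClassLoggerToDebug(StreamsConfig.class) call, folding the logger targeting into registration. A sketch of the before/after shape (method names as they appear in the diff; exact semantics assumed):

// before: three coupled steps
LogCaptureAppender.setClassLoggerToDebug(StreamsConfig.class);
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
// ... exercise code, then:
LogCaptureAppender.unregister(appender);

// after: one registration, scoped to the class, auto-unregistered
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
    // ... exercise code and assert on appender.getMessages()
}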

@Test
@@ -21,10 +21,10 @@
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Consumed;
@@ -41,7 +41,6 @@
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
-import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockProcessorSupplier;
@@ -50,9 +49,7 @@
import org.junit.Before;
import org.junit.Test;

-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.List;
import java.util.Map;
import java.util.Properties;

@@ -65,7 +62,6 @@
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;

-@SuppressWarnings("unchecked")
public class KGroupedStreamImplTest {

private static final String TOPIC = "topic";
@@ -105,7 +101,7 @@ public void shouldNotHaveNullReducerWithWindowedReduce() {

@Test(expected = NullPointerException.class)
public void shouldNotHaveNullWindowsWithWindowedReduce() {
-groupedStream.windowedBy((Windows) null);
+groupedStream.windowedBy((Windows<?>) null);
}

@Test(expected = TopologyException.class)
@@ -149,7 +145,7 @@ public void shouldNotHaveNullAdderOnWindowedAggregate() {

@Test(expected = NullPointerException.class)
public void shouldNotHaveNullWindowsOnWindowedAggregate() {
-groupedStream.windowedBy((Windows) null);
+groupedStream.windowedBy((Windows<?>) null);
}

@Test(expected = TopologyException.class)
@@ -397,22 +393,19 @@ public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() {
Materialized.as(INVALID_STORE_NAME));
}

-@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnReduceWhenMaterializedIsNull() {
-groupedStream.reduce(MockReducer.STRING_ADDER, (Materialized) null);
+groupedStream.reduce(MockReducer.STRING_ADDER, null);
}

-@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnAggregateWhenMaterializedIsNull() {
-groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (Materialized) null);
+groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null);
}

-@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnCountWhenMaterializedIsNull() {
-groupedStream.count((Materialized) null);
+groupedStream.count((Materialized<String, Long, KeyValueStore<Bytes, byte[]>>) null);
}
}
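An editorial note on the generics changes in this hunk (hedged, inferred from the diff): the raw (Materialized) null casts were what forced the @SuppressWarnings("unchecked") annotations, so dropping the casts lets the annotations go too. A bare null suffices wherever overload resolution is unambiguous; count keeps a fully parameterized cast, presumably because it has several single-argument overloads to disambiguate between. Similarly, (Windows<?>) null goes through capture conversion and avoids the raw-type warning that (Windows) null produced:

// Raw cast: unchecked conversion when matched against windowedBy(Windows<W>)
groupedStream.windowedBy((Windows) null);    // compiles, but with a warning

// Wildcard cast: capture conversion, no warning
groupedStream.windowedBy((Windows<?>) null);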

@Test
@@ -451,11 +444,12 @@ public void shouldLogAndMeasureSkipsInAggregateWithBuiltInMetricsVersionLatest()

private void shouldLogAndMeasureSkipsInAggregate(final String builtInMetricsVersion) {
groupedStream.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count").withKeySerde(Serdes.String()));
-final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion);
-try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamAggregate.class);
+final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {

processData(driver);
-LogCaptureAppender.unregister(appender);

if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
final Map<MetricName, ? extends Metric> metrics = driver.metrics();
@@ -521,12 +515,12 @@ private void shouldLogAndMeasureSkipsInReduce(final String builtInMetricsVersion
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String())
);

-final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion);
-try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamReduce.class);
+final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {

processData(driver);
-LogCaptureAppender.unregister(appender);

if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
final Map<MetricName, ? extends Metric> metrics = driver.metrics();
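About the combined try statements above: resources in a single try-with-resources close in reverse declaration order, so the TopologyTestDriver shuts down before the appender is unregistered, and both are released even if processData or an assertion throws. That is standard Java semantics (again assuming close() on LogCaptureAppender performs the unregistration):

try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamReduce.class);
     final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
    processData(driver);
    // assertions on appender.getMessages() and driver.metrics() ...
} // driver.close() runs first, then the appender is unregistered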
@@ -659,7 +653,6 @@ public void shouldCountWindowed() {
@Test
public void shouldCountWindowedWithInternalStoreName() {
final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
-final List<KeyValue<Windowed<String>, KeyValue<Long, Long>>> results = new ArrayList<>();
Member Author comment:
Removed because unused.

groupedStream
.windowedBy(TimeWindows.of(ofMillis(500L)))
.count()