KAFKA-4641: Add more unit test for stream thread (#4531)
Before the patch, the JaCoCo coverage report read:

Element        Missed Instructions  Cov.  Missed Branches  Cov.  Cxty (missed)  Lines (missed)  Methods (missed)  Classes (missed)
Total          3,386 of 22,177      84%   336 of 1,639     79%   1,589 (350)    4,451 (526)     768 (103)         102 (1)
StreamThread                        77%                    76%   102 (27)       299 (48)        31 (1)            1 (0)

After the patch:

Element        Missed Instructions  Cov.  Missed Branches  Cov.  Cxty (missed)  Lines (missed)  Methods (missed)  Classes (missed)
Total          3,329 of 22,180      84%   329 of 1,639     79%   1,590 (345)    4,452 (516)     769 (102)         102 (1)
StreamThread                        81%                    80%   103 (23)       300 (39)        32 (1)            1 (0)


Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Damian Guy <damian@confluent.io>
guozhangwang committed Feb 7, 2018
1 parent 4ee5182 commit 5d69a79
Showing 2 changed files with 161 additions and 6 deletions.
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1183,8 +1183,12 @@ public String toString(final String indent) {
         return sb.toString();
     }
 
-    // this is for testing only
+    // the following are for testing only
     TaskManager taskManager() {
         return taskManager;
     }
+
+    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords() {
+        return standbyRecords;
+    }
 }
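The new standbyRecords() hook exposes the thread's buffer of changelog records that were polled for standby tasks but not yet applied to their stores. As a hedged paraphrase of the standby-update loop (simplified names, not code from this commit): when a standby task cannot apply everything it was handed, the remainder is buffered and the partition is paused.

    // Sketch only: "standbyTask", "records" and the pause bookkeeping are simplified.
    // A table's changelog can be its own source topic, so a standby must not apply
    // records past the committed offset; the rest waits in standbyRecords.
    final List<ConsumerRecord<byte[], byte[]>> remaining = standbyTask.update(partition, records);
    if (remaining != null && !remaining.isEmpty()) {
        restoreConsumer.pause(Collections.singleton(partition));
        standbyRecords.put(partition, remaining);
    }

The shouldUpdateStandbyTask test below asserts exactly this outcome: the table store stops at committed offset 5, partition2 is paused, and the five unapplied records remain visible through standbyRecords().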
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -21,6 +21,7 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.Cluster;
@@ -40,7 +41,13 @@
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
@@ -100,13 +107,16 @@ public void setUp() {
     }
 
     private final String topic1 = "topic1";
+    private final String topic2 = "topic2";
 
     private final TopicPartition t1p1 = new TopicPartition(topic1, 1);
     private final TopicPartition t1p2 = new TopicPartition(topic1, 2);
+    private final TopicPartition t2p1 = new TopicPartition(topic2, 1);
 
     // task0 is unused
     private final TaskId task1 = new TaskId(0, 1);
     private final TaskId task2 = new TaskId(0, 2);
+    private final TaskId task3 = new TaskId(1, 1);
 
     private Properties configProps(final boolean enableEos) {
         return new Properties() {
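A note on the new fixtures: a TaskId is a (topicGroupId, partition) pair, and group ids are assigned per subtopology. The new task3 = TaskId(1, 1) therefore names partition 1 of a second subtopology (group 1), which the standby test below creates via table(topic2, ...); task1 and task2 stay in group 0. For illustration (hypothetical snippet, not in the patch):

    final TaskId tableTask = new TaskId(1, 1);   // subtopology group 1, partition 1
    assertEquals(1, tableTask.topicGroupId);
    assertEquals(1, tableTask.partition);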
@@ -129,7 +139,7 @@
     public void testPartitionAssignmentChangeForSingleGroup() {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
 
-        final StreamThread thread = getStreamThread();
+        final StreamThread thread = createStreamThread(clientId, config, false);
 
         final StateListenerStub stateListener = new StateListenerStub();
         thread.setStateListener(stateListener);
@@ -685,10 +695,6 @@
         }
     }
 
-    private StreamThread getStreamThread() {
-        return createStreamThread(clientId, config, false);
-    }
-
     @Test
     public void shouldReturnActiveTaskMetadataWhileRunningState() throws InterruptedException {
         internalTopologyBuilder.addSource(null, "source", null, null, null, topic1);
@@ -759,6 +765,151 @@ public void shouldReturnStandbyTaskMetadataWhileRunningState() throws InterruptedException {
         assertTrue(threadMetadata.activeTasks().isEmpty());
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldUpdateStandbyTask() {
+        final String storeName1 = "count-one";
+        final String storeName2 = "table-two";
+        final String changelogName = applicationId + "-" + storeName1 + "-changelog";
+        final TopicPartition partition1 = new TopicPartition(changelogName, 1);
+        final TopicPartition partition2 = t2p1;
+        internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
+            .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as(storeName1));
+        internalStreamsBuilder.table(topic2, new ConsumedInternal(), new MaterializedInternal(Materialized.as(storeName2), internalStreamsBuilder, ""));
+
+        final StreamThread thread = createStreamThread(clientId, config, false);
+        final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
+        restoreConsumer.updatePartitions(changelogName,
+            Collections.singletonList(new PartitionInfo(changelogName,
+                1,
+                null,
+                new Node[0],
+                new Node[0])));
+
+        restoreConsumer.assign(Utils.mkSet(partition1, partition2));
+        restoreConsumer.updateEndOffsets(Collections.singletonMap(partition1, 10L));
+        restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition1, 0L));
+        restoreConsumer.updateEndOffsets(Collections.singletonMap(partition2, 10L));
+        restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition2, 0L));
+        // let the store1 be restored from 0 to 10; store2 be restored from 0 to (committed offset) 5
+        clientSupplier.consumer.assign(Utils.mkSet(partition2));
+        clientSupplier.consumer.commitSync(Collections.singletonMap(partition2, new OffsetAndMetadata(5L, "")));
+
+        for (long i = 0L; i < 10L; i++) {
+            restoreConsumer.addRecord(new ConsumerRecord<>(changelogName, 1, i, ("K" + i).getBytes(), ("V" + i).getBytes()));
+            restoreConsumer.addRecord(new ConsumerRecord<>(topic2, 1, i, ("K" + i).getBytes(), ("V" + i).getBytes()));
+        }
+
+        thread.setState(StreamThread.State.RUNNING);
+
+        thread.rebalanceListener.onPartitionsRevoked(null);
+
+        final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+
+        // assign single partition
+        standbyTasks.put(task1, Collections.singleton(t1p1));
+        standbyTasks.put(task3, Collections.singleton(t2p1));
+
+        thread.taskManager().setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), standbyTasks);
+
+        thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
+
+        thread.runOnce(-1);
+
+        final StandbyTask standbyTask1 = thread.taskManager().standbyTask(partition1);
+        final StandbyTask standbyTask2 = thread.taskManager().standbyTask(partition2);
+        final KeyValueStore<Object, Long> store1 = (KeyValueStore<Object, Long>) standbyTask1.getStore(storeName1);
+        final KeyValueStore<Object, Long> store2 = (KeyValueStore<Object, Long>) standbyTask2.getStore(storeName2);
+
+        assertEquals(10L, store1.approximateNumEntries());
+        assertEquals(5L, store2.approximateNumEntries());
+        assertEquals(Collections.singleton(partition2), restoreConsumer.paused());
+        assertEquals(1, thread.standbyRecords().size());
+        assertEquals(5, thread.standbyRecords().get(partition2).size());
+    }
+
+    @Test
+    public void shouldPunctuateActiveTask() {
+        final List<Long> punctuatedStreamTime = new ArrayList<>();
+        final List<Long> punctuatedWallClockTime = new ArrayList<>();
+        final ProcessorSupplier<Object, Object> punctuateProcessor = new ProcessorSupplier<Object, Object>() {
+            @Override
+            public Processor<Object, Object> get() {
+                return new Processor<Object, Object>() {
+                    @Override
+                    public void init(ProcessorContext context) {
+                        context.schedule(100L, PunctuationType.STREAM_TIME, new Punctuator() {
+                            @Override
+                            public void punctuate(long timestamp) {
+                                punctuatedStreamTime.add(timestamp);
+                            }
+                        });
+                        context.schedule(100L, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
+                            @Override
+                            public void punctuate(long timestamp) {
+                                punctuatedWallClockTime.add(timestamp);
+                            }
+                        });
+                    }
+
+                    @Override
+                    public void process(Object key, Object value) { }
+
+                    @SuppressWarnings("deprecation")
+                    @Override
+                    public void punctuate(long timestamp) { }
+
+                    @Override
+                    public void close() { }
+                };
+            }
+        };
+
+        internalStreamsBuilder.stream(Collections.singleton(topic1), consumed).process(punctuateProcessor);
+
+        final StreamThread thread = createStreamThread(clientId, config, false);
+
+        thread.setState(StreamThread.State.RUNNING);
+
+        thread.rebalanceListener.onPartitionsRevoked(null);
+        final List<TopicPartition> assignedPartitions = new ArrayList<>();
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+
+        // assign single partition
+        assignedPartitions.add(t1p1);
+        activeTasks.put(task1, Collections.singleton(t1p1));
+
+        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+
+        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        clientSupplier.consumer.assign(assignedPartitions);
+        clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
+
+        thread.runOnce(-1);
+
+        assertEquals(0, punctuatedStreamTime.size());
+        assertEquals(0, punctuatedWallClockTime.size());
+
+        mockTime.sleep(100L);
+        for (long i = 0L; i < 10L; i++) {
+            clientSupplier.consumer.addRecord(new ConsumerRecord<>(topic1, 1, i, i * 100L, TimestampType.CREATE_TIME, ConsumerRecord.NULL_CHECKSUM, ("K" + i).getBytes().length, ("V" + i).getBytes().length, ("K" + i).getBytes(), ("V" + i).getBytes()));
+        }
+
+        thread.runOnce(-1);
+
+        assertEquals(1, punctuatedStreamTime.size());
+        assertEquals(1, punctuatedWallClockTime.size());
+
+        mockTime.sleep(100L);
+
+        thread.runOnce(-1);
+
+        // we should skip stream time punctuation, only trigger wall-clock time punctuation
+        assertEquals(1, punctuatedStreamTime.size());
+        assertEquals(2, punctuatedWallClockTime.size());
+    }
+
     @Test
     public void shouldAlwaysUpdateTasksMetadataAfterChangingState() throws InterruptedException {
         final StreamThread thread = createStreamThread(clientId, config, false);
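shouldPunctuateActiveTask exercises both punctuation clocks. STREAM_TIME advances only as records with newer timestamps are processed, while WALL_CLOCK_TIME follows the (mocked) system clock; that is why the final runOnce, after mockTime.sleep(100L) but with no new input, fires only the wall-clock punctuator. Restating the two schedule calls above with placeholder punctuators:

    // STREAM_TIME: data-driven, stalls while no new records arrive.
    context.schedule(100L, PunctuationType.STREAM_TIME, streamTimePunctuator);
    // WALL_CLOCK_TIME: time-driven, fires even when input is idle.
    context.schedule(100L, PunctuationType.WALL_CLOCK_TIME, wallClockPunctuator);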
