From 1366e8ab083bc2fcaa1ccd82568fcfc218f82d2f Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Thu, 15 Oct 2015 11:28:16 -0700 Subject: [PATCH] MINOR: set up temp directories properly in StreamTaskTest --- .../processor/internals/StreamTaskTest.java | 216 ++++++++++-------- 1 file changed, 118 insertions(+), 98 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index f93093cf6fde0..92b86846f63ae 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -27,11 +27,14 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamingConfig; import org.apache.kafka.test.MockSourceNode; import org.junit.Test; import org.junit.Before; +import java.io.File; +import java.nio.file.Files; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -61,22 +64,25 @@ public class StreamTaskTest { } }); - private final StreamingConfig config = new StreamingConfig(new Properties() { - { - setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor"); - setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"); - setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); - } - }); + private StreamingConfig createConfig(final File baseDir) throws Exception { + return new StreamingConfig(new Properties() { + { + setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor"); + setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"); + setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); + setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath()); + } + }); + } private final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); private final MockProducer producer = new MockProducer<>(false, bytesSerializer, bytesSerializer); private final MockConsumer restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); - + private final byte[] recordValue = intSerializer.serialize(null, 10); private final byte[] recordKey = intSerializer.serialize(null, 1); @@ -88,96 +94,110 @@ public void setup() { @SuppressWarnings("unchecked") @Test - public void testProcessOrder() { - StreamTask task = new StreamTask(0, consumer, producer, restoreStateConsumer, partitions, topology, config, null); - - task.addRecords(partition1, records( - new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue), - new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, recordKey, recordValue), - new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, recordKey, recordValue) - )); - - task.addRecords(partition2, records( - new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, recordKey, recordValue), - new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, recordKey, recordValue), - new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, recordKey, recordValue) - )); - - assertEquals(task.process(), 5); - assertEquals(source1.numReceived, 1); - assertEquals(source2.numReceived, 0); - - assertEquals(task.process(), 4); - assertEquals(source1.numReceived, 1); - assertEquals(source2.numReceived, 1); - - assertEquals(task.process(), 3); - assertEquals(source1.numReceived, 2); - assertEquals(source2.numReceived, 1); - - assertEquals(task.process(), 2); - assertEquals(source1.numReceived, 3); - assertEquals(source2.numReceived, 1); - - assertEquals(task.process(), 1); - assertEquals(source1.numReceived, 3); - assertEquals(source2.numReceived, 2); - - assertEquals(task.process(), 0); - assertEquals(source1.numReceived, 3); - assertEquals(source2.numReceived, 3); - - task.close(); + public void testProcessOrder() throws Exception { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + StreamingConfig config = createConfig(baseDir); + StreamTask task = new StreamTask(0, consumer, producer, restoreStateConsumer, partitions, topology, config, null); + + task.addRecords(partition1, records( + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, recordKey, recordValue) + )); + + task.addRecords(partition2, records( + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, recordKey, recordValue) + )); + + assertEquals(task.process(), 5); + assertEquals(source1.numReceived, 1); + assertEquals(source2.numReceived, 0); + + assertEquals(task.process(), 4); + assertEquals(source1.numReceived, 1); + assertEquals(source2.numReceived, 1); + + assertEquals(task.process(), 3); + assertEquals(source1.numReceived, 2); + assertEquals(source2.numReceived, 1); + + assertEquals(task.process(), 2); + assertEquals(source1.numReceived, 3); + assertEquals(source2.numReceived, 1); + + assertEquals(task.process(), 1); + assertEquals(source1.numReceived, 3); + assertEquals(source2.numReceived, 2); + + assertEquals(task.process(), 0); + assertEquals(source1.numReceived, 3); + assertEquals(source2.numReceived, 3); + + task.close(); + + } finally { + Utils.delete(baseDir); + } } @SuppressWarnings("unchecked") @Test - public void testPauseResume() { - StreamTask task = new StreamTask(1, consumer, producer, restoreStateConsumer, partitions, topology, config, null); - - task.addRecords(partition1, records( - new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue), - new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, recordKey, recordValue) - )); - - task.addRecords(partition2, records( - new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, recordKey, recordValue), - new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, recordKey, recordValue), - new ConsumerRecord<>(partition2.topic(), partition2.partition(), 55, recordKey, recordValue), - new ConsumerRecord<>(partition2.topic(), partition2.partition(), 65, recordKey, recordValue) - )); - - assertEquals(task.process(), 5); - assertEquals(source1.numReceived, 1); - assertEquals(source2.numReceived, 0); - - assertEquals(consumer.paused().size(), 1); - assertTrue(consumer.paused().contains(partition2)); - - task.addRecords(partition1, records( - new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, recordKey, recordValue), - new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, recordKey, recordValue), - new ConsumerRecord<>(partition1.topic(), partition1.partition(), 50, recordKey, recordValue) - )); - - assertEquals(consumer.paused().size(), 2); - assertTrue(consumer.paused().contains(partition1)); - assertTrue(consumer.paused().contains(partition2)); - - assertEquals(task.process(), 7); - assertEquals(source1.numReceived, 1); - assertEquals(source2.numReceived, 1); - - assertEquals(consumer.paused().size(), 1); - assertTrue(consumer.paused().contains(partition1)); - - assertEquals(task.process(), 6); - assertEquals(source1.numReceived, 2); - assertEquals(source2.numReceived, 1); - - assertEquals(consumer.paused().size(), 0); - - task.close(); + public void testPauseResume() throws Exception { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + StreamingConfig config = createConfig(baseDir); + StreamTask task = new StreamTask(1, consumer, producer, restoreStateConsumer, partitions, topology, config, null); + + task.addRecords(partition1, records( + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, recordKey, recordValue) + )); + + task.addRecords(partition2, records( + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 55, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 65, recordKey, recordValue) + )); + + assertEquals(task.process(), 5); + assertEquals(source1.numReceived, 1); + assertEquals(source2.numReceived, 0); + + assertEquals(consumer.paused().size(), 1); + assertTrue(consumer.paused().contains(partition2)); + + task.addRecords(partition1, records( + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 50, recordKey, recordValue) + )); + + assertEquals(consumer.paused().size(), 2); + assertTrue(consumer.paused().contains(partition1)); + assertTrue(consumer.paused().contains(partition2)); + + assertEquals(task.process(), 7); + assertEquals(source1.numReceived, 1); + assertEquals(source2.numReceived, 1); + + assertEquals(consumer.paused().size(), 1); + assertTrue(consumer.paused().contains(partition1)); + + assertEquals(task.process(), 6); + assertEquals(source1.numReceived, 2); + assertEquals(source2.numReceived, 1); + + assertEquals(consumer.paused().size(), 0); + + task.close(); + + } finally { + Utils.delete(baseDir); + } } private Iterable> records(ConsumerRecord... recs) {