5 changes: 0 additions & 5 deletions examples/storm-loadgen/pom.xml
@@ -35,11 +35,6 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-client</artifactId>
5 changes: 0 additions & 5 deletions examples/storm-starter/pom.xml
@@ -39,11 +39,6 @@
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
5 changes: 0 additions & 5 deletions external/storm-elasticsearch/pom.xml
@@ -80,11 +80,6 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava-testlib</artifactId>
5 changes: 0 additions & 5 deletions external/storm-hbase/pom.xml
@@ -83,11 +83,6 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
6 changes: 0 additions & 6 deletions external/storm-jms/pom.xml
@@ -48,12 +48,6 @@
<artifactId>geronimo-jms_1.1_spec</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>

<!-- Active MQ for testing JMS-->
<dependency>
4 changes: 4 additions & 0 deletions external/storm-kafka-client/pom.xml
@@ -88,6 +88,10 @@
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>java-hamcrest</artifactId>
KafkaUnit.java
@@ -40,12 +40,14 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.storm.testing.TmpPath;

public class KafkaUnit {
private KafkaServer kafkaServer;
private EmbeddedZookeeper zkServer;
private ZkUtils zkUtils;
private KafkaProducer<String, String> producer;
private TmpPath kafkaDir;
private static final String ZK_HOST = "127.0.0.1";
private static final String KAFKA_HOST = "127.0.0.1";
private static final int KAFKA_PORT = 9092;
@@ -61,10 +63,11 @@ public void setUp() throws IOException {
zkUtils = ZkUtils.apply(zkClient, false);

// setup Broker
kafkaDir = new TmpPath(Files.createTempDirectory("kafka-").toAbsolutePath().toString());
Properties brokerProps = new Properties();
brokerProps.setProperty("zookeeper.connect", zkConnect);
brokerProps.setProperty("broker.id", "0");
brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
brokerProps.setProperty("log.dirs", kafkaDir.getPath());
brokerProps.setProperty("listeners", String.format("PLAINTEXT://%s:%d", KAFKA_HOST, KAFKA_PORT));
KafkaConfig config = new KafkaConfig(brokerProps);
MockTime mock = new MockTime();
@@ -77,6 +80,7 @@ public void setUp() throws IOException {
public void tearDown() {
closeProducer();
kafkaServer.shutdown();
kafkaDir.close();
zkUtils.close();
zkServer.shutdown();
}
KafkaUnitRule.java → KafkaUnitExtension.java
@@ -17,25 +17,25 @@
*/
package org.apache.storm.kafka;

import java.io.IOException;
import org.junit.rules.ExternalResource;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;


public class KafkaUnitRule extends ExternalResource {
public class KafkaUnitExtension implements BeforeEachCallback, AfterEachCallback {

private final KafkaUnit kafkaUnit;

public KafkaUnitRule() {
public KafkaUnitExtension() {
this.kafkaUnit = new KafkaUnit();
}

@Override
public void before() throws IOException {
public void beforeEach(ExtensionContext ctx) throws Exception {
kafkaUnit.setUp();
}

@Override
public void after() {
public void afterEach(ExtensionContext ctx) throws Exception {
kafkaUnit.tearDown();
}

(file name not shown)
@@ -16,7 +16,6 @@

package org.apache.storm.kafka;

import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.kafka.spout.RecordTranslator;
KafkaSpoutAbstractTest.java
@@ -18,45 +18,41 @@

package org.apache.storm.kafka.spout;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.KafkaUnitRule;
import org.apache.storm.kafka.KafkaUnitExtension;
import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
import org.apache.storm.kafka.spout.subscription.TopicAssigner;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Time;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import org.apache.storm.kafka.spout.subscription.TopicAssigner;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

@ExtendWith(MockitoExtension.class)
public abstract class KafkaSpoutAbstractTest {

@Rule
public MockitoRule mockito = MockitoJUnit.rule();

@Rule
public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
@RegisterExtension
public KafkaUnitExtension kafkaUnitExtension = new KafkaUnitExtension();

final TopologyContext topologyContext = mock(TopologyContext.class);
final Map<String, Object> conf = new HashMap<>();
@@ -79,7 +75,7 @@ protected KafkaSpoutAbstractTest(long commitOffsetPeriodMs) {
this.commitOffsetPeriodMs = commitOffsetPeriodMs;
}

@Before
@BeforeEach
public void setUp() {
spoutConfig = createSpoutConfig();

@@ -105,15 +101,15 @@ KafkaConsumer<String, String> createConsumerSpy() {
return spy(new KafkaConsumerFactoryDefault<String, String>().createConsumer(spoutConfig));
}

@After
@AfterEach
public void tearDown() throws Exception {
simulatedTime.close();
}

abstract KafkaSpoutConfig<String, String> createSpoutConfig();

void prepareSpout(int messageCount) throws Exception {
SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount);
SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitExtension.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount);
SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock);
}

KafkaSpoutConfigTest.java
@@ -19,28 +19,24 @@

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.HashMap;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class KafkaSpoutConfigTest {

@Rule
public ExpectedException expectedException = ExpectedException.none();

@Test
public void testBasic() {
KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic").build();
assertEquals(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST, conf.getFirstPollOffsetStrategy());
assertEquals(conf.getFirstPollOffsetStrategy(), FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
assertNull(conf.getConsumerGroupId());
assertTrue(conf.getTranslator() instanceof DefaultRecordTranslator);
HashMap<String, Object> expected = new HashMap<>();
@@ -49,8 +45,8 @@ public void testBasic() {
expected.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
expected.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
expected.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
assertEquals(expected, conf.getKafkaProps());
assertEquals(KafkaSpoutConfig.DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS, conf.getMetricsTimeBucketSizeInSecs());
assertEquals(conf.getKafkaProps(), expected);
assertEquals(conf.getMetricsTimeBucketSizeInSecs(), KafkaSpoutConfig.DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS);
}

@Test
Expand All @@ -59,7 +55,7 @@ public void testSetEmitNullTuplesToTrue() {
.setEmitNullTuples(true)
.build();

assertTrue("Failed to set emit null tuples to true", conf.isEmitNullTuples());
assertTrue(conf.isEmitNullTuples(), "Failed to set emit null tuples to true");
}

@Test
@@ -88,14 +84,13 @@ public void testMetricsTimeBucketSizeInSecs() {
.setMetricsTimeBucketSizeInSecs(100)
.build();

assertEquals(100, conf.getMetricsTimeBucketSizeInSecs());
assertEquals(conf.getMetricsTimeBucketSizeInSecs(), 100);
}

@Test
public void testThrowsIfEnableAutoCommitIsSet() {
expectedException.expect(IllegalStateException.class);
KafkaSpoutConfig.builder("localhost:1234", "topic")
Assertions.assertThrows(IllegalStateException.class, () -> KafkaSpoutConfig.builder("localhost:1234", "topic")
.setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
.build();
.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.apache.storm.kafka.spout;

import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
@@ -40,8 +39,6 @@
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Time.SimulatedTime;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;

@@ -54,9 +51,10 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;

import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
import org.apache.storm.kafka.spout.subscription.TopicFilter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class KafkaSpoutEmitTest {

@@ -68,7 +66,7 @@ public class KafkaSpoutEmitTest {
private KafkaConsumer<String, String> consumerMock;
private KafkaSpoutConfig<String, String> spoutConfig;

@Before
@BeforeEach
public void setUp() {
spoutConfig = createKafkaSpoutConfigBuilder(mock(TopicFilter.class), mock(ManualPartitioner.class), -1)
.setOffsetCommitPeriodMs(offsetCommitPeriodMs)
KafkaSpoutNullTupleTest.java
@@ -18,17 +18,15 @@
package org.apache.storm.kafka.spout;


import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.utils.Time;
import org.junit.Test;

import java.util.regex.Pattern;

import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

import java.util.regex.Pattern;
import org.apache.storm.kafka.NullRecordTranslator;
import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.utils.Time;
import org.junit.jupiter.api.Test;

public class KafkaSpoutNullTupleTest extends KafkaSpoutAbstractTest {

@@ -39,7 +37,7 @@ public KafkaSpoutNullTupleTest() {

@Override
KafkaSpoutConfig<String, String> createSpoutConfig() {
return KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(),
return KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitExtension.getKafkaUnit().getKafkaPort(),
Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC))
.setOffsetCommitPeriodMs(commitOffsetPeriodMs)
.setRecordTranslator(new NullRecordTranslator<>())
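The recurring pattern across these files is the JUnit 4 → JUnit 5 (Jupiter) migration: a resource managed by an `ExternalResource` `@Rule` (`KafkaUnitRule`, `MockitoRule`) becomes an extension implementing `BeforeEachCallback`/`AfterEachCallback` registered with `@RegisterExtension` (or enabled via `@ExtendWith`, as with `MockitoExtension`), and `org.junit` assertions and annotations move to their `org.junit.jupiter.api` equivalents. A minimal, self-contained sketch of that pattern follows; the `EmbeddedServer` resource is a hypothetical stand-in for `KafkaUnit`, not part of the PR.

```java
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.RegisterExtension;

class EmbeddedServerExtension implements BeforeEachCallback, AfterEachCallback {

    // Hypothetical stand-in for KafkaUnit: any resource with a setUp/tearDown lifecycle.
    static class EmbeddedServer {
        void start() { /* allocate ports, start broker, ... */ }
        void stop()  { /* release resources */ }
    }

    private final EmbeddedServer server = new EmbeddedServer();

    @Override
    public void beforeEach(ExtensionContext context) throws Exception {
        server.start(); // was ExternalResource.before() under JUnit 4
    }

    @Override
    public void afterEach(ExtensionContext context) throws Exception {
        server.stop();  // was ExternalResource.after() under JUnit 4
    }

    EmbeddedServer getServer() {
        return server;
    }
}

class EmbeddedServerTest {

    // @RegisterExtension on an instance field replaces the JUnit 4 @Rule;
    // beforeEach/afterEach run around every @Test method, just as the rule did.
    @RegisterExtension
    EmbeddedServerExtension serverExtension = new EmbeddedServerExtension();

    @Test
    void serverIsAvailableDuringTest() {
        // The extension has already started the server at this point.
        serverExtension.getServer();
    }
}
```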