Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.ClassRule;
Expand All @@ -44,6 +45,7 @@
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.transaction.annotation.Transactional;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
Expand Down Expand Up @@ -461,6 +463,7 @@ public void test_specificConsumerOffsetsWithTailOffsets() throws Exception {
* @return Consumer group id created.
*/
private String createConsumerWithState() {
final int totalRecords = 10;
final String consumerId = "test-consumer-id-" + System.currentTimeMillis();

// Define our new topic name
Expand All @@ -472,20 +475,20 @@ private String createConsumerWithState() {
// Publish records into topic
sharedKafkaTestResource
.getKafkaTestUtils()
.produceRecords(10, newTopic, 0);
.produceRecords(totalRecords, newTopic, 0);

// Create a consumer and consume from the records, maintaining state.
final Properties consumerProperties = new Properties();
consumerProperties.put("client.id", consumerId);
consumerProperties.put("group.id", consumerId);
consumerProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerId);

try (final KafkaConsumer consumer = sharedKafkaTestResource
.getKafkaTestUtils()
.getKafkaConsumer(StringDeserializer.class, StringDeserializer.class, consumerProperties)) {

// Consume
consumer.subscribe(Collections.singleton(newTopic));
consumer.poll(2000L);
consumer.poll(Duration.ofSeconds(5));

// Save state.
consumer.commitSync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@
import static org.junit.Assert.assertTrue;
import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf;
import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.user;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.fileUpload;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.multipart;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
Expand Down Expand Up @@ -159,7 +159,7 @@ public void testPostUpdate_newMessageFormat() throws Exception {

// Hit index.
mockMvc
.perform(fileUpload("/configuration/messageFormat/update")
.perform(multipart("/configuration/messageFormat/update")
.file(jarUpload)
.with(user(adminUserDetails))
.with(csrf())
Expand Down Expand Up @@ -207,7 +207,7 @@ public void testPostUpdate_createNewMessageFormatMissingFile() throws Exception

// Hit page.
mockMvc
.perform(fileUpload("/configuration/messageFormat/update")
.perform(multipart("/configuration/messageFormat/update")
.file(jarUpload)
.with(user(adminUserDetails))
.with(csrf())
Expand Down Expand Up @@ -237,7 +237,7 @@ public void testPostUpdate_createNewMessageFormatInvalidJar() throws Exception {

// Hit page.
mockMvc
.perform(fileUpload("/configuration/messageFormat/update")
.perform(multipart("/configuration/messageFormat/update")
.file(jarUpload)
.with(user(adminUserDetails))
.with(csrf())
Expand Down Expand Up @@ -268,7 +268,7 @@ public void testPostUpdate_updatingExistingButWithBadMessageFormatId() throws Ex

// Hit page.
final MvcResult result = mockMvc
.perform(fileUpload("/configuration/messageFormat/update")
.perform(multipart("/configuration/messageFormat/update")
.file(jarUpload)
.with(user(adminUserDetails))
.with(csrf())
Expand Down Expand Up @@ -314,7 +314,7 @@ public void testPostUpdate_updatingExistingButWithInvalidJar() throws Exception

// Hit page.
mockMvc
.perform(fileUpload("/configuration/messageFormat/update")
.perform(multipart("/configuration/messageFormat/update")
.file(jarUpload)
.with(user(adminUserDetails))
.with(csrf())
Expand Down Expand Up @@ -373,7 +373,7 @@ public void testPostUpdate_updatingExistingNoJarUploaded() throws Exception {

// Hit page.
mockMvc
.perform(fileUpload("/configuration/messageFormat/update")
.perform(multipart("/configuration/messageFormat/update")
.file(jarUpload)
.with(user(adminUserDetails))
.with(csrf())
Expand Down Expand Up @@ -437,7 +437,7 @@ public void testPostUpdate_updatingExistingWithValidJarSameName() throws Excepti

// Hit page.
mockMvc
.perform(fileUpload("/configuration/messageFormat/update")
.perform(multipart("/configuration/messageFormat/update")
.file(jarUpload)
.with(user(adminUserDetails))
.with(csrf())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.NodeList;
import org.sourcelab.kafka.webview.ui.model.Cluster;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
Expand Down Expand Up @@ -177,11 +178,11 @@ public void testBasicConsumeWithSaslAuthentication() {
final KafkaConsumerFactory kafkaConsumerFactory = buildKafkaConsumerFactory();
try (final KafkaConsumer<String, String> consumer = kafkaConsumerFactory.createConsumerAndSubscribe(clientConfig)) {
// Attempt to consume, should pull first 10
ConsumerRecords<String, String> records = consumer.poll(2000L);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
assertEquals("Should have 10 records", maxRecordsPerPoll, records.count());

// Attempt to consume, should pull 2nd 10
records = consumer.poll(2000L);
records = consumer.poll(Duration.ofSeconds(2));
assertEquals("Should have 10 records", maxRecordsPerPoll, records.count());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.sourcelab.kafka.webview.ui.manager.socket.StartingPosition;
import org.sourcelab.kafka.webview.ui.plugin.filter.RecordFilter;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -113,11 +114,11 @@ public void testBasicConsumerMultiplePartitions() {
// Create consumer
try (final KafkaConsumer<String, String> consumer = kafkaConsumerFactory.createConsumerAndSubscribe(clientConfig)) {
// Attempt to consume, should pull first 10
ConsumerRecords<String, String> records = consumer.poll(2000L);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
assertEquals("Should have found " + maxRecordsPerPoll + " records", maxRecordsPerPoll, records.count());

// Attempt to consume, should pull 2nd 10
records = consumer.poll(2000L);
records = consumer.poll(Duration.ofSeconds(2));
assertEquals("Should have found " + maxRecordsPerPoll + " records", maxRecordsPerPoll, records.count());
}
}
Expand Down Expand Up @@ -171,15 +172,15 @@ public void testBasicConsumerExcludePartitions() {
// Create consumer
try (final KafkaConsumer<String, String> consumer = kafkaConsumerFactory.createConsumerAndSubscribe(clientConfig)) {
// Attempt to consume, should pull first 10
ConsumerRecords<String, String> records = consumer.poll(2000L);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
assertEquals("Should have found " + maxRecordsPerPoll + " records", maxRecordsPerPoll, records.count());

for (final ConsumerRecord<String, String> record: records) {
assertEquals("Should be from parittion 1 only", 1, record.partition());
}

// Attempt to consume, should come up empty
records = consumer.poll(2000L);
records = consumer.poll(Duration.ofSeconds(2));
assertTrue("Should be empty", records.isEmpty());
}
}
Expand All @@ -188,7 +189,7 @@ public void testBasicConsumerExcludePartitions() {
* Simple Smoke Test, using RecordFilter to filter everything from partition 0.
*/
@Test
public void testBasicConsumerWithRecordFilter() throws InterruptedException {
public void testBasicConsumerWithRecordFilter() {
final int maxRecordsPerPoll = 10;

// Create a topic with 2 partitions, (partitionId 0, 1)
Expand Down Expand Up @@ -238,15 +239,15 @@ public void testBasicConsumerWithRecordFilter() throws InterruptedException {
// Create consumer
try (final KafkaConsumer<String, String> consumer = kafkaConsumerFactory.createConsumerAndSubscribe(clientConfig)) {
// Attempt to consume, should pull first 10
ConsumerRecords<String, String> records = consumer.poll(10000L);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
assertEquals("Should have found " + maxRecordsPerPoll + " records", maxRecordsPerPoll, records.count());

for (final ConsumerRecord<String, String> record: records) {
assertEquals("Should be from parittion 1 only", 1, record.partition());
}

// Attempt to consume, should come up empty
records = consumer.poll(2000L);
records = consumer.poll(Duration.ofSeconds(2));
assertTrue("Should be empty", records.isEmpty());
}
}
Expand All @@ -256,7 +257,7 @@ public void testBasicConsumerWithRecordFilter() throws InterruptedException {
* We should get no results.
*/
@Test
public void testDeserializerOptions() throws InterruptedException {
public void testDeserializerOptions() {
// Reset state on our Test Deserializer
TestDeserializer.reset();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -104,8 +104,8 @@ public void testPassThrough() {
final RecordFilter mockFilter1 = mock(RecordFilter.class);
final RecordFilter mockFilter2 = mock(RecordFilter.class);

when(mockFilter1.includeRecord(eq("MyTopic"), eq(0), anyLong(), anyObject(), anyObject())).thenReturn(true);
when(mockFilter2.includeRecord(eq("MyTopic"), eq(0), anyLong(), anyObject(), anyObject())).thenReturn(true);
when(mockFilter1.includeRecord(eq("MyTopic"), eq(0), anyLong(), any(), any())).thenReturn(true);
when(mockFilter2.includeRecord(eq("MyTopic"), eq(0), anyLong(), any(), any())).thenReturn(true);

final RecordFilterDefinition recordFilterDefinition1 = new RecordFilterDefinition(mockFilter1, new HashMap<>());
final RecordFilterDefinition recordFilterDefinition2 = new RecordFilterDefinition(mockFilter2, new HashMap<>());
Expand All @@ -131,9 +131,9 @@ public void testPassThrough() {

// Verify mocks
verify(mockFilter1, times(totalRecords))
.includeRecord(eq("MyTopic"), eq(0), anyLong(), anyObject(), anyObject());
.includeRecord(eq("MyTopic"), eq(0), anyLong(), any(), any());
verify(mockFilter2, times(totalRecords))
.includeRecord(eq("MyTopic"), eq(0), anyLong(), anyObject(), anyObject());
.includeRecord(eq("MyTopic"), eq(0), anyLong(), any(), any());
}

/**
Expand All @@ -147,9 +147,9 @@ public void testFilterMessages() {
final RecordFilter mockFilter1 = mock(RecordFilter.class);
final RecordFilter mockFilter2 = mock(RecordFilter.class);

when(mockFilter1.includeRecord(eq("MyTopic"), eq(0), anyLong(), anyObject(), anyObject()))
when(mockFilter1.includeRecord(eq("MyTopic"), eq(0), anyLong(), any(), any()))
.thenReturn(true, false, true, true, true);
when(mockFilter2.includeRecord(eq("MyTopic"), eq(0), anyLong(), anyObject(), anyObject()))
when(mockFilter2.includeRecord(eq("MyTopic"), eq(0), anyLong(), any(), any()))
.thenReturn(true, true, false, true);

final RecordFilterDefinition recordFilterDefinition1 = new RecordFilterDefinition(mockFilter1, new HashMap<>());
Expand Down Expand Up @@ -182,9 +182,9 @@ public void testFilterMessages() {

// Verify mocks
verify(mockFilter1, times(totalRecords))
.includeRecord(eq("MyTopic"), eq(0), anyLong(), anyObject(), anyObject());
.includeRecord(eq("MyTopic"), eq(0), anyLong(), any(), any());
verify(mockFilter2, times(totalRecords - 1))
.includeRecord(eq("MyTopic"), eq(0), anyLong(), anyObject(), anyObject());
.includeRecord(eq("MyTopic"), eq(0), anyLong(), any(), any());
}

private ConsumerRecords createConsumerRecords(final int count) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.junit.Test;
import org.sourcelab.kafka.webview.ui.plugin.filter.RecordFilter;

import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.security.PermissionCollection;
import java.security.ProtectionDomain;
Expand All @@ -43,7 +44,8 @@ public class PluginClassLoaderTest {
* Tests loading a class from a jar.
*/
@Test
public void testLoadingFilterPlugin() throws ClassNotFoundException, IllegalAccessException, InstantiationException {
public void testLoadingFilterPlugin() throws ClassNotFoundException, IllegalAccessException,
InstantiationException, NoSuchMethodException, InvocationTargetException {
// Get URL to our jar
final URL jar = getClass().getClassLoader().getResource("testDeserializer/testPlugins.jar");
final String classPath = "examples.filter.LowOffsetFilter";
Expand All @@ -55,7 +57,7 @@ public void testLoadingFilterPlugin() throws ClassNotFoundException, IllegalAcce
assertNotNull("Should not be null", filterPlugin);

// Create an instance of it and validate.
final RecordFilter filter = filterPlugin.newInstance();
final RecordFilter filter = filterPlugin.getDeclaredConstructor().newInstance();
final String topic = "MyTopic";
final int partition = 2;
final long offset = 2423L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@
import org.junit.Test;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;

import static org.mockito.Matchers.eq;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;


public class PresenceEventListenerTest {

/**
Expand Down