Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory leak #365

Merged
merged 1 commit into from Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -17,19 +17,6 @@
package org.axonframework.extensions.kafka.eventhandling.consumer.subscribable;

import com.thoughtworks.xstream.XStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.axonframework.common.AxonConfigurationException;
Expand All @@ -40,13 +27,28 @@
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.DefaultConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher;
import org.axonframework.extensions.kafka.eventhandling.consumer.RuntimeErrorHandler;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.xml.CompactDriver;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.axonframework.common.BuilderUtils.assertNonNull;
import static org.axonframework.common.BuilderUtils.assertThat;

Expand Down Expand Up @@ -82,7 +84,7 @@ public class SubscribableKafkaMessageSource<K, V> implements SubscribableMessage
private final int consumerCount;

private final Set<java.util.function.Consumer<List<? extends EventMessage<?>>>> eventProcessors = new CopyOnWriteArraySet<>();
private final List<Registration> fetcherRegistrations = new CopyOnWriteArrayList<>();
private final Map<Integer, Registration> fetcherRegistrations = new ConcurrentHashMap<>();
private final AtomicBoolean inProgress = new AtomicBoolean(false);

/**
Expand Down Expand Up @@ -168,11 +170,11 @@ public void start() {
}

for (int consumerIndex = 0; consumerIndex < consumerCount; consumerIndex++) {
addConsumer();
addConsumer(consumerIndex);
}
}

private void addConsumer() {
private void addConsumer(int consumerIndex) {
Consumer<K, V> consumer = consumerFactory.createConsumer(groupId);
consumer.subscribe(topics);

Expand All @@ -184,14 +186,16 @@ private void addConsumer() {
.map(Optional::get)
.collect(Collectors.toList()),
eventMessages -> eventProcessors.forEach(eventProcessor -> eventProcessor.accept(eventMessages)),
this::restartOnError
restartOnError(consumerIndex)
);
fetcherRegistrations.add(closeConsumer);
fetcherRegistrations.put(consumerIndex, closeConsumer);
}

private void restartOnError(RuntimeException e) {
logger.warn("Consumer had a fatal exception, starting a new one", e);
addConsumer();
private RuntimeErrorHandler restartOnError(int consumerIndex) {
return e -> {
logger.warn("Consumer had a fatal exception, starting a new one", e);
addConsumer(consumerIndex);
};
}

/**
Expand All @@ -202,7 +206,8 @@ public void close() {
logger.debug("No Event Processors have been subscribed who's Consumers should be closed");
return;
}
fetcherRegistrations.forEach(Registration::close);
fetcherRegistrations.values().forEach(Registration::close);
fetcherRegistrations.clear();
inProgress.set(false);
}

Expand Down
Expand Up @@ -17,18 +17,25 @@
package org.axonframework.extensions.kafka.eventhandling.consumer.subscribable;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.extensions.kafka.eventhandling.consumer.AsyncFetcher;
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher;
import org.junit.jupiter.api.*;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.awaitility.Awaitility.await;
import static org.axonframework.extensions.kafka.eventhandling.util.ConsumerConfigUtil.DEFAULT_GROUP_ID;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;
Expand Down Expand Up @@ -192,7 +199,7 @@ void testSubscribingTheSameInstanceTwiceDisregardsSecondInstanceOnStart() {

@Test
void testStartSubscribesConsumerToAllProvidedTopics() {
when(fetcher.poll(eq(mockConsumer), any(), any())).thenReturn(NO_OP_FETCHER_REGISTRATION);
when(fetcher.poll(eq(mockConsumer), any(), any(), any())).thenReturn(NO_OP_FETCHER_REGISTRATION);

List<String> testTopics = new ArrayList<>();
testTopics.add("topicOne");
Expand Down Expand Up @@ -273,4 +280,35 @@ void testCloseRunsCloseHandlerPerConsumerCount() {
assertTrue(closedEventProcessorOne.get());
assertTrue(closedEventProcessorTwo.get());
}

@Test
void restartingConsumerShouldNotCauseAMemoryLeakAndOnCloseNoRegistrationsShouldBeleft() throws NoSuchFieldException, IllegalAccessException {
fetcher = AsyncFetcher.<String, String, EventMessage<?>>builder()
.executorService(newSingleThreadExecutor()).build();
when(mockConsumer.poll(any(Duration.class))).thenThrow(new BrokerNotAvailableException("none available"));

SubscribableKafkaMessageSource<String, String> testSubject =
SubscribableKafkaMessageSource.<String, String>builder()
.topics(Collections.singletonList(TEST_TOPIC))
.groupId(DEFAULT_GROUP_ID)
.consumerFactory(consumerFactory)
.fetcher(fetcher)
.autoStart()
.build();

testSubject.subscribe(NO_OP_EVENT_PROCESSOR);

await().atMost(Duration.ofSeconds(4)).untilAsserted(() -> verify(consumerFactory, atLeast(4)).createConsumer(DEFAULT_GROUP_ID));
Field fetcherRegistrations = SubscribableKafkaMessageSource.class.getDeclaredField("fetcherRegistrations");

fetcherRegistrations.setAccessible(true);

Map<Integer, Registration> registrations = (Map<Integer, Registration>) fetcherRegistrations.get(testSubject);
assertEquals(1, registrations.values().size());

testSubject.close();

registrations = (Map<Integer, Registration>) fetcherRegistrations.get(testSubject);
assertTrue(registrations.isEmpty());
}
}