diff --git a/src/main/java/no/fint/provider/events/sse/SseService.java b/src/main/java/no/fint/provider/events/sse/SseService.java index 0e125a1..863adcd 100644 --- a/src/main/java/no/fint/provider/events/sse/SseService.java +++ b/src/main/java/no/fint/provider/events/sse/SseService.java @@ -5,12 +5,18 @@ import no.fint.event.model.Event; import no.fint.provider.events.ProviderProps; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import java.util.*; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @Slf4j @@ -21,8 +27,19 @@ public class SseService { private ConcurrentHashMap clients = new ConcurrentHashMap<>(); + @Value("${fint.provider.sse.threads:10}") + private int threads; + + private ExecutorService executorService; + + @PostConstruct + public void setup() { + executorService = Executors.newFixedThreadPool(threads); + } + @PreDestroy public void shutdown() { + executorService.shutdownNow(); clients.values().forEach(emitters -> emitters.forEach(FintSseEmitter::complete)); } @@ -76,22 +93,21 @@ public void send(Event event) { if (emitters == null) { log.info("No sse clients registered for {}", event.getOrgId()); } else { - List toBeRemoved = new ArrayList<>(); - emitters.forEach(emitter -> { - try { - SseEmitter.SseEventBuilder builder = SseEmitter.event().id(event.getCorrId()).name(event.getAction()).data(event).reconnectTime(5000L); - emitter.send(builder); - } catch (Exception e) { - log.warn("Exception when trying to send message to SseEmitter", e.getMessage()); - log.warn("Removing subscriber {}", event.getOrgId()); - log.debug("Details: {}", event, e); - toBeRemoved.add(emitter); - } - }); - - for (FintSseEmitter emitter : toBeRemoved) { - removeEmitter(event.getOrgId(), emitter); - } + emitters.forEach( + emitter -> executorService.execute(() -> { + try { + log.info("Sending event {} to {}", event.getCorrId(), emitter); + SseEmitter.SseEventBuilder builder = SseEmitter.event().id(event.getCorrId()).name(event.getAction()).data(event).reconnectTime(5000L); + emitter.send(builder); + } catch (Exception e) { + log.warn("Exception when trying to send message to SseEmitter", e.getMessage()); + log.warn("Removing subscriber {}", event.getOrgId()); + log.debug("Details: {}", event, e); + executorService.execute(() -> removeEmitter(event.getOrgId(), emitter)); + } + } + ) + ); } } diff --git a/src/main/resources/application-test.yml b/src/main/resources/application-test.yml index 349a40f..fd16e26 100644 --- a/src/main/resources/application-test.yml +++ b/src/main/resources/application-test.yml @@ -10,4 +10,7 @@ fint: queue-endpoint-enabled: true springfox: - swagger-https: false \ No newline at end of file + swagger-https: false + +server: + port: 8081 diff --git a/src/test/groovy/no/fint/provider/events/sse/SseServiceSpec.groovy b/src/test/groovy/no/fint/provider/events/sse/SseServiceSpec.groovy index bd2c8a7..184282d 100644 --- a/src/test/groovy/no/fint/provider/events/sse/SseServiceSpec.groovy +++ b/src/test/groovy/no/fint/provider/events/sse/SseServiceSpec.groovy @@ -2,6 +2,7 @@ package no.fint.provider.events.sse import no.fint.event.model.Event import no.fint.provider.events.ProviderProps +import no.fint.util.CurrentThreadExecutor import org.springframework.web.servlet.mvc.method.annotation.SseEmitter import spock.lang.Specification @@ -15,7 +16,7 @@ class SseServiceSpec extends Specification { props = Mock(ProviderProps) { getMaxNumberOfEmitters() >> 5 } - sseService = new SseService(providerProps: props) + sseService = new SseService(providerProps: props, executorService: new CurrentThreadExecutor()) } def "Return SseEmitter when subscribing with new orgId"() { @@ -51,7 +52,7 @@ class SseServiceSpec extends Specification { def "Do not send event when orgId does not have registered emitters"() { given: - sseService = new SseService(providerProps: props, clients: [:]) + sseService = new SseService(providerProps: props, clients: [:], executorService: new CurrentThreadExecutor()) when: sseService.send(new Event(orgId: 'hfk.no')) @@ -66,7 +67,7 @@ class SseServiceSpec extends Specification { def emitters = FintSseEmitters.with(5) emitters.add(emitter) def clients = ['hfk.no': emitters] as ConcurrentHashMap - sseService = new SseService(providerProps: props, clients: clients) + sseService = new SseService(providerProps: props, clients: clients, executorService: new CurrentThreadExecutor()) when: sseService.send(new Event(orgId: 'hfk.no')) @@ -95,7 +96,7 @@ class SseServiceSpec extends Specification { def emitters = FintSseEmitters.with(5) emitters.add(emitter) def clients = ['hfk.no': emitters] as ConcurrentHashMap - sseService = new SseService(providerProps: props, clients: clients) + sseService = new SseService(providerProps: props, clients: clients, executorService: new CurrentThreadExecutor()) when: sseService.shutdown() diff --git a/src/test/java/no/fint/util/CurrentThreadExecutor.java b/src/test/java/no/fint/util/CurrentThreadExecutor.java new file mode 100644 index 0000000..8b832ab --- /dev/null +++ b/src/test/java/no/fint/util/CurrentThreadExecutor.java @@ -0,0 +1,84 @@ +package no.fint.util; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.*; +import java.util.stream.Collectors; + +/** + * Executor which useful for unit testing + */ +public class CurrentThreadExecutor implements ExecutorService { + + @Override + public void execute(Runnable command) { + command.run(); + } + + @Override + public void shutdown() { + } + + @Override + public List shutdownNow() { + return Collections.emptyList(); + } + + @Override + public boolean isShutdown() { + return false; + } + + @Override + public boolean isTerminated() { + return false; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return false; + } + + @Override + public Future submit(Callable task) { + FutureTask f = new FutureTask(task); + f.run(); + return f; + } + + @Override + public Future submit(Runnable task, T result) { + FutureTask f = new FutureTask(task, result); + f.run(); + return f; + } + + @Override + public Future submit(Runnable task) { + FutureTask f = new FutureTask(task, null); + f.run(); + return f; + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + return tasks.stream().map(this::submit).collect(Collectors.toList()); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { + return tasks.stream().map(this::submit).collect(Collectors.toList()); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + return tasks.stream().map(this::submit).findFirst().get().get(); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return tasks.stream().map(this::submit).findFirst().get().get(); + } + +} \ No newline at end of file