Skip to content

Commit

Permalink
Merge pull request #12 from FINTprosjektet/event-dispatch
Browse files Browse the repository at this point in the history
Dispatch SSE events on separate thread pool.
  • Loading branch information
asgeirn committed Jun 21, 2018
2 parents d57ff99 + aa31076 commit 828a9c2
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 22 deletions.
50 changes: 33 additions & 17 deletions src/main/java/no/fint/provider/events/sse/SseService.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@
import no.fint.event.model.Event;
import no.fint.provider.events.ProviderProps;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@Slf4j
Expand All @@ -21,8 +27,19 @@ public class SseService {

private ConcurrentHashMap<String, FintSseEmitters> clients = new ConcurrentHashMap<>();

@Value("${fint.provider.sse.threads:10}")
private int threads;

private ExecutorService executorService;

@PostConstruct
public void setup() {
executorService = Executors.newFixedThreadPool(threads);
}

@PreDestroy
public void shutdown() {
executorService.shutdownNow();
clients.values().forEach(emitters -> emitters.forEach(FintSseEmitter::complete));
}

Expand Down Expand Up @@ -76,22 +93,21 @@ public void send(Event event) {
if (emitters == null) {
log.info("No sse clients registered for {}", event.getOrgId());
} else {
List<FintSseEmitter> toBeRemoved = new ArrayList<>();
emitters.forEach(emitter -> {
try {
SseEmitter.SseEventBuilder builder = SseEmitter.event().id(event.getCorrId()).name(event.getAction()).data(event).reconnectTime(5000L);
emitter.send(builder);
} catch (Exception e) {
log.warn("Exception when trying to send message to SseEmitter", e.getMessage());
log.warn("Removing subscriber {}", event.getOrgId());
log.debug("Details: {}", event, e);
toBeRemoved.add(emitter);
}
});

for (FintSseEmitter emitter : toBeRemoved) {
removeEmitter(event.getOrgId(), emitter);
}
emitters.forEach(
emitter -> executorService.execute(() -> {
try {
log.info("Sending event {} to {}", event.getCorrId(), emitter);
SseEmitter.SseEventBuilder builder = SseEmitter.event().id(event.getCorrId()).name(event.getAction()).data(event).reconnectTime(5000L);
emitter.send(builder);
} catch (Exception e) {
log.warn("Exception when trying to send message to SseEmitter", e.getMessage());
log.warn("Removing subscriber {}", event.getOrgId());
log.debug("Details: {}", event, e);
executorService.execute(() -> removeEmitter(event.getOrgId(), emitter));
}
}
)
);
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/main/resources/application-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,7 @@ fint:
queue-endpoint-enabled: true

springfox:
swagger-https: false
swagger-https: false

server:
port: 8081
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package no.fint.provider.events.sse

import no.fint.event.model.Event
import no.fint.provider.events.ProviderProps
import no.fint.util.CurrentThreadExecutor
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter
import spock.lang.Specification

Expand All @@ -15,7 +16,7 @@ class SseServiceSpec extends Specification {
props = Mock(ProviderProps) {
getMaxNumberOfEmitters() >> 5
}
sseService = new SseService(providerProps: props)
sseService = new SseService(providerProps: props, executorService: new CurrentThreadExecutor())
}

def "Return SseEmitter when subscribing with new orgId"() {
Expand Down Expand Up @@ -51,7 +52,7 @@ class SseServiceSpec extends Specification {

def "Do not send event when orgId does not have registered emitters"() {
given:
sseService = new SseService(providerProps: props, clients: [:])
sseService = new SseService(providerProps: props, clients: [:], executorService: new CurrentThreadExecutor())

when:
sseService.send(new Event(orgId: 'hfk.no'))
Expand All @@ -66,7 +67,7 @@ class SseServiceSpec extends Specification {
def emitters = FintSseEmitters.with(5)
emitters.add(emitter)
def clients = ['hfk.no': emitters] as ConcurrentHashMap
sseService = new SseService(providerProps: props, clients: clients)
sseService = new SseService(providerProps: props, clients: clients, executorService: new CurrentThreadExecutor())

when:
sseService.send(new Event(orgId: 'hfk.no'))
Expand Down Expand Up @@ -95,7 +96,7 @@ class SseServiceSpec extends Specification {
def emitters = FintSseEmitters.with(5)
emitters.add(emitter)
def clients = ['hfk.no': emitters] as ConcurrentHashMap
sseService = new SseService(providerProps: props, clients: clients)
sseService = new SseService(providerProps: props, clients: clients, executorService: new CurrentThreadExecutor())

when:
sseService.shutdown()
Expand Down
84 changes: 84 additions & 0 deletions src/test/java/no/fint/util/CurrentThreadExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package no.fint.util;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

/**
* Executor which useful for unit testing
*/
public class CurrentThreadExecutor implements ExecutorService {

@Override
public void execute(Runnable command) {
command.run();
}

@Override
public void shutdown() {
}

@Override
public List<Runnable> shutdownNow() {
return Collections.emptyList();
}

@Override
public boolean isShutdown() {
return false;
}

@Override
public boolean isTerminated() {
return false;
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return false;
}

@Override
public <T> Future<T> submit(Callable<T> task) {
FutureTask<T> f = new FutureTask<T>(task);
f.run();
return f;
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
FutureTask<T> f = new FutureTask<T>(task, result);
f.run();
return f;
}

@Override
public Future<?> submit(Runnable task) {
FutureTask<?> f = new FutureTask<Void>(task, null);
f.run();
return f;
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return tasks.stream().map(this::submit).collect(Collectors.toList());
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return tasks.stream().map(this::submit).collect(Collectors.toList());
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return tasks.stream().map(this::submit).findFirst().get().get();
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return tasks.stream().map(this::submit).findFirst().get().get();
}

}

0 comments on commit 828a9c2

Please sign in to comment.