Skip to content

Commit

Permalink
Dispatch SSE events on separate thread pool.
Browse files Browse the repository at this point in the history
This avoids a problem where an old stale SSE client consume all events.
  • Loading branch information
Asgeir Nilsen committed Jun 19, 2018
1 parent 19655df commit 84d8a27
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 18 deletions.
50 changes: 33 additions & 17 deletions src/main/java/no/fint/provider/events/sse/SseService.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@
import no.fint.event.model.Event;
import no.fint.provider.events.ProviderProps;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@Slf4j
Expand All @@ -21,8 +27,19 @@ public class SseService {

private ConcurrentHashMap<String, FintSseEmitters> clients = new ConcurrentHashMap<>();

@Value("${fint.provider.sse.threads:10}")
private int threads;

ExecutorService executorService;

@PostConstruct
public void setup() {
executorService = Executors.newFixedThreadPool(threads);
}

@PreDestroy
public void shutdown() {
executorService.shutdownNow();
clients.values().forEach(emitters -> emitters.forEach(FintSseEmitter::complete));
}

Expand Down Expand Up @@ -76,22 +93,21 @@ public void send(Event event) {
if (emitters == null) {
log.info("No sse clients registered for {}", event.getOrgId());
} else {
List<FintSseEmitter> toBeRemoved = new ArrayList<>();
emitters.forEach(emitter -> {
try {
SseEmitter.SseEventBuilder builder = SseEmitter.event().id(event.getCorrId()).name(event.getAction()).data(event).reconnectTime(5000L);
emitter.send(builder);
} catch (Exception e) {
log.warn("Exception when trying to send message to SseEmitter", e.getMessage());
log.warn("Removing subscriber {}", event.getOrgId());
log.debug("Details: {}", event, e);
toBeRemoved.add(emitter);
}
});

for (FintSseEmitter emitter : toBeRemoved) {
removeEmitter(event.getOrgId(), emitter);
}
emitters.forEach(
emitter -> executorService.execute(() -> {
try {
log.info("Sending event {} to {}", event.getCorrId(), emitter);
SseEmitter.SseEventBuilder builder = SseEmitter.event().id(event.getCorrId()).name(event.getAction()).data(event).reconnectTime(5000L);
emitter.send(builder);
} catch (Exception e) {
log.warn("Exception when trying to send message to SseEmitter", e.getMessage());
log.warn("Removing subscriber {}", event.getOrgId());
log.debug("Details: {}", event, e);
executorService.execute(() -> removeEmitter(event.getOrgId(), emitter));
}
}
)
);
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/main/resources/application-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,7 @@ fint:
queue-endpoint-enabled: true

springfox:
swagger-https: false
swagger-https: false

server:
port: 8081

0 comments on commit 84d8a27

Please sign in to comment.