Skip to content

Commit

Permalink
Merge c58ad19 into c1f7892
Browse files Browse the repository at this point in the history
  • Loading branch information
asgeirn committed Sep 26, 2018
2 parents c1f7892 + c58ad19 commit 9f01a7c
Show file tree
Hide file tree
Showing 19 changed files with 119 additions and 158 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ FROM openjdk:8-jre-alpine
COPY --from=builder /home/gradle/build/deps/external/*.jar /data/
COPY --from=builder /home/gradle/build/deps/fint/*.jar /data/
COPY --from=builder /home/gradle/build/libs/fint-provider-*.jar /data/fint-provider.jar
CMD ["java", "-jar", "/data/fint-provider.jar"]
CMD ["java", "-XX:+ExitOnOutOfMemoryError", "-jar", "/data/fint-provider.jar"]
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ repositories {
dependencies {
compile('no.fint:fint-audit-mongo-plugin:1.4.1')
compile('no.fint:fint-events:2.1.0-rc-1')
compile('no.fint:fint-event-model:2.0.1')
compile('no.fint:fint-event-model:3.0.0')
compile('no.fint:fint-springfox-extension:0.0.1')

compile('org.projectlombok:lombok')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Set;
import java.util.Collection;

@Slf4j
@RestController
Expand All @@ -22,7 +22,7 @@ public class EventStateController {
private EventStateService eventStateService;

@GetMapping
public Set<EventState> getEventState() {
public Collection<EventState> getEventState() {
return eventStateService.getEventStates();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package no.fint.provider.events.eventstate;

import com.hazelcast.core.HazelcastInstance;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import no.fint.event.model.Event;
import no.fint.provider.events.ProviderProps;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

@Slf4j
@Service
Expand All @@ -22,31 +24,35 @@ public class EventStateService {
@Autowired
private ProviderProps providerProps;

@Getter
private Set<EventState> eventStates;
private Map<String,EventState> eventStates;

@PostConstruct
public void init() {
eventStates = hazelcastInstance.getSet(providerProps.getKey());
eventStates = hazelcastInstance.getMap(providerProps.getKey());
}

public void add(Event event, int timeToLiveInMinutes) {
eventStates.add(new EventState(event, timeToLiveInMinutes));
log.trace("Add {}, ttl={}", event, timeToLiveInMinutes);
eventStates.put(event.getCorrId(), new EventState(event, timeToLiveInMinutes));
}

public Optional<EventState> get(Event event) {
return eventStates.stream().filter(eventState -> eventState.getCorrId().equals(event.getCorrId())).findAny();
Optional<EventState> result = Optional.ofNullable(eventStates.get(event.getCorrId()));
return result;
}

public Optional<EventState> remove(Event event) {
Optional<EventState> eventState = get(event);
if (eventState.isPresent()) {
boolean removed = eventStates.remove(eventState.get());
if (!removed) {
log.warn("Unable to remove event with corrId {} from EventStates", event.getCorrId());
}
}
Optional<EventState> eventState = Optional.ofNullable(eventStates.remove(event.getCorrId()));
return eventState;
}

public List<Event> getExpiredEvents() {
List<EventState> expired = eventStates.values().stream().filter(EventState::expired).collect(Collectors.toList());
expired.stream().map(EventState::getCorrId).forEach(eventStates::remove);
return expired.stream().map(EventState::getEvent).collect(Collectors.toList());
}

public Collection<EventState> getEventStates() {
return eventStates.values();
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package no.fint.provider.events.eventstate;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import no.fint.audit.FintAuditService;
import no.fint.event.model.Event;
import no.fint.event.model.ResponseStatus;
import no.fint.event.model.Status;
import no.fint.events.FintEvents;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.List;

@Slf4j
@Service
public class JanitorService {
Expand All @@ -20,19 +19,19 @@ public class JanitorService {
@Autowired
private FintAuditService fintAuditService;

@Autowired
private FintEvents fintEvents;

@Scheduled(initialDelay = 20000L, fixedDelay = 5000L)
public void cleanUpEventStates() {
log.debug("Running janitor service");
List<EventState> eventStates = Lists.newArrayList(eventStateService.getEventStates());
eventStates.forEach(eventState -> {
if (eventState.expired()) {
Event event = eventState.getEvent();
log.info("EventState expired, removing from list: {}", eventState);
eventStateService.remove(event);

event.setMessage("Event expired");
fintAuditService.audit(event);
}
eventStateService.getExpiredEvents().forEach(event -> {
log.info("Event expired: {}", event);
event.setResponseStatus(ResponseStatus.ERROR);
event.setStatus(Status.ADAPTER_NOT_CONFIRMED);
event.setMessage("Event expired");
fintAuditService.audit(event);
fintEvents.sendUpstream(event);
});
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package no.fint.provider.events.response;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import lombok.extern.slf4j.Slf4j;
import no.fint.audit.FintAuditService;
import no.fint.event.model.Event;
Expand All @@ -14,21 +12,14 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.Optional;

@Slf4j
@Service
public class ResponseService {

@Value("${fint.provider.tracing:false}")
@Value("${fint.provider.trace.response:false}")
private boolean tracing;

@Autowired
Expand All @@ -40,38 +31,12 @@ public class ResponseService {
@Autowired
private FintEvents fintEvents;

private Path traceFile;
private ObjectMapper objectMapper;

@PostConstruct
public void init() throws IOException {
if (tracing) {
traceFile = Files.createTempFile("response", ".json");
Files.write(traceFile, "[\n{}".getBytes());
objectMapper = new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
log.info("Tracing response events to {}", traceFile.toAbsolutePath());
}
}

@PreDestroy
public void shutdown() throws IOException {
if (tracing)
try (OutputStream os = Files.newOutputStream(traceFile, StandardOpenOption.APPEND, StandardOpenOption.SYNC)) {
os.write("\n]\n".getBytes());
}
}

public void handleAdapterResponse(Event event) {
log.debug("{}: Response for {} from {} status {} with {} elements.",
event.getCorrId(), event.getAction(), event.getOrgId(), event.getStatus(),
Optional.ofNullable(event.getData()).map(List::size).orElse(0));
if (tracing) {
try (OutputStream os = Files.newOutputStream(traceFile, StandardOpenOption.APPEND, StandardOpenOption.SYNC)){
os.write(",\n".getBytes());
objectMapper.writeValue(os, event);
} catch (IOException e) {
log.info("Unable to trace event", e);
}
fintAuditService.audit(event, false);
}
if (event.isHealthCheck()) {
event.setStatus(Status.UPSTREAM_QUEUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@
@EqualsAndHashCode(callSuper = true)
public class FintSseEmitter extends SseEmitter {
private String id;
private String client;
private String registered;
private final AtomicInteger eventCounter = new AtomicInteger();

public FintSseEmitter() {
setRegisteredDate();
}

public FintSseEmitter(String id, long timeout) {
public FintSseEmitter(String id, String client, long timeout) {
super(timeout);
this.id = id;
this.client = client;
setRegisteredDate();
}

Expand Down
1 change: 1 addition & 0 deletions src/main/java/no/fint/provider/events/sse/SseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@
public class SseClient {
private String registered;
private String id;
private String client;
private int events;
}
6 changes: 4 additions & 2 deletions src/main/java/no/fint/provider/events/sse/SseController.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ public class SseController {
@Synchronized
@GetMapping("/{id}")
public SseEmitter subscribe(@ApiParam(Constants.SWAGGER_X_ORG_ID) @RequestHeader(HeaderConstants.ORG_ID) String orgId,
@ApiParam("ID of client.") @RequestHeader(HeaderConstants.CLIENT) String client,
@ApiParam("Global unique id for the client. Typically a UUID.") @PathVariable String id) {
SseEmitter emitter = sseService.subscribe(id, orgId);
log.info("{}: Client {}, ID {}", orgId, client, id);
SseEmitter emitter = sseService.subscribe(id, orgId, client);
fintEvents.registerDownstreamListener(orgId, downstreamSubscriber);
return emitter;
}
Expand All @@ -49,7 +51,7 @@ public List<SseOrg> getClients() {
List<SseOrg> orgs = new ArrayList<>();
clients.forEach((key, value) -> {
List<SseClient> sseClients = new ArrayList<>();
value.forEach(emitter -> sseClients.add(new SseClient(emitter.getRegistered(), emitter.getId(), emitter.getEventCounter().get())));
value.forEach(emitter -> sseClients.add(new SseClient(emitter.getRegistered(), emitter.getId(), emitter.getClient(), emitter.getEventCounter().get())));

orgs.add(new SseOrg(key, sseClients));
});
Expand Down
10 changes: 7 additions & 3 deletions src/main/java/no/fint/provider/events/sse/SseService.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import javax.annotation.PreDestroy;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

@Slf4j
Expand All @@ -27,7 +28,7 @@ public void shutdown() {
}

@Synchronized
public SseEmitter subscribe(String id, String orgId) {
public SseEmitter subscribe(String id, String orgId, String client) {
FintSseEmitters fintSseEmitters = clients.get(orgId);
if (fintSseEmitters == null) {
fintSseEmitters = FintSseEmitters.with(providerProps.getMaxNumberOfEmitters(), this::closeEmitter);
Expand All @@ -37,8 +38,11 @@ public SseEmitter subscribe(String id, String orgId) {
if (registeredEmitter.isPresent()) {
return registeredEmitter.get();
} else {
log.info("id: {}, {} connected", id, orgId);
FintSseEmitter emitter = new FintSseEmitter(id, TimeUnit.MINUTES.toMillis(providerProps.getSseTimeoutMinutes()));
log.info("{}: {} connected", orgId, id);
FintSseEmitter emitter = new FintSseEmitter(id, client,
TimeUnit.MINUTES.toMillis(
ThreadLocalRandom.current().nextInt(2000) +
providerProps.getSseTimeoutMinutes()));
emitter.onCompletion(() -> {
log.info("onCompletion called for {}, id: {}", orgId, emitter.getId());
removeEmitter(orgId, emitter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import lombok.extern.slf4j.Slf4j;
import no.fint.audit.FintAuditService;
import no.fint.event.model.Event;
import no.fint.event.model.ResponseStatus;
import no.fint.event.model.Status;
import no.fint.events.FintEvents;
import no.fint.provider.events.ProviderProps;
Expand Down Expand Up @@ -31,6 +32,7 @@ public class StatusService {
private ProviderProps providerProps;

public void updateEventState(Event event) {
log.trace("Event received: {}", event);
Optional<EventState> state = eventStateService.remove(event);
if (state.isPresent()) {
fintAuditService.audit(event);
Expand All @@ -39,7 +41,10 @@ public void updateEventState(Event event) {
if (event.getStatus() == Status.ADAPTER_ACCEPTED) {
eventStateService.add(event, providerProps.getResponseTtl());
} else {
log.info("Adapter did not acknowledge the event (status: {}), sending event upstream.", event.getStatus().name());
if (event.getResponseStatus() == null) {
event.setResponseStatus(ResponseStatus.REJECTED);
}
log.debug("{} adapter did not acknowledge the event (status: {}), sending event upstream.", event.getOrgId(), event.getStatus().name());
event.setMessage(String.format("Adapter did not acknowledge the event (status: %s)", event.getStatus().name()));
fintEvents.sendUpstream(event);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package no.fint.provider.events.subscriber;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import lombok.extern.slf4j.Slf4j;
import no.fint.audit.FintAuditService;
import no.fint.event.model.Event;
Expand All @@ -17,19 +15,11 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

@Slf4j
@Component
public class DownstreamSubscriber implements FintEventListener {

@Value("${fint.provider.tracing:false}")
@Value("${fint.provider.trace.downstream:false}")
private boolean tracing;

@Autowired
Expand All @@ -44,37 +34,11 @@ public class DownstreamSubscriber implements FintEventListener {
@Autowired
private ProviderProps providerProps;

private Path traceFile;
private ObjectMapper objectMapper;

@PostConstruct
public void init() throws IOException {
if (tracing) {
traceFile = Files.createTempFile("downstream", ".json");
Files.write(traceFile, "[\n{}".getBytes());
objectMapper = new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
log.info("Tracing downstream events to {}", traceFile.toAbsolutePath());
}
}

@PreDestroy
public void shutdown() throws IOException {
if (tracing)
try (OutputStream os = Files.newOutputStream(traceFile, StandardOpenOption.APPEND, StandardOpenOption.SYNC)) {
os.write("\n]\n".getBytes());
}
}

@Override
public void accept(Event event) {
log.debug("Event received: {}", event);
if (tracing) {
try (OutputStream os = Files.newOutputStream(traceFile, StandardOpenOption.APPEND, StandardOpenOption.SYNC)){
os.write(",\n".getBytes());
objectMapper.writeValue(os, event);
} catch (IOException e) {
log.info("Unable to trace event", e);
}
fintAuditService.audit(event, false);
}
if (event.isHealthCheck()) {
event.addObject(new Health(Constants.COMPONENT, HealthStatus.RECEIVED_IN_PROVIDER_FROM_CONSUMER));
Expand Down
Loading

0 comments on commit 9f01a7c

Please sign in to comment.