Skip to content

Commit

Permalink
Add/Remove alarms from situations (#62)
Browse files Browse the repository at this point in the history
* add/remove alarms from situation endpoint
* Sonarcloud issues
* Remove duplicated code
* fix blueprint context unit test
* Add unit test to getAlarm(int id)
* Check if alarm is not already in a situation
* inline method getSituation
* add log

also fix bad bundle name for processor api

ALEC-191
  • Loading branch information
BenjaminJ committed Sep 27, 2022
1 parent 2a6a36c commit eed4126
Show file tree
Hide file tree
Showing 12 changed files with 223 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@
package org.opennms.alec.datasource.api;

import java.util.List;
import java.util.Optional;

public interface AlarmDatasource {

List<Alarm> getAlarms();

Optional<Alarm> getAlarm(int id) throws InterruptedException;

List<Alarm> getAlarmsAndRegisterHandler(AlarmHandler handler);

void registerHandler(AlarmHandler handler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.util.List;
import java.util.Objects;
import java.util.Optional;

import org.opennms.alec.datasource.api.Alarm;
import org.opennms.alec.datasource.api.AlarmDatasource;
Expand All @@ -47,6 +48,11 @@ public List<Alarm> getAlarms() {
return alarms;
}

@Override
public Optional<Alarm> getAlarm(int id) throws InterruptedException {
return alarms.stream().filter(alarm -> String.valueOf(id).equals(alarm.getId())).findFirst();
}

@Override
public List<Alarm> getAlarmsAndRegisterHandler(AlarmHandler handler) {
return alarms;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,21 @@ public List<Alarm> getAlarms() {
}
}

@Override
public Optional<Alarm> getAlarm(int id) {
waitForInit();

rwLock.readLock().lock();
try {
return alarmsById.values().stream()
.filter(alarm -> id == alarm.getId())
.map(mapper::toAlarm)
.findFirst();
} finally {
rwLock.readLock().unlock();
}
}

@Override
public List<Alarm> getAlarmsAndRegisterHandler(AlarmHandler handler) {
rwLock.readLock().lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,26 @@ public void testGetSituationNotFound() throws InterruptedException {
when(alarm.getLastEventTime()).thenReturn(new Date());
assertTrue(dac.getSituation(2).isEmpty());
}

@Test
public void testGetAlarm() throws InterruptedException {
when(alarm.getId()).thenReturn(1);
when(alarm.getReductionKey()).thenReturn("key");
when(alarm.getManagedObjectInstance()).thenReturn("test:1");
when(alarm.getManagedObjectType()).thenReturn(ManagedObjectType.EntPhysicalEntity.getName());
when(alarm.getFirstEventTime()).thenReturn(new Date());
when(alarm.getLastEventTime()).thenReturn(new Date());
assertTrue(dac.getAlarm(1).isPresent());
}

@Test
public void testGetAlarmNotFound() throws InterruptedException {
when(alarm.getId()).thenReturn(1);
when(alarm.getReductionKey()).thenReturn("key");
when(alarm.getManagedObjectInstance()).thenReturn("test:1");
when(alarm.getManagedObjectType()).thenReturn(ManagedObjectType.EntPhysicalEntity.getName());
when(alarm.getFirstEventTime()).thenReturn(new Date());
when(alarm.getLastEventTime()).thenReturn(new Date());
assertTrue(dac.getAlarm(2).isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
Expand Down Expand Up @@ -131,6 +132,7 @@ public class OpennmsDatasource implements SituationDatasource, AlarmDatasource,
public static final String SITUATION_STORE = "situationStore";
public static final String EDGE_STORE = "edgeStore";
public static final String IGNORED_SITUATION = "An error occurred while mapping a situation. It will be ignored. Situation: {}";
public static final String IGNORED_ALARM = "An error occurred while mapping an alarm. It will be ignored. Alarm: {}";

private final HandlerRegistry<AlarmHandler> alarmHandlers = new HandlerRegistry<>();
private final HandlerRegistry<AlarmFeedbackHandler> alarmFeedbackHandlers = new HandlerRegistry<>();
Expand Down Expand Up @@ -192,8 +194,10 @@ public void init() throws IOException {
}
streams.setStateListener(streamStateListener);

streams.setUncaughtExceptionHandler((t, e) ->
LOG.error(String.format("Stream error on thread: %s", t.getName()), e));
streams.setUncaughtExceptionHandler(exception -> {
LOG.error("Stream error on thread: {}", Thread.currentThread().getName(), exception);
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
try {
streams.start();
} catch (StreamsException | IllegalStateException e) {
Expand Down Expand Up @@ -407,14 +411,19 @@ private void createStores(StreamsBuilder builder) {
@Override
public List<Alarm> getAlarms() {
final List<Alarm> alarms = new ArrayList<>();
try {
waitUntilAlarmStoreIsQueryable().all().forEachRemaining(entry -> alarms.add(OpennmsMapper.toAlarm(entry.value)));
try (KeyValueIterator<String, OpennmsModelProtos.Alarm> all = waitUntilAlarmStoreIsQueryable().all()) {
all.forEachRemaining(entry -> alarms.add(OpennmsMapper.toAlarm(entry.value)));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return alarms;
}

@Override
public Optional<Alarm> getAlarm(int id) {
return getAlarms().stream().filter(alarm -> String.valueOf(id).equals(alarm.getId())).findFirst();
}

@Override
public List<Alarm> getAlarmsAndRegisterHandler(AlarmHandler handler) {
final List<Alarm> alarms = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,53 @@ public void canHandleExistingAlarms() throws IOException {
datasource.unregisterHandler(this);
}

@Test(timeout=60000)
public void canRetrieveAlarms() throws IOException, InterruptedException {
datasource.init();
assertThat(datasource.getAlarms(), hasSize(0));

OpennmsModelProtos.Alarm alarm = OpennmsModelProtos.Alarm.newBuilder()
.setReductionKey("nodeDown::1")
.setLastEventTime(1)
.setSeverity(OpennmsModelProtos.Severity.CRITICAL)
.build();
producer.send(new ProducerRecord<>(datasource.getAlarmTopic(), alarm.getReductionKey(), alarm.toByteArray()));

await().atMost(10, TimeUnit.SECONDS).until(() -> datasource.getAlarms(), hasSize(1));
}

@Test(timeout=60000)
public void canRetrieveAlarm() throws IOException, InterruptedException {
datasource.init();
assertThat(datasource.getAlarms(), hasSize(0));

OpennmsModelProtos.Alarm alarm = OpennmsModelProtos.Alarm.newBuilder()
.setReductionKey("nodeDown::1")
.setId(1)
.setLastEventTime(1)
.setSeverity(OpennmsModelProtos.Severity.CRITICAL)
.build();
producer.send(new ProducerRecord<>(datasource.getAlarmTopic(), alarm.getReductionKey(), alarm.toByteArray()));

await().atMost(10, TimeUnit.SECONDS).until(() -> datasource.getAlarm(1).isPresent());
}

@Test(timeout=6000)
public void canRetrieveAlarmNotFound() throws IOException, InterruptedException {
datasource.init();
assertThat(datasource.getAlarms(), hasSize(0));

OpennmsModelProtos.Alarm alarm = OpennmsModelProtos.Alarm.newBuilder()
.setReductionKey("nodeDown::1")
.setId(1)
.setLastEventTime(1)
.setSeverity(OpennmsModelProtos.Severity.CRITICAL)
.build();
producer.send(new ProducerRecord<>(datasource.getAlarmTopic(), alarm.getReductionKey(), alarm.toByteArray()));

await().atMost(10, TimeUnit.SECONDS).until(() -> datasource.getAlarm(2).isEmpty());
}

@Override
public void onAlarmCreatedOrUpdated(Alarm alarm) {
alarmsCreatedOrUpdatedById.put(alarm.getId(), alarm);
Expand Down
10 changes: 10 additions & 0 deletions features/ui/src/main/java/org/opennms/alec/rest/SituationRest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
package org.opennms.alec.rest;

import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
Expand All @@ -55,4 +57,12 @@ public interface SituationRest {

@GET
Response getSituationList() throws InterruptedException;

@PUT
@Path("{situationId}/alarm/{alarmId}")
Response addAlarm(@PathParam("situationId") String situationId, @PathParam("alarmId") String alarmId) throws InterruptedException;

@DELETE
@Path("{situationId}/alarm/{alarmId}")
Response removeAlarm(@PathParam("situationId") String situationId, @PathParam("alarmId") String alarmId) throws InterruptedException;
}
109 changes: 95 additions & 14 deletions features/ui/src/main/java/org/opennms/alec/rest/SituationRestImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,22 @@

package org.opennms.alec.rest;

import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import javax.ws.rs.core.Response;

import org.opennms.alec.data.KeyEnum;
import org.opennms.alec.data.SituationStatus;
import org.opennms.alec.data.SituationStatusImpl;
import org.opennms.alec.datasource.api.Alarm;
import org.opennms.alec.datasource.api.AlarmDatasource;
import org.opennms.alec.datasource.api.Situation;
import org.opennms.alec.datasource.api.SituationDatasource;
import org.opennms.alec.datasource.api.Status;
Expand All @@ -52,29 +57,33 @@

public class SituationRestImpl implements SituationRest {
private static final Logger LOG = LoggerFactory.getLogger(SituationRestImpl.class);
public static final String SITUATION_NOT_FOUND = "Situation {0} not found";
public static final String ALARM_NOT_FOUND = "Alarm {0} not found";

private final ObjectMapper objectMapper;
private final KeyValueStore<String> kvStore;
private final SituationDatasource situationDatasource;
private final AlarmDatasource alarmDatasource;

public SituationRestImpl(KeyValueStore<String> kvStore,
SituationDatasource situationDatasource) {
SituationDatasource situationDatasource,
AlarmDatasource alarmDatasource) {
this.kvStore = kvStore;
this.situationDatasource = situationDatasource;
this.alarmDatasource = alarmDatasource;
objectMapper = new ObjectMapper();
}

@Override
public Response rejected(String id) throws InterruptedException {
Optional<Situation> situationOptional;
situationOptional = situationDatasource.getSituation(Integer.parseInt(id));
public Response rejected(String situationId) throws InterruptedException {
Optional<Situation> situationOptional = situationDatasource.getSituation(Integer.parseInt(situationId));

if (situationOptional.isPresent()) {
Situation situation = situationOptional.get();
//check status
if (Status.REJECTED.equals(situation.getStatus())) {
LOG.debug("Situation {} already rejected", id);
return Response.accepted("Situation " + id + " already rejected").build();
LOG.debug("Situation {} already rejected", situationId);
return Response.accepted(MessageFormat.format("Situation {0} already rejected", situationId)).build();
}

try {
Expand All @@ -88,20 +97,19 @@ public Response rejected(String id) throws InterruptedException {
}
}

return Response.status(Response.Status.NOT_FOUND).entity("Situation id: " + id + " not found").build();
return Response.status(Response.Status.NOT_FOUND).entity(MessageFormat.format(SITUATION_NOT_FOUND, situationId)).build();
}

@Override
public Response accepted(String id) throws InterruptedException {
Optional<Situation> situationOptional;
situationOptional = situationDatasource.getSituation(Integer.parseInt(id));
public Response accepted(String situationId) throws InterruptedException {
Optional<Situation> situationOptional = situationDatasource.getSituation(Integer.parseInt(situationId));

if (situationOptional.isPresent()) {
Situation situation = situationOptional.get();
//check status
if (Status.ACCEPTED.equals(situation.getStatus())) {
LOG.debug("Situation {} already accepted", id);
return Response.accepted("Situation " + id + " already accepted").build();
LOG.debug("Situation {} already accepted", situationId);
return Response.accepted(MessageFormat.format("Situation {0} already accepted", situationId)).build();
}

//Update situation
Expand All @@ -116,7 +124,7 @@ public Response accepted(String id) throws InterruptedException {
}
}

return Response.status(Response.Status.NOT_FOUND).entity("Situation id: " + id + " not found").build();
return Response.status(Response.Status.NOT_FOUND).entity(MessageFormat.format(SITUATION_NOT_FOUND, situationId)).build();
}

@Override
Expand All @@ -136,6 +144,79 @@ public Response getSituationList() throws InterruptedException {
return Response.ok(situations).build();
}

@Override
public Response addAlarm(String situationId, String alarmId) throws InterruptedException {
Optional<Situation> situationOptional = situationDatasource.getSituation(Integer.parseInt(situationId));

if (situationOptional.isPresent()) {
Optional<Alarm> alarmOptional = alarmDatasource.getAlarm(Integer.parseInt(alarmId));
if (alarmOptional.isPresent()) {
if (alarmIsNotInAnotherSituation(alarmOptional.get().getReductionKey())) {
Situation oldSituation = situationOptional.get();
Set<Alarm> alarms = new HashSet<>(oldSituation.getAlarms());
alarms.add(alarmOptional.get());
return forwardAndStoreSituation(oldSituation, alarms);
} else {
LOG.warn("Alarm {} is already in a situation, thus it will not be added to situation {}", alarmId, situationId);
return Response.status(Response.Status.CONFLICT).entity(MessageFormat.format("Alarm {0} is already in a situation", alarmId)).build();
}
} else {
LOG.warn("Alarm {} not found, thus it will not be added to situation {}", alarmId, situationId);
return Response.status(Response.Status.NOT_FOUND).entity(MessageFormat.format(ALARM_NOT_FOUND, alarmId)).build();
}
} else {
LOG.warn("Situation {} not found, thus alarm {} will not be added to the situation", situationId, alarmId);
return Response.status(Response.Status.NOT_FOUND).entity(MessageFormat.format(SITUATION_NOT_FOUND, situationId)).build();
}
}

@Override
public Response removeAlarm(String situationId, String alarmId) throws InterruptedException {
Optional<Situation> situationOptional = situationDatasource.getSituation(Integer.parseInt(situationId));

if (situationOptional.isPresent()) {
Optional<Alarm> alarmOptional = alarmDatasource.getAlarm(Integer.parseInt(alarmId));
if (alarmOptional.isPresent()) {
Situation oldSituation = situationOptional.get();
Set<Alarm> alarms = oldSituation.getAlarms()
.stream()
.filter(alarm -> !alarmOptional.get().getReductionKey().equals(alarm.getReductionKey()))
.collect(Collectors.toUnmodifiableSet());
return forwardAndStoreSituation(oldSituation, alarms);
} else {
LOG.warn("Alarm {} not found, thus it will not be removed from situation {}", alarmId, situationId);
return Response.status(Response.Status.NOT_FOUND).entity(MessageFormat.format(ALARM_NOT_FOUND, alarmId)).build();
}
} else {
LOG.warn("Situation {} not found, thus alarm {} will not be removed from the situation", situationId, alarmId);
return Response.status(Response.Status.NOT_FOUND).entity(MessageFormat.format(SITUATION_NOT_FOUND, situationId)).build();
}
}

private boolean alarmIsNotInAnotherSituation(String reductionKey) throws InterruptedException {
for (Situation situation : situationDatasource.getSituations()) {
for (Alarm alarm : situation.getAlarms()) {
if (reductionKey.equals(alarm.getReductionKey())) {
return false;
}
}
}
return true;
}

private Response forwardAndStoreSituation(Situation oldSituation, Set<Alarm> alarms) throws InterruptedException {
try {
Situation newSituation = ImmutableSituation.newBuilderFrom(oldSituation).setAlarms(alarms).build();
situationDatasource.forwardSituation(newSituation);
storeMLSituations();
return Response.ok().build();
} catch (InterruptedException e) {
throw e;
} catch (Exception e) {
return somethingWentWrong(e);
}
}

private void storeMLSituations() throws JsonProcessingException, InterruptedException {
List<Situation> acceptedSituations = situationDatasource.getSituationsWithAlarmId().stream()
.filter(s -> Status.ACCEPTED.equals(s.getStatus()))
Expand All @@ -150,6 +231,6 @@ private void storeMLSituations() throws JsonProcessingException, InterruptedExce

private Response somethingWentWrong(Throwable e) {
LOG.error(e.getMessage(), e.fillInStackTrace());
return Response.serverError().entity("something went wrong: " + e.getMessage()).build();
return Response.serverError().entity(MessageFormat.format("something went wrong: {0}", e.getMessage())).build();
}
}
Loading

0 comments on commit eed4126

Please sign in to comment.