From cad2f2ca632bcc5d3bcbcf38ffef4470704cd8c0 Mon Sep 17 00:00:00 2001 From: Jochen Schalanda Date: Mon, 15 May 2017 15:57:22 +0200 Subject: [PATCH 1/2] Don't start stopped inputs after updating them Fixes #3479 --- .../graylog2/inputs/InputEventListener.java | 55 +++- .../org/graylog2/inputs/InputService.java | 2 + .../org/graylog2/inputs/InputServiceImpl.java | 9 + .../system/inputs/InputsResource.java | 2 +- .../inputs/InputEventListenerTest.java | 289 ++++++++++++++++++ 5 files changed, 347 insertions(+), 10 deletions(-) create mode 100644 graylog2-server/src/test/java/org/graylog2/inputs/InputEventListenerTest.java diff --git a/graylog2-server/src/main/java/org/graylog2/inputs/InputEventListener.java b/graylog2-server/src/main/java/org/graylog2/inputs/InputEventListener.java index 90d2ce742d32..ea5a62e36d0d 100644 --- a/graylog2-server/src/main/java/org/graylog2/inputs/InputEventListener.java +++ b/graylog2-server/src/main/java/org/graylog2/inputs/InputEventListener.java @@ -24,6 +24,7 @@ import org.graylog2.plugin.system.NodeId; import org.graylog2.rest.models.system.inputs.responses.InputCreated; import org.graylog2.rest.models.system.inputs.responses.InputDeleted; +import org.graylog2.rest.models.system.inputs.responses.InputUpdated; import org.graylog2.shared.inputs.InputLauncher; import org.graylog2.shared.inputs.InputRegistry; import org.graylog2.shared.inputs.NoSuchInputTypeException; @@ -52,17 +53,19 @@ public InputEventListener(EventBus eventBus, eventBus.register(this); } - @Subscribe public void inputCreated(InputCreated inputCreatedEvent) { - LOG.debug("Input created/changed: " + inputCreatedEvent.id()); + @Subscribe + public void inputCreated(InputCreated inputCreatedEvent) { + final String inputId = inputCreatedEvent.id(); + LOG.debug("Input created: {}", inputId); final Input input; try { - input = inputService.find(inputCreatedEvent.id()); + input = inputService.find(inputId); } catch (NotFoundException e) { - LOG.warn("Received InputCreated event but could not find Input: ", e); + LOG.warn("Received InputCreated event but could not find input {}", inputId, e); return; } - final IOState inputState = inputRegistry.getInputState(inputCreatedEvent.id()); + final IOState inputState = inputRegistry.getInputState(inputId); if (inputState != null) { inputRegistry.remove(inputState); } @@ -71,20 +74,54 @@ public InputEventListener(EventBus eventBus, return; } + startInput(input); + } + + @Subscribe + public void inputUpdated(InputUpdated inputUpdatedEvent) { + final String inputId = inputUpdatedEvent.id(); + LOG.debug("Input updated: {}", inputId); + final Input input; + try { + input = inputService.find(inputId); + } catch (NotFoundException e) { + LOG.warn("Received InputUpdated event but could not find input {}", inputId, e); + return; + } + + final boolean startInput; + final IOState inputState = inputRegistry.getInputState(inputId); + if (inputState != null) { + startInput = inputState.getState() == IOState.Type.RUNNING; + inputRegistry.remove(inputState); + } else { + startInput = false; + } + + if (!startInput || (!input.isGlobal() && !this.nodeId.toString().equals(input.getNodeId()))) { + return; + } + + startInput(input); + } + + private void startInput(Input input) { final MessageInput messageInput; try { messageInput = inputService.getMessageInput(input); - messageInput.initialize(); } catch (NoSuchInputTypeException e) { - LOG.warn("Newly created input is of invalid type: " + input.getType(), e); + LOG.warn("Input {} ({}) is of invalid type {}", input.getTitle(), input.getId(), input.getType(), e); return; } + messageInput.initialize(); + final IOState newInputState = inputLauncher.launch(messageInput); inputRegistry.add(newInputState); } - @Subscribe public void inputDeleted(InputDeleted inputDeletedEvent) { - LOG.debug("Input deleted: " + inputDeletedEvent.id()); + @Subscribe + public void inputDeleted(InputDeleted inputDeletedEvent) { + LOG.debug("Input deleted: {}", inputDeletedEvent.id()); final IOState inputState = inputRegistry.getInputState(inputDeletedEvent.id()); if (inputState != null) { inputRegistry.remove(inputState); diff --git a/graylog2-server/src/main/java/org/graylog2/inputs/InputService.java b/graylog2-server/src/main/java/org/graylog2/inputs/InputService.java index dad86363be27..4c358be4d16e 100644 --- a/graylog2-server/src/main/java/org/graylog2/inputs/InputService.java +++ b/graylog2-server/src/main/java/org/graylog2/inputs/InputService.java @@ -35,6 +35,8 @@ public interface InputService extends PersistedService { Input create(Map fields); + String update(Input model) throws ValidationException; + Input find(String id) throws NotFoundException; Input findForThisNode(String nodeId, String id) throws NotFoundException; diff --git a/graylog2-server/src/main/java/org/graylog2/inputs/InputServiceImpl.java b/graylog2-server/src/main/java/org/graylog2/inputs/InputServiceImpl.java index aa7a96dcecc9..501f6ab65410 100644 --- a/graylog2-server/src/main/java/org/graylog2/inputs/InputServiceImpl.java +++ b/graylog2-server/src/main/java/org/graylog2/inputs/InputServiceImpl.java @@ -113,6 +113,15 @@ public String save(T model) throws ValidationException { return resultId; } + @Override + public String update(Input model) throws ValidationException { + final String resultId = super.save(model); + if (resultId != null && !resultId.isEmpty()) { + publishChange(InputUpdated.create(resultId)); + } + return resultId; + } + @Override public String saveWithoutValidation(T model) { final String resultId = super.saveWithoutValidation(model); diff --git a/graylog2-server/src/main/java/org/graylog2/rest/resources/system/inputs/InputsResource.java b/graylog2-server/src/main/java/org/graylog2/rest/resources/system/inputs/InputsResource.java index 1257c6eb720e..3be3fac4eacd 100644 --- a/graylog2-server/src/main/java/org/graylog2/rest/resources/system/inputs/InputsResource.java +++ b/graylog2-server/src/main/java/org/graylog2/rest/resources/system/inputs/InputsResource.java @@ -187,7 +187,7 @@ public Response update(@ApiParam(name = "JSON body", required = true) @Valid @No mergedInput.putAll(messageInput.asMap()); final Input newInput = inputService.create(input.getId(), mergedInput); - inputService.save(newInput); + inputService.update(newInput); final URI inputUri = getUriBuilderToSelf().path(InputsResource.class) .path("{inputId}") diff --git a/graylog2-server/src/test/java/org/graylog2/inputs/InputEventListenerTest.java b/graylog2-server/src/test/java/org/graylog2/inputs/InputEventListenerTest.java new file mode 100644 index 000000000000..5b0f4abdaaf4 --- /dev/null +++ b/graylog2-server/src/test/java/org/graylog2/inputs/InputEventListenerTest.java @@ -0,0 +1,289 @@ +/** + * This file is part of Graylog. + * + * Graylog is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graylog is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Graylog. If not, see . + */ +package org.graylog2.inputs; + +import com.google.common.eventbus.EventBus; +import org.graylog2.database.NotFoundException; +import org.graylog2.plugin.IOState; +import org.graylog2.plugin.inputs.MessageInput; +import org.graylog2.plugin.system.NodeId; +import org.graylog2.rest.models.system.inputs.responses.InputCreated; +import org.graylog2.rest.models.system.inputs.responses.InputDeleted; +import org.graylog2.rest.models.system.inputs.responses.InputUpdated; +import org.graylog2.shared.inputs.InputLauncher; +import org.graylog2.shared.inputs.InputRegistry; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +public class InputEventListenerTest { + @Rule + public final MockitoRule mockitoRule = MockitoJUnit.rule(); + + @Mock + private InputLauncher inputLauncher; + @Mock + private InputRegistry inputRegistry; + @Mock + private InputService inputService; + @Mock + private NodeId nodeId; + private InputEventListener listener; + + @Before + public void setUp() throws Exception { + final EventBus eventBus = new EventBus(this.getClass().getSimpleName()); + listener = new InputEventListener(eventBus, inputLauncher, inputRegistry, inputService, nodeId); + } + + @Test + public void inputCreatedDoesNothingIfInputDoesNotExist() throws Exception { + final String inputId = "input-id"; + when(inputService.find(inputId)).thenThrow(NotFoundException.class); + + listener.inputCreated(InputCreated.create(inputId)); + + verifyZeroInteractions(inputLauncher, inputRegistry, nodeId); + } + + @Test + public void inputCreatedStopsInputIfItIsRunning() throws Exception { + final String inputId = "input-id"; + final Input input = mock(Input.class); + @SuppressWarnings("unchecked") final IOState inputState = mock(IOState.class); + when(inputService.find(inputId)).thenReturn(input); + when(inputRegistry.getInputState(inputId)).thenReturn(inputState); + + listener.inputCreated(InputCreated.create(inputId)); + + verify(inputRegistry, times(1)).remove(inputState); + } + + @Test + @SuppressWarnings("unchecked") + public void inputCreatedDoesNotStopInputIfItIsNotRunning() throws Exception { + final String inputId = "input-id"; + final Input input = mock(Input.class); + when(inputService.find(inputId)).thenReturn(input); + when(inputRegistry.getInputState(inputId)).thenReturn(null); + + listener.inputCreated(InputCreated.create(inputId)); + + verify(inputRegistry, never()).remove(any(IOState.class)); + } + + @Test + public void inputCreatedStartsGlobalInputOnOtherNode() throws Exception { + final String inputId = "input-id"; + final Input input = mock(Input.class); + when(inputService.find(inputId)).thenReturn(input); + when(nodeId.toString()).thenReturn("node-id"); + when(input.getNodeId()).thenReturn("other-node-id"); + when(input.isGlobal()).thenReturn(true); + + final MessageInput messageInput = mock(MessageInput.class); + when(inputService.getMessageInput(input)).thenReturn(messageInput); + + listener.inputCreated(InputCreated.create(inputId)); + + verify(inputLauncher, times(1)).launch(messageInput); + } + + @Test + public void inputCreatedDoesNotStartLocalInputOnAnyNode() throws Exception { + final String inputId = "input-id"; + final Input input = mock(Input.class); + when(inputService.find(inputId)).thenReturn(input); + when(nodeId.toString()).thenReturn("node-id"); + when(input.getNodeId()).thenReturn("other-node-id"); + when(input.isGlobal()).thenReturn(false); + + final MessageInput messageInput = mock(MessageInput.class); + when(inputService.getMessageInput(input)).thenReturn(messageInput); + + listener.inputCreated(InputCreated.create(inputId)); + + verify(inputLauncher, never()).launch(messageInput); + } + + @Test + public void inputCreatedStartsLocalInputOnLocalNode() throws Exception { + final String inputId = "input-id"; + final Input input = mock(Input.class); + when(inputService.find(inputId)).thenReturn(input); + when(nodeId.toString()).thenReturn("node-id"); + when(input.getNodeId()).thenReturn("node-id"); + when(input.isGlobal()).thenReturn(false); + + final MessageInput messageInput = mock(MessageInput.class); + when(inputService.getMessageInput(input)).thenReturn(messageInput); + + listener.inputCreated(InputCreated.create(inputId)); + + verify(inputLauncher, times(1)).launch(messageInput); + } + + @Test + public void inputUpdatedDoesNothingIfInputDoesNotExist() throws Exception { + final String inputId = "input-id"; + when(inputService.find(inputId)).thenThrow(NotFoundException.class); + + listener.inputUpdated(InputUpdated.create(inputId)); + + verifyZeroInteractions(inputLauncher, inputRegistry, nodeId); + } + + @Test + public void inputUpdatedStopsInputIfItIsRunning() throws Exception { + final String inputId = "input-id"; + final Input input = mock(Input.class); + @SuppressWarnings("unchecked") final IOState inputState = mock(IOState.class); + when(inputService.find(inputId)).thenReturn(input); + when(inputRegistry.getInputState(inputId)).thenReturn(inputState); + + listener.inputUpdated(InputUpdated.create(inputId)); + + verify(inputRegistry, times(1)).remove(inputState); + } + + @Test + @SuppressWarnings("unchecked") + public void inputUpdatedDoesNotStopInputIfItIsNotRunning() throws Exception { + final String inputId = "input-id"; + final Input input = mock(Input.class); + when(inputService.find(inputId)).thenReturn(input); + when(inputRegistry.getInputState(inputId)).thenReturn(null); + + listener.inputUpdated(InputUpdated.create(inputId)); + + verify(inputRegistry, never()).remove(any(IOState.class)); + } + + @Test + public void inputUpdatedRestartsGlobalInputOnAnyNode() throws Exception { + final String inputId = "input-id"; + final Input input = mock(Input.class); + @SuppressWarnings("unchecked") final IOState inputState = mock(IOState.class); + when(inputState.getState()).thenReturn(IOState.Type.RUNNING); + when(inputService.find(inputId)).thenReturn(input); + when(inputRegistry.getInputState(inputId)).thenReturn(inputState); + when(nodeId.toString()).thenReturn("node-id"); + when(input.getNodeId()).thenReturn("other-node-id"); + when(input.isGlobal()).thenReturn(true); + + final MessageInput messageInput = mock(MessageInput.class); + when(inputService.getMessageInput(input)).thenReturn(messageInput); + + listener.inputUpdated(InputUpdated.create(inputId)); + + verify(inputLauncher, times(1)).launch(messageInput); + } + + @Test + public void inputUpdatedDoesNotStartLocalInputOnOtherNode() throws Exception { + final String inputId = "input-id"; + final Input input = mock(Input.class); + @SuppressWarnings("unchecked") final IOState inputState = mock(IOState.class); + when(inputState.getState()).thenReturn(IOState.Type.RUNNING); + when(inputService.find(inputId)).thenReturn(input); + when(nodeId.toString()).thenReturn("node-id"); + when(input.getNodeId()).thenReturn("other-node-id"); + when(input.isGlobal()).thenReturn(false); + + final MessageInput messageInput = mock(MessageInput.class); + when(inputService.getMessageInput(input)).thenReturn(messageInput); + + listener.inputUpdated(InputUpdated.create(inputId)); + + verify(inputLauncher, never()).launch(messageInput); + } + + @Test + public void inputUpdatedRestartsLocalInputOnLocalNode() throws Exception { + final String inputId = "input-id"; + final Input input = mock(Input.class); + @SuppressWarnings("unchecked") final IOState inputState = mock(IOState.class); + when(inputState.getState()).thenReturn(IOState.Type.RUNNING); + when(inputService.find(inputId)).thenReturn(input); + when(inputRegistry.getInputState(inputId)).thenReturn(inputState); + when(nodeId.toString()).thenReturn("node-id"); + when(input.getNodeId()).thenReturn("node-id"); + when(input.isGlobal()).thenReturn(false); + + final MessageInput messageInput = mock(MessageInput.class); + when(inputService.getMessageInput(input)).thenReturn(messageInput); + + listener.inputUpdated(InputUpdated.create(inputId)); + + verify(inputLauncher, times(1)).launch(messageInput); + } + + @Test + public void inputUpdatedDoesNotStartLocalInputOnLocalNodeIfItWasNotRunning() throws Exception { + final String inputId = "input-id"; + final Input input = mock(Input.class); + @SuppressWarnings("unchecked") final IOState inputState = mock(IOState.class); + when(inputState.getState()).thenReturn(IOState.Type.STOPPED); + when(inputService.find(inputId)).thenReturn(input); + when(inputRegistry.getInputState(inputId)).thenReturn(inputState); + when(nodeId.toString()).thenReturn("node-id"); + when(input.getNodeId()).thenReturn("node-id"); + when(input.isGlobal()).thenReturn(false); + + final MessageInput messageInput = mock(MessageInput.class); + when(inputService.getMessageInput(input)).thenReturn(messageInput); + + listener.inputUpdated(InputUpdated.create(inputId)); + + verify(inputLauncher, never()).launch(messageInput); + } + + @Test + public void inputDeletedStopsInputIfItIsRunning() throws Exception { + final String inputId = "input-id"; + @SuppressWarnings("unchecked") final IOState inputState = mock(IOState.class); + when(inputState.getState()).thenReturn(IOState.Type.RUNNING); + when(inputRegistry.getInputState(inputId)).thenReturn(inputState); + + listener.inputDeleted(InputDeleted.create(inputId)); + + verify(inputRegistry, never()).remove(any(MessageInput.class)); + } + + @Test + public void inputDeletedDoesNothingIfInputIsNotRunning() throws Exception { + final String inputId = "input-id"; + @SuppressWarnings("unchecked") final IOState inputState = mock(IOState.class); + when(inputState.getState()).thenReturn(null); + when(inputRegistry.getInputState(inputId)).thenReturn(inputState); + + listener.inputDeleted(InputDeleted.create(inputId)); + + verify(inputRegistry, never()).remove(any(MessageInput.class)); + } +} \ No newline at end of file From 9263c3a8ce0e4ffd334096f007d47a8c82a53e32 Mon Sep 17 00:00:00 2001 From: Jochen Schalanda Date: Mon, 15 May 2017 16:11:24 +0200 Subject: [PATCH 2/2] Simplify start conditions in InputEventListener --- .../java/org/graylog2/inputs/InputEventListener.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/graylog2-server/src/main/java/org/graylog2/inputs/InputEventListener.java b/graylog2-server/src/main/java/org/graylog2/inputs/InputEventListener.java index ea5a62e36d0d..79a9bb79b64b 100644 --- a/graylog2-server/src/main/java/org/graylog2/inputs/InputEventListener.java +++ b/graylog2-server/src/main/java/org/graylog2/inputs/InputEventListener.java @@ -70,11 +70,9 @@ public void inputCreated(InputCreated inputCreatedEvent) { inputRegistry.remove(inputState); } - if (!input.isGlobal() && !this.nodeId.toString().equals(input.getNodeId())) { - return; + if (input.isGlobal() || this.nodeId.toString().equals(input.getNodeId())) { + startInput(input); } - - startInput(input); } @Subscribe @@ -98,11 +96,9 @@ public void inputUpdated(InputUpdated inputUpdatedEvent) { startInput = false; } - if (!startInput || (!input.isGlobal() && !this.nodeId.toString().equals(input.getNodeId()))) { - return; + if (startInput && (input.isGlobal() || this.nodeId.toString().equals(input.getNodeId()))) { + startInput(input); } - - startInput(input); } private void startInput(Input input) {