Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't start stopped inputs after updating them #3824

Merged
merged 2 commits into from May 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -24,6 +24,7 @@
import org.graylog2.plugin.system.NodeId;
import org.graylog2.rest.models.system.inputs.responses.InputCreated;
import org.graylog2.rest.models.system.inputs.responses.InputDeleted;
import org.graylog2.rest.models.system.inputs.responses.InputUpdated;
import org.graylog2.shared.inputs.InputLauncher;
import org.graylog2.shared.inputs.InputRegistry;
import org.graylog2.shared.inputs.NoSuchInputTypeException;
Expand Down Expand Up @@ -52,39 +53,71 @@ public InputEventListener(EventBus eventBus,
eventBus.register(this);
}

@Subscribe public void inputCreated(InputCreated inputCreatedEvent) {
LOG.debug("Input created/changed: " + inputCreatedEvent.id());
@Subscribe
public void inputCreated(InputCreated inputCreatedEvent) {
final String inputId = inputCreatedEvent.id();
LOG.debug("Input created: {}", inputId);
final Input input;
try {
input = inputService.find(inputCreatedEvent.id());
input = inputService.find(inputId);
} catch (NotFoundException e) {
LOG.warn("Received InputCreated event but could not find Input: ", e);
LOG.warn("Received InputCreated event but could not find input {}", inputId, e);
return;
}

final IOState<MessageInput> inputState = inputRegistry.getInputState(inputCreatedEvent.id());
final IOState<MessageInput> inputState = inputRegistry.getInputState(inputId);
if (inputState != null) {
inputRegistry.remove(inputState);
}

if (!input.isGlobal() && !this.nodeId.toString().equals(input.getNodeId())) {
if (input.isGlobal() || this.nodeId.toString().equals(input.getNodeId())) {
startInput(input);
}
}

@Subscribe
public void inputUpdated(InputUpdated inputUpdatedEvent) {
final String inputId = inputUpdatedEvent.id();
LOG.debug("Input updated: {}", inputId);
final Input input;
try {
input = inputService.find(inputId);
} catch (NotFoundException e) {
LOG.warn("Received InputUpdated event but could not find input {}", inputId, e);
return;
}

final boolean startInput;
final IOState<MessageInput> inputState = inputRegistry.getInputState(inputId);
if (inputState != null) {
startInput = inputState.getState() == IOState.Type.RUNNING;
inputRegistry.remove(inputState);
} else {
startInput = false;
}

if (startInput && (input.isGlobal() || this.nodeId.toString().equals(input.getNodeId()))) {
startInput(input);
}
}

private void startInput(Input input) {
final MessageInput messageInput;
try {
messageInput = inputService.getMessageInput(input);
messageInput.initialize();
} catch (NoSuchInputTypeException e) {
LOG.warn("Newly created input is of invalid type: " + input.getType(), e);
LOG.warn("Input {} ({}) is of invalid type {}", input.getTitle(), input.getId(), input.getType(), e);
return;
}
messageInput.initialize();

final IOState<MessageInput> newInputState = inputLauncher.launch(messageInput);
inputRegistry.add(newInputState);
}

@Subscribe public void inputDeleted(InputDeleted inputDeletedEvent) {
LOG.debug("Input deleted: " + inputDeletedEvent.id());
@Subscribe
public void inputDeleted(InputDeleted inputDeletedEvent) {
LOG.debug("Input deleted: {}", inputDeletedEvent.id());
final IOState<MessageInput> inputState = inputRegistry.getInputState(inputDeletedEvent.id());
if (inputState != null) {
inputRegistry.remove(inputState);
Expand Down
Expand Up @@ -35,6 +35,8 @@ public interface InputService extends PersistedService {

Input create(Map<String, Object> fields);

String update(Input model) throws ValidationException;

Input find(String id) throws NotFoundException;

Input findForThisNode(String nodeId, String id) throws NotFoundException;
Expand Down
Expand Up @@ -113,6 +113,15 @@ public <T extends Persisted> String save(T model) throws ValidationException {
return resultId;
}

@Override
public String update(Input model) throws ValidationException {
final String resultId = super.save(model);
if (resultId != null && !resultId.isEmpty()) {
publishChange(InputUpdated.create(resultId));
}
return resultId;
}

@Override
public <T extends Persisted> String saveWithoutValidation(T model) {
final String resultId = super.saveWithoutValidation(model);
Expand Down
Expand Up @@ -187,7 +187,7 @@ public Response update(@ApiParam(name = "JSON body", required = true) @Valid @No
mergedInput.putAll(messageInput.asMap());

final Input newInput = inputService.create(input.getId(), mergedInput);
inputService.save(newInput);
inputService.update(newInput);

final URI inputUri = getUriBuilderToSelf().path(InputsResource.class)
.path("{inputId}")
Expand Down