Message class should be thread-safe (ConcurrentModificationException in output) #3876

Open
gianluca-valentini opened this issue May 30, 2017 · 9 comments


@gianluca-valentini gianluca-valentini commented May 30, 2017

Expected Behavior

I attached two identical outputs to the same Stream. I expected the same message to be sent to both outputs.

Current Behavior

When I try to extract the fields from the Message I get this exception:

java.util.ConcurrentModificationException: null
	at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429) ~[?:1.8.0_72-internal]
	at java.util.HashMap$EntryIterator.next(HashMap.java:1463) ~[?:1.8.0_72-internal]
	at java.util.HashMap$EntryIterator.next(HashMap.java:1461) ~[?:1.8.0_72-internal]
	at java.util.AbstractCollection.finishToArray(AbstractCollection.java:232) ~[?:1.8.0_72-internal]
	at java.util.AbstractCollection.toArray(AbstractCollection.java:199) ~[?:1.8.0_72-internal]
	at com.google.common.collect.Iterables.toArray(Iterables.java:295) ~[graylog.jar:?]
	at com.google.common.collect.ImmutableMap.copyOf(ImmutableMap.java:406) ~[graylog.jar:?]
	at com.google.common.collect.ImmutableMap.copyOf(ImmutableMap.java:391) ~[graylog.jar:?]

The problem is that ImmutableMap from Guava does not handle concurrent access.
If you look at the Guava javadoc for copyOf, you can see:

/**
   * Returns an immutable map containing the same entries as {@code map}. If
   * {@code map} somehow contains entries with duplicate keys (for example, if
   * it is a {@code SortedMap} whose comparator is not <i>consistent with
   * equals</i>), the results of this method are undefined.
   *
   * <p>Despite the method name, this method attempts to avoid actually copying
   * the data when it is safe to do so. The exact circumstances under which a
   * copy will or will not be performed are undocumented and subject to change.
   *
   * @throws NullPointerException if any key or value in {@code map} is null
   */
  public static <K, V> ImmutableMap<K, V> copyOf(Map<? extends K, ? extends V> map) {
...
}

Possible Solution

Steps to Reproduce (for bugs)

  1. Create an input and associate it with a Stream.
  2. Define two identical outputs and assign them to the Stream (the output needs to read the Message fields)
  3. Send an event to that input
  4. Search for the exception in the log file

Context

Your Environment

  • Graylog Version: 2.2.3
  • Elasticsearch Version:
  • MongoDB Version:
  • Operating System: Graylog official docker container
  • Browser version:
Member

@dennisoelkers dennisoelkers commented May 30, 2017

Thanks for reporting!

@dennisoelkers dennisoelkers self-assigned this May 30, 2017
@dennisoelkers dennisoelkers added this to the 2.3.0 milestone May 31, 2017
Member

@dennisoelkers dennisoelkers commented May 31, 2017

@gianluca-valentini: I tried to verify this issue and failed. What types of outputs did you configure for your streams? What are the exact settings for those outputs?

The stack trace seems to be incomplete (as it contains only lines from the java library/guava). Can you add the complete stack trace?

@dennisoelkers dennisoelkers removed this from the 2.3.0 milestone May 31, 2017
Author

@gianluca-valentini gianluca-valentini commented May 31, 2017

Hi @dennisoelkers,
I used a custom Output to send messages to Kafka, in which I extract the fields from the org.graylog2.plugin.Message object, like below:

private String getMessage(int level, Message msg) {
	Map<String, Object> fields = msg.getFields();
	...

Here is the full stack trace:

2017-05-30 14:16:10,973 ERROR: org.graylog2.buffers.processors.OutputBufferProcessor - Error in output [class tech.sharelock.graylog2.plugin.KafkaOutput].
java.util.ConcurrentModificationException: null
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429) ~[?:1.8.0_72-internal]
at java.util.HashMap$EntryIterator.next(HashMap.java:1463) ~[?:1.8.0_72-internal]
at java.util.HashMap$EntryIterator.next(HashMap.java:1461) ~[?:1.8.0_72-internal]
at java.util.AbstractCollection.finishToArray(AbstractCollection.java:232) ~[?:1.8.0_72-internal]
at java.util.AbstractCollection.toArray(AbstractCollection.java:199) ~[?:1.8.0_72-internal]
at com.google.common.collect.Iterables.toArray(Iterables.java:295) ~[graylog.jar:?]
at com.google.common.collect.ImmutableMap.copyOf(ImmutableMap.java:406) ~[graylog.jar:?]
at com.google.common.collect.ImmutableMap.copyOf(ImmutableMap.java:391) ~[graylog.jar:?]
at org.graylog2.plugin.Message.getFields(Message.java:387) ~[graylog.jar:?]
at tach.sharelock.graylog2.plugin.KafkaCEFSender.getMessage(KafkaCEFSender.java:53) ~[?:?]
at tach.sharelock.graylog2.plugin.KafkaCEFSender.send(KafkaCEFSender.java:36) ~[?:?]
at tach.sharelock.graylog2.plugin.KafkaOutput.write(KafkaOutput.java:249) ~[?:?]
at org.graylog2.buffers.processors.OutputBufferProcessor$1.run(OutputBufferProcessor.java:194) [graylog.jar:?]
at com.codahale.metrics.InstrumentedExecutorService$InstrumentedRunnable.run(InstrumentedExecutorService.java:176) [graylog.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_72-internal]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_72-internal]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_72-internal]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_72-internal]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_72-internal]

Thanks
Gianluca

Contributor

@joschi joschi commented Jul 3, 2017

@gianluca-valentini Please provide the full code of the custom output (tach.sharelock.graylog2.plugin.*).

Author

@gianluca-valentini gianluca-valentini commented Jul 3, 2017

Hi @joschi,
in tech.sharelock.graylog2.plugin.KafkaOutput, the write method is:

@Override
public void write(Message msg) throws Exception {

	msg.addField("tag", this.tag);
	msg.addField("topic", this.topic);

	// Kafka Producer client
	sender.send(producer, this.topic, level, msg);
	...

where sender is a KafkaCEFSender instance.

The code below is the send implementation:

public class KafkaCEFSender implements MessageSender {

	@Override
	public void send(org.apache.kafka.clients.producer.Producer<Long, String> producer, String topic, int level, org.graylog2.plugin.Message msg) {
		String message = this.getMessage(level, msg);
		producer.send(new ProducerRecord<>(topic, System.currentTimeMillis(), message));
	}

	private String getMessage(int level, org.graylog2.plugin.Message msg) {
		Map<String, Object> fields = msg.getFields(); // <-- the exception is thrown here

		Map<String, String> jsonMap = new HashMap<>();
		for (String k : fields.keySet()) {
			Object v = fields.get(k);
			if (k.startsWith("_")) {
				continue;
			}
			jsonMap.put(k, v.toString());
		}
		StringBuilder o = new StringBuilder();
		appendHeader(msg, o);
		jsonMap.put("header", o.toString());
		String message = new GsonBuilder().setDateFormat("yyyyMMddHHmmssZ").setPrettyPrinting().create().toJson(jsonMap);
		return message;
	}

Please tell me if you need other info or code.
For additional context, we see the same problem when using the Syslog output instead of ours (one stream with two identical Syslog outputs).

Thanks
Gianluca

@joschi joschi self-assigned this Jul 4, 2017
Contributor

@joschi joschi commented Jul 4, 2017

@gianluca-valentini I couldn't reproduce the error you've reported, but I've checked the code and found some places which could cause the error.

Messages are processed concurrently in the OutputBufferProcessor:
https://github.com/Graylog2/graylog2-server/blob/2.2.3/graylog2-server/src/main/java/org/graylog2/buffers/processors/OutputBufferProcessor.java#L190-L201

This means that if you mutate a message in one output (e.g. by calling Message#addField()), other outputs/threads can see a ConcurrentModificationException when accessing the fields of the shared Message instance.

This isn't a problem as long as there is no output mutating messages it's supposed to write, but as soon as one or more outputs start mutating messages, this error is likely to occur (in the right circumstances).

In general, I'd strongly recommend not mutating the Message instance in your output. This leads to all sorts of problems. Try creating your own Message instance or, even better, use your own domain-specific class to model a message you want to process with your output.
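
One way to follow that recommendation is sketched below, under stated assumptions. `SharedMessage` is a hypothetical stand-in for a message instance shared between outputs (it is not Graylog's `Message` class), and `NonMutatingOutputSketch` and `buildPayload` are illustrative names. The output copies the fields into a map it owns and adds its own entries there, so the shared instance is only read.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative output that never writes to the message it is handed.
final class NonMutatingOutputSketch {

    // Hypothetical stand-in for a message shared between outputs; NOT Graylog's Message class.
    static final class SharedMessage {
        private final Map<String, Object> fields = new HashMap<>();

        void addField(String key, Object value) {
            fields.put(key, value);
        }

        // Returns a snapshot copy of the fields.
        Map<String, Object> getFields() {
            return new HashMap<>(fields);
        }
    }

    private final String tag;
    private final String topic;

    NonMutatingOutputSketch(String tag, String topic) {
        this.tag = tag;
        this.topic = topic;
    }

    // Builds the payload in a map owned by this call; the shared message is only read.
    Map<String, Object> buildPayload(SharedMessage msg) {
        Map<String, Object> payload = new HashMap<>(msg.getFields());
        payload.put("tag", tag);     // output-specific data goes into the local copy
        payload.put("topic", topic);
        return payload;
    }

    public static void main(String[] args) {
        SharedMessage msg = new SharedMessage();
        msg.addField("source", "example-host");

        Map<String, Object> payload =
                new NonMutatingOutputSketch("demo-tag", "demo-topic").buildPayload(msg);

        System.out.println(payload.containsKey("tag"));         // the payload carries the tag
        System.out.println(msg.getFields().containsKey("tag")); // the shared message does not
    }
}
```

This does not make the shared message thread-safe. It only removes this output as a source of mutation, so reading the fields remains safe only while no other output writes to the same instance.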

joschi pushed a commit that referenced this issue Jul 4, 2017
Jochen Schalanda
Refs #3876
@joschi joschi removed their assignment Jul 4, 2017
Author

@gianluca-valentini gianluca-valentini commented Jul 4, 2017

Hi @joschi,
thanks for your explanation.
I agree with what you wrote about the Message fields, but in my output I don't change any field.
I just extract them from the Message (msg.getFields();) to save them as JSON.

I think that the problem is that ImmutableMap from Guava does not handle concurrent access.

If you look at the Guava copyOf javadoc you can see:

/**
   * Returns an immutable map containing the same entries as {@code map}. If
   * {@code map} somehow contains entries with duplicate keys (for example, if
   * it is a {@code SortedMap} whose comparator is not <i>consistent with
   * equals</i>), the results of this method are undefined.
   *
   * <p>Despite the method name, this method attempts to avoid actually copying
   * the data when it is safe to do so. The exact circumstances under which a
   * copy will or will not be performed are undocumented and subject to change.
   *
   * @throws NullPointerException if any key or value in {@code map} is null
   */
  public static <K, V> ImmutableMap<K, V> copyOf(Map<? extends K, ? extends V> map) {
...
}

The issue should occur if you write two outputs attached to the same stream, each of which simply calls Message.getFields() in its write method.
Thanks
Gianluca

Contributor

@joschi joschi commented Jul 4, 2017

@gianluca-valentini

msg.addField("tag", this.tag);
msg.addField("topic", this.topic);

That looks very much like a mutated message in your output (tech.sharelock.graylog2.plugin.KafkaOutput#write(Message)) to me.

> If you look at the Guava copyOf javadoc you can see:

If you look at the code of ImmutableMap#copyOf() in the Guava version used by Graylog, you will see that it simply creates a copy of the HashMap<> containing the message fields.
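
The mechanism behind the stack trace can be illustrated with plain `java.util` classes. The sketch below is a simplification, not the Graylog code path: it uses a single thread and an explicit iterator so that the result is deterministic, whereas in the reported case the write comes from another output's thread while the copy is iterating. `FailFastSketch` and `mutationBreaksIteration` are illustrative names.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

final class FailFastSketch {

    // Returns true if adding a field while the entries are being walked
    // triggers HashMap's fail-fast check.
    static boolean mutationBreaksIteration() {
        Map<String, Object> fields = new HashMap<>();
        fields.put("source", "example-host");
        fields.put("message", "hello");

        // Stands in for the output that is copying the fields (the copy iterates the map).
        Iterator<Map.Entry<String, Object>> reader = fields.entrySet().iterator();
        reader.next();

        // Stands in for the output that calls addField(): a structural change to the map.
        fields.put("tag", "demo");

        try {
            reader.next(); // the iterator detects that the map changed underneath it
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(mutationBreaksIteration()); // prints "true"
    }
}
```

With real threads the same check fires only when the write happens to land during the copy, which would be consistent with the error being hard to reproduce.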

bernd added a commit that referenced this issue Jul 4, 2017
Author

@gianluca-valentini gianluca-valentini commented Jul 4, 2017

@joschi
thanks a lot, I forgot that I was adding the "tag" and "topic" fields.

Probably the error trace and the javadoc on the copyOf method led me astray.
I will avoid adding new fields to the Message.

Thanks
Gianluca

@joschi joschi changed the title I get an exception using two identical outputs with the same Stream Message should be thread-safe (ConcurrentModificationException in output) Jul 4, 2017
@joschi joschi changed the title Message should be thread-safe (ConcurrentModificationException in output) Message class should be thread-safe (ConcurrentModificationException in output) Jul 4, 2017