
maxProcessing config parameter in ns instead of ms #1132

Closed
youngj opened this issue Jan 26, 2022 · 15 comments


@youngj

youngj commented Jan 26, 2022

When running CometD 7.0.5 in production, I found that CometD's memory usage continually increased. A heap dump showed that much of the memory was held by old ServerSessionImpl instances retaining references to a large number of ServerMessageImpl instances and their related data. I expected CometD to use the timeout configuration parameter to remove old ServerSessionImpl instances, but that appears not to be the case. The CometD documentation at https://docs.cometd.org/current/reference/#_java_server_configuration specifies a maxProcessing setting that seems like it would resolve this apparent memory leak: "The maximum period of time, in milliseconds, that the server waits for the processing of a message before considering the session invalid and removing it."

However, the actual implementation adds the value of the maxProcessing setting to a timestamp that is measured in nanoseconds:

[code screenshot]

As a result, the maxProcessing setting also needs to be defined in nanoseconds, not in milliseconds as specified in the documentation.

Although ServerSessionImpl uses a long for the _maxProcessing variable, setting the maxProcessing config setting to a value larger than 2^31 nanoseconds throws a NumberFormatException, which prevents CometD from working at all.
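The unit mismatch can be sketched as follows (an illustrative snippet, not the actual CometD source): a millisecond value added directly to a System.nanoTime() timestamp yields a deadline roughly a million times too short.

```java
import java.util.concurrent.TimeUnit;

// Sketch of the unit mismatch (illustrative, not the actual CometD source).
public class MaxProcessingSketch {
    public static void main(String[] args) {
        long maxProcessingMs = 10_000;  // configured value, documented as milliseconds
        long now = System.nanoTime();   // nanosecond-resolution clock

        // Buggy: milliseconds added directly to a nanosecond timestamp,
        // so the deadline expires ~1,000,000x too early.
        long buggyDeadline = now + maxProcessingMs;

        // Correct: convert to nanoseconds first.
        long fixedDeadline = now + TimeUnit.MILLISECONDS.toNanos(maxProcessingMs);

        System.out.println(fixedDeadline - buggyDeadline); // the buggy deadline is far too early
    }
}
```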

Tested with:

  • CometD 7.0.5
  • Jetty 11.0.7
  • openjdk 11.0.13

How to reproduce:

Set the maxProcessing configuration setting to 2100000000 and verify that sessions are purged after about 2.1 seconds:

<init-param>
  <param-name>maxProcessing</param-name>
  <param-value>2100000000</param-value>
</init-param>

Set maxProcessing configuration setting to 2200000000 and verify that clients are unable to connect at all:

<init-param>
  <param-name>maxProcessing</param-name>
  <param-value>2200000000</param-value>
</init-param>
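A plausible explanation for the NumberFormatException, assuming the option is parsed with Integer.parseInt (an assumption; the actual parsing code is not shown in this issue): 2200000000 exceeds Integer.MAX_VALUE (2147483647), while 2100000000 does not.

```java
// Demonstrates the int-range boundary (assumes the option is parsed as an int;
// the actual CometD parsing code is not shown in this issue).
public class ParseOverflowDemo {
    public static void main(String[] args) {
        System.out.println(Integer.parseInt("2100000000")); // < 2^31 - 1: parses fine
        try {
            Integer.parseInt("2200000000");                 // > 2^31 - 1: overflows int
        } catch (NumberFormatException e) {
            System.out.println("NumberFormatException for 2200000000");
        }
        System.out.println(Long.parseLong("2200000000"));   // a long holds it without trouble
    }
}
```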
@sbordet
Member

sbordet commented Jan 26, 2022

That maxProcessing's unit of time is wrong is a bug.

However, I think you should keep maxProcessing=-1 and look into maxInterval instead, which is the one that removes old sessions from memory.

The timeout parameter is the /meta/connect timeout, so it's out of the picture here.
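For reference, maxInterval is configured like the other init-params; a sketch using its default value of 10000 milliseconds:

```xml
<init-param>
  <param-name>maxInterval</param-name>
  <param-value>10000</param-value>
</init-param>
```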

If you have enabled JMX, can you take a BayeuxServerImpl.dump() using a JMX console?

@youngj
Author

youngj commented Jan 26, 2022

I'm using the default value of maxInterval (10000), but it does not seem to remove all old sessions.

I haven't enabled JMX, but I used jattach to dump the heap:

[heap dump screenshot]

For example, here is a ServerSessionImpl from a websocket client that appears to no longer be connected, with 4009 ServerMessageImpl instances in the _queue:

[heap dump screenshot]

It has an _expireTime of 0. Based on the code here, it looks like I need to use maxProcessing in order to get CometD to sweep the session.

@sbordet
Member

sbordet commented Jan 26, 2022

This may be a symptom of a different problem.

What transport do you use? AsyncJSONTransport? JSONTransport? Some WebSocket transport?

Can you also take a thread dump?

The only time that _expireTime is 0 is the short window between receiving a /meta/connect message and writing back its reply.

The extreme case where this window becomes excessively long is when there are a ton of messages to write, the connection becomes TCP congested, and the client does not read or otherwise communicate with the server, so _expireTime may stay at 0 for a long time.
Do you think you could be in this case? Slow clients that are supposed to receive a lot of messages but then vanish?
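The behavior described above can be sketched as follows (illustrative logic, not the actual CometD source): a session whose _expireTime is still 0 is treated as "in processing" and survives the maxInterval sweep indefinitely.

```java
// Illustrative sketch of the sweep condition (not the actual CometD source):
// a session with _expireTime == 0 is considered "in processing" and is never
// swept by the maxInterval check, however old it is.
public class SweepSketch {
    static boolean sweepable(long expireTime, long now) {
        return expireTime != 0 && now > expireTime;
    }

    public static void main(String[] args) {
        System.out.println(sweepable(0, Long.MAX_VALUE)); // false: never swept while 0
        System.out.println(sweepable(1_000, 2_000));      // true: expired sessions are swept
    }
}
```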

@youngj
Author

youngj commented Jan 26, 2022

We are using the default transports. The client shown in the heap dump above appears to be using the WebSocketTransport.

My guess is that this happens when a client connects and then vanishes for one reason or another. We then continue publishing messages to channels that the client had subscribed to, which get added to the ServerSessionImpl's _queue but the session never expires.

Here is the thread dump (from the same time as the heap dump above):

"BayeuxServerImpl@18a136ac-Executor-720" prio=5 tid=720 TIMED_WAITING
    at jdk.internal.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#833
    at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:219)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.idleJobPoll(QueuedThreadPool.java:975)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1025)
    at java.lang.Thread.run(Thread.java:829)

  
"BayeuxServerImpl@18a136ac-Executor-721" prio=5 tid=721 TIMED_WAITING
    at jdk.internal.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#850
    at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:219)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.idleJobPoll(QueuedThreadPool.java:975)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1025)
    at java.lang.Thread.run(Thread.java:829)

  
"qtp802581203-18" prio=5 tid=18 TIMED_WAITING
    at jdk.internal.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#876
    at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:219)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.idleJobPoll(QueuedThreadPool.java:975)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1025)
    at java.lang.Thread.run(Thread.java:829)

  
"BayeuxServerImpl@18a136ac-Executor-694" prio=5 tid=694 TIMED_WAITING
    at jdk.internal.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#872
    at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:219)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.idleJobPoll(QueuedThreadPool.java:975)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1025)
    at java.lang.Thread.run(Thread.java:829)

  
"BayeuxServerImpl@18a136ac-Executor-723" prio=5 tid=723 TIMED_WAITING
    at jdk.internal.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#852
    at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:219)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.idleJobPoll(QueuedThreadPool.java:975)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1025)
    at java.lang.Thread.run(Thread.java:829)

  
"Reference Handler" daemon prio=10 tid=2 RUNNABLE
    at java.lang.ref.Reference.waitForReferencePendingList(Native Method)
    at java.lang.ref.Reference.processPendingReferences(Reference.java:241)
    at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:213)

  
"DestroyJavaVM" prio=5 tid=24 RUNNABLE

  
"qtp802581203-12" prio=5 tid=12 TIMED_WAITING
    at jdk.internal.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#865
    at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:219)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.idleJobPoll(QueuedThreadPool.java:975)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1025)
    at java.lang.Thread.run(Thread.java:829)

  
"BayeuxServerImpl@18a136ac-Scheduler-1" prio=5 tid=22 TIMED_WAITING
    at jdk.internal.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject#11
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#842
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1182)
       Local Variable: java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue#2
       Local Variable: java.util.concurrent.locks.ReentrantLock#207
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:899)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1054)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1114)
       Local Variable: java.util.concurrent.ScheduledThreadPoolExecutor#2
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
       Local Variable: java.util.concurrent.ThreadPoolExecutor$Worker#2
    at java.lang.Thread.run(Thread.java:829)

  
"BayeuxServerImpl@18a136ac-Executor-700" prio=5 tid=700 TIMED_WAITING
    at jdk.internal.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#843
    at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:219)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.idleJobPoll(QueuedThreadPool.java:975)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1025)
    at java.lang.Thread.run(Thread.java:829)

  
"qtp802581203-13" prio=5 tid=13 TIMED_WAITING
    at jdk.internal.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:462)
    at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:361)
       Local Variable: java.util.concurrent.SynchronousQueue$TransferStack$SNode#2
    at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:937)
    at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.reservedWait(ReservedThreadExecutor.java:321)
    at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:397)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:894)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1038)
       Local Variable: org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread#2
    at java.lang.Thread.run(Thread.java:829)

  
"BayeuxServerImpl@18a136ac-Executor-706" prio=5 tid=706 TIMED_WAITING
    at jdk.internal.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#829
    at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:219)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.idleJobPoll(QueuedThreadPool.java:975)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1025)
    at java.lang.Thread.run(Thread.java:829)

  
"qtp802581203-15" prio=5 tid=15 TIMED_WAITING
    at jdk.internal.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:462)
    at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:361)
       Local Variable: java.util.concurrent.SynchronousQueue$TransferStack$SNode#1
       Local Variable: java.util.concurrent.SynchronousQueue$TransferStack#1
    at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:937)
    at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.reservedWait(ReservedThreadExecutor.java:321)
    at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:397)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:894)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1038)
       Local Variable: org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread#1
    at java.lang.Thread.run(Thread.java:829)

  
"qtp802581203-11" prio=5 tid=11 TIMED_WAITING
    at jdk.internal.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#847
    at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:219)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.idleJobPoll(QueuedThreadPool.java:975)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1025)
    at java.lang.Thread.run(Thread.java:829)

  
"Scheduler-398690014-1" prio=5 tid=21 TIMED_WAITING
    at jdk.internal.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject#2
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#839
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1182)
       Local Variable: java.util.concurrent.locks.ReentrantLock#11
       Local Variable: java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue#1
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:899)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1054)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1114)
       Local Variable: java.util.concurrent.ScheduledThreadPoolExecutor#1
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
       Local Variable: java.util.concurrent.ThreadPoolExecutor$Worker#1
    at java.lang.Thread.run(Thread.java:829)

  
"Scanner-0-1" daemon prio=5 tid=23 TIMED_WAITING
    at jdk.internal.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject#12
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#837
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1182)
       Local Variable: java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue#3
       Local Variable: java.util.concurrent.locks.ReentrantLock#208
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:899)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1054)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1114)
       Local Variable: java.util.concurrent.ScheduledThreadPoolExecutor#3
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
       Local Variable: java.util.concurrent.ThreadPoolExecutor$Worker#3
    at java.lang.Thread.run(Thread.java:829)

  
"BayeuxServerImpl@18a136ac-Executor-712" prio=5 tid=712 TIMED_WAITING
    at jdk.internal.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#830
    at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:219)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.idleJobPoll(QueuedThreadPool.java:975)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1025)
    at java.lang.Thread.run(Thread.java:829)

  
"BayeuxServerImpl@18a136ac-Executor-701" prio=5 tid=701 TIMED_WAITING
    at jdk.internal.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#831
    at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:219)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.idleJobPoll(QueuedThreadPool.java:975)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1025)
    at java.lang.Thread.run(Thread.java:829)

  
"BayeuxServerImpl@18a136ac-Executor-714" prio=5 tid=714 TIMED_WAITING
    at jdk.internal.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#844
    at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:219)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.idleJobPoll(QueuedThreadPool.java:975)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1025)
    at java.lang.Thread.run(Thread.java:829)

  
"qtp802581203-16" prio=5 tid=16 RUNNABLE
    at sun.nio.ch.EPoll.wait(Native Method)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:120)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:124)
       Local Variable: sun.nio.ch.Util$2#1
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:141)
    at org.eclipse.jetty.io.ManagedSelector.nioSelect(ManagedSelector.java:180)
    at org.eclipse.jetty.io.ManagedSelector.select(ManagedSelector.java:187)
       Local Variable: org.eclipse.jetty.io.ManagedSelector#1
       Local Variable: sun.nio.ch.EPollSelectorImpl#1
    at org.eclipse.jetty.io.ManagedSelector$SelectorProducer.select(ManagedSelector.java:604)
    at org.eclipse.jetty.io.ManagedSelector$SelectorProducer.produce(ManagedSelector.java:541)
       Local Variable: org.eclipse.jetty.io.ManagedSelector$SelectorProducer#1
    at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.produceTask(AdaptiveExecutionStrategy.java:446)
    at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.tryProduce(AdaptiveExecutionStrategy.java:239)
       Local Variable: org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy#1
    at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.lambda$new$0(AdaptiveExecutionStrategy.java:138)
    at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy$$Lambda$235.run(<unknown string>)
    at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:407)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:894)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1038)
       Local Variable: org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread#3
    at java.lang.Thread.run(Thread.java:829)

  
"BayeuxServerImpl@18a136ac-Executor-704" prio=5 tid=704 TIMED_WAITING
    at jdk.internal.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#840
    at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:219)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.idleJobPoll(QueuedThreadPool.java:975)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1025)
    at java.lang.Thread.run(Thread.java:829)

  
"Finalizer" daemon prio=8 tid=3 WAITING
    at java.lang.Object.wait(Native Method)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:155)
       Local Variable: java.lang.ref.ReferenceQueue$Lock#3
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:176)
       Local Variable: java.lang.ref.ReferenceQueue#2
    at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:170)
       Local Variable: java.lang.System$2#1

  
"qtp802581203-17" prio=5 tid=17 TIMED_WAITING
    at jdk.internal.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#848
    at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:219)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.idleJobPoll(QueuedThreadPool.java:975)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1025)
    at java.lang.Thread.run(Thread.java:829)

  
"qtp802581203-19" prio=5 tid=19 TIMED_WAITING
    at jdk.internal.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject#4
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#875
    at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:219)
       Local Variable: org.eclipse.jetty.util.BlockingArrayQueue#1
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.idleJobPoll(QueuedThreadPool.java:975)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1025)
    at java.lang.Thread.run(Thread.java:829)

  
"BayeuxServerImpl@18a136ac-Executor-709" prio=5 tid=709 TIMED_WAITING
    at jdk.internal.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#832
    at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:219)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.idleJobPoll(QueuedThreadPool.java:975)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1025)
    at java.lang.Thread.run(Thread.java:829)

  
"BayeuxServerImpl@18a136ac-Executor-715" prio=5 tid=715 TIMED_WAITING
    at jdk.internal.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#855
    at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:219)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.idleJobPoll(QueuedThreadPool.java:975)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1025)
    at java.lang.Thread.run(Thread.java:829)

  
"Common-Cleaner" daemon prio=8 tid=9 TIMED_WAITING
    at java.lang.Object.wait(Native Method)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:155)
       Local Variable: java.lang.ref.ReferenceQueue$Lock#4
       Local Variable: java.lang.ref.ReferenceQueue#3
    at jdk.internal.ref.CleanerImpl.run(CleanerImpl.java:148)
       Local Variable: jdk.internal.ref.CleanerImpl#1
    at java.lang.Thread.run(Thread.java:829)
    at jdk.internal.misc.InnocuousThread.run(InnocuousThread.java:134)

  
"qtp802581203-14" prio=5 tid=14 TIMED_WAITING
    at jdk.internal.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#878
    at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:219)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.idleJobPoll(QueuedThreadPool.java:975)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1025)
    at java.lang.Thread.run(Thread.java:829)

  
"BayeuxServerImpl@18a136ac-Executor-724" prio=5 tid=724 TIMED_WAITING
    at jdk.internal.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#836
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject#13
    at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:219)
       Local Variable: org.eclipse.jetty.util.BlockingArrayQueue#1
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.idleJobPoll(QueuedThreadPool.java:975)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1025)
       Local Variable: org.eclipse.jetty.util.thread.QueuedThreadPool$Runner#1
    at java.lang.Thread.run(Thread.java:829)

  
"qtp802581203-20-acceptor-0@4d8e1f78-ServerConnector@33b4f8fb{HTTP/1.1, (http/1.1)}{0.0.0.0:8081}" prio=3 tid=20 RUNNABLE
    at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:533)
    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:285)
       Local Variable: sun.nio.ch.ServerSocketChannelImpl#1
       Local Variable: java.net.InetSocketAddress[]#1
       Local Variable: java.io.FileDescriptor#3064
    at org.eclipse.jetty.server.ServerConnector.accept(ServerConnector.java:397)
       Local Variable: org.eclipse.jetty.server.ServerConnector#1
    at org.eclipse.jetty.server.AbstractConnector$Acceptor.run(AbstractConnector.java:732)
       Local Variable: java.lang.String#8662
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:894)
       Local Variable: org.eclipse.jetty.util.thread.QueuedThreadPool#1
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1038)
       Local Variable: org.eclipse.jetty.util.thread.QueuedThreadPool$Runner#1
       Local Variable: org.eclipse.jetty.server.AbstractConnector$Acceptor#1
    at java.lang.Thread.run(Thread.java:829)

  
"Signal Dispatcher" daemon prio=9 tid=4 RUNNABLE

  
"Attach Listener" daemon prio=9 tid=331 RUNNABLE

  
"BayeuxServerImpl@18a136ac-Executor-718" prio=5 tid=718 TIMED_WAITING
    at jdk.internal.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
       Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#851
    at org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:219)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.idleJobPoll(QueuedThreadPool.java:975)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1025)
    at java.lang.Thread.run(Thread.java:829)

@sbordet
Member

sbordet commented Jan 27, 2022

It would really help to understand the state of the sessions if you could take a BayeuxServerImpl.dump(), attach a heap dump (if you can share it publicly), or otherwise send it to me if you can.

I would like to understand how things ended up in the situation you're facing.

@youngj
Author

youngj commented Jan 27, 2022

I can't share the heap dump since it is from our production servers and contains some sensitive information. I could run OQL in VisualVM to extract specific information from the heap dump.

@youngj
Author

youngj commented Jan 27, 2022

It seems plausible that the orphaned ServerSessionImpl instances are associated with the same aborted WebSocket connections that generate an EofException in #1094 (just speculating).

@sbordet
Member

sbordet commented Jan 27, 2022

While the connection can become TCP congested and leave sessions with _expireTime=0 accumulating messages, the safety net is the server idle timeout, 30 seconds by default.

When the idle timeout fires on the server, the connection is closed and the blocked writes are aborted, which means that _expireTime is set and the session is eventually swept.

The JVM stack trace you posted above shows a completely idle server, which is probably the result of idle timeouts having fired, which would then have set _expireTime.

#1094 is an issue about logging, so while I understand that it may be the same case as yours, I still struggle to understand how you can have sessions with _expireTime=0 for so long.
I'll investigate further.

What is your idle timeout value (on the Jetty server connector)?
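The sweeping condition described above can be sketched in plain Java (the names are illustrative, not CometD's actual code): a session whose expire time was never set (0) is treated as still processing and is never swept, regardless of how old it is.

```java
// Hypothetical sketch of the sweep check discussed above; field names are
// illustrative, not CometD's actual implementation.
public class SweepSketch {
    static boolean sweepable(long expireTimeNanos, long nowNanos) {
        // expireTime == 0 means a write is still pending: never sweepable.
        return expireTimeNanos != 0 && nowNanos > expireTimeNanos;
    }

    public static void main(String[] args) {
        System.out.println(sweepable(0, 1_000));   // false: _expireTime never set
        System.out.println(sweepable(500, 1_000)); // true: past its expire time
    }
}
```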

@youngj
Author

youngj commented Jan 27, 2022

After enabling the maxProcessing setting, I'm no longer seeing a rapid increase in memory usage. I can see from the logs that the swept sessions are all in the HANDSHAKEN state (e.g. Sweeping during processing MyServerSessionImpl@4152d948[ggi1na0mk4hexpx815u695rjyxa84,HANDSHAKEN,cycle=268422,last=80478,expire=0]). It seems that this happens when WebSocket clients perform /meta/handshake but then abort before /meta/connect. We have set ws.idleTimeout to 80000.

@sbordet
Member

sbordet commented Jan 27, 2022

@youngj I have investigated this further and found a bug, filed as #1134.

Sweeping during processing MyServerSessionImpl@4152d948[ggi1na0mk4hexpx815u695rjyxa84,HANDSHAKEN,cycle=268422,last=80478,expire=0]

Note that this session went through 268422 /meta/connect cycles, so it's not really freshly handshaken.
I'll review this case too.

What clients are you using?
I doubt that the official CometD clients behave this way, sending the /meta/handshake but not the /meta/connect.

Also, any reason to subclass ServerSessionImpl? Just curious.

@youngj
Author

youngj commented Jan 27, 2022

I subclassed ServerSessionImpl to patch the maxProcessing time interval bug so I could fix the memory leak without needing to compile a patched version of CometD. The memory leak occurred when I was using ServerSessionImpl.

MyServerSessionImpl.java:

import java.lang.reflect.Field;

import org.cometd.bayeux.server.ServerMessage;
import org.cometd.server.BayeuxServerImpl;
import org.cometd.server.ServerSessionImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyServerSessionImpl extends ServerSessionImpl {

    private static final Logger _logger = LoggerFactory.getLogger(MyServerSessionImpl.class);

    private static Field maxProcessingField;
    static {
        try {
            maxProcessingField = ServerSessionImpl.class.getDeclaredField("_maxProcessing");
            maxProcessingField.setAccessible(true);
        } catch (Exception ex) {
            _logger.warn("error getting _maxProcessing field", ex);
        }
    }

    public MyServerSessionImpl(BayeuxServerImpl bayeux) {
        super(bayeux);
    }

    @Override
    protected boolean handshake(ServerMessage.Mutable message) {
        boolean res = super.handshake(message);

        try {
            long maxProcessing = (long) maxProcessingField.get(this);
            if (maxProcessing > 0) {
                // Work around the unit bug: the value is parsed as milliseconds
                // but compared against System.nanoTime(), so scale it to nanoseconds.
                maxProcessing = maxProcessing * 1_000_000;
                maxProcessingField.set(this, maxProcessing);
            }
        } catch (Exception ex) {
            _logger.warn("error updating _maxProcessing field", ex);
        }
        return res;
    }
}

MyBayeuxServerImpl.java:

import org.cometd.server.BayeuxServerImpl;
import org.cometd.server.ServerSessionImpl;

public class MyBayeuxServerImpl extends BayeuxServerImpl {

    @Override
    public ServerSessionImpl newServerSession() {
        return new MyServerSessionImpl(this);
    }
}

MyCometDServlet.java:

import org.cometd.server.BayeuxServerImpl;
import org.cometd.server.CometDServlet;

public class MyCometDServlet extends CometDServlet {

    @Override
    protected BayeuxServerImpl newBayeuxServer() {
        return new MyBayeuxServerImpl();
    }
}

The cycle printed in the logs is a property of the scheduler that appears to always increase over time, not the number of /meta/connect cycles for that particular session.

The client is a version of the CometD JavaScript library that is probably from CometD 2.6. Normally it does follow /meta/handshake with /meta/connect.
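For reference, the unit mismatch that the reflection workaround above compensates for can be demonstrated with self-contained Java (illustrative code, not CometD's actual implementation): a millisecond value added directly to System.nanoTime() yields a deadline that is effectively already expired.

```java
import java.util.concurrent.TimeUnit;

// Sketch of the ms-vs-ns mismatch: a maxProcessing of 5000 (meant as 5 seconds)
// added to a nanosecond clock produces a deadline only 5 microseconds away.
public class MaxProcessingUnits {
    static long buggyDeadline(long nowNanos, long maxProcessingMillis) {
        return nowNanos + maxProcessingMillis; // bug: ms added to a ns clock
    }

    static long fixedDeadline(long nowNanos, long maxProcessingMillis) {
        return nowNanos + TimeUnit.MILLISECONDS.toNanos(maxProcessingMillis);
    }

    public static void main(String[] args) {
        long now = System.nanoTime();
        long maxProcessing = 5_000; // configured as 5 seconds per the docs

        System.out.println(buggyDeadline(now, maxProcessing) - now); // 5000 ns
        System.out.println(fixedDeadline(now, maxProcessing) - now); // 5000000000 ns
    }
}
```

Setting the configured value in nanoseconds instead only works up to about 2.1 seconds, because values above 2^31 fail to parse as reported above.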

@sbordet sbordet changed the title maxProcessing config parameter in ns instead of ms; cannot set maxProcessing longer than 2.1 sec maxProcessing config parameter in ns instead of ms Jan 28, 2022
@sbordet
Member

sbordet commented Jan 28, 2022

@youngj I fixed both this issue and #1134. Would you be able to build CometD locally and test that the fix works for you?

@zaynetro
Contributor

zaynetro commented Feb 15, 2022

We are seeing similar issues after upgrading from 5.0.2 to 5.0.10. The symptoms are similar: after running a server for several days, some ServerSessionImpl instances are not removed. It doesn't happen to all sessions, though. When we stop new incoming connections to the server, the number of server sessions doesn't drop to zero even after many hours.

Our current way of coping with this is to set a max queue option so that we can clean up at least some sessions.

Configuration:

  • Default transports (our clients use mostly websocket)
  • timeout = 60000
  • maxQueue = 250
  • maxInterval = 600000 (10 mins)
  • ws.messagesPerFrame = 2
  • custom jsonContext which uses addConvertor (that is something I forgot to change)
  • Ack extension

Clients run cometd 5.0.10 from npm.
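For illustration, the standard server-side options in the list above would typically be set as CometDServlet init-params; a hypothetical web.xml sketch (only the documented parameters are shown — the max queue handling and custom jsonContext mentioned above are configured programmatically and omitted here):

```xml
<!-- Hypothetical sketch: CometDServlet init-params mirroring the list above -->
<servlet>
  <servlet-name>cometd</servlet-name>
  <servlet-class>org.cometd.server.CometDServlet</servlet-class>
  <init-param>
    <param-name>timeout</param-name>
    <param-value>60000</param-value>
  </init-param>
  <init-param>
    <param-name>maxInterval</param-name>
    <param-value>600000</param-value>
  </init-param>
  <init-param>
    <param-name>ws.messagesPerFrame</param-name>
    <param-value>2</param-value>
  </init-param>
</servlet>
```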

We couldn't find the root cause and couldn't replicate the issue ourselves, but we have an assumption: clients initiate a connection, but then a page reload happens, leaving the session in an unfinished setup. There could be several consecutive page reloads.

An example of a session that hit the max queue limit despite us setting maxInterval (notice that lastConnected is null):

W|2022-02-15T05:46:17.664Z| =====>>> Session queue maxed for cometdSession 2q4so3ufya9186od1r9q082kgyp7d. 
  Disconnecting cometd session isConnected=false isHandshook=false startedAt=2022-02-15T04:27:33.974207Z lastConnected=null. 
session.setAttribute( "startedAt", OffsetDateTime.now() );
session.addListener( new ServerSession.QueueMaxedListener() {
    @Override
    public boolean queueMaxed( ServerSession session, Queue<ServerMessage> queue, ServerSession sender,
            ServerMessage message ) {
        LOG.warn(
                "=====>>> Session queue maxed for cometdSession {}."
                        + " Disconnecting cometd session isConnected={} isHandshook={} startedAt={} lastConnected={}.",
                cometdSessionId, 
                session.isConnected(), session.isHandshook(),
                session.getAttribute( "startedAt" ), session.getAttribute( "lastConnected" ) );
        session.disconnect();
        return false;
    }
} );

session.addListener( new ServerSession.HeartBeatListener() {
    @Override
    public void onResumed( ServerSession session, ServerMessage message, boolean timeout ) {
        // Called when the server resumes a held /meta/connect and replies to the client.
    }

    @Override
    public void onSuspended( ServerSession session, ServerMessage message, long timeout ) {
        // Called when the server receives a /meta/connect from the client and suspends it.
        session.setAttribute( "lastConnected", OffsetDateTime.now() );
    }
} );

We will be interested to test the latest fixes once they are released to maven central.

@youngj
Author

youngj commented Feb 18, 2022

In order to test the fixes, I would also need a new version to be released to Maven Central.

@sbordet
Member

sbordet commented Apr 29, 2024

@youngj @zaynetro please look at the fix for #1716, which is likely related to this issue as well.

The latest CometD versions all have the fix.
