Heap size grows when publishing a very large batch of messages #1106

Closed
2m0nd opened this issue Nov 14, 2021 · 5 comments
@2m0nd

2m0nd commented Nov 14, 2021

I noticed a memory leak in production (the application runs in a Docker container) when publishing a large number of messages at once (500k).

Then I reproduced this behavior locally on a Mac:

  1. Launched RabbitMQ in Docker:
version: "3"
services:
  rabbitmq:
    image: "rabbitmq:3-management"
    ports:
      - "5672:5672"
      - "15672:15672"
    volumes:
      - 'rabbitmq_data:/data'

volumes:
  rabbitmq_data:
  2. Wrote example code that reproduces the memory growth:
using System;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using Serilog;

namespace RabbitMemoryLeakMacOs
{
    class Program
    {              
        async static Task Main(string[] args)
        {
            var log = GetLogger();
            var connectionFactory = new ConnectionFactory()
            {
                HostName = "localhost"
            };
            var cn = connectionFactory.CreateConnection();
            var exchangeName = GetExchange(cn);
            var sendingModel = cn.CreateModel();
            
            var messageCounter = 0;
            var sendingLimit = 400_000;
            while (messageCounter < sendingLimit)
            {
                try
                { 
                    var basicProperties = sendingModel.CreateBasicProperties();
                    basicProperties.Persistent = true;

                    var randomTextMessage = new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes(largeMessage));
                    
                    sendingModel.BasicPublish(exchangeName, "test", false, basicProperties, randomTextMessage);
                    
                    if(messageCounter++%100_000 == 0)
                        log.Information("Sended {counter}, used memory {TotalMemory}", messageCounter, GC.GetTotalMemory(true)/(1024*1024));
                }
                catch (Exception e)
                {
                    Console.WriteLine($"error on send {messageCounter}: {e.Message}");
                }
            }
            
            GC.Collect();
            GC.WaitForPendingFinalizers();
            
            log.Information("Used memory {TotalMemory} MB", GC.GetTotalMemory(true)/(1024*1024));
            log.Information("OS {Os}", System.Runtime.InteropServices.RuntimeInformation.OSDescription);
            log.Information("Process id {Id}", System.Diagnostics.Process.GetCurrentProcess().Id);
            Console.ReadLine();
        }

        private const string largeMessage =
            @"test message large text test message large texttest message 
                large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message 
                large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message 
                large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message 
                large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message 
                large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message 
                large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message 
                large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message 
                large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message 
                large texttest message large texttest message large texttest message large texttest message large text";
          
        private static ILogger GetLogger()
        {
            return new LoggerConfiguration()
                .WriteTo.Console()
                .CreateLogger();
        }
        private static string GetExchange(IConnection connection)
        {
            var model = connection.CreateModel();
            var exchangeName = "testExchange";
            model.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout, durable: true, autoDelete: false);
            
            var queueName = model.QueueDeclare(exchangeName, true, false, false).QueueName;
            model.QueueBind(queue: queueName,
                exchange: exchangeName,
                routingKey: "");
            
            return exchangeName;
        }
    }
}

Program output:

[14:36:56 INF] Sended 1, used memory 0
[14:36:57 INF] Sended 100001, used memory 363
[14:36:58 INF] Sended 200001, used memory 724
[14:36:58 INF] Sended 300001, used memory 1090
[14:36:59 INF] Used memory 1472 MB
[14:36:59 INF] OS Darwin 20.6.0 Darwin Kernel Version 20.6.0: Wed Jun 23 00:26:31 PDT 2021; root:xnu-7195.141.2~5/RELEASE_X86_64
[14:36:59 INF] Process id 41765
  3. Analyzed which objects and which code keep the memory alive:
    3.1) Created a memory dump with the dotnet-dump tool.
    3.2) Got heap statistics with dumpheap -stat (columns: MT, Count, TotalSize, Class Name):
00000001113a7a60      734        77088 System.String
0000000111f86050     1699       299024 System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Threading.Tasks.VoidTaskResult, System.Private.CoreLib],[System.IO.BufferedStream+<WriteToUnderlyingStreamAsync>d__62, System.Private.CoreLib]]
0000000111c14368        3      3670088 System.Memory`1[[System.Byte, System.Private.CoreLib]][]
00007f85a9010940   135604     24285808      Free
00000001114308a0   203743    839723804 System.Byte[]
Total 343840 objects

3.3) Listed the System.Byte[] instances with dumpheap -mt 00000001114308a0 (columns: Address, MT, Size):

00000001c586ea20 00000001114308a0     4120     
00000001c586fa50 00000001114308a0     4120     
00000001c5870a80 00000001114308a0     4120     
00000001c5871ab0 00000001114308a0     4120     
00000001c5872ae0 00000001114308a0     4120     
00000001c5873b10 00000001114308a0     4120     
00000001c5874b40 00000001114308a0     4120     
00000001c5875b70 00000001114308a0     4120     
00000001c5c372e8 00000001114308a0     3024     
00000001c5c3c528 00000001114308a0     1049     
00000001c5c59f98 00000001114308a0       40     
00000001c5c5a080 00000001114308a0       56     
00000001c5c5e5c8 00000001114308a0       40     
00000001924a9038 00000001114308a0   342576

3.4) Found what keeps one of these instances alive with gcroot -all 00000001c5871ab0:

HandleTable:
    0000000102BB1310 (strong handle)
    -> 00000001824BFBC8 System.Object[]
    -> 00000001824C5950 System.Threading.Tasks.Task
    -> 00000001824C5910 System.Action
    -> 00000001824C4C78 RabbitMQ.Client.Framing.Impl.Connection
    -> 00000001824B9AE8 RabbitMQ.Client.Impl.SocketFrameHandler
    -> 00000001824B9D58 System.Threading.Channels.SingleConsumerUnboundedChannel`1+UnboundedChannelWriter[[System.Memory`1[[System.Byte, System.Private.CoreLib]], System.Private.CoreLib]]
    -> 00000001824B9B88 System.Threading.Channels.SingleConsumerUnboundedChannel`1[[System.Memory`1[[System.Byte, System.Private.CoreLib]], System.Private.CoreLib]]
    -> 00000001824B9BD8 System.Collections.Concurrent.SingleProducerSingleConsumerQueue`1[[System.Memory`1[[System.Byte, System.Private.CoreLib]], System.Private.CoreLib]]
    -> 00000001B2195608 System.Collections.Concurrent.SingleProducerSingleConsumerQueue`1+Segment[[System.Memory`1[[System.Byte, System.Private.CoreLib]], System.Private.CoreLib]]
    -> 00000001926DCB68 System.Memory`1[[System.Byte, System.Private.CoreLib]][]
    -> 00000001C5871AB0 System.Byte[]

    0000000102BB1368 (strong handle)
    -> 000000019A4AD420 System.Object[]
    -> 00000001824BB448 System.Net.Sockets.SocketAsyncEngine[]
    -> 00000001824BB468 System.Net.Sockets.SocketAsyncEngine
    -> 00000001824BB4D0 System.Collections.Concurrent.ConcurrentDictionary`2[[System.IntPtr, System.Private.CoreLib],[System.Net.Sockets.SocketAsyncEngine+SocketAsyncContextWrapper, System.Net.Sockets]]
    -> 00000001824BB990 System.Collections.Concurrent.ConcurrentDictionary`2+Tables[[System.IntPtr, System.Private.CoreLib],[System.Net.Sockets.SocketAsyncEngine+SocketAsyncContextWrapper, System.Net.Sockets]]
    -> 00000001824BB880 System.Collections.Concurrent.ConcurrentDictionary`2+Node[[System.IntPtr, System.Private.CoreLib],[System.Net.Sockets.SocketAsyncEngine+SocketAsyncContextWrapper, System.Net.Sockets]][]
    -> 00000001824C5B38 System.Collections.Concurrent.ConcurrentDictionary`2+Node[[System.IntPtr, System.Private.CoreLib],[System.Net.Sockets.SocketAsyncEngine+SocketAsyncContextWrapper, System.Net.Sockets]]
    -> 00000001824C09A0 System.Net.Sockets.SocketAsyncContext
    -> 00000001824E5F68 System.Net.Sockets.SocketAsyncContext+BufferMemorySendOperation
    -> 00000001824E24E0 System.Action`5[[System.Int32, System.Private.CoreLib],[System.Byte[], System.Private.CoreLib],[System.Int32, System.Private.CoreLib],[System.Net.Sockets.SocketFlags, System.Net.Sockets],[System.Net.Sockets.SocketError, System.Net.Primitives]]
    -> 00000001824E23C8 System.Net.Sockets.Socket+AwaitableSocketAsyncEventArgs
    -> 00000001C5C87178 System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Threading.Tasks.VoidTaskResult, System.Private.CoreLib],[System.IO.BufferedStream+<WriteToUnderlyingStreamAsync>d__62, System.Private.CoreLib]]
    -> 00000001824C5DE8 System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Threading.Tasks.VoidTaskResult, System.Private.CoreLib],[RabbitMQ.Client.Impl.SocketFrameHandler+<WriteLoop>d__32, RabbitMQ.Client]]
    -> 00000001824B9AE8 RabbitMQ.Client.Impl.SocketFrameHandler
    -> 00000001824B9D58 System.Threading.Channels.SingleConsumerUnboundedChannel`1+UnboundedChannelWriter[[System.Memory`1[[System.Byte, System.Private.CoreLib]], System.Private.CoreLib]]
    -> 00000001824B9B88 System.Threading.Channels.SingleConsumerUnboundedChannel`1[[System.Memory`1[[System.Byte, System.Private.CoreLib]], System.Private.CoreLib]]
    -> 00000001824B9BD8 System.Collections.Concurrent.SingleProducerSingleConsumerQueue`1[[System.Memory`1[[System.Byte, System.Private.CoreLib]], System.Private.CoreLib]]
    -> 00000001B2195608 System.Collections.Concurrent.SingleProducerSingleConsumerQueue`1+Segment[[System.Memory`1[[System.Byte, System.Private.CoreLib]], System.Private.CoreLib]]
    -> 00000001926DCB68 System.Memory`1[[System.Byte, System.Private.CoreLib]][]
    -> 00000001C5871AB0 System.Byte[]

I checked this code with the .NET 5 and .NET 6 SDKs.

@michaelklishin
Member

All of those seem to be tasks and System.Collections.Concurrent queue entries.

@michaelklishin changed the title from "Memory leak when publish many message" to "Heap size grows when publishing a very large batch of messages" on Nov 14, 2021
@bollhals
Contributor

This is most likely related to the unbounded channel buffering we use for publishing messages (see here and here).

So most of the used memory (namely the byte arrays) is most likely freed again once the messages are actually published.
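
To illustrate the buffering behavior described above, here is a minimal sketch that uses System.Threading.Channels directly. It is not the client's actual code: the class and method names are made up for illustration, and the payload size is arbitrary. With no reader running, every payload written stays referenced by the channel until something reads it.

```csharp
using System;
using System.Threading.Channels;

// Hypothetical demo class, not part of RabbitMQ.Client.
public static class UnboundedBufferDemo
{
    // Writes `count` payloads of `size` bytes while no reader is running
    // and returns how many writes were accepted.
    public static int QueueWithoutReader(Channel<ReadOnlyMemory<byte>> channel, int count, int size)
    {
        var accepted = 0;
        for (var i = 0; i < count; i++)
        {
            // An unbounded channel accepts every write until the writer is completed,
            // so nothing ever slows the producer down.
            if (channel.Writer.TryWrite(new byte[size])) accepted++;
        }
        return accepted;
    }

    // Reads everything that is currently queued, like a write loop catching up.
    // Once an item is read, the channel no longer references its byte array.
    public static int Drain(Channel<ReadOnlyMemory<byte>> channel)
    {
        var drained = 0;
        while (channel.Reader.TryRead(out _)) drained++;
        return drained;
    }

    public static void Main()
    {
        var channel = Channel.CreateUnbounded<ReadOnlyMemory<byte>>();
        Console.WriteLine($"accepted: {QueueWithoutReader(channel, 10_000, 4096)}");
        Console.WriteLine($"drained:  {Drain(channel)}");
    }
}
```

Every accepted payload comes back out in `Drain`, which means all of them were held in memory in the meantime. That is the same shape as the `SingleProducerSingleConsumerQueue` segment holding `System.Byte[]` instances in the gcroot output.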

I'd argue this usage pattern is unlikely to occur in a real-world application, and even if it does, most of the memory is going to be reclaimed, so it's less of an issue.

Possible solutions:

  • Switch to a bounded channel (either with configurable overflow behavior, or by switching to async and waiting until there is free space)
  • Disable the intermediate buffering (possibly configurable, but the impact is going to be substantial)
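
The "switch to async and wait" variant of the first option could look roughly like the sketch below. It only shows the behavior of a bounded `System.Threading.Channels` channel in `Wait` mode; the class name, the capacity of 2, and the 16-byte payloads are assumptions for illustration, and this is not how the client is implemented.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical demo class, not part of RabbitMQ.Client.
public static class BoundedWriteSketch
{
    // Fills a channel of capacity 2, then attempts a third write.
    // Returns true if the third write had to wait for free space.
    public static async Task<bool> ThirdWriteWaitsAsync()
    {
        var channel = Channel.CreateBounded<ReadOnlyMemory<byte>>(new BoundedChannelOptions(2)
        {
            FullMode = BoundedChannelFullMode.Wait, // writers wait instead of dropping data
            SingleReader = true
        });

        // There is room for two items, so these complete immediately.
        await channel.Writer.WriteAsync(new byte[16]);
        await channel.Writer.WriteAsync(new byte[16]);

        // The channel is full now: this write stays pending.
        ValueTask pending = channel.Writer.WriteAsync(new byte[16]);
        var waited = !pending.IsCompleted;

        // Stand-in for the socket write loop taking one item off the queue.
        channel.Reader.TryRead(out _);

        // With a free slot available, the pending write can finish.
        await pending;
        return waited;
    }

    public static async Task Main()
    {
        Console.WriteLine($"third write waited: {await ThirdWriteWaitsAsync()}");
    }
}
```

The cost of this design is that the publishing call has to become awaitable (or block), which is the API impact mentioned in the list above.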

@wangxin5355

I have the same problem when I send data at high frequency.

@michaelklishin
Member

Using a bounded channel will force us to face the classic problem of "what to do when the buffer is full". Neither dropping data on the floor nor blocking publishing seems very appealing to most. In addition, the user can build something similar on top of what we have. Or we have to make this configurable so that each user picks their own poison.
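
The "dropping data on the floor" side of that trade-off can be seen with the full-mode options of `System.Threading.Channels`. The sketch below is an illustration only (the class and method names are hypothetical, and integers stand in for message payloads): it writes more items than fit into a bounded channel while no reader is running, then checks which ones survive.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Hypothetical demo class, not part of RabbitMQ.Client.
public static class FullBufferPolicies
{
    // Writes 1..itemCount into a channel of the given capacity with no reader running,
    // then returns the items that are still there to be read.
    public static List<int> Survivors(BoundedChannelFullMode mode, int capacity, int itemCount)
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity) { FullMode = mode });

        for (var i = 1; i <= itemCount; i++)
        {
            // In the drop modes TryWrite reports success even when an item is discarded.
            // In Wait mode TryWrite returns false once the channel is full,
            // and the caller has to decide what to do with the item.
            channel.Writer.TryWrite(i);
        }

        var survivors = new List<int>();
        while (channel.Reader.TryRead(out var item)) survivors.Add(item);
        return survivors;
    }

    public static void Main()
    {
        var modes = new[]
        {
            BoundedChannelFullMode.DropWrite,
            BoundedChannelFullMode.DropOldest,
            BoundedChannelFullMode.Wait
        };
        foreach (var mode in modes)
            Console.WriteLine($"{mode}: {string.Join(",", Survivors(mode, 2, 5))}");
    }
}
```

Whichever mode is chosen, something has to give once the buffer is full: newer messages are lost, older messages are lost, or the publisher is refused and has to wait or retry.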

@lukebakken
Contributor

My guess is that the changes already made in main, coupled with #1445, will address this. Closing.
