GC #21

Closed · gregoryyoung opened this issue Sep 3, 2015 · 15 comments

@gregoryyoung

I think this spot, https://github.com/Azure/amqpnetlite/blob/master/src/Net/AsyncPump.cs#L59, could benefit from reusing memory instead of allocating a new buffer each time. I can send a PR for this. I could use one of the libraries MS ships (https://msdn.microsoft.com/en-us/library/ms405814.aspx), but it is not available on all of the supported platforms. I could also include what we use (https://github.com/EventStore/EventStore/blob/release-v3.2.0/src/EventStore.BufferManagement/BufferManager.cs), but I am not sure the PR would be accepted at that point since I would be including code copied from a BSD project.

Thoughts?

@xinchen10
Member

You are right. Some type of buffer pooling would improve GC time and also avoid potential managed-heap fragmentation. It becomes even more important when the library is used to build listener- or broker-style applications.

Overall I'd like to minimize dependencies so that the library works on all platforms without too much platform-specific code. At the same time, this rule should not block useful features that matter for common scenarios. One way to achieve both is to provide extension points where users can plug in their own implementations as appropriate; a custom buffer manager may be a good example of this. I will think about how this can be done. Any suggestions/ideas are very welcome.

Thanks.

@robreeves
Contributor

We use the listener portion of the library. We are sending large messages (1-2 MB) and have increased the max frame size to 5 MB based on the discussion in issue #30. To isolate Amqp.Net Lite I have commented out all of our code in IMessageProcessor.Process. Depending on the number of clients and how frequently they send messages, we see '% Time in GC' ranging from 10% to 40%.

We would also like to minimize dependencies and would prefer not to introduce a dependency on System.ServiceModel.dll by using BufferManager. My preference is to implement buffer pooling within the library and use it by default; I imagine that would work fine for most use cases. If it needs to be extended, a feature can be added to allow someone to inject their own buffer pool.

@gregoryyoung
Author

I can donate a buffer manager (3-clause BSD licensed) to avoid rewriting one.

@xinchen10
Member

@gregoryyoung I don't know if we can take BSD-licensed code. Probably not.

Writing a buffer manager is not difficult, but to use pooled buffers, the library and the user code need to work together to ensure a buffer is reclaimed at the right time. Specifically, the library checks out a buffer, and the user needs to tell the library when the message is done so the buffer can be returned.

@rr118 what is the body type of your messages (Data, or AmqpValue of byte[])? For a byte[] payload, the library currently allocates the array twice (once for the frame buffer received from the socket and once for the user payload decoded from the message body). With SSL there are even more allocations inside the framework code.

I am making the following changes and we can check how much they help GC.

  1. Add a buffer manager interface so users can plug in their own buffer manager (a sketch of the shape follows below). This requires other changes to manage the lifetime of a buffer associated with a message.
  2. Delay the payload byte[] allocation and support Message.GetBody(). When the user asks for a stream, the library will wrap the frame buffer in a stream to avoid the second allocation and byte copy.
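
A minimal sketch of the interface shape, inferred from the TakeBuffer/ReturnBuffer usage that appears later in this thread (the final shape in the library may differ):

public interface IBufferManager
{
    // Take a buffer of at least bufferSize bytes from the pool (or allocate a new one).
    ArraySegment<byte> TakeBuffer(int bufferSize);

    // Return a previously taken buffer to the pool so it can be reused.
    void ReturnBuffer(ArraySegment<byte> buffer);
}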

Thanks.
Xin

@gregoryyoung
Author

I can make it MIT if that's easier (I am the license owner).

@robreeves
Contributor

@xinchen10 Our body type is AmqpValue. My tests don't use SSL yet, but we will be adding it soon. I haven't had time to determine whether it is appropriate for this issue, but I found a potentially promising Microsoft-supported library (originally from Bing) for reusing streams that I thought you might be interested in. I'm going to take a deeper look in the next few days.

https://github.com/Microsoft/Microsoft.IO.RecyclableMemoryStream

@gregoryyoung Thanks. I'll take a look at this option. We are more comfortable with MIT licenses too.

@robreeves
Contributor

@xinchen10 If you want me to test the buffer pooling to see how it changes the GC usage just let me know. I can test whenever you are ready.

@xinchen10
Member

@rr118 that would be great. I need some time to make sure the ownership of the buffers is managed correctly during the lifetime of the message, but I will let you know when it is ready for testing.

@xinchen10
Member

I enabled buffer pooling on the receiving code path (including container host and listener). The code is published in the buffer-manager branch. I also included a simple buffer manager implementation as an example (Features.BufferManager).

ContainerHost host = new ContainerHost(addressUri);
foreach (var listener in host.Listeners) 
{ 
    listener.BufferManager = new BufferManager(256, 2 * 1024 * 1024, 100 * 1024 * 1024); 
} 
host.Open(); 

You can call messageContext.Message.GetBody<ByteBuffer>() to get a ByteBuffer object that wraps the payload without actually copying the bytes. If you prefer working with a stream, you can create a memory stream from the ByteBuffer object: new MemoryStream(buffer.Buffer, buffer.Offset, buffer.Length).
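
For example, a minimal sketch of reading the payload this way inside a message processor (messageContext is the MessageContext passed to IMessageProcessor.Process):

// Get the payload as a ByteBuffer without copying the bytes out of the frame buffer.
ByteBuffer buffer = messageContext.Message.GetBody<ByteBuffer>();

// Optionally wrap the same memory in a stream for stream-based parsing.
using (var stream = new MemoryStream(buffer.Buffer, buffer.Offset, buffer.Length))
{
    // Read/parse the payload from the stream here.
}
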
I will enable buffer pooling on the sending code path later.

@robreeves
Contributor

Thanks! I'll check it out today and let you know the results.

@robreeves
Contributor

@xinchen10 My apologies for the slow response. I had another issue come up that I had to prioritize.

I've done some initial testing and see moderate improvements. For my initial test I focused on having the server call MessageContext.Complete() without any processing. In my test I have 32 clients (spread across many VMs) sending a message body with a List as fast as possible. The typical bufferSize I see requested through IBufferManager on the server is 1.4 MB. The max frame size is set to 5 MB. I am not using SSL. For this test I used your example buffer manager with a minimum size of 128 B, a maximum buffer of 2 MB, and a total size of 128 MB. I confirmed all buffer requests were able to reuse an array from the pool.

[AmqpContract]
public class StreamData
{
    [AmqpMember]
    public string StreamID { get; set; }

    [AmqpMember]
    public byte[] Data { get; set; }
}

With the buffer manager I am seeing 25% time in GC, 240 MB allocated per second, and 20% CPU usage. Without the buffer manager I am seeing 35% time in GC, 447 MB allocated per second, and 30% CPU usage, so there are some decent improvements there. The total receive rate on the server is about 500 Mbps. I still need to dig in further to understand where the 25% is coming from and plan to do that tomorrow. At a very quick glance with PerfView it appeared to be mainly from strings; my hunch is that this comes from the StreamData.StreamID property.

Now that I understand the performance of transporting the message to the server, I am adding deserialization to the test. I don't quite understand the suggestion of messageContext.Message.GetBody() to avoid a payload copy, because I eventually have to get the data into the form of a List on the server. Right now I use messageContext.Message.GetBody<List>(). I need to dig into the source more to get a better handle on your suggestion.

At a very initial look, here is what I see for the same test when I add deserialization. With the buffer manager I am seeing 40% time in GC, 345 MB allocated per second, and 38% CPU usage. Without the buffer manager I am seeing 40% time in GC (not sure why this did not increase), 450 MB allocated per second, and 45% CPU usage. The total receive rate on the server is about 350 Mbps.

From here my plan is to dig in more, understand where the 40% GC is coming from, and determine whether there are any ways to get it down further. Any suggestions are more than welcome. Admittedly I know this is a large load, but I need to understand the limitations and make the application as efficient as I can to decrease the number of instances required.

So far the buffer manager appears to have had a positive impact on GC usage, allocations, and overall processor usage. I think it will be a nice feature addition. More to come on my findings.

@xinchen10
Member

The GC time and memory allocation are due to the AmqpContract object deserialization. The serializer has to create the object and allocate a byte[] to copy the bytes out of the transport buffer (which comes from the buffer manager). I haven't enabled the buffer manager for the serializer, but even if I did, it wouldn't help because of the byte[] data member (memory still has to be allocated and copied).

One quick solution is to do your own serialization. From your clients, send a message with a byte array body (new Message(byte[])), and on the server call Message.GetBody<ByteBuffer>() to get back a ByteBuffer object, which is very similar to ArraySegment<byte>. Your custom serialization of the byte[] could be "[str-count: 4 bytes][str of stream id][stream bytes]". You can start reading from ByteBuffer.Offset to consume the data. This avoids the byte[] allocation.
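
A rough sketch of that layout, assuming UTF-8 for the stream id (streamId, streamBytes, senderLink, and messageContext below stand in for your own data and objects; this is illustrative, not a library API):

// Client side: pack [id length : 4 bytes][id bytes][stream bytes] into one array.
byte[] idBytes = Encoding.UTF8.GetBytes(streamId);
byte[] payload = new byte[4 + idBytes.Length + streamBytes.Length];
Array.Copy(BitConverter.GetBytes(idBytes.Length), 0, payload, 0, 4);
Array.Copy(idBytes, 0, payload, 4, idBytes.Length);
Array.Copy(streamBytes, 0, payload, 4 + idBytes.Length, streamBytes.Length);
senderLink.Send(new Message(payload));

// Server side: parse directly out of the pooled frame buffer, no extra byte[] allocation.
ByteBuffer buffer = messageContext.Message.GetBody<ByteBuffer>();
int pos = buffer.Offset;
int idLength = BitConverter.ToInt32(buffer.Buffer, pos);
pos += 4;
string id = Encoding.UTF8.GetString(buffer.Buffer, pos, idLength);
pos += idLength;
int dataLength = buffer.Length - 4 - idLength;
// The stream bytes occupy buffer.Buffer[pos .. pos + dataLength - 1].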

@xinchen10
Member

Another solution is to send StreamId as a message property and Data as the body.

// on client
var message = new Message();
message.Properties = new Properties() { MessageId = "stream1" };
message.BodySection = new Data() { Binary = data };

// on server
string streamId = message.Properties.MessageId;
ByteBuffer streamData = message.GetBody<ByteBuffer>();

@robreeves
Contributor

Thanks for the ideas. Originally I did think of putting the stream ID in the header. However, the client has many streams, each with a relatively low update frequency; combined, the data from all the streams adds up, which is why I am batching data from many streams into a single list. Another option I considered is using a map where the key is the stream ID and the value is a List<byte[]> of the events for that stream. This would minimize the streamID strings to be processed and sent over the wire.

I like the idea of the custom serialization and just sending the byte array. In fact, it fits relatively well with the rest of the application and would not be hard to implement. I really like the type system in AMQP for making a nice client API, but it may have to be sacrificed in the name of performance.

Here are the new test results now that I understand what you were getting at with accessing the ByteBuffer directly. With 32 clients and the buffer manager, % time in GC is < 1% and allocation is 5 MB/s; the total received data rate on the broker/server is 1.6 Gbps. Without the buffer manager, time in GC is 30% and allocation is 200 MB/s. This is a huge improvement!

I'd like to suggest making the example buffer implementation part of the library. People could use it by setting the BufferManager property to something like BufferManager.Default(int min, int max, int maxTotal), similar to how you can set the SASL profile. Your buffer manager implementation should work for the majority of cases out of the box, so people wouldn't have to rewrite their own implementation every time, but still could if they wanted to. BufferManager could be null by default so the library doesn't use more memory than expected unless pooling is explicitly enabled.
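
For example, something like this (a purely hypothetical API to illustrate the suggestion; BufferManager.Default does not exist in the library today):

// Hypothetical: opt in to a library-provided default pool instead of writing your own.
host.Listeners[0].BufferManager = BufferManager.Default(64, 2 * 1024 * 1024, 20 * 1024 * 1024);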

For reference, here is my client and broker/server code. I commented out the line where I set the buffer manager to test without a buffer manager.

Server:

class Program
{
    static void Main(string[] args)
    {
        var address = new Uri("amqp://guest:guest@10.0.0.255:5672");
        var host = new ContainerHost(new List<Uri>() { address }, null, address.UserInfo);
        host.Listeners[0].AMQP.MaxFrameSize = 2 * 1024 * 1024;
        host.Listeners[0].BufferManager = new BufferManager(64, 2*1024*1024, 20*1024*1024);

        host.RegisterMessageProcessor("msg-queue", new MsgProcessor());
        host.Open();

        Console.WriteLine("ready");
        Console.Read();
    }
}

class MsgProcessor : IMessageProcessor
{
    BufferManager bufferMgr = new BufferManager(64, 2 * 1024 * 1024, 20 * 1024 * 1024);

    public int Credit
    {
        get { return 1; }
    }

    public void Process(MessageContext messageContext)
    {
        var data = (messageContext.Message.Body as ByteBuffer);

        #region A very crude simulation of my application

        //The application will copy the data, queue it and process it at a later time
        //This is to allow the clients to pump data in as quickly as possible.

        //Copy data
        var dataCopy = bufferMgr.TakeBuffer(data.Length);
        Array.Copy(data.Buffer, data.Offset, dataCopy.Array, 0, data.Length);

        //Return data
        //This would not happen here
        //This would get queued, processed and returned somewhere else
        bufferMgr.ReturnBuffer(dataCopy);

        #endregion

        messageContext.Complete();
    }
}

public class BufferManager : IBufferManager
{
    int minBufferSize;
    int maxBufferSize;
    long maxMemorySize;
    long currentMemorySize;
    int indexOffset;
    List<Pool> bufferPools;

    public BufferManager(int minBufferSize, int maxBufferSize, long maxMemorySize)
    {
        CheckBufferSize(minBufferSize, "minBufferSize");
        CheckBufferSize(maxBufferSize, "maxBufferSize");
        this.minBufferSize = minBufferSize;
        this.maxBufferSize = maxBufferSize;
        this.maxMemorySize = maxMemorySize;
        this.indexOffset = RoundToNextLog2(minBufferSize);
        this.bufferPools = new List<Pool>();

        int size = minBufferSize;
        while (size <= maxBufferSize)
        {
            this.bufferPools.Add(new Pool(size));
            size *= 2;
        }
    }

    public ArraySegment<byte> TakeBuffer(int bufferSize)
    {
        if (bufferSize > this.maxBufferSize)
        {
            Console.WriteLine("***Did not get buffer from pool (size too large)***");
            return new ArraySegment<byte>(new byte[bufferSize]);
        }

        int index = bufferSize <= this.minBufferSize ? 0 : RoundToNextLog2(bufferSize) - this.indexOffset;
        Pool pool = this.bufferPools[index];
        byte[] buffer;
        if (pool.TryTake(bufferSize, out buffer))
        {
            Interlocked.Add(ref this.currentMemorySize, -buffer.Length);
        }
        else
        {
            Console.WriteLine("***Did not get buffer from pool***");
            buffer = new byte[pool.BufferSize];
        }

        return new ArraySegment<byte>(buffer);
    }

    public void ReturnBuffer(ArraySegment<byte> buffer)
    {
        int bufferSize = buffer.Array.Length;
        if (bufferSize > this.maxBufferSize)
        {
            return;
        }

        int index = bufferSize <= this.minBufferSize ? 0 : RoundToNextLog2(bufferSize) - this.indexOffset;
        Pool pool = this.bufferPools[index];
        if (bufferSize == pool.BufferSize && this.currentMemorySize < this.maxMemorySize)
        {
            Interlocked.Add(ref this.currentMemorySize, pool.BufferSize);
            pool.Return(buffer.Array);
        }
    }

    static void CheckBufferSize(int bufferSize, string name)
    {
        if (bufferSize <= 0 || !IsPowerOfTwo(bufferSize))
        {
            throw new ArgumentException(name);
        }
    }

    static bool IsPowerOfTwo(int number)
    {
        return (number & (number - 1)) == 0;
    }

    static readonly int[] MultiplyDeBruijnBitPosition =
    {
        0, 1, 28, 2, 29, 14, 24, 3, 30, 22, 20, 15, 25, 17, 4, 8,
        31, 27, 13, 23, 21, 19, 16, 7, 26, 12, 18, 6, 11, 5, 10, 9
    };

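    // Rounds bufferSize up to the next power of two and returns its log2,
    // using a De Bruijn multiply to locate the bit position without a loop.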
    static int RoundToNextLog2(int bufferSize)
    {
        uint v = (uint)bufferSize;
        if (!IsPowerOfTwo(bufferSize))
        {
            v--;
            v |= v >> 1;
            v |= v >> 2;
            v |= v >> 4;
            v |= v >> 8;
            v |= v >> 16;
            v++;
        }

        return MultiplyDeBruijnBitPosition[(v * 0x077CB531U) >> 27];
    }

    class Pool
    {
        ConcurrentQueue<byte[]> buffers;

        public Pool(int bufferSize)
        {
            this.BufferSize = bufferSize;
            this.buffers = new ConcurrentQueue<byte[]>();
        }

        public int BufferSize
        {
            get;
            private set;
        }

        public bool TryTake(int size, out byte[] buffer)
        {
            for (int i = 0; i < 3; i++)
            {
                if (this.buffers.TryDequeue(out buffer))
                {
                    return true;
                }
            }

            buffer = null;
            return false;
        }

        public void Return(byte[] buffer)
        {
            this.buffers.Enqueue(buffer);
        }
    }
}

Client:

class Program
{
    static void Main(string[] args)
    {
        var address = "amqp://guest:guest@10.0.0.255:5672";
        var connFactory = new ConnectionFactory();
        connFactory.AMQP.MaxFrameSize = 2 * 1024 * 1024;

        var conn = connFactory.CreateAsync(new Address(address)).Result;
        var sess = new Session(conn);
        var link = new SenderLink(sess, "test-client", "msg-queue");

        var data = new byte[1024 * 1024];
        data[0] = data[data.Length - 1] = 99; // For debugging so I can easily tell the start and end of the data
        var msg = new Message(data);

        while (true)
        {
            link.Send(msg);
        }
    }
}

@gregoryyoung
Author

👍

(Posted in reply to Xin Chen closing this issue on Oct 23, 2015.)
