Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/Adaptive.Aeron/DriverProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ public class DriverProxy
/// Maximum capacity of the write buffer </summary>
public const int MSG_BUFFER_CAPACITY = 1024;

private readonly UnsafeBuffer _buffer = new UnsafeBuffer(BufferUtil.AllocateDirectAligned(MSG_BUFFER_CAPACITY,BitUtil.CACHE_LINE_LENGTH * 2));
// Keep a reference to _byteBuffer prevent it from being garbage collected. It unpins the array used by _buffer in its finalizer.
private readonly ByteBuffer _byteBuffer = BufferUtil.AllocateDirectAligned(MSG_BUFFER_CAPACITY, BitUtil.CACHE_LINE_LENGTH * 2);
private readonly UnsafeBuffer _buffer;
private readonly PublicationMessageFlyweight _publicationMessage = new PublicationMessageFlyweight();
private readonly SubscriptionMessageFlyweight _subscriptionMessage = new SubscriptionMessageFlyweight();
private readonly RemoveMessageFlyweight _removeMessage = new RemoveMessageFlyweight();
Expand All @@ -47,6 +49,8 @@ public DriverProxy(IRingBuffer toDriverCommandBuffer)
{
if (toDriverCommandBuffer == null) throw new ArgumentNullException(nameof(toDriverCommandBuffer));

_buffer = new UnsafeBuffer(_byteBuffer);

_toDriverCommandBuffer = toDriverCommandBuffer;

_publicationMessage.Wrap(_buffer, 0);
Expand Down
102 changes: 52 additions & 50 deletions src/Samples/Adaptive.Aeron.Samples.SimplePublisher/SimplePublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,66 +34,68 @@ public static void Main()
{
// Allocate enough buffer size to hold maximum message length
// The UnsafeBuffer class is part of the Agrona library and is used for efficient buffer management
var buffer = new UnsafeBuffer(BufferUtil.AllocateDirectAligned(512, BitUtil.CACHE_LINE_LENGTH));

// The channel (an endpoint identifier) to send the message to
const string channel = "aeron:udp?endpoint=localhost:40123";
using (var byteBuffer = BufferUtil.AllocateDirectAligned(512, BitUtil.CACHE_LINE_LENGTH))
using (var buffer = new UnsafeBuffer(byteBuffer))
{
// The channel (an endpoint identifier) to send the message to
const string channel = "aeron:udp?endpoint=localhost:40123";

// A unique identifier for a stream within a channel. Stream ID 0 is reserved
// for internal use and should not be used by applications.
const int streamId = 10;
// A unique identifier for a stream within a channel. Stream ID 0 is reserved
// for internal use and should not be used by applications.
const int streamId = 10;

Console.WriteLine("Publishing to " + channel + " on stream Id " + streamId);
Console.WriteLine("Publishing to " + channel + " on stream Id " + streamId);

// Create a context, needed for client connection to media driver
// A separate media driver process needs to be running prior to starting this application
var ctx = new Aeron.Context();
// Create a context, needed for client connection to media driver
// A separate media driver process needs to be running prior to starting this application
var ctx = new Aeron.Context();

// Create an Aeron instance with client-provided context configuration and connect to the
// media driver, and create a Publication. The Aeron and Publication classes implement
// AutoCloseable, and will automatically clean up resources when this try block is finished.
using (var aeron = Aeron.Connect(ctx))
using (var publication = aeron.AddPublication(channel, streamId))
{
Thread.Sleep(100);
// Create an Aeron instance with client-provided context configuration and connect to the
// media driver, and create a Publication. The Aeron and Publication classes implement
// AutoCloseable, and will automatically clean up resources when this try block is finished.
using (var aeron = Aeron.Connect(ctx))
using (var publication = aeron.AddPublication(channel, streamId))
{
Thread.Sleep(100);

const string message = "Hello World! ";
var messageBytes = Encoding.UTF8.GetBytes(message);
buffer.PutBytes(0, messageBytes);
const string message = "Hello World! ";
var messageBytes = Encoding.UTF8.GetBytes(message);
buffer.PutBytes(0, messageBytes);

// Try to publish the buffer. 'offer' is a non-blocking call.
// If it returns less than 0, the message was not sent, and the offer should be retried.
var result = publication.Offer(buffer, 0, messageBytes.Length);
// Try to publish the buffer. 'offer' is a non-blocking call.
// If it returns less than 0, the message was not sent, and the offer should be retried.
var result = publication.Offer(buffer, 0, messageBytes.Length);

if (result < 0L)
{
switch (result)
if (result < 0L)
{
case Publication.BACK_PRESSURED:
Console.WriteLine(" Offer failed due to back pressure");
break;
case Publication.NOT_CONNECTED:
Console.WriteLine(" Offer failed because publisher is not connected to subscriber");
break;
case Publication.ADMIN_ACTION:
Console.WriteLine("Offer failed because of an administration action in the system");
break;
case Publication.CLOSED:
Console.WriteLine("Offer failed publication is closed");
break;
default:
Console.WriteLine(" Offer failed due to unknown reason");
break;
switch (result)
{
case Publication.BACK_PRESSURED:
Console.WriteLine(" Offer failed due to back pressure");
break;
case Publication.NOT_CONNECTED:
Console.WriteLine(" Offer failed because publisher is not connected to subscriber");
break;
case Publication.ADMIN_ACTION:
Console.WriteLine("Offer failed because of an administration action in the system");
break;
case Publication.CLOSED:
Console.WriteLine("Offer failed publication is closed");
break;
default:
Console.WriteLine(" Offer failed due to unknown reason");
break;
}
}
else
{
Console.WriteLine(" yay !!");
}
}
else
{
Console.WriteLine(" yay !!");
}

Console.WriteLine("Done sending.");
Console.WriteLine("Press any key...");
Console.ReadLine();
Console.WriteLine("Done sending.");
Console.WriteLine("Press any key...");
Console.ReadLine();
}
}
}
}
Expand Down