Skip to content
Permalink
Browse files
Update to zeroMQ 4.0.3 with CLRZMQ 3.0 wrapper library. It now intern…
…ally supports dynamic loading of 32-bit/64-bit native DLL at runtime.
  • Loading branch information
Jim Gomes committed Feb 22, 2014
1 parent 5bcebeb commit 727eeaf8abcaf7ac25fc61ebdc089aad7b752e7a
Showing 12 changed files with 126 additions and 231 deletions.
@@ -20,7 +20,7 @@
[assembly: AssemblyConfigurationAttribute("SNAPSHOT")]
[assembly: AssemblyCompanyAttribute("http://activemq.apache.org/nms")]
[assembly: AssemblyProductAttribute("Apache NMS for ZMQ Class Library")]
[assembly: AssemblyCopyrightAttribute("Copyright (C) 2011 Apache Software Foundation")]
[assembly: AssemblyCopyrightAttribute("Copyright (C) 2011-2014 Apache Software Foundation")]
[assembly: AssemblyTrademarkAttribute("")]
[assembly: AssemblyCultureAttribute("")]
[assembly: AssemblyVersionAttribute("1.0.0.1")]
@@ -16,7 +16,7 @@
*/

using System;
using ZContext = ZMQ.Context;
using ZeroMQ;

namespace Apache.NMS.ZMQ
{
@@ -36,7 +36,7 @@ public class Connection : IConnection
/// <summary>
/// ZMQ context
/// </summary>
static private ZContext _context = new ZContext(1);
private ZmqContext _context = ZmqContext.Create();

/// <summary>
/// Starts message delivery for this connection.
@@ -152,7 +152,7 @@ public ProducerTransformerDelegate ProducerTransformer
/// <summary>
/// Gets ZMQ connection context
/// </summary>
static internal ZContext Context
internal ZmqContext Context
{
get { return _context; }
}
@@ -28,12 +28,50 @@ public class ConnectionFactory : IConnectionFactory
private string clientID;
private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();

private const string DEFAULT_BROKER_URL = "tcp://localhost:5556";
private const string ENV_BROKER_URL = "ZMQ_BROKER_URL";

public ConnectionFactory()
: this(GetDefaultBrokerUrl())
{
}

public ConnectionFactory(string brokerUri)
: this(brokerUri, null)
{
}

public ConnectionFactory(string brokerUri, string clientID)
: this(new Uri(brokerUri), clientID)
{
}

public ConnectionFactory(Uri brokerUri, string clientID)
{
this.brokerUri = brokerUri;
this.clientID = clientID;
}

/// <summary>
/// Get the default connection Uri if none is specified.
/// The environment variable is checked first.
/// </summary>
/// <returns></returns>
private static string GetDefaultBrokerUrl()
{
string brokerUrl = Environment.GetEnvironmentVariable(ENV_BROKER_URL);

if(string.IsNullOrEmpty(brokerUrl))
{
brokerUrl = DEFAULT_BROKER_URL;
}

return brokerUrl;
}

#region IConnectionFactory Members


/// <summary>
/// Creates a new connection to ZMQ.
/// </summary>
@@ -106,5 +144,7 @@ public ProducerTransformerDelegate ProducerTransformer
get { return this.producerTransformer; }
set { this.producerTransformer = value; }
}

#endregion
}
}
@@ -19,9 +19,9 @@
using System.Text;
using System.Threading;
using Apache.NMS.Util;
using ZSendRecvOpt = ZMQ.SendRecvOpt;
using ZSocket = ZMQ.Socket;
using ZSocketType = ZMQ.SocketType;
using ZeroMQ;
//using ZSendRecvOpt = ZMQ.SendRecvOpt;
//using ZSocketType = ZeroMQ.SocketType;

namespace Apache.NMS.ZMQ
{
@@ -37,7 +37,7 @@ public class MessageConsumer : IMessageConsumer
/// <summary>
/// Socket object
/// </summary>
private ZSocket messageSubscriber = null;
private ZmqSocket messageSubscriber = null;
/// <summary>
/// Context binding string
/// </summary>
@@ -58,14 +58,14 @@ public ConsumerTransformerDelegate ConsumerTransformer

public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, IDestination destination, string selector)
{
if(null == Connection.Context)
if(null == session.Connection.Context)
{
throw new NMSConnectionException();
}

this.session = session;
this.acknowledgementMode = acknowledgementMode;
this.messageSubscriber = Connection.Context.Socket(ZSocketType.SUB);
this.messageSubscriber = session.Connection.Context.CreateSocket(SocketType.SUB);
if(null == this.messageSubscriber)
{
throw new ResourceAllocationException();
@@ -77,11 +77,18 @@ public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode,
this.destination = new Queue(this.contextBinding);
if(!string.IsNullOrEmpty(clientId))
{
this.messageSubscriber.StringToIdentity(clientId, Encoding.Unicode);
this.messageSubscriber.Identity = Encoding.Unicode.GetBytes(clientId);
}

this.messageSubscriber.Connect(contextBinding);
this.messageSubscriber.Subscribe(selector ?? string.Empty, Encoding.ASCII);
byte[] prefix = null;

if(!string.IsNullOrWhiteSpace(selector))
{
prefix = Encoding.ASCII.GetBytes(selector);
}

this.messageSubscriber.Subscribe(prefix);
}

public event MessageListener Listener
@@ -117,7 +124,7 @@ public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode,
public IMessage Receive()
{
// TODO: Support decoding of all message types + all meta data (e.g., headers and properties)
return ToNmsMessage(messageSubscriber.Recv(Encoding.ASCII, ZSendRecvOpt.NONE));
return ToNmsMessage(messageSubscriber.Receive(Encoding.ASCII));
}

/// <summary>
@@ -129,7 +136,7 @@ public IMessage Receive()
public IMessage Receive(TimeSpan timeout)
{
// TODO: Support decoding of all message types + all meta data (e.g., headers and properties)
return ToNmsMessage(messageSubscriber.Recv(Encoding.ASCII, timeout.Milliseconds));
return ToNmsMessage(messageSubscriber.Receive(Encoding.ASCII, timeout));
}

/// <summary>
@@ -16,9 +16,8 @@
*/

using System;
using ZSocket = ZMQ.Socket;
using ZSocketType = ZMQ.SocketType;
using System.Text;
using ZeroMQ;

namespace Apache.NMS.ZMQ
{
@@ -33,7 +32,7 @@ public class MessageProducer : IMessageProducer
/// <summary>
/// Socket object
/// </summary>
private ZSocket messageProducer = null;
private ZmqSocket messageProducer = null;
private MsgDeliveryMode deliveryMode;
private TimeSpan timeToLive;
private MsgPriority priority;
@@ -49,19 +48,19 @@ public ProducerTransformerDelegate ProducerTransformer

public MessageProducer(Connection connection, Session session, IDestination destination)
{
if(null == Connection.Context)
if(null == connection.Context)
{
throw new NMSConnectionException();
}

this.session = session;
this.destination = destination;
this.messageProducer = Connection.Context.Socket(ZSocketType.SUB);
this.messageProducer = connection.Context.CreateSocket(SocketType.SUB);

string clientId = connection.ClientId;
if(!string.IsNullOrEmpty(clientId))
{
this.messageProducer.StringToIdentity(clientId, Encoding.Unicode);
this.messageProducer.Identity = Encoding.Unicode.GetBytes(clientId);
}

this.messageProducer.Connect(connection.BrokerUri.LocalPath);
@@ -195,9 +195,12 @@ public void Rollback()

#region Transaction State Events

// The following delegates are not used, but are required to exist.
#pragma warning disable 0067
public event SessionTxEventDelegate TransactionStartedListener;
public event SessionTxEventDelegate TransactionCommittedListener;
public event SessionTxEventDelegate TransactionRolledBackListener;
#pragma warning restore 0067

#endregion

@@ -16,10 +16,9 @@
*/

using System;
using System.Messaging;
using NUnit.Framework;
using System.Threading;
using System.IO;
using System.Threading;
using NUnit.Framework;

namespace Apache.NMS.ZMQ
{
@@ -52,10 +51,12 @@ public void TestReceive()
string libFolder = System.Environment.CurrentDirectory;
string libFileName;

libFileName = Path.Combine(libFolder, "libzmq.dll");
Assert.IsTrue(File.Exists(libFileName), "Missing zmq library file: {0}", libFileName);
libFileName = Path.Combine(libFolder, "clrzmq.dll");
Assert.IsTrue(File.Exists(libFileName), "Missing zmq wrapper file: {0}", libFileName);
libFileName = Path.Combine(libFolder, "libzmq.dll");
Assert.IsTrue(File.Exists(libFileName), "Missing zmq library file: {0}", libFileName);
libFileName = Path.Combine(libFolder, "libzmq64.dll");
Assert.IsTrue(File.Exists(libFileName), "Missing 64-bit zmq library file: {0}", libFileName);
libFileName = Path.Combine(libFolder, "Apache.NMS.dll");
Assert.IsTrue(File.Exists(libFileName), "Missing Apache.NMS library file: {0}", libFileName);
libFileName = Path.Combine(libFolder, "Apache.NMS.ZMQ.dll");
@@ -57,7 +57,7 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="vs2010-zmq-net-4.0.csproj">
<Project>{5AA5A595-FF56-444D-A7BD-988001619FDC}</Project>
<Project>{624AA430-2EEF-4251-8700-B71A6D770A3B}</Project>
<Name>vs2010-zmq-net-4.0</Name>
</ProjectReference>
</ItemGroup>
@@ -1,13 +1,12 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003" ToolsVersion="4.0">
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProductVersion>8.0.30703</ProductVersion>
<ProductVersion>9.0.30729</ProductVersion>
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{5AA5A595-FF56-444D-A7BD-988001619FDC}</ProjectGuid>
<ProjectGuid>{624AA430-2EEF-4251-8700-B71A6D770A3B}</ProjectGuid>
<OutputType>Library</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>Apache.NMS.ZMQ</RootNamespace>
<AssemblyName>Apache.NMS.ZMQ</AssemblyName>
<TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
@@ -19,32 +18,67 @@
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\net-4.0\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
<DefineConstants>TRACE;DEBUG;NET</DefineConstants>
<AllowUnsafeBlocks>false</AllowUnsafeBlocks>
<RegisterForComInterop>false</RegisterForComInterop>
<PlatformTarget>AnyCPU</PlatformTarget>
<TreatWarningsAsErrors>false</TreatWarningsAsErrors>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<DebugType>pdbonly</DebugType>
<DebugSymbols>true</DebugSymbols>
<Optimize>true</Optimize>
<OutputPath>bin\net-4.0\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
<DefineConstants>TRACE;NET</DefineConstants>
<AllowUnsafeBlocks>false</AllowUnsafeBlocks>
<DebugType>full</DebugType>
</PropertyGroup>
<ItemGroup>
<Reference Include="Apache.NMS, Version=1.5.1.2378, Culture=neutral, PublicKeyToken=82756feee3957618, processorArchitecture=MSIL">
<Reference Include="Apache.NMS, Version=1.5.0.2363, Culture=neutral, PublicKeyToken=82756feee3957618, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>lib\Apache.NMS\net-4.0\Apache.NMS.dll</HintPath>
</Reference>
<Reference Include="clrzmq, Version=0.0.0.0, Culture=neutral, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>lib\clrzmq\net-4.0\clrzmq.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="Microsoft.CSharp" />
<Reference Include="System.Data" />
<Reference Include="System.Messaging" />
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="src\main\csharp\BaseMessage.cs" />
<Compile Include="src\main\csharp\BytesMessage.cs">
<SubType>Code</SubType>
</Compile>
<Compile Include="src\main\csharp\CommonAssemblyInfo.cs" />
<Compile Include="src\main\csharp\CommonConnectionFactory.cs" />
<Compile Include="src\main\csharp\Connection.cs" />
<Compile Include="src\main\csharp\ConnectionFactory.cs" />
<Compile Include="src\main\csharp\ConnectionMetaData.cs" />
<Compile Include="src\main\csharp\TemporaryTopic.cs" />
<Compile Include="src\main\csharp\Topic.cs" />
<Compile Include="src\main\csharp\TemporaryQueue.cs" />
<Compile Include="src\main\csharp\Destination.cs" />
<Compile Include="src\main\csharp\MapMessage.cs" />
<Compile Include="src\main\csharp\MessageConsumer.cs" />
<Compile Include="src\main\csharp\MessageProducer.cs" />
<Compile Include="src\main\csharp\ObjectMessage.cs" />
<Compile Include="src\main\csharp\Queue.cs" />
<Compile Include="src\main\csharp\Session.cs" />
<Compile Include="src\main\csharp\StreamMessage.cs">
<SubType>Code</SubType>
</Compile>
<Compile Include="src\main\csharp\TextMessage.cs" />
</ItemGroup>
<ItemGroup>
<Content Include="lib\clrzmq\net-4.0\libzmq.dll">
<Link>libzmq.dll</Link>
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
<Content Include="lib\clrzmq\net-4.0\libzmq64.dll">
<Link>libzmq64.dll</Link>
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.

0 comments on commit 727eeaf

Please sign in to comment.